In [37]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.ml.linalg import  Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row
from pyspark.ml.feature import StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.recommendation import ALS

In [38]:
class DataFrameFilter(object):
    
    @staticmethod
    def Filter(df):
        output = df.spark.sql('select sid,cid,GPA,Grade from '+df.name+' where GPA > 0')
        n_courses_df = output \
        .groupby('sid') \
        .count()
        
        output = output \
        .join(n_courses_df, on='sid', how='inner') \
        .filter('count > 6') \
        .drop('count')
        
        n_students_df = output \
        .groupby('cid') \
        .count() \
        .withColumnRenamed('cid', 'cid')
        output= output \
        .withColumnRenamed('cid', 'cid') \
        .join(n_students_df, on='cid', how='inner') \
        .filter('count > 20') \
        .drop('count')

        output = output \
        .groupby('sid', 'cid','GPA') \
        .avg('Grade') \
        .withColumnRenamed('avg(Grade)', 'Grade')
        output = DataFrame(dataframe=output,name='output',spark=df.spark)
        output.grade_From_Double_To_Int()
        
        return output.dataframe

In [39]:
###################Class Insights######################################
class Insights(object):
    
    def __init__(self,name,spark):
        self.name=name
        self.spark=spark
        
    def showInsight(self,sqlstatement,no_records=10):
        if sqlstatement!='':
            self.spark.sql(sqlstatement).limit(no_records).show()

    def showTopStudents_Fac(self,faculty='',no_records=10):###########This function is to be implemented after clustering##
        print("Under Construction")
        
    def showTopStudents(self,no_records=10):###########This function is to be implemented after clustering####
        self.spark.sql("select * from "+self.name+" order by GPA desc").limit(no_records).show()
        
 #   def showTopTakenCourses(self,faculty='',no_records=10):###########This function is to be implemented after clustering
 #       print("Under Construction")
    
 #   def showSemesterInsight(self,semester=0):###################
 #       print("UnderConstruction")
        
#    def Visualize(self,dataframe):#####################
#        print("UnderConstruction")

In [40]:
########################class DataFrame####################################
class DataFrame(object):
    
    valid_grades = ['A','A-','B+','B','B-','C+','C','C-','D+','D','D-','F']#,'F1','F2','F3','W']
    valid_grades_int = list(range(0,len(valid_grades)))
    grades_dict = dict(zip(valid_grades, valid_grades_int))
    
    def __init__(self,dataframe=None,name='',spark=None):
        if dataframe!=None and name!='' and spark!=None:
            self.dataframe=dataframe
            self.name=name
            self.spark=spark
            #self.insights=Insights(self.name,self.spark)
            self.updateView()
        else:
            self.dataframe=None
            self.name=None
            self.spark=None
            #self.insights=None
            self.clusterer=None
        
    def openFile(self,path='C:',Type='csv',name='',dropna=False,spark=None):
        if self.spark == None and spark!=None:
            self.spark = spark
            filetypes = {
            'csv':self.spark.read.csv(path,header=True,inferSchema=True),
            'json':self.spark.read.json(path)}
            self.dataframe=filetypes[Type]
            self.name=name
            if dropna:
                self.dropNulls()
            self.updateView()
            #self.insights=Insights(self.name,self.spark)
            self.normalize_grades()
           # self.clusterer=Clusterer(self)
            self.updateView()
   # def cluster(self):
   #     self.clusterer.Fac_cluster()      
    def dropNulls(self):
        self.dataframe=self.dataframe.na.drop(how="all")
        self.updateView()
        
    def normalize_grades(self):#### Selects Valid Grades### and sets the data frame to it
        orstring="Grade = "
        for i in range(0,len(self.valid_grades)):
            nstring=" or "+"Grade = "
            orstring +="'"+self.valid_grades[i]+"'"
            if i+1!=len(self.valid_grades):
                orstring+=nstring
        self.dataframe=self.spark.sql("select * from "+self.name+" where "+orstring)
        self.updateView()
        
    def grade_From_String_to_int(self):###### gives each grade a score############
        udfstring_to_int=f.udf(DataFrame.string_to_int,IntegerType())
        self.dataframe = self.dataframe.withColumn("Grade",udfstring_to_int("Grade"))
        self.updateView()
        
    @staticmethod
    def string_to_int(x):#######################Not to be played with######################################
        if DataFrame.valid_grades_int[0]==0:
            DataFrame.valid_grades_int.reverse()
        DataFrame.grades_dict = dict(zip(DataFrame.valid_grades,DataFrame.valid_grades_int))
        return DataFrame.grades_dict[x]
    
    def grade_From_Double_To_Int(self):########################## use after vector Assembler
        udfdouble_to_int=f.udf(DataFrame.double_to_int,IntegerType())
        self.dataframe = self.dataframe.withColumn("Grade",udfdouble_to_int("Grade"))
        self.updateView()
        
    @staticmethod
    def double_to_int(x):#calls double to _int best use after vector assembler
        if DataFrame.valid_grades_int[0]==0:
            DataFrame.valid_grades_int.reverse()
        DataFrame.grades_dict = dict(zip(DataFrame.valid_grades, DataFrame.valid_grades_int))
        return int(round(x))
    
    def grades_From_int_to_String(self):##################### grades_from_int_to_string###################
        udfint_to_str = f.udf(DataFrame.int_to_str, StringType())
        self.dataframe=self.dataframe.withColumn("Grade",udfint_to_str("Grade"))
        #self.renameColumn("predicted Grade","Grade")
        self.dataframe=self.dataframe.select('sid','cid','GPA','Grade')
        self.updateView()
        
    @staticmethod
    def int_to_str(x):###################################not to be played with#########################3
        if DataFrame.valid_grades_int[0]==0:
            DataFrame.valid_grades_int.reverse()
        DataFrame.grades_dict = dict(zip(DataFrame.valid_grades_int,DataFrame.valid_grades))
        return DataFrame.grades_dict[x]
        
    def renameColumn(self,cname,newName): ### Column Name(Key) and its equivelent(Value):
            self.dataframe=self.dataframe.withColumnRenamed(cname,newName)
            self.updateView()
            
    def updateView(self):
        self.dataframe.createOrReplaceTempView(self.name)
        
    def createView(self,name,df):
        df.createOrReplaceTempView(name)
        
    def show(self,limit=10):
        self.dataframe.limit(limit).show()

In [41]:
class GradePredict(object):
        
    def Predict(self,courses=list(),sid=None,GPA=None,spark=None,prediction='ALS'):
        
        if len(courses)>0 and sid!=None and GPA!=None and GPA!=0 and spark!=None:
            records=list()
            for course in courses:
                records.append((int(sid),int(course),float(GPA)))
            rdd = spark.sparkContext.parallelize(records)
            records = rdd.map(lambda x: Row(sid=int(x[0]), cid = int(x[1]),GPA=float(x[2])))
            records = spark.createDataFrame(records)
            #records.show()
            if prediction =='RF':
                predc = self.RandomForestPredict(records,spark)
            elif prediction =='ALS':
                predc = self.ALSPredict(records,spark)
        if predc!=None:
            return predc
        else:
            return None
    def ALSPredict(self,df,spark):
        """ must get the Grade columns in integer Representation
        this function takes a data frame that contains the sid,cid and predicts the Grade""" 
        predc = PredictionModels.ALSmodel.transform(df)
        predc = predc.withColumnRenamed('prediction','Grade')
        predc = DataFrame(dataframe=predc,name='predictions',spark=spark)#####Reduce time by not creating this obj
        predc.grade_From_Double_To_Int()
        predc.grades_From_int_to_String()
        predc.renameColumn('Grade','Predicted Grade')
        #predc.show()
        predc = predc.dataframe
        count = predc.count()
        predc = predc.collect()
        predcRows = list()
        for i in range(0,count):
            predcRows.append(GradePredict.Row_Tuple(predc[i]))
        return predcRows
    def RandomForestPredict(self,df,spark):
        
        """ must get the Grade columns in integer Representation
        this function takes a data frame that contains the sid,cid,GPA and predicts the Grade"""
        ##########################Efred en Dah Course Gded(problem to be thought of) ############################
        assembler = VectorAssembler(inputCols=['cid','GPA'],outputCol='features')
        output = assembler.transform(df)
        predc = PredictionModels.RFmodel.transform(output)
        predc = predc.withColumnRenamed('prediction','Grade')
        predc = DataFrame(dataframe=predc,name='predictions',spark=spark)#####Reduce time by not creating this obj
        predc.grades_From_int_to_String()
        predc.renameColumn('Grade','Predicted Grade')
        #predc.show()
        predc=predc.dataframe
        count = predc.count()
        predc = predc.collect()
        predcRows=list()
        for i in range(0,count):
            predcRows.append(GradePredict.Row_Tuple(predc[i]))
        return predcRows
    @staticmethod
    def Row_Tuple(row):
        """ Takes a row from data frame and returns it's values as a tuple"""
        tupl=list()
        for item in row:
            tupl.append(item)
        return tuple(tupl)
    @staticmethod
    def Cluster(df):
        output=DataFrameFilter.Filter(df)
        ouput = output \
        .groupby('sid') \
        .agg(f.collect_set('cid').alias('courses')) \
        .withColumn('n_courses', f.size('courses')) \
        .filter('n_courses > 15') \
        .select('sid', f.explode('courses').alias('cid'))
        output = output \
        .withColumn('one', f.lit(1)) \
        .toPandas() \
        .pivot_table(index='cid', columns=['sid'], values='one', fill_value=0)
        output = df.spark.createDataFrame(output.reset_index())
        assembler = VectorAssembler(inputCols=output.drop('cid').columns, outputCol="features")
        clustering_df = assembler.transform(output).select('cid', 'features')
        clustered = PredictionModels.Kmodel.transform(clustering_df).select("cid","prediction")
        df.dataframe= df.dataframe.join(clustered,df.dataframe['cid'] == clustered['cid'], how='inner')
        df.show()
        return df
    

In [42]:
class PredictionModels(object):   
    @staticmethod
    def Train(df,modelName='ALS',faculties=9):
        ###########cluster##############################
        PredictionModels.TrainKmodel(df,faculties)
        GradePredict.Cluster(df)
        if modelName == 'RF' or modelName == 'ALL':
            PredictionModels.spark=df.spark
            output=DataFrameFilter.Filter(df)
            assembler=VectorAssembler(inputCols=['cid','GPA'],outputCol='features')
            output=assembler.transform(output)
            traind,testd = output.randomSplit([0.8,0.2])
            maxx=-99999999
            trees = 0
            for i in range(50,51):#############To Be Changed Before Deployment###################
                rfc=RandomForestClassifier(labelCol='Grade',featuresCol='features',numTrees=i)
                PredictionModels.RFmodel = rfc.fit(traind)
                PredictionModels.RFpreds = PredictionModels.RFmodel.transform(testd)
               # PredictionModels.RFpreds.show()
                PredictionModels.accuracy = PredictionModels.Accuracy(modelName)
                if PredictionModels.accuracy > maxx:
                    maxx = PredictionModels.accuracy
                    trees=i
            #print("Number of Trees that Increase Accuracy of classification is {0}".format(trees))
            #print("With Accuracy "+str(self.accuracy))
        if modelName =='ALS' or modelName == 'ALL':
            rank = 20  # number of latent factors
            #maxIter = 10
            #regParam=0.01  # prevent overfitting
            output = DataFrameFilter.Filter(df)
            traind,testd = output.randomSplit([0.8,0.2])
            als = ALS(userCol="sid", itemCol="cid", ratingCol="Grade", 
              rank=rank, 
              coldStartStrategy="drop",
              seed=12)
            PredictionModels.ALSmodel = als.fit(traind)
            PredictionModels.ALSpreds = PredictionModels.ALSmodel.transform(testd)
            #PredictionModels.ALSpreds.show()
            PredictionModels.Accuracy(PredictionModels.ALSpreds)
    @staticmethod
    def TrainKmodel(df,faculties):          
        output=DataFrameFilter.Filter(df)
        ouput = output \
        .groupby('sid') \
        .agg(f.collect_set('cid').alias('courses')) \
        .withColumn('n_courses', f.size('courses')) \
        .filter('n_courses > 15') \
        .select('sid', f.explode('courses').alias('cid'))
        output = output \
        .withColumn('one', f.lit(1)) \
        .toPandas() \
        .pivot_table(index='cid', columns=['sid'], values='one', fill_value=0)
        output = df.spark.createDataFrame(output.reset_index())
        assembler = VectorAssembler(inputCols=output.drop('cid').columns, outputCol="features")
        clustering_df = assembler.transform(output).select('cid', 'features')
        clustering_df.show()
        kmeans = KMeans(featuresCol='features').setK(faculties).setSeed(1)
        PredictionModels.Kmodel = kmeans.fit(clustering_df)
    @staticmethod
    def Accuracy(modelName='ALS'):
        if modelName =='RF' or modelName=='ALL':
            evaluator=MulticlassClassificationEvaluator(labelCol='Grade',predictionCol='prediction',metricName='accuracy')
            print(PredictionModels.RFmodel.featureImportances)
            return evaluator.evaluate(PredictionModels.RFpreds)
        if modelName=='ALS' or modelName=='ALL':
            evaluator=RegressionEvaluator(metricName='rmse',labelCol='Grade',predictionCol='prediction')
            rmse = evaluator.evaluate(PredictionModels.ALSpreds)
            print('rmse is '+str(rmse))
        


In [43]:
spark = SparkSession.builder.appName('MSA Recommender System').getOrCreate() 
df = DataFrame()
df.openFile(path="C:\\Users\\Mostafa\\Desktop\\data.csv",Type='csv',name='students',dropna=True,spark=spark)
#df.show()

In [44]:
df.renameColumn(df.dataframe.columns[0],'sid')
df.renameColumn(df.dataframe.columns[1],'cid')
df.grade_From_String_to_int()
#df.show(5)

In [45]:
PredictionModels.Train(df,modelName='ALL')

+---+--------------------+
|cid|            features|
+---+--------------------+
|  1|(933,[1,10,11,13,...|
|  6|(933,[2,4,5,7,8,1...|
| 14|(933,[2,11,15,18,...|
| 23|(933,[20,76,87,92...|
| 24|(933,[3,10,110,12...|
| 28|(933,[2,4,5,7,8,1...|
| 34|(933,[5,14,31,48,...|
| 35|(933,[1,10,11,13,...|
| 36|(933,[85,87,139,1...|
| 39|(933,[14,81,95,13...|
| 41|(933,[0,1,10,11,1...|
| 44|(933,[1,10,11,13,...|
| 54|(933,[1,6,9,10,11...|
| 55|(933,[6,9,17,19,2...|
| 64|(933,[6,9,17,19,2...|
| 67|(933,[55,59,62,63...|
| 71|(933,[5,6,9,17,18...|
| 74|(933,[1,5,6,9,10,...|
| 76|(933,[9,17,19,29,...|
| 78|(933,[1,6,9,10,11...|
+---+--------------------+
only showing top 20 rows

+----+---+--------+-----+----+---+----------+
| sid|cid|Semester|Grade| GPA|cid|prediction|
+----+---+--------+-----+----+---+----------+
|1195|  1|      56|   10|2.17|  1|         5|
|1092|  1|      49|    9| 2.2|  1|         5|
| 400|  1|      49|   10|2.47|  1|         5|
|1275|  1|      47|    8|2.35|  1|         5|
| 69

In [46]:
gp = GradePredict()
courses=[16,135,243]
sid=463
gpa=2.6
preds=gp.Predict(courses=courses,sid=sid,GPA=gpa,spark=spark)
preds2=list()
if len(preds)<len(courses):
    cmps=list()
    for pred in preds:
        sid,cid,gpa,grade=pred
        cmps.append(cid)
    for course in courses:
        if course not in cmps:
            preds2.append(course)
preds.extend(gp.Predict(courses=preds2,sid=sid,GPA=gpa,spark=spark,prediction='RF'))
print(str(preds))
####return preds

[(463, 243, 2.6, 'B'), (463, 135, 2.6, 'C'), (463, 16, 2.6, 'B')]


In [47]:
#PredictionModels.TrainKmodel(df,9)

In [48]:
#PredictionModels.Cluster(df)