In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Bucketizer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.appName('final_grade').getOrCreate()

In [3]:
#load the datasets
intermediateGrade = spark.read.csv('intermediate_grades.csv',inferSchema=True,header=True)
finalGrade1 = spark.read.csv('Exam (First time)-Table 1.csv',inferSchema=True,header=True)
finalGrade2 = spark.read.csv('Exam (Second time)-Table 1.csv',inferSchema=True,header=True)

In [4]:
intermediateGrade.show(5)

+----------+---------+---------+---------+---------+---------+
|Student Id|Session 2|Session 3|Session 4|Session 5|Session 6|
+----------+---------+---------+---------+---------+---------+
|         1|      5.0|      0.0|      4.5|      4.0|     2.25|
|         2|      4.0|      3.5|      4.5|      4.0|      1.0|
|         3|      3.5|      3.5|      4.5|      4.0|      0.0|
|         4|      6.0|      4.0|      5.0|      3.5|     2.75|
|         5|      5.0|      4.0|      5.0|      4.0|     2.75|
+----------+---------+---------+---------+---------+---------+
only showing top 5 rows



In [5]:
finalGrade1.columns

['Student ID',
 'ES 1.1 (2 points)',
 'ES 1.2 (3 points)',
 'ES 2.1 (2 points)',
 'ES 2.2 (3 points)',
 'ES 3.1,(1 points)',
 'ES 3.2 (2 points)',
 'ES 3.3 (2 points)',
 'ES 3.4,(2 points)',
 'ES 3.5, (3 points)',
 'ES 4.1 (15 points)',
 'ES 4.2 (10 points)',
 'ES 5.1, (2 points)',
 'ES 5.2 (10 points)',
 'ES 5.3 (3 points)',
 'ES 6.1 (25 points)',
 'ES 6.2 (15 points)',
 'TOTAL (100 points)']

In [6]:
#show the total rows in datasets
#well, too little to have a good prediction
print(intermediateGrade.count())
print(finalGrade1.count())
print(finalGrade2.count())

115
52
62


In [7]:
#join the 2 final 
tempDF = finalGrade1.join(finalGrade2, ['Student ID','TOTAL (100 points)'],'fullouter').select('Student ID','TOTAL (100 points)')
print(tempDF.count())
#tempDF.printSchema()
tempDF = tempDF.dropDuplicates(['Student ID']).orderBy('Student ID')
tempDF.show()

114
+----------+------------------+
|Student ID|TOTAL (100 points)|
+----------+------------------+
|         1|              94.5|
|         2|              44.0|
|         3|              85.0|
|         4|              30.0|
|         5|              38.5|
|         6|              82.0|
|         7|              78.0|
|         8|               8.5|
|         9|              18.5|
|        10|              59.0|
|        11|              60.0|
|        12|              40.5|
|        13|              90.0|
|        14|              64.0|
|        15|              67.5|
|        16|              67.0|
|        17|              97.0|
|        18|              62.0|
|        19|              50.0|
|        20|              97.5|
+----------+------------------+
only showing top 20 rows



In [8]:
newDF = intermediateGrade.alias('a').join(tempDF.alias('b'), col('a.Student ID') == col('b.Student ID'))
newDF = newDF.select('a.Student Id','a.Session 2','a.Session 3','a.Session 4','a.Session 5','a.Session 6','b.TOTAL (100 points)')
print(newDF.count())
newDF.printSchema()

93
root
 |-- Student Id: integer (nullable = true)
 |-- Session 2: double (nullable = true)
 |-- Session 3: double (nullable = true)
 |-- Session 4: double (nullable = true)
 |-- Session 5: double (nullable = true)
 |-- Session 6: double (nullable = true)
 |-- TOTAL (100 points): double (nullable = true)



In [9]:
assembler = VectorAssembler(
    inputCols=['Session 2',
 'Session 3',
 'Session 4',
 'Session 5',
 'Session 6',
  ],
    outputCol="features")

In [10]:
#bucket the total points into Fail, intermediate, & Excellent
splits = [0,40,80,100]
bucketizer = Bucketizer(splits=splits, inputCol="TOTAL (100 points)", outputCol="grade")

#uncomment below to view before pipe
#bucketedData = bucketizer.transform(newDF)
#print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
#bucketedData.select(['Student Id','grade']).show()

In [11]:
logReg = LogisticRegression(featuresCol='features',labelCol='grade')

In [12]:
pipeline = Pipeline(stages=[bucketizer,assembler,logReg])

In [21]:
#dataset too small, so split 90/10
trainData,testData = newDF.randomSplit([0.8,0.2])

In [22]:
trainData.columns

['Student Id',
 'Session 2',
 'Session 3',
 'Session 4',
 'Session 5',
 'Session 6',
 'TOTAL (100 points)']

In [23]:
fitModel = pipeline.fit(trainData)

In [24]:
results = fitModel.transform(testData)

In [25]:
myEval = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='grade')

In [26]:
results.select('grade','prediction').show()

+-----+----------+
|grade|prediction|
+-----+----------+
|  1.0|       1.0|
|  1.0|       0.0|
|  2.0|       2.0|
|  2.0|       2.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  0.0|       1.0|
|  0.0|       1.0|
|  1.0|       0.0|
|  0.0|       0.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  2.0|       1.0|
|  0.0|       0.0|
|  1.0|       1.0|
|  2.0|       1.0|
|  0.0|       1.0|
|  1.0|       1.0|
+-----+----------+



In [27]:
AUC = myEval.evaluate(results)

In [28]:
#from my test, got 0.852 . good enough I guess
# the result may vary based on random split
AUC

0.7337662337662338