In [None]:
!pip install pyspark



In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session, then load the CSV with a header row and inferred
# column types, and print the resulting schema.
spark = (
    SparkSession.builder
    .appName('tree')
    .getOrCreate()
)
df = spark.read.csv(
    '/content/drive/MyDrive/Datasets/Thesis/dr16.csv',
    header=True,
    inferSchema=True,
)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- objid: double (nullable = true)
 |-- modelMag_u: double (nullable = true)
 |-- modelMag_g: double (nullable = true)
 |-- modelMag_r: double (nullable = true)
 |-- modelMag_i: double (nullable = true)
 |-- modelMag_z: double (nullable = true)
 |-- fiberMag_u: double (nullable = true)
 |-- fiberMag_g: double (nullable = true)
 |-- fiberMag_r: double (nullable = true)
 |-- fiberMag_i: double (nullable = true)
 |-- fiberMag_z: double (nullable = true)
 |-- petroR50_r: double (nullable = true)
 |-- petroR90_r: double (nullable = true)
 |-- petroR50_z: double (nullable = true)
 |-- petroR90_z: double (nullable = true)
 |-- r: double (nullable = true)
 |-- i: double (nullable = true)
 |-- z: double (nullable = true)
 |-- redshift: double (nullable = true)
 |-- zerr: double (nullable = true)
 |-- mmug: double (nullable = true)
 |-- mmgr: double (nullable = true)
 |-- mmri: double (nullable = true)
 |-- mmiz: double (nullable = true)
 |-- mfug: doub

In [None]:
import pyspark.sql.functions as func
# Round redshift to two decimals, then cast it to an integer so it can be used as a
# discrete class label (it is the labelCol of the classifiers below).
# The integer cast discards the fractional part, so the labels are coarse redshift bins.
df = df.withColumn(
    "redshift",
    func.round(func.col("redshift"), 2).cast('integer'),
)

In [None]:
import pandas as pd
# Pull the first five rows to the driver and display them transposed (one row per column name).
pd.DataFrame(data=df.take(5), columns=df.columns).T

Unnamed: 0,0,1,2,3,4
_c0,0.0,1.0,2.0,3.0,4.0
objid,1.23768e+18,1.23768e+18,1.23768e+18,1.23768e+18,1.23768e+18
modelMag_u,21.63269,19.74829,23.74654,20.63075,21.38126
modelMag_g,21.27911,19.45819,23.19651,20.25426,21.13488
modelMag_r,21.2255,19.33118,22.01303,20.04478,21.09993
modelMag_i,20.9569,19.01371,20.80744,19.79378,20.92882
modelMag_z,20.82752,18.80369,19.9166,19.70326,20.80634
fiberMag_u,22.05036,20.1019,24.60136,20.99431,21.85225
fiberMag_g,21.62439,19.79539,23.64122,20.60174,21.4586
fiberMag_r,21.64663,19.6658,22.4804,20.40031,21.4534


In [None]:
# List the column names of the loaded DataFrame.
df.columns

['_c0',
 'objid',
 'modelMag_u',
 'modelMag_g',
 'modelMag_r',
 'modelMag_i',
 'modelMag_z',
 'fiberMag_u',
 'fiberMag_g',
 'fiberMag_r',
 'fiberMag_i',
 'fiberMag_z',
 'petroR50_r',
 'petroR90_r',
 'petroR50_z',
 'petroR90_z',
 'r',
 'i',
 'z',
 'redshift',
 'zerr',
 'mmug',
 'mmgr',
 'mmri',
 'mmiz',
 'mfug',
 'mfgr',
 'mfri',
 'mfiz']

In [None]:
# Print the first five rows in Spark's tabular text format.
df.show(n=5)

+---+--------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+------------------+----------+--------+--------+--------+--------+-----------------+---------+------------------+---------+--------+------------------+---------+--------+------------------+
|_c0|               objid|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|        petroR50_r|petroR90_r|        petroR50_z|petroR90_z|       r|       i|       z|redshift|             zerr|     mmug|              mmgr|     mmri|    mmiz|              mfug|     mfgr|    mfri|              mfiz|
+---+--------------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+------------------+----------+--------+--------+--------+--------+-----------------+---------+------------------+---------

In [None]:
# Remove the three columns that are not used as model inputs, then preview the result.
df = df.drop(
    'objid',
    '_c0',
    'zerr',
)
df.show(n=5)

+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+------------------+----------+--------+--------+--------+--------+---------+------------------+---------+--------+------------------+---------+--------+------------------+
|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|        petroR50_r|petroR90_r|        petroR50_z|petroR90_z|       r|       i|       z|redshift|     mmug|              mmgr|     mmri|    mmiz|              mfug|     mfgr|    mfri|              mfiz|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+------------------+----------+--------+--------+--------+--------+---------+------------------+---------+--------+------------------+---------+--------+------------------+
|  21.63269|  21.27911|   21.2255|   20.9569|  20.82752|  22.

In [None]:
#from pyspark.sql.functions import monotonically_increasing_id
#df = df.withColumn("objid", monotonically_increasing_id())
#df.show(5)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine the 25 input columns (every remaining column except the redshift label)
# into a single vector column named 'features'.
assembler = VectorAssembler(
    inputCols=[
        'modelMag_u', 'modelMag_g', 'modelMag_r', 'modelMag_i', 'modelMag_z',
        'fiberMag_u', 'fiberMag_g', 'fiberMag_r', 'fiberMag_i', 'fiberMag_z',
        'petroR50_r', 'petroR90_r', 'petroR50_z', 'petroR90_z',
        'r', 'i', 'z',
        'mmug', 'mmgr', 'mmri', 'mmiz',
        'mfug', 'mfgr', 'mfri', 'mfiz',
    ],
    outputCol='features',
)

In [None]:
# Apply the assembler: adds the 'features' vector column alongside the existing columns of df.
output = assembler.transform(df)

In [None]:
# Keep only the assembled feature vector and the label column for modelling.
final_df = output.select(['features', 'redshift'])
final_df.show(n=3)

+--------------------+--------+
|            features|redshift|
+--------------------+--------+
|[21.63269,21.2791...|       1|
|[19.74829,19.4581...|       2|
|[23.74654,23.1965...|       0|
+--------------------+--------+
only showing top 3 rows



In [None]:
# 70/30 train/test split. A fixed seed keeps the split (and therefore the reported metrics)
# repeatable across kernel restarts; 1234 matches the seed given to the classifier.
train, test = final_df.randomSplit([0.7, 0.3], seed=1234)

In [None]:
# Network topology: 25 inputs (one per assembled feature column), two hidden layers of
# 24 units each, and 8 output units (one per class).
# NOTE(review): 8 outputs assumes the integer redshift labels fall in 0..7 — confirm against the data.
layers = [25, 24, 24, 8]

In [None]:
# First-stage multilayer perceptron: predicts the integer redshift label from 'features'.
classifier = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='redshift',
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)


In [None]:
# `time` is not imported in any other visible cell, so import it here; without it this cell
# raises NameError on a fresh kernel.
import time

# Fit the first-stage model and report the elapsed wall-clock time. perf_counter() is a
# monotonic clock intended for measuring intervals.
start_time = time.perf_counter()
model = classifier.fit(train)
print("%s seconds" % (time.perf_counter() - start_time))

In [None]:
# Score the held-out test split with the first-stage model.
result = model.transform(test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the first-stage model on the test predictions.
# The printed label names the model actually trained above (a MultilayerPerceptronClassifier).
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'redshift', metricName = 'accuracy')
print('MLP accuracy:', multi_evaluator.evaluate(result))

Random Forest Accu: 0.8539784435309296


In [None]:
# Weighted precision of the first-stage model on the test predictions.
# The printed label states both the model (MLP) and the metric being reported.
precision_evaluator = MulticlassClassificationEvaluator(labelCol = 'redshift', metricName = 'weightedPrecision')
print('MLP weighted precision:', precision_evaluator.evaluate(result))

Random Forest Accu: 0.8258246521231389


In [None]:
# Re-point the shared evaluator at weighted precision (this changes its metric in place),
# then report that metric for the test predictions.
multi_evaluator.setMetricName('weightedPrecision')
print('test data (weightedPrecision): ', multi_evaluator.evaluate(result))

test data (weightedPrecision):  0.8258246521231389


In [None]:
# Re-point the shared evaluator at weighted recall (this changes its metric in place),
# then report that metric for the test predictions.
multi_evaluator.setMetricName('weightedRecall')
print('test data (weightedRecall): ', multi_evaluator.evaluate(result))

test data (weightedRecall):  0.8539784435309297


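# Cascading

The first model's predictions are appended as an extra input feature, and a second classifier is trained on the augmented feature vector.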

In [None]:
# Score the entire dataset (training and test rows alike) with the first-stage model; the
# resulting 'prediction' column is used below as an extra feature.
# NOTE(review): `final_df` is reassigned later in the notebook, so re-running this cell after
# that point would pass the model a frame that no longer has a 'features' column.
result2 = model.transform(final_df)

In [None]:
# Preview the first three scored rows (raw scores, class probabilities and predicted label).
result2.show(n=3)

+--------------------+--------+--------------------+--------------------+----------+
|            features|redshift|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|[21.63269,21.2791...|       1|[1.57716458308749...|[0.17029967441546...|       1.0|
|[19.74829,19.4581...|       2|[1.94432037178717...|[0.25368640497025...|       1.0|
|[23.74654,23.1965...|       0|[5.08391356729749...|[0.94851365831968...|       0.0|
+--------------------+--------+--------------------+--------------------+----------+
only showing top 3 rows



In [None]:
from pyspark.sql.functions import monotonically_increasing_id
# Tag every row of df with a generated id so it can be joined to the prediction frame.
# NOTE(review): monotonically_increasing_id() derives ids from the partition layout; the
# later join assumes result2 receives identical ids for the same rows — confirm this holds.
df = df.withColumn(colName="objid", col=monotonically_increasing_id())
df.show(n=5)

+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+------------------+----------+--------+--------+--------+--------+---------+------------------+---------+--------+------------------+---------+--------+------------------+-----+
|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|        petroR50_r|petroR90_r|        petroR50_z|petroR90_z|       r|       i|       z|redshift|     mmug|              mmgr|     mmri|    mmiz|              mfug|     mfgr|    mfri|              mfiz|objid|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+----------+------------------+----------+--------+--------+--------+--------+---------+------------------+---------+--------+------------------+---------+--------+------------------+-----+
|  21.63269|  21.27911|   21.2255|   20.956

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
# Tag every scored row with a generated id, to be matched against the ids added to df.
# NOTE(review): monotonically_increasing_id() derives ids from the partition layout; the
# join on objid is only valid if df and result2 end up with the same id for the same row — confirm.
result2 = result2.withColumn(colName="objid", col=monotonically_increasing_id())
result2.show(n=3)

+--------------------+--------+--------------------+--------------------+----------+-----+
|            features|redshift|       rawPrediction|         probability|prediction|objid|
+--------------------+--------+--------------------+--------------------+----------+-----+
|[21.63269,21.2791...|       1|[1.57716458308749...|[0.17029967441546...|       1.0|    0|
|[19.74829,19.4581...|       2|[1.94432037178717...|[0.25368640497025...|       1.0|    1|
|[23.74654,23.1965...|       0|[5.08391356729749...|[0.94851365831968...|       0.0|    2|
+--------------------+--------+--------------------+--------------------+----------+-----+
only showing top 3 rows



In [None]:
# Sanity check: row and column counts of both frames before joining.
print(df.count(), len(df.columns))
print(result2.count(), len(result2.columns))

# Inner-join on the generated ids, keeping every column of df plus the first-stage prediction.
new_df = (
    df.join(result2, on=df["objid"] == result2["objid"], how="inner")
    .select(df["*"], result2["prediction"])
)
new_df.show(n=5)

3524477 27
3524477 6
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+------------------+----------+------------------+--------+--------+--------+--------+---------+------------------+------------------+---------+---------+---------+---------+---------+-----+----------+
|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|        petroR50_r|        petroR90_r|petroR50_z|        petroR90_z|       r|       i|       z|redshift|     mmug|              mmgr|              mmri|     mmiz|     mfug|     mfgr|     mfri|     mfiz|objid|prediction|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+------------------+------------------+----------+------------------+--------+--------+--------+--------+---------+------------------+------------------+---------+---------+---------+---------+---------+----

In [None]:
# Second-stage assembler: the original 25 input columns plus the first-stage 'prediction',
# packed into a vector column named 'features2'.
assembler = VectorAssembler(
    inputCols=[
        'modelMag_u', 'modelMag_g', 'modelMag_r', 'modelMag_i', 'modelMag_z',
        'fiberMag_u', 'fiberMag_g', 'fiberMag_r', 'fiberMag_i', 'fiberMag_z',
        'petroR50_r', 'petroR90_r', 'petroR50_z', 'petroR90_z',
        'r', 'i', 'z',
        'mmug', 'mmgr', 'mmri', 'mmiz',
        'mfug', 'mfgr', 'mfri', 'mfiz',
        'prediction',
    ],
    outputCol='features2',
)

In [None]:
# Apply the second-stage assembler to the joined frame; this rebinds `output`.
output = assembler.transform(new_df)

In [None]:
# Keep only the augmented feature vector and the label; this rebinds `final_df`.
final_df = output.select(['features2', 'redshift'])
final_df.show(n=3)

+--------------------+--------+
|           features2|redshift|
+--------------------+--------+
|[21.63269,21.2791...|       1|
|[20.90748,21.2373...|       1|
|[23.24831,21.2138...|       3|
+--------------------+--------+
only showing top 3 rows



In [None]:
# 70/30 train/test split of the augmented data. A fixed seed keeps the split (and therefore
# the reported metrics) repeatable across kernel restarts; 1234 matches the classifier's seed.
train, test = final_df.randomSplit([0.7, 0.3], seed=1234)

In [None]:
# Second-stage topology: 26 inputs (the 25 original feature columns plus 'prediction'),
# two hidden layers of 24 units each, and 8 output units.
# NOTE(review): 8 outputs assumes the integer redshift labels fall in 0..7 — confirm against the data.
layers = [26, 24, 24, 8]

In [None]:
# Second-stage multilayer perceptron, trained on the augmented 'features2' vector.
classifier2 = MultilayerPerceptronClassifier(
    featuresCol='features2',
    labelCol='redshift',
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)

In [None]:
# Fit the second-stage network on the training split of the augmented data.
model2 = classifier2.fit(train)

In [None]:
# Score the test split with the second-stage model.
# Note: this rebinds `result2`, which until now held the first model's full-dataset predictions.
result2 = model2.transform(test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy of the second-stage (cascaded) model on its test predictions.
# The printed label names the model actually evaluated (the second MultilayerPerceptronClassifier).
multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'redshift', metricName = 'accuracy')
print('Cascaded MLP accuracy:', multi_evaluator.evaluate(result2))

Decision Tree Accu: 0.8538496959634192


In [None]:
# Report weighted precision and then weighted recall for the second-stage predictions,
# switching the shared evaluator's metric in place before each evaluation.
multi_evaluator.setMetricName('weightedPrecision')
print('test data (weightedPrecision): ', multi_evaluator.evaluate(result2))
multi_evaluator.setMetricName('weightedRecall')
print('test data (weightedRecall): ', multi_evaluator.evaluate(result2))

test data (weightedPrecision):  0.8276868780281237
test data (weightedRecall):  0.8538496959634191
