In [None]:
!pip install pyspark



In [None]:
import time
from pyspark.sql import SparkSession

# Absolute location of the input CSV.
csv_path = '/content/drive/MyDrive/Datasets/Thesis/New Dataset/dr16.csv'

# Start (or reuse) a Spark session registered under the application name 'tree'.
spark = (
    SparkSession.builder
    .appName('tree')
    .getOrCreate()
)

# The first row holds the column names; column types are inferred from the data.
df = spark.read.csv(csv_path, header=True, inferSchema=True)
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Unnamed: 0: integer (nullable = true)
 |-- modelMag_u: double (nullable = true)
 |-- modelMag_g: double (nullable = true)
 |-- modelMag_r: double (nullable = true)
 |-- modelMag_i: double (nullable = true)
 |-- modelMag_z: double (nullable = true)
 |-- fiberMag_u: double (nullable = true)
 |-- fiberMag_g: double (nullable = true)
 |-- fiberMag_r: double (nullable = true)
 |-- fiberMag_i: double (nullable = true)
 |-- fiberMag_z: double (nullable = true)
 |-- petroR50_r: double (nullable = true)
 |-- petroR90_r: double (nullable = true)
 |-- petroR50_z: double (nullable = true)
 |-- petroR90_z: double (nullable = true)
 |-- r: double (nullable = true)
 |-- i: double (nullable = true)
 |-- z: double (nullable = true)
 |-- mmug: double (nullable = true)
 |-- mmgr: double (nullable = true)
 |-- mmri: double (nullable = true)
 |-- mmiz: double (nullable = true)
 |-- mfug: double (nullable = true)
 |-- mfgr: double (nullable = true)
 |-- mfri: do

In [None]:
import pyspark.sql.functions as func

# Turn the continuous redshift into an integer class label: round to two
# decimals, then cast to integer.
# NOTE(review): the integer cast discards the fractional part, so the rounding
# step only matters for values just below a whole number -- confirm this
# binning is what is intended.
redshift_label = func.round(func.col("redshift"), 2).cast('integer')
df = df.withColumn("redshift", redshift_label)

# Show how many rows fall into each label.
df.groupBy('redshift').count().show()

+--------+------+
|redshift| count|
+--------+------+
|       1| 48930|
|       3| 71008|
|       2| 31638|
|       0|409778|
|       6|  5347|
|       5|  6185|
|       4|  9264|
|       7|    81|
+--------+------+



In [None]:
# Remove the two integer columns that are not used as features
# (presumably leftover row indices from earlier exports -- verify).
unused_columns = ['Unnamed: 0', '_c0']
df = df.drop(*unused_columns)
df.show(5)

+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+
|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|petroR50_r|petroR90_r|petroR50_z|petroR90_z|       r|       i|       z|     mmug|     mmgr|     mmri|     mmiz|     mfug|     mfgr|     mfri|     mfiz|redshift|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+
|   23.2391|  21.44343|  19.67306|    18.944|  18.55688|  23.69614|  22.34922|  20.59603|  19.86871|  19.45047|  1.306018|  3.220677|  0.761024|  1.247238|19.67306|  18.944|18.55688|22

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Give every row a generated 'objid' so it can be referred to later on.
row_id = monotonically_increasing_id()
df = df.withColumn("objid", row_id)
df.show(5)

+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+-----+
|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|petroR50_r|petroR90_r|petroR50_z|petroR90_z|       r|       i|       z|     mmug|     mmgr|     mmri|     mmiz|     mfug|     mfgr|     mfri|     mfiz|redshift|objid|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+-----+
|   23.2391|  21.44343|  19.67306|    18.944|  18.55688|  23.69614|  22.34922|  20.59603|  19.86871|  19.45047|  1.306018|  3.220677|  0.761024|  1.247238|19.67306|  

In [None]:
from pyspark.ml.feature import VectorAssembler

# Input columns packed, in this order, into the 'features' vector.
feature_columns = [
    'modelMag_u', 'modelMag_g', 'modelMag_r', 'modelMag_i', 'modelMag_z',
    'fiberMag_u', 'fiberMag_g', 'fiberMag_r', 'fiberMag_i', 'fiberMag_z',
    'petroR50_r', 'petroR90_r', 'petroR50_z', 'petroR90_z',
    'r', 'i', 'z',
    'mmug', 'mmgr', 'mmri', 'mmiz',
    'mfug', 'mfgr', 'mfri', 'mfiz',
]
assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)

In [None]:
# Append the assembled 'features' vector column to every row of df.
output = assembler.transform(df)

# Only the feature vector and the label are carried into training.
model_columns = ['features', 'redshift']
final_df = output.select(*model_columns)
final_df.show(5)

+--------------------+--------+
|            features|redshift|
+--------------------+--------+
|[23.2391,21.44343...|       0|
|[24.05348,21.2472...|       0|
|[26.32478,20.7405...|       0|
|[24.76801,19.3207...|       0|
|[21.75992,19.0856...|       0|
+--------------------+--------+
only showing top 5 rows



In [None]:
# 70/30 train/test split. The fixed seed makes the split repeatable across
# kernel restarts, so the metrics reported below can be reproduced.
train, test = final_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
from pyspark.ml.classification import (
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier,
)
from pyspark.ml import Pipeline

# Decision tree that predicts the integer 'redshift' label from the 'features'
# vector; every other hyper-parameter is left at the library default.
dt = DecisionTreeClassifier(featuresCol='features', labelCol='redshift')

In [None]:
# Time the fit with a monotonic, high-resolution clock; time.time() follows the
# wall clock and can jump if the system time is adjusted mid-run.
start_time = time.perf_counter()
dt_model = dt.fit(train)
print("%s seconds" % (time.perf_counter() - start_time))

55.82771134376526 seconds


In [None]:
# Score the test split with the fitted tree; transform() appends the
# rawPrediction, probability and prediction columns.
dt_predictions = dt_model.transform(test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluator comparing the 'redshift' label with the model's prediction column.
multi_evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='redshift',
)
dt_accuracy = multi_evaluator.evaluate(dt_predictions)
print('Decision Tree Accu:', dt_accuracy)

Decision Tree Accu: 0.8334040795723103


In [None]:
# Reuse the evaluator, switching its metric in place before each evaluation.
# After this cell the evaluator is left configured for 'weightedRecall'.
weighted_precision = multi_evaluator.setMetricName('weightedPrecision').evaluate(dt_predictions)
print('test data (weightedPrecision): ', weighted_precision)

weighted_recall = multi_evaluator.setMetricName('weightedRecall').evaluate(dt_predictions)
print('test data (weightedRecall): ', weighted_recall)

test data (weightedPrecision):  0.8232812597071821
test data (weightedRecall):  0.8334040795723103


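# Cascading

The first decision tree's `prediction` column is appended to the original feature columns, and a second decision tree is trained on the augmented feature vector.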

In [None]:
# Score every row of final_df (not only the test split) with the first model.
# NOTE(review): this includes the rows dt_model was fitted on, so predictions
# for those rows are in-sample -- confirm this is acceptable for how the
# cascade is evaluated below.
dt_predictions2 = dt_model.transform(final_df)

In [None]:
# Preview the first three scored rows.
dt_predictions2.show(3)

+--------------------+--------+--------------------+--------------------+----------+
|            features|redshift|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|[23.2391,21.44343...|       0|[231196.0,759.0,2...|[0.98500734933855...|       0.0|
|[24.05348,21.2472...|       0|[231196.0,759.0,2...|[0.98500734933855...|       0.0|
|[26.32478,20.7405...|       0|[231196.0,759.0,2...|[0.98500734933855...|       0.0|
+--------------------+--------+--------------------+--------------------+----------+
only showing top 3 rows



In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Attach a generated row id to the scored rows. This rebinds dt_predictions,
# which previously held the test-split predictions.
# NOTE(review): these ids are generated independently of the 'objid' column on
# df; they line up only if both frames share the same partition layout.
scored_row_id = monotonically_increasing_id()
dt_predictions = dt_predictions2.withColumn("objid", scored_row_id)
dt_predictions.show(3)

+--------------------+--------+--------------------+--------------------+----------+-----+
|            features|redshift|       rawPrediction|         probability|prediction|objid|
+--------------------+--------+--------------------+--------------------+----------+-----+
|[23.2391,21.44343...|       0|[231196.0,759.0,2...|[0.98500734933855...|       0.0|    0|
|[24.05348,21.2472...|       0|[231196.0,759.0,2...|[0.98500734933855...|       0.0|    1|
|[26.32478,20.7405...|       0|[231196.0,759.0,2...|[0.98500734933855...|       0.0|    2|
+--------------------+--------+--------------------+--------------------+----------+-----+
only showing top 3 rows



In [None]:
print(df.count(), len(df.columns))
print(dt_predictions.count(), len(dt_predictions.columns))

# Attach the first model's prediction to the original columns by scoring the
# assembled frame directly. `output` (built by the first assembler, before the
# 'features2' assembler runs) already carries every df column, objid included,
# next to the 'features' vector, so no join is needed. Joining on two
# independently generated monotonically_increasing_id() columns only lines up
# while both frames happen to share the same partition layout, and it costs a
# full shuffle.
stage1_scored = dt_model.transform(output.select(*df.columns, 'features'))

# Keep exactly the original df columns plus the new 'prediction' column.
new_df = stage1_scored.select(*df.columns, 'prediction')
new_df.show(5)

582231 27
582231 6
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+-----+----------+
|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|fiberMag_u|fiberMag_g|fiberMag_r|fiberMag_i|fiberMag_z|petroR50_r|petroR90_r|petroR50_z|petroR90_z|       r|       i|       z|     mmug|     mmgr|     mmri|     mmiz|     mfug|     mfgr|     mfri|     mfiz|redshift|objid|prediction|
+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+--------+-----+----------+
|   23.2391|  21.44343|  19.67306|    18.944|  18.55688|  23.69614|  22.34922|  20.59603|  19.86871|  19.45047|  1

In [None]:
# Same input columns as the first feature vector, plus the first model's
# 'prediction' as the last entry; packed into 'features2'.
cascade_columns = [
    'modelMag_u', 'modelMag_g', 'modelMag_r', 'modelMag_i', 'modelMag_z',
    'fiberMag_u', 'fiberMag_g', 'fiberMag_r', 'fiberMag_i', 'fiberMag_z',
    'petroR50_r', 'petroR90_r', 'petroR50_z', 'petroR90_z',
    'r', 'i', 'z',
    'mmug', 'mmgr', 'mmri', 'mmiz',
    'mfug', 'mfgr', 'mfri', 'mfiz',
    'prediction',
]
assembler = VectorAssembler(outputCol='features2', inputCols=cascade_columns)

In [None]:
# Append the augmented 'features2' vector to new_df. This rebinds both
# `output` and `final_df`, replacing the first-stage frames.
output = assembler.transform(new_df)

cascade_model_columns = ['features2', 'redshift']
final_df = output.select(*cascade_model_columns)
final_df.show(3)

+--------------------+--------+
|           features2|redshift|
+--------------------+--------+
|[23.2391,21.44343...|       0|
|[24.05348,21.2472...|       0|
|[25.42423,21.6029...|       0|
+--------------------+--------+
only showing top 3 rows



In [None]:
# 70/30 train/test split of the augmented data. The fixed seed makes the split
# repeatable across kernel restarts, so the metrics reported below can be
# reproduced.
train, test = final_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
from pyspark.ml.classification import (
    DecisionTreeClassifier,
    RandomForestClassifier,
    GBTClassifier,
)
from pyspark.ml import Pipeline

# Classifiers for the augmented 'features2' vector. They all share the same
# label/feature column settings and otherwise use library defaults.
cascade_columns_kwargs = dict(labelCol='redshift', featuresCol='features2')

dt1 = DecisionTreeClassifier(**cascade_columns_kwargs)
rf = RandomForestClassifier(**cascade_columns_kwargs)
# NOTE(review): Spark documents GBTClassifier as supporting binary labels only,
# while 'redshift' takes 8 distinct values here -- confirm before fitting gb.
gb = GBTClassifier(**cascade_columns_kwargs)

In [None]:
# Time the fit with a monotonic, high-resolution clock; time.time() follows the
# wall clock and can jump if the system time is adjusted mid-run.
start_time = time.perf_counter()
dt_model1 = dt1.fit(train)
print("%s seconds" % (time.perf_counter() - start_time))

68.32338333129883 seconds


In [None]:
# Score the test split with the second tree. This rebinds dt_predictions2,
# which previously held the first model's predictions on all of final_df.
dt_predictions2 = dt_model1.transform(test)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Fresh evaluator (metric reset to accuracy) for the second model's predictions.
multi_evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='redshift',
)
cascade_accuracy = multi_evaluator.evaluate(dt_predictions2)
print('Decision Tree Accu:', cascade_accuracy)

Decision Tree Accu: 0.8454708350746226


In [None]:
# Reuse the evaluator, switching its metric in place before each evaluation.
# After this cell the evaluator is left configured for 'weightedRecall'.
cascade_precision = multi_evaluator.setMetricName('weightedPrecision').evaluate(dt_predictions2)
print('test data (weightedPrecision): ', cascade_precision)

cascade_recall = multi_evaluator.setMetricName('weightedRecall').evaluate(dt_predictions2)
print('test data (weightedRecall): ', cascade_recall)

test data (weightedPrecision):  0.8461956549882351
test data (weightedRecall):  0.8454708350746225
