In [1]:
import findspark
findspark.init()

In [2]:
#For this task, predictions are first made supervised, i.e. with pre-defined labels
#Afterwards, a series of predictions are made unsupervised, i.e. without any labels, using different files as input
#Labels are created during dataframe construction in the sql queries based on the thresholds specified in the midterm doc
#Smart_normalized task appears first followed by annual failure rate
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()
df1 = spark.read.format("csv").options(header = 'true', inferSchema = 'true').load("C:/Users/jack_/Desktop/Big_data/drive_stats_2019_Q1/2019-01-01.csv")
df1.createOrReplaceTempView("table1")

In [3]:
df2 = spark.read.format("csv").options(header = 'true', inferSchema = 'true').load("C:/Users/jack_/Desktop/Big_data/drive_stats_2019_Q1/2019-03-17.csv")
df2.createOrReplaceTempView("table2")

In [4]:
df3 = spark.read.format("csv").options(header = 'true', inferSchema = 'true').load("C:/Users/jack_/Desktop/Big_data/drive_stats_2019_Q1/*.csv")
df3.createOrReplaceTempView("table3")

In [5]:
#Calculate annual failure rate and create label
df_drive_days = spark.sql("select model, count(*) as drive_days from table3 group by model")
df_failures = spark.sql("select model, count(*) as failures from table3 where failure = 1 group by model")
df_model_count = spark.sql("select model, count(*) as drive_count from table3 where date = '2019-01-01' group by model")
df_drive_days.createOrReplaceTempView("drive_days")
df_failures.createOrReplaceTempView("failures")
df_model_count.createOrReplaceTempView("model_count")
df_failure_rates = spark.sql("select drive_days.model as model, drive_days.drive_days as drive_days, failures.failures as failures, 100.0*(1.0 * failures)/(drive_days/365.0) as annual_failure_rate from drive_days, failures, model_count where drive_days.model = failures.model and model_count.model = failures.model order by model")
df_failure_rates.createOrReplaceTempView("table4")
df_rates = spark.sql("select model, drive_days, failures, annual_failure_rate, case when annual_failure_rate > 2 then 1 else 0 end as afr from table4")
df_rates.show()

+--------------------+----------+--------+-------------------+---+
|               model|drive_days|failures|annual_failure_rate|afr|
+--------------------+----------+--------+-------------------+---+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942|  0|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336|  0|
|HGST HUH721212ALN604|    259749|       4|           0.562081|  0|
|HGST HUH728080ALE600|     93598|       3|           1.169897|  0|
|       ST10000NM0086|    108555|       3|           1.008705|  0|
|       ST12000NM0007|   2955025|     180|           2.223331|  1|
|         ST4000DM000|   1989429|     107|           1.963126|  0|
|       ST500LM012 HN|     50619|      12|           8.652877|  1|
|          ST500LM030|     14479|       9|          22.688031|  1|
|         ST6000DX000|    135832|       1|           0.268714|  0|
|         ST8000DM002|    888741|      29|           1.191011|  0|
|         ST8000DM004|       273|       1|         133.699670|

In [65]:
#create label based on smart_1_normalized and a threshold of 100
df_normalized = spark.sql("select model, serial_number, case when (smart_1_normalized) >= 100 then 1 else 0 end as smart from table1 limit 1000")
df_normalized = df_normalized.fillna(0)
df_normalized.show(1000)

+--------------------+---------------+-----+
|               model|  serial_number|smart|
+--------------------+---------------+-----+
|         ST4000DM000|       Z305B2QN|    1|
|       ST12000NM0007|       ZJV0XJQ4|    0|
|       ST12000NM0007|       ZJV0XJQ3|    0|
|       ST12000NM0007|       ZJV0XJQ0|    0|
|HGST HMS5C4040ALE640| PL1331LAHG1S4H|    1|
|        ST8000NM0055|       ZA16NQJR|    0|
|       ST12000NM0007|       ZJV02XWG|    0|
|       ST12000NM0007|       ZJV1CSVX|    0|
|       ST12000NM0007|       ZJV02XWA|    0|
|        ST8000NM0055|       ZA18CEBS|    0|
|         ST4000DM000|       Z305DEMG|    1|
|         ST8000DM002|       ZA130TTW|    0|
|       ST12000NM0007|       ZJV1CSVV|    0|
|        ST8000NM0055|       ZA18CEBF|    0|
|       ST12000NM0007|       ZJV02XWV|    0|
|HGST HMS5C4040ALE640| PL2331LAG9TEEJ|    1|
|HGST HMS5C4040BLE640| PL2331LAH3WYAJ|    1|
|HGST HMS5C4040BLE640| PL1331LAHG53YH|    1|
| TOSHIBA MG07ACA14TA|   88Q0A0LGF97G|    1|
|HGST HMS5

In [66]:
#create dataframe to serve as test data for the trainer
df_test = spark.sql("select model, serial_number from table1 limit 1000")
df_test.show()

+--------------------+--------------+
|               model| serial_number|
+--------------------+--------------+
|         ST4000DM000|      Z305B2QN|
|       ST12000NM0007|      ZJV0XJQ4|
|       ST12000NM0007|      ZJV0XJQ3|
|       ST12000NM0007|      ZJV0XJQ0|
|HGST HMS5C4040ALE640|PL1331LAHG1S4H|
|        ST8000NM0055|      ZA16NQJR|
|       ST12000NM0007|      ZJV02XWG|
|       ST12000NM0007|      ZJV1CSVX|
|       ST12000NM0007|      ZJV02XWA|
|        ST8000NM0055|      ZA18CEBS|
|         ST4000DM000|      Z305DEMG|
|         ST8000DM002|      ZA130TTW|
|       ST12000NM0007|      ZJV1CSVV|
|        ST8000NM0055|      ZA18CEBF|
|       ST12000NM0007|      ZJV02XWV|
|HGST HMS5C4040ALE640|PL2331LAG9TEEJ|
|HGST HMS5C4040BLE640|PL2331LAH3WYAJ|
|HGST HMS5C4040BLE640|PL1331LAHG53YH|
| TOSHIBA MG07ACA14TA|  88Q0A0LGF97G|
|HGST HMS5C4040BLE640|PL2331LAHDUVVJ|
+--------------------+--------------+
only showing top 20 rows



In [67]:
#more test data
df_test2 = spark.sql("select model, serial_number from table2 limit 1000")
df_test2.show()

+--------------------+--------------+
|               model| serial_number|
+--------------------+--------------+
|         ST4000DM000|      Z305B2QN|
|       ST12000NM0007|      ZJV0XJQ4|
|       ST12000NM0007|      ZJV0XJQ3|
|       ST12000NM0007|      ZJV0XJQ0|
|HGST HMS5C4040ALE640|PL1331LAHG1S4H|
|        ST8000NM0055|      ZA16NQJR|
|       ST12000NM0007|      ZJV02XWG|
|       ST12000NM0007|      ZJV1CSVX|
|       ST12000NM0007|      ZJV02XWA|
|        ST8000NM0055|      ZA18CEBS|
|         ST4000DM000|      Z305DEMG|
|         ST8000DM002|      ZA130TTW|
|       ST12000NM0007|      ZJV1CSVV|
|        ST8000NM0055|      ZA18CEBF|
|       ST12000NM0007|      ZJV02XWV|
|HGST HMS5C4040ALE640|PL2331LAG9TEEJ|
|HGST HMS5C4040BLE640|PL2331LAH3WYAJ|
|HGST HMS5C4040BLE640|PL1331LAHG53YH|
| TOSHIBA MG07ACA14TA|  88Q0A0LGF97G|
|HGST HMS5C4040BLE640|PL2331LAHDUVVJ|
+--------------------+--------------+
only showing top 20 rows



In [55]:
#supervised training using labels for smart_1_normalized
#corresponds to question 3b
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
stage_1 = StringIndexer(inputCol= 'model', outputCol= 'model_index')
stage_2 = StringIndexer(inputCol= 'serial_number', outputCol= 'serial_index')
stage_3 = OneHotEncoder(dropLast=True, inputCols=[stage_1.getOutputCol(), stage_2.getOutputCol()], outputCols= ['model_encoded', 'serial_encoded'])
stage_4 = VectorAssembler(inputCols= ['model_encoded', 'serial_encoded'], outputCol='vectors')                         
stage_5 = LogisticRegression(featuresCol='vectors',labelCol='smart')
pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, stage_4, stage_5])
model = pipeline.fit(df_normalized)
newData = model.transform(df_normalized)
newData.show()

+--------------------+--------------+-----+-----------+------------+--------------+-----------------+--------------------+--------------------+--------------------+----------+
|               model| serial_number|smart|model_index|serial_index| model_encoded|   serial_encoded|             vectors|       rawPrediction|         probability|prediction|
+--------------------+--------------+-----+-----------+------------+--------------+-----------------+--------------------+--------------------+--------------------+----------+
|         ST4000DM000|      Z305B2QN|    1|        1.0|       381.0|(20,[1],[1.0])|(999,[381],[1.0])|(1019,[1,401],[1....|[-19.109535906187...|[5.02150618591953...|       1.0|
|       ST12000NM0007|      ZJV0XJQ4|    0|        0.0|       950.0|(20,[0],[1.0])|(999,[950],[1.0])|(1019,[0,970],[1....|[19.8465972173808...|[0.99999999759711...|       0.0|
|       ST12000NM0007|      ZJV0XJQ3|    0|        0.0|       949.0|(20,[0],[1.0])|(999,[949],[1.0])|(1019,[0,969],[1...

In [57]:
#test out the model, make a prediction unsupervised with test data created earlier with no labels
prediction = model.transform(df_test)
prediction.show()

+--------------------+--------------+-----------+------------+--------------+-----------------+--------------------+--------------------+--------------------+----------+
|               model| serial_number|model_index|serial_index| model_encoded|   serial_encoded|             vectors|       rawPrediction|         probability|prediction|
+--------------------+--------------+-----------+------------+--------------+-----------------+--------------------+--------------------+--------------------+----------+
|         ST4000DM000|      Z305B2QN|        1.0|       381.0|(20,[1],[1.0])|(999,[381],[1.0])|(1019,[1,401],[1....|[-19.109535906187...|[5.02150618591953...|       1.0|
|       ST12000NM0007|      ZJV0XJQ4|        0.0|       950.0|(20,[0],[1.0])|(999,[950],[1.0])|(1019,[0,970],[1....|[19.8465972173808...|[0.99999999759711...|       0.0|
|       ST12000NM0007|      ZJV0XJQ3|        0.0|       949.0|(20,[0],[1.0])|(999,[949],[1.0])|(1019,[0,969],[1....|[19.8465972173808...|[0.9999999975

In [68]:
#another unsupervised test on the same model
prediction2 = model.transform(df_test2)
prediction2.show()

+--------------------+--------------+-----------+------------+--------------+-----------------+--------------------+--------------------+--------------------+----------+
|               model| serial_number|model_index|serial_index| model_encoded|   serial_encoded|             vectors|       rawPrediction|         probability|prediction|
+--------------------+--------------+-----------+------------+--------------+-----------------+--------------------+--------------------+--------------------+----------+
|         ST4000DM000|      Z305B2QN|        1.0|       381.0|(20,[1],[1.0])|(999,[381],[1.0])|(1019,[1,401],[1....|[-19.109535906187...|[5.02150618591953...|       1.0|
|       ST12000NM0007|      ZJV0XJQ4|        0.0|       950.0|(20,[0],[1.0])|(999,[950],[1.0])|(1019,[0,970],[1....|[19.8465972173808...|[0.99999999759711...|       0.0|
|       ST12000NM0007|      ZJV0XJQ3|        0.0|       949.0|(20,[0],[1.0])|(999,[949],[1.0])|(1019,[0,969],[1....|[19.8465972173808...|[0.9999999975

In [78]:
#train model supervised using the label created for annual failure rate
#corresponds to question 3a
stage1 = StringIndexer(inputCol= 'model', outputCol= 'model_index')
stage2 = OneHotEncoder(dropLast=True, inputCol=stage_1.getOutputCol(), outputCol= 'model_encoded')
stage3 = VectorAssembler(inputCols= ['model_encoded', 'drive_days', 'failures'], outputCol='vectors')                         
stage4 = LogisticRegression(featuresCol='vectors',labelCol='afr')
pipeline1 = Pipeline(stages= [stage1, stage2, stage3, stage4])
model1 = pipeline1.fit(df_rates)
training = model1.transform(df_rates)
training.show()

+--------------------+----------+--------+-------------------+---+-----------+---------------+--------------------+--------------------+--------------------+----------+
|               model|drive_days|failures|annual_failure_rate|afr|model_index|  model_encoded|             vectors|       rawPrediction|         probability|prediction|
+--------------------+----------+--------+-------------------+---+-----------+---------------+--------------------+--------------------+--------------------+----------+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942|  0|        0.0| (18,[0],[1.0])|(20,[0,18,19],[1....|[20.5015110407407...|[0.99999999875173...|       0.0|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336|  0|        1.0| (18,[1],[1.0])|(20,[1,18,19],[1....|[20.1866810649778...|[0.99999999828984...|       0.0|
|HGST HUH721212ALN604|    259749|       4|           0.562081|  0|        2.0| (18,[2],[1.0])|(20,[2,18,19],[1....|[20.5016263428710...|[0.99999999875187..

In [79]:
#test the model by making a prediction unsupervised, using test data created earlier
training2 = model1.transform(df_failure_rates)
training2.show()

+--------------------+----------+--------+-------------------+-----------+---------------+--------------------+--------------------+--------------------+----------+
|               model|drive_days|failures|annual_failure_rate|model_index|  model_encoded|             vectors|       rawPrediction|         probability|prediction|
+--------------------+----------+--------+-------------------+-----------+---------------+--------------------+--------------------+--------------------+----------+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942|        0.0| (18,[0],[1.0])|(20,[0,18,19],[1....|[20.5015110407407...|[0.99999999875173...|       0.0|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336|        1.0| (18,[1],[1.0])|(20,[1,18,19],[1....|[20.1866810649778...|[0.99999999828984...|       0.0|
|HGST HUH721212ALN604|    259749|       4|           0.562081|        2.0| (18,[2],[1.0])|(20,[2,18,19],[1....|[20.5016263428710...|[0.99999999875187...|       0.0|
|HGST HUH7