In [1]:
import os
import pyspark
# Spark configuration for this notebook session.
conf = pyspark.SparkConf()
# Serve the Spark UI behind the JupyterHub proxy for the current user.
# NOTE(review): the proxied port is hard-coded to 4041, presumably because 4040
# is already taken on this host - confirm, or pin spark.ui.port to match.
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4041')
# Render DataFrames as tables when they are the last expression of a cell.
conf.set('spark.sql.repl.eagerEval.enabled', True)
conf.set('spark.driver.memory','4g')
# SparkSession is the supported entry point (SQLContext is deprecated), and
# getOrCreate() reuses a live session, so re-running this cell no longer fails
# with "Cannot run multiple SparkContexts at once".
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
# Keep `sc` available for code that uses the SparkContext directly.
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/13 13:21:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/13 13:22:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
import pyspark.sql.functions as func
from pyspark.sql import Window

In [21]:
# Load every CSV file of the 2019 Q1 drive-stats folder; the first line of each
# file is a header and column types are inferred by Spark.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("shared/midterm/drive_stats_2019_Q1/*.csv")
)

# Observation window: 1 January 2019 through 28 March 2019, both ends inclusive.
start_date = func.to_date(func.lit("2019-01-01"))
end_date = func.to_date(func.lit("2019-03-28"))

# Work on a copy of df whose "date" column is a proper date, then keep only
# the rows that fall inside the observation window.
temp = df.withColumn("date", func.to_date("date"))
temp = temp.filter(temp["date"].between(start_date, end_date))

                                                                                

In [23]:
# Per-model totals, computed in a single aggregation over the filtered data:
#   failure - sum of the "failure" column (total failures recorded for the model)
#   total   - number of rows of the model with a non-null "failure" value
# Doing both aggregates in one groupBy produces the same model/failure/total
# table as aggregating twice and full-outer-joining on "model", but avoids the
# second aggregation and the join. It also keeps a null "model" group as one
# row, whereas the join would split it in two because null keys never match.
temp3 = temp.groupBy("model").agg(
    func.sum("failure").alias("failure"),
    func.count("failure").alias("total"),
)
# temp1 / temp2 stay defined (as lazy projections of temp3) for any cell that
# still refers to the separate failure and total tables.
temp1 = temp3.select("model", "failure")
temp2 = temp3.select("model", "total")


                                                                                

model,failure,total
DELLBOSS VD,0,480
HGST HDS5C4040ALE630,0,3039
HGST HMS5C4040ALE640,2,305716
HGST HMS5C4040BLE640,11,1134565
HGST HMS5C4040BLE641,0,87
HGST HUH721010ALE600,0,1194
HGST HUH721212ALE600,1,12480
HGST HUH721212ALN604,4,245133
HGST HUH728080ALE600,3,90595
HGST HUS726040ALE610,0,2514


In [27]:
# Cumulative working days per model: all counted rows minus the rows that
# recorded a failure, i.e. the number of rows whose failure value is 0
# (assumes "failure" is a 0/1 flag - TODO confirm against the data dictionary).
working_days = func.col("total") - func.col("failure")
temp3 = temp3.withColumn("cumulative", working_days)
temp3

                                                                                

model,failure,total,cumulative
DELLBOSS VD,0,480,480
HGST HDS5C4040ALE630,0,3039,3039
HGST HMS5C4040ALE640,2,305716,305714
HGST HMS5C4040BLE640,11,1134565,1134554
HGST HMS5C4040BLE641,0,87,87
HGST HUH721010ALE600,0,1194,1194
HGST HUH721212ALE600,1,12480,12479
HGST HUH721212ALN604,4,245133,245129
HGST HUH728080ALE600,3,90595,90592
HGST HUS726040ALE610,0,2514,2514


In [30]:
#create a udf to find the R value for each model
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(DoubleType())
def R_calc(cumulative,fail):
    """Return the R value of one model: 100 * fail / (cumulative / D).

    Args:
        cumulative: cumulative working days of the model (total - failure).
        fail: total number of failures of the model.

    Returns:
        The R value as a float, or None (SQL null) when either input is null
        or ``cumulative`` is 0, so one degenerate model cannot abort the whole
        Spark job with a ZeroDivisionError/TypeError raised inside the UDF.
    """
    # Number of days in the observation window 2019-01-01 .. 2019-03-28,
    # both ends inclusive (31 + 28 + 28).
    D=87
    if cumulative is None or fail is None or cumulative == 0:
        return None
    return 100.0*((1.0*fail)/(cumulative/D))
# Attach the R value to every model row; rows with a null R value are simply
# ignored by later mean/stddev aggregates and by "R_value > x" filters.
temp3 = temp3.withColumn("R_value", R_calc(temp3["cumulative"],temp3["failure"]))

In [31]:
# Display temp3 (rendered as a table because eager evaluation is enabled).
temp3

                                                                                

model,failure,total,cumulative,R_value
DELLBOSS VD,0,480,480,0.0
HGST HDS5C4040ALE630,0,3039,3039,0.0
HGST HMS5C4040ALE640,2,305716,305714,0.0569159410429355
HGST HMS5C4040BLE640,11,1134565,1134554,0.0843503262074788
HGST HMS5C4040BLE641,0,87,87,0.0
HGST HUH721010ALE600,0,1194,1194,0.0
HGST HUH721212ALE600,1,12480,12479,0.6971712476961295
HGST HUH721212ALN604,4,245133,245129,0.1419660668464359
HGST HUH728080ALE600,3,90595,90592,0.2881049099258213
HGST HUS726040ALE610,0,2514,2514,0.0


In [35]:
# Summary statistics (count, mean, stddev, min, max) for each column of temp3.
temp3.describe()

                                                                                

summary,model,failure,total,cumulative,R_value
count,49,49.0,49.0,49.0,49.0
mean,,8.89795918367347,188931.4693877551,188922.57142857145,1.0637228434381965
stddev,,30.021690004866468,538200.5515609826,538171.9689615177,4.780507362375505
min,DELLBOSS VD,0.0,3.0,3.0,0.0
max,WDC WD60EFRX,178.0,2850901.0,2850723.0,33.07984790874524


In [40]:
#Find the mean and (sample) standard deviation of R_value over all models.
#They are computed from temp3 instead of being pasted from the describe()
#output, so the threshold stays correct when the data or the date filter changes.
R_stats = temp3.agg(
    func.mean("R_value").alias("mean"),
    func.stddev("R_value").alias("stddev"),
).first()
R_mean = R_stats["mean"]
R_stddev = R_stats["stddev"]
if R_mean is None or R_stddev is None:
    # Spark returns null here when there are no usable R values (or only one).
    raise ValueError("need at least two models with an R_value to compute mean and stddev")
#comp is the value we have to compare with M+1S
comp = R_mean + R_stddev

In [43]:
# Outlier (anomalous) models: those whose R value lies above mean + 1 stddev.
is_outlier = func.col("R_value") > comp
temp4 = temp3.where(is_outlier)
temp4

                                                                                

model,failure,total,cumulative,R_value
ST8000DM004,1,264,263,33.07984790874524


In [44]:
# Rows of the full (unfiltered) data set recorded on 29 March 2019.
date = func.to_date(func.lit("2019-03-29"))
new_df = df.where(df["date"] == date)

# Keep only the rows whose model is one of the anomalous models in temp4;
# these are the drives predicted to fail.
result_df = new_df.join(temp4, "model", "inner")

In [46]:
# Select only the serial number and model columns for easier reading.
result_df = result_df.select(["serial_number", "model"])
result_df

                                                                                

serial_number,model
WCT0EJDJ,ST8000DM004
WCT0EKW3,ST8000DM004
WCT0EJY6,ST8000DM004
