# ETL
In this notebook, about 40 GB of data is extracted from CSV files and loaded into a local PySpark session. Using PySpark SQL, the relevant columns are selected, transformed and cleaned, then saved as a single CSV file to load and analyse in pandas.

# BackBlaze Data Center Hard Disk Survival Analysis from Diagnosis Data

Since 2013, Backblaze has published statistics and insights based on the hard drives in its data centre, and the raw HDD diagnosis data is available for the public to download and use. The HDDs consist of enterprise and consumer models in various capacities.  
A diagnosis is run every day for all the HDDs and the following data are recorded.  
`date` – Date the diagnosis was executed  
`serial_number` – Serial number of the HDD  
`model` – Model number of the HDD  
`capacity_bytes` – Capacity in bytes  
`failure` – Failure flag: failure (1) / no failure (0)  
`smart_<n>_raw` – Raw value of each available S.M.A.R.T. attribute  
`smart_<n>_normalized` – Normalised value of each available S.M.A.R.T. attribute  
  
My intention is to analyse which of the HDDs in use in 2019 have the best survival rates, and along the way to check out other interesting survival curves. Let’s see what the progress in data storage technology has brought to this data centre. This will be an ongoing survival analysis project because of the number of combinations that can be analysed.  
  
Download data at:  
https://www.backblaze.com/b2/hard-drive-test-data.html#downloading-the-raw-hard-drive-test-data




## Import libs

In [1]:
import pandas as pd
from lifelines import KaplanMeierFitter
from humanize import naturalsize

In [2]:
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
%matplotlib inline

In [3]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F

sc = SparkContext()  # if using locally
spark = SQLContext(sc)
pyspark.__version__


'2.4.4'

## CSV files to PySpark locally

In [4]:
%%time
df = spark.read.csv('data', header=True) #load all csv files in the data folder into PySpark

Wall time: 18.1 s


In [5]:
df.columns #check all columns

['date',
 'serial_number',
 'model',
 'capacity_bytes',
 'failure',
 'smart_1_normalized',
 'smart_1_raw',
 'smart_2_normalized',
 'smart_2_raw',
 'smart_3_normalized',
 'smart_3_raw',
 'smart_4_normalized',
 'smart_4_raw',
 'smart_5_normalized',
 'smart_5_raw',
 'smart_7_normalized',
 'smart_7_raw',
 'smart_8_normalized',
 'smart_8_raw',
 'smart_9_normalized',
 'smart_9_raw',
 'smart_10_normalized',
 'smart_10_raw',
 'smart_11_normalized',
 'smart_11_raw',
 'smart_12_normalized',
 'smart_12_raw',
 'smart_13_normalized',
 'smart_13_raw',
 'smart_15_normalized',
 'smart_15_raw',
 'smart_16_normalized',
 'smart_16_raw',
 'smart_17_normalized',
 'smart_17_raw',
 'smart_22_normalized',
 'smart_22_raw',
 'smart_23_normalized',
 'smart_23_raw',
 'smart_24_normalized',
 'smart_24_raw',
 'smart_168_normalized',
 'smart_168_raw',
 'smart_170_normalized',
 'smart_170_raw',
 'smart_173_normalized',
 'smart_173_raw',
 'smart_174_normalized',
 'smart_174_raw',
 'smart_177_normalized',
 'smart_177_r

We only need the following columns for our analysis.  
'date' - date the diagnosis was executed  
'serial_number' - serial number of the HDD  
'model' - model name of the HDD  
'capacity_bytes' - size of the HDD in bytes  
'failure' - failed / not failed flag  
'smart_9_raw' - number of hours the HDD has operated  

## Data Cleaning

In [6]:
#Select relevant columns
df = df.select('date', 'serial_number', 'model',
               'capacity_bytes', 'failure', 'smart_9_raw')

In [7]:
df.columns #check selected columns

['date', 'serial_number', 'model', 'capacity_bytes', 'failure', 'smart_9_raw']

In [8]:
type(df) #check type of dataframe

pyspark.sql.dataframe.DataFrame

In [9]:
%%time
print((df.count(), len(df.columns))) #check shape of the dataframe, output: (156573413, 6) 

(156573413, 6)
Wall time: 1min 9s


In [10]:
# cast the column types and rename columns (cap_tb still holds bytes here; it is converted later in pandas)
usedf = df.select(
    F.col("date").astype("date"),
    F.trim(F.col("serial_number")).alias("sn").astype("string"),
    F.trim(F.col("model")).alias("model").astype("string"),
    F.col("capacity_bytes").alias("cap_tb").astype("bigint"),
    F.col("failure").astype("int"),
    F.col("smart_9_raw").alias("smart9").astype("bigint"))

In [11]:
#check the changes
usedf.head()

Row(date=datetime.date(2019, 9, 27), sn='Z305B2QN', model='ST4000DM000', cap_tb=4000787030016, failure=0, smart9=33165)

In [12]:
#To pandas dataframe and check
usedf.limit(10).toPandas()

Unnamed: 0,date,sn,model,cap_tb,failure,smart9
0,2019-09-27,Z305B2QN,ST4000DM000,4000787030016,0,33165
1,2019-09-27,ZJV0XJQ4,ST12000NM0007,12000138625024,0,10203
2,2019-09-27,ZJV0XJQ3,ST12000NM0007,12000138625024,0,7234
3,2019-09-27,ZJV0XJQ0,ST12000NM0007,12000138625024,0,10803
4,2019-09-27,PL1331LAHG1S4H,HGST HMS5C4040ALE640,4000787030016,0,23239
5,2019-09-27,ZA16NQJR,ST8000NM0055,8001563222016,0,21118
6,2019-09-27,ZJV02XWG,ST12000NM0007,12000138625024,0,12147
7,2019-09-27,ZJV1CSVX,ST12000NM0007,12000138625024,0,10460
8,2019-09-27,ZJV02XWA,ST12000NM0007,12000138625024,0,12133
9,2019-09-27,ZA18CEBS,ST8000NM0055,8001563222016,0,18091


In [13]:
#Create manufacturer column and fill
usedf = usedf.withColumn(
    "manuf",
    F.when(F.col("model").like("ST%"), "Seagate")
    .when(F.col("model").like("Hitachi %"), "HGST/Hitachi")
    .when(F.col("model").like("HGST %"), "HGST/Hitachi")
    .when(F.col("model").like("% %"), F.split(F.col("model"), " ")[0])
    .otherwise("unknown"),
)

In [14]:
#check as pandas dataframe
usedf.limit(10).toPandas()

Unnamed: 0,date,sn,model,cap_tb,failure,smart9,manuf
0,2019-09-27,Z305B2QN,ST4000DM000,4000787030016,0,33165,Seagate
1,2019-09-27,ZJV0XJQ4,ST12000NM0007,12000138625024,0,10203,Seagate
2,2019-09-27,ZJV0XJQ3,ST12000NM0007,12000138625024,0,7234,Seagate
3,2019-09-27,ZJV0XJQ0,ST12000NM0007,12000138625024,0,10803,Seagate
4,2019-09-27,PL1331LAHG1S4H,HGST HMS5C4040ALE640,4000787030016,0,23239,HGST/Hitachi
5,2019-09-27,ZA16NQJR,ST8000NM0055,8001563222016,0,21118,Seagate
6,2019-09-27,ZJV02XWG,ST12000NM0007,12000138625024,0,12147,Seagate
7,2019-09-27,ZJV1CSVX,ST12000NM0007,12000138625024,0,10460,Seagate
8,2019-09-27,ZJV02XWA,ST12000NM0007,12000138625024,0,12133,Seagate
9,2019-09-27,ZA18CEBS,ST8000NM0055,8001563222016,0,18091,Seagate
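
The order of the rules in the `F.when` chain matters: the first matching branch wins, and the generic fallback simply keeps the text before the first space with its original casing. A minimal plain-Python sketch of the same rules, useful for checking edge cases without a Spark session; the function name `manufacturer_from_model` is hypothetical and not part of the notebook.

```python
def manufacturer_from_model(model):
    """Mirror the F.when chain: the first matching rule wins."""
    if model is None:
        return "unknown"              # null models fall through to otherwise()
    if model.startswith("ST"):        # like("ST%")
        return "Seagate"
    if model.startswith("Hitachi ") or model.startswith("HGST "):
        return "HGST/Hitachi"
    if " " in model:                  # like("% %"): keep the prefix before the first space
        return model.split(" ")[0]
    return "unknown"

# Model strings taken from the sample rows in this notebook
for m in ["ST4000DM000", "HGST HMS5C4040ALE640", "WDC WD800BB", "TOSHIBA MG07ACA14TA"]:
    print(m, "->", manufacturer_from_model(m))
```

Because the fallback preserves casing, differently cased vendor prefixes end up as separate manufacturer values and need normalising afterwards.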


### Create temp view table

In [15]:
#create a temporary view to calculate the days each hdd was in operation
usedf.createOrReplaceTempView('drive_days')

In [16]:
%%time
#per-drive aggregates from the temp view, grouped by manufacturer, model and serial number:
#retired_date  - latest diagnosis run date for the hdd
#launched_date - first diagnosis run date, taken as the date the hdd started operation
#observed_days - number of days the diagnosis ran for the hdd
#cap_tb        - max capacity captured, taken as the real capacity (still in bytes)
#failed_date   - first date on which the failure flag was set to 1
#max_hours     - max smart9 value, i.e. operational hours of the hdd
#failed_hours  - smart9 value when the failure flag was set to 1
#failure       - 1 if the hdd ever failed, otherwise 0


drive_spans = spark.sql("""
select
    manuf,
    model,
    sn,
    max(date) as retired_date,
    min(date) as launched_date,
    count(date) as observed_days,
    max(cap_tb) as cap_tb,
    min(case when failure=1 then date end) as failed_date,
    max(smart9) as max_hours,
    min(case when failure=1 then smart9 end) as failed_hours,
    max(failure) as failure
from drive_days
group by manuf, model, sn
""").cache()

Wall time: 363 ms
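
The short wall time here reflects Spark's lazy evaluation: `spark.sql(...)` and `.cache()` only build a plan, and the aggregation itself runs when an action such as `toPandas()` is called. To make the individual aggregates concrete, here is a minimal plain-Python sketch of the same per-drive roll-up; the helper `summarise_drive` and the three daily rows are made-up illustrations, not notebook data.

```python
from datetime import date

# (date, failure flag, smart9 power-on hours) for one hypothetical drive
rows = [
    (date(2019, 1, 1), 0, 100),
    (date(2019, 1, 2), 0, 124),
    (date(2019, 1, 3), 1, 148),
]

def summarise_drive(rows):
    """Roll daily snapshots up to one record, like the GROUP BY query.

    Assumes no missing values; SQL min/max would skip nulls instead.
    """
    failed = [(d, h) for d, f, h in rows if f == 1]
    return {
        "launched_date": min(d for d, _, _ in rows),
        "retired_date": max(d for d, _, _ in rows),
        "observed_days": len(rows),   # count(date): days with a snapshot
        "max_hours": max(h for _, _, h in rows),
        "failed_date": min(d for d, _ in failed) if failed else None,
        "failed_hours": min(h for _, h in failed) if failed else None,
        "failure": max(f for _, f, _ in rows),
    }

span = summarise_drive(rows)
print(span)
```

Note that `observed_days` counts days with a record rather than the calendar span between the first and last dates, so gaps in reporting shorten it.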


### PySpark to Pandas

In [17]:
%%time
pddf=drive_spans.toPandas()

Wall time: 5min 40s


In [18]:
pddf.manuf.unique()

array(['Seagate', 'HGST/Hitachi', 'TOSHIBA', 'WDC', 'DELLBOSS', 'Samsung',
       'SAMSUNG', 'unknown'], dtype=object)

In [19]:
#merge the two Samsung spellings into one name
pddf=pddf.replace('SAMSUNG','Samsung')

In [20]:
#check for any capacity anomaly
pddf[pddf['cap_tb']<0]

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure
97459,Seagate,ST12000NM0007,ZCH071VT,2018-01-05,2018-01-05,1,-1,2018-01-05,,,1


In [21]:
#look up the correct capacity from other drives of the same model
pddf[pddf['model']=='ST12000NM0007']

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure
2,Seagate,ST12000NM0007,ZJV2EHGV,2019-09-30,2018-12-19,286,12000138625024,,6941.0,,0
3,Seagate,ST12000NM0007,ZJV02Y65,2019-09-30,2018-05-16,501,12000138625024,,12209.0,,0
7,Seagate,ST12000NM0007,ZCH06NG7,2019-09-30,2017-11-14,685,12000138625024,,16560.0,,0
8,Seagate,ST12000NM0007,ZJV0WTP3,2019-09-30,2018-12-05,300,12000138625024,,7320.0,,0
12,Seagate,ST12000NM0007,ZJV06G6F,2019-09-30,2019-07-03,90,12000138625024,,2261.0,,0
...,...,...,...,...,...,...,...,...,...,...,...
172896,Seagate,ST12000NM0007,ZJV0QXDZ,2019-09-30,2018-07-28,224,12000138625024,,5935.0,,0
172897,Seagate,ST12000NM0007,ZCH0AZ6R,2019-09-04,2018-04-27,495,12000138625024,2019-09-04,11927.0,11927.0,1
172920,Seagate,ST12000NM0007,ZCH06G1T,2019-04-12,2017-10-30,528,12000138625024,2019-04-12,12743.0,12743.0,1
172954,Seagate,ST12000NM0007,ZJV2EFHJ,2018-12-22,2018-12-19,4,12000138625024,2018-12-22,152.0,152.0,1


In [22]:
#replace the invalid capacity with the correct value (in bytes)
ind=pddf[pddf['cap_tb']<0].index
pddf.loc[ind,'cap_tb'] = 12000138625024
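
The replacement value is hard-coded from the lookup in the previous cell. A hedged alternative, sketched here in plain Python, derives the fill value from the most common valid capacity of the same model; the function name `fill_invalid_capacity` is an assumption, and the three records reuse serial numbers and capacities shown in the outputs above.

```python
from collections import Counter, defaultdict

def fill_invalid_capacity(records):
    """Replace non-positive capacities with the model's most common valid one."""
    valid = defaultdict(Counter)
    for r in records:
        if r["cap"] > 0:
            valid[r["model"]][r["cap"]] += 1
    fixed = []
    for r in records:
        cap = r["cap"]
        # Only fill when the model has at least one valid capacity to copy
        if cap <= 0 and valid[r["model"]]:
            cap = valid[r["model"]].most_common(1)[0][0]
        fixed.append({**r, "cap": cap})
    return fixed

records = [
    {"sn": "ZJV2EHGV", "model": "ST12000NM0007", "cap": 12000138625024},
    {"sn": "ZJV02Y65", "model": "ST12000NM0007", "cap": 12000138625024},
    {"sn": "ZCH071VT", "model": "ST12000NM0007", "cap": -1},
]
fixed = fill_invalid_capacity(records)
print(fixed[2]["cap"])
```

A drive whose model has no valid capacity at all is left unchanged, so it can still be inspected by hand.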

In [23]:
#check for any observed_days anomaly
pddf[pddf['observed_days']<=0]

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure


In [24]:
#convert capacity from bytes to tebibytes (TiB, 1024**4 bytes); the column keeps the name cap_tb
pddf['cap_tb']=pddf['cap_tb']/(1024**4)
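
Dividing by `1024**4` yields binary tebibytes, which is why a drive marketed as 4 TB appears as roughly 3.64 in `cap_tb`. The short check below contrasts this with decimal terabytes (`1000**4`), using the ST4000DM000 capacity from the sample rows; the variable names are illustrative.

```python
capacity_bytes = 4000787030016        # ST4000DM000 capacity from the sample rows

tib = capacity_bytes / 1024**4        # binary unit, as used for cap_tb
tb = capacity_bytes / 1000**4         # decimal unit, as printed on the drive label

print(f"{tib:.6f} TiB, {tb:.6f} TB")
```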

In [25]:
pddf.head()

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure
0,Seagate,ST8000DM002,ZA12KZ1X,2019-09-30,2016-08-07,1143,7.277379,,27951.0,,0
1,Seagate,ST4000DM000,S30117YT,2019-09-30,2015-05-06,1587,3.638695,,38383.0,,0
2,Seagate,ST12000NM0007,ZJV2EHGV,2019-09-30,2018-12-19,286,10.914062,,6941.0,,0
3,Seagate,ST12000NM0007,ZJV02Y65,2019-09-30,2018-05-16,501,10.914062,,12209.0,,0
4,HGST/Hitachi,HGST HMS5C4040BLE640,PL1331LAHER2KH,2019-09-30,2017-02-17,956,3.638695,,23305.0,,0


In [26]:
pddf.sort_values(by=['cap_tb'],ascending=False)

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure
19202,Seagate,ST4000DM000,Z300WEYV,2014-01-28,2014-01-03,26,545999.287909,2014-01-28,522.0,522.0,1
169719,Seagate,ST4000DM000,Z300VLEX,2014-01-28,2014-01-03,26,545999.287909,2014-01-28,465.0,465.0,1
148089,HGST/Hitachi,HGST HMS5C4040ALE640,PL2331LAGSU5RJ,2014-10-09,2014-07-19,83,97633.638677,2014-10-09,1809.0,,1
12217,TOSHIBA,TOSHIBA MG07ACA14TA,78C0A01XF97G,2019-09-30,2018-10-05,360,12.733398,,8710.0,,0
60772,TOSHIBA,TOSHIBA MG07ACA14TA,88Q0A01JF97G,2019-09-30,2018-10-05,360,12.733398,,8690.0,,0
...,...,...,...,...,...,...,...,...,...,...,...
138572,WDC,WDC WD800JB,WD-WMAM9DSH9912,2016-05-31,2014-02-14,766,0.072784,2016-05-31,59759.0,59759.0,1
127240,WDC,WDC WD800BB,WD-WMAMD1850685,2015-11-09,2014-02-14,631,0.072784,2015-11-09,66358.0,66358.0,1
124557,WDC,WDC WD800BB,WD-WMAM9CVE9597,2015-03-06,2014-02-14,385,0.072784,,59614.0,,0
19178,WDC,WDC WD800BB,WD-WMAM9CVY5649,2015-03-20,2014-02-14,399,0.072784,,53969.0,,0


In [27]:
#look up the correct capacity from other drives of the same model
pddf[pddf['model']=='ST4000DM000']

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure
1,Seagate,ST4000DM000,S30117YT,2019-09-30,2015-05-06,1587,3.638695,,38383.0,,0
9,Seagate,ST4000DM000,Z305G50R,2019-09-30,2016-02-26,1304,3.638695,,31535.0,,0
11,Seagate,ST4000DM000,Z304JCWL,2019-09-30,2015-10-21,1434,3.638695,,34873.0,,0
18,Seagate,ST4000DM000,Z305GC0B,2019-09-30,2016-02-26,1307,3.638695,,31533.0,,0
20,Seagate,ST4000DM000,Z304JMJ9,2019-09-30,2015-10-20,1435,3.638695,,34892.0,,0
...,...,...,...,...,...,...,...,...,...,...,...
173158,Seagate,ST4000DM000,Z300GQJM,2015-07-14,2013-07-23,664,3.638695,2015-07-14,17500.0,17500.0,1
173165,Seagate,ST4000DM000,Z3015SZE,2015-06-01,2014-02-15,451,3.638695,2015-06-01,11331.0,11331.0,1
173167,Seagate,ST4000DM000,Z300X8AZ,2015-05-15,2014-01-29,471,3.638695,2015-05-15,11789.0,11789.0,1
173172,Seagate,ST4000DM000,Z3025L35,2015-02-17,2014-10-28,112,3.638695,2015-02-17,2794.0,2794.0,1


In [28]:
#look up the correct capacity from other drives of the same model
pddf[pddf['model']=='HGST HMS5C4040ALE640']

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure
55,HGST/Hitachi,HGST HMS5C4040ALE640,PL1331LAGA9R6H,2019-09-30,2017-02-17,956,3.638695,,23329.0,,0
62,HGST/Hitachi,HGST HMS5C4040ALE640,PL2331LAHDLJMJ,2019-09-30,2017-03-03,942,3.638695,,22942.0,,0
109,HGST/Hitachi,HGST HMS5C4040ALE640,PL1331LAHGV96H,2019-09-30,2017-02-17,956,3.638695,,23295.0,,0
196,HGST/Hitachi,HGST HMS5C4040ALE640,PL2331LAHDALHJ,2019-09-30,2017-03-03,942,3.638695,,22949.0,,0
228,HGST/Hitachi,HGST HMS5C4040ALE640,PL2331LAGSUJPJ,2019-09-30,2014-08-13,1605,3.638695,,39547.0,,0
...,...,...,...,...,...,...,...,...,...,...,...
173048,HGST/Hitachi,HGST HMS5C4040ALE640,PL1331LAGTKB7H,2017-11-15,2014-09-23,1096,3.638695,,27666.0,,0
173067,HGST/Hitachi,HGST HMS5C4040ALE640,PL1331LAGTLYBH,2017-10-14,2014-09-09,1122,3.638695,,27204.0,,0
173070,HGST/Hitachi,HGST HMS5C4040ALE640,PL1331LAGSDL5H,2017-09-06,2014-06-19,1145,3.638695,,28205.0,,0
173074,HGST/Hitachi,HGST HMS5C4040ALE640,PL1331LAGTL38H,2017-09-19,2014-09-23,1082,3.638695,,26299.0,,0


In [29]:
#replace the anomalous capacities (all on 4 TB models, see above) with the correct value
ind=pddf[pddf['cap_tb']>13].index
pddf.loc[ind,'cap_tb'] = 3.638695

In [30]:
#create a column with operational years for each hdd from operational hours
pddf['max_years']=pddf['max_hours']/(365.2422*24)

In [31]:
pddf['max_years'].max()

31.31620606819256
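
A maximum of about 31 years is a useful sanity signal: `smart_9_raw` counts power-on hours, so a plausible value cannot exceed the time since the drive was manufactured. A minimal sketch of the conversion with the same year length as the cell above; the helper name `hours_to_years` is an assumption.

```python
HOURS_PER_YEAR = 365.2422 * 24   # same divisor as the max_years cell

def hours_to_years(hours):
    """Convert power-on hours to years of continuous operation."""
    return hours / HOURS_PER_YEAR

print(hours_to_years(274512))    # the largest max_hours value in the data
print(hours_to_years(38383))     # an ST4000DM000 from the earlier head() output
```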

In [32]:
pddf.sort_values(by=['max_years'],ascending=False).head(25)

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure,max_years
2499,WDC,WDC WD10EADS,WD-WCAU45409452,2015-11-12,2013-04-10,788,0.909681,,274512.0,,0,31.316206
69155,WDC,WDC WD800BB,WD-WCAMD2817456,2014-04-30,2014-02-14,51,0.072784,,163730.0,,0,18.678245
89677,WDC,WDC WD800BB,WD-WCAMD2252340,2015-06-04,2014-02-22,457,0.072784,2015-06-04,141415.0,141415.0,1,16.13256
91387,WDC,WDC WD800BB,WD-WCAMD2603237,2014-08-13,2014-02-14,105,0.072784,2014-08-13,137899.0,137899.0,1,15.731456
154202,WDC,WDC WD800BB,WD-WCAMD2819726,2015-06-29,2014-02-14,500,0.072784,,114239.0,,0,13.032334
18273,WDC,WDC WD800LB,WD-WCADW2290554,2016-03-16,2014-02-14,759,0.072784,2016-03-16,90477.0,90477.0,1,10.321576
129884,WDC,WDC WD800BB,WD-WCAMD3298843,2016-03-17,2014-02-14,758,0.072784,2016-03-17,88492.0,88492.0,1,10.095128
8588,WDC,WDC WD800BB,WD-WCAMD1854163,2015-02-23,2014-02-14,374,0.072784,,86465.0,,0,9.863888
35492,WDC,WDC WD800BB,WD-WCAMD1587919,2016-03-17,2014-02-14,739,0.072784,2016-03-17,85488.0,85488.0,1,9.752433
46693,WDC,WDC WD800BB,WD-WCAMD2323176,2016-03-16,2014-02-13,755,0.072784,2016-03-16,84882.0,84882.0,1,9.683301


In [33]:
#from research, max_years above 10 did not match the market release date of the WDC WD800BB drives
#note: this drops every hdd with max_years > 10, not only that model
ind=pddf[(pddf['max_years']>10)].index
pddf.drop(ind,inplace=True)

In [34]:
pddf['launched_date'] = pddf['launched_date'].astype('datetime64[ns]')
pddf['year'] = pddf['launched_date'].dt.year.astype('int64')
pddf['max_years'] = pddf['max_years'].astype('float64')

In [35]:
#show hdds first observed in 2013 whose max_years is at least 1 (year - max_years <= 2012);
#these do not match the 2013 start of the published data
pddf[(pddf['year']==2013) & ((pddf['year']-pddf['max_years']) <= 2012)]

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure,max_years,year
394,Seagate,ST4000DM000,Z300GQ2F,2019-09-27,2013-08-07,1578,3.638695,,51124.0,,0,5.832203,2013
621,Seagate,ST4000DM000,Z300GQ9L,2018-07-03,2013-07-23,1797,3.638695,,42990.0,,0,4.904280,2013
622,Seagate,ST4000DM000,W30062K6,2018-05-22,2013-09-26,1693,3.638695,,40766.0,,0,4.650567,2013
623,Seagate,ST4000DM000,Z300NKJ6,2018-06-09,2013-11-06,1670,3.638695,,40455.0,,0,4.615088,2013
627,Seagate,ST4000DM000,Z300E4XS,2018-05-16,2013-10-15,1664,3.638695,,41010.0,,0,4.678402,2013
...,...,...,...,...,...,...,...,...,...,...,...,...,...
173200,Seagate,ST31500341AS,9VS3G4QQ,2014-01-26,2013-06-08,177,1.364517,,32543.0,,0,3.712491,2013
173202,Seagate,ST3000DM001,W1F0A4AJ,2014-01-14,2013-04-10,224,2.729023,2014-01-14,15497.0,15497.0,1,1.767891,2013
173203,Seagate,ST31500541AS,6XW0P2GJ,2013-10-18,2013-04-10,136,1.364517,2013-10-18,32804.0,32804.0,1,3.742266,2013
173204,Seagate,ST31500541AS,9XW01DVK,2013-08-19,2013-04-10,132,1.364517,,30865.0,,0,3.521065,2013


In [36]:
#drop the hdds listed above: first observed in 2013 and already in operation before the data starts, allowing a grace period of 1 year
ind=pddf[(pddf['year']==2013) & ((pddf['year']-pddf['max_years']) <= 2012)].index
pddf.drop(ind,inplace=True)
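
The drop condition can be simplified: for `year == 2013`, `year - max_years <= 2012` is equivalent to `max_years >= 1`. Note that `max_years` comes from `max(smart9)` over the whole observation window, so it measures lifetime power-on time rather than age at first observation. A small sketch of the predicate on rows taken from the outputs above; `is_dropped` is a hypothetical helper name.

```python
def is_dropped(year, max_years):
    """Same predicate as the drop in the cell above."""
    return year == 2013 and (year - max_years) <= 2012

# (serial number, first observed year, max_years) from the notebook outputs
drives = [
    ("Z300GQ2F", 2013, 5.832203),   # first seen 2013, long-lived: dropped
    ("ZA12KZ1X", 2016, 3.188638),   # first seen 2016: kept
    ("9XW01DVK", 2013, 3.521065),   # first seen 2013, 132 observed days: dropped
]
kept = [sn for sn, year, max_years in drives if not is_dropped(year, max_years)]
print(kept)
```

Under this rule a drive first seen in 2013 is kept only if its total power-on time stays below one year.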

In [37]:
pddf.head()

Unnamed: 0,manuf,model,sn,retired_date,launched_date,observed_days,cap_tb,failed_date,max_hours,failed_hours,failure,max_years,year
0,Seagate,ST8000DM002,ZA12KZ1X,2019-09-30,2016-08-07,1143,7.277379,,27951.0,,0,3.188638,2016
1,Seagate,ST4000DM000,S30117YT,2019-09-30,2015-05-06,1587,3.638695,,38383.0,,0,4.378715,2015
2,Seagate,ST12000NM0007,ZJV2EHGV,2019-09-30,2018-12-19,286,10.914062,,6941.0,,0,0.791826,2018
3,Seagate,ST12000NM0007,ZJV02Y65,2019-09-30,2018-05-16,501,10.914062,,12209.0,,0,1.392797,2018
4,HGST/Hitachi,HGST HMS5C4040BLE640,PL1331LAHER2KH,2019-09-30,2017-02-17,956,3.638695,,23305.0,,0,2.658624,2017


## Save to CSV

In [38]:
%%time
pddf.to_csv('./use_data/hdd_data.csv',index=False)

Wall time: 2.51 s
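
Each row of the saved file is one drive with a duration (`observed_days` or `max_hours`) and an event flag (`failure`), which is the input shape a Kaplan-Meier estimator such as the imported `KaplanMeierFitter` works with. As a hedged illustration of what such an estimator computes, here is a minimal plain-Python version; the function `kaplan_meier` and the five sample drives are assumptions for illustration, not the notebook's analysis code.

```python
from collections import Counter

def kaplan_meier(durations, events):
    """Return [(t, S(t))] at each distinct time with at least one failure."""
    failures = Counter(t for t, e in zip(durations, events) if e)
    exits = Counter(durations)       # failed and censored drives leaving the risk set
    at_risk = len(durations)
    survival = 1.0
    curve = []
    for t in sorted(exits):
        d = failures.get(t, 0)
        if d:
            survival *= 1 - d / at_risk   # product-limit step at a failure time
            curve.append((t, survival))
        at_risk -= exits[t]
    return curve

# Made-up drives: days observed, and whether the drive failed (1) or was censored (0)
durations = [4, 90, 286, 300, 501]
events = [1, 0, 0, 1, 0]
curve = kaplan_meier(durations, events)
print(curve)
```

Censored drives (those still running at their last record) lower the number at risk without lowering the survival estimate, which is why the `failure` flag has to be kept alongside the duration.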


In [39]:
!dir use_data /a/s

 Volume in drive C is Windows-SSD
 Volume Serial Number is 4E0B-2946

 Directory of C:\Users\Thelee\Documents\backblaze\use_data

01/05/2020  05:43 PM    <DIR>          .
01/05/2020  05:43 PM    <DIR>          ..
01/06/2020  06:47 PM        16,449,069 hdd_data.csv
               1 File(s)     16,449,069 bytes

     Total Files Listed:
               1 File(s)     16,449,069 bytes
               2 Dir(s)   2,037,927,936 bytes free
