# A Complete Solution to the BackBlaze.com Kaggle Problem

## Step Three.  
### Shaping and cleaning the data.

## Table of contents

1. [Introduction](#10)<br>

2. [Establish environment and parameters](#20)<br>
3. [Data Shaping and Cleaning](#30)<br>
    3.1 [Read in Data](#31)<br>
    3.2 [Remove disks installed after January 1st 2020](#32)<br>
    3.3 [Remove disks that failed in January 2020 and data existing on disks after the failure date.](#33)<br>
    3.4 [Ensure we are not missing any dates.](#34)<br>
4. [Sample the data set to fit environment ](#40)<br>
    4.1 [Understand the underlying proportion of failure to non-failure disks](#41)<br>
    4.2 [Build the sample](#42)<br>
5. [Write DF to a Parquet file for use in step 4.](#50)<br>


### 1.0 Introduction <a id="10"></a>

Note this is part three of a four-part solution.

BackBlaze.com, you are the "GOAT." You are the "cat's meow." You "Rock the House." In case you don't know why BackBlaze.com is so totally "kick-ass," they open-sourced a vast set of hard drive information a few years ago and continue updating it each quarter.  What a treasure trove of superb data.  BackBlaze.com, thank you from the bottom of my heart.

The backblaze.com data includes operational metrics from hard drives with an indicator of a hard-drive failure.  It is an excellent source for teaching techniques related to machine failure.  Again, thank you for making this available to the open-source community.
Here is a link to the data.

https://www.backblaze.com/b2/hard-drive-test-data.html

My goal in this series of articles is not to give the best solution with the highest AUC.  My goal is to show you how to approach equipment failure problems and build solutions that reflect realistic accuracy, and provide an easy transition from the lab to the real world.

I will use a Spark/Python Jupyter notebook inside IBM's Watson Studio on the cloud as a tool in this discussion.

https://www.ibm.com/cloud/watson-studio

I will also be using cloud object storage on the IBM cloud.

https://www.ibm.com/cloud/block-storage


The third article in this series is not very exciting but is critical.  I will be shaping and scoping the data set I created in the first article, so it is ready for machine learning.

Rarely is it a good idea to take the data as it comes.  It is almost always a good idea to shape your data to be consistent with the problem that needs solving.  The backblaze.com data is no different.  In this notebook, we will take the raw data and shape it to pass it to a machine-learning model.

In the first article of this series, we read the raw data from 2018, 2019, and 2020.  Data from 2017, 2018, and 2019 are for creating features.  The 2020 data is for modeling.  Therefore, our shaping exercise is limited to 2020 data.


### 2.0 Establish environment and Parameters <a id="20"></a>


Import the Relevant Libraries and connect to object storage.

In [1]:
#Import relevant Libraries
from functools import reduce
from pyspark.sql import DataFrame

import pyspark.sql.functions as F

from pyspark.sql.functions import when

In [2]:
# The code was removed by Watson Studio for sharing.

### 3.0 Data Shaping and Cleaning <a id="30"></a>

#### 3.1 Read in Data <a id="31"></a>

Read in the data we created in the first notebook.

In [3]:
df_2020 = spark.read.parquet(cos.url('data2020.parquet', 'backblazedata-donotdelete-pr-cij57grgkoctem')).cache()

Note that at the beginning of the exercise we have 52,286,398 records.

In [4]:
#df_2020.count()

Check for duplicate records.  There are none.

In [5]:
#df_2020=df_2020.distinct()

In [6]:
#df_2020.count()
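
The duplicate check above amounts to comparing the row count before and after de-duplication. Here is a minimal plain-Python sketch of that comparison on made-up rows; the serial numbers and values are hypothetical, not BackBlaze data, and the sketch stands in for the Spark calls rather than reproducing them.

```python
# Toy rows of (SERIAL_NUMBER, DATE, FAILURE); all values are invented for illustration.
rows = [
    ("SN001", "2020-01-01", 0),
    ("SN001", "2020-01-02", 0),
    ("SN002", "2020-01-01", 0),
    ("SN001", "2020-01-02", 0),  # exact duplicate of the second row
]

total_count = len(rows)           # row count before de-duplication
distinct_count = len(set(rows))   # row count after de-duplication
duplicate_count = total_count - distinct_count

print(total_count, distinct_count, duplicate_count)
```

If the two counts match, as they did for the 2020 data, there are no duplicate records to remove.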

#### 3.2 Remove disks installed after January 1st 2020 <a id="32"></a>


When we deploy this model, we will do so on the set of disks that exists at that time, and I want the modeling data to reflect that.  In other words, the deployed model will not score disks installed in the future; we will only know about the disks currently in the data center.  To approximate model deployment, it is essential to pick a point in time and include only the disks operating at that time.  In this case, our point in time is 1/1/2020.

We are defining our modeling data set to include all disk drives installed and operating on January 1, 2020.

Create a list of disk drives operating on 1/1/2020

In [7]:
#Copy the data set so we can aggregate and transform it
df_start=df_2020
#Keep Serial number and date.
df_start=df_start.select([c for c in df_start.columns if c in ['SERIAL_NUMBER','DATE']])
#select records that existed on 1/1/2020
df_start=df_start.filter(df_start.DATE =='2020-01-01')


Include the disks operating on 1/1/20 with an inner join on the original data frame.  The result is only disks running on 1/1/20.

In [8]:
df_start=df_start.select([c for c in df_start.columns if c in ['SERIAL_NUMBER']])
df_start = df_start.withColumnRenamed("SERIAL_NUMBER","DOODEE")

df_2020=df_2020.join(df_start,(((df_start.DOODEE) ==  (df_2020.SERIAL_NUMBER))),"inner")
df_2020 = df_2020.drop("DOODEE")
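
To make the cohort logic concrete, here is a small plain-Python sketch of the same idea on toy data: find the serial numbers that reported on the cut-off date, then keep every record belonging to those disks. The `Record` tuple, the serial numbers, and the `CUTOFF` name are hypothetical and exist only for this illustration.

```python
from collections import namedtuple

# Hypothetical record layout; the real data has many more columns.
Record = namedtuple("Record", ["serial_number", "date", "failure"])

records = [
    Record("SN001", "2020-01-01", 0),
    Record("SN001", "2020-01-02", 0),
    Record("SN002", "2020-01-15", 0),  # first seen after the cut-off date
    Record("SN002", "2020-01-16", 0),
    Record("SN003", "2020-01-01", 0),
]

CUTOFF = "2020-01-01"

# Serial numbers that reported on the cut-off date define the cohort.
cohort = {r.serial_number for r in records if r.date == CUTOFF}

# Equivalent of the inner join: keep every record of a cohort disk.
cohort_records = [r for r in records if r.serial_number in cohort]

print(sorted(cohort), len(cohort_records))
```

In Spark, a `left_semi` join on `SERIAL_NUMBER` should give the same result without renaming and dropping a helper column, although I have not swapped it into this notebook.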

#### 3.3 Remove disks that failed in January 2020 and data existing on disks after the failure date. <a id="33"></a>

It is essential to remove the disks that failed at the beginning of 2020.  

Why?  

Because these disks were on death's door before we even started, some, if not most, of the information that would predict failure for these disks doesn't exist in the data.  This information would be in 2019.


Also, we need to ensure that a disk does not linger in the data set after it fails.  If a disk fails on Monday, you don't want data for the disk on Tuesday or Wednesday. 

Convert DATE from a string to a date field.

In [9]:
df_2020=df_2020.withColumnRenamed("DATE","INPUT")

from pyspark.sql.functions import *

df_2020=df_2020.select(to_date(col("INPUT"),"y-M-dd").alias("DATE"), "*")
df_2020 = df_2020.drop("INPUT").cache()

Create a data set that only includes the disks that failed.

In [10]:
#select records with a failure
df_get=df_2020.filter(df_2020.FAILURE ==1)

#create a complete set of drives that failed.
df_get=df_get.select([c for c in df_get.columns if c in ['SERIAL_NUMBER']])
df_get=df_get.dropDuplicates(['SERIAL_NUMBER'])

df_get = df_get.withColumnRenamed("SERIAL_NUMBER","DOODEE")
df_failures=df_2020.join(df_get,(((df_get.DOODEE) ==  (df_2020.SERIAL_NUMBER))),"inner")
df_failures = df_failures.drop("DOODEE")




Capture the minimum failure date for each disk.  There may be situations where the same disk has two or more failure dates.  We want to identify the first.

In [11]:
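#select records with a failure
df_later=df_2020.filter(df_2020.FAILURE ==1)

#alias the Spark aggregates so they do not shadow Python's built-in min and max
from pyspark.sql.functions import col, max as max_, min as min_
#first failure date for each disk
v=(df_later.groupBy("SERIAL_NUMBER")
    .agg(min_("DATE").alias("MINDATE")))
v = v.withColumnRenamed("SERIAL_NUMBER","DOODEE")

#keep disks whose first failure is on or after 2020-01-30
v=v.filter(v.MINDATE>='2020-01-30')

v=v.sort(("MINDATE"))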

Remove any records that exist after the first failure.

In [12]:
df_failures=v.join(df_failures,(((v.DOODEE) ==  (df_failures.SERIAL_NUMBER))),"inner")
df_failures = df_failures.drop("DOODEE")


df_failures=df_failures.filter(df_failures.DATE<=df_failures.MINDATE)
df_failures = df_failures.drop("MINDATE")
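
The trimming logic in this section can be sketched in plain Python on toy data: find each disk's first failure date, drop disks that failed before the threshold, and discard any record dated after the first failure. All serial numbers and dates below are invented, and `EARLIEST_ALLOWED_FAILURE` is a hypothetical name for the threshold used in the filter above.

```python
from datetime import date

# Toy records of (serial_number, date, failure); all values are invented.
records = [
    ("SN001", date(2020, 3, 1), 0),
    ("SN001", date(2020, 3, 2), 1),   # first failure
    ("SN001", date(2020, 3, 3), 0),   # lingering record after the failure
    ("SN001", date(2020, 3, 4), 1),   # second failure flag for the same disk
    ("SN002", date(2020, 1, 9), 0),
    ("SN002", date(2020, 1, 10), 1),  # failed too early in the year
]

EARLIEST_ALLOWED_FAILURE = date(2020, 1, 30)  # same threshold as the notebook

# First failure date per disk.
first_failure = {}
for serial, day, failed in records:
    if failed == 1:
        first_failure[serial] = min(day, first_failure.get(serial, day))

# Drop disks whose first failure falls before the threshold.
kept = {s: d for s, d in first_failure.items() if d >= EARLIEST_ALLOWED_FAILURE}

# Keep only records up to and including the first failure date.
trimmed = [r for r in records if r[0] in kept and r[1] <= kept[r[0]]]

print(kept, len(trimmed))
```

After trimming, the last record of every failed disk is its first failure, which is the shape the modeling step expects.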

Create a dataframe that only includes disks that did not fail in 2020.

In [13]:
df_bysn = df_2020.groupBy('SERIAL_NUMBER').agg(F.sum("FAILURE").alias('sum_failure'))
df_bysn=df_bysn.filter(df_bysn.sum_failure==0)

df_bysn=df_bysn.select([c for c in df_bysn.columns if c in ['SERIAL_NUMBER']])
df_bysn=df_bysn.dropDuplicates(['SERIAL_NUMBER'])

df_bysn = df_bysn.withColumnRenamed("SERIAL_NUMBER","DOODEE")

df_nonfailures=df_2020.join(df_bysn,(((df_bysn.DOODEE) ==  (df_2020.SERIAL_NUMBER))),"inner")
df_nonfailures = df_nonfailures.drop("DOODEE")

Append the disks with failures and without failures into a single df

In [14]:

df_total = reduce(DataFrame.unionAll, [df_failures,df_nonfailures])

#df_total.count()

Now we have 43,844,671 records in the modeling dataframe.

#### 3.4 Ensure we are not missing any dates. <a id="34"></a>

When working with time series or panel data, we need to ensure that all dates are consecutive.  That is, there is no missing time.  For example, if you have a time series measured daily for June, you must ensure that June 5th is not missing.  There are many ways to do this in pandas; unfortunately, I don't have an elegant way to do it in Spark.  The following code is not elegant, but it works.

Create a field called "ROW" that represents the sequential value of each record for each disk.

In [15]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("SERIAL_NUMBER").orderBy("DATE")

dfzz1=df_total.withColumn("ROW",row_number().over(windowSpec))
dfzz=dfzz1

Create a DF that adds 1 to the existing row number.

In [16]:
#select the relevant fields
dfzz=dfzz.select([c for c in dfzz.columns if c in ['SERIAL_NUMBER', 'DATE','ROW']])
#sort the data
dfzz=dfzz.sort((['SERIAL_NUMBER','ROW']))
#add one to row.
df_1x = dfzz.withColumn('ROW_1', ( dfzz['ROW'] + 1 ) )
#get rid of ROW
df_1x = df_1x.drop("ROW")
df_1=df_1x

Rename the columns of the working data frame so that its date becomes the date of the previous record, then merge it with the original data frame.

In [17]:

#Rename the variables
df_1=df_1.withColumnRenamed("SERIAL_NUMBER","SERIAL_NUMBER_1")
df_1=df_1.withColumnRenamed("DATE","DATE_1")
#merge to the original df
dfzz=dfzz.join(df_1,(((dfzz.ROW) ==  (df_1.ROW_1)) & ((dfzz.SERIAL_NUMBER) ==  (df_1.SERIAL_NUMBER_1))),"inner")



Create a field called "datediff" that is the difference between the current date and the previous date.

In [18]:
from pyspark.sql.functions import *
dfzz=dfzz.select(
      col("SERIAL_NUMBER"),
      col("DATE"),
      col("DATE_1"),
      datediff(col("DATE"),col("DATE_1")).alias("datediff"))

Take the maximum "datediff" for each disk.

In [19]:
v=(dfzz.groupBy("SERIAL_NUMBER")
    .agg(max("datediff")))
v = v.withColumnRenamed("max(datediff)","MAXDATEDIFF")
v = v.withColumnRenamed("SERIAL_NUMBER","DOODEE")

Ensure that the maximum is 1, meaning the largest gap between any two consecutive records is one day.  The resulting data set will contain drives with no missing dates.

In [20]:
v=v.filter(v.MAXDATEDIFF==1)

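Join the data frame containing drives with no missing dates to the original data frame with an inner join.  This join eliminates disks with missing dates from the original data frame.  Disks with only a single record are eliminated as well, because they have no previous record and therefore no "datediff" value.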


In [21]:
v=v.select([c for c in v.columns if c in ['DOODEE']])

df_total=df_total.join(v,(((df_total.SERIAL_NUMBER) ==  (v.DOODEE))),"inner")
df_total = df_total.drop("DOODEE")

In [22]:
#df_total.count()

Now we have 42,202,057 records, and no disk is missing any dates.
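
The gap check in this section boils down to one rule: sort each disk's dates, measure the difference between neighbours, and keep the disk only if the largest difference is one day. A minimal plain-Python sketch on invented dates follows; `max_gap_days` is a hypothetical helper, not part of the notebook.

```python
from datetime import date

# Toy observation dates per disk; serial numbers and dates are invented.
dates_by_disk = {
    "SN001": [date(2020, 6, 1), date(2020, 6, 2), date(2020, 6, 3)],
    "SN002": [date(2020, 6, 4), date(2020, 6, 6), date(2020, 6, 7)],  # June 5th missing
    "SN003": [date(2020, 6, 1)],  # single record, nothing to compare against
}

def max_gap_days(days):
    """Largest difference in days between consecutive sorted dates, or None."""
    ordered = sorted(days)
    gaps = [(b - a).days for a, b in zip(ordered, ordered[1:])]
    return max(gaps) if gaps else None

max_gaps = {serial: max_gap_days(days) for serial, days in dates_by_disk.items()}

# Keep disks whose largest gap is exactly one day, as the notebook does.
complete_disks = sorted(s for s, gap in max_gaps.items() if gap == 1)

print(max_gaps, complete_disks)
```

In Spark, the `lag` window function applied over the same `windowSpec` would likely fetch the previous date directly and avoid the self-join; I have not tested that variant on the full data set.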

### 4.0 Sample the data set to fit environment <a id="40"></a>

Unfortunately, money does not grow on trees.  I don't have unlimited resources to run this notebook and build a model.  In this section I down-sample the data so that I can run it in my environment.

#### 4.1 Understand the underlying proportion of failure to non-failure disks. <a id="41"></a>

I will stratify the sample based on whether a drive failed in 2020.  The first step in doing so is to understand the underlying base failure rate among the disk drives.

In [23]:
df_bysn = df_total.groupBy('SERIAL_NUMBER').agg(F.sum("FAILURE").alias('sum_failure'))
df_nonfailurelist=df_bysn.filter(df_bysn.sum_failure==0)
df_failurelist=df_bysn.filter(df_bysn.sum_failure>0)
df_q = df_failurelist.unionByName(df_nonfailurelist)
#df_q.groupBy("sum_failure").count().show()

We have 1,084 disk drives that failed and 119,041 that did not fail.  That is a failure rate of about 0.9%.  Or, for every disk that failed, about 110 did not fail.  Your sample will need to fit your computing environment.  For me and my environment, I will select 1000 failures and 1000 non-failures.  Again, you will need to scale the sample to what you can actually run in your computing environment.
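
For readers who want to check the arithmetic, this short sketch recomputes the base rate from the two counts reported above. The variable names are my own.

```python
failed_disks = 1084
non_failed_disks = 119_041

total_disks = failed_disks + non_failed_disks

# Share of disks that failed at some point in 2020.
failure_rate = failed_disks / total_disks

# Number of healthy disks for every failed disk.
non_failures_per_failure = non_failed_disks / failed_disks

print(f"failure rate: {failure_rate:.2%}")                          # failure rate: 0.90%
print(f"non-failures per failure: {non_failures_per_failure:.1f}")  # non-failures per failure: 109.8
```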

#### 4.2 Build the sample. <a id="42"></a>

First, randomly select 1000 disks with a failure.

In [24]:
df_2020=df_total

In [25]:
#identify disks with a failure
df_failure=df_2020.filter(df_2020.FAILURE ==1)
#keep relevant columns
df_failure=df_failure.select([c for c in df_failure.columns if c in ['SERIAL_NUMBER']])
#dedup the data 
df_failure=df_failure.dropDuplicates(['SERIAL_NUMBER'])
#assign a random number to each serial number
df_failure=df_failure.withColumn('wookie', rand())
#sort by the random number
df_failure=df_failure.sort(("wookie"))

#keep the first 1000 records
df_failure=df_failure.limit(1000)

#clean up the data
df_failure_list = df_failure.withColumnRenamed("SERIAL_NUMBER","DOODEE")
df_failure_list = df_failure_list.drop("wookie")

#inner join to the original data; the result set is all records for the 1000 randomly selected disks
df_failure=df_2020.join(df_failure_list,(((df_failure_list.DOODEE) ==  (df_2020.SERIAL_NUMBER))),"inner")
df_failure = df_failure.drop("DOODEE")

Second, randomly select 1000 disks without a failure.

In [26]:
#identify disks that did not fail
df_bysn = df_2020.groupBy('SERIAL_NUMBER').agg(F.sum("FAILURE").alias('sum_failure'))
df_nonfailurelist=df_bysn.filter(df_bysn.sum_failure==0)
#keep relevant variables
df_nonfailurelist=df_nonfailurelist.select([c for c in df_nonfailurelist.columns if c in ['SERIAL_NUMBER']])
#ensure no duplicates
df_nonfailurelist=df_nonfailurelist.dropDuplicates(['SERIAL_NUMBER'])
#create a random number for each disk
df_nonfailurelist=df_nonfailurelist.withColumn('wookie', rand())
#sort the disks by the random number
df_nonfailurelist=df_nonfailurelist.sort(("wookie"))
#keep 1000 randomly selected disks.
df_nonfailurelist=df_nonfailurelist.limit(1000)


#clean up the data
df_nonfailurelist = df_nonfailurelist.withColumnRenamed("SERIAL_NUMBER","DOODEE")
df_nonfailurelist = df_nonfailurelist.drop("wookie")
#join to the original data; the result set is all records for the randomly selected disks
df_nonfailures=df_2020.join(df_nonfailurelist,(((df_nonfailurelist.DOODEE) ==  (df_2020.SERIAL_NUMBER))),"inner")
df_nonfailures = df_nonfailures.drop("DOODEE")

Third, concatenate the failures and non-failures.

In [27]:
df_total = df_failure.unionByName(df_nonfailures)
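
The sampling steps above can be sketched in plain Python: draw a fixed number of serial numbers from each stratum, then combine the two draws. The serial-number lists are hypothetical stand-ins sized to match the counts from section 4.1, and the fixed seed is my own assumption; the notebook calls `rand()` without a seed.

```python
import random

# Hypothetical serial-number lists; the counts mirror the notebook's strata.
failed_serials = [f"F{i:06d}" for i in range(1084)]
healthy_serials = [f"H{i:06d}" for i in range(119_041)]

SAMPLE_SIZE = 1000
rng = random.Random(42)  # fixed seed (an assumption) so the draw is repeatable

# Draw the same number of disks from each stratum, without replacement.
sampled_failed = rng.sample(failed_serials, SAMPLE_SIZE)
sampled_healthy = rng.sample(healthy_serials, SAMPLE_SIZE)

# Combined list of disks whose records go into the modeling sample.
sampled_serials = set(sampled_failed) | set(sampled_healthy)

# Fraction of each stratum that ends up in the sample.
failed_fraction = SAMPLE_SIZE / len(failed_serials)
healthy_fraction = SAMPLE_SIZE / len(healthy_serials)
print(f"{failed_fraction:.1%} of failed disks, {healthy_fraction:.2%} of healthy disks")
```

Note that the sample is balanced rather than proportional: it keeps roughly 92% of the failed disks but under 1% of the healthy ones, so failures make up half of the sampled disks compared with about 0.9% of the population. Passing a seed to Spark's `rand` should make the notebook's own draw repeatable in the same way.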

### 5.0 Write DF to a Parquet file for use in step 4. <a id="50"></a>

In [28]:
# The code was removed by Watson Studio for sharing.

In [29]:


df_total.write.mode("overwrite").parquet(cos.url('data_2020_final.parquet', 'backblazedata-donotdelete-pr-cij57grgkoctem'))

All data used in this notebook is the property of BackBlaze.com. 

For questions regarding use of data please see the following website.
https://www.backblaze.com/b2/hard-drive-test-data.html