In [1]:
# spark libraries
import pyspark
from pyspark.sql.functions import max

# data wrangling
import pandas as pd
import numpy as np

#### Use spark! 

In [2]:
# create local spark enviroment
spark = pyspark.sql.SparkSession.builder.getOrCreate()

#### Read in the three quarters from 2019

In [4]:
df1 = spark.read.csv('../data/data_Q1_2019/*.csv', header=True)
df2 = spark.read.csv('../data/data_Q2_2019/*.csv', header=True)
df3 = spark.read.csv('../data/data_Q3_2019/*.csv', header=True)

In [5]:
for df in [df1,df2,df3]:
    print(len(df.columns))

129
129
129


#### Combine 2019 dataframes together

In [6]:
df_2019 = df1.union(df2).union(df3)

#### Read in the 4 quarters from 2018

In [7]:
df1 = spark.read.csv('../data/data_Q1_2018/*.csv', header=True)
df2 = spark.read.csv('../data/data_Q2_2018/*.csv', header=True)
df3 = spark.read.csv('../data/data_Q3_2018/*.csv', header=True)
df4 = spark.read.csv('../data/data_Q4_2018/*.csv', header=True)

In [8]:
for df in [df1,df2,df3,df4]:
    print(len(df.columns))

105
109
109
129


#### Since the columns don't match, find the missing columns

In [9]:
list1 = df1.columns
list4 = df4.columns

In [10]:
remove_traits_list = list(set(list4).difference(list1))

In [11]:
remove_traits_list

['smart_170_normalized',
 'smart_23_raw',
 'smart_16_raw',
 'smart_168_raw',
 'smart_174_normalized',
 'smart_231_raw',
 'smart_232_normalized',
 'smart_173_raw',
 'smart_231_normalized',
 'smart_218_raw',
 'smart_173_normalized',
 'smart_24_raw',
 'smart_232_raw',
 'smart_233_normalized',
 'smart_218_normalized',
 'smart_168_normalized',
 'smart_233_raw',
 'smart_24_normalized',
 'smart_17_normalized',
 'smart_174_raw',
 'smart_23_normalized',
 'smart_170_raw',
 'smart_16_normalized',
 'smart_17_raw']

#### Remove missing columns

In [12]:
for trait in remove_traits_list:
    df2 = df2.drop(trait)
    df3 = df3.drop(trait)
    df4 = df4.drop(trait)

In [13]:
for df in [df1,df2,df3,df4]:
    print(len(df.columns))

105
105
105
105


#### Combine 2018 dataframes together

In [14]:
df_2018 = df1.union(df2).union(df3).union(df4)

#### Read in the four quarters from 2017

In [15]:
df1 = spark.read.csv('../data/data_Q1_2017/*.csv', header=True)
df2 = spark.read.csv('../data/data_Q2_2017/*.csv', header=True)
df3 = spark.read.csv('../data/data_Q3_2017/*.csv', header=True)
df4 = spark.read.csv('../data/data_Q4_2017/*.csv', header=True)

In [17]:
for df in [df1,df2,df3,df4]:
    print(len(df.columns))

95
95
95
95


#### Combine the 2017 dataframes together

In [18]:
df_2017 = df1.union(df2).union(df3).union(df4)

#### Read in the four quarters from 2016

In [19]:
df1 = spark.read.csv('../data/data_Q1_2016/*.csv', header=True)
df2 = spark.read.csv('../data/data_Q2_2016/*.csv', header=True)
df3 = spark.read.csv('../data/data_Q3_2016/*.csv', header=True)
df4 = spark.read.csv('../data/data_Q4_2016/*.csv', header=True)

In [20]:
for df in [df1,df2,df3,df4]:
    print(len(df.columns))

95
95
95
95


#### Combine the 2016 dataframes together

In [21]:
df_2016 = df1.union(df2).union(df3).union(df4)

#### Extract max values from top five SMART attributes and convert to pandas

In [22]:
def data_to_pandas(df):
    df = df.groupby('serial_number', 'model', 'capacity_bytes'
              ).agg(
                max('failure'), max('smart_9_raw'), max('smart_5_raw'), 
                max('smart_187_raw') , max('smart_197_raw'), max('smart_198_raw')
                )
    df = df.select("*").toPandas()
    return df

#### Save dfs as csv for easy access

In [23]:
# df = data_to_pandas(df_2019)
# df.to_csv(r'./hard_drives_2019.csv')

In [24]:
# df = data_to_pandas(df_2018)
# df.to_csv(r'./hard_drives_2018.csv')

In [25]:
# df = data_to_pandas(df_2017)
# df.to_csv(r'./hard_drives_2017.csv')

In [26]:
# df = data_to_pandas(df_2016)
# df.to_csv(r'./hard_drives_2016.csv')