# Data Cleaning Test #2

In [1]:
import time
import numpy as np
from pyspark.sql.types import StructType, StructField, LongType
from pyspark.sql import SparkSession
import pyspark.sql
import pyspark.sql.functions

In [2]:
%reload_ext autoreload
%autoreload 2

In [3]:
# store path to notebook
PWD = !pwd
PWD = PWD[0]

In [4]:
# start Spark Session
app_name = "data_cleaning_notebook"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext

Do this on the command line first: hadoop fs -mkdir /RddCheckPoint

In [5]:
#sc.setCheckpointDir("hdfs://quickstart.cloudera:8020/RddCheckPoint")

## Load the data

Load training and test data

In [6]:
trainRDD = sc.textFile('data/train1percentsample.txt')
#testRDD = sc.textFile('data/test1percentsample.txt')

In [7]:
def ConvertNumber(idx, num):
    if num != '':
        if idx > 13:
            return int(num, 16)
        else:
            return int(num)
    else:
        return None

In [8]:
trainRDD2 = trainRDD.map(lambda x: [ConvertNumber(idx, num) for idx,num in enumerate(x.split('\t'))]).cache()
#testRDD2 = testRDD.map(lambda x: [ConvertNumber(idx, num) for idx,num in enumerate(x.split('\t'))]).cache()

In [9]:
trainRDDRow = trainRDD2.take(1)
numColumns = len(trainRDDRow[0])

In [10]:
structFieldList = [StructField('field_' + str(num), LongType(), True) for num in range(numColumns)]
schema = StructType(structFieldList)

trainDF = spark.createDataFrame(trainRDD2, schema)
#testDF = spark.createDataFrame(testRDD2, schema)

## Define functions

In [11]:
def scaleRow(row):
    rowDict = row.asDict()
    
    # Scale by subtracting the min, and dividing by the delta
    for field in scaleDict.keys():
        temp = float(rowDict[field]-scaleDict[field][0])/scaleDict[field][1]
        
        # The test set may have data that is outside the range in the training set.
        # Limit the range to 0-1.
        rowDict[field] = min(max(temp, 0.0), 1.0)
        
    return pyspark.sql.Row(**rowDict)

In [12]:
def scaleDataFrame_fit(fields, df):

    # Note: Need to rename the 'summary' column, because using it in the filter statement tries to invoke the function
    summaryDF = df.select(fields).summary(['min', 'max']).withColumnRenamed('summary', 'summary_col').cache()
    
    minRow = summaryDF.filter(summaryDF.summary_col == 'min').first()
    maxRow = summaryDF.filter(summaryDF.summary_col == 'max').first()
    
    for field in fields:
        min = int(minRow[field])
        max = int(maxRow[field])    
        scaleDict[field] = (min, max-min)

In [13]:
def scaleDataFrame_transform(df):
    return df.rdd.map(scaleRow).toDF()

In [14]:
def oheForRow(value):

    # Encode the rare / unseen value
    if value not in valueDict:
        oheList = [0] * len(valueDict)
        oheList.append(1)
        return oheList

    oheList = []

    # Encode values that have been seen
    for key in valueDict.keys():
        if key == value:
            oheList.append(1)
        else:
            oheList.append(0)
    
    # Last column is for rare / unseen. Set to 0.
    oheList.append(0)
    
    return oheList

In [15]:
def oheDataFrame_fit(field, topN, df):

    # Find the frequency of items in the category
    fieldFreqRDD = df.rdd.map(lambda x: (x[field], 1)).\
                          reduceByKey(lambda x, y: x+y)

    # Use the top N frequent values
    valueCountList = fieldFreqRDD.takeOrdered(topN, key=lambda x: -x[1])
    
    # Add those values to a dictionary for OHE
    for pair in valueCountList:
        valueDict[pair[0]] = True

In [16]:
def oheDataFrame_transform(field, df):

    # Create a new RDD with the encoded values
    oheRDD = df.rdd.map(lambda row: oheForRow(row[field]))

    # Create a new DataFrame for the OHE RDD
    structFieldList = [StructField(field + '_' + str(num), LongType(), True) for num in range(len(valueDict))]
    structFieldList.append(StructField(field + '_UnkwnRare', LongType(), True))
    schema = StructType(structFieldList)
    oheDF = spark.createDataFrame(oheRDD, schema)
    
    # Add an index column
    df = df.withColumn('id', pyspark.sql.functions.monotonically_increasing_id())
    oheDF = oheDF.withColumn('id', pyspark.sql.functions.monotonically_increasing_id())

    # Join the original DataFrame with the OHE DataFrame
    #updatedDF = df.join(oheDF, df.id == oheDF.id, 'inner').drop(oheDF.id)
    updatedDF = df.join(oheDF, ['id'])
    
    # Drop the original field that was OHE
    updatedDF = updatedDF.drop(field)
    updatedDF = updatedDF.drop('id')

    return updatedDF

# TODO: Optimize this to do all columns at once

In [17]:
def imputeWithMean(field, df):
    fieldMean = df.rdd.map(lambda row: row[field]).filter(lambda x: x != None).mean()
    return df.fillna(fieldMean, [field])

## Clean the data

In [18]:
startTime = time.time()

In [19]:
print('Started processing the fields...')

Started processing the fields...


### Field 1

This field has about 42% null values.  Drop it.

In [20]:
trainDF = trainDF.drop('field_1')
#testDF = testDF.drop('field_1')

### Field 2

No null values, so scale this field.

### Field 3

This field has about 23% null values.  Drop it.

In [21]:
trainDF = trainDF.drop('field_3')
#testDF = testDF.drop('field_3')

### Field 4

This field has about 25% null values.  Drop it.

In [22]:
trainDF = trainDF.drop('field_4')
#testDF = testDF.drop('field_4')

### Field 5

This field has about 3% null values.  Impute with the mean and scale.

In [23]:
trainDF = imputeWithMean('field_5', trainDF)
#testDF = imputeWithMean('field_5', testDF)

### Field 6

This field has about 22% null values. Drop it.

In [24]:
trainDF = trainDF.drop('field_6')
#testDF = testDF.drop('field_6')

### Field 7

This field has about 4% null values. Impute with the mean and scale.

In [25]:
trainDF = imputeWithMean('field_7', trainDF)
#testDF = imputeWithMean('field_7', testDF)

### Field 8

This field has < 1% null values. Impute with the mean and scale.

In [26]:
trainDF = imputeWithMean('field_8', trainDF)
#testDF = imputeWithMean('field_8', testDF)

## Field 9

This field has 4% null values. Impute with the mean and scale.

In [27]:
trainDF = imputeWithMean('field_9', trainDF)
#testDF = imputeWithMean('field_9', testDF)

### Field 10

This field has 42% null values. Drop it.

In [28]:
trainDF = trainDF.drop('field_10')
#testDF = testDF.drop('field_10')

### Field 11

This field has 4% null values. Impute with the mean and scale.

In [29]:
trainDF = imputeWithMean('field_11', trainDF)
#testDF = imputeWithMean('field_11', testDF)

### Field 12

This field has 77% null values. Drop it.

In [30]:
trainDF = trainDF.drop('field_12')
#testDF = testDF.drop('field_12')

### Field 13

This field has 25% null values. Drop it.

In [31]:
trainDF = trainDF.drop('field_13')
#testDF = testDF.drop('field_13')

### Field 14

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 15

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 16

This field has 4% null values. It has a large amount of distinct values, so drop.

In [32]:
trainDF = trainDF.drop('field_16')
#testDF = testDF.drop('field_16')

### Field 17

This field has 4% null values. It has a large amount of distinct values, so drop.

In [33]:
trainDF = trainDF.drop('field_17')
#testDF = testDF.drop('field_17')

### Field 18

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 19

This field has 12% null values, so delete those observations (NOTE: this is categorical, so we can't impute).
It has a modest amount of distinct values, so OHE.

### Field 20

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 21

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 22

This field has 0% null values. It has a small amount of distinct values, so OHE.

### Field 23

This field has 0% null values. It has a large amount of distinct values, so drop.

In [34]:
trainDF = trainDF.drop('field_23')
#testDF = testDF.drop('field_23')

### Field 24

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 25

This field has 4% null values. It has a large amount of distinct values, so drop.

In [35]:
trainDF = trainDF.drop('field_25')
#testDF = testDF.drop('field_25')

### Field 26

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 27

This field has 0% null values. It has a small amount of distinct values, so OHE.

### Field 28

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 29

This field has 4% null values. It has a large amount of distinct values, so drop.

In [36]:
trainDF = trainDF.drop('field_29')
#testDF = testDF.drop('field_29')

### Field 30

This field has 0% null values. It has a small amount of distinct values, so OHE.

### Field 31

This field has 0% null values. It has a modest amount of distinct values, so OHE.

### Field 32

This field has 48% null values, so drop.

In [37]:
trainDF = trainDF.drop('field_32')
#testDF = testDF.drop('field_32')

### Field 33

This field has 48% null values, so drop.

In [38]:
trainDF = trainDF.drop('field_33')
#testDF = testDF.drop('field_33')

### Field 34

This field has 4% null values. It has a large amount of distinct values, so drop.

In [39]:
trainDF = trainDF.drop('field_34')
#testDF = testDF.drop('field_34')

### Field 35

This field has 74% null values, so drop.

In [40]:
trainDF = trainDF.drop('field_35')
#testDF = testDF.drop('field_35')

### Field 36

This field has 0% null values. It has a small amount of distinct values, so OHE.

### Field 37

This field has 4% null values. It has a large amount of distinct values, so drop.

In [41]:
trainDF = trainDF.drop('field_37')
#testDF = testDF.drop('field_37')

### Field 38

This field has 48% null values, so drop.

In [42]:
trainDF = trainDF.drop('field_38')
#testDF = testDF.drop('field_38')

### Field 39

This field has 48% null values, so drop.

In [43]:
trainDF = trainDF.drop('field_39')
#testDF = testDF.drop('field_39')

In [44]:
print('...Finished processing the fields')

...Finished processing the fields


## Delete null observations in a batch (this is for categorical values)

In [45]:
trainDF = trainDF.dropna(how='any', subset=['field_19'])
#testDF = testDF.dropna(how='any', subset=['field_19'])

## TODO: Do Imputation In a Batch (this is for numerical values)

## Do Scaling (all columns at once)

* Note that this is being done after observations have been deleted

In [46]:
print('Started scaling fields...')

Started scaling fields...


In [47]:
scaleDict = {}
scaleDataFrame_fit(['field_2','field_5','field_7','field_8','field_9','field_11'], trainDF)
trainDF = scaleDataFrame_transform(trainDF)
#testDF = scaleDataFrame_transform(testDF)

In [48]:
trainDF.select(['field_2','field_5','field_7','field_8','field_9','field_11']).show(n=20)
#testDF.select(['field_2','field_5','field_7','field_8','field_9','field_11']).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             field_2|             field_5|             field_7|             field_8|             field_9|            field_11|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2.563445270443476E-4|5.197465732413659E-4|1.624827362092777...|8.710801393728223E-4| 1.44102601051949E-4|0.006944444444444444|
|1.538067162266085...|1.746409230759885...|5.686895767324722E-4|2.903600464576074...|                 0.0|0.006944444444444444|
|1.538067162266085...|1.898270902999875...|0.005361930294906166|0.005226480836236934|0.005547950140500036|0.027777777777777776|
| 0.04301461163804153|5.918808675553611E-4|0.002031034202615972|2.903600464576074...|0.028244109806182003| 0.04861111111111111|
|5.126890540886952E-5|0.002833359149817614|8.124136810463888E-5|                 0.0|                 0.

In [49]:
print('...Finished scaling fields')

...Finished scaling fields


In [50]:
#trainDF.show()

## Create OHE fields (one column at a time to control for number of distinct values)

In [51]:
print('Started OHE fields...')

Started OHE fields...


In [52]:
oheFieldsAndSizes = [('field_14',10),('field_15',10),('field_18',10),('field_19',10),('field_20',10),('field_21',10),('field_22',3),('field_24',10),('field_26',10),('field_27',10),('field_28',10),('field_30',10),('field_31',10),('field_36',10)]

In [53]:
for fieldAndSize in oheFieldsAndSizes:
    valueDict = {}
    fieldToEncode = fieldAndSize[0]
    oheDataFrame_fit(fieldToEncode, fieldAndSize[1], trainDF)
    trainDF = oheDataFrame_transform(fieldToEncode, trainDF)
    #testDF = oheDataFrame_transform(fieldToEncode, testDF)
    
    trainDF.cache()
    #testDF.cache()

    #trainDF.checkpoint(eager=True)
    #testDF.checkpoint(eager=True)

In [54]:
print('...Finished OHE fields')

...Finished OHE fields


## Save cleaned DataFrame to a file

In [55]:
print('Started saving the DataFrames...')

Started saving the DataFrames...


In [56]:
trainDF.write.csv(path='data/train1percentsample_cleaned_tmp.txt', header=False, sep='\t')
#testDF.write.csv(path='data/test1percentsample_cleaned_tmp.txt', header=False, sep='\t')

In [57]:
!rm -rf data/train1percentsample_cleaned.txt
#!rm -rf data/test1percentsample_cleaned.txt
!mv data/train1percentsample_cleaned_tmp.txt data/train1percentsample_cleaned.txt
#!mv data/test1percentsample_cleaned_tmp.txt data/test1percentsample_cleaned.txt

In [58]:
print('...Finished saving the DataFrames')

...Finished saving the DataFrames


In [59]:
print('The total time was: {} seconds'.format(time.time() - startTime))

The total time was: 442.5566794872284 seconds
