In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from functools import reduce

# PySpark specific imports
from pyspark.sql.functions import col, isnan, when, count, udf
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import VectorAssembler

# Special version of pandas for spark
from pyspark import pandas as ps
ps.set_option('display.max_rows', 10)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/03 23:50:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Self Defined functions (for code re-usability)

In [2]:
def union_all(dfs: list):
    '''
    combines multiple Spark DataFrames in a given list

    input: a list of Spark DataFrames
    output: a single/unified Spark DataFrames
    '''
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)

def shape_df(a_spark_dataframe) -> tuple:
    '''
    input: a Spark data frame
    output: shape of the DataFrame (as a tuple of row & col --> (#rows, #cols) )
    '''
    return (a_spark_dataframe.count(), len(a_spark_dataframe.columns))

def cast_col_type(a_spark_df, col_list: list, new_type: str):
    ''''
    input: 
        1. a pyspark dataframe (to be used as reference)
        2. a list of col names (to convert their value types)
        3. new type (to convert values to)
            e.g., 'integer', 'float', 'string', 'double', and etc.
    output: a DataFrame with the specified col type changed new_type
    '''
    return a_spark_df.select(
        *[c for c in a_spark_df.columns if c not in col_list], 
        *[col(feat).cast(new_type).alias(feat) for feat in col_list]
        )

def missing_imputer(df_pyspark, col_list, strategy='mean'):
    '''
    filling missing values by the defined "measure of centeral tendency"
    input:
        1. a spark's DataFrame
        2. a list of cols/feats to be used for imputation
        3. the strategy by which missign values are filled
            e.g., mean, median, mode, etc.
    output: the imputed spark DataFrame
    '''
    imputer = Imputer(
        inputCols=col_list,
        outputCols=col_list
    ).setStrategy(strategy)
    return imputer.fit(df_pyspark).transform(df_pyspark)

def balance_dataset(df_pyspark, label_col='label'):
    ''' 
    Re-balancing (weighting) of records in case of imbalanced #labels 
        * e.g., in healthcare or fraud detection the causes 

    fromula to calc weight for label j:
        w_j = n_recs / (n_labels * n_recs_j)  

    input: 
        1. df_pyspark: a spark DataFrame
        2. label_col: the name the label/class col
            * note that the labels must be integers (e.g., 0, 1, 2, ...)
        3. important_label = the label/class which is the important one

    output: a spark DataFrame with an added 'class_weight' to be used for classification
        e.g., to be used in the logistic loss objective function
    '''
    n_recs = df_pyspark.count()
    labels = sorted(list(train.select(label_col).distinct().toPandas()[label_col]))
    n_labels = len(labels)

    labels_freq = {label: df_pyspark.filter(df_pyspark[label_col]==label).count() for label in labels}
    labels_weights = {label: (n_recs / (n_labels * labels_freq[label])) for label in labels}

    # define a udf (user defined function) to return the weight with respect to the label of each record
    assign_weights = udf(lambda x: labels_weights[x])

    return df_pyspark.withColumn("class_weight", assign_weights(col('is_match')).cast('double'))

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('bigdata_final_project_preprocessing').getOrCreate()
spark

### Import Dataset
- import each chunk individually
- combine all chunks together

#### import all dataset chunks

In [4]:
df1 = spark.read.csv('./Data/block_1.csv', header=True, inferSchema=True)
df2 = spark.read.csv('./Data/block_2.csv', header=True, inferSchema=True)
df3 = spark.read.csv('./Data/block_3.csv', header=True, inferSchema=True)
df4 = spark.read.csv('./Data/block_4.csv', header=True, inferSchema=True)
df5 = spark.read.csv('./Data/block_5.csv', header=True, inferSchema=True)
df6 = spark.read.csv('./Data/block_6.csv', header=True, inferSchema=True)
df7 = spark.read.csv('./Data/block_7.csv', header=True, inferSchema=True)
df8 = spark.read.csv('./Data/block_8.csv', header=True, inferSchema=True)
df9 = spark.read.csv('./Data/block_9.csv', header=True, inferSchema=True)
df10 = spark.read.csv('./Data/block_10.csv', header=True, inferSchema=True)

                                                                                

#### combine all chunks of dataset into one cohesive unit

In [5]:
df_list = [df1, df2, df3, df4, df5, df6, df7, df8, df9, df10]

# combine all chunks of dataset into one cohesive unit
df = union_all(df_list)
df.show()

+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|           ?|         1.0|           ?|      1|     1|     1|     1|      0|    true|
|39086|47614|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|70031|70237|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|84795|97439|                1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|36950|42116|                1|           ?|         1.0|           1|      1|     1|     1|     1|      1|    true|
|42413|48491|                1|           ?|         1.0|       

#### Cleaning the Dataset
- replace "?" signes with Null values
- convert (cast) all col types with new (correct) type
- 

#### replace "?" signes with Null values

In [6]:
# replace ? with Null values
# as ? is not known as missing value to pyspark
df_with_null = df.na.replace('?', None)

print('the shape of the dataset (before preprocessing):', shape_df(df_with_null))
df_with_null.show()
df_with_null.printSchema()

the shape of the dataset (before preprocessing): (5749132, 12)
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|        null|         1.0|        null|      1|     1|     1|     1|      0|    true|
|39086|47614|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|70031|70237|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|84795|97439|                1|        null|         1.0|        null|      1|     1|     1|     1|      1|    true|
|36950|42116|                1|        null|         1.0|           1|      1|     1|     1|     1|      1|    true|
|

#### convert (cast) all col types with new (correct) type

In [7]:
# print the value type of each feat/col
# as we see the Schemas are not correct
# e.g., we have string for cmp_fname_c1 which must be 'float'/'double'
print('\n','Before type conversion'.center(79, "-"))
df_with_null.printSchema()

# make the cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2 cols double_precision float
df_new_type_1 = cast_col_type(df_with_null, df.columns[2:6], 'double')
# df_new_type_1.show(21000)
# df_new_type_1.printSchema()

# make the cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match cols integer
df_new_type_2 = cast_col_type(df_new_type_1, df.columns[6:11], 'int')
# df_new_type_2.show(21000)
# df_new_type_2.printSchema()

# make the labels/classes binary (0, 1) instead of True/False
df_new_type_3 = cast_col_type(df_new_type_2, ['is_match'], 'integer')
# df_new_type_3.show(21000)
# df_new_type_3.printSchema()

df_new_type_final = df_new_type_3
print('\n','After type conversion'.center(79, "-"))
df_new_type_final.printSchema()


 -----------------------------Before type conversion----------------------------
root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)
 |-- is_match: boolean (nullable = true)


 -----------------------------After type conversion-----------------------------
root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: double (nullable = true)
 |-- cmp_fname_c2: double (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: double (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: integer (nullable = true)
 |-- cmp_bm:

### Handle Missing Values
- remove recs with more than a threshold Missing values
- replace the Missing values with 'mean', 'median', or 'mode' of their respective col

#### remove recs with more than a threshold Missing values

In [8]:
threshold = 8
df_rm_null_by_thresh = df_new_type_final.dropna(thresh=threshold)
print('number of removed recs with more than threshold Null values:',
      shape_df(df_new_type_final)[0] - shape_df(df_rm_null_by_thresh)[0])



number of removed recs with more than threshold Null values: 793


                                                                                

### remove the cols with high #Null values or insignificant ones.
the removed cols are:
- cmp_lname_c2
- cmp_fname_c2

In [9]:
df_removed_insig_feats = df_rm_null_by_thresh.drop(*['cmp_fname_c2', 'cmp_lname_c2'])
df_removed_insig_feats.show()

+-----+-----+-----------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|     cmp_fname_c1|cmp_lname_c1|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+-----------------+------------+-------+------+------+------+-------+--------+
|37291|53113|0.833333333333333|         1.0|      1|     1|     1|     1|      0|       1|
|39086|47614|              1.0|         1.0|      1|     1|     1|     1|      1|       1|
|70031|70237|              1.0|         1.0|      1|     1|     1|     1|      1|       1|
|84795|97439|              1.0|         1.0|      1|     1|     1|     1|      1|       1|
|36950|42116|              1.0|         1.0|      1|     1|     1|     1|      1|       1|
|42413|48491|              1.0|         1.0|      1|     1|     1|     1|      1|       1|
|25965|64753|              1.0|         1.0|      1|     1|     1|     1|      1|       1|
|49451|90407|              1.0|         1.0|      1|     1|     1|     1|      0|       1|

#### Split the data into train & test
- we split the data inot 0.9, 0.1 splits for train and test sets respectively.
- we didn't use 0.7, 0.3 split because we are dealing with big data and as data grows the give less portion of the data to test set.

In [10]:
train, test = df_removed_insig_feats.randomSplit([0.7, 0.3], seed = 18)

print("the distribution of labels for train set".center(79, "-"))
train.groupBy('is_match').count().show()
print("the distribution of labels for test set".center(79, "-"))
test.groupBy('is_match').count().show()

--------------------the distribution of labels for train set-------------------


                                                                                

+--------+-------+
|is_match|  count|
+--------+-------+
|       1|  14702|
|       0|4008522|
+--------+-------+

--------------------the distribution of labels for test set--------------------




+--------+-------+
|is_match|  count|
+--------+-------+
|       1|   6223|
|       0|1718892|
+--------+-------+



                                                                                

#### replace Missing values of: 
- numerical real valued cols with the 'mean' of their respective col
- numerical binary valued cols with the 'mode' of their respective col

##### 1. replace the Missing values for train set

In [11]:
from pyspark.ml.feature import Imputer



train_imputed_1 = missing_imputer(train, ['cmp_fname_c1', 'cmp_lname_c1'], 'mean')
train_imputed_2 = missing_imputer(train_imputed_1, ['cmp_sex','cmp_bd','cmp_bm','cmp_by','cmp_plz'], 'mode')

print('\n#NaN or #Null values in each col of train set (after imputation)')
count_miss_train = train_imputed_2.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c)
     for c in train.columns if c != 'is_match']
)

display(count_miss_train.toPandas())

                                                                                


#NaN or #Null values in each col of train set (after imputation)


                                                                                

Unnamed: 0,id_1,id_2,cmp_fname_c1,cmp_lname_c1,cmp_sex,cmp_bd,cmp_bm,cmp_by,cmp_plz
0,0,0,0,0,0,0,0,0,0


##### 2. replace the Missing values for test set

In [12]:
test_imputed_1 = missing_imputer(test, ['cmp_fname_c1', 'cmp_lname_c1'], 'mean')
test_imputed_2 = missing_imputer(test_imputed_1, ['cmp_sex','cmp_bd','cmp_bm','cmp_by','cmp_plz'], 'mode')

print('\n#NaN or #Null values in each col of test set (after imputation)')
count_miss_test = test_imputed_2.select(
    [count(when(isnan(c) | col(c).isNull(), c)).alias(c)
     for c in test.columns if c != 'is_match']
)

display(count_miss_test.toPandas())

                                                                                


#NaN or #Null values in each col of test set (after imputation)


                                                                                

Unnamed: 0,id_1,id_2,cmp_fname_c1,cmp_lname_c1,cmp_sex,cmp_bd,cmp_bm,cmp_by,cmp_plz
0,0,0,0,0,0,0,0,0,0


#### Create a Class_Weight, as our labels are unbalanced

In [13]:
train_class_weight = balance_dataset(train_imputed_2, label_col='is_match')
train_class_weight.show()
# train_class_weight.coalesce(1).write.csv("train_final")

test_class_weight = balance_dataset(test_imputed_2, label_col='is_match')
test_class_weight.show()
# test_class_weight.coalesce(1).write.csv("test_final")

                                                                                

+----+-----+-----------------+------------------+-------+------+------+------+-------+--------+------------------+
|id_1| id_2|     cmp_fname_c1|      cmp_lname_c1|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|      class_weight|
+----+-----+-----------------+------------------+-------+------+------+------+-------+--------+------------------+
|   2|78418|              1.0|               0.5|      1|     0|     0|     1|      0|       0|0.5018338429974938|
|   5|46062|0.222222222222222|               1.0|      1|     0|     1|     0|      0|       0|0.5018338429974938|
|   6|45145|              1.0| 0.222222222222222|      1|     0|     1|     0|      0|       0|0.5018338429974938|
|   6|51814|              1.0| 0.166666666666667|      0|     1|     0|     0|      0|       0|0.5018338429974938|
|   7|76887|              1.0|               0.2|      1|     0|     0|     1|      0|       0|0.5018338429974938|
|   8|11109|              1.0|               0.0|      1|     0|     1|     0|  

                                                                                

+----+-----+-----------------+-----------------+-------+------+------+------+-------+--------+------------------+
|id_1| id_2|     cmp_fname_c1|     cmp_lname_c1|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|      class_weight|
+----+-----+-----------------+-----------------+-------+------+------+------+-------+--------+------------------+
|   5|    6|              1.0|              1.0|      1|     1|     1|     1|      1|       1| 138.6079865016873|
|  18|69293|              1.0|              0.0|      1|     0|     1|     0|      0|       0|0.5018101777191354|
|  19| 1961|0.222222222222222|0.555555555555556|      1|     1|     1|     1|      0|       0|0.5018101777191354|
|  19|78044|0.222222222222222|             0.25|      0|     1|     1|     1|      0|       0|0.5018101777191354|
|  21|86148|              0.4|             0.25|      1|     0|     1|     0|      0|       0|0.5018101777191354|
|  26|46196|0.333333333333333|              0.5|      1|     0|     0|     0|      0|   