# Notebook 1 of 4 - Data Cleaning & Wrangling

**Code Authors:** _Tuaha Aftab_ and _Muaaz Ahmed Noor_

**Markdown Authors:** _Muaaz Ahmed Noor_ and _Boom Devahastin Na Ayudhya_

This notebook wrangles the initial raw dataset into a cleaned dataset for use in the subsequent EDA and modeling pipelines. S3 serves as both the source of the raw dataset and the destination of the processed dataset, and Spark carries out the data wrangling.

## Prologue

Consumer-level data contains a wealth of information that provides insight into customer behavior. Whether these are macro-level demographic characteristics or micro-level transaction data, the wide spectrum of existing features can be used to engineer new ones, either through brute-force EDA or through more creative unsupervised learning methods, such as clustering customers into interpretable personas that can themselves serve as categorical features in a supervised model. Such insight is critical for marketing data scientists and product managers recommending the appropriate strategy to target potential customers.

Throughout this series of 4 notebooks, the project seeks to predict whether a customer will open (or have) a new Securities account by the end of May 2016, the last month on the time axis of the dataset. The original data source and base version of this problem were [posed by Santander Bank here](https://www.kaggle.com/c/santander-product-recommendation/data), where you can also find the data dictionary.

The raw data, downloaded as `train_ver2.csv`, is available in this Google Drive folder [via this link](https://drive.google.com/drive/folders/19anLU9JuQ5DRRB38Ff5yzKcD4pRJi0i3?usp=sharing).

## Section 1: Administrative Logistics and Setup

### Install Libraries

In [None]:
%%capture
!apt update
!apt install -y gcc python-dev libkrb5-dev
!pip install sparkmagic

### Spark Cluster Set Up

In [None]:
%%capture
%load_ext sparkmagic.magics 

First, create a Spark session on the cluster hosted on AWS. If you ever need to restart, you may need to run `%spark delete -s spark_session` **OR** simply factory-reset the runtime from the Runtime menu.

In [None]:
# Create Spark Sessions: Enter your Master Public DNS with the proper formatting and host, and update the password
%spark add -s spark_session -l python -u http://ec2-3-238-201-92.compute-1.amazonaws.com -a cis545-livy -p Password1 -t Basic_Access

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1638556520274_0002,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [None]:
%%spark
from pyspark.sql.types import *

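### Reading Raw Data from S3 into Spark

Read the large raw data file from S3 into a Spark DataFrame and register it as the `raw_data` temporary view so it can be queried with Spark SQL.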

In [None]:
%%spark
raw_data_sdf = spark.read.options(header=True, inferSchema=True).csv('s3://cis545projectbmt/train_ver2.csv')
raw_data_sdf.createOrReplaceTempView("raw_data")
raw_data_sdf.show(10)

## Section 2: Data Cleaning

### Renaming Columns

Because the column names are in Spanish and are not always self-explanatory, we convert them into descriptive English names. We do this by aliasing each column in the SELECT clause of a Spark SQL query.
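As a minimal sketch, assuming one would rather keep the Spanish-to-English mapping in a Python dictionary than hand-write every alias, the same SELECT could be generated programmatically. `COLUMN_MAP` and `build_rename_query` are hypothetical names, and only a few of the columns are listed; the resulting string would be passed to `spark.sql(...)` just like `rename_query`.

```python
# Hypothetical subset of the Spanish -> English column mapping
COLUMN_MAP = {
    "fecha_dato": "year_month",
    "ncodpers": "cust_code",
    "sexo": "sex",
    "age": "age",  # already English: selected without an alias
    "ind_valo_fin_ult1": "securities",
}


def build_rename_query(mapping: dict, table: str) -> str:
    """Build a Spark SQL SELECT that aliases each source column to its English name."""
    items = [src if src == dst else f"{src} AS {dst}" for src, dst in mapping.items()]
    return "SELECT " + ",\n       ".join(items) + f"\nFROM {table}"


print(build_rename_query(COLUMN_MAP, "raw_data"))
```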

In [None]:
%%spark

# Declare the SQL query to be executed
rename_query = """
        SELECT 
              fecha_dato as year_month,
              ncodpers as cust_code,
              ind_empleado as emp_status,
              pais_residencia as residence_country,
              sexo as sex,
              age,
              fecha_alta as first_contract_date,
              ind_nuevo as new_cust,
              antiguedad as months_as_senior,
              indrel as primary,
              ult_fec_cli_1t as last_date_as_primary,
              indrel_1mes as cust_type,
              tiprel_1mes as rel_type,
              indresi as same_residence_country,
              indext as diff_birth_country,
              conyuemp as emp_spouse,
              canal_entrada as joining_channel,
              indfall as deceased,
              tipodom as addr,
              cod_prov as province_cd,
              nomprov as province,
              ind_actividad_cliente as active,
              renta as income,
              segmento as segment,
              ind_ahor_fin_ult1 as savings_acct,
              ind_aval_fin_ult1 as guarantees,
              ind_cco_fin_ult1 as current_acct,
              ind_cder_fin_ult1 as derivada_acct,
              ind_cno_fin_ult1 as payroll_acct,
              ind_ctju_fin_ult1 as junior_acct,
              ind_ctma_fin_ult1 as max_particular_acct,
              ind_ctop_fin_ult1 as particular_acct,
              ind_ctpp_fin_ult1 as particular_plus_acct,
              ind_deco_fin_ult1 as short_term_deposits,
              ind_deme_fin_ult1 as medium_term_deposits,
              ind_dela_fin_ult1 as long_term_deposits,
              ind_ecue_fin_ult1 as e_account,
              ind_fond_fin_ult1 as funds,
              ind_hip_fin_ult1 as mortgage,
              ind_plan_fin_ult1 as pensions,
              ind_pres_fin_ult1 as loans,
              ind_reca_fin_ult1 as taxes,
              ind_tjcr_fin_ult1 as credit_card,
              ind_valo_fin_ult1 as securities,
              ind_viv_fin_ult1 as home_account,
              ind_nomina_ult1 as payroll,
              ind_nom_pens_ult1 as pensions_2,
              ind_recibo_ult1 as direct_debit
          FROM raw_data 
          """


renamed_sdf = spark.sql(rename_query)
renamed_sdf.createOrReplaceTempView("renamed_sdf")
renamed_sdf.show(10)

### Dropping Nulls

About 70 rows had a null value in the `sex` column, which could not be imputed. Since this is a negligible portion of the over 13 million rows of data, we dropped those rows, as we intend to use `sex` as a feature in our model.

Furthermore, we dropped all rows in which `last_date_as_primary` is NOT null, since we only want the records of current customers. Our project aims to make a forward-looking prediction of whether a customer will open a securities account in May 2016, and such a prediction does not make sense for customers who left the bank before then. Since this column will contain only nulls after the filter, we will eventually drop it entirely.

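About 2-3 thousand rows had nulls in nearly all of the feature columns. We dropped these sparsely populated records: `dropna(how='all', ...)` removes rows in which every column of the listed subset is null, and the filter additionally removes rows with a null `addr`.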
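The row filters can be illustrated in plain Python on a few hand-made rows (hypothetical data: each row is a dict, `None` stands in for a Spark null, and the `dropna` subset is shortened to three columns). The key point is `how='all'`: a row is dropped only if every column in the subset is null, so a row with a single populated subset column survives.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Optional[Any]]


def drop_sparse_and_former(rows: List[Row], subset: List[str]) -> List[Row]:
    """Mimic dropna(how='all', subset=...) followed by the .filter(...) condition."""
    kept = []
    for row in rows:
        # how='all': drop the row only if EVERY column in the subset is null
        if all(row.get(c) is None for c in subset):
            continue
        # filter: keep rows with known sex and addr that are still primary customers
        if row["sex"] is None or row["addr"] is None:
            continue
        if row["last_date_as_primary"] is not None:
            continue
        kept.append(row)
    return kept


rows = [
    {"cust_code": 1, "sex": "H", "addr": 1, "last_date_as_primary": None, "segment": "top", "active": 1},
    {"cust_code": 2, "sex": None, "addr": 1, "last_date_as_primary": None, "segment": "top", "active": 0},
    {"cust_code": 3, "sex": "V", "addr": 1, "last_date_as_primary": "2015-07-15", "segment": "top", "active": 1},
    {"cust_code": 4, "sex": "V", "addr": None, "last_date_as_primary": None, "segment": None, "active": None},
    {"cust_code": 5, "sex": "H", "addr": 1, "last_date_as_primary": None, "segment": None, "active": None},
]
kept = drop_sparse_and_former(rows, subset=["segment", "active", "addr"])
print([r["cust_code"] for r in kept])  # [1, 5]
```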

In [None]:
%%spark
# Drop rows where every listed feature is null, then keep current customers with a known sex and addr
na_dropped_sdf = renamed_sdf.dropna(how='all', 
                                    subset=['emp_status', 'new_cust', 'primary', 'cust_type', 'rel_type', 'same_residence_country', 
                                        'diff_birth_country', 'emp_spouse', 'joining_channel', 'deceased', 'addr', 'active', 'segment']) \
                                      .filter("sex is not NULL AND last_date_as_primary is NULL AND addr is not NULL")
na_dropped_sdf.show()

### Renaming Values

Here we translate the values of the `segment` column into English.

We also map the float-formatted strings in the `cust_type` column (e.g., `1.0`) to integer-formatted strings (e.g., `1`) so that they are consistent with the values already present. The column originally contained a mix of values such as `1.0`, `1`, `2.0`, `2`, and `P`.

Finally, in `province_cd` we map the literal string "NA" to the sentinel code `9999`, since the string "NA" is not read as a null (NaN) value.
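The value mappings can be sketched in plain Python as dictionary lookups (`recode` is a hypothetical helper, not part of the notebook). As with `when(...).otherwise(col)`, any value without a mapping, including a null, passes through unchanged.

```python
from typing import Optional, Tuple

SEGMENT_EN = {
    "02 - PARTICULARES": "individual",
    "03 - UNIVERSITARIO": "graduate_academic",
    "01 - TOP": "top",
}
CUST_TYPE_FIX = {"1.0": "1", "2.0": "2", "3.0": "3", "4.0": "4"}


def recode(segment: Optional[str], cust_type: Optional[str],
           province_cd: Optional[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Hypothetical helper mirroring the three when(...).otherwise(...) expressions."""
    seg = SEGMENT_EN.get(segment, segment)  # unmapped values (and None) pass through
    ctype = CUST_TYPE_FIX.get(cust_type, cust_type)
    prov = "9999" if province_cd == "NA" else province_cd
    return seg, ctype, prov


print(recode("01 - TOP", "1.0", "NA"))  # ('top', '1', '9999')
print(recode(None, "P", "28"))          # (None, 'P', '28')
```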

In [None]:
%%spark
# Rename values

from pyspark.sql.functions import when

values_renamed_sdf = na_dropped_sdf.withColumn("segment",
                                      when(na_dropped_sdf.segment == "02 - PARTICULARES", "individual")
                                      .when(na_dropped_sdf.segment == "03 - UNIVERSITARIO", "graduate_academic")
                                      .when(na_dropped_sdf.segment == "01 - TOP", "top")
                                      .otherwise(na_dropped_sdf.segment)) \
                                    .withColumn("cust_type",
                                      when(na_dropped_sdf.cust_type == "1.0", "1")
                                      .when(na_dropped_sdf.cust_type == "2.0", "2")
                                      .when(na_dropped_sdf.cust_type == "3.0", "3")
                                      .when(na_dropped_sdf.cust_type == "4.0", "4")
                                      .otherwise(na_dropped_sdf.cust_type)) \
                                    .withColumn("province_cd",
                                      when(na_dropped_sdf.province_cd == "NA", '9999')
                                      .otherwise(na_dropped_sdf.province_cd))

values_renamed_sdf.show()

## Section 3: Missing Data Imputation

Here we apply necessary imputations to handle all of the null features.

### Missing Data Imputation: Creating `Other` Class

For categorical features, numerical imputation methods and the KNN imputation we learned in class do not apply directly, since there are no numeric calculations to perform on category labels. We assume that the data is not missing at random, but is in fact missing because the true value was none of the existing classes. As a result, missing categorical values can be lumped into a class of their own, which we name "other".

The one exception is `emp_spouse`: if its value is missing, we assume it means no (`N`, the mode).

Note that since this is not numerical imputation, we don't need to add a Boolean flag column indicating which values were imputed: these categorical features will eventually be one-hot encoded anyway, so "other" becomes its own indicator column.
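A plain-Python sketch of this imputation rule applied to a single row (hypothetical helper and data, with `None` as null). Note that in Spark, `fillna` with a string value only fills string-typed columns; any column in `subset` with a different type is ignored.

```python
from typing import Any, Dict, Optional

Row = Dict[str, Optional[Any]]
OTHER_COLS = ["cust_type", "rel_type", "joining_channel", "province", "segment"]


def impute_categoricals(row: Row) -> Row:
    """Hypothetical helper: missing categories become 'other'; missing emp_spouse becomes 'N'."""
    out = dict(row)  # copy so the input row is left untouched
    for c in OTHER_COLS:
        if out.get(c) is None:
            out[c] = "other"
    if out.get("emp_spouse") is None:
        out["emp_spouse"] = "N"  # assume "no", the mode
    return out


row = {"cust_type": None, "rel_type": "A", "joining_channel": None,
       "province": "MADRID", "segment": None, "emp_spouse": None}
print(impute_categoricals(row))
```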

In [None]:
%%spark
# Imputation with 'other'

imputed_sdf = values_renamed_sdf\
              .fillna('other', subset=['cust_type', 'rel_type', 'joining_channel', 'province', 'segment'])\
              .fillna('N', subset='emp_spouse')
imputed_sdf.show()

Next, we cast the columns to appropriate data types.

In [None]:
%%spark

# Casting datatypes

# Convert date strings such as "2015-01-28" to DateType
type_casted_sdf = imputed_sdf.withColumn("year_month", imputed_sdf.year_month.cast("date")) \
                              .withColumn("first_contract_date", imputed_sdf.first_contract_date.cast("date")) \
                              .withColumn("last_date_as_primary", imputed_sdf.last_date_as_primary.cast("date"))

# Cast the target (product indicator) columns, savings_acct onward, to integers.
# The loop variable is named col_name so it does not shadow pyspark.sql.functions.col.
for col_name in type_casted_sdf.columns[24:]:
  type_casted_sdf = type_casted_sdf.withColumn(col_name, type_casted_sdf[col_name].cast('int'))

type_casted_sdf = type_casted_sdf \
  .withColumn("cust_code", type_casted_sdf["cust_code"].cast('int'))   \
  .withColumn("new_cust", type_casted_sdf["new_cust"].cast('int'))   \
  .withColumn("age", type_casted_sdf["age"].cast('int'))   \
  .withColumn("months_as_senior", type_casted_sdf["months_as_senior"].cast('int'))   \
  .withColumn("primary", type_casted_sdf["primary"].cast('int'))   \
  .withColumn("addr", type_casted_sdf["addr"].cast('int'))   \
  .withColumn("province_cd", type_casted_sdf["province_cd"].cast('int'))   \
  .withColumn("active", type_casted_sdf["active"].cast('int'))   \
  .withColumn("income", type_casted_sdf["income"].cast('int'))  # truncates any fractional part

type_casted_sdf.createOrReplaceTempView('type_casted_sdf')

# Verify datatype conversion
type_casted_sdf.show()
type_casted_sdf.printSchema()

### Missing Data Imputation: Other Methods

We impute the `payroll` and `pensions_2` columns using a forward-fill strategy. The data contains one record per customer per month, so for this time-series data it is reasonable to fill a missing value with that customer's value from the previous month.

First, we extract the customers that have a null in either `payroll` or `pensions_2` in at least one of their records (`cust_with_null_pay_pen`).

In [None]:
%%spark
cust_with_null_pay_pen = spark.sql("""select distinct cust_code from type_casted_sdf where payroll is null or pensions_2 is null""")

cust_with_null_pay_pen.createOrReplaceTempView("cust_with_null_pay_pen")

Next, we extract the earliest existing record for each of these customers. These records will be joined with the original dataset to identify, and impute, first records that are missing `pensions_2`/`payroll` values.
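Selecting each customer's first record with `dense_rank() == 1` over a window ordered by `year_month` amounts to taking the earliest month per customer, since each customer has at most one record per month. A plain-Python sketch, assuming hypothetical customer codes and ISO-formatted date strings (which sort chronologically):

```python
from typing import Dict, List, Set


def first_records(rows: List[dict], null_customers: Set[int]) -> Dict[int, dict]:
    """Earliest record per customer, restricted to customers in null_customers."""
    firsts: Dict[int, dict] = {}
    for row in rows:
        cust = row["cust_code"]
        if cust not in null_customers:
            continue
        # ISO 'YYYY-MM-DD' strings compare chronologically
        if cust not in firsts or row["year_month"] < firsts[cust]["year_month"]:
            firsts[cust] = row
    return firsts


rows = [
    {"cust_code": 101, "year_month": "2015-02-28", "payroll": None, "pensions_2": None},
    {"cust_code": 101, "year_month": "2015-01-28", "payroll": 0, "pensions_2": 0},
    {"cust_code": 102, "year_month": "2015-01-28", "payroll": None, "pensions_2": None},
    {"cust_code": 103, "year_month": "2015-01-28", "payroll": 1, "pensions_2": 1},
]
# Customers with a null payroll or pensions_2 in at least one record
null_customers = {r["cust_code"] for r in rows if r["payroll"] is None or r["pensions_2"] is None}
for cust, rec in sorted(first_records(rows, null_customers).items()):
    print(cust, rec["year_month"], rec["payroll"])
```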

In [None]:
%%spark
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Extracting first existing records for those customers that have at least one record having null in pensions/payroll
first_records_sdf =  type_casted_sdf.where("cust_code in (select cust_code from cust_with_null_pay_pen) ") \
    .withColumn("rank", dense_rank().over(Window.partitionBy("cust_code").orderBy("year_month"))) \
    .where("rank = 1") \
    .select("cust_code", "year_month", "payroll", "pensions_2")

first_records_sdf.show()

+---------+----------+-------+----------+
|cust_code|year_month|payroll|pensions_2|
+---------+----------+-------+----------+
|   832491|2015-01-28|      0|         0|
|   859777|2015-01-28|   null|      null|
|  1135961|2015-01-28|      0|         0|
|  1266833|2015-01-28|      0|         0|
|    51895|2015-01-28|      0|         0|
|   382647|2015-01-28|   null|      null|
|   942057|2015-01-28|   null|      null|
|  1132935|2015-01-28|   null|      null|
|  1183456|2015-01-28|   null|      null|
|  1342015|2015-01-28|      0|         0|
|    18009|2015-01-28|      0|         0|
|   126184|2015-01-28|   null|      null|
|   155925|2015-01-28|   null|      null|
|   731928|2015-01-28|      0|         0|
|  1366499|2015-01-28|      0|         0|
|   320551|2015-01-28|      0|         0|
|   871329|2015-01-28|   null|      null|
|  1097831|2015-01-28|   null|      null|
|  1171754|2015-01-28|      0|         0|
|  1379131|2015-01-28|      0|         0|
+---------+----------+-------+----------+

We join these first records with the original dataset. If a customer's first record has a null `payroll` or `pensions_2`, we impute it with 0; otherwise, the original value is kept.

This is necessary for forward filling customers whose first existing record has a null in the column: forward fill can only propagate a value that already exists, so without this step those leading nulls would remain. The resulting columns (`first_imputed_payroll`, `first_imputed_pensions_2`) have `payroll`/`pensions_2` populated for at least each customer's first existing record, and are used in the forward-fill imputation that follows.

Some columns appear in both tables being joined (`type_casted_sdf` and `first_records_renamed_sdf`). These columns are renamed so that Spark can resolve them unambiguously when they are selected from the result of the join.
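The first-record rule can be sketched in plain Python as follows (hypothetical helper and data). Unlike the Spark code, it computes the first month for every customer rather than only those in `cust_with_null_pay_pen`; the result is the same, since the other customers have no nulls to fill.

```python
from typing import Dict, List


def impute_first_records(rows: List[dict]) -> List[dict]:
    """Hypothetical helper: set a null payroll/pensions_2 to 0 only on a customer's first record."""
    first_month: Dict[int, str] = {}
    for r in rows:
        c = r["cust_code"]
        if c not in first_month or r["year_month"] < first_month[c]:
            first_month[c] = r["year_month"]
    out = []
    for r in rows:
        is_first = r["year_month"] == first_month[r["cust_code"]]
        new = dict(r)
        for col_name in ("payroll", "pensions_2"):
            # mirrors when(col.isNull() & <is first record>, 0).otherwise(col)
            new[f"first_imputed_{col_name}"] = 0 if (is_first and r[col_name] is None) else r[col_name]
        out.append(new)
    return out


rows = [
    {"cust_code": 101, "year_month": "2015-01-28", "payroll": None, "pensions_2": 1},
    {"cust_code": 101, "year_month": "2015-02-28", "payroll": None, "pensions_2": None},
]
for r in impute_first_records(rows):
    print(r["year_month"], r["first_imputed_payroll"], r["first_imputed_pensions_2"])
```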

In [None]:
%%spark
# First records imputed for pension/payroll
first_records_renamed_sdf = first_records_sdf.select( col("cust_code").alias("cc"), col("year_month").alias("ym"), col("payroll").alias("pr"), col("pensions_2").alias("pen_2") )

first_records_imputed_sdf =  type_casted_sdf.select("cust_code", "year_month", "payroll", "pensions_2") \
  .join(first_records_renamed_sdf, [type_casted_sdf.cust_code == first_records_renamed_sdf.cc, type_casted_sdf.year_month == first_records_renamed_sdf.ym], "left_outer") \
   .select(type_casted_sdf.cust_code, type_casted_sdf.year_month, type_casted_sdf.payroll, type_casted_sdf.pensions_2,
           when(type_casted_sdf.payroll.isNull() & first_records_renamed_sdf.cc.isNotNull(), 0) \
            .otherwise(type_casted_sdf.payroll).alias("first_imputed_payroll"),
          when(type_casted_sdf.pensions_2.isNull() & first_records_renamed_sdf.cc.isNotNull(), 0) \
            .otherwise(type_casted_sdf.pensions_2).alias("first_imputed_pensions_2")                                 
          )
   
cust_with_null_pay_pen.createOrReplaceTempView("cust_with_null_pay_pen")

first_records_imputed_sdf.where("cust_code in (select cust_code from cust_with_null_pay_pen)").orderBy('cust_code', 'year_month').show(1000)

+---------+----------+-------+----------+---------------------+------------------------+
|cust_code|year_month|payroll|pensions_2|first_imputed_payroll|first_imputed_pensions_2|
+---------+----------+-------+----------+---------------------+------------------------+
|    18009|2015-01-28|      0|         0|                    0|                       0|
|    18009|2015-02-28|   null|      null|                 null|                    null|
|    18009|2015-07-28|      0|         0|                    0|                       0|
|    18009|2015-08-28|      0|         0|                    0|                       0|
|    18009|2015-09-28|      0|         0|                    0|                       0|
|    51895|2015-01-28|      0|         0|                    0|                       0|
|    51895|2015-02-28|   null|      null|                 null|                    null|
|    51895|2015-07-28|      0|         0|                    0|                       0|

Next, we use forward-fill imputation to fill each customer's missing values with that customer's most recent earlier value. This code block imputes the `payroll` column; the resulting, fully imputed column is named `imputed_payroll`.
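The windowed `last(..., ignorenulls=True)` call amounts to carrying the most recent non-null value forward within each customer's time-ordered records. A plain-Python sketch with hypothetical data; customer 102 shows why the first-record imputation is needed, since a null first record has nothing before it to carry forward and therefore stays null.

```python
from itertools import groupby
from typing import List


def forward_fill(rows: List[dict], value_col: str, out_col: str) -> List[dict]:
    """Per-customer forward fill, analogous to last(value_col, ignorenulls=True) over a
    window partitioned by cust_code, ordered by year_month, from the first row to the current row."""
    ordered = sorted(rows, key=lambda r: (r["cust_code"], r["year_month"]))
    result = []
    for _, group in groupby(ordered, key=lambda r: r["cust_code"]):
        last_seen = None  # reset for each customer, like a new window partition
        for r in group:
            if r[value_col] is not None:
                last_seen = r[value_col]
            result.append({**r, out_col: last_seen})
    return result


rows = [
    {"cust_code": 101, "year_month": "2015-07-28", "first_imputed_payroll": 1},
    {"cust_code": 101, "year_month": "2015-01-28", "first_imputed_payroll": 0},
    {"cust_code": 101, "year_month": "2015-02-28", "first_imputed_payroll": None},
    {"cust_code": 101, "year_month": "2015-08-28", "first_imputed_payroll": None},
    {"cust_code": 102, "year_month": "2015-01-28", "first_imputed_payroll": None},
    {"cust_code": 102, "year_month": "2015-02-28", "first_imputed_payroll": 1},
]
filled = forward_fill(rows, "first_imputed_payroll", "imputed_payroll")
print([r["imputed_payroll"] for r in filled])  # [0, 0, 1, 1, None, 1]
```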

In [None]:
%%spark
# Forward fill
# Code adapted from: https://johnpaton.net/posts/forward-fill-spark/

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.partitionBy('cust_code')\
               .orderBy('year_month')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column = last(first_records_imputed_sdf['first_imputed_payroll'], ignorenulls=True).over(window)

# do the fill for each row
payroll_imputed_sdf = first_records_imputed_sdf.withColumn('imputed_payroll', filled_column)

payroll_imputed_sdf.where("cust_code in (select cust_code from cust_with_null_pay_pen)").orderBy('cust_code', 'year_month').show(500)   

+---------+----------+-------+----------+---------------------+------------------------+---------------+
|cust_code|year_month|payroll|pensions_2|first_imputed_payroll|first_imputed_pensions_2|imputed_payroll|
+---------+----------+-------+----------+---------------------+------------------------+---------------+
|    18009|2015-01-28|      0|         0|                    0|                       0|              0|
|    18009|2015-02-28|   null|      null|                 null|                    null|              0|
|    18009|2015-07-28|      0|         0|                    0|                       0|              0|
|    18009|2015-08-28|      0|         0|                    0|                       0|              0|
|    18009|2015-09-28|      0|         0|                    0|                       0|              0|
|    51895|2015-01-28|      0|         0|                    0|                       0|              0|

The same forward-fill imputation is applied to the `pensions_2` column in this code block, filling each customer's missing values with that customer's most recent earlier value. The resulting, fully imputed column is named `imputed_pensions_2`.

In [None]:
%%spark
# Forward fill

# define the window
window = Window.partitionBy('cust_code')\
               .orderBy('year_month')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column = last(payroll_imputed_sdf['first_imputed_pensions_2'], ignorenulls=True).over(window)

# do the fill for each row
pensions_imputed_sdf = payroll_imputed_sdf.withColumn('imputed_pensions_2', filled_column)

pensions_imputed_sdf.where("cust_code in (select cust_code from cust_with_null_pay_pen)").orderBy('cust_code', 'year_month').show(5000) 

+---------+----------+-------+----------+---------------------+------------------------+---------------+------------------+
|cust_code|year_month|payroll|pensions_2|first_imputed_payroll|first_imputed_pensions_2|imputed_payroll|imputed_pensions_2|
+---------+----------+-------+----------+---------------------+------------------------+---------------+------------------+
|    18009|2015-01-28|      0|         0|                    0|                       0|              0|                 0|
|    18009|2015-02-28|   null|      null|                 null|                    null|              0|                 0|
|    18009|2015-07-28|      0|         0|                    0|                       0|              0|                 0|
|    18009|2015-08-28|      0|         0|                    0|                       0|              0|                 0|
|    18009|2015-09-28|      0|         0|                    0|                       0|              0|                 0|

Finally, we join the imputed columns (`imputed_payroll`, `imputed_pensions_2`) back to the original dataset on `cust_code` and `year_month`.
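The join back to the original data matches rows on the composite key (`cust_code`, `year_month`). A plain-Python sketch of that inner join (hypothetical helper and data); for brevity it attaches only the two imputed columns, whereas the Spark version also carries `cc`, `ym`, and the `first_imputed_*` columns.

```python
from typing import List


def attach_imputed(original: List[dict], imputed: List[dict]) -> List[dict]:
    """Inner join on (cust_code, year_month), attaching the two imputed columns."""
    lookup = {(r["cust_code"], r["year_month"]): r for r in imputed}
    joined = []
    for r in original:
        match = lookup.get((r["cust_code"], r["year_month"]))
        if match is None:
            continue  # inner join: unmatched rows are dropped
        joined.append({**r,
                       "imputed_payroll": match["imputed_payroll"],
                       "imputed_pensions_2": match["imputed_pensions_2"]})
    return joined


original = [
    {"cust_code": 101, "year_month": "2015-01-28", "payroll": 0, "pensions_2": 0},
    {"cust_code": 101, "year_month": "2015-02-28", "payroll": None, "pensions_2": None},
]
imputed = [
    {"cust_code": 101, "year_month": "2015-02-28", "imputed_payroll": 0, "imputed_pensions_2": 0},
    {"cust_code": 101, "year_month": "2015-01-28", "imputed_payroll": 0, "imputed_pensions_2": 0},
]
for r in attach_imputed(original, imputed):
    print(r["year_month"], r["payroll"], r["imputed_payroll"])
```

Assuming each (`cust_code`, `year_month`) pair occurs only once, every original row has exactly one match here, so the inner join keeps all rows.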

In [None]:
%%spark
# Join the imputed columns back to the original dataset
all_imputed_sdf = pensions_imputed_sdf

all_imputed_sdf_renamed_sdf = all_imputed_sdf.select( col("cust_code").alias("cc"), col("year_month").alias("ym"), col("imputed_payroll"), col("imputed_pensions_2"), col("first_imputed_payroll") , col("first_imputed_pensions_2") )

pp_imputed_sdf =  type_casted_sdf.join(all_imputed_sdf_renamed_sdf, [type_casted_sdf.cust_code == all_imputed_sdf_renamed_sdf.cc, type_casted_sdf.year_month == all_imputed_sdf_renamed_sdf.ym], "inner")

pp_imputed_sdf.where("cust_code in (select cust_code from cust_with_null_pay_pen)").orderBy('cust_code', 'year_month').show(500)

## Section 4: Exporting the Cleaned DataFrame to S3

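We set the S3 canned ACL in the Hadoop configuration so that the bucket owner gets full control of the objects written to S3, then write the cleaned DataFrame back to S3. `coalesce(1)` merges the data into a single partition, so Spark writes one CSV part file inside the `wrangle_data` output directory.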

In [None]:
%%spark

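# Grant the bucket owner full control of the objects this job writes to S3.
# (spark.sparkContext and sc refer to the same SparkContext, so one call suffices.)
spark.sparkContext._jsc.hadoopConfiguration().set('fs.s3.canned.acl', 'BucketOwnerFullControl')

# coalesce(1) -> a single CSV part file inside the wrangle_data directory
pp_imputed_sdf.coalesce(1).write.format("csv").option("header","true").save('s3://cis545projectbmt/wrangle_data', mode='overwrite')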

In [None]:
# ===================  END HERE  ===========================