In [None]:
import os

spark_version = 'spark-3.1.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Ign:1 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:4 http://archive.ubuntu.com/ubuntu focal InRelease
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:6 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease
Get:7 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Get:8 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:11 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Hit:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease
Get:14 http://archive.ubuntu.com/ubuntu focal-updates

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
# The jar lands in the current working directory; the SparkSession cell below
# references it by the absolute path /content/postgresql-42.2.16.jar.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-02-04 07:48:39--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2023-02-04 07:48:40 (6.00 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session with the PostgreSQL JDBC jar on the driver
# classpath, so the JDBC writes later in the notebook can load the driver.
spark = (
    SparkSession.builder
    .appName("Data_Cleaning")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [None]:
# Read the HMDA 2017 California records from S3.
from pyspark import SparkFiles

url = 'https://hmda-data-bucket.s3.amazonaws.com/data/hmda_2017_ca_all-records_labels.csv'

# Register the remote file with Spark, then look up its local copy by file name.
spark.sparkContext.addFile(url)
local_csv_path = SparkFiles.get('hmda_2017_ca_all-records_labels.csv')

# The first line holds the column names; column types are inferred from the data.
df = spark.read.csv(local_csv_path, sep=',', header=True, inferSchema=True)
df.show()

+----------+-------------+--------------------+-----------+-----------+--------------+---------+--------------------+-------------+-----------------+------------+--------------------+---------------+----------------+--------------------+-----------+--------------------+------------+--------------------+-----+----------+----------+----------+--------------------+-----------+-------------------+------------------------+-------------------+---------------------------+----------------------+---------------------+----------------+---------------------+----------------+---------------------+----------------+---------------------+----------------+---------------------+----------------+------------------------+-------------------+------------------------+-------------------+------------------------+-------------------+------------------------+-------------------+------------------------+-------------------+--------------------+-------------+---------------------+----------------+--------------

In [None]:
# Shape of the raw frame; count() runs a Spark job over the whole dataset.
column_total = len(df.columns)
row_total = df.count()
print(f'Column Count: {column_total} | Row Count: {row_total}')

Column Count: 78 | Row Count: 1714459


In [None]:
def get_na_counts(data):
  """Count the null values in every column of a Spark DataFrame.

  All counts are computed in a single aggregation over the data instead of
  running one filter-and-count job per column.

  Args:
    data: Spark DataFrame to inspect.

  Returns:
    dict mapping each column name, in ``data.columns`` order, to the number
    of rows where that column is null.
  """
  # Imported here so the function does not rely on imports made in other cells.
  from pyspark.sql import functions as F

  columns = data.columns
  if not columns:
    return {}

  # when() without otherwise() yields null for rows that do not match, and
  # count() skips nulls, so each expression counts exactly the null entries.
  null_count_exprs = [
      F.count(F.when(data[name].isNull(), 1)).alias(name)
      for name in columns
  ]

  # A global aggregate always produces exactly one row, even for empty input.
  counts_row = data.select(null_count_exprs).first()

  return dict(zip(columns, counts_row))

In [None]:
# Null count per column of the raw frame; the bare name on the last line
# makes the notebook display the resulting dict.
df_isnull = get_na_counts(df)
df_isnull

{'as_of_year': 0,
 'respondent_id': 0,
 'agency_name': 0,
 'agency_abbr': 0,
 'agency_code': 0,
 'loan_type_name': 0,
 'loan_type': 0,
 'property_type_name': 0,
 'property_type': 0,
 'loan_purpose_name': 0,
 'loan_purpose': 0,
 'owner_occupancy_name': 0,
 'owner_occupancy': 0,
 'loan_amount_000s': 945,
 'preapproval_name': 0,
 'preapproval': 0,
 'action_taken_name': 0,
 'action_taken': 0,
 'msamd_name': 46808,
 'msamd': 46529,
 'state_name': 0,
 'state_abbr': 0,
 'state_code': 0,
 'county_name': 1929,
 'county_code': 1929,
 'census_tract_number': 7461,
 'applicant_ethnicity_name': 0,
 'applicant_ethnicity': 0,
 'co_applicant_ethnicity_name': 0,
 'co_applicant_ethnicity': 0,
 'applicant_race_name_1': 0,
 'applicant_race_1': 0,
 'applicant_race_name_2': 1701227,
 'applicant_race_2': 1701227,
 'applicant_race_name_3': 1713539,
 'applicant_race_3': 1713539,
 'applicant_race_name_4': 1714209,
 'applicant_race_4': 1714209,
 'applicant_race_name_5': 1714278,
 'applicant_race_5': 1714278,
 'co

In [None]:
# Columns excluded from the cleaned dataset, grouped by kind.
columns_to_drop = [
    # Record bookkeeping fields.
    'as_of_year',
    'respondent_id',
    'edit_status',
    'edit_status_name',
    'sequence_number',
    'application_date_indicator',
    # Secondary applicant race fields (over 99% null in the counts above).
    'applicant_race_2',
    'applicant_race_3',
    'applicant_race_4',
    'applicant_race_5',
    'applicant_race_name_2',
    'applicant_race_name_3',
    'applicant_race_name_4',
    'applicant_race_name_5',
    # Secondary co-applicant race fields.
    'co_applicant_race_2',
    'co_applicant_race_3',
    'co_applicant_race_4',
    'co_applicant_race_5',
    'co_applicant_race_name_2',
    'co_applicant_race_name_3',
    'co_applicant_race_name_4',
    'co_applicant_race_name_5',
    # Denial reasons.
    'denial_reason_1',
    'denial_reason_2',
    'denial_reason_3',
    'denial_reason_name_1',
    'denial_reason_name_2',
    'denial_reason_name_3',
    'rate_spread',
]

drop_df = df.drop(*columns_to_drop)

In [None]:
# Shape after dropping columns; the row count should match the raw frame.
column_total = len(drop_df.columns)
row_total = drop_df.count()
print(f'Column Count: {column_total} | Row Count: {row_total}')

Column Count: 49 | Row Count: 1714459


In [None]:
# Re-check null counts on the column-pruned frame to see which remaining
# columns still contain missing values.
drop_df_isnull = get_na_counts(drop_df)
drop_df_isnull

{'agency_name': 0,
 'agency_abbr': 0,
 'agency_code': 0,
 'loan_type_name': 0,
 'loan_type': 0,
 'property_type_name': 0,
 'property_type': 0,
 'loan_purpose_name': 0,
 'loan_purpose': 0,
 'owner_occupancy_name': 0,
 'owner_occupancy': 0,
 'loan_amount_000s': 945,
 'preapproval_name': 0,
 'preapproval': 0,
 'action_taken_name': 0,
 'action_taken': 0,
 'msamd_name': 46808,
 'msamd': 46529,
 'state_name': 0,
 'state_abbr': 0,
 'state_code': 0,
 'county_name': 1929,
 'county_code': 1929,
 'census_tract_number': 7461,
 'applicant_ethnicity_name': 0,
 'applicant_ethnicity': 0,
 'co_applicant_ethnicity_name': 0,
 'co_applicant_ethnicity': 0,
 'applicant_race_name_1': 0,
 'applicant_race_1': 0,
 'co_applicant_race_name_1': 0,
 'co_applicant_race_1': 0,
 'applicant_sex_name': 0,
 'applicant_sex': 0,
 'co_applicant_sex_name': 0,
 'co_applicant_sex': 0,
 'applicant_income_000s': 195886,
 'purchaser_type_name': 0,
 'purchaser_type': 0,
 'hoepa_status_name': 0,
 'hoepa_status': 0,
 'lien_status_na

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

def clean_data(data):
  """Drop incomplete and duplicate rows, then tag each row with an id.

  Args:
    data: Spark DataFrame to clean.

  Returns:
    A cached DataFrame with every row containing a null removed, exact
    duplicate rows removed, and an added 'application_id' column holding
    the value of monotonically_increasing_id().
  """
  deduplicated = data.dropna(how='any').dropDuplicates()

  with_ids = deduplicated.withColumn('application_id', monotonically_increasing_id())

  # monotonically_increasing_id() is derived from partition layout and row
  # order, which may differ each time this lazy plan is re-executed
  # (dropDuplicates shuffles the data). Caching makes later actions reuse the
  # ids computed on the first action, so frames selected from this result
  # agree on application_id. Caching is best-effort: lost partitions would
  # still be recomputed.
  return with_ids.cache()

In [None]:
# Apply clean_data to the column-pruned frame; the result gains an
# 'application_id' column.
cleaned_df = clean_data(drop_df)

In [None]:
# Shape after cleaning: one extra column (application_id) and fewer rows.
column_total = len(cleaned_df.columns)
row_total = cleaned_df.count()
print(f'Column Count: {column_total} | Row Count: {row_total}')

Column Count: 50 | Row Count: 1473053


In [None]:
# Distinct Spark type names present in the cleaned frame, in first-seen order
# (dict keys keep insertion order, which removes duplicates without reordering).
df_type = list(dict.fromkeys(type_name for _, type_name in cleaned_df.dtypes))

print(df_type)

['string', 'int', 'double', 'bigint']


In [None]:
# Text subset: every column whose type name contains 'string', plus the
# generated application_id column.
string_columns = [name for name, type_name in cleaned_df.dtypes if 'string' in type_name]
string_columns.append('application_id')

text_df = cleaned_df.select(string_columns)
text_df.columns

['agency_name',
 'agency_abbr',
 'loan_type_name',
 'property_type_name',
 'loan_purpose_name',
 'owner_occupancy_name',
 'preapproval_name',
 'action_taken_name',
 'msamd_name',
 'state_name',
 'state_abbr',
 'county_name',
 'applicant_ethnicity_name',
 'co_applicant_ethnicity_name',
 'applicant_race_name_1',
 'co_applicant_race_name_1',
 'applicant_sex_name',
 'co_applicant_sex_name',
 'purchaser_type_name',
 'hoepa_status_name',
 'lien_status_name',
 'application_id']

In [None]:
# Numeric subset: every column whose type name does not contain 'string'.
# This includes application_id, which is a bigint.
num_columns = [
    name
    for name, type_name in cleaned_df.dtypes
    if 'string' not in type_name
]

numeric_df = cleaned_df.select(num_columns)
numeric_df.dtypes

[('agency_code', 'int'),
 ('loan_type', 'int'),
 ('property_type', 'int'),
 ('loan_purpose', 'int'),
 ('owner_occupancy', 'int'),
 ('loan_amount_000s', 'int'),
 ('preapproval', 'int'),
 ('action_taken', 'int'),
 ('msamd', 'int'),
 ('state_code', 'int'),
 ('county_code', 'int'),
 ('census_tract_number', 'double'),
 ('applicant_ethnicity', 'int'),
 ('co_applicant_ethnicity', 'int'),
 ('applicant_race_1', 'int'),
 ('co_applicant_race_1', 'int'),
 ('applicant_sex', 'int'),
 ('co_applicant_sex', 'int'),
 ('applicant_income_000s', 'int'),
 ('purchaser_type', 'int'),
 ('hoepa_status', 'int'),
 ('lien_status', 'int'),
 ('population', 'int'),
 ('minority_population', 'double'),
 ('hud_median_family_income', 'int'),
 ('tract_to_msamd_income', 'double'),
 ('number_of_owner_occupied_units', 'int'),
 ('number_of_1_to_4_family_units', 'int'),
 ('application_id', 'bigint')]

In [None]:
# Names of the columns whose type name contains 'double'.
double_types = [
    name
    for name, type_name in cleaned_df.dtypes
    if 'double' in type_name
]
double_types

['census_tract_number', 'minority_population', 'tract_to_msamd_income']

In [None]:
# Shape of the text subset; the row count should match cleaned_df.
column_total = len(text_df.columns)
row_total = text_df.count()
print(f'text_data: Column Count: {column_total} | Row Count: {row_total}')

text_data: Column Count: 22 | Row Count: 1473053


In [None]:
# Shape of the numeric subset; the row count should match cleaned_df.
column_total = len(numeric_df.columns)
row_total = numeric_df.count()
print(f'numeric_data: Column Count: {column_total} | Row Count: {row_total}')

numeric_data: Column Count: 29 | Row Count: 1473053


In [None]:
# Persist the three frames as CSV under ./data.
# Spark writes each path as a directory of part files rather than a single
# file, and no header row is emitted because the header option is not set.
# mode='overwrite' replaces output from a previous run; the default save mode
# raises an error when the path already exists, which breaks re-running this cell.
cleaned_df.write.csv('./data/cleaned_data.csv', mode='overwrite')
text_df.write.csv('./data/text_data.csv', mode='overwrite')
numeric_df.write.csv('./data/numeric_data.csv', mode='overwrite')

# Load the cleaned data into PostgreSQL (RDS)

In [None]:
# Snapshot the column names of each frame as independent Python lists.
cleaned_columns, text_columns, numeric_columns = (
    list(frame.columns) for frame in (cleaned_df, text_df, numeric_df)
)

In [None]:
# Configure settings for RDS
import os
from getpass import getpass

# "append" adds rows to existing tables, so re-running the write cells inserts
# the same data again.
mode = "append"
jdbc_url="jdbc:postgresql://hmda-final-db.cfo8yzx2osax.us-east-1.rds.amazonaws.com:5432/hmda_db"

# Do not store the database password in the notebook: read it from the
# HMDA_DB_PASSWORD environment variable, or prompt for it when that is unset.
db_password = os.environ.get("HMDA_DB_PASSWORD") or getpass("Postgres password: ")

# The user name can be overridden with HMDA_DB_USER and defaults to "postgres".
config = {"user": os.environ.get("HMDA_DB_USER", "postgres"),
          "password": db_password,
          "driver":"org.postgresql.Driver"}

In [None]:
# Write the text subset to the Postgres table 'text_data' over JDBC, using the
# save mode and connection properties configured above.
text_df.write.jdbc(url=jdbc_url, table='text_data', mode=mode, properties=config)

In [None]:
# Write the numeric subset to the Postgres table 'numeric_data' over JDBC,
# using the save mode and connection properties configured above.
numeric_df.write.jdbc(url=jdbc_url, table='numeric_data', mode=mode, properties=config)

In [None]:
# Show the column names of both subsets, one list per line.
for column_names in (text_columns, numeric_columns):
  print(column_names)

['agency_name', 'agency_abbr', 'loan_type_name', 'property_type_name', 'loan_purpose_name', 'owner_occupancy_name', 'preapproval_name', 'action_taken_name', 'msamd_name', 'state_name', 'state_abbr', 'county_name', 'applicant_ethnicity_name', 'co_applicant_ethnicity_name', 'applicant_race_name_1', 'co_applicant_race_name_1', 'applicant_sex_name', 'co_applicant_sex_name', 'purchaser_type_name', 'hoepa_status_name', 'lien_status_name', 'application_id']
['agency_code', 'loan_type', 'property_type', 'loan_purpose', 'owner_occupancy', 'loan_amount_000s', 'preapproval', 'action_taken', 'msamd', 'state_code', 'county_code', 'census_tract_number', 'applicant_ethnicity', 'co_applicant_ethnicity', 'applicant_race_1', 'co_applicant_race_1', 'applicant_sex', 'co_applicant_sex', 'applicant_income_000s', 'purchaser_type', 'hoepa_status', 'lien_status', 'population', 'minority_population', 'hud_median_family_income', 'tract_to_msamd_income', 'number_of_owner_occupied_units', 'number_of_1_to_4_family_