In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Finance-Complaint').getOrCreate()

In [None]:
from pyspark.sql.functions import col, when

# read the JSON file
df = spark.read.json("gs://finance-complaint-bucket/complaints.json/complaints.json")

# Replace nulls and empty strings with the literal placeholder string "null".
# na.fill with a string value only affects string-typed columns. The empty-string
# rewrite is expressed as one select() so the query plan gets a single projection
# instead of one withColumn() projection per column.
df = df.na.fill("null")
df = df.select(
    [when(col(name) == "", "null").otherwise(col(name)).alias(name) for name in df.columns]
)

# show the result
df.show()

In [3]:
# df.write.mode('append').parquet('gs://finance-complaint-bucket/parquet_file')

In [4]:
df = spark.read.parquet('gs://finance-complaint-bucket/parquet_file')

                                                                                

In [5]:
df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---------------+--------------------+-----------------------+--------------------+------------+-----------------------+-------------------------+-----------------+-------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+-------------+--------------+------+--------+
|_corrupt_record|             company|company_public_response|    company_response|complaint_id|complaint_what_happened|consumer_consent_provided|consumer_disputed|date_received|date_sent_to_company|               issue|             product|state|           sub_issue|         sub_product|submitted_via|          tags|timely|zip_code|
+---------------+--------------------+-----------------------+--------------------+------------+-----------------------+-------------------------+-----------------+-------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+-------------+--------------+------+-----

                                                                                

In [6]:
df.count()

                                                                                

3294357

In [7]:
from pyspark.sql.functions import col, when

# Turn the "null" placeholder strings back into real NULL values. Every column is
# rewritten in a single select() rather than with one withColumn() call per column,
# which keeps the query plan flat.
df = df.select(
    [when(col(name) == "null", None).otherwise(col(name)).alias(name) for name in df.columns]
)

In [8]:
from pyspark.sql.functions import col

In [9]:
df.select('complaint_id')

DataFrame[complaint_id: string]

## Conf

In [10]:
def update_column_attribute(df):
    """Attach every column name of ``df`` to ``df`` as a plain-string attribute.

    After the call ``df.<column>`` evaluates to the string ``"<column>"``, so it can
    be interpolated into SQL text or passed wherever a column name is expected.

    Column names that collide with an attribute defined on the class of ``df``
    (for example a method such as ``count``) are skipped, so methods and properties
    of the object are never overwritten by a string.

    Args:
        df: Object exposing a ``columns`` iterable of column-name strings.

    Returns:
        None. ``df`` is modified in place.
    """
    for column in df.columns:
        # Look the name up on the class, not the instance: an instance lookup would
        # also succeed for attributes set by an earlier call of this function.
        if hasattr(type(df), column):
            continue
        setattr(df, column, column)

In [11]:
update_column_attribute(df)

## Printing the number of unique values in each column

In [12]:
# One Spark job per column: print "<column>:<number of distinct values>".
for column in df.columns:
    print(column, df.select(column).distinct().count(), sep=":")

                                                                                

_corrupt_record:3


                                                                                

company:6628


                                                                                

company_public_response:12


                                                                                

company_response:9


                                                                                

complaint_id:3294356


                                                                                

complaint_what_happened:1032710


                                                                                

consumer_consent_provided:6


                                                                                

consumer_disputed:4


                                                                                

date_received:4081


                                                                                

date_sent_to_company:4030


                                                                                

issue:166


                                                                                

product:19


                                                                                

state:64


                                                                                

sub_issue:222


                                                                                

sub_product:77


                                                                                

submitted_via:8


                                                                                

tags:4


                                                                                

timely:3




zip_code:34740


                                                                                

In [13]:
complaint_table = "complaint"
df.createOrReplaceTempView(complaint_table)

In [14]:
sql = spark.sql

In [15]:
n_row = df.count()
n_row

3294357

In [16]:
# Target_column:
df.groupBy(df.consumer_disputed).count().show()



+-----------------+-------+
|consumer_disputed|  count|
+-----------------+-------+
|             null|      2|
|               No| 620049|
|              N/A|2525928|
|              Yes| 148378|
+-----------------+-------+



                                                                                

In [17]:
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- company: string (nullable = true)
 |-- company_public_response: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: string (nullable = true)
 |-- date_sent_to_company: string (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- sub_issue: string (nullable = true)
 |-- sub_product: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [18]:
# Rows whose consumer_disputed value is the literal string 'N/A'.
# The column name is written out instead of being read from df.consumer_disputed, so the
# query text does not depend on update_column_attribute(df) having been run on this df.
missing_target_df = sql(f"select * from {complaint_table} where consumer_disputed='N/A'")

In [19]:
# Keep only rows whose consumer_disputed value differs from 'N/A'. Rows where the column
# is NULL are dropped as well, because NULL <> 'N/A' does not evaluate to true in SQL.
# The column name is written out so the query text does not depend on
# update_column_attribute(df) having been run on this df.
df = sql(f"select * from {complaint_table} where consumer_disputed<>'N/A'")

In [20]:
df.count()

                                                                                

768427

In [21]:
col_name = "complaint_what_happened"

# Keep existing narratives as they are and substitute "N/A" where the value is NULL.
df = df.withColumn(col_name, when(col(col_name).isNotNull(), col(col_name)).otherwise("N/A"))

In [22]:
complaint_table = 'complaint'
df.createOrReplaceTempView(complaint_table)

In [23]:
update_column_attribute(df)

In [24]:
def perform_null_analysis(df,table_name):
    """Count the NULL values of every column of ``df`` with Spark SQL.

    One query per column is executed against ``table_name`` through the
    notebook-level ``sql`` callable, so ``table_name`` has to be a table or view
    holding the same data as ``df``. Each column name is printed before its query runs.

    Column identifiers are wrapped in backticks (embedded backticks doubled), so
    names containing spaces, dots or reserved words do not break the query. The
    name is still embedded unescaped in the ``column_name`` string literal.

    Args:
        df: DataFrame that supplies the total row count and the column list.
        table_name: Name of the table/view the queries read from.

    Returns:
        list: One entry per column, in ``df.columns`` order. Each entry is the
        collected query result: a list with a single Row carrying ``total_row``,
        ``null_row_<column>``, ``missing_percentage`` and ``column_name``.
    """
    null_value_analysis = []
    n_row = df.count()
    for column in df.columns:
        print(column)
        escaped = column.replace("`", "``")
        query = (
            f"select {n_row} as  total_row,"
            f"count(*) as `null_row_{escaped}`,"
            f"(count(*)*100)/{n_row} as missing_percentage,"
            f"'{column}' as  column_name "
            f"from {table_name} where `{escaped}` is null"
        )
        null_value_analysis.append(sql(query).collect())
    return null_value_analysis

In [25]:
null_report = perform_null_analysis(df, complaint_table)

                                                                                

_corrupt_record


                                                                                

company


                                                                                

company_public_response


                                                                                

company_response


                                                                                

complaint_id


                                                                                

complaint_what_happened


                                                                                

consumer_consent_provided


                                                                                

consumer_disputed


                                                                                

date_received


                                                                                

date_sent_to_company


                                                                                

issue


                                                                                

product


                                                                                

state


                                                                                

sub_issue


                                                                                

sub_product


                                                                                

submitted_via


                                                                                

tags


                                                                                

timely


                                                                                

zip_code


                                                                                

In [26]:
null_report

[[Row(total_row=768427, null_row__corrupt_record=768427, missing_percentage=100.0, column_name='_corrupt_record')],
 [Row(total_row=768427, null_row_company=0, missing_percentage=0.0, column_name='company')],
 [Row(total_row=768427, null_row_company_public_response=572718, missing_percentage=74.5312176693427, column_name='company_public_response')],
 [Row(total_row=768427, null_row_company_response=0, missing_percentage=0.0, column_name='company_response')],
 [Row(total_row=768427, null_row_complaint_id=0, missing_percentage=0.0, column_name='complaint_id')],
 [Row(total_row=768427, null_row_complaint_what_happened=0, missing_percentage=0.0, column_name='complaint_what_happened')],
 [Row(total_row=768427, null_row_consumer_consent_provided=38, missing_percentage=0.0049451672052127265, column_name='consumer_consent_provided')],
 [Row(total_row=768427, null_row_consumer_disputed=0, missing_percentage=0.0, column_name='consumer_disputed')],
 [Row(total_row=768427, null_row_date_received=0

In [27]:
def unwanted_column_by_missing_pct(null_value_analysis, pct_thres=20):
    """Select the columns whose missing-value percentage exceeds a threshold.

    Args:
        null_value_analysis: Sequence of report entries. The first element of each
            entry must expose ``missing_percentage`` and ``column_name`` attributes.
        pct_thres: Percentage that has to be strictly exceeded for a column to be
            selected. Defaults to 20.

    Returns:
        list: Names of the selected columns, in report order. Every selected
        record is also printed.
    """
    flagged = []
    # Each entry wraps its single result record in a list; unwrap it lazily.
    for record in (entry[0] for entry in null_value_analysis):
        if not record.missing_percentage > pct_thres:
            continue
        print(record)
        flagged.append(record.column_name)
    return flagged

In [28]:
columns_unwanted = unwanted_column_by_missing_pct(null_report)

Row(total_row=768427, null_row__corrupt_record=768427, missing_percentage=100.0, column_name='_corrupt_record')
Row(total_row=768427, null_row_company_public_response=572718, missing_percentage=74.5312176693427, column_name='company_public_response')
Row(total_row=768427, null_row_sub_issue=455400, missing_percentage=59.26392487510199, column_name='sub_issue')
Row(total_row=768427, null_row_sub_product=235162, missing_percentage=30.603037113479875, column_name='sub_product')
Row(total_row=768427, null_row_tags=659931, missing_percentage=85.88076681324316, column_name='tags')


In [29]:
columns_unwanted

['_corrupt_record',
 'company_public_response',
 'sub_issue',
 'sub_product',
 'tags']

In [30]:
df.columns

['_corrupt_record',
 'company',
 'company_public_response',
 'company_response',
 'complaint_id',
 'complaint_what_happened',
 'consumer_consent_provided',
 'consumer_disputed',
 'date_received',
 'date_sent_to_company',
 'issue',
 'product',
 'state',
 'sub_issue',
 'sub_product',
 'submitted_via',
 'tags',
 'timely',
 'zip_code']

In [31]:
def drop_column(df, columns):
    """Return ``df`` without the columns named in ``columns``.

    The columns are removed from the DataFrame that is passed in. The result
    therefore does not depend on which data is currently registered under the
    notebook-level temp view, nor on the notebook-level ``sql`` helper.

    Args:
        df: DataFrame to remove columns from.
        columns: Iterable of column-name strings to remove.

    Returns:
        A new DataFrame holding the remaining columns in their original order.
    """
    return df.drop(*columns)

In [32]:
df = drop_column(df, columns_unwanted)

In [33]:
# Dropping the features listed in columns_unwanted because more than 20% of their values are null

In [None]:
# Inspect a handful of rows only: collect() on the full DataFrame would pull every row
# to the driver and dump all of them into the cell output.
df.limit(5).collect()

                                                                                

In [71]:
# total number of rows:
df.count()

                                                                                

768427

In [72]:
# Number of distinct values in each column:

for column in df.columns:
    print(column, df.select(column).distinct().count(), sep=":")

                                                                                

company:4282


                                                                                

company_response:7


                                                                                

complaint_id:768427


                                                                                

complaint_what_happened:160961


                                                                                

consumer_consent_provided:6


                                                                                

consumer_disputed:2


                                                                                

date_received:1970


                                                                                

date_sent_to_company:2049


                                                                                

issue:99


                                                                                

product:13


                                                                                

state:63


                                                                                

submitted_via:6


                                                                                

timely:2




zip_code:28757


                                                                                

In [73]:
update_column_attribute(df)

In [74]:
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: timestamp (nullable = true)
 |-- date_sent_to_company: timestamp (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [75]:
# Show the value counts of each categorical column, one table per column.
for grouping_key in (
    df.company_response,
    df.consumer_consent_provided,
    df.consumer_disputed,
    df.product,
    df.submitted_via,
    df.timely,
):
    df.groupBy(grouping_key).count().show()

                                                                                

+--------------------+------+
|    company_response| count|
+--------------------+------+
|   Untimely response|  2870|
|Closed with non-m...| 95432|
|Closed without re...| 17868|
|Closed with monet...| 51382|
|Closed with expla...|577960|
|              Closed| 17611|
|  Closed with relief|  5304|
+--------------------+------+



                                                                                

+-------------------------+------+
|consumer_consent_provided| count|
+-------------------------+------+
|                     null|    38|
|        Consent withdrawn|     8|
|                    Other|  8451|
|         Consent provided|164082|
|     Consent not provided|125338|
|                      N/A|470510|
+-------------------------+------+



                                                                                

+-----------------+------+
|consumer_disputed| count|
+-----------------+------+
|               No|620049|
|              Yes|148378|
+-----------------+------+



                                                                                

+--------------------+------+
|             product| count|
+--------------------+------+
|     Debt collection|145780|
|    Virtual currency|    18|
|         Payday loan|  5543|
|     Money transfers|  5354|
|Checking or savin...|     3|
|            Mortgage|226894|
|        Prepaid card|  3819|
|    Credit reporting|140429|
|       Consumer Loan| 31596|
|         Credit card| 89190|
|Bank account or s...| 86206|
|Other financial s...|  1058|
|        Student loan| 32537|
+--------------------+------+



                                                                                

+-------------+------+
|submitted_via| count|
+-------------+------+
|        Phone| 52197|
|          Fax| 10924|
|        Email|   348|
|     Referral|133247|
|  Postal mail| 48672|
|          Web|523039|
+-------------+------+

+------+------+
|timely| count|
+------+------+
|    No| 21465|
|   Yes|746962|
+------+------+



                                                                                

In [76]:
df.groupBy(df.product).count().collect()

                                                                                

[Row(product='Debt collection', count=145780),
 Row(product='Virtual currency', count=18),
 Row(product='Payday loan', count=5543),
 Row(product='Money transfers', count=5354),
 Row(product='Checking or savings account', count=3),
 Row(product='Mortgage', count=226894),
 Row(product='Prepaid card', count=3819),
 Row(product='Credit reporting', count=140429),
 Row(product='Consumer Loan', count=31596),
 Row(product='Credit card', count=89190),
 Row(product='Bank account or service', count=86206),
 Row(product='Other financial service', count=1058),
 Row(product='Student loan', count=32537)]

In [77]:
#df.product no null value

####  Feature Engineering Decision

In [78]:
# Encoding plan per column. The lists hold plain column-name strings (not Column objects)
# so they can be concatenated and compared against df.columns, independent of whether
# update_column_attribute(df) was run on the current df.
ONE_HOT_FEATURE = ["company_response", "consumer_consent_provided", "submitted_via", "timely"]
BINARY_ENCODING = ["product"]
TARGET_FEATURE = ["consumer_disputed"]  # target feature, to be label encoded


In [79]:
from pyspark.sql.functions import count, isnull

column_name = 'company_response'

# Number of rows in which the column is NULL
null_count = df.where(df[column_name].isNull()).count()
null_count
# df.company_response: no null values

                                                                                

0

In [80]:
df.groupBy(df.consumer_consent_provided).count().show()



+-------------------------+------+
|consumer_consent_provided| count|
+-------------------------+------+
|                     null|    38|
|        Consent withdrawn|     8|
|                    Other|  8451|
|         Consent provided|164082|
|     Consent not provided|125338|
|                      N/A|470510|
+-------------------------+------+



                                                                                

In [81]:
# df.consumer_consent_provided: replace null values with the most frequent category

In [82]:
# Columns that have not been assigned to an encoding group yet
remaining_column = [
    name
    for name in df.columns
    if name not in ONE_HOT_FEATURE + BINARY_ENCODING + TARGET_FEATURE
]

In [83]:
remaining_column

['company',
 'complaint_id',
 'complaint_what_happened',
 'date_received',
 'date_sent_to_company',
 'issue',
 'state',
 'zip_code']

In [84]:
df.groupBy(df.company).count().show()



+--------------------+-----+
|             company|count|
+--------------------+-----+
|FORD MOTOR CREDIT...|  477|
|ATLANTIC COAST MO...|    2|
|Eagle Home Mortga...|   65|
|       PlusFour, Inc|   57|
|   CU Recovery, Inc.|   19|
|Delta Management ...|  109|
|Law Office of Joe...|   27|
|Praxis Financial ...|   50|
|Worldwide Process...|   58|
|Professional Serv...|    1|
|National Recoveri...|   42|
|SUN FINANCE COMPA...|    6|
|           AFNI INC.| 1644|
|Medical Data Syst...|  297|
|A.R.M. Solutions,...|   73|
|Law Office of Har...|   15|
|Southern Credit R...|   37|
|  DMB Financial, LLC|    4|
|Capital Markets C...|   32|
|   DCN Holdings Inc.|   23|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [85]:
df.groupBy(df.company).count().count()

                                                                                

4282

In [86]:
# Plain column-name string, so the value does not depend on update_column_attribute(df)
# having been run on the current df.
FREQUENCY_ENCODING = ["company"]

In [87]:
df.select(df.company).count() - df.select(df.company).dropna().count()

                                                                                

0

In [88]:
column_name = 'zip_code'

null_count = df.filter(isnull(df[column_name])).count()
null_count

                                                                                

5647

In [89]:
column_name = 'state'

null_count = df.filter(isnull(df[column_name])).count()
null_count

                                                                                

5650

In [90]:
column_name = 'consumer_consent_provided'

null_count = df.filter(isnull(df[column_name])).count()
null_count

                                                                                

38

In [91]:
# Columns whose NULLs are to be replaced with the most frequent value. Plain column-name
# strings, so the list does not depend on update_column_attribute(df) having been run.
REPLACE_NULL_WITH_TOP_VALUE = ["zip_code", "state", "consumer_consent_provided"]

In [92]:
REPLACE_NULL_WITH_TOP_VALUE

['zip_code', 'state', 'consumer_consent_provided']

In [93]:
print(f"Total number of row: {df.count()}")
for column in remaining_column:
    print(f"{column}:  {df.select(column).distinct().count()}")

                                                                                

Total number of row: 768427


                                                                                

company:  4282


                                                                                

complaint_id:  768427


                                                                                

complaint_what_happened:  160961


                                                                                

date_received:  1970


                                                                                

date_sent_to_company:  2049


                                                                                

issue:  99


                                                                                

state:  63




zip_code:  28757


                                                                                

In [94]:
df.groupBy(df.issue).count().show()



+--------------------+------+
|               issue| count|
+--------------------+------+
|Communication tac...| 23832|
|Application proce...|   540|
|Advertising and m...|  2959|
|Balance transfer fee|   221|
|        Adding money|   202|
|Customer service/...|   283|
|Closing/Cancellin...|  6389|
|Credit card prote...|  2728|
|Received a loan I...|   615|
|Can't stop charge...|   510|
|Forbearance / Wor...|   556|
|          Bankruptcy|   448|
|                Fees|   232|
|Credit determination|  3057|
|Loan modification...|112307|
|    Cash advance fee|   196|
|Other transaction...|  1491|
|Customer service ...|  3504|
|      Getting a loan|   664|
|  Delinquent account|  3218|
+--------------------+------+
only showing top 20 rows



                                                                                

In [95]:
remaining_column

['company',
 'complaint_id',
 'complaint_what_happened',
 'date_received',
 'date_sent_to_company',
 'issue',
 'state',
 'zip_code']

In [96]:
sql(f"select {df.complaint_what_happened} from {complaint_table} limit 5").collect()

[Row(complaint_what_happened='I have talked to American Express customer service six times and their Credit Bureau Agency three times and they still have not removed the late payment status affecting my credit score. I have been in a relationship with American Express for eight years ( longer than any boyfriend I \'ve had! ) I was assured by XXXX in Customer Service three times over the phone that this late payment would be removed from my account. I had set up an auto pay and there must have been a computer glitch because the payment did not go through. This is XXXX payment over eight years, people! It was quickly rectified and my account has been in excellent standing ever since. \n\nWhen the late payment status was not removed, I called customer service again, and asked them to review the phone call with XXXX because they said they would n\'t remove the late payment. They said they would do so and I would get a phone call from a manager within XXXX to ten days. I never got a call fr

In [97]:
# Free-text column to be tokenized. Plain column-name string, independent of
# update_column_attribute(df) having been run on the current df.
TOKENIZER_FEATURE = ["complaint_what_happened"]

In [98]:
ONE_HOT_FEATURE

['company_response', 'consumer_consent_provided', 'submitted_via', 'timely']

In [99]:
# Columns to be frequency encoded. Plain column-name strings, independent of
# update_column_attribute(df) having been run on the current df.
FREQUENCY_ENCODING = ["company", "issue", "state", "zip_code"]

In [100]:
from pyspark.sql.types import TimestampType

In [101]:
# Cast date_received to a timestamp. The column is addressed by its literal name, so the
# cell works whether or not update_column_attribute(df) was run on the current df.
df = df.withColumn("date_received", col("date_received").cast(TimestampType()))

In [102]:
update_column_attribute(df)

In [103]:
# Cast date_sent_to_company to a timestamp. The column is addressed by its literal name,
# so the cell works whether or not update_column_attribute(df) was run on the current df.
df = df.withColumn("date_sent_to_company", col("date_sent_to_company").cast(TimestampType()))
df.printSchema()

root
 |-- company: string (nullable = true)
 |-- company_response: string (nullable = true)
 |-- complaint_id: string (nullable = true)
 |-- complaint_what_happened: string (nullable = true)
 |-- consumer_consent_provided: string (nullable = true)
 |-- consumer_disputed: string (nullable = true)
 |-- date_received: timestamp (nullable = true)
 |-- date_sent_to_company: timestamp (nullable = true)
 |-- issue: string (nullable = true)
 |-- product: string (nullable = true)
 |-- state: string (nullable = true)
 |-- submitted_via: string (nullable = true)
 |-- timely: string (nullable = true)
 |-- zip_code: string (nullable = true)



In [104]:
df.select([df.date_received,df.date_sent_to_company]).show()

+-------------------+--------------------+
|      date_received|date_sent_to_company|
+-------------------+--------------------+
|2017-01-03 00:00:00| 2017-01-03 00:00:00|
|2016-07-25 00:00:00| 2016-07-26 00:00:00|
|2016-06-09 00:00:00| 2016-06-09 00:00:00|
|2016-05-31 00:00:00| 2016-05-31 00:00:00|
|2016-10-28 00:00:00| 2016-10-28 00:00:00|
|2016-09-17 00:00:00| 2016-09-17 00:00:00|
|2016-03-16 00:00:00| 2016-04-03 00:00:00|
|2012-08-20 00:00:00| 2012-08-21 00:00:00|
|2017-02-07 00:00:00| 2017-02-07 00:00:00|
|2017-02-16 00:00:00| 2017-02-16 00:00:00|
|2014-10-03 00:00:00| 2014-10-08 00:00:00|
|2016-08-06 00:00:00| 2016-08-06 00:00:00|
|2016-10-20 00:00:00| 2016-10-20 00:00:00|
|2016-05-24 00:00:00| 2016-05-31 00:00:00|
|2013-09-30 00:00:00| 2013-10-01 00:00:00|
|2013-11-12 00:00:00| 2013-11-12 00:00:00|
|2015-08-12 00:00:00| 2015-08-12 00:00:00|
|2015-07-16 00:00:00| 2015-07-16 00:00:00|
|2017-02-02 00:00:00| 2017-02-02 00:00:00|
|2014-08-04 00:00:00| 2014-08-04 00:00:00|
+----------

In [105]:
from pyspark.sql.types import  LongType

In [106]:
# NOTE(review): this attempt fails with "TypeError: Column is not iterable" (see the output
# below). Presumably df.date_sent_to_company / df.date_received are Column objects here
# rather than name strings, because the preceding withColumn() call returned a new
# DataFrame after the last update_column_attribute(df) call, and col() expects a column
# name. A later cell that passes the names as string literals is the working version.
df = df.withColumn("diff_in_days",(col(df.date_sent_to_company).cast(LongType())-col(df.date_received).cast(LongType()))/(60*60*24))

TypeError: Column is not iterable

In [107]:
# NOTE(review): second failed attempt, same "TypeError: Column is not iterable" (see the
# output below); col() is presumably still receiving Column objects instead of name strings.
# The trailing .alias("diff_in_days") is applied to the result of withColumn(), i.e. to the
# DataFrame, not to the column expression. A later cell with string literals is the
# working version.
df = df.withColumn("diff_in_days", (col(df.date_sent_to_company).cast(LongType())-col(df.date_received).cast(LongType()))/(60*60*24)).alias("diff_in_days")

TypeError: Column is not iterable

In [108]:
from pyspark.sql.functions import col
from pyspark.sql.types import LongType

# Days between receiving the complaint and sending it to the company. Casting a timestamp
# to long yields seconds since the epoch, so the difference is divided by the number of
# seconds in a day (60*60*24). withColumn() already names the new column, so no alias is
# needed (the former trailing .alias() was applied to the DataFrame, not to the column).
df = df.withColumn(
    "diff_in_days",
    (col("date_sent_to_company").cast(LongType()) - col("date_received").cast(LongType())) / (60 * 60 * 24),
)

In [110]:
update_column_attribute(df)

In [111]:
remove_column = [df.date_received,df.date_sent_to_company]

In [112]:
# The raw date columns are dropped now that diff_in_days exists. The names are passed
# as literals, so the cell does not depend on update_column_attribute(df) having been run.
df = df.drop("date_received", "date_sent_to_company")

In [113]:
df.show()

+--------------------+--------------------+------------+-----------------------+-------------------------+-----------------+--------------------+--------------------+-----+-------------+------+--------+------------+
|             company|    company_response|complaint_id|complaint_what_happened|consumer_consent_provided|consumer_disputed|               issue|             product|state|submitted_via|timely|zip_code|diff_in_days|
+--------------------+--------------------+------------+-----------------------+-------------------------+-----------------+--------------------+--------------------+-----+-------------+------+--------+------------+
|AMERICAN EXPRESS ...|Closed with non-m...|     2272516|   I have talked to ...|         Consent provided|               No|Customer service ...|         Credit card|   NY|          Web|   Yes|   11415|         0.0|
|PENTAGON FEDERAL ...|Closed with monet...|     2027052|   I have been a Pen...|         Consent provided|               No|Shopping for

In [114]:
# Plain column-name string, like the other feature lists. df was re-created by drop() after
# the last update_column_attribute(df) call, so df.diff_in_days would be a Column here.
NUMERICAL_FEATURE = ["diff_in_days"]
ONE_HOT_FEATURE+\
FREQUENCY_ENCODING+\
BINARY_ENCODING+\
TARGET_FEATURE

['company_response',
 'consumer_consent_provided',
 'submitted_via',
 'timely',
 'company',
 'issue',
 'state',
 'zip_code',
 'product',
 'consumer_disputed']

In [115]:
BINARY_ENCODING

['product']

In [116]:
FREQUENCY_ENCODING=FREQUENCY_ENCODING+BINARY_ENCODING

In [117]:
FREQUENCY_ENCODING.remove('issue')

In [118]:
FREQUENCY_ENCODING

['company', 'state', 'zip_code', 'product']

In [119]:
ONE_HOT_FEATURE

['company_response', 'consumer_consent_provided', 'submitted_via', 'timely']