# Homework 3

We are greatly inspired by the [Consumer Complaints](https://github.com/InsightDataScience/consumer_complaints) challenge from [InsightDataScience](https://github.com/InsightDataScience/). In fact, we are going to tackle the same challenge but using Apache Spark. Please read through the challenge at the following link:

<https://github.com/InsightDataScience/consumer_complaints>

The most important sections are **Input dataset** and **Expected output**, which are quoted below:

## Input dataset
For this challenge, when we grade your submission, an input file, `complaints.csv`, will be moved to the top-most `input` directory of your repository. Your code must read that input file, process it and write the results to an output file, `report.csv`, that your code must place in the top-most `output` directory of your repository.

Below are the contents of an example `complaints.csv` file:
```
Date received,Product,Sub-product,Issue,Sub-issue,Consumer complaint narrative,Company public response,Company,State,ZIP code,Tags,Consumer consent provided?,Submitted via,Date sent to company,Company response to consumer,Timely response?,Consumer disputed?,Complaint ID
2019-09-24,Debt collection,I do not know,Attempts to collect debt not owed,Debt is not yours,"transworld systems inc. is trying to collect a debt that is not mine, not owed and is inaccurate.",,TRANSWORLD SYSTEMS INC,FL,335XX,,Consent provided,Web,2019-09-24,Closed with explanation,Yes,N/A,3384392
2019-09-19,"Credit reporting, credit repair services, or other personal consumer reports",Credit reporting,Incorrect information on your report,Information belongs to someone else,,Company has responded to the consumer and the CFPB and chooses not to provide a public response,Experian Information Solutions Inc.,PA,15206,,Consent not provided,Web,2019-09-20,Closed with non-monetary relief,Yes,N/A,3379500
2020-01-06,"Credit reporting, credit repair services, or other personal consumer reports",Credit reporting,Incorrect information on your report,Information belongs to someone else,,,Experian Information Solutions Inc.,CA,92532,,N/A,Email,2020-01-06,In progress,Yes,N/A,3486776
2019-10-24,"Credit reporting, credit repair services, or other personal consumer reports",Credit reporting,Incorrect information on your report,Information belongs to someone else,,Company has responded to the consumer and the CFPB and chooses not to provide a public response,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",CA,925XX,,Other,Web,2019-10-24,Closed with explanation,Yes,N/A,3416481
2019-11-20,"Credit reporting, credit repair services, or other personal consumer reports",Credit reporting,Incorrect information on your report,Account information incorrect,I would like the credit bureau to correct my XXXX XXXX XXXX XXXX balance. My correct balance is XXXX,Company has responded to the consumer and the CFPB and chooses not to provide a public response,"TRANSUNION INTERMEDIATE HOLDINGS, INC.",TX,77004,,Consent provided,Web,2019-11-20,Closed with explanation,Yes,N/A,3444592
```
Each line of the input file, except for the first-line header, represents one complaint. Consult the [Consumer Financial Protection Bureau's technical documentation](https://cfpb.github.io/api/ccdb/fields.html) for a description of each field.

* Notice that complaints were not listed in chronological order
* In 2019, there was a complaint against `TRANSWORLD SYSTEMS INC` for `Debt collection`
* Also in 2019, `Experian Information Solutions Inc.` received one complaint for `Credit reporting, credit repair services, or other personal consumer reports` while `TRANSUNION INTERMEDIATE HOLDINGS, INC.` received two
* In 2020, `Experian Information Solutions Inc.` received a complaint for `Credit reporting, credit repair services, or other personal consumer reports`

In summary, that means:
* In 2019, there was one complaint for `Debt collection`, and 100% of it went to one company
* Also in 2019, three complaints against two companies were received for `Credit reporting, credit repair services, or other personal consumer reports`, and two-thirds of them (or 67% if we round the percentage to the nearest whole number) were against one company (TRANSUNION INTERMEDIATE HOLDINGS, INC.)
* In 2020, only one complaint was received for `Credit reporting, credit repair services, or other personal consumer reports`, and so the highest percentage received by one company would be 100%

For this challenge, we want, for each product and year in which complaints were received, the total number of complaints, the number of companies receiving at least one complaint, and the highest percentage of complaints directed at a single company.

For the purposes of this challenge, all names, including company and product, should be treated as case insensitive. For example, "Acme", "ACME", and "acme" would represent the same company.

## Expected output

After reading and processing the input file, your code should create an output file, `report.csv`, with as many lines as unique pairs of product and year (of `Date received`) in the input file.

Each line in the output file should list the following fields in the following order:
* product (name should be written in all lowercase)
* year
* total number of complaints received for that product and year
* total number of companies receiving at least one complaint for that product and year
* highest percentage (rounded to the nearest whole number) of total complaints filed against one company for that product and year. Use standard rounding conventions (i.e., any percentage between 0.5% and 1%, inclusive, should round to 1%, and anything less than 0.5% should round to 0%)
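The rounding rule above is "round half up". This differs from Python's built-in `round`, which rounds halves to the nearest even number (`round(0.5) == 0`). In PySpark, `pyspark.sql.functions.round` uses HALF_UP rounding, while `pyspark.sql.functions.bround` uses HALF_EVEN. The sketch below is a plain-Python illustration of the rule; `percent_half_up` is a hypothetical helper name, not part of the assignment.

```python
from decimal import Decimal, ROUND_HALF_UP

def percent_half_up(count, total):
    """Percentage of `total` that `count` represents, rounded half up to a whole number."""
    # Decimal division is exact whenever the result terminates (every x.5 boundary case does),
    # so the half-up rule is applied to the true value rather than a float approximation
    exact = Decimal(count) * 100 / Decimal(total)
    return int(exact.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

print(percent_half_up(2, 3))    # 66.67%  -> 67
print(percent_half_up(1, 200))  # 0.5%    -> 1
print(percent_half_up(1, 201))  # ~0.497% -> 0
print(round(0.5))               # Python's built-in rounds half to even -> 0
```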

The lines in the output file should be sorted by product (alphabetically) and year (ascending).

Given the above `complaints.csv` input file, we'd expect an output file, `report.csv`, in the following format:
```
"credit reporting, credit repair services, or other personal consumer reports",2019,3,2,67
"credit reporting, credit repair services, or other personal consumer reports",2020,1,1,100
debt collection,2019,1,1,100
```
Notice that because `debt collection` was only listed for 2019 and not 2020, the output file only has a single entry for debt collection. Also, notice that when a product has a comma (`,`) in the name, the name should be enclosed by double quotation marks (`"`). Finally, notice that percentages are listed as numbers and do not have `%` in them.
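To check a Spark result on small inputs, it can help to have an independent single-machine reference. The following is a minimal plain-Python sketch, not the required Spark solution and not the author's method, that applies the rules above: lowercase product and company names, group by product and the year of `Date received`, count complaints and companies, round the top company's share half up, and let `csv.writer` add double quotes around names that contain commas. `build_report` is a hypothetical name, and the sketch assumes `Date received` starts with a four-digit year, as in the sample.

```python
import csv
import io
from collections import Counter, defaultdict
from decimal import Decimal, ROUND_HALF_UP

def build_report(csv_text):
    """Return the report.csv lines for the complaints in csv_text (header row included)."""
    # (product, year) -> Counter mapping each company to its number of complaints
    groups = defaultdict(Counter)
    for row in csv.DictReader(io.StringIO(csv_text)):
        product = row["Product"].lower()            # names are case-insensitive
        year = int(row["Date received"][:4])        # assumes YYYY-MM-DD dates
        company = row["Company"].lower()
        groups[(product, year)][company] += 1

    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")   # quotes fields that contain commas
    for product, year in sorted(groups):            # product alphabetically, then year ascending
        companies = groups[(product, year)]
        total = sum(companies.values())
        top = max(companies.values())
        pct = (Decimal(top) * 100 / total).quantize(Decimal("1"), rounding=ROUND_HALF_UP)
        writer.writerow([product, year, total, len(companies), int(pct)])
    return out.getvalue().splitlines()
```

Fed the five sample complaints above (only the `Date received`, `Product` and `Company` columns are used), it should produce the three expected lines, which makes it a convenient cross-check for the Spark output on the sample file.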

# Objectives

In this homework, we will tackle the above problem in two steps (2 tasks):

1. In Task 1, we work on a solution with PySpark on Google Colab using a sample of the data. The data is available on Google Drive and is to be downloaded by the `gdown` command in Task 1.

2. In Task 2, we create a standalone Python script that works on the full dataset using GCP DataProc. The full dataset is downloaded from [here](https://www.consumerfinance.gov/data-research/consumer-complaints/#download-the-data). The data is available on the class bucket as: `gs://bdma/data/complaints.csv`



## Environment Setup

In [None]:
%%shell
gdown --quiet 1-IeoZDwT5wQzBUpsaS5B6vTaP-2ZBkam
pip --quiet install pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone




In [None]:
COMPLAINTS_FN = 'complaints_sample.csv'

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
spark

In [None]:
!head -n 20 {COMPLAINTS_FN}

Date received,Product,Sub-product,Issue,Sub-issue,Consumer complaint narrative,Company public response,Company,State,ZIP code,Tags,Consumer consent provided?,Submitted via,Date sent to company,Company response to consumer,Timely response?,Consumer disputed?,Complaint ID
2015-12-31,Bank account or service,Checking account,"Making/receiving payments, sending money",,,,FIRSTBANK PUERTO RICO,PR,00902,Older American,N/A,Referral,2016-02-04,Closed with explanation,Yes,No,1723943
2016-03-15,Bank account or service,Other bank product/service,Problems caused by my funds being low,,,,FIRSTBANK PUERTO RICO,PR,00926,,Consent not provided,Web,2016-03-15,Closed with explanation,Yes,No,1833740
2016-10-24,Bank account or service,Checking account,"Account opening, closing, or management",,"In the month of XX/XX/2015, my email address ( XXXX ) was hacked and used to send messages to people associated with my business. At that time, transactions for the purchase and sales of products were made. The ha

In [None]:
## Importing the functions and types used in the calculation
## (note: `round` and `max` here are Spark column functions and shadow Python's built-in round/max)
from pyspark.sql.functions import year, lower, count, countDistinct, round, max
from pyspark.sql.types import StringType, IntegerType

In [None]:
## Reading the csv file "COMPLAINTS_FN"
df=spark.read.csv(COMPLAINTS_FN,header=True, multiLine=True, escape='"')
## Selecting only 4 columns for the analysis: Date received (year only), Product, Company and Complaint ID
dyear=df.select("Product",(year("Date received")).alias("year"),"Company","Complaint ID").sort("Product","year")
## Lowercasing the Product and Company values so that grouping and the other calculations are case-insensitive
ds_product = dyear.withColumn("Product", lower(dyear["Product"].cast(StringType())))
ds_company = ds_product.withColumn("Company", lower(ds_product["Company"].cast(StringType())))
## Showing the first rows of the cleaned DataFrame, 'ds_company'
ds_company.show(2)

+--------------------+----+--------------------+------------+
|             Product|year|             Company|Complaint ID|
+--------------------+----+--------------------+------------+
|bank account or s...|2015|firstbank puerto ...|     1723943|
|bank account or s...|2016|firstbank puerto ...|     1833740|
+--------------------+----+--------------------+------------+
only showing top 2 rows
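The `multiLine=True` and `escape='"'` options in the read above matter because the narrative column can contain line breaks and quotation marks. Assuming the file escapes a quote inside a quoted field by doubling it (`""`, the common RFC 4180 convention), Spark needs `escape='"'` because its CSV reader otherwise treats a backslash as the escape character. The plain-Python sketch below shows what a correct parse looks like on a made-up two-record snippet (the snippet is hypothetical, not taken from the dataset); Python's `csv` module follows the doubled-quote convention by default.

```python
import csv
import io

# Hypothetical snippet: the first narrative contains a line break and doubled quotes ("")
snippet = (
    'Date received,Product,Consumer complaint narrative,Company\n'
    '2019-09-24,Debt collection,"They said ""pay now""\nand hung up.",ACME\n'
    '2019-09-25,Mortgage,,Acme\n'
)

rows = list(csv.DictReader(io.StringIO(snippet)))
print(len(rows))                                 # 2 complaints, not 3 physical data lines
print(rows[0]["Consumer complaint narrative"])  # the embedded newline and quotes survive
print(rows[0]["Company"], rows[1]["Company"])   # columns stay aligned after the multi-line field
```

If the options were missing, the second physical line of the first record would be read as a separate, malformed complaint, which would inflate the counts.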



In [None]:
## Print the count of the data frame to check number of rows
ds_company.count()

6623

In [None]:

## Grouping the 'ds_company' DataFrame by product and year,
## counting the distinct Complaint IDs (total complaints) and distinct companies (Companies Count)
ds_complaint=ds_company.groupBy("Product", "year").agg(countDistinct("Complaint ID").alias("total_complaints"),countDistinct("Company").alias("Companies Count")).sort("Product","year")
ds_complaint.show(2)

+--------------------+----+----------------+---------------+
|             Product|year|total_complaints|Companies Count|
+--------------------+----+----------------+---------------+
|bank account or s...|2015|               1|              1|
|bank account or s...|2016|               2|              2|
+--------------------+----+----------------+---------------+
only showing top 2 rows



In [None]:
## Row count of the grouped DataFrame "ds_complaint"
ds_complaint.count()

46

In [None]:
# Group by product, year, and company, and count the complaints received by each company
## The new grouped DataFrame is "grouped_df"
grouped_df = ds_company.groupBy("Product", "year", "Company").agg(count("Complaint ID").alias("atleast_complaint_count"))
grouped_df.show(2)


+--------------------+----+--------------------+-----------------------+
|             Product|year|             Company|atleast_complaint_count|
+--------------------+----+--------------------+-----------------------+
|credit reporting,...|2019|service finance h...|                      1|
|     debt collection|2019|security credit s...|                      1|
+--------------------+----+--------------------+-----------------------+
only showing top 2 rows



In [None]:
## Row count of "grouped_df"
grouped_df.count()

1209

In [None]:
## Joining two data frames by Product and year
grouped_df1=ds_complaint.join(grouped_df, ["Product", "year"])
grouped_df1.show(2)

+--------------------+----+----------------+---------------+--------------------+-----------------------+
|             Product|year|total_complaints|Companies Count|             Company|atleast_complaint_count|
+--------------------+----+----------------+---------------+--------------------+-----------------------+
|credit reporting,...|2019|            3114|            203|phoenix financial...|                      1|
|credit reporting,...|2019|            3114|            203|        synovus bank|                      2|
+--------------------+----+----------------+---------------+--------------------+-----------------------+
only showing top 2 rows



In [None]:
## Row count of "grouped_df1"
grouped_df1.count()

1209

In [None]:
# Calculate the percentage of total complaints filed against one company for each product and year by dividing with 'total_complaints'
grouped_df2 = grouped_df1.withColumn(("percentage"), round(grouped_df1["atleast_complaint_count"] / grouped_df1["total_complaints"] * 100, 0).cast(IntegerType())).sort("Product","year")
grouped_df2.show(2)



+--------------------+----+----------------+---------------+--------------------+-----------------------+----------+
|             Product|year|total_complaints|Companies Count|             Company|atleast_complaint_count|percentage|
+--------------------+----+----------------+---------------+--------------------+-----------------------+----------+
|bank account or s...|2015|               1|              1|firstbank puerto ...|                      1|       100|
|bank account or s...|2016|               2|              2|wells fargo & com...|                      1|        50|
+--------------------+----+----------------+---------------+--------------------+-----------------------+----------+
only showing top 2 rows



In [None]:
## Joining the 'ds_complaint' dataframe and "grouped_df2" dataframe to bring all the above
##calculation together like company count, total complaint, atleast_complaint_count and percentage
dp=ds_complaint.join(grouped_df2, ["Product", "year","total_complaints", "Companies Count"], "left").sort("Product","year")
dp.show(2)



+--------------------+----+----------------+---------------+--------------------+-----------------------+----------+
|             Product|year|total_complaints|Companies Count|             Company|atleast_complaint_count|percentage|
+--------------------+----+----------------+---------------+--------------------+-----------------------+----------+
|bank account or s...|2015|               1|              1|firstbank puerto ...|                      1|       100|
|bank account or s...|2016|               2|              2|wells fargo & com...|                      1|        50|
+--------------------+----+----------------+---------------+--------------------+-----------------------+----------+
only showing top 2 rows



In [None]:
## Grouping the DataFrame 'dp' by product and year and keeping only the highest percentage
## of total complaints filed against one company for that product and year
gr = dp.groupBy("Product", "year").agg(max("percentage").alias("highest_percentage")).sort("Product", "year")
gr.show(2)

+--------------------+----+------------------+
|             Product|year|highest_percentage|
+--------------------+----+------------------+
|bank account or s...|2015|               100|
|bank account or s...|2016|                50|
+--------------------+----+------------------+
only showing top 2 rows
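The cell above takes the maximum of the per-company rounded percentages. Because half-up rounding never reverses the order of two values, this should equal rounding the largest company's share once, which suggests a possible simplification: compute `max` of the per-company complaint count for each product and year, then divide and round a single time, skipping the per-company percentage column and the extra join. That is only a suggestion, not what this notebook does. The brute-force check below uses Python's `Decimal` half-up rounding as a stand-in for Spark's `round`; `pct` and `max_rounded_equals_rounded_max` are hypothetical helper names.

```python
from decimal import Decimal, ROUND_HALF_UP
from itertools import product

def pct(count, total):
    """Whole-number percentage, rounded half up (stand-in for Spark's round(..., 0))."""
    return int((Decimal(count) * 100 / total).quantize(Decimal("1"), rounding=ROUND_HALF_UP))

def max_rounded_equals_rounded_max(counts):
    """counts: complaints per company for one product and year."""
    total = sum(counts)
    return max(pct(c, total) for c in counts) == pct(max(counts), total)

# Exhaustive check: every distribution over 1 to 3 companies with 1 to 6 complaints each
checked = 0
for n_companies in range(1, 4):
    for counts in product(range(1, 7), repeat=n_companies):
        assert max_rounded_equals_rounded_max(counts)
        checked += 1
print(checked, "distributions checked")
```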



In [None]:
## Row count of DataFrame 'gr'
gr.count()

46

In [None]:
## Joining DataFrames 'ds_complaint' and 'gr' to put 'total complaints', 'companies count' and 'highest percentage of complaints' together
final=ds_complaint.join(gr,["Product", "year"]).sort("Product","year")
final.show()

+--------------------+----+----------------+---------------+------------------+
|             Product|year|total_complaints|Companies Count|highest_percentage|
+--------------------+----+----------------+---------------+------------------+
|bank account or s...|2015|               1|              1|               100|
|bank account or s...|2016|               2|              2|                50|
|checking or savin...|2017|               1|              1|               100|
|checking or savin...|2018|              20|             10|                25|
|checking or savin...|2019|             461|             72|                13|
|checking or savin...|2020|               3|              3|                33|
|       consumer loan|2015|               1|              1|               100|
|       consumer loan|2016|               1|              1|               100|
|       consumer loan|2017|               1|              1|               100|
|         credit card|2016|             

In [None]:
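## Writing the DataFrame "final" as CSV; Spark creates a folder named report.csv containing part files
## (mode="overwrite" lets this cell be re-run without failing because the folder already exists)
final.write.csv("report.csv", mode="overwrite")
## Reading the CSV part files back as an RDD of lines
rdd=spark.sparkContext.textFile("report.csv")
## Assigning the desired output name to the RDD
outputTask1=rdd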

## Task 1

Use PySpark to derive the expected output. Your computation must be done entirely with Spark transformations. The output MUST be in CSV form, i.e., each output line is a complete comma-separated string that can be fed into a CSV reader. It is okay if your output is divided into multiple parts (due to the distributed nature of Spark computation).
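One way to guarantee that each output line is a complete CSV string is to format every record with Python's `csv` module, which adds double quotes only around fields that need them (such as product names containing commas). The sketch below is plain Python; `to_csv_line` is a hypothetical helper name.

```python
import csv
import io

def to_csv_line(fields):
    """Format one output record as a single CSV line, quoting fields that contain commas."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerow(fields)
    return buf.getvalue()[:-1]  # drop the single "\n" written by writerow

line = to_csv_line(["credit reporting, credit repair services, or other personal consumer reports", 2019, 3, 2, 67])
print(line)
# Round trip: a CSV reader recovers the original five fields (as strings)
print(next(csv.reader([line])))
```

In Spark, a function like this could be applied per row, e.g. `final.rdd.map(to_csv_line)`, since each `Row` is a tuple; treat that as an untested suggestion rather than the notebook's approach, which writes the DataFrame with `write.csv` and reads the lines back.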

In [None]:
# outputTask1 is an output RDD, you can use DataFrame as well but each line
# still needs to be a string
outputTask1.take(20)

['bank account or service,2015,1,1,100',
 'bank account or service,2016,2,2,50',
 'checking or savings account,2017,1,1,100',
 'checking or savings account,2018,20,10,25',
 'checking or savings account,2019,461,72,13',
 'checking or savings account,2020,3,3,33',
 'consumer loan,2015,1,1,100',
 'consumer loan,2016,1,1,100',
 'consumer loan,2017,1,1,100',
 'credit card,2016,4,4,25',
 'credit card,2017,1,1,100',
 'credit card or prepaid card,2017,1,1,100',
 'credit card or prepaid card,2018,27,12,33',
 'credit card or prepaid card,2019,437,42,15',
 'credit card or prepaid card,2020,13,10,23',
 '"credit reporting, credit repair services, or other personal consumer reports",2017,7,5,29',
 '"credit reporting, credit repair services, or other personal consumer reports",2018,238,22,56',
 '"credit reporting, credit repair services, or other personal consumer reports",2019,3114,203,50',
 '"credit reporting, credit repair services, or other personal consumer reports",2020,144,10,51',
 'debt colle

## Task 2

For this task, please convert what you have in Task 1 to a standalone file that can be run on any DataProc cluster. The input and output locations must be taken from the command line, e.g. using my cluster named `bdma`:

```shell
gcloud dataproc jobs submit pyspark --cluster bdma BDM_HW3_EMPLID_LastName.py -- gs://bdma/data/complaints.csv gs://bdma/shared/2023_spring/HW3/EMPLID_LastName
```
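The arguments placed after `--` in `gcloud dataproc jobs submit pyspark` are passed to the script as ordinary command-line arguments, so the script can read them from `sys.argv[1]` and `sys.argv[2]`. A slightly more self-documenting alternative is the standard-library `argparse` module; the sketch below is an illustration, and `parse_args` and the argument names `input`/`output` are hypothetical.

```python
import argparse

def parse_args(argv=None):
    """Parse the two positional arguments given after `--` (argv=None means sys.argv[1:])."""
    parser = argparse.ArgumentParser(description="Consumer complaints report (BDM HW3)")
    parser.add_argument("input", help="input CSV, e.g. gs://bdma/data/complaints.csv")
    parser.add_argument("output", help="output folder, e.g. gs://bdma/shared/2023_spring/HW3/EMPLID_LastName")
    return parser.parse_args(argv)

# Simulating the arguments from the example command above
args = parse_args(["gs://bdma/data/complaints.csv", "gs://bdma/shared/2023_spring/HW3/EMPLID_LastName"])
print(args.input)
print(args.output)
```

In the job itself, `parse_args()` with no argument would read the real command line, and `args.input`/`args.output` would then be passed to `spark.read.csv` and `DataFrame.write.csv`.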

As part of the test, you must be able to run your code and output to the class shared folder, i.e.: `gs://bdma/shared/2023_spring/HW3/EMPLID_LastName`, **replacing `EMPLID` and `LastName` with your actual EMPL ID and Last Name**.

Note that, by default, Spark does not overwrite an existing output folder: if you run your code multiple times, write to the shared folder only with your working version, or remove the existing output before running your code again.



In [None]:
!pip install google-cloud-dataproc

In [None]:
!gcloud auth login

In [None]:
!gcloud projects list

In [None]:
!gcloud config set project bdma-384100
!gcloud config set compute/region us-west1
!gcloud config set compute/zone us-west1-a
!gcloud config set dataproc/region us-west1

In [None]:
!gcloud dataproc clusters create bdm-hw3 --enable-component-gateway --region us-west1 --zone us-west1-a --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --image-version 2.0-debian10 --project bdma-384100

In [None]:
!gcloud dataproc clusters describe bdm-hw3

clusterName: bdm-hw3
clusterUuid: 6479032e-cccd-4572-82a1-8a7c6a1ba9a8
config:
  configBucket: dataproc-staging-us-west1-473379980650-f1bjrajq
  endpointConfig:
    enableHttpPortAccess: true
    httpPorts:
      HDFS NameNode: https://cjvc4yql3rb2vjowuoqncgkfuy-dot-us-west1.dataproc.googleusercontent.com/hdfs/dfshealth.html
      HiveServer2 (bdm-hw3-m): https://cjvc4yql3rb2vjowuoqncgkfuy-dot-us-west1.dataproc.googleusercontent.com/hiveserver2ui/bdm-hw3-m?host=bdm-hw3-m
      MapReduce Job History: https://cjvc4yql3rb2vjowuoqncgkfuy-dot-us-west1.dataproc.googleusercontent.com/jobhistory/
      Spark History Server: https://cjvc4yql3rb2vjowuoqncgkfuy-dot-us-west1.dataproc.googleusercontent.com/sparkhistory/
      Tez: https://cjvc4yql3rb2vjowuoqncgkfuy-dot-us-west1.dataproc.googleusercontent.com/apphistory/tez-ui/
      YARN Application Timeline: https://cjvc4yql3rb2vjowuoqncgkfuy-dot-us-west1.dataproc.googleusercontent.com/apphistory/
      YARN ResourceManager: https://cjvc4yql3rb2vj

In [None]:
%%writefile BDM_HW3_23735863_Ghimire.py
import sys

from pyspark.sql import SparkSession
## Importing the functions and types used in the calculation
## (note: `round` and `max` here are Spark column functions and shadow Python's built-in round/max)
from pyspark.sql.functions import year, lower, count, countDistinct, round, max
from pyspark.sql.types import StringType, IntegerType

## The input file and output folder are taken from the command line:
## gcloud dataproc jobs submit pyspark ... BDM_HW3_23735863_Ghimire.py -- <input_csv> <output_folder>
if len(sys.argv) != 3:
    sys.exit("Usage: BDM_HW3_23735863_Ghimire.py <input_csv> <output_folder>")
input_fn, output_dir = sys.argv[1], sys.argv[2]

spark = SparkSession.builder.getOrCreate()

## Reading the complaints file given on the command line
df = spark.read.csv(input_fn, header=True, multiLine=True, inferSchema=True, escape='"')
## Selecting only 4 columns for the analysis: Date received (year only), Product, Company and Complaint ID
dyear = df.select("Product", year("Date received").alias("year"), "Company", "Complaint ID")
## Lowercasing the Product and Company values so that grouping is case-insensitive
ds_product = dyear.withColumn("Product", lower(dyear["Product"].cast(StringType())))
ds_company = ds_product.withColumn("Company", lower(ds_product["Company"].cast(StringType())))
## Grouping by product and year: number of complaints and number of distinct companies
## (intermediate sorts are omitted; only the final result needs to be sorted)
ds_complaint = ds_company.groupBy("Product", "year").agg(
    countDistinct("Complaint ID").alias("total_complaints"),
    countDistinct("Company").alias("Companies Count"))

## Grouping by product, year and company: number of complaints received by each company
grouped_df = ds_company.groupBy("Product", "year", "Company").agg(count("Complaint ID").alias("atleast_complaint_count"))
## Joining the two DataFrames by Product and year
grouped_df1 = ds_complaint.join(grouped_df, ["Product", "year"])

## Percentage of the product/year complaints filed against each company (Spark's round() rounds half up)
grouped_df2 = grouped_df1.withColumn("percentage", round(grouped_df1["atleast_complaint_count"] / grouped_df1["total_complaints"] * 100, 0).cast(IntegerType()))
## Joining 'ds_complaint' and 'grouped_df2' to bring company count, total complaints,
## atleast_complaint_count and percentage together
dp = ds_complaint.join(grouped_df2, ["Product", "year", "total_complaints", "Companies Count"], "left")
## Keeping only the highest percentage of complaints filed against one company for each product and year
gr = dp.groupBy("Product", "year").agg(max("percentage").alias("highest_percentage"))
## Joining 'ds_complaint' and 'gr' to put total complaints, companies count and highest percentage together
final = ds_complaint.join(gr, ["Product", "year"]).sort("Product", "year")
final.show(20)

## Writing the report to the output folder given on the command line
final.write.csv(output_dir)
Writing BDM_HW3_23735863_Ghimire.py


In [None]:
!gcloud dataproc jobs submit pyspark --cluster bdm-hw3 BDM_HW3_23735863_Ghimire.py -- gs://bdma/data/complaints.csv gs://bdma/shared/2023_spring/HW3/23735863_Ghimire


Job [a3d4d6b674ef4b449d18cc4ba93ebbbc] submitted.
Waiting for job output...
23/04/19 19:48:39 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/19 19:48:39 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/19 19:48:39 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/19 19:48:39 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/04/19 19:48:39 INFO org.sparkproject.jetty.util.log: Logging initialized @3269ms to org.sparkproject.jetty.util.log.Slf4jLog
23/04/19 19:48:39 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_362-b09
23/04/19 19:48:39 INFO org.sparkproject.jetty.server.Server: Started @3377ms
23/04/19 19:48:39 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@501f5554{HTTP/1.1, (http/1.1)}{0.0.0.0:36215}
23/04/19 19:48:40 INFO org.apache.hadoop.yarn.client.RMPro

In [None]:
## NOTE: If running from the beginning: once the output above is printed, look for the report2.csv folder in Colab;
## inside it, find the CSV file named "part......." and replace the current CSV file path with that file's path


Copying file:///content/report2.csv/part-00000-6b1375fd-488c-4eed-9499-82f18a318372-c000.csv [Content-Type=text/csv]...
Operation completed over 1 objects/5.0 KiB.                                      
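The note above relies on locating the `part-...` file by hand. Spark writes a folder rather than a single file: one `part-*` file per output partition plus a `_SUCCESS` marker. A small plain-Python helper can merge the part files locally; this is a sketch, assuming the folder was written from a DataFrame sorted with `sort`, in which case concatenating the parts in file-name order typically preserves the overall order. `merge_part_files` is a hypothetical name, and the demonstration uses fake file names and contents. For small outputs like this report, calling `final.coalesce(1)` before `write.csv` is another common way to get a single part file.

```python
import glob
import os
import tempfile

def merge_part_files(output_dir, merged_path):
    """Concatenate Spark's part-* files from output_dir into merged_path, in file-name order."""
    part_files = sorted(glob.glob(os.path.join(output_dir, "part-*")))
    with open(merged_path, "w", encoding="utf-8") as out:
        for part in part_files:
            with open(part, encoding="utf-8") as f:
                out.write(f.read())
    return part_files

# Demonstration on a fake Spark output folder (hypothetical file names and contents)
with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "report2.csv")
    os.makedirs(out_dir)
    fake_files = {
        "part-00001-abc.csv": "debt collection,2019,1,1,100\n",
        "part-00000-abc.csv": "bank account or service,2015,1,1,100\n",
        "_SUCCESS": "",  # Spark's marker file; not matched by "part-*"
    }
    for name, text in fake_files.items():
        with open(os.path.join(out_dir, name), "w", encoding="utf-8") as f:
            f.write(text)
    merged_path = os.path.join(tmp, "report.csv")
    used = [os.path.basename(p) for p in merge_part_files(out_dir, merged_path)]
    with open(merged_path, encoding="utf-8") as f:
        merged_text = f.read()

print(used)
print(merged_text)
```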


In [None]:
!gsutil ls  gs://bdma/shared/2023_spring/

In [None]:
                        #####################################################################THE END#############################################################################