# Consumer Complaints Challenge

Refer to the [Consumer Complaints](https://github.com/InsightDataScience/consumer_complaints) challenge from [InsightDataScience](https://github.com/InsightDataScience/) for more details.

The goal of this notebook is to solve the challenge using Apache Spark. 

The most important sections are **Input dataset** and **Expected output**, which are quoted below:

## Input dataset

Below are the contents of an example `complaints.csv` file: 
```
Date received,Product,Sub-product,Issue,Sub-issue,Consumer complaint narrative,Company public response,Company,State,ZIP code,Tags,Consumer consent provided?,Submitted via,Date sent to company,Company response to consumer,Timely response?,Consumer disputed?,Complaint ID
2019-09-24,Debt collection,I do not know,Attempts to collect debt not owed,Debt is not yours,"transworld systems inc. is trying to collect a debt that is not mine, not owed and is inaccurate.",,TRANSWORLD SYSTEMS INC,FL,335XX,,Consent provided,Web,2019-09-24,Closed with explanation,Yes,N/A,3384392
```
Each line of the input file, except for the first-line header, represents one complaint. Consult the [Consumer Financial Protection Bureau's technical documentation](https://cfpb.github.io/api/ccdb/fields.html) for a description of each field.

* Notice that complaints were not listed in chronological order
* In 2019, there was a complaint against `TRANSWORLD SYSTEMS INC` for `Debt collection` 
* Also in 2019, `Experian Information Solutions Inc.` received one complaint for `Credit reporting, credit repair services, or other personal consumer reports` while `TRANSUNION INTERMEDIATE HOLDINGS, INC.` received two
* In 2020, `Experian Information Solutions Inc.` received a complaint for `Credit reporting, credit repair services, or other personal consumer reports`

In summary, that means:
* In 2019, there was one complaint for `Debt collection`, and 100% of it went to one company 
* Also in 2019, three complaints against two companies were received for `Credit reporting, credit repair services, or other personal consumer reports`, and two-thirds of them (67% when rounded to the nearest whole number) were against one company (TRANSUNION INTERMEDIATE HOLDINGS, INC.)
* In 2020, only one complaint was received for `Credit reporting, credit repair services, or other personal consumer reports`, and so the highest percentage received by one company would be 100%

For this challenge, we want, for each product and year in which complaints were received, the total number of complaints, the number of companies receiving a complaint, and the highest percentage of complaints directed at a single company.

For the purposes of this challenge, all names, including company and product, should be treated as case insensitive. For example, "Acme", "ACME", and "acme" would represent the same company.
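
The aggregation described above can be sketched in plain Python on the example complaints. This is only an illustration of the required arithmetic, not the Spark solution used later in this notebook. The `summarize` function and the hand-written `rows` list are hypothetical, and every date except the first is made up (only the year matters).

```python
from collections import Counter, defaultdict

def summarize(rows):
    """rows: iterable of (date_received, product, company) tuples."""
    per_group = defaultdict(Counter)
    for date_received, product, company in rows:
        year = int(date_received.split("-")[0])
        # Names are case-insensitive, so normalize before counting.
        per_group[(product.lower(), year)][company.lower()] += 1

    report = []
    for (product, year), companies in sorted(per_group.items()):
        total = sum(companies.values())
        top = max(companies.values())
        # Integer form of floor(100 * top / total + 0.5), i.e. half-up rounding.
        pct = (200 * top + total) // (2 * total)
        report.append((product, year, total, len(companies), pct))
    return report

CREDIT = "Credit reporting, credit repair services, or other personal consumer reports"
rows = [
    ("2019-09-24", "Debt collection", "TRANSWORLD SYSTEMS INC"),
    ("2019-01-01", CREDIT, "Experian Information Solutions Inc."),
    ("2019-01-02", CREDIT, "TRANSUNION INTERMEDIATE HOLDINGS, INC."),
    ("2019-01-03", CREDIT, "TRANSUNION INTERMEDIATE HOLDINGS, INC."),
    ("2020-01-01", CREDIT, "Experian Information Solutions Inc."),
]
report = summarize(rows)
for line in report:
    print(line)
```

Each tuple holds product, year, total complaints, number of companies, and highest percentage, already sorted by product and then year.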

## Expected output

Each line in the output file should list the following fields in the following order:
* product (name should be written in all lowercase)
* year
* total number of complaints received for that product and year
* total number of companies receiving at least one complaint for that product and year
* highest percentage (rounded to the nearest whole number) of total complaints filed against one company for that product and year. Use standard rounding conventions (i.e., Any percentage between 0.5% and 1%, inclusive, should round to 1% and anything less than 0.5% should round to 0%)
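
One caveat when implementing this rounding rule in plain Python: the built-in `round` sends exact halves to the nearest even integer, so `round(0.5)` is `0` rather than `1`. A minimal sketch of half-up rounding with the standard `decimal` module follows; `round_half_up` is a hypothetical helper name, not part of the challenge.

```python
from decimal import Decimal, ROUND_HALF_UP

def round_half_up(numerator, denominator):
    """Percentage numerator/denominator, rounded half up to an integer."""
    pct = Decimal(100) * Decimal(numerator) / Decimal(denominator)
    return int(pct.quantize(Decimal("1"), rounding=ROUND_HALF_UP))

print(round(0.5))             # 0: built-in round sends exact halves to even
print(round_half_up(1, 200))  # 1: exactly 0.5% rounds up
print(round_half_up(2, 3))    # 67
```

Spark's `F.round`, which the notebook uses in Task 1, is documented as using HALF_UP rounding, so this caveat applies mainly to pure-Python implementations.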

The lines in the output file should be sorted by product (alphabetically) and year (ascending)

Given the above `complaints.csv` input file, we'd expect an output file, `report.csv`, in the following format
```
"credit reporting, credit repair services, or other personal consumer reports",2019,3,2,67
"credit reporting, credit repair services, or other personal consumer reports",2020,1,1,100
debt collection,2019,1,1,100
```
Notice that because `debt collection` was only listed for 2019 and not 2020, the output file only has a single entry for debt collection. Also, notice that when a product has a comma (`,`) in the name, the name should be enclosed by double quotation marks (`"`). Finally, notice that percentages are listed as numbers and do not have `%` in them.
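
The quoting rule can be handled by Python's standard `csv` module, whose default minimal quoting wraps a field in double quotes only when it contains a delimiter, quote, or line break. A small sketch, where `to_csv_line` is a hypothetical helper name:

```python
import csv
import io

def to_csv_line(fields):
    """Format one report row; fields containing commas get double-quoted."""
    buf = io.StringIO()
    csv.writer(buf, lineterminator="\n").writerow(fields)
    return buf.getvalue().rstrip("\n")

print(to_csv_line([
    "credit reporting, credit repair services, or other personal consumer reports",
    2019, 3, 2, 67,
]))
print(to_csv_line(["debt collection", 2019, 1, 1, 100]))
```

A plain `','.join(...)` would leave the first product name unquoted, so a downstream CSV parser would see more than five fields on that line.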

# Objectives

1. In Task 1, we work on a solution with PySpark on Google Colab using a sample of the data. The sample is available on Google Drive and is downloaded with the `gdown` command in the Environment Setup section.

2. In Task 2, we create a standalone Python script that works on the full dataset using GCP Dataproc. The full dataset is downloaded from [here](https://www.consumerfinance.gov/data-research/consumer-complaints/#download-the-data). The data is available on the class bucket as: `gs://bdma/data/complaints.csv`



## Environment Setup

In [19]:
%%shell
gdown --quiet 1-IeoZDwT5wQzBUpsaS5B6vTaP-2ZBkam
pip --quiet install pyspark



In [20]:
COMPLAINTS_FN = 'complaints_sample.csv'

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
sc = pyspark.SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
spark

## Task 1

In [21]:
import csv 
#some records span multiple lines; Python's csv reader handles them, but a line-based read such as sc.textFile splits them
#the 6624 rows counted here include the header, so Spark should yield 6623 records after skipping it
len(list(csv.reader(open(COMPLAINTS_FN,'r'))))

6624

In [22]:
#getting headers
with open(COMPLAINTS_FN, 'r') as file:
  data = csv.reader(file)
  for row in data:
    print(row)
    break

['Date received', 'Product', 'Sub-product', 'Issue', 'Sub-issue', 'Consumer complaint narrative', 'Company public response', 'Company', 'State', 'ZIP code', 'Tags', 'Consumer consent provided?', 'Submitted via', 'Date sent to company', 'Company response to consumer', 'Timely response?', 'Consumer disputed?', 'Complaint ID']


In [23]:
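#parse one partition's lines as CSV; csv.reader stitches a multi-line quoted
#field back together as long as the whole record sits in one partition
def extractData(partId, records):
  if partId == 0:
    next(records)  #skip the header, which is the first line of partition 0
  import csv
  reader = csv.reader(records)
  for row in reader:
    if len(row) == 18:  #keep only complete 18-field records
      yield row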
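
To see why passing an iterator of lines to `csv.reader` copes with multi-line records, here is a small standalone sketch without Spark. The three-column data is made up for illustration, and the lines carry no newline terminators, mimicking what `sc.textFile` hands to each partition.

```python
import csv

lines = iter([
    "id,narrative,company",   # header
    '1,"first line',          # record 1 starts; the quoted field is still open
    'second line",ACME',      # record 1 ends here
    "2,single line,Initech",
])
next(lines)  # skip the header, as extractData does for partition 0
rows = [row for row in csv.reader(lines) if len(row) == 3]
for row in rows:
    print(row)
```

The reader keeps pulling lines until the open quote is closed, so the two physical lines come back as one three-field record. If a record were split across two partitions, each partition would see only a fragment, and such fragments would most likely fail the field-count check and be dropped silently.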

In [24]:
#loading data into rdd using extractData function
data = sc.textFile(COMPLAINTS_FN, use_unicode=True).cache().mapPartitionsWithIndex(extractData)

In [25]:
df = data.toDF(['Date received', 'Product', 'Sub-product', 'Issue', 'Sub-issue', 'Consumer complaint narrative', 'Company public response', 'Company', 'State', 'ZIP code', 'Tags', 'Consumer consent provided?', 'Submitted via', 'Date sent to company', 'Company response to consumer', 'Timely response?', 'Consumer disputed?', 'Complaint ID'])
df.show(5)

+-------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+-----+--------+--------------+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+------------+
|Date received|             Product|         Sub-product|               Issue|           Sub-issue|Consumer complaint narrative|Company public response|             Company|State|ZIP code|          Tags|Consumer consent provided?|Submitted via|Date sent to company|Company response to consumer|Timely response?|Consumer disputed?|Complaint ID|
+-------------+--------------------+--------------------+--------------------+--------------------+----------------------------+-----------------------+--------------------+-----+--------+--------------+--------------------------+-------------+--------------------+----------------------------+----------------+-

In [26]:
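#converting the Date received column to year, and converting product to all lowercase
#note: Company is left in its original case, so names differing only by case are counted as separate companies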
dfA = df.withColumn('Date received', F.split('Date received','-')[0]) \
  .withColumnRenamed('Date received','year') \
  .withColumn('Product', F.lower('Product'))

dfA.show(3)

+----+--------------------+--------------------+--------------------+---------+----------------------------+-----------------------+--------------------+-----+--------+--------------+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+------------+
|year|             Product|         Sub-product|               Issue|Sub-issue|Consumer complaint narrative|Company public response|             Company|State|ZIP code|          Tags|Consumer consent provided?|Submitted via|Date sent to company|Company response to consumer|Timely response?|Consumer disputed?|Complaint ID|
+----+--------------------+--------------------+--------------------+---------+----------------------------+-----------------------+--------------------+-----+--------+--------------+--------------------------+-------------+--------------------+----------------------------+----------------+------------------+------------+
|2015|bank account or s...| 

In [27]:
#grouping by product and year; counting total number of complaints, and unique number of companies 
dfB = dfA.groupBy(['Product','year']).agg(F.count('*').alias('Total Complaints'),\
                                          F.countDistinct('Company').alias('Total Companies'))\
                                          .sort('Product','year')
dfB.show()

+--------------------+----+----------------+---------------+
|             Product|year|Total Complaints|Total Companies|
+--------------------+----+----------------+---------------+
|bank account or s...|2015|               1|              1|
|bank account or s...|2016|               2|              2|
|checking or savin...|2017|               1|              1|
|checking or savin...|2018|              20|             10|
|checking or savin...|2019|             461|             72|
|checking or savin...|2020|               3|              3|
|       consumer loan|2015|               1|              1|
|       consumer loan|2016|               1|              1|
|       consumer loan|2017|               1|              1|
|         credit card|2016|               4|              4|
|         credit card|2017|               1|              1|
|credit card or pr...|2017|               1|              1|
|credit card or pr...|2018|              27|             12|
|credit card or pr...|20

In [28]:
#create a table grouped by product, year, and company so that we can count the complaints for each company
dfC = dfA.groupBy(['Product','year','Company']).agg(F.count('*')\
        .alias('Total Complaints')).sort('Product','year')
dfC.show()

+--------------------+----+--------------------+----------------+
|             Product|year|             Company|Total Complaints|
+--------------------+----+--------------------+----------------+
|bank account or s...|2015|FIRSTBANK PUERTO ...|               1|
|bank account or s...|2016|WELLS FARGO & COM...|               1|
|bank account or s...|2016|FIRSTBANK PUERTO ...|               1|
|checking or savin...|2017|            Comerica|               1|
|checking or savin...|2018|REGIONS FINANCIAL...|               2|
|checking or savin...|2018|NAVY FEDERAL CRED...|               3|
|checking or savin...|2018|JPMORGAN CHASE & CO.|               5|
|checking or savin...|2018|       PNC Bank N.A.|               1|
|checking or savin...|2018|WELLS FARGO & COM...|               1|
|checking or savin...|2018|            Comerica|               1|
|checking or savin...|2018|UNITED SERVICES A...|               1|
|checking or savin...|2018|BANK OF AMERICA, ...|               3|
|checking 

In [29]:
#find the max total complaints for each product, year pair 
dfE = dfC.groupBy(['Product','year']).agg(F.max('Total Complaints')).sort('Product','year')
dfE.show()

+--------------------+----+---------------------+
|             Product|year|max(Total Complaints)|
+--------------------+----+---------------------+
|bank account or s...|2015|                    1|
|bank account or s...|2016|                    1|
|checking or savin...|2017|                    1|
|checking or savin...|2018|                    5|
|checking or savin...|2019|                   62|
|checking or savin...|2020|                    1|
|       consumer loan|2015|                    1|
|       consumer loan|2016|                    1|
|       consumer loan|2017|                    1|
|         credit card|2016|                    1|
|         credit card|2017|                    1|
|credit card or pr...|2017|                    1|
|credit card or pr...|2018|                    9|
|credit card or pr...|2019|                   66|
|credit card or pr...|2020|                    3|
|credit reporting,...|2017|                    2|
|credit reporting,...|2018|                  134|


In [30]:
#joining the tables to calculate the last column of highest percent of total complaints filed against one company for each product/year 
dfF = dfB.join(dfE, (dfB.Product == dfE.Product) & (dfB.year == dfE.year)).select(dfB['*'],dfE['max(Total Complaints)'])
dfF.show()

+--------------------+----+----------------+---------------+---------------------+
|             Product|year|Total Complaints|Total Companies|max(Total Complaints)|
+--------------------+----+----------------+---------------+---------------------+
|payday loan, titl...|2018|               7|              2|                    6|
|checking or savin...|2020|               3|              3|                    1|
|     debt collection|2019|            1130|            400|                   72|
|credit card or pr...|2017|               1|              1|                    1|
|        student loan|2019|             157|             37|                   58|
|       consumer loan|2016|               1|              1|                    1|
|            mortgage|2019|             415|             98|                   40|
|money transfer, v...|2017|               1|              1|                    1|
|        student loan|2020|               1|              1|                    1|
|ban

In [31]:
#calculating the highest percent of total complaints, rounding to the nearest percent and casting to integer
dfG = dfF.withColumn('Highest Perc of Total Complaints',F.round(dfF['max(Total Complaints)']/dfF['Total Complaints']*100).cast('integer'))\
      .select('Product','year','Total Complaints','Total Companies','Highest Perc of Total Complaints').sort('Product','year')

#converting all columns to strings 
dfG = dfG.select([dfG[c].cast('string') for c in dfG.columns])

dfG.show()

+--------------------+----+----------------+---------------+--------------------------------+
|             Product|year|Total Complaints|Total Companies|Highest Perc of Total Complaints|
+--------------------+----+----------------+---------------+--------------------------------+
|bank account or s...|2015|               1|              1|                             100|
|bank account or s...|2016|               2|              2|                              50|
|checking or savin...|2017|               1|              1|                             100|
|checking or savin...|2018|              20|             10|                              25|
|checking or savin...|2019|             461|             72|                              13|
|checking or savin...|2020|               3|              3|                              33|
|       consumer loan|2015|               1|              1|                             100|
|       consumer loan|2016|               1|              1|

In [32]:
# outputTask1 is an output RDD; a DataFrame works as well, but each line
# still needs to be a string.
# Note: ','.join does not quote product names that contain commas.
outputTask1 = dfG.rdd.map(lambda x: ','.join(x))
outputTask1.take(20)

['bank account or service,2015,1,1,100',
 'bank account or service,2016,2,2,50',
 'checking or savings account,2017,1,1,100',
 'checking or savings account,2018,20,10,25',
 'checking or savings account,2019,461,72,13',
 'checking or savings account,2020,3,3,33',
 'consumer loan,2015,1,1,100',
 'consumer loan,2016,1,1,100',
 'consumer loan,2017,1,1,100',
 'credit card,2016,4,4,25',
 'credit card,2017,1,1,100',
 'credit card or prepaid card,2017,1,1,100',
 'credit card or prepaid card,2018,27,12,33',
 'credit card or prepaid card,2019,437,42,15',
 'credit card or prepaid card,2020,13,10,23',
 'credit reporting, credit repair services, or other personal consumer reports,2017,7,5,29',
 'credit reporting, credit repair services, or other personal consumer reports,2018,238,22,56',
 'credit reporting, credit repair services, or other personal consumer reports,2019,3114,203,50',
 'credit reporting, credit repair services, or other personal consumer reports,2020,144,10,51',
 'debt collection,20

## Task 2

For this task, the Task 1 solution is converted into a standalone script that can be run on any Dataproc cluster.

In [50]:
%%writefile BDM_HW3_24363838_Lau.py
#!/usr/bin/python

#importing libraries 
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import sys 

#spark session
spark = SparkSession.builder.getOrCreate()
sc = pyspark.SparkContext.getOrCreate()

#defining the input path 
path = sys.argv[1]


#parse one partition's lines as CSV, skipping the header and keeping only complete 18-field records
def extractData(partId, records):
  if partId == 0:
    next(records)
  import csv
  reader = csv.reader(records)
  for row in reader:
    if len(row) == 18:
      yield row


#loading data into rdd using extractData function
data = sc.textFile(path, use_unicode=True).cache().mapPartitionsWithIndex(extractData)

#converting to pyspark dataframe to compute output 
df = data.toDF(['Date received', 'Product', 'Sub-product', 'Issue', 'Sub-issue', 'Consumer complaint narrative', 'Company public response', 'Company', 'State', 'ZIP code', 'Tags', 'Consumer consent provided?', 'Submitted via', 'Date sent to company', 'Company response to consumer', 'Timely response?', 'Consumer disputed?', 'Complaint ID'])

print(df.count())

#keeping only the year from the date, and making all product names lowercase
df = df.withColumn('Date received', F.split('Date received','-')[0]) \
  .withColumnRenamed('Date received','year') \
  .withColumn('Product', F.lower('Product'))

#find the total complaints and total companies by product, year 
dfA = df.groupBy(['Product','year']).agg(F.count('*').alias('Total Complaints'),\
                                        F.countDistinct('Company').alias('Total Companies'))

#find the max total complaints by product, year when grouped by product, year, and company 
dfB = df.groupBy(['Product','year','Company']).agg(F.count('*')\
        .alias('Total Complaints')).groupBy(['Product','year']).agg(F.max('Total Complaints'))

#joining tables 
dfC = dfA.join(dfB, (dfA.Product == dfB.Product) & (dfA.year == dfB.year)).select(dfA['*'],dfB['max(Total Complaints)'])

#calculating the percentage, formatting table for output 
output = dfC.withColumn('Highest Perc of Total Complaints',F.round(dfC['max(Total Complaints)']/dfC['Total Complaints']*100).cast('integer'))\
      .select('Product','year','Total Complaints','Total Companies','Highest Perc of Total Complaints').sort('Product','year')

#converting columns to string, and converting into rdd containing strings with comma separated values 
output = output.select([output[c].cast('string') for c in output.columns]).rdd.map(lambda x: ','.join(x))

#saving as csv 
output.saveAsTextFile(sys.argv[2])

print(output.count())


Overwriting BDM_HW3_24363838_Lau.py
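
The script reads its input and output locations from `sys.argv[1]` and `sys.argv[2]` without checking that they were supplied. A minimal sketch of validating them with `argparse` is shown below; the argument names `input_path` and `output_path` are assumptions for illustration and are not part of the script above.

```python
import argparse

def parse_args(argv=None):
    """Parse the two positional paths; argv=None falls back to sys.argv[1:]."""
    parser = argparse.ArgumentParser(
        description="Summarize complaints by product and year.")
    parser.add_argument("input_path", help="CSV file to read (local path or gs:// URI)")
    parser.add_argument("output_path", help="directory to write the report to")
    return parser.parse_args(argv)

args = parse_args(["complaints_sample.csv", "report_dir"])
print(args.input_path, args.output_path)
```

With this in place, running the script with a missing argument prints a usage message instead of raising an `IndexError`.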


In [51]:
#testing on complaints_sample.csv
!python BDM_HW3_24363838_Lau.py /content/complaints_sample.csv /content/testingCode

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/18 16:01:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/18 16:01:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/04/18 16:01:11 WARN BlockManager: Task 0 already completed, not releasing lock for rdd_1_0
6623
Traceback (most recent call last):
  File "/content/BDM_HW3_24363838_Lau.py", line 60, in <module>
    output.saveAsTextFile(sys.argv[2])
  File "/usr/local/lib/python3.9/dist-packages/pyspark/rdd.py", line 3406, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/usr/local/lib/python3.9/dist-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/dist-packages/pyspark/errors/exceptions/captured.py", line 169,
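
The traceback above is cut off, so the actual cause is not visible here. One common reason for `saveAsTextFile` to fail is that the output directory already exists from an earlier run, since Spark refuses to overwrite it. Assuming that is the case and the output path is on the local filesystem, a sketch like the following clears it before rerunning; `clear_local_output` is a hypothetical helper.

```python
import os
import shutil
import tempfile

def clear_local_output(path):
    """Delete a local output directory if present; return True if removed."""
    if os.path.isdir(path):
        shutil.rmtree(path)
        return True
    return False

# Demonstration on a throwaway directory rather than a real output path.
demo_dir = tempfile.mkdtemp()
print(clear_local_output(demo_dir))  # True: the directory existed and was removed
print(clear_local_output(demo_dir))  # False: nothing left to remove
```

For output on Cloud Storage, the equivalent cleanup is the `gsutil rm -r` command that appears later in this notebook.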

In [52]:
!pip install google-cloud-dataproc

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [53]:
!gcloud auth login

You are now logged in as [alau002@citymail.cuny.edu].
Your current project is [bigdata-380720].  You can change this setting by running:
  $ gcloud config set project 

In [54]:
!gcloud projects list

PROJECT_ID      NAME     PROJECT_NUMBER
bigdata-380720  BigData  267580964279


In [55]:
!gcloud config set project bigdata-380720
!gcloud config set compute/region us-west1
!gcloud config set compute/zone us-west1-a
!gcloud config set dataproc/region us-west1

Updated property [core/project].
Updated property [compute/region].
Updated property [compute/zone].
Updated property [dataproc/region].


In [56]:
!gcloud dataproc clusters create bdm-hw3 --enable-component-gateway --region us-west1 --zone us-west1-a --master-machine-type n1-standard-4 --master-boot-disk-size 500 --num-workers 2 --worker-machine-type n1-standard-4 --worker-boot-disk-size 500 --image-version 2.0-debian10 --project bigdata-380720

Waiting on operation [projects/bigdata-380720/regions/us-west1/operations/2459d235-c514-3258-ad27-7d21679a37e1].

Created [https://dataproc.googleapis.com/v1/projects/bigdata-380720/regions/us-west1/clusters/bdm-hw3] Cluster placed in zone [us-west1-a].


In [57]:
!gcloud dataproc clusters list

NAME     PLATFORM  WORKER_COUNT  PREEMPTIBLE_WORKER_COUNT  STATUS   ZONE        SCHEDULED_DELETE
bdm-hw3  GCE       2                                       RUNNING  us-west1-a


In [58]:
!gcloud dataproc jobs submit pyspark --cluster bdm-hw3 BDM_HW3_24363838_Lau.py -- gs://bdma/data/complaints.csv gs://bdma/shared/2023_spring/HW3/24363838_Lau

Job [e144875e79ba47f1a00f6878d8a4f3f9] submitted.
Waiting for job output...
23/04/18 16:04:22 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/18 16:04:22 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/18 16:04:22 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/18 16:04:22 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
23/04/18 16:04:22 INFO org.sparkproject.jetty.util.log: Logging initialized @4852ms to org.sparkproject.jetty.util.log.Slf4jLog
23/04/18 16:04:22 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_362-b09
23/04/18 16:04:22 INFO org.sparkproject.jetty.server.Server: Started @4946ms
23/04/18 16:04:22 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@3e49a872{HTTP/1.1, (http/1.1)}{0.0.0.0:42669}
23/04/18 16:04:23 INFO org.apache.hadoop.yarn.client.RMPro

In [61]:
!gsutil ls gs://bdma/shared/2023_spring/HW3/

gs://bdma/shared/2023_spring/HW3/14211712_Salas/
gs://bdma/shared/2023_spring/HW3/24363838_Lau/
gs://bdma/shared/2023_spring/HW3/24373710_Uddin/
gs://bdma/shared/2023_spring/HW3/24438996_Radaelli/


In [62]:
!gsutil cat gs://bdma/shared/2023_spring/HW3/24363838_Lau/part*

bank account or service,2012,12212,98,19
bank account or service,2013,13388,164,18
bank account or service,2014,14662,258,17
bank account or service,2015,17140,215,17
bank account or service,2016,21848,230,15
bank account or service,2017,6955,173,16
checking or savings account,2017,12763,183,17
checking or savings account,2018,21211,214,16
checking or savings account,2019,21735,249,15
checking or savings account,2020,24238,269,14
checking or savings account,2021,29555,289,14
checking or savings account,2022,37585,346,14
checking or savings account,2023,12279,224,37
consumer loan,2012,1986,84,19
consumer loan,2013,3117,159,12
consumer loan,2014,5456,357,8
consumer loan,2015,7882,596,9
consumer loan,2016,9591,664,7
consumer loan,2017,3544,424,8
credit card,2011,1260,33,19
credit card,2012,15353,76,20
credit card,2013,13105,108,19
credit card,2014,13974,178,17
credit card,2015,17300,225,17
credit card,2016,21065,221,21
credit card,2017,7133,127,18
credit card or prepaid card,2017,15404,17

In [49]:
#!gsutil rm -r gs://bdma/shared/2023_spring/HW3/24363838_Lau/

Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/#1681832440900976...
Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/_SUCCESS#1681832441163957...
Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/part-00000#1681832438219404...
Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/part-00001#1681832438059436...
/ [4 objects]                                                                   
==> NOTE: You are performing a sequence of gsutil operations that may
run significantly faster if you instead use gsutil -m rm ... Please
see the -m section under "gsutil help options" for further information
about when gsutil -m can be advantageous.

Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/part-00002#1681832438154391...
Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/part-00003#1681832438057270...
Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/part-00004#1681832438226394...
Removing gs://bdma/shared/2023_spring/HW3/24363838_Lau/part-00005#16818324380

In [63]:
!gcloud dataproc clusters delete bdm-hw3 -q
!gcloud dataproc clusters list

Waiting on operation [projects/bigdata-380720/regions/us-west1/operations/785b5652-7d49-39cf-8597-b228b5cbf1c5].
Deleted [https://dataproc.googleapis.com/v1/projects/bigdata-380720/regions/us-west1/clusters/bdm-hw3].
Listed 0 items.
