# **Project: Default of Credit Card Clients**

## <font color=darkgreen>*Members*: 
> ### <font color=blue>**Chandan Patel**<br>**Rahul Bankey** <br>**Chandni Kumari**<br>**Divyanshi Singh**

## Objective: 

### Dataset Description
    
### Methods and Steps followed in this notebook:
> - #### Step 1: Importing the Data
> - #### Step 2: Data Preprocessing
> - #### Step 3: Data Exploration using PySpark SQL
> - #### Step 4: Feature Engineering
> - #### Step 5: Machine Learning Models
    Prepare data for machine learning.<br>
    Split into training and testing data.<br>
    Create and evaluate Decision Trees model.<br>
    Create and evaluate Random Forest model.<br>
    Create and evaluate Naive Bayes model.<br>
    Create and evaluate Multilayer Perceptron Neural Network model.
> - #### Step 6: Clustering K-Means
    Clustering using KMeans
> - #### Step 7: Rest of the Machine Learning Models
    Additional machine learning models (Decision Trees, Random Forest, Naive Bayes, MLP)
> - #### Step 8: Model Evaluation
    Model evaluation (accuracy, precision, recall, F1)
> - #### Step 9: Interpreting the Best Classification
    Displaying and interpreting the best classification model<br>
    Displaying performance metrics<br>

In this notebook we will use the  dataset that provides information related to the credit cards clients and store it the spark datawarehouse. We will then use the datawarehouse to access this data using spark sql.


## Step 1: Importing the Data

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import (RandomForestClassifier, 
                                      DecisionTreeClassifier, 
                                      NaiveBayes,
                                      MultilayerPerceptronClassifier)
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row

#### Initialize Spark Session

In [2]:
from pyspark.sql import SparkSession;
from pyspark.context import SparkContext;

# warehouse_location points to the default location for managed databases and tables
from os.path import abspath
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    . builder \
    .master("local[*]") \
    .appName("Big Data Analytics Project - Team Data Wizards") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

sc =spark.sparkContext
sc.setLogLevel("ERROR") # only display errors (not warnings)

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

23/11/07 16:13:27 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.21.19.148 instead (on interface eth0)
23/11/07 16:13:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/07 16:13:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session WebUI Port: 4040


In [3]:
spark

In [4]:
spark.catalog.listTables()

[]

In [5]:
## Show theexisting  databases 
df=spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



#### List the tables in the database.

In [6]:
tables = spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



Now, let's load the credit card dataset into the datawarehouse. We will use the spark dataframe API to load the data. We will then use the spark sql API to create a table from the dataframe. Load the file into a DataFrame and remove the header row.

In [7]:
df=spark.read.csv('credit_card_clients_csv.csv', header=True, inferSchema=True)
df.show(5)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default payment next month|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|                         1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|  

In [9]:
df.printSchema() ## Print the schema of the dataframe

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: integer (nullable = true)
 |-- BILL_AMT2: integer (nullable = true)
 |-- BILL_AMT3: integer (nullable = true)
 |-- BILL_AMT4: integer (nullable = true)
 |-- BILL_AMT5: integer (nullable = true)
 |-- BILL_AMT6: integer (nullable = true)
 |-- PAY_AMT1: integer (nullable = true)
 |-- PAY_AMT2: integer (nullable = true)
 |-- PAY_AMT3: integer (nullable = true)
 |-- PAY_AMT4: integer (nullable = true)
 |-- PAY_AMT5: integer (nullable = true)
 |-- PAY_AMT6: integer (nullable = true)
 |-- default payment next month: inte

In [10]:
## Check the summary statistics and descriptions of numeric columns
df.describe().show()

[Stage 3:>                                                          (0 + 1) / 1]

+-------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+--------------------+------------------+--------------------+------------------+-----------------+-----------------+-----------------+-----------------+------------------+-----------------+----------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+--------------------------+
|summary|               ID|         LIMIT_BAL|               SEX|         EDUCATION|          MARRIAGE|              AGE|             PAY_0|               PAY_2|             PAY_3|               PAY_4|             PAY_5|            PAY_6|        BILL_AMT1|        BILL_AMT2|        BILL_AMT3|         BILL_AMT4|        BILL_AMT5|       BILL_AMT6|         PAY_AMT1|          PAY_AMT2|         PAY_AMT3|          PAY_AMT4|          PAY_AMT5|         PAY_AMT6|default payment next month|
+-------+-----------------+---

                                                                                

In [14]:
 ### remove unncessary row
df_raw = df.filter(lambda z: "EDUCATION" not in z)
df_raw.count()
df_raw.take(5)

PySparkTypeError: [NOT_COLUMN_OR_STR] Argument `condition` should be a Column or str, got function.

In [16]:
# Remove double quotes from records
df_raw = df.withColumn("value", col("value").cast("string"))
df_raw = df.withColumn("value", col("value").cast("double"))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `value` cannot be resolved. Did you mean one of the following? [`AGE`, `ID`, `SEX`, `PAY_0`, `PAY_2`].;
'Project [ID#295, LIMIT_BAL#296, SEX#297, EDUCATION#298, MARRIAGE#299, AGE#300, PAY_0#301, PAY_2#302, PAY_3#303, PAY_4#304, PAY_5#305, PAY_6#306, BILL_AMT1#307, BILL_AMT2#308, BILL_AMT3#309, BILL_AMT4#310, BILL_AMT5#311, BILL_AMT6#312, PAY_AMT1#313, PAY_AMT2#314, PAY_AMT3#315, PAY_AMT4#316, PAY_AMT5#317, PAY_AMT6#318, ... 2 more fields]
+- Relation [ID#295,LIMIT_BAL#296,SEX#297,EDUCATION#298,MARRIAGE#299,AGE#300,PAY_0#301,PAY_2#302,PAY_3#303,PAY_4#304,PAY_5#305,PAY_6#306,BILL_AMT1#307,BILL_AMT2#308,BILL_AMT3#309,BILL_AMT4#310,BILL_AMT5#311,BILL_AMT6#312,PAY_AMT1#313,PAY_AMT2#314,PAY_AMT3#315,PAY_AMT4#316,PAY_AMT5#317,PAY_AMT6#318,default payment next month#319] csv


In [15]:
# Normalize sex to 1 and 2
df_raw = df.withColumn("SEX", (col("SEX") == "F").cast("int"))

In [8]:
#Load the file into a RDD
ccRaw = spark.sparkContext.textFile("credit_card_clients_csv.csv")
ccRaw.take(5)

                                                                                

['ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,default payment next month',
 '1,20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0,1',
 '2,120000,2,2,2,26,-1,2,0,0,0,2,2682,1725,2682,3272,3455,3261,0,1000,1000,1000,0,2000,1',
 '3,90000,2,2,2,34,0,0,0,0,0,0,29239,14027,13559,14331,14948,15549,1518,1500,1000,1000,1000,5000,0',
 '4,50000,2,2,1,37,0,0,0,0,0,0,46990,48233,49291,28314,28959,29547,2000,2019,1200,1100,1069,1000,0']

In [9]:

#Remove header row
dataLines = ccRaw.filter(lambda z: "EDUCATION" not in z)
dataLines.count()

30000

In [10]:
dataLines.take(5)

['1,20000,2,2,1,24,2,2,-1,-1,-2,-2,3913,3102,689,0,0,0,0,689,0,0,0,0,1',
 '2,120000,2,2,2,26,-1,2,0,0,0,2,2682,1725,2682,3272,3455,3261,0,1000,1000,1000,0,2000,1',
 '3,90000,2,2,2,34,0,0,0,0,0,0,29239,14027,13559,14331,14948,15549,1518,1500,1000,1000,1000,5000,0',
 '4,50000,2,2,1,37,0,0,0,0,0,0,46990,48233,49291,28314,28959,29547,2000,2019,1200,1100,1069,1000,0',
 '5,50000,1,2,1,57,-1,0,-1,0,0,0,8617,5670,35835,20940,19146,19131,2000,36681,10000,9000,689,679,0']

In [11]:
#Remove double quotes that are present in few records.
cleanedLines = dataLines.map(lambda x: x.replace("\"", ""))
cleanedLines.count()

30000

In [12]:
cleanedLines.cache()

PythonRDD[27] at RDD at PythonRDD.scala:53

In [13]:
def convertToRow(instr) :
    attributeList = instr.split(",")
 
    # rounding of age to range of 10s.    
    ageRound = round(float(attributeList[5]) / 10.0) * 10
    
    #Normalize sex to only 1 and 2.
    sex = attributeList[2]
    if sex =="M":
        sex=1
    elif sex == "F":
        sex=2
    
    #average billed Amount.
    avgBillAmt = (float(attributeList[12]) +  \
                    float(attributeList[13]) + \
                    float(attributeList[15]) + \
                    float(attributeList[16]) + \
                    float(attributeList[16]) + \
                    float(attributeList[17]) ) / 6.0
                    
    #average pay amount
    avgPayAmt = (float(attributeList[18]) +  \
                    float(attributeList[19]) + \
                    float(attributeList[20]) + \
                    float(attributeList[21]) + \
                    float(attributeList[22]) + \
                    float(attributeList[23]) ) / 6.0
                    
    #Find average pay duration. 
    #Make sure numbers are rounded and negative values are eliminated
    avgPayDuration = round((abs(float(attributeList[6])) + \
                        abs(float(attributeList[7])) + \
                        abs(float(attributeList[8])) +\
                        abs(float(attributeList[9])) +\
                        abs(float(attributeList[10])) +\
                        abs(float(attributeList[11]))) / 6)
    
    #Average percentage paid. add this as an additional field to see
    #if this field has any predictive capabilities. This is 
    #additional creative work that you do to see possibilities.                    
    perPay = round((avgPayAmt/(avgBillAmt+1) * 100) / 25) * 25
                    
    values = Row (  CUSTID = attributeList[0], \
                    LIMIT_BAL = float(attributeList[1]), \
                    SEX = float(sex),\
                    EDUCATION = float(attributeList[3]),\
                    MARRIAGE = float(attributeList[4]),\
                    AGE = float(ageRound), \
                    AVG_PAY_DUR = float(avgPayDuration),\
                    AVG_BILL_AMT = abs(float(avgBillAmt)), \
                    AVG_PAY_AMT = float(avgPayAmt), \
                    PER_PAID= abs(float(perPay)), \
                    DEFAULTED = float(attributeList[24]) 
                    )

    return values

In [14]:
#Cleanedup RDD    
ccRows = cleanedLines.map(convertToRow)
ccRows.take(5)
# ccRows = df.map(convertToRow)
# ccRows.take(5)

[Row(CUSTID='1', LIMIT_BAL=20000.0, SEX=2.0, EDUCATION=2.0, MARRIAGE=1.0, AGE=20.0, AVG_PAY_DUR=2.0, AVG_BILL_AMT=1169.1666666666667, AVG_PAY_AMT=114.83333333333333, PER_PAID=0.0, DEFAULTED=1.0),
 Row(CUSTID='2', LIMIT_BAL=120000.0, SEX=2.0, EDUCATION=2.0, MARRIAGE=2.0, AGE=30.0, AVG_PAY_DUR=1.0, AVG_BILL_AMT=2975.0, AVG_PAY_AMT=833.3333333333334, PER_PAID=25.0, DEFAULTED=1.0),
 Row(CUSTID='3', LIMIT_BAL=90000.0, SEX=2.0, EDUCATION=2.0, MARRIAGE=2.0, AGE=30.0, AVG_PAY_DUR=0.0, AVG_BILL_AMT=17173.666666666668, AVG_PAY_AMT=1836.3333333333333, PER_PAID=0.0, DEFAULTED=0.0),
 Row(CUSTID='4', LIMIT_BAL=50000.0, SEX=2.0, EDUCATION=2.0, MARRIAGE=1.0, AGE=40.0, AVG_PAY_DUR=0.0, AVG_BILL_AMT=35167.0, AVG_PAY_AMT=1398.0, PER_PAID=0.0, DEFAULTED=0.0),
 Row(CUSTID='5', LIMIT_BAL=50000.0, SEX=1.0, EDUCATION=2.0, MARRIAGE=1.0, AGE=60.0, AVG_PAY_DUR=0.0, AVG_BILL_AMT=15441.666666666666, AVG_PAY_AMT=9841.5, PER_PAID=75.0, DEFAULTED=0.0)]

In [15]:
#Create a data frame.
ccDf = spark.createDataFrame(ccRows)
#ccDf.cache()
ccDf.show(5)

+------+---------+---+---------+--------+----+-----------+------------------+------------------+--------+---------+
|CUSTID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE| AGE|AVG_PAY_DUR|      AVG_BILL_AMT|       AVG_PAY_AMT|PER_PAID|DEFAULTED|
+------+---------+---+---------+--------+----+-----------+------------------+------------------+--------+---------+
|     1|  20000.0|2.0|      2.0|     1.0|20.0|        2.0|1169.1666666666667|114.83333333333333|     0.0|      1.0|
|     2| 120000.0|2.0|      2.0|     2.0|30.0|        1.0|            2975.0| 833.3333333333334|    25.0|      1.0|
|     3|  90000.0|2.0|      2.0|     2.0|30.0|        0.0|17173.666666666668|1836.3333333333333|     0.0|      0.0|
|     4|  50000.0|2.0|      2.0|     1.0|40.0|        0.0|           35167.0|            1398.0|     0.0|      0.0|
|     5|  50000.0|1.0|      2.0|     1.0|60.0|        0.0|15441.666666666666|            9841.5|    75.0|      0.0|
+------+---------+---+---------+--------+----+-----------+--------------

In [16]:
ccDf.printSchema()

root
 |-- CUSTID: string (nullable = true)
 |-- LIMIT_BAL: double (nullable = true)
 |-- SEX: double (nullable = true)
 |-- EDUCATION: double (nullable = true)
 |-- MARRIAGE: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- AVG_PAY_DUR: double (nullable = true)
 |-- AVG_BILL_AMT: double (nullable = true)
 |-- AVG_PAY_AMT: double (nullable = true)
 |-- PER_PAID: double (nullable = true)
 |-- DEFAULTED: double (nullable = true)



In [26]:
#Enhance Data
import pandas as pd

In [27]:

#Add SEXNAME to the data using SQL Joins.
genderDict = [{"SEX" : 1.0, "SEX_NAME" : "Male"}, \
                {"SEX" : 2.0, "SEX_NAME" : "Female"}]  

In [28]:
genderDf = spark.createDataFrame(pd.DataFrame(genderDict, \
            columns=['SEX', 'SEX_NAME']))

In [29]:
genderDf.collect()

[Row(SEX=1.0, SEX_NAME='Male'), Row(SEX=2.0, SEX_NAME='Female')]

In [31]:
ccDf1 = ccDf.join( genderDf, ccDf.SEX== genderDf.SEX ).drop(genderDf.SEX)
ccDf1.printSchema()


root
 |-- CUSTID: string (nullable = true)
 |-- LIMIT_BAL: double (nullable = true)
 |-- SEX: double (nullable = true)
 |-- EDUCATION: double (nullable = true)
 |-- MARRIAGE: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- AVG_PAY_DUR: double (nullable = true)
 |-- AVG_BILL_AMT: double (nullable = true)
 |-- AVG_PAY_AMT: double (nullable = true)
 |-- PER_PAID: double (nullable = true)
 |-- DEFAULTED: double (nullable = true)
 |-- SEX_NAME: string (nullable = true)



In [32]:
#Add ED_STR to the data with SQL joins.
eduDict = [{"EDUCATION" : 1.0, "ED_STR" : "Graduate"}, \
                {"EDUCATION" : 2.0, "ED_STR" : "University"}, \
                {"EDUCATION" : 3.0, "ED_STR" : "High School" }, \
                {"EDUCATION" : 4.0, "ED_STR" : "Others"}]  

In [34]:
eduDf = spark.createDataFrame(pd.DataFrame(eduDict, \
            columns=['EDUCATION', 'ED_STR']))

In [35]:
eduDf.collect()

[Row(EDUCATION=1.0, ED_STR='Graduate'),
 Row(EDUCATION=2.0, ED_STR='University'),
 Row(EDUCATION=3.0, ED_STR='High School'),
 Row(EDUCATION=4.0, ED_STR='Others')]

In [38]:
ccDf2 = ccDf1.join( eduDf, ccDf1.EDUCATION== eduDf.EDUCATION ).drop(eduDf.EDUCATION)
ccDf2.printSchema()

root
 |-- CUSTID: string (nullable = true)
 |-- LIMIT_BAL: double (nullable = true)
 |-- SEX: double (nullable = true)
 |-- EDUCATION: double (nullable = true)
 |-- MARRIAGE: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- AVG_PAY_DUR: double (nullable = true)
 |-- AVG_BILL_AMT: double (nullable = true)
 |-- AVG_PAY_AMT: double (nullable = true)
 |-- PER_PAID: double (nullable = true)
 |-- DEFAULTED: double (nullable = true)
 |-- SEX_NAME: string (nullable = true)
 |-- ED_STR: string (nullable = true)



In [39]:

#Add MARR_DESC to the data. Required for PR#03
marrDict = [{"MARRIAGE" : 1.0, "MARR_DESC" : "Single"}, \
                {"MARRIAGE" : 2.0, "MARR_DESC" : "Married"}, \
                {"MARRIAGE" : 3.0, "MARR_DESC" : "Others"}]  

In [41]:
marrDf = spark.createDataFrame(pd.DataFrame(marrDict, \
            columns=['MARRIAGE', 'MARR_DESC']))

In [42]:
marrDf.collect()

[Row(MARRIAGE=1.0, MARR_DESC='Single'),
 Row(MARRIAGE=2.0, MARR_DESC='Married'),
 Row(MARRIAGE=3.0, MARR_DESC='Others')]

In [43]:
ccFinalDf = ccDf2.join( marrDf, ccDf2.MARRIAGE== marrDf.MARRIAGE ).drop(marrDf.MARRIAGE)

In [44]:
ccFinalDf.cache()

DataFrame[CUSTID: string, LIMIT_BAL: double, SEX: double, EDUCATION: double, MARRIAGE: double, AGE: double, AVG_PAY_DUR: double, AVG_BILL_AMT: double, AVG_PAY_AMT: double, PER_PAID: double, DEFAULTED: double, SEX_NAME: string, ED_STR: string, MARR_DESC: string]

In [45]:
ccFinalDf.show(5)

23/11/07 16:39:57 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 236)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_103094/1815832823.py", lin

Py4JJavaError: An error occurred while calling o552.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 236) (linux executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_103094/1815832823.py", line 42, in convertToRow
ZeroDivisionError: float division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/student/miniconda3/envs/bd/lib/python3.11/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_103094/1815832823.py", line 42, in convertToRow
ZeroDivisionError: float division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
