In [1]:
from pyspark.sql import SparkSession,Row
from pyspark.sql import functions as F 
from pyspark.ml import feature as MF
from pyspark.ml.stat import Correlation

spark = SparkSession.builder \
    .appName("etl_project") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

In [None]:
# spark.stop()

In [2]:
df_train = spark.read.csv('train.csv',header=True,inferSchema=True)
# df_test = spark.read.csv('test.csv',header=True,inferSchema=True)
# sample_sub = spark.read.csv('sample_submission.csv',header=True,inferSchema=True)

In [3]:
display("Display First Few Rows of Training Data", df_train.head())
# display("Display First Few Rows of Testing Data", df_test.head())
# display("Display First Few Rows of Sample Submission", sample_sub.head())

'Display First Few Rows of Training Data'

Row(id=0, Age=19.0, Gender='Female', Annual Income=10049.0, Marital Status='Married', Number of Dependents=1.0, Education Level="Bachelor's", Occupation='Self-Employed', Health Score=22.59876067181393, Location='Urban', Policy Type='Premium', Previous Claims=2.0, Vehicle Age=17.0, Credit Score=372.0, Insurance Duration=5.0, Policy Start Date=datetime.datetime(2023, 12, 23, 15, 21, 39, 134960), Customer Feedback='Poor', Smoking Status='No', Exercise Frequency='Weekly', Property Type='House', Premium Amount=2869.0)

In [4]:
print("Summary of the Training Data")
df_train.describe().show()


Summary of the Training Data
+-------+-----------------+------------------+-------+-----------------+--------------+--------------------+---------------+----------+------------------+--------+-----------+------------------+-----------------+-----------------+------------------+-----------------+--------------+------------------+-------------+------------------+
|summary|               id|               Age| Gender|    Annual Income|Marital Status|Number of Dependents|Education Level|Occupation|      Health Score|Location|Policy Type|   Previous Claims|      Vehicle Age|     Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|    Premium Amount|
+-------+-----------------+------------------+-------+-----------------+--------------+--------------------+---------------+----------+------------------+--------+-----------+------------------+-----------------+-----------------+------------------+-----------------+--------------+------------------+

In [5]:
# print("Missing values of Training Data:\n", df_train.isnull().sum()/len(df_train)*100)  #pandas
from pyspark.sql.functions import col, sum

print("Missing values of Training Data (%):")
df_train.select(
    [(sum(col(c).isNull().cast("int")) / df_train.count() * 100).alias(c) for c in df_train.columns]
).show()


Missing values of Training Data (%):
+---+-------+------+------------------+------------------+--------------------+---------------+------------------+------------+--------+-----------+---------------+-----------+------------------+--------------------+-----------------+-----------------+--------------+------------------+-------------+--------------+
| id|    Age|Gender|     Annual Income|    Marital Status|Number of Dependents|Education Level|        Occupation|Health Score|Location|Policy Type|Previous Claims|Vehicle Age|      Credit Score|  Insurance Duration|Policy Start Date|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+-------+------+------------------+------------------+--------------------+---------------+------------------+------------+--------+-----------+---------------+-----------+------------------+--------------------+-----------------+-----------------+--------------+------------------+-------------+--------------+
|0.0|1.55875|  

In [6]:
df_train.show(8)

+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|   Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|   Policy Start Date|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|Self-Employ

In [7]:
df_train.dtypes

[('id', 'int'),
 ('Age', 'double'),
 ('Gender', 'string'),
 ('Annual Income', 'double'),
 ('Marital Status', 'string'),
 ('Number of Dependents', 'double'),
 ('Education Level', 'string'),
 ('Occupation', 'string'),
 ('Health Score', 'double'),
 ('Location', 'string'),
 ('Policy Type', 'string'),
 ('Previous Claims', 'double'),
 ('Vehicle Age', 'double'),
 ('Credit Score', 'double'),
 ('Insurance Duration', 'double'),
 ('Policy Start Date', 'timestamp'),
 ('Customer Feedback', 'string'),
 ('Smoking Status', 'string'),
 ('Exercise Frequency', 'string'),
 ('Property Type', 'string'),
 ('Premium Amount', 'double')]

In [8]:
from pyspark.sql import functions as F
# df_train.select('Credit Score').show()
df_train.select(F.min("Credit Score")).collect()[0][0]

300.0

# outliers

In [9]:
df_train.select("Credit Score").summary("min", "25%", "50%", "75%", "max").show()


+-------+------------+
|summary|Credit Score|
+-------+------------+
|    min|       300.0|
|    25%|       468.0|
|    50%|       595.0|
|    75%|       721.0|
|    max|       849.0|
+-------+------------+



In [10]:
df_train.describe()

DataFrame[summary: string, id: string, Age: string, Gender: string, Annual Income: string, Marital Status: string, Number of Dependents: string, Education Level: string, Occupation: string, Health Score: string, Location: string, Policy Type: string, Previous Claims: string, Vehicle Age: string, Credit Score: string, Insurance Duration: string, Customer Feedback: string, Smoking Status: string, Exercise Frequency: string, Property Type: string, Premium Amount: string]

#  main concerns about handling null

> keep the null and then string index it

> fill the null but how? 

> drop the null

In [11]:
from pyspark.sql.functions import  count

# column_names  = ['Gender',  'Marital Status']
column_names = ['Gender', 'Marital Status', 'Number of Dependents', 'Education Level', 'Occupation',  'Location', 'Policy Type', 'Previous Claims',   'Insurance Duration',  'Customer Feedback', 'Smoking Status', 'Exercise Frequency', 'Property Type']

# Count frequency of each value
for column in column_names:

    df_freq = df_train.groupBy(column).agg(count("*").alias("Frequency"))
    # print(f"{column}")
    df_freq.show()
    print("===================\n")

# Show result
# df_freq.show()

+------+---------+
|Gender|Frequency|
+------+---------+
|Female|   597429|
|  Male|   602571|
+------+---------+


+--------------+---------+
|Marital Status|Frequency|
+--------------+---------+
|       Married|   394316|
|      Divorced|   391764|
|          NULL|    18529|
|        Single|   395391|
+--------------+---------+


+--------------------+---------+
|Number of Dependents|Frequency|
+--------------------+---------+
|                 0.0|   218124|
|                 1.0|   215076|
|                 3.0|   221475|
|                 4.0|   220340|
|                NULL|   109672|
|                 2.0|   215313|
+--------------------+---------+


+---------------+---------+
|Education Level|Frequency|
+---------------+---------+
|    High School|   289441|
|            PhD|   303507|
|       Master's|   303818|
|     Bachelor's|   303234|
+---------------+---------+


+-------------+---------+
|   Occupation|Frequency|
+-------------+---------+
|     Employed|   282750|
|   

In [12]:
# # Count occurrences of each unique string
# counts = df_train.groupBy("Gender").count()
# print(counts)

from pyspark.sql.functions import col, count
# Count frequency of each value
df_freq = df_train.groupBy("Marital Status").agg(count("*").alias("Frequency"))

# Show result
df_freq.show()

# df_train.groupBy("Gender")


+--------------+---------+
|Marital Status|Frequency|
+--------------+---------+
|       Married|   394316|
|      Divorced|   391764|
|          NULL|    18529|
|        Single|   395391|
+--------------+---------+



# middle ai to fill null ( ocupations )

In [14]:
#the second dataframe
df2 = df_train

In [15]:
df2.show(8)

+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|   Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|   Policy Start Date|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|Self-Employ

In [16]:
df2.select("Policy Start Date").first()

Row(Policy Start Date=datetime.datetime(2023, 12, 23, 15, 21, 39, 134960))

# filling (string) Nulls

In [17]:
from pyspark.sql.functions import col, count
a= df2.filter(col("Age").isNotNull()).count()
b =df2.filter(col("Age").isNull()).count()
print(a,b)

1181295 18705


In [18]:
# df2 = df2.dropna(subset=['Occupation'])

df2 = df2.fillna({'Occupation': 'Unknown_Occupation'})
df2 = df2.fillna({'Marital Status': 'Unknown_marital_status'})
df2 = df2.fillna({'Customer Feedback': 'Unknown_customer_feedback'})
# df2 = df2.fillna({'Number of Dependents': '0'})
# df2 = df2.fillna({'Previous Claims': '0'})

In [19]:
# smoking status to 0 and 1
from pyspark.sql.functions import when
# 'Smoking Status'

df2 = df2.withColumn("Smoking Status", when(df2["Smoking Status"] == "Yes", 1)
                                      .when(df2["Smoking Status"] == "No", 0)
                                      .otherwise(None)
                                      ) 

# df2 = df2.replace({"Smoking Status": {"Yes": 1, "No": 0}})


# df2.show(10)

In [20]:
df2 = df2.drop('Policy Start Date')

In [21]:
df2.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-------------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-------------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|      Premiu

In [22]:
# df2.show(10)
from pyspark.sql.functions import col, count
df_freq_occu = df2.groupBy("Occupation").agg(count("*").alias("Frequency"))

# Show result
df_freq_occu.show()

+------------------+---------+
|        Occupation|Frequency|
+------------------+---------+
|Unknown_Occupation|   358075|
|          Employed|   282750|
|        Unemployed|   276530|
|     Self-Employed|   282645|
+------------------+---------+



# Now filling the (numerical) Nulls

In [23]:
# fill with 0 
# fill_with_zero = ['Previous Claims','Number of Dependents']

#Previous Claims to 0
# df2 = df2.fillna(0,subset=["Previous Claims"])

#Number of Dependents
df2 = df2.fillna(0,subset=["Number of Dependents"])


In [24]:
mean_cols = ['Age','Health Score','Annual Income','Vehicle Age','Credit Score','Insurance Duration'] 

from pyspark.sql.functions import mean, col,count

def fillna_with_mean(data_frame,col_names_list):
    df2 = data_frame
    for col_name in col_names_list:
        mean_value = round(df2.select(mean(col(col_name))).collect()[0][0],2)
        print(f"{col_name} mean is {mean_value}")
        df2 = df2.fillna(mean_value,subset=[col_name])
        # full check
        number_of_null = df2.filter(col(col_name).isNull()).count()
        print(f"Null values for {col_name} is : {number_of_null}")
        
    return df2

df2 = fillna_with_mean(df2,mean_cols)
# outlier:
IQR = [''] # then use (median) to replace them

# mode_cols = 

#Annual Income > took all the values more than 20k as iqr is showing bizzare values # and less than those we replaced than with median

# Now ================================================
# Number of Dependents


Age mean is 41.15
Null values for Age is : 0
Health Score mean is 25.61
Null values for Health Score is : 0
Annual Income mean is 32745.22
Null values for Annual Income is : 0
Vehicle Age mean is 9.57
Null values for Vehicle Age is : 0
Credit Score mean is 592.92
Null values for Credit Score is : 0
Insurance Duration mean is 5.02
Null values for Insurance Duration is : 0


In [25]:
from pyspark.sql.functions import col, sum

print("Missing values of df2 Training Data (%):")
df2.select(
    [(sum(col(c).isNull().cast("int")) / df2.count() * 100).alias(c) for c in df2.columns]
).show()

Missing values of df2 Training Data (%):
+---+---+------+-------------+--------------+--------------------+---------------+----------+------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id|Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|Occupation|Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+---+------+-------------+--------------+--------------------+---------------+----------+------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|0.0|0.0|   0.0|          0.0|           0.0|                 0.0|            0.0|       0.0|         0.0|     0.0|        0.0|       30.33575|        0

In [26]:
df2.filter(col('Annual Income').isNull()).count()

0

# IQR

In [27]:
# we'll leave annual income for now

df_train   = df2
from pyspark.sql.functions import col, expr

# Step 1: Compute IQR and Median
summary = df_train.selectExpr(
    "percentile_approx(`Annual Income`, 0.25) as Q1",
    "percentile_approx(`Annual Income`, 0.75) as Q3",
    "percentile_approx(`Annual Income`, 0.5) as Median"
).collect()[0]

Q1 = summary["Q1"]
Q3 = summary["Q3"]
IQR = Q3 - Q1
Lower_Bound = Q1 - 1.5 * IQR
Upper_Bound = Q3 + 1.5 * IQR
median_income = summary["Median"]

# Step 2: Replace Outliers with Median
df_train = df_train.withColumn(
    "Annual Income",
    expr(f"CASE WHEN `Annual Income` < {Lower_Bound} OR `Annual Income` > {Upper_Bound} THEN {median_income} ELSE `Annual Income` END")
)

# Step 3: Fill Null Values with Median
df_train = df_train.fillna({"Annual Income": median_income})

# Show the final result
df_train.select("Annual Income").describe().show()


+-------+------------------+
|summary|     Annual Income|
+-------+------------------+
|  count|           1200000|
|   mean|27095.636900646812|
| stddev| 22269.90291187683|
|    min|               1.0|
|    max|           96871.0|
+-------+------------------+



In [28]:
from pyspark.sql.functions import col, count
# Count frequency of each value
df_freq = df2.groupBy("Annual Income").agg(count("*").alias("Frequency"))

# Show result
df_freq.count()

88594

In [29]:
# df_freq.show(48)
# df_freq.filter((col("Annual Income") >= 1) & (col("Annual Income") <= 30)).show(30)
# df_freq.filter((col("Annual Income") <= 20000) ).count()
df_freq.filter((col("Annual Income") <= 100) ).count()
# df_freq.filter((col("Frequency") >= 1) & (col("Frequency") <= 100)).show(50)

92

In [30]:
from pyspark.sql.functions import col, expr
# Identify numeric columns (int and double)
# numeric_columns = [col_name for col_name, dtype in df.dtypes if dtype in ("int", "double")]
# print("Numeric Columns:", numeric_columns)

# Function to remove outliers using IQR
column_now ='Age'

# Compute Q1, Q3, and IQR
quantiles = df2.approxQuantile(column_now, [0.25, 0.75], 0.05)
Q1, Q3 = quantiles[0], quantiles[1]
IQR = Q3 - Q1

#define lower & upper bound
lower_bound, upper_bound = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR

# Filter DataFrame to remove outliers
df2_ann=df2.filter((col(column_now) >= lower_bound) & (col(column_now) <= upper_bound))

# Apply outlier removal for each numeric column
# for col_name in numeric_columns:
#     df = remove_outliers(df, col_name)

In [31]:
df2_ann.count()
# df2_ann.show()

1200000

# sometimes the numerical values also satys as sting to we didn't try to find the colunms using code which are type (sting)

In [32]:
df2.dtypes

[('id', 'int'),
 ('Age', 'double'),
 ('Gender', 'string'),
 ('Annual Income', 'double'),
 ('Marital Status', 'string'),
 ('Number of Dependents', 'double'),
 ('Education Level', 'string'),
 ('Occupation', 'string'),
 ('Health Score', 'double'),
 ('Location', 'string'),
 ('Policy Type', 'string'),
 ('Previous Claims', 'double'),
 ('Vehicle Age', 'double'),
 ('Credit Score', 'double'),
 ('Insurance Duration', 'double'),
 ('Customer Feedback', 'string'),
 ('Smoking Status', 'int'),
 ('Exercise Frequency', 'string'),
 ('Property Type', 'string'),
 ('Premium Amount', 'double')]

In [33]:
df2.columns

['id',
 'Age',
 'Gender',
 'Annual Income',
 'Marital Status',
 'Number of Dependents',
 'Education Level',
 'Occupation',
 'Health Score',
 'Location',
 'Policy Type',
 'Previous Claims',
 'Vehicle Age',
 'Credit Score',
 'Insurance Duration',
 'Customer Feedback',
 'Smoking Status',
 'Exercise Frequency',
 'Property Type',
 'Premium Amount']

In [34]:
df2.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-------------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-------------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|      Premiu

In [35]:
df2 = df2.withColumn("Customer Feedback", when(df2["Customer Feedback"] == "Poor", 1)
                                      .when(df2["Customer Feedback"] == "Average", 2)
                                      .when(df2["Customer Feedback"] == "Good", 3)
                                      .otherwise(0)
                                      ) 




df2.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-------------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-------------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|      Premiu

In [36]:
df2 = df2.withColumn("Policy Type", when(df2["Policy Type"] == "Basic", 1)
                                      .when(df2["Policy Type"] == "Comprehensive", 2)
                                      .when(df2["Policy Type"] == "Premium", 3)
                                      .otherwise(0)
                                      ) 




df2.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|          3|      

In [37]:
col_e = 'Exercise Frequency'
df2 = df2.withColumn(col_e, when(df2[col_e] == "Daily", 4)
                                      .when(df2[col_e] == "Weekly", 3)
                                      .when(df2[col_e] == "Monthly", 2)
                                      .when(df2[col_e] == "Rarely", 1)
                                      .otherwise(0)
                                      ) 




df2.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|          3|      

In [38]:
# Count null values for each column
null_counts = df2.select([sum(col(c).isNull().cast("int")).alias(c) for c in df2.columns])

# Show the result
null_counts.show()

+---+---+------+-------------+--------------+--------------------+---------------+----------+------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id|Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|Occupation|Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+---+------+-------------+--------------+--------------------+---------------+----------+------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|  0|     0|            0|             0|                   0|              0|         0|           0|       0|          0|         364029|          0|           0|                 0|      

In [39]:
df2.count()

1200000

In [40]:
df2_dropped = df2

In [41]:
df2_dropped=df2_dropped.dropna()

In [42]:
df2_dropped.count()

835971

In [43]:
(df2_dropped.count()/df2.count())*100

69.66425000000001

In [44]:
# from pyspark.sql.functions import col, sum, when

# total_nulls = df2_dropped.select([sum(when(col(c).isNull(), 1).otherwise(0)) for c in df2_dropped.columns]).collect()[0].asDict().values()
# total_null_count = sum(total_nulls)

# print(total_null_count)

from pyspark.sql.functions import col, sum

# Count null values for each column
null_counts = df2_dropped.select([sum(col(c).isNull().cast("int")).alias(c) for c in df2_dropped.columns])

# Show the result
null_counts.show()



+---+---+------+-------------+--------------+--------------------+---------------+----------+------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id|Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|Occupation|Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+---+------+-------------+--------------+--------------------+---------------+----------+------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|  0|     0|            0|             0|                   0|              0|         0|           0|       0|          0|              0|          0|           0|                 0|      

# df2_dropped

In [45]:



# string index
#string indexer
from pyspark.ml.feature import StringIndexer

# to_string_indxn_columns = ['Gender', 'Marital Status', 'Education Level', 'Occupation',
#                        'Location', 'Policy Type', 'Customer Feedback', 
#                        'Exercise Frequency', 'Property Type']


to_string_indxn_columns = ['Gender', 'Marital Status', 'Education Level', 
                           'Occupation','Location',  'Property Type']

# # index
# indexers = [
#     StringIndexer(inputCol=col, outputCol=f"{col}_indexed").fit(df2)
#     for col in to_string_indxn_columns
# ]

# # transform
# df2 = indexers.transform(df2)


# both index and transfmation together
for col_name in to_string_indxn_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Indexed" , handleInvalid="keep")
    df2_dropped = indexer.fit(df2_dropped).transform(df2_dropped)

df2_dropped.show(10)


+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+---------

# Now drop the coulmns that we used to index new columns

In [46]:
df_indexed = df2_dropped.drop(*to_string_indxn_columns)
df_indexed.show(10)

+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
|  0|19.0|      10049.0|    

# Model training for ( Previous claims)

In [47]:
# Splitting df2 into 80% training and 20% testing
train_df, test_df = df_indexed.randomSplit([0.8, 0.2], seed=42)
train_df.show(10)

+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
|  0|19.0|      10049.0|    

In [48]:
test_df.show(10)

+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
|  2|23.0|      25602.0|    

In [49]:
train_df.columns

['id',
 'Age',
 'Annual Income',
 'Number of Dependents',
 'Health Score',
 'Policy Type',
 'Previous Claims',
 'Vehicle Age',
 'Credit Score',
 'Insurance Duration',
 'Customer Feedback',
 'Smoking Status',
 'Exercise Frequency',
 'Premium Amount',
 'Gender_Indexed',
 'Marital Status_Indexed',
 'Education Level_Indexed',
 'Occupation_Indexed',
 'Location_Indexed',
 'Property Type_Indexed']

In [50]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ['id',
 'Age',
 'Annual Income',
 'Number of Dependents',
 'Health Score',
 'Policy Type',
 'Vehicle Age',
 'Credit Score',
 'Insurance Duration',
 'Customer Feedback',
 'Smoking Status',
 'Exercise Frequency',
 'Premium Amount',
 'Gender_Indexed',
 'Marital Status_Indexed',
 'Education Level_Indexed',
 'Occupation_Indexed',
 'Location_Indexed',
 'Property Type_Indexed']

label_col = 'Previous Claims'


assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [51]:
print(assembler)

VectorAssembler_c69b4cb9f906


# transform using assembler

In [52]:
train_data = assembler.transform(train_df)
test_data = assembler.transform(test_df)

In [53]:
train_data.show(8)

+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+--------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|            features|
+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------

# Random forest

In [54]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features",labelCol=label_col)



In [55]:
rf_model = rf.fit(train_data)

In [56]:
rf_predictions = rf_model.transform(test_data)

In [57]:
rf_predictions.show(10)

+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+--------------------+--------------------+--------------------+----------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|            features|       rawPrediction|         probability|prediction|
+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+-

# Accuracy using multiclass classifier

In [58]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize the evaluator
# evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator = MulticlassClassificationEvaluator(labelCol=label_col, predictionCol="prediction", metricName="accuracy")

# Calculate accuracy
accuracy = evaluator.evaluate(rf_predictions)
print(f"Accuracy for random foest with multicalss classifier : {accuracy * 100:.2f}%")

Accuracy for random foest with multicalss classifier : 39.22%


# Filling (previous claims)

**df2** is where  we'll apply the model to fill (previous claims) but :
> ok here's the problem:
df2 is main with null (previous claims)
df2.dropped > is where i dropped all null (previous claims)
df2.indexed > is where i string indexed all string columns , after i dropped the mother string columns and kept theri indexed columns
how will i do it.?

> i used **df2.indexed**
 too train the model. now my question is that for the **df2** how will we fill the null data?

Since you trained the model on `df2.indexed`, you need to apply the same transformations to `df2` before making predictions. Here's how:  

1. **Filter Rows with Null `previous claims`**  
   - Extract rows from `df2` where `previous claims` is null.  

2. **Apply Same String Indexing**  
   - Ensure all categorical columns in `df2` are string-indexed using the same indexer from `df2.indexed`.  
   - Drop original string columns, keeping only indexed ones (just like in `df2.indexed`).  

3. **Make Predictions**  
   - Use the trained Random Forest model to predict `previous claims` for these rows.  

4. **Update `df2`**  
   - Replace null values in `df2["previous claims"]` with the predicted values.  

Now, `df2` is fully filled with no missing `previous claims`. 🚀

In [59]:
df2.show()

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|          3|      

In [60]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Step 1: Identify missing values
missing_df = df2.filter(col("previous claims").isNull())
missing_df.show()
#missing_data = df2.filter(col("Previous Claims").isNull()).drop("Previous Claims")


+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| 11|23.0|  Male|      30983.0|        Single|                 3.0|       Master's|Unknown_Occupation| 5.813128940949042|   Urban|          3|      

In [61]:
missing_df.count()

364029

In [63]:
from pyspark.ml.feature import StringIndexer

to_string_indxn_columns = ['Gender', 'Marital Status', 'Education Level', 
                           'Occupation','Location',  'Property Type']

# both index and transfmation together
for col_name in to_string_indxn_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Indexed" , handleInvalid="keep")
    missing_df = indexer.fit(missing_df).transform(missing_df)

missing_df.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+---------

In [64]:
missing_df_indexed = missing_df.drop(*to_string_indxn_columns)
missing_df_indexed.show(10)

+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+-------------+--------------------+------------------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| 11|23.0|      30983.0|    

In [65]:
missing_data = missing_df_indexed.drop("Previous Claims")
missing_data.show(5)

+---+----+-------------+--------------------+------------------+-----------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+-------------+--------------------+------------------+-----------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| 11|23.0|      30983.0|                 3.0| 5.813128940949042|          3|

In [66]:
# Step 2: Features extract koro (same as training time)
missing_data = assembler.transform(missing_data)  # Ensure same preprocessing steps

# Step 3: Prediction koro
missing_predictions = rf_model.transform(missing_data).select("prediction")

In [67]:
missing_predictions.show(10)

+----------+
|prediction|
+----------+
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       0.0|
|       1.0|
|       0.0|
|       0.0|
|       0.0|
+----------+
only showing top 10 rows



In [68]:
missing_predictions.count()

364029

In [71]:
# Ensure row order is maintained by using zipWithIndex
from pyspark.sql.functions import monotonically_increasing_id

# Step 1: Add a unique index to both DataFrames
missing_data = missing_data.withColumn("row_id", monotonically_increasing_id())
missing_predictions = missing_predictions.withColumn("row_id", monotonically_increasing_id())

# Step 2: Join based on "row_id"
missing_data_with_predictions = missing_data.join(missing_predictions, on="row_id").drop("row_id")

# Check the result
missing_data_with_predictions.show(10)

+---+----+-------------+--------------------+------------------+-----------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+--------------------+----------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Policy Type|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|            features|prediction|
+---+----+-------------+--------------------+------------------+-----------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+-------------

In [74]:
df = missing_data_with_predictions
df.filter(col("id") == 11).show()


+---+----+-------------+--------------------+-----------------+-----------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+--------------------+----------+
| id| Age|Annual Income|Number of Dependents|     Health Score|Policy Type|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|            features|prediction|
+---+----+-------------+--------------------+-----------------+-----------+-----------+------------+------------------+-----------------+--------------+------------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+----------------

In [75]:
from pyspark.ml.feature import StringIndexer

to_string_indxn_columns = ['Gender', 'Marital Status', 'Education Level', 
                           'Occupation','Location',  'Property Type']
new_df = df2
# both index and transfmation together
for col_name in to_string_indxn_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Indexed" , handleInvalid="keep")
    new_df = indexer.fit(new_df).transform(new_df)

new_df.show(30)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+--------------+----------------------+-----------------------+------------------+----------------+---------------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|Gender_Indexed|Marital Status_Indexed|Education Level_Indexed|Occupation_Indexed|Location_Indexed|Property Type_Indexed|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+---------

In [77]:
old_df = df2
old_df.show(15)

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
|  0|19.0|Female|      10049.0|       Married|                 1.0|     Bachelor's|     Self-Employed| 22.59876067181393|   Urban|          3|      

In [80]:
# Step 4: Prediction results main dataframe e update koro
from pyspark.sql.functions import col, when

# Step 1: Join `old_df` with `df` (containing predictions)
joined_df = old_df.join(df.select("id", "prediction"), on="id", how="left")


updated_df = joined_df.withColumn(
    "Previous Claims",
    when(col("Previous Claims").isNull(), col("prediction"))  # Use col() instead of df["prediction"]
    .otherwise(col("Previous Claims"))
).drop("prediction")  # Drop extra "prediction" column after update

# Updated dataframe dekhte chaile
updated_df.show()

+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|        Occupation|      Health Score|Location|Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|
+---+----+------+-------------+--------------+--------------------+---------------+------------------+------------------+--------+-----------+---------------+-----------+------------+------------------+-----------------+--------------+------------------+-------------+--------------+
| 12|25.0|Female|      23706.0|        Single|                 4.0|       Master's|          Employed| 4.090538023921365|   Urban|          2|      

In [98]:
updated_df.filter(col("id") == 20).show()

Py4JJavaError: An error occurred while calling o4147.showString.
: org.apache.spark.SparkException: Multiple failures in stage materialization.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.multiFailuresInStageMaterializationError(QueryExecutionErrors.scala:2076)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:821)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:335)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 471.0 failed 1 times, most recent failure: Lost task 1.0 in stage 471.0 (TID 1486) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more

Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
		at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
		at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
		at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
		at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
		at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
		at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
		at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
		at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
		at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
		at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
		at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
		at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
		at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
		at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
		at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
		at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
		at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
		at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
		at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
		at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
		at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
		at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
		at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
		at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
		at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
		at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
		at org.apache.spark.scheduler.Task.run(Task.scala:141)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
		at java.base/java.lang.Thread.run(Thread.java:834)
	Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
		at java.base/java.io.FileInputStream.readBytes(Native Method)
		at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
		at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
		... 41 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 470.0 failed 1 times, most recent failure: Lost task 6.0 in stage 470.0 (TID 1483) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more


In [97]:
updated_df.show()

Py4JJavaError: An error occurred while calling o4063.showString.
: org.apache.spark.SparkException: Multiple failures in stage materialization.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.multiFailuresInStageMaterializationError(QueryExecutionErrors.scala:2076)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:821)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:335)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 468.0 failed 1 times, most recent failure: Lost task 5.0 in stage 468.0 (TID 1466) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more

Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
		at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
		at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
		at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
		at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
		at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
		at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
		at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
		at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
		at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
		at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
		at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
		at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
		at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
		at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
		at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
		at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
		at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
		at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
		at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
		at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
		at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
		at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
		at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
		at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
		at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
		at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
		at org.apache.spark.scheduler.Task.run(Task.scala:141)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
		at java.base/java.lang.Thread.run(Thread.java:834)
	Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
		at java.base/java.io.FileInputStream.readBytes(Native Method)
		at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
		at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
		... 41 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 467.0 failed 1 times, most recent failure: Lost task 6.0 in stage 467.0 (TID 1459) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more


In [96]:
Final_df = updated_df.orderBy(col("id"))
Final_df.show()

Py4JJavaError: An error occurred while calling o4129.showString.
: org.apache.spark.SparkException: Multiple failures in stage materialization.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.multiFailuresInStageMaterializationError(QueryExecutionErrors.scala:2076)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:821)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:335)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:272)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:419)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor88.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 465.0 failed 1 times, most recent failure: Lost task 3.0 in stage 465.0 (TID 1440) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more

Driver stacktrace:
		at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
		at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
		at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
		at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
		at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
		at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
		at scala.Option.foreach(Option.scala:407)
		at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
		at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
		at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
		at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
		at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
		at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
		at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
		at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
		at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
		at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
		at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
		at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
		at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
		at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
		at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
		at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
		at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
		at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
		at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
		at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
		at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
		at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
		at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
		at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
		at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
		at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
		at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
		at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
		at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
		at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
		at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
		at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
		at org.apache.spark.scheduler.Task.run(Task.scala:141)
		at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
		at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
		at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
		at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
		at java.base/java.lang.Thread.run(Thread.java:834)
	Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
		at java.base/java.io.FileInputStream.readBytes(Native Method)
		at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
		at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
		... 41 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 464.0 failed 1 times, most recent failure: Lost task 0.0 in stage 464.0 (TID 1429) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$17.hasNext(Iterator.scala:814)
	at org.apache.spark.sql.catalyst.csv.CSVExprUtils$.extractHeader(CSVExprUtils.scala:54)
	at org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.checkHeaderColumnNames(CSVHeaderChecker.scala:126)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.parseIterator(UnivocityParser.scala:446)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:103)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 48 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:158)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:198)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:67)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$17.hasNext(Iterator.scala:814)
	at org.apache.spark.sql.catalyst.csv.CSVExprUtils$.extractHeader(CSVExprUtils.scala:54)
	at org.apache.spark.sql.catalyst.csv.CSVHeaderChecker.checkHeaderColumnNames(CSVHeaderChecker.scala:126)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.parseIterator(UnivocityParser.scala:446)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:103)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 48 more


In [None]:
#
#print(sorted_df.count())  # Check the number of rows
#sorted_df.printSchema()   # Check the DataFrame structure
df.show(10)  # Show original DataFrame


Py4JJavaError: An error occurred while calling o4111.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 460.0 failed 1 times, most recent failure: Lost task 2.0 in stage 460.0 (TID 1407) (DESKTOP-KGDANQ7 executor driver): org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.hadoop.fs.FSError: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:211)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:290)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.fs.FSInputChecker.readFully(FSInputChecker.java:460)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readChunk(ChecksumFileSystem.java:272)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:300)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.fillBuffer(UncompressedSplitLineReader.java:62)
	at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:227)
	at org.apache.hadoop.util.LineReader.readLine(LineReader.java:185)
	at org.apache.hadoop.mapreduce.lib.input.UncompressedSplitLineReader.readLine(UncompressedSplitLineReader.java:94)
	at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.initialize(LineRecordReader.java:130)
	at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.<init>(HadoopFileLinesReader.scala:63)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.readFile(CSVDataSource.scala:96)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.$anonfun$buildReader$2(CSVFileFormat.scala:137)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
	at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:140)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: The process cannot access the file because another process has locked a portion of the file
	at java.base/java.io.FileInputStream.readBytes(Native Method)
	at java.base/java.io.FileInputStream.read(FileInputStream.java:279)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.read(RawLocalFileSystem.java:202)
	... 41 more


In [None]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

# Step 1: Identify missing values
missing_df = df2.filter(col("previous claims").isNull())

# Step 2: Select the same feature columns used in training (replace feature_cols with your actual list)
feature_cols = [...]  # List of feature columns used during training
missing_features = missing_df.select(feature_cols)

# Step 3: Convert missing_features into the same format used for model training
missing_features_transformed = feature_assembler.transform(missing_features)

# Step 4: Make predictions using the trained model
predictions = rf_model.transform(missing_features_transformed).select("prediction")

# Step 5: Extract predictions and replace NULL values in df2
predicted_values = predictions.collect()  # Get the predicted values
predicted_values = [row["prediction"] for row in predicted_values]  # Convert to a list

# Step 6: Add predictions back into df2
missing_rows = missing_df.withColumn("previous claims", F.lit(None))  # Preserve schema
for i, value in enumerate(predicted_values):
    missing_rows = missing_rows.withColumn("previous claims", F.when(F.col("previous claims").isNull(), value).otherwise(F.col("previous claims")))

# Step 7: Merge back into df2
df2 = df2.exceptAll(missing_df).union(missing_rows)

# Check if all nulls are filled
df2.select(F.count(F.when(F.col("previous claims").isNull(), 1))).show()


# Final model

In [90]:
#string indexer
from pyspark.ml.feature import StringIndexer

to_string_indxn_columns = ['Gender', 'Marital Status', 'Education Level', 'Occupation',
                       'Location', 'Policy Type', 'Customer Feedback', 
                       'Exercise Frequency', 'Property Type']

# # index
# indexers = [
#     StringIndexer(inputCol=col, outputCol=f"{col}_indexed").fit(df2)
#     for col in to_string_indxn_columns
# ]

# # transform
# df2 = indexers.transform(df2)


# both index and transfmation together
for col_name in to_string_indxn_columns:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Index" , handleInvalid="keep")
    df2 = indexer.fit(df2).transform(df2)

df2.show(10)


[Stage 174:>                                                        (0 + 1) / 1]

+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|   Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|   Policy Start Date|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|
+---+----+------+-------------+--------------+

                                                                                

In [91]:
df2.show(10)

+---+----+------+-------------+--------------+--------------------+---------------+-------------+------------------+--------+-------------+---------------+-----------+------------+------------------+--------------------+-----------------+--------------+------------------+-------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+
| id| Age|Gender|Annual Income|Marital Status|Number of Dependents|Education Level|   Occupation|      Health Score|Location|  Policy Type|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|   Policy Start Date|Customer Feedback|Smoking Status|Exercise Frequency|Property Type|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|
+---+----+------+-------------+--------------+

In [92]:
df_indexed = df2.drop(*to_string_indxn_columns)



In [93]:
df_indexed.show(10)

+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|   Policy Start Date|Smoking Status|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|
+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------

# model training for occupation

In [96]:
# Splitting df2 into 80% training and 20% testing
train_df, test_df = df_indexed.randomSplit([0.8, 0.2], seed=42)

In [99]:
train_df.show(5)

[Stage 181:>                                                        (0 + 1) / 1]

+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Smoking Status|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|
+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+
|  0|19.0|

                                                                                

In [98]:
train_df.columns

['id',
 'Age',
 'Annual Income',
 'Number of Dependents',
 'Health Score',
 'Previous Claims',
 'Vehicle Age',
 'Credit Score',
 'Insurance Duration',
 'Smoking Status',
 'Premium Amount',
 'Gender_Index',
 'Marital Status_Index',
 'Education Level_Index',
 'Occupation_Index',
 'Location_Index',
 'Policy Type_Index',
 'Customer Feedback_Index',
 'Exercise Frequency_Index',
 'Property Type_Index']

In [100]:
from pyspark.ml.feature import VectorAssembler

feature_cols = ['id',
 'Age',
 'Annual Income',
 'Number of Dependents',
 'Health Score',
 'Previous Claims',
 'Vehicle Age',
 'Credit Score',
 'Insurance Duration',
 'Smoking Status',
 'Premium Amount',
 'Gender_Index',
 'Marital Status_Index',
 'Education Level_Index',
 'Location_Index',
 'Policy Type_Index',
 'Customer Feedback_Index',
 'Exercise Frequency_Index',
 'Property Type_Index']

label_col = 'Occupation_Index'


assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

In [101]:
print(assembler)

VectorAssembler_e7308c298679


In [102]:
train_data = assembler.transform(train_df)
test_data = assembler.transform(test_df)

In [None]:
# train_data = train_data.dropna() 

In [103]:
train_data.show()

[Stage 182:>                                                        (0 + 1) / 1]

+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+--------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Smoking Status|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|            features|
+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+--------------

                                                                                

# Logistic regression

In [104]:
from pyspark.ml.classification import LogisticRegression

# Create a Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol=label_col)


In [105]:
lr_model = lr.fit(train_data)

25/03/03 01:56:25 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [106]:
lr_predictions = lr_model.transform(test_data)

In [108]:
lr_predictions.show(10)

[Stage 271:>                                                        (0 + 1) / 1]

+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+--------------------+--------------------+--------------------+----------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Smoking Status|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|            features|       rawPrediction|         probability|prediction|
+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+----

                                                                                

In [109]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Initialize the evaluator
# evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator = MulticlassClassificationEvaluator(labelCol="Occupation_Index", predictionCol="prediction", metricName="accuracy")

# Calculate accuracy
accuracy = evaluator.evaluate(lr_predictions)
print(f"Accuracy: {accuracy * 100:.2f}%")




Accuracy: 33.72%


                                                                                

In [107]:
print(lr_predictions)

DataFrame[id: int, Age: double, Annual Income: double, Number of Dependents: double, Health Score: double, Previous Claims: double, Vehicle Age: double, Credit Score: double, Insurance Duration: double, Smoking Status: int, Premium Amount: double, Gender_Index: double, Marital Status_Index: double, Education Level_Index: double, Occupation_Index: double, Location_Index: double, Policy Type_Index: double, Customer Feedback_Index: double, Exercise Frequency_Index: double, Property Type_Index: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]


# Now Random forest

In [111]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="features",labelCol="Occupation_Index")
rf_model = rf.fit(train_data)


                                                                                

In [112]:
rf_predictions = rf_model.transform(test_data)

In [115]:
rf_predictions.show(10)

[Stage 291:>                                                        (0 + 1) / 1]

+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+--------------------+--------------------+--------------------+----------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Smoking Status|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|            features|       rawPrediction|         probability|prediction|
+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+----

                                                                                

In [113]:
evaluator = MulticlassClassificationEvaluator(labelCol="Occupation_Index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {accuracy * 100:.2f}%")

[Stage 288:>                                                        (0 + 8) / 8]

Random Forest Accuracy: 33.99%


                                                                                

In [114]:
train_data.show()

[Stage 290:>                                                        (0 + 1) / 1]

+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+------------------------+-------------------+--------------------+
| id| Age|Annual Income|Number of Dependents|      Health Score|Previous Claims|Vehicle Age|Credit Score|Insurance Duration|Smoking Status|Premium Amount|Gender_Index|Marital Status_Index|Education Level_Index|Occupation_Index|Location_Index|Policy Type_Index|Customer Feedback_Index|Exercise Frequency_Index|Property Type_Index|            features|
+---+----+-------------+--------------------+------------------+---------------+-----------+------------+------------------+--------------+--------------+------------+--------------------+---------------------+----------------+--------------+-----------------+-----------------------+--------------

                                                                                

# Let's start filling the Null using the (rf_model) 

Occupation > string indexer but

we have to fill the Null with (unknown) 

then we'll do it but we need the same indexer values as before(during the time of training)

> here comes the problem which is (string indexer) miss match.

In [116]:
# train_df.printSchema() #working 
# test_df.show(5) #not working




In [None]:
# column_names = df_train.columns
# print(column_names)

['id', 'Age', 'Gender', 'Annual Income', 'Marital Status', 'Number of Dependents', 'Education Level', 'Occupation', 'Health Score', 'Location', 'Policy Type', 'Previous Claims', 'Vehicle Age', 'Credit Score', 'Insurance Duration', 'Policy Start Date', 'Customer Feedback', 'Smoking Status', 'Exercise Frequency', 'Property Type', 'Premium Amount']


In [None]:
#   # Step 2: Define Schema (based on the data sample)
#     schema = StructType([
#         StructField("id", IntegerType(), True),
#         StructField("Age", DoubleType(), True),
#         StructField("Gender", StringType(), True),
#         StructField("Annual_Income", DoubleType(), True),
#         StructField("Marital_Status", StringType(), True),
#         StructField("Number_of_Dependents", DoubleType(), True),
#         StructField("Education_Level", StringType(), True),
#         StructField("Occupation", StringType(), True),
#         StructField("Health_Score", DoubleType(), True),
#         StructField("Location", StringType(), True),
#         StructField("Policy_Type", StringType(), True),
#         StructField("Previous_Claims", DoubleType(), True),
#         StructField("Vehicle_Age", DoubleType(), True),
#         StructField("Credit_Score", DoubleType(), True),
#         StructField("Insurance_Duration", DoubleType(), True),
#         StructField("Policy_Start_Date", StringType(), True),
#         StructField("Customer_Feedback", StringType(), True),
#         StructField("Smoking_Status", StringType(), True),
#         StructField("Exercise_Frequency", StringType(), True),
#         StructField("Property_Type", StringType(), True),
#         StructField("Premium_Amount", DoubleType(), True)
#     ])