Setting up the environment and loading the data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, count, countDistinct, expr

# Initialize a Spark session
spark = SparkSession.builder.appName("BigDataAnalysis").getOrCreate()

# Load the dataset
file_path = "sampled_data.csv"  # Replace with your file path
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Display the schema and a sample of the data
df.printSchema()
df.show(5)


root
 |-- case_id: integer (nullable = true)
 |-- date_decision: date (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- WEEK_NUM: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- amtinstpaidbefduel24m_4187115A: double (nullable = true)
 |-- annuity_780A: double (nullable = true)
 |-- annuitynextmonth_57A: double (nullable = true)
 |-- avginstallast24m_3658937A: double (nullable = true)
 |-- avglnamtstart24m_4525187A: double (nullable = true)
 |-- avgoutstandbalancel6m_4187114A: double (nullable = true)
 |-- avgpmtlast12m_4525200A: double (nullable = true)
 |-- credamount_770A: double (nullable = true)
 |-- currdebt_22A: double (nullable = true)
 |-- currdebtcredtyperange_828A: double (nullable = true)
 |-- disbursedcredamount_1113A: double (nullable = true)
 |-- downpmt_116A: double (nullable = true)
 |-- inittransactionamount_650A: double (nullable = true)
 |-- lastapprcommoditycat_1041M: string (nullable = true)
 |-- lastapprcommoditytypec_5251766M: st

Step 2: Data Cleaning
Identify Missing Data

In [2]:
from pyspark.sql.functions import sum

# Count missing values for each column
missing_values = df.select([(col(c).isNull().cast("int")).alias(c) for c in df.columns])
missing_summary = missing_values.select([(sum(c).alias(c)) for c in df.columns])
missing_summary.show()


+-------+-------------+-----+--------+------+------------------------------+------------+--------------------+-------------------------+-------------------------+------------------------------+----------------------+---------------+------------+--------------------------+-------------------------+------------+--------------------------+--------------------------+-------------------------------+-----------------------+---------------------+-----------------+------------------------+---------------------------+------------------------------+-------------------------+---------------------+-------------------------------+------------+---------------+-------------------+-------------+-------------------------+------------------------+-------------------------------+---------------------+-------------------------+-----------+-------------------------+----------------------------+------------+-----------------+------------------------+--------------------+---------------+-------------+-------

Remove Columns with Excessive Missing Data

In [3]:
# Drop columns where more than 50% of the data is missing
threshold = 0.5 * df.count()  # Adjust threshold as needed
columns_to_keep = [c for c in df.columns if df.filter(col(c).isNotNull()).count() > threshold]
df_reduced = df.select(columns_to_keep)

# Display the updated schema and row count
df_reduced.printSchema()
df_reduced.count()


root
 |-- case_id: integer (nullable = true)
 |-- date_decision: date (nullable = true)
 |-- MONTH: integer (nullable = true)
 |-- WEEK_NUM: integer (nullable = true)
 |-- target: integer (nullable = true)
 |-- amtinstpaidbefduel24m_4187115A: double (nullable = true)
 |-- annuity_780A: double (nullable = true)
 |-- annuitynextmonth_57A: double (nullable = true)
 |-- avginstallast24m_3658937A: double (nullable = true)
 |-- credamount_770A: double (nullable = true)
 |-- currdebt_22A: double (nullable = true)
 |-- currdebtcredtyperange_828A: double (nullable = true)
 |-- disbursedcredamount_1113A: double (nullable = true)
 |-- downpmt_116A: double (nullable = true)
 |-- lastapprcommoditycat_1041M: string (nullable = true)
 |-- lastapprcommoditytypec_5251766M: string (nullable = true)
 |-- lastapprcredamount_781A: double (nullable = true)
 |-- lastcancelreason_561M: string (nullable = true)
 |-- lastrejectcommoditycat_161M: string (nullable = true)
 |-- lastrejectcommodtypec_5251769M: stri

1526659

Impute Missing Values for Numerical Columns

In [4]:
from pyspark.ml.feature import Imputer

# Define columns for imputation
numerical_columns = [c for c in df_reduced.columns if dict(df_reduced.dtypes)[c] in ('int', 'double')]

# Configure imputer to fill missing values with the mean
imputer = Imputer(inputCols=numerical_columns, outputCols=numerical_columns).setStrategy("mean")
df_imputed = imputer.fit(df_reduced).transform(df_reduced)

# Verify missing values are handled
df_imputed.select([(col(c).isNull().cast("int")).alias(c) for c in numerical_columns]).agg(
    *[(sum(c)).alias(c) for c in numerical_columns]
).show()


+-------+-----+--------+------+------------------------------+------------+--------------------+-------------------------+---------------+------------+--------------------------+-------------------------+------------+-----------------------+------------+---------------+-------------+-------------------------+-----------+-------------------------+------------+-----------------+--------------------------+
|case_id|MONTH|WEEK_NUM|target|amtinstpaidbefduel24m_4187115A|annuity_780A|annuitynextmonth_57A|avginstallast24m_3658937A|credamount_770A|currdebt_22A|currdebtcredtyperange_828A|disbursedcredamount_1113A|downpmt_116A|lastapprcredamount_781A|maininc_215A|maxannuity_159A|maxdebt4_972A|maxinstallast24m_3658928A|price_1097A|sumoutstandtotal_3546847A|totaldebt_9A|totalsettled_863A|mainoccupationinc_384A_max|
+-------+-----+--------+------+------------------------------+------------+--------------------+-------------------------+---------------+------------+--------------------------+--------

Handle Missing Values in Categorical Columns

In [6]:
# Replace nulls in categorical columns with a placeholder (e.g., 'Unknown')
categorical_columns = [c for c in df_reduced.columns if dict(df_reduced.dtypes)[c] == 'string']

# Use DataFrame's na.fill method to handle missing values
for col_name in categorical_columns:
    df_imputed = df_imputed.na.fill(value="Unknown", subset=[col_name])

# Display a sample of the cleaned data
df_imputed.show(5)


+-------+-------------+------+--------+------+------------------------------+------------+--------------------+-------------------------+---------------+------------+--------------------------+-------------------------+------------+--------------------------+-------------------------------+-----------------------+---------------------+---------------------------+------------------------------+---------------------+-------------------------------+-----------------+---------------+-------------+-------------------------+-------------------------+-----------------+-------------------------+------------+-----------------+--------------------+---------------+-------------+--------------+--------------+--------------------------+---------------------------------------+
|case_id|date_decision| MONTH|WEEK_NUM|target|amtinstpaidbefduel24m_4187115A|annuity_780A|annuitynextmonth_57A|avginstallast24m_3658937A|credamount_770A|currdebt_22A|currdebtcredtyperange_828A|disbursedcredamount_1113A|downpmt

Step 3: Descriptive Statistics

In [9]:
# Verify numerical columns in the DataFrame
numerical_columns = [c for c, dtype in df_imputed.dtypes if dtype in ("int", "double")]
print("Numerical Columns:", numerical_columns)

# Replace 'numerical_column' with one of the valid numerical columns
target_column = "your_numerical_column"  # Replace with a valid numerical column name from the above list

# Calculate the median for the selected column
if target_column in numerical_columns:
    median_value = df_imputed.approxQuantile(target_column, [0.5], 0)  # 0 indicates exact quantile computation
    print(f"Median of {target_column}: {median_value[0]}")
else:
    print(f"Column '{target_column}' is not numerical or does not exist in the DataFrame.")


Numerical Columns: ['case_id', 'MONTH', 'WEEK_NUM', 'target', 'amtinstpaidbefduel24m_4187115A', 'annuity_780A', 'annuitynextmonth_57A', 'avginstallast24m_3658937A', 'credamount_770A', 'currdebt_22A', 'currdebtcredtyperange_828A', 'disbursedcredamount_1113A', 'downpmt_116A', 'lastapprcredamount_781A', 'maininc_215A', 'maxannuity_159A', 'maxdebt4_972A', 'maxinstallast24m_3658928A', 'price_1097A', 'sumoutstandtotal_3546847A', 'totaldebt_9A', 'totalsettled_863A', 'mainoccupationinc_384A_max']
Column 'your_numerical_column' is not numerical or does not exist in the DataFrame.


Step 4: Data Exploration and Visualizations
Histograms and Box Plots

In [18]:
# from pyspark.sql.functions import col, floor
# import matplotlib.pyplot as plt

# # Specify a numerical column to use
# numerical_column = "amtinstpaidbefduel24m_4187115A"  # Replace with your desired numerical column

# # Check if the column exists in the DataFrame
# if numerical_column in [col_name for col_name, col_type in df_imputed.dtypes if col_type in ("int", "double")]:
#     # Create bins (adjust bin size as needed)
#     bin_size = 1000  # Adjust the bin size based on the range of the data
#     binned_df = df_imputed.withColumn("bin", (floor(col(numerical_column) / bin_size) * bin_size))

#     # Count rows per bin
#     aggregated_df = binned_df.groupBy("bin").count().orderBy("bin")

#     # Convert aggregated data to pandas for visualization
#     pandas_aggregated_df = aggregated_df.toPandas()

#     # Plot histogram from aggregated data
#     plt.bar(pandas_aggregated_df['bin'], pandas_aggregated_df['count'], width=bin_size)
#     plt.title(f"Histogram for {numerical_column}")
#     plt.xlabel("Bins")
#     plt.ylabel("Frequency")
#     plt.show()
# else:
#     print(f"The column '{numerical_column}' is not a valid numerical column.")
