In [1]:
from dotenv import load_dotenv
import os
from pathlib import Path
import pandas as pd
from pyspark.sql.functions import countDistinct, desc
from notebooks.utils import create_pie_chart

CURRENT_DIRECTORY_NOTEBOOK = None


def intitate_notebook():
    load_dotenv()
    global CURRENT_DIRECTORY_NOTEBOOK
    if CURRENT_DIRECTORY_NOTEBOOK is None:
        os.chdir(os.getenv("PROJECT_BASE_PATH"))
        CURRENT_DIRECTORY_NOTEBOOK = Path(os.getcwd())
        print("Current directory for notebook: ", CURRENT_DIRECTORY_NOTEBOOK)
    else:
        print(
            "Current directory for notebook is already set: ",
            CURRENT_DIRECTORY_NOTEBOOK,
        )


intitate_notebook()

Current directory for notebook:  /Users/shirshmall/Personal_Drive/Credit_Risk_Model_Project


### Features Definitions

In [2]:
features_definitions_df = pd.read_csv(
    "raw_main_dataset/home-credit-credit-risk-model-stability/feature_definitions.csv"
)
features_definitions_df

Unnamed: 0,Variable,Description
0,actualdpd_943P,Days Past Due (DPD) of previous contract (actu...
1,actualdpdtolerance_344P,DPD of client with tolerance.
2,addres_district_368M,District of the person's address.
3,addres_role_871L,Role of person's address.
4,addres_zip_823M,Zip code of the address.
...,...,...
460,totinstallast1m_4525188A,Total amount of monthly instalments paid in th...
461,twobodfilling_608L,Type of application process.
462,type_25L,Contact type of a person.
463,typesuite_864L,Persons accompanying the client during the loa...


In [3]:
from pyspark.sql import SparkSession

parquet_file_path = CURRENT_DIRECTORY_NOTEBOOK / Path(
    "raw_main_dataset/home-credit-credit-risk-model-stability/parquet_files/train/train_base.parquet"
)

spark = SparkSession.builder.appName("Parquet_File_Exploration").getOrCreate()


df = spark.read.parquet(str(parquet_file_path))
df.show()

24/07/14 17:35:46 WARN Utils: Your hostname, Shirshs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.68.106 instead (on interface en0)
24/07/14 17:35:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/14 17:35:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/14 17:35:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+-------+-------------+------+--------+------+
|case_id|date_decision| MONTH|WEEK_NUM|target|
+-------+-------------+------+--------+------+
|      0|   2019-01-03|201901|       0|     0|
|      1|   2019-01-03|201901|       0|     0|
|      2|   2019-01-04|201901|       0|     0|
|      3|   2019-01-03|201901|       0|     0|
|      4|   2019-01-04|201901|       0|     1|
|      5|   2019-01-02|201901|       0|     0|
|      6|   2019-01-03|201901|       0|     0|
|      7|   2019-01-03|201901|       0|     0|
|      8|   2019-01-03|201901|       0|     0|
|      9|   2019-01-03|201901|       0|     0|
|     10|   2019-01-03|201901|       0|     0|
|     11|   2019-01-03|201901|       0|     0|
|     12|   2019-01-03|201901|       0|     0|
|     13|   2019-01-03|201901|       0|     0|
|     14|   2019-01-03|201901|       0|     0|
|     15|   2019-01-03|201901|       0|     0|
|     16|   2019-01-03|201901|       0|     0|
|     17|   2019-01-03|201901|       0|     0|
|     18|   2

In [4]:
# Show the schema of the DataFrame
print("Schema:")
df.printSchema()


# Show the data types of the columns
print("\nData types:")
print(df.dtypes)

Schema:
root
 |-- case_id: long (nullable = true)
 |-- date_decision: string (nullable = true)
 |-- MONTH: long (nullable = true)
 |-- WEEK_NUM: long (nullable = true)
 |-- target: long (nullable = true)


Data types:
[('case_id', 'bigint'), ('date_decision', 'string'), ('MONTH', 'bigint'), ('WEEK_NUM', 'bigint'), ('target', 'bigint')]


In [5]:
# Show the first 5 rows
print("First 5 rows:")
df.show(10)

First 5 rows:
+-------+-------------+------+--------+------+
|case_id|date_decision| MONTH|WEEK_NUM|target|
+-------+-------------+------+--------+------+
|      0|   2019-01-03|201901|       0|     0|
|      1|   2019-01-03|201901|       0|     0|
|      2|   2019-01-04|201901|       0|     0|
|      3|   2019-01-03|201901|       0|     0|
|      4|   2019-01-04|201901|       0|     1|
|      5|   2019-01-02|201901|       0|     0|
|      6|   2019-01-03|201901|       0|     0|
|      7|   2019-01-03|201901|       0|     0|
|      8|   2019-01-03|201901|       0|     0|
|      9|   2019-01-03|201901|       0|     0|
+-------+-------------+------+--------+------+
only showing top 10 rows



In [6]:
# Count the number of rows
row_count = df.count()
print(f"Number of rows: {row_count}")

# Show the columns
print("\nColumns:")
print(df.columns)

Number of rows: 1526659

Columns:
['case_id', 'date_decision', 'MONTH', 'WEEK_NUM', 'target']


In [7]:
# Show the summary statistics
print("Summary statistics:")
df.summary().show()

Summary statistics:


24/07/14 17:35:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+-----------------+-------------+------------------+-----------------+-------------------+
|summary|          case_id|date_decision|             MONTH|         WEEK_NUM|             target|
+-------+-----------------+-------------+------------------+-----------------+-------------------+
|  count|          1526659|      1526659|           1526659|          1526659|            1526659|
|   mean|1286076.571738679|         NULL|201936.28798245057|40.76903617638254|0.03143727577671242|
| stddev|718946.5922850802|         NULL| 44.73597450410422|23.79798129273225| 0.1744963994279181|
|    min|                0|   2019-01-01|            201901|                0|                  0|
|    25%|           766057|         NULL|            201906|               23|                  0|
|    50%|          1357307|         NULL|            201910|               40|                  0|
|    75%|          1739145|         NULL|            202001|               55|                  0|
|    max| 

                                                                                

In [8]:
def explore_column(df, column_name):
    print(f"\nExploring '{column_name}' column:")
    df.select(column_name).summary().show()
    distinct_count = df.select(countDistinct(column_name)).collect()[0][0]
    print(f"Number of distinct '{column_name}': {distinct_count}")
    df.groupBy(column_name).count().orderBy(desc("count")).show(
        5
    )  # Show top 5 most frequent values

In [9]:
for column in df.columns:
    print("Column Name: " + column)
    explore_column(df, column)
    print(" - " * 25)

Column Name: case_id

Exploring 'case_id' column:
+-------+-----------------+
|summary|          case_id|
+-------+-----------------+
|  count|          1526659|
|   mean|1286076.571738679|
| stddev|718946.5922850802|
|    min|                0|
|    25%|           766057|
|    50%|          1357307|
|    75%|          1739145|
|    max|          2703454|
+-------+-----------------+



24/07/14 17:35:58 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Number of distinct 'case_id': 1526659


                                                                                

+-------+-----+
|case_id|count|
+-------+-----+
|     32|    1|
|     23|    1|
|     26|    1|
|    117|    1|
|     27|    1|
+-------+-----+
only showing top 5 rows

 -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  - 
Column Name: date_decision

Exploring 'date_decision' column:


                                                                                

+-------+-------------+
|summary|date_decision|
+-------+-------------+
|  count|      1526659|
|   mean|         NULL|
| stddev|         NULL|
|    min|   2019-01-01|
|    25%|         NULL|
|    50%|         NULL|
|    75%|         NULL|
|    max|   2020-10-05|
+-------+-------------+

Number of distinct 'date_decision': 644
+-------------+-----+
|date_decision|count|
+-------------+-----+
|   2019-11-29| 8812|
|   2019-11-30| 8756|
|   2019-12-28| 6900|
|   2019-12-29| 6537|
|   2019-11-17| 6340|
+-------------+-----+
only showing top 5 rows

 -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  - 
Column Name: MONTH

Exploring 'MONTH' column:
+-------+------------------+
|summary|             MONTH|
+-------+------------------+
|  count|           1526659|
|   mean|201936.28798245057|
| stddev| 44.73597450410422|
|    min|            201901|
|    25%|            201906|
|    50%|            201910|
|    75%|            202001|
|    max|            202010|
+------

In [11]:
create_pie_chart(df, "target")


Creating pie chart for 'target' column:


In [12]:
from pyspark.sql.functions import col, lit, datediff, expr, max, min

# Ensure 'date_decision' is in DateType format
df = df.withColumn("date_decision", col("date_decision").cast("date"))

# Extract week number and month from 'date_decision'
### Step 1: Get the minimum date as 'start_date'
start_date = df.select(min(col("date_decision"))).first()[0]

### Step 2: Calculate the difference of all dates with 'start_date' as "difference_date"
df = df.withColumn("difference_date", datediff(col("date_decision"), lit(start_date)))

### Step 3: Calculate the number of weeks in 'difference_date' as Extracted_WEEK_NUM
df = df.withColumn("Extracted_WEEK_NUM", expr("floor(difference_date / 7)"))

# Compare extracted values with existing columns
df_comparison = df.select(
    "date_decision",
    "WEEK_NUM",
    "Extracted_WEEK_NUM",
)

In [13]:
def get_max_min_of_column(column_name):
    max_value = df.agg(max(column_name)).collect()[0][0]
    min_value = df.agg(min(column_name)).collect()[0][0]
    print(
        "Column Name: ",
        column_name,
        "  |  Max Value: ",
        max_value,
        "  |  Min Value: ",
        min_value,
        "\n\n",
        " - " * 20,
        "\n",
    )


get_max_min_of_column("WEEK_NUM")
get_max_min_of_column("Extracted_WEEK_NUM")

Column Name:  WEEK_NUM   |  Max Value:  91   |  Min Value:  0 

  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  

Column Name:  Extracted_WEEK_NUM   |  Max Value:  91   |  Min Value:  0 

  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  



In [14]:
# Show mismatches for week numbers
print("Comparing WEEK_NUM:")
df_comparison.filter(col("WEEK_NUM") != col("Extracted_WEEK_NUM")).show()

Comparing WEEK_NUM:
+-------------+--------+------------------+
|date_decision|WEEK_NUM|Extracted_WEEK_NUM|
+-------------+--------+------------------+
+-------------+--------+------------------+



In [15]:
# Stop the SparkSession
spark.stop()