In [0]:
sc  # display the SparkContext (assumed to be pre-created by the notebook runtime, e.g. Databricks)

In [0]:
spark  # display the SparkSession (assumed to be pre-created by the notebook runtime, e.g. Databricks)

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
spark = (
    SparkSession.builder
    .appName("CosmosDBToDatabricks")
    .getOrCreate()
)
# Case-sensitive column resolution is required while reading from Cosmos DB:
# the documents carry both a business key "ID" and the Cosmos-generated "id".
spark.conf.set("spark.sql.caseSensitive", "true")

### Azure Cosmos DB configuration

In [0]:

import os

# Connection settings for the Cosmos DB account that holds the raw data.
cosmos_endpoint = "https://capstonecosmoscredit.documents.azure.com:443/"
cosmos_database = "capstone_cosmosDB"
# Container names are passed verbatim to the connector (including the spellings
# "Appllication" / "conatiner"); presumably they match the existing containers,
# so do not "fix" them without renaming the containers too.
cosmos_collection1 = "Appllication_data_container"
cosmos_collection2 = "credit_record_conatiner"

# The account key must not be hard-coded: anyone who can read the notebook (or its
# revision history) would get full access to the account. Provide it through an
# environment variable, e.g. a cluster env var backed by a Databricks secret.
# The key previously committed here should be treated as compromised and rotated.
cosmos_master_key = os.environ.get("COSMOS_MASTER_KEY")
if not cosmos_master_key:
    raise RuntimeError(
        "Set the COSMOS_MASTER_KEY environment variable to the Cosmos DB account key."
    )

# Load the application records through the Cosmos DB OLTP Spark connector.
application_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint)
    .option("spark.cosmos.accountKey", cosmos_master_key)
    .option("spark.cosmos.database", cosmos_database)
    .option("spark.cosmos.container", cosmos_collection1)
    .load()
)

application_df.show(5)

+-------+----------+----------+-----------------+--------------------+------------+------------+----------+--------------------+--------------------+-------------+--------------------+-----------+----------------+----------+---------------+--------------------+---------------+---------------+
|     ID|DAYS_BIRTH|FLAG_MOBIL|NAME_HOUSING_TYPE|     OCCUPATION_TYPE|FLAG_OWN_CAR|CNT_CHILDREN|FLAG_EMAIL|    NAME_INCOME_TYPE|                  id|DAYS_EMPLOYED| NAME_EDUCATION_TYPE|CODE_GENDER|AMT_INCOME_TOTAL|FLAG_PHONE|FLAG_WORK_PHONE|  NAME_FAMILY_STATUS|CNT_FAM_MEMBERS|FLAG_OWN_REALTY|
+-------+----------+----------+-----------------+--------------------+------------+------------+----------+--------------------+--------------------+-------------+--------------------+-----------+----------------+----------+---------------+--------------------+---------------+---------------+
|5008931|    -15519|         1| Rented apartment|            Laborers|           N|           0|         0|Commercial 

### Optimization technique: caching
Calling `cache()` marks the DataFrame for caching. Spark stores it (in memory, spilling to disk if needed) the first time an action computes it. Subsequent actions then reuse the cached data instead of re-reading it from Cosmos DB and recomputing the DataFrame from scratch.

In [0]:
# Keep the raw Cosmos DB read cached so later actions do not re-query the container.
application_df = application_df.cache()

In [0]:
application_df.count()  # first action on the cached DataFrame: materializes the cache and shows the total row count

Out[10]: 438557

### Checking the schema for type casting

In [0]:
application_df.printSchema()  # every field arrives as a string from the Cosmos DB read, so numeric columns are cast below

root
 |-- ID: string (nullable = true)
 |-- DAYS_BIRTH: string (nullable = true)
 |-- FLAG_MOBIL: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- OCCUPATION_TYPE: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- CNT_CHILDREN: string (nullable = true)
 |-- FLAG_EMAIL: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- id: string (nullable = false)
 |-- DAYS_EMPLOYED: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- AMT_INCOME_TOTAL: string (nullable = true)
 |-- FLAG_PHONE: string (nullable = true)
 |-- FLAG_WORK_PHONE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- CNT_FAM_MEMBERS: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)



### Drop the `id` column auto-generated by Cosmos DB

In [0]:
# Drop the Cosmos DB document key "id". spark.sql.caseSensitive was set to "true" when the
# session was created, so only the lowercase "id" is removed and the business key "ID" stays.
application_df = application_df.drop("id")

### Count unique IDs in the application_df

In [0]:
# Number of distinct applicant IDs. Comparing it with the total row count
# reveals IDs that appear more than once.
app_unique_count = (
    application_df
    .select("ID")
    .distinct()
    .count()
)
print(app_unique_count)

438510


In [0]:
# Some applicant IDs occur more than once. We cannot tell why (data-entry errors or
# changes over time), and no timestamp/sequence column is available to identify the
# "latest" record. Note that dropDuplicates() keeps an arbitrary row per key: a
# preceding orderBy() does NOT control which row survives, and it costs a full sort.
# To make the result reproducible across runs, rank the rows of each ID by all other
# columns and keep the first one.
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

dedup_order_columns = [c for c in application_df.columns if c != "ID"]
dedup_window = Window.partitionBy("ID").orderBy(*dedup_order_columns)

application_df = (
    application_df
    .withColumn("_dedup_rank", row_number().over(dedup_window))
    .filter(col("_dedup_rank") == 1)
    .drop("_dedup_rank")
)

### Cast and rename the columns, then check the schema

In [0]:


# Step 1: cast the string columns read from Cosmos DB to numeric types.
application_casts = {
    "ID": "int",
    "CNT_CHILDREN": "int",
    "AMT_INCOME_TOTAL": "float",
    "DAYS_BIRTH": "int",
    "DAYS_EMPLOYED": "int",
    "FLAG_MOBIL": "int",
    "FLAG_WORK_PHONE": "int",
    "FLAG_PHONE": "int",
    "FLAG_EMAIL": "int",
    "CNT_FAM_MEMBERS": "float",
}
for column_name, spark_type in application_casts.items():
    application_df = application_df.withColumn(column_name, col(column_name).cast(spark_type))

# Step 2: give the columns readable names. "Age" and "years_employment" still hold
# day offsets at this point; they are converted to years further below.
application_renames = {
    "CODE_GENDER": "gender",
    "FLAG_OWN_CAR": "own_car",
    "FLAG_OWN_REALTY": "own_property",
    "CNT_CHILDREN": "num_child",
    "AMT_INCOME_TOTAL": "annual_income",
    "NAME_INCOME_TYPE": "Income_category",
    "NAME_EDUCATION_TYPE": "education_level",
    "NAME_FAMILY_STATUS": "Marital_status",
    "NAME_HOUSING_TYPE": "house_type",
    "DAYS_BIRTH": "Age",
    "DAYS_EMPLOYED": "years_employment",
    "FLAG_MOBIL": "has_mobile",
    "FLAG_WORK_PHONE": "has_work_phone",
    "FLAG_PHONE": "has_phone",
    "FLAG_EMAIL": "has_email",
    "OCCUPATION_TYPE": "job",
    "CNT_FAM_MEMBERS": "family_size",
}
for old_name, new_name in application_renames.items():
    application_df = application_df.withColumnRenamed(old_name, new_name)

application_df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- has_mobile: integer (nullable = true)
 |-- house_type: string (nullable = true)
 |-- job: string (nullable = true)
 |-- own_car: string (nullable = true)
 |-- num_child: integer (nullable = true)
 |-- has_email: integer (nullable = true)
 |-- Income_category: string (nullable = true)
 |-- years_employment: integer (nullable = true)
 |-- education_level: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- annual_income: float (nullable = true)
 |-- has_phone: integer (nullable = true)
 |-- has_work_phone: integer (nullable = true)
 |-- Marital_status: string (nullable = true)
 |-- family_size: float (nullable = true)
 |-- own_property: string (nullable = true)



### Count total null values in each column

In [0]:
from pyspark.sql.functions import col, sum as spark_sum

# Per-column null count: flag each null as 1 (else 0) and add the flags up.
null_count_exprs = [
    spark_sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in application_df.columns
]
null_counts = application_df.select(null_count_exprs)

# Show the result
null_counts.show()

+---+---+----------+----------+------+-------+---------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+--------------+-----------+------------+
| ID|Age|has_mobile|house_type|   job|own_car|num_child|has_email|Income_category|years_employment|education_level|gender|annual_income|has_phone|has_work_phone|Marital_status|family_size|own_property|
+---+---+----------+----------+------+-------+---------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+--------------+-----------+------------+
|  0|  0|         0|         0|134187|      0|        0|        0|              0|               0|              0|     0|            0|        0|             0|             0|          0|           0|
+---+---+----------+----------+------+-------+---------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+--------------+-----------+----

#### Fill null values in the `job` column with 'Unknown'

In [0]:
# Fill null values in the 'OCCUPATION_TYPE' column with 'Unknown'
# Applicants with no occupation (the former OCCUPATION_TYPE, now 'job') get the label 'Unknown'.
application_df = application_df.na.fill({'job': 'Unknown'})

### Find distinct counts for each column in the DataFrame

In [0]:
# countDistinct ignores nulls, so each figure is the number of distinct non-null values per column.
distinct_count_exprs = [countDistinct(col(name)).alias(name) for name in application_df.columns]
unique_counts = application_df.select(*distinct_count_exprs)
unique_counts.show()

+------+-----+----------+----------+---+-------+---------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+--------------+-----------+------------+
|    ID|  Age|has_mobile|house_type|job|own_car|num_child|has_email|Income_category|years_employment|education_level|gender|annual_income|has_phone|has_work_phone|Marital_status|family_size|own_property|
+------+-----+----------+----------+---+-------+---------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+--------------+-----------+------------+
|438510|16379|         1|         6| 19|      2|       12|        2|              5|            9406|              5|     2|          866|        2|             2|             5|         13|           2|
+------+-----+----------+----------+---+-------+---------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+--------------+------

### Since has_mobile has only the constant value 1, we drop the has_mobile column (the has_email and has_work_phone flags are dropped as well)

In [0]:
# has_mobile is constant (a single distinct value); has_email and has_work_phone are not kept either.
application_df = application_df.drop('has_mobile', 'has_email', 'has_work_phone')

### Standardize Categorical Columns

In [0]:

# Expand single-letter codes into readable labels.
# Gender: unrecognised codes are kept as-is. Without an otherwise() clause, when()
# returns null for every value that matches no branch, which would silently
# introduce nulls into a column that was null-free.
application_df = application_df.withColumn(
    "gender",
    when(col("gender") == "M", "Male")
    .when(col("gender") == "F", "Female")
    .otherwise(col("gender")),
)

# Y/N ownership flags: anything other than "Y" (including null) is treated as "No".
application_df = application_df.withColumn(
    "own_car",
    when(col("own_car") == "Y", "Yes").otherwise("No"),
)

application_df = application_df.withColumn(
    "own_property",
    when(col("own_property") == "Y", "Yes").otherwise("No"),
)

#### Cleaning names/values of the 'education_level', 'Marital_status' and 'house_type' columns

In [0]:
from pyspark.sql.functions import when, col

# Replace verbose category labels with short ones; every other value is left unchanged.
# Each entry is (column, verbose label, short label).
label_cleanups = [
    ('education_level', 'Secondary / secondary special', 'Secondary'),
    ('Marital_status', 'Single / not married', 'Single'),
    ('house_type', 'House / apartment', 'House'),
]

for column_name, verbose_label, short_label in label_cleanups:
    application_df = application_df.withColumn(
        column_name,
        when(col(column_name) == verbose_label, short_label).otherwise(col(column_name)),
    )


#### Convert days since birth to age in years, and days employed to years of employment

In [0]:
# "Age" and "years_employment" still hold day offsets (the renamed DAYS_BIRTH /
# DAYS_EMPLOYED). Past dates are negative offsets, so dividing by -365 gives a
# positive number of years; the int cast truncates to whole years.
application_df = application_df.withColumn("Age", (col("Age") / -365).cast("int"))

# Only negative offsets count as employment; zero, positive or null offsets map to 0 years.
employment_years = (
    when(col("years_employment") < 0, col("years_employment") / -365)
    .otherwise(0)
)
application_df = application_df.withColumn("years_employment", employment_years.cast("int"))

#### Create a new column 'employment_status' based on 'years_employment'

In [0]:
# Derive an employment flag from the whole-year employment length computed above.
# NOTE(review): years_employment is truncated to whole years, so applicants employed
# for less than a year (raw day offset between -364 and -1) also have
# years_employment == 0 and are labelled 'unemployed' here. Deriving the flag from the
# raw day offset before the conversion would avoid this. Confirm the intended definition.
application_df = application_df.withColumn(
    'employment_status',
    when(col('years_employment') == 0, 'unemployed')
     .otherwise('employed')
)

In [0]:
application_df.show(20)  # spot-check the cleaned application records

+-------+---+----------+--------------+-------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+-----------+------------+-----------------+
|     ID|Age|house_type|           job|own_car|num_child|Income_category|years_employment|education_level|gender|annual_income|has_phone|Marital_status|family_size|own_property|employment_status|
+-------+---+----------+--------------+-------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+-----------+------------+-----------------+
|6671725| 61|     House|       Unknown|     No|        0|      Pensioner|               0|      Secondary|Female|     157500.0|        0|       Married|        2.0|         Yes|       unemployed|
|6671727| 35|     House|      Laborers|    Yes|        1|        Working|               4|      Secondary|  Male|     135000.0|        0|       Married|        3.0|         Yes|         employed|
|6671728| 35|     Ho

In [0]:
# Load the monthly credit records from the second Cosmos DB container.
# The endpoint, database, container names and account key are defined in the
# Cosmos DB configuration cell at the top of the notebook. They are reused here
# instead of being duplicated, so the account key is not written out a second time.
credit_df = (
    spark.read.format("cosmos.oltp")
    .option("spark.cosmos.accountEndpoint", cosmos_endpoint)
    .option("spark.cosmos.accountKey", cosmos_master_key)
    .option("spark.cosmos.database", cosmos_database)
    .option("spark.cosmos.container", cosmos_collection2)
    .load()
)

credit_df.show(5)

+-------+--------------+------+--------------------+
|     ID|MONTHS_BALANCE|STATUS|                  id|
+-------+--------------+------+--------------------+
|5003471|           -39|     4|bd307592-66fc-46c...|
|5004559|           -39|     4|b6431fe5-5b09-4cf...|
|5003804|           -43|     4|3f01ff58-637b-47c...|
|5009628|           -36|     4|23efdf8a-3d8b-40b...|
|5009754|           -23|     4|32f50588-bffa-451...|
+-------+--------------+------+--------------------+
only showing top 5 rows



In [0]:
# Keep the credit records cached: several actions below reuse them.
credit_df = credit_df.cache()

In [0]:
credit_df.count()  # first action on the cached DataFrame: materializes the cache and shows the total row count

Out[27]: 1048575

In [0]:

credit_df.printSchema()  # all fields arrive as strings from the Cosmos DB read

root
 |-- ID: string (nullable = true)
 |-- MONTHS_BALANCE: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- id: string (nullable = false)



#### Drop the `id` column auto-generated by Cosmos DB

In [0]:
# Drop the Cosmos DB document key "id"; with case-sensitive resolution still enabled,
# the business key "ID" is kept.
credit_df = credit_df.drop("id")

In [0]:
credit_df.printSchema()  # confirm only ID, MONTHS_BALANCE and STATUS remain



root
 |-- ID: string (nullable = true)
 |-- MONTHS_BALANCE: string (nullable = true)
 |-- STATUS: string (nullable = true)



### Count number of null values in each column

In [0]:
# Spark's sum is imported under an explicit alias so this cell does not depend on the
# wildcard `from pyspark.sql.functions import *` shadowing Python's built-in sum()
# (the built-in cannot aggregate a Column).
from pyspark.sql.functions import col, sum as spark_sum

# 1 where a value is null, 0 otherwise; summing each column gives its null count.
null_counts_df = credit_df.select([(col(c).isNull().cast("int")).alias(c) for c in credit_df.columns])
null_counts_df_agg = null_counts_df.agg(*[spark_sum(col(c)).alias(c) for c in null_counts_df.columns])
null_counts_df_agg.show()

+---+--------------+------+
| ID|MONTHS_BALANCE|STATUS|
+---+--------------+------+
|  0|             0|     0|
+---+--------------+------+



In [0]:
# Both Cosmos-generated "id" columns have been dropped, so the "ID"/"id" clash that
# required case-sensitive resolution is gone; restore Spark's default (case-insensitive).
spark.conf.set("spark.sql.caseSensitive", "false")

#### Casting and Renaming the Columns

In [0]:
# Cast the string columns and turn the (non-positive) month offset into a positive
# "months ago" count. Renaming before the new expression keeps the column in the same position.
credit_df = (
    credit_df
    .withColumn("ID", col("ID").cast("int"))
    .withColumnRenamed("MONTHS_BALANCE", "MONTHS_AGO")
    .withColumn("MONTHS_AGO", (col("MONTHS_AGO").cast("int") * -1).cast("int"))
)

#### Map STATUS values to meaningful descriptions

In [0]:
from pyspark.sql import functions as sql_functions
def _status_case(text_by_status):
    """Build `CASE WHEN STATUS = code THEN text ...` with no ELSE, so unmapped STATUS values give null."""
    expression = None
    for status_code, text in text_by_status.items():
        matches = col('STATUS') == status_code
        expression = when(matches, text) if expression is None else expression.when(matches, text)
    return expression


# NOTE(review): STATUS codes 0-5 are treated as delayed months by is_delay_month, so labels
# such as "Paid Within 1-29 Days" may read as on-time payment. Confirm the intended wording.
status_labels = {
    '0': 'Paid Within 1-29 Days',
    '1': 'Paid Within 30-59 Days',
    '2': 'Paid Within 60-89 Days',
    '3': 'Paid Within 90-119 Days',
    '4': 'Paid Within 120-149 Days',
    '5': 'Bad Debt (Over 150 Days)',
    'C': 'Paid Off This Month',
    'X': 'No Loan This Month',
}

risk_categories = {
    'X': 'No Issues/No loan',
    'C': 'Good Credit',
    '0': 'Standard',
    '1': 'Slight Risk',
    '2': 'Moderate Risk',
    '3': 'Significant Risk',
    '4': 'High Risk',
    '5': 'Bad Debt',
}

credit_df = credit_df.withColumn('STATUS_LABEL', _status_case(status_labels))
credit_df = credit_df.withColumn('RISK_CATEGORY', _status_case(risk_categories))


In [0]:
# 'X' (no loan) and 'C' (paid off) months are not delays; every other STATUS value,
# including a null STATUS, is counted as a delayed month.
credit_df = credit_df.withColumn(
    'is_delay_month',
    when(col('STATUS').isin('X', 'C'), 'No').otherwise('Yes'),
)


In [0]:
credit_df.printSchema()  # verify the casts and the derived label columns

root
 |-- ID: integer (nullable = true)
 |-- MONTHS_AGO: integer (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- STATUS_LABEL: string (nullable = true)
 |-- RISK_CATEGORY: string (nullable = true)
 |-- is_delay_month: string (nullable = false)



#### Both the dataframes are joined using inner join

In [0]:
# Attach each applicant's monthly credit records. Being an inner join, it drops applicants
# without credit records and credit records without a matching application; each applicant
# appears once per matching credit-record row.
df = application_df.join(credit_df, on="ID", how="inner")

#### Shape of the Transformed Data

In [0]:
# Spark DataFrames have no .shape: combine the row count (a full job) with the column list.
shape = (df.count(), len(df.columns))
num_rows, num_columns = shape

# Print the shape
print(f"Shape of the DataFrame: ({num_rows}, {num_columns})")

Shape of the DataFrame: (777715, 21)


#### Count of all the Null Values in each Column

In [0]:
# Spark's sum is imported under an explicit alias so this cell does not depend on the
# wildcard `from pyspark.sql.functions import *` shadowing Python's built-in sum()
# (the built-in cannot aggregate a Column).
from pyspark.sql.functions import col, sum as spark_sum

# 1 where a value is null, 0 otherwise; summing each column gives its null count.
null_counts_df = df.select([(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts_df_agg = null_counts_df.agg(*[spark_sum(col(c)).alias(c) for c in null_counts_df.columns])
null_counts_df_agg.show()

+---+---+----------+---+-------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+-----------+------------+-----------------+----------+------+------------+-------------+--------------+
| ID|Age|house_type|job|own_car|num_child|Income_category|years_employment|education_level|gender|annual_income|has_phone|Marital_status|family_size|own_property|employment_status|MONTHS_AGO|STATUS|STATUS_LABEL|RISK_CATEGORY|is_delay_month|
+---+---+----------+---+-------+---------+---------------+----------------+---------------+------+-------------+---------+--------------+-----------+------------+-----------------+----------+------+------------+-------------+--------------+
|  0|  0|         0|  0|      0|        0|              0|               0|              0|     0|            0|        0|             0|          0|           0|                0|         0|     0|           0|            0|             0|
+---+---+----------+---+-------+----

In [0]:
credit_df.show(10)  # spot-check the transformed credit records

+-------+----------+------+--------------------+-------------+--------------+
|     ID|MONTHS_AGO|STATUS|        STATUS_LABEL|RISK_CATEGORY|is_delay_month|
+-------+----------+------+--------------------+-------------+--------------+
|5003471|        39|     4|Paid Within 120-1...|    High Risk|           Yes|
|5004559|        39|     4|Paid Within 120-1...|    High Risk|           Yes|
|5003804|        43|     4|Paid Within 120-1...|    High Risk|           Yes|
|5009628|        36|     4|Paid Within 120-1...|    High Risk|           Yes|
|5009754|        23|     4|Paid Within 120-1...|    High Risk|           Yes|
|5009749|         7|     4|Paid Within 120-1...|    High Risk|           Yes|
|5009749|         2|     4|Paid Within 120-1...|    High Risk|           Yes|
|5008827|        24|     4|Paid Within 120-1...|    High Risk|           Yes|
|5009752|        21|     4|Paid Within 120-1...|    High Risk|           Yes|
|5009752|        16|     4|Paid Within 120-1...|    High Risk|  

#### Writing the Transformed Data to Blob Storage

In [0]:
import os

storage_account_name = "capstonecreditsa"
# Never hard-code the storage account key: it grants full access to the account and is
# visible to anyone who can read the notebook or its history. Provide it through an
# environment variable (e.g. a cluster env var backed by a Databricks secret). The key
# previously committed here should be treated as compromised and rotated.
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable to the storage account key."
    )
container_name = "transformed-data"
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

# NOTE(review): this is the container root, and mode("overwrite") replaces whatever is
# currently stored at that path. Consider writing to a dedicated sub-directory.
output_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/"

# Parquet embeds its own schema, so the CSV-only "header" option is not needed.
df.write.mode("overwrite").parquet(output_path)