In [1]:
import pyspark
import time
from functools import wraps
import pyspark.sql.functions as f
def time_it(func):
    """Decorator that prints how long each call to ``func`` took.

    Elapsed time is measured with ``time.perf_counter`` (a monotonic,
    high-resolution clock). The message is printed in a ``finally`` block,
    so it is emitted even when ``func`` raises; the exception still propagates.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper (with ``func``'s name/docstring copied via ``functools.wraps``)
        that forwards all positional and keyword arguments to ``func`` and
        returns its result unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic; time.time() can jump if the system clock
        # is adjusted while the measurement is running.
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - start
            print(f"⏱️ Function '{func.__name__}' executed in {elapsed:.4f} seconds")
    return wrapper


In [2]:
# Build (or reuse) a local SparkSession: master "local[4]" runs Spark in-process
# with 4 worker threads.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local[4]")
    .appName("Big Data Application")
    .getOrCreate()
)
spark

## Read CSV & Basic Operations

In [16]:
# Raw HMDA 2017 CSV. Only the header option is set (no schema inference), so every
# column is loaded as a string.
df = (
    spark.read
    .option("header", "true")
    .csv(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels.csv")
)


In [11]:
# Number of data rows in the CSV (the header line is consumed by the header option).
csv_row_count = df.count()
csv_row_count

14285496

In [12]:
# How many partitions Spark split the CSV input into.
csv_partition_count = df.rdd.getNumPartitions()
csv_partition_count

84

In [13]:
# Record count for each distinct agency_name value.
agency_counts = df.groupBy("agency_name").count()
agency_counts.show()

+--------------------+-------+
|         agency_name|  count|
+--------------------+-------+
|Federal Reserve S...| 366432|
|Department of Hou...|7083670|
|Office of the Com...| 463411|
|Federal Deposit I...|1128554|
|Consumer Financia...|4222898|
|National Credit U...|1020531|
+--------------------+-------+



In [17]:
# Print the column names and types of the CSV DataFrame (all strings, since no
# schema inference was requested when reading).
df.printSchema()

root
 |-- as_of_year: string (nullable = true)
 |-- respondent_id: string (nullable = true)
 |-- agency_name: string (nullable = true)
 |-- agency_abbr: string (nullable = true)
 |-- agency_code: string (nullable = true)
 |-- loan_type_name: string (nullable = true)
 |-- loan_type: string (nullable = true)
 |-- property_type_name: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- loan_purpose_name: string (nullable = true)
 |-- loan_purpose: string (nullable = true)
 |-- owner_occupancy_name: string (nullable = true)
 |-- owner_occupancy: string (nullable = true)
 |-- loan_amount_000s: string (nullable = true)
 |-- preapproval_name: string (nullable = true)
 |-- preapproval: string (nullable = true)
 |-- action_taken_name: string (nullable = true)
 |-- action_taken: string (nullable = true)
 |-- msamd_name: string (nullable = true)
 |-- msamd: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- state

In [18]:
# Peek at location, purpose, outcome, preapproval and denial-reason columns.
preview_cols = [
    "state_abbr",
    "loan_purpose_name",
    "action_taken_name",
    "preapproval_name",
    "denial_reason_name_1",
    "denial_reason_name_2",
    "denial_reason_name_3",
]
df.select(*preview_cols).show()

+----------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|state_abbr|loan_purpose_name|   action_taken_name|    preapproval_name|denial_reason_name_1|denial_reason_name_2|denial_reason_name_3|
+----------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|        PA|      Refinancing|Application withd...|      Not applicable|                NULL|                NULL|                NULL|
|        WA|      Refinancing|Application denie...|      Not applicable|      Credit history|                NULL|                NULL|
|        UT|      Refinancing|File closed for i...|      Not applicable|                NULL|                NULL|                NULL|
|        MO|      Refinancing|     Loan originated|      Not applicable|                NULL|                NULL|                NULL|
|        IL|      Refinancing|Application withd.

In [14]:
# Record count per action_taken_name, collected to the driver as a list of Rows.
action_taken_rows = df.groupBy("action_taken_name").count().collect()
action_taken_rows

[Row(action_taken_name='Application denied by financial institution', count=2009743),
 Row(action_taken_name='Loan originated', count=7339057),
 Row(action_taken_name='Application approved but not accepted', count=409797),
 Row(action_taken_name='File closed for incompleteness', count=601148),
 Row(action_taken_name='Preapproval request approved but not accepted', count=36106),
 Row(action_taken_name='Application withdrawn by applicant', count=1696289),
 Row(action_taken_name='Preapproval request denied by financial institution', count=106680),
 Row(action_taken_name='Loan purchased by the institution', count=2086676)]

In [15]:
# Partition count of the CSV DataFrame (repeats the earlier partition check).
num_partitions = df.rdd.getNumPartitions()
num_partitions

84

## Percentage of records per `action_taken_name` category

In [22]:
# Count records per action_taken_name and persist the (small) aggregate as Parquet.
# `f` (pyspark.sql.functions) is already imported in the first cell.
df_action_taken_name_counts = df.groupBy("action_taken_name").count()
# mode("overwrite"): the default save mode raises if the path already exists,
# which made this cell fail on every re-run.
df_action_taken_name_counts.write.mode("overwrite").format("parquet").save("df_action_taken_name_counts.parquet")
# Total row count: the denominator for the percentage column computed below.
total_row_count = df.count()
# Note: printing a DataFrame shows its schema repr, not its rows.
print(df_action_taken_name_counts)
print(total_row_count)

DataFrame[action_taken_name: string, count: bigint]
14285496


In [27]:
# Share of all records (percent, rounded to 2 decimals) for each action_taken_name.
percentage_expr = f.round(f.col("count") * 100 / f.lit(total_row_count), 2)
df_action_taken_name_counts_percentage = df_action_taken_name_counts.withColumn(
    "percentage_action_taken_name", percentage_expr
)
df_action_taken_name_counts_percentage.show()

+--------------------+-------+----------------------------+
|   action_taken_name|  count|percentage_action_taken_name|
+--------------------+-------+----------------------------+
|Application denie...|2009743|                       14.07|
|     Loan originated|7339057|                       51.37|
|Application appro...| 409797|                        2.87|
|File closed for i...| 601148|                        4.21|
|Preapproval reque...|  36106|                        0.25|
|Application withd...|1696289|                       11.87|
|Preapproval reque...| 106680|                        0.75|
|Loan purchased by...|2086676|                       14.61|
+--------------------+-------+----------------------------+



In [29]:
# Partition count of the aggregated counts, for comparison with the raw CSV DataFrame.
counts_partition_count = df_action_taken_name_counts.rdd.getNumPartitions()
counts_partition_count

1

## Save as Parquet (partitioned by `as_of_year`)

In [18]:
# Write the full DataFrame as Parquet, one sub-directory per as_of_year value,
# replacing any previous output at the target path.
(
    df.write
    .mode("overwrite")
    .partitionBy("as_of_year")
    .parquet(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels")
)

## Comparing CSV and Parquet: full row count

In [10]:
@time_it
def park_count():
    """Time a full row count of the Parquet copy (projected to two columns)."""
    return (
        spark.read.format("parquet")
        .load(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels")
        .select("respondent_id", "state_abbr")
        .count()
    )
park_count()

⏱️ Function 'park_count' executed in 1.2319 seconds


14285496

In [12]:
@time_it
def csv_count():
    """Time a full row count of the original CSV (projected to two columns)."""
    return (
        spark.read.format("csv")
        .option("header", True)
        .load(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels.csv")
        .select("respondent_id", "state_abbr")
        .count()
    )
csv_count()

⏱️ Function 'csv_count' executed in 13.7547 seconds


14285496

## Comparing Parquet and CSV: row count with a `where` filter

In [3]:
@time_it
def park_count(state_abbr="PA"):
    """Time a filtered row count on the Parquet copy.

    NOTE(review): this redefines the unfiltered ``park_count`` from an earlier
    cell; the later definition silently shadows it.

    Args:
        state_abbr: Value of the ``state_abbr`` column to keep (default "PA",
            matching the original hard-coded filter).

    Returns:
        Number of rows whose ``state_abbr`` equals ``state_abbr``.
    """
    # NOTE(review): single-run wall-clock timings are noisy; a cell run shortly
    # after session start may include JVM warm-up. Consider repeating runs.
    df_park = (
        spark.read.format("parquet")
        .load(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels")
        .select("respondent_id", "state_abbr")
        .filter(f.col("state_abbr") == state_abbr)
    )
    return df_park.count()
park_count()

⏱️ Function 'park_count' executed in 5.9587 seconds


473757

In [5]:
@time_it
def csv_count(state_abbr="PA"):
    """Time a filtered row count on the original CSV.

    NOTE(review): this redefines the unfiltered ``csv_count`` from an earlier
    cell; the later definition silently shadows it.

    Args:
        state_abbr: Value of the ``state_abbr`` column to keep (default "PA",
            matching the original hard-coded filter).

    Returns:
        Number of rows whose ``state_abbr`` equals ``state_abbr``.
    """
    df_csv = (
        spark.read.format("csv")
        .option("header", True)
        .load(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels.csv")
        .select("respondent_id", "state_abbr")
        .filter(f.col("state_abbr") == state_abbr)
    )
    return df_csv.count()
csv_count()

⏱️ Function 'csv_count' executed in 22.0480 seconds


473757

In [20]:
# Name of the catalog the session is currently using.
current_catalog = spark.catalog.currentCatalog()
current_catalog


'spark_catalog'

## Comparing the number of partitions before and after `groupBy`

In [9]:
# Compare partition counts before and after an aggregation.
# Distinct names are used so the notebook-wide `df` is not overwritten and the
# cell stays idempotent on re-run.
df_parquet = (
    spark.read.format("parquet")
    .load(r"C:\Users\mailv\projects\data\hmda_2017_nationwide_all-records_labels\hmda_2017_nationwide_all-records_labels")
    .select("respondent_id", "state_abbr")
)
print(df_parquet.count())
print("num parts", df_parquet.rdd.getNumPartitions())
print(" **** After group by ....")
df_state_counts = df_parquet.groupBy("state_abbr").count()
# NOTE(review): the post-shuffle partition count is presumably reduced by adaptive
# query execution coalescing small shuffle partitions — confirm via spark.sql.adaptive.* settings.
print(df_state_counts.count())
print("num parts", df_state_counts.rdd.getNumPartitions())

14285496
num parts 7
 **** After group by ....
54
num parts 1
