In [27]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql import functions as f

# Start (or reuse) the Spark session used throughout this exploration.
spark = SparkSession.builder.appName("CRM_EDA").getOrCreate()

# Every CRM extract is read the same way: first row is the header and
# column types are inferred by Spark rather than declared up front.
csv_options = {"header": True, "inferSchema": True}

df_cust = spark.read.options(**csv_options).csv("source_crm/cust_info.csv")
df_prd = spark.read.options(**csv_options).csv("source_crm/prd_info.csv")
df_sales = spark.read.options(**csv_options).csv("source_crm/sales_details.csv")

In [28]:
# Pair each frame with the label printed next to its row count.
labelled_frames = [
    ("Customers:", df_cust),
    ("Products :", df_prd),
    ("Sales    :", df_sales),
]

# Row counts first (each count() triggers a Spark job) ...
for label, frame in labelled_frames:
    print(label, frame.count(), "rows")

# ... then the inferred schema of every frame, in the same order.
for _, frame in labelled_frames:
    frame.printSchema()

Customers: 18494 rows
Products : 397 rows
Sales    : 60398 rows
root
 |-- cst_id: integer (nullable = true)
 |-- cst_key: string (nullable = true)
 |-- cst_firstname: string (nullable = true)
 |-- cst_lastname: string (nullable = true)
 |-- cst_marital_status: string (nullable = true)
 |-- cst_gndr: string (nullable = true)
 |-- cst_create_date: date (nullable = true)

root
 |-- prd_id: integer (nullable = true)
 |-- prd_key: string (nullable = true)
 |-- prd_nm: string (nullable = true)
 |-- prd_cost: integer (nullable = true)
 |-- prd_line: string (nullable = true)
 |-- prd_start_dt: date (nullable = true)
 |-- prd_end_dt: date (nullable = true)

root
 |-- sls_ord_num: string (nullable = true)
 |-- sls_prd_key: string (nullable = true)
 |-- sls_cust_id: integer (nullable = true)
 |-- sls_order_dt: integer (nullable = true)
 |-- sls_ship_dt: integer (nullable = true)
 |-- sls_due_dt: integer (nullable = true)
 |-- sls_sales: integer (nullable = true)
 |-- sls_quantity: integer (nullab

In [29]:
# Preview the first five rows of each source table.
for frame in (df_cust, df_prd, df_sales):
    frame.show(5)

+------+----------+-------------+------------+------------------+--------+---------------+
|cst_id|   cst_key|cst_firstname|cst_lastname|cst_marital_status|cst_gndr|cst_create_date|
+------+----------+-------------+------------+------------------+--------+---------------+
| 11000|AW00011000|          Jon|       Yang |                 M|       M|     2025-10-06|
| 11001|AW00011001|       Eugene|     Huang  |                 S|       M|     2025-10-06|
| 11002|AW00011002|        Ruben|      Torres|                 M|       M|     2025-10-06|
| 11003|AW00011003|      Christy|         Zhu|                 S|       F|     2025-10-06|
| 11004|AW00011004|    Elizabeth|     Johnson|                 S|       F|     2025-10-06|
+------+----------+-------------+------------+------------------+--------+---------------+
only showing top 5 rows

+------+----------------+--------------------+--------+--------+------------+----------+
|prd_id|         prd_key|              prd_nm|prd_cost|prd_line|prd

In [33]:
# List every cst_id that occurs more than once, most frequent first.
(
    df_cust
    .groupBy("cst_id")
    .count()
    .filter(f.col("count") > 1)
    .sort(f.col("count").desc())
    .show()
)

+------+-----+
|cst_id|count|
+------+-----+
|     0|    4|
| 29466|    3|
| 29483|    2|
| 29473|    2|
| 29449|    2|
| 29433|    2|
+------+-----+



In [34]:
# Replace missing customer ids with the placeholder 0.
# cst_id is an integer column, so the replacement is given as an int rather
# than the string "0000": the dict form of fill() casts the value to the
# column type (yielding 0 either way), but a string here hides that and would
# be silently ignored if rewritten as fill("0000", subset=["cst_id"]).
# NOTE(review): every row with a missing id now shares cst_id 0, so those rows
# count as duplicates of one another in any cst_id-based de-duplication.
df_cust = df_cust.na.fill({"cst_id": 0})

In [37]:
# Keep exactly one row per cst_id.
# dropDuplicates(["cst_id"]) retains an arbitrary row for each key, so two runs
# of the notebook could keep different customer records. Rank the rows of each
# cst_id explicitly and keep only the top-ranked one instead.
# NOTE(review): assumes the row with the most recent cst_create_date is the one
# to keep — confirm with the data owner. Rows that tie on cst_create_date are
# still chosen arbitrarily among themselves.
latest_first = Window.partitionBy("cst_id").orderBy(
    f.col("cst_create_date").desc_nulls_last()
)
df_cust = (
    df_cust
    .withColumn("_dup_rank", f.row_number().over(latest_first))
    .where(f.col("_dup_rank") == 1)
    .drop("_dup_rank")  # helper column must not leak into the cleaned frame
)

In [38]:
# Re-run the duplicate check: an empty table means every cst_id is now unique.
rows_per_cst_id = df_cust.groupBy("cst_id").count()
repeated_ids = rows_per_cst_id.filter(rows_per_cst_id["count"] > 1)
repeated_ids.sort(f.desc("count")).show()

+------+-----+
|cst_id|count|
+------+-----+
+------+-----+

