
# Starting Session


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CustomerDataProcessing").getOrCreate()

spark

## Loading the Data from DBFS

In [0]:
df = spark.read.format("csv").option("header", "true").load("/FileStore/shared_uploads/santi.job.2022@gmail.com/customers.csv")

In [0]:
df.show(5)
df.printSchema()

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    False|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     True|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     True|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    False|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    False|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: string (

## Using SQL to modify type fo some columns

In [0]:
from pyspark.sql.functions import *

df = df.withColumn('registration_date', to_date(col("registration_date"),'yyyy-MM-dd')) \
    .withColumn('is_active',col("is_active").cast('boolean'))

In [0]:
df.show(5)
df.printSchema()

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registration_date: date (nu

In [0]:
df.count()

Out[26]: 17653

## Dealing with Missing values in Pyspark

In [0]:
df = df.fillna({'city':'Unknown','state':'Unknown','country':'Unknown'})

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+
|customer_id|      name|     city|      state|country|registration_date|is_active|
+-----------+----------+---------+-----------+-------+-----------------+---------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|
+-----------+----------+---------+-----------+-------+-----------------+---------+
only showing top 5 rows



**- Let's first see if there are nulls in the `registration_date` column**

In [0]:
df.filter(col("registration_date").isNull()).count()

Out[15]: 0

In [0]:
df = df.withColumn('registration_year',year(col('registration_date')))\
    .withColumn('registration_month',month(col('registration_date')))

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|             2023|                 6|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|             2023|                12|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|             2023|                10|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|             2023|                10|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|             2023|                 3|
+-----------+----------+---------+------

## More Data Exploration

**- Next, let's explore unique values**

In [0]:
unique_cities = df.select(countDistinct('city')).collect()
unique_cities[0][0]

unique_states = df.select(countDistinct('state')).collect()
unique_states[0][0]

unique_countries = df.select(countDistinct('country')).collect()
unique_countries[0][0]

print(unique_cities[0][0])
print(unique_states[0][0])
print(unique_countries[0][0])

8
7
1


**- More Analysis using with groupBy, pivot and window**

In [0]:
df.show(5)

+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|customer_id|      name|     city|      state|country|registration_date|is_active|registration_year|registration_month|
+-----------+----------+---------+-----------+-------+-----------------+---------+-----------------+------------------+
|          0|Customer_0|     Pune|Maharashtra|  India|       2023-06-29|    false|             2023|                 6|
|          1|Customer_1|Bangalore| Tamil Nadu|  India|       2023-12-07|     true|             2023|                12|
|          2|Customer_2|Hyderabad|    Gujarat|  India|       2023-10-27|     true|             2023|                10|
|          3|Customer_3|Bangalore|  Karnataka|  India|       2023-10-17|    false|             2023|                10|
|          4|Customer_4|Ahmedabad|  Karnataka|  India|       2023-03-14|    false|             2023|                 3|
+-----------+----------+---------+------

In [0]:
df.groupBy('city').count().orderBy(col('count').desc()).show()

+---------+-----+
|     city|count|
+---------+-----+
|     Pune| 2243|
|Hyderabad| 2242|
|  Kolkata| 2223|
|Bangalore| 2211|
|    Delhi| 2200|
|Ahmedabad| 2198|
|  Chennai| 2194|
|   Mumbai| 2142|
+---------+-----+



In [0]:
df.groupBy('state','country').count().orderBy(col('count').desc()).show()

+-----------+-------+-----+
|      state|country|count|
+-----------+-------+-----+
|      Delhi|  India| 2578|
|    Gujarat|  India| 2543|
| Tamil Nadu|  India| 2536|
|  Telangana|  India| 2520|
|West Bengal|  India| 2503|
|Maharashtra|  India| 2490|
|  Karnataka|  India| 2483|
+-----------+-------+-----+



In [0]:
# Pivot Table - Count of Active and Inactive Users Per state

df.groupBy('state').pivot('is_active').count().show()


+-----------+-----+----+
|      state|false|true|
+-----------+-----+----+
|  Karnataka| 1207|1276|
| Tamil Nadu| 1284|1252|
|    Gujarat| 1211|1332|
|      Delhi| 1356|1222|
|  Telangana| 1294|1226|
|Maharashtra| 1260|1230|
|West Bengal| 1306|1197|
+-----------+-----+----+



In [0]:
from pyspark.sql.window import Window

window_spec = Window.partitionBy('state').orderBy(col('registration_date').desc())

df = df.withColumn('rank',rank().over(window_spec))\
    .withColumn('dense_rank',dense_rank().over(window_spec))\
    .withColumn('row_number',row_number().over(window_spec))


In [0]:
df.select('name','city','state','registration_date','rank','dense_rank','row_number').show(10)

+--------------+---------+-----+-----------------+----+----------+----------+
|          name|     city|state|registration_date|rank|dense_rank|row_number|
+--------------+---------+-----+-----------------+----+----------+----------+
|   Customer_61|Hyderabad|Delhi|       2023-12-31|   1|         1|         1|
|  Customer_501|   Mumbai|Delhi|       2023-12-31|   1|         1|         2|
| Customer_2763|     Pune|Delhi|       2023-12-31|   1|         1|         3|
|Customer_12858|Ahmedabad|Delhi|       2023-12-31|   1|         1|         4|
|Customer_13570|Bangalore|Delhi|       2023-12-31|   1|         1|         5|
|Customer_14970|  Chennai|Delhi|       2023-12-31|   1|         1|         6|
|Customer_16447|Ahmedabad|Delhi|       2023-12-31|   1|         1|         7|
|Customer_16709|  Chennai|Delhi|       2023-12-31|   1|         1|         8|
|Customer_17129|  Chennai|Delhi|       2023-12-31|   1|         1|         9|
|  Customer_250|     Pune|Delhi|       2023-12-30|  10|         

In [0]:
df_recent_customers = df.filter(col("registration_date") >= lit('2023-07-01')) # lit uses the value to fill a column with that value repeated on each row
df_recent_customers.count()

Out[36]: 9025

In [0]:
# oldest and the neweset customer per city

df.groupBy('city').agg(min('registration_date').alias('oldest'),max('registration_date').alias('newest')).show()


+---------+----------+----------+
|     city|    oldest|    newest|
+---------+----------+----------+
|Bangalore|2023-01-01|2023-12-31|
|  Chennai|2023-01-01|2023-12-31|
|   Mumbai|2023-01-01|2023-12-31|
|Ahmedabad|2023-01-01|2023-12-31|
|  Kolkata|2023-01-01|2023-12-31|
|     Pune|2023-01-01|2023-12-31|
|    Delhi|2023-01-01|2023-12-31|
|Hyderabad|2023-01-01|2023-12-31|
+---------+----------+----------+



In [0]:
output_path = '/FileStore/tables/processed_customers'
df.write.mode('overwrite').parquet(output_path)