<a href="https://colab.research.google.com/github/sandeepgundeboina/SmallApps/blob/main/SparkChallengeGemini.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Requirements:

Load Data:

    Load the sales_data.csv file into a PySpark DataFrame.
    Infer the schema automatically during loading.

-------------------------------------------------------

Schema and Data Type Adjustments:

    Ensure sale_date is cast to a DateType.
    Ensure amount is cast to a DoubleType.
    All other columns should have appropriate inferred types.
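As a plain-Python illustration of the loading and typing rules above (standard library only, not PySpark), the sketch below parses a few rows copied from the notebook output and converts `sale_date` to a date and `amount` to a float. The inline sample and the helper name `load_typed_rows` are hypothetical. Note that the date pattern has to follow the `yyyy-MM-dd` layout the file actually uses.

```python
import csv
import io
from datetime import datetime

# Hypothetical inline sample copying the first three rows shown by df.show(5)
SAMPLE_CSV = """transaction_id,product_id,customer_id,sale_date,amount,currency,region
1001,P001,C101,2024-01-15,150.75,USD,North
1002,P002,C102,2024-01-15,200.0,EUR,South
1003,P003,C103,2024-01-16,50.25,USD,East
"""


def load_typed_rows(text):
    """Parse CSV text and convert each column to the type the requirements ask for."""
    rows = []
    for raw in csv.DictReader(io.StringIO(text)):
        rows.append({
            "transaction_id": int(raw["transaction_id"]),
            "product_id": raw["product_id"],
            "customer_id": raw["customer_id"],
            # Dates are stored as yyyy-MM-dd, so the parse pattern must use that order
            "sale_date": datetime.strptime(raw["sale_date"], "%Y-%m-%d").date(),
            "amount": float(raw["amount"]),
            "currency": raw["currency"],
            # An empty field becomes None so a missing region can be detected later
            "region": raw["region"] or None,
        })
    return rows


rows = load_typed_rows(SAMPLE_CSV)
print(rows[0]["sale_date"], rows[0]["amount"])  # 2024-01-15 150.75

# A day-first pattern such as dd-MM-yyyy does not match these values
try:
    datetime.strptime("2024-01-15", "%d-%m-%Y")
except ValueError:
    print("day-first pattern rejected")
```

In Spark, a mismatched pattern in `to_date` may similarly yield nulls or errors, depending on configuration.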

--------------------------------------------------------

Data Cleaning/Validation:

    Handle Missing Values: Identify and fill any missing region values with 'Unknown'.
    Invalid Amounts: Filter out any records where amount is less than or equal to 0.
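A minimal plain-Python model of the two cleaning rules, assuming rows are dicts and a missing region is `None` (the rows and the `clean` helper are hypothetical). It mirrors filling `region` with `'Unknown'` and then keeping only strictly positive amounts.

```python
# Hypothetical rows shaped like the sales data; None marks a missing region
rows = [
    {"transaction_id": 1, "amount": 150.75, "region": "North"},
    {"transaction_id": 2, "amount": 250.0, "region": None},
    {"transaction_id": 3, "amount": 0.0, "region": "East"},
    {"transaction_id": 4, "amount": -20.0, "region": "West"},
]


def clean(rows):
    """Fill a missing region with 'Unknown', then keep only rows whose amount is > 0."""
    filled = [
        {**r, "region": "Unknown" if r["region"] is None else r["region"]}
        for r in rows
    ]
    return [r for r in filled if r["amount"] > 0]


cleaned = clean(rows)
print([(r["transaction_id"], r["region"]) for r in cleaned])  # [(1, 'North'), (2, 'Unknown')]
```

The two steps are independent, so applying them in either order gives the same result.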

--------------------------------------------------------

Feature Engineering/Derivation:

    Calculate daily_profit_margin: Assume a fixed profit margin of 20% on all sales. Add a new column daily_profit_margin calculated as amount * 0.20.
    Extract sale_month and sale_year: Create new columns sale_month (integer, e.g., 1 for January) and sale_year (integer, e.g., 2024) from the sale_date column.
    Categorize Sales Volume: Create a new column sales_volume_category based on the amount:
    'Low' if amount < 100
    'Medium' if 100 <= amount < 300
    'High' if amount >= 300
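The derivation rules above can be sketched per sale in plain Python; the helper name `derive_features` is hypothetical, and rounding the margin to two decimals is an assumption that matches what the notebook does. Run it as a standalone script: inside the notebook, the wildcard `pyspark.sql.functions` import would shadow the built-in `round`.

```python
from datetime import date


def derive_features(amount, sale_date):
    """Return the derived columns for one sale, following the rules listed above."""
    # Fixed 20% margin, rounded to two decimals
    daily_profit_margin = round(amount * 0.20, 2)
    # Bands: below 100 is Low, 100 up to (not including) 300 is Medium, 300 and above is High
    if amount < 100:
        category = "Low"
    elif amount < 300:
        category = "Medium"
    else:
        category = "High"
    return {
        "daily_profit_margin": daily_profit_margin,
        "sale_month": sale_date.month,
        "sale_year": sale_date.year,
        "sales_volume_category": category,
    }


print(derive_features(150.75, date(2024, 1, 15)))
# {'daily_profit_margin': 30.15, 'sale_month': 1, 'sale_year': 2024, 'sales_volume_category': 'Medium'}
```

Because the branches are tested in order, the `Medium` check only needs `amount < 300`; the lower bound of 100 is already implied.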

----------------------------------------------------------

Data Aggregation and Summarization:

    Total Sales by Region: Calculate the total amount for each region.
    Average Sales per Product: Calculate the average amount for each product_id.
    Monthly Sales Trends: Calculate the total amount for each sale_year and sale_month combination.
    Top 5 Customers by Spending: Identify the top 5 customer_ids who have spent the most in total.
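To make the four aggregations concrete, here is a plain-Python sketch using `collections.defaultdict`, run on the five rows that `df.show(5)` prints in the solution. The results describe only those five rows, not the full dataset, and the helper name `aggregate` is hypothetical.

```python
from collections import defaultdict
from datetime import date
from statistics import mean

# The first five rows printed by df.show(5) in the solution (a small subset of the data)
rows = [
    # (product_id, customer_id, sale_date, amount, currency, region)
    ("P001", "C101", date(2024, 1, 15), 150.75, "USD", "North"),
    ("P002", "C102", date(2024, 1, 15), 200.0, "EUR", "South"),
    ("P003", "C103", date(2024, 1, 16), 50.25, "USD", "East"),
    ("P001", "C101", date(2024, 1, 16), 300.5, "USD", "North"),
    ("P004", "C104", date(2024, 1, 17), 75.0, "GBP", "West"),
]


def aggregate(rows, top_n=5):
    """Return region totals, per-product averages, (year, month) totals and the top customers."""
    by_region = defaultdict(float)
    by_product = defaultdict(list)
    by_month = defaultdict(float)
    by_customer = defaultdict(float)
    for product_id, customer_id, sale_date, amount, _currency, region in rows:
        by_region[region] += amount
        by_product[product_id].append(amount)
        by_month[(sale_date.year, sale_date.month)] += amount
        by_customer[customer_id] += amount
    avg_by_product = {p: mean(amounts) for p, amounts in by_product.items()}
    # Highest total spend first, then keep the first top_n customers
    top_customers = sorted(by_customer.items(), key=lambda kv: kv[1], reverse=True)[:top_n]
    return dict(by_region), avg_by_product, dict(by_month), top_customers


region_totals, product_avgs, month_totals, top_customers = aggregate(rows)
print(region_totals["North"], product_avgs["P001"], month_totals[(2024, 1)])  # 451.25 225.625 776.5
print(top_customers[0])  # ('C101', 451.25)
```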

---------------------------------------------------------

Data Pivoting:

    Sales by Region and Currency: Pivot the data to show the total sales amount for each region, broken down by currency. (This will involve using pivot.)
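The pivot can be modelled in plain Python as a sum keyed by `(region, currency)`, with the distinct currencies becoming columns. In Spark, `pivot()` without an explicit value list first computes those distinct values, and cells with no matching rows come back as null until `fillna(0)` replaces them. The helper `pivot_sum` and the sample rows are hypothetical, and the sorted column order is chosen to match the `EUR`, `GBP`, `USD` columns in the notebook output.

```python
from collections import defaultdict


def pivot_sum(rows, row_key, col_key, value_key):
    """Sum value_key for each (row_key, col_key) pair and fill empty cells with 0.0."""
    cells = defaultdict(float)
    for r in rows:
        cells[(r[row_key], r[col_key])] += r[value_key]
    # Distinct pivot values become the output columns, in sorted order
    columns = sorted({c for _, c in cells})
    table = {}
    for row_value in sorted({rv for rv, _ in cells}):
        # .get(..., 0.0) plays the role of fillna(0) for pairs with no sales
        table[row_value] = {c: cells.get((row_value, c), 0.0) for c in columns}
    return columns, table


# A few rows copied from the notebook output
sample = [
    {"region": "North", "currency": "USD", "amount": 150.75},
    {"region": "South", "currency": "EUR", "amount": 200.0},
    {"region": "North", "currency": "USD", "amount": 300.5},
    {"region": "West", "currency": "GBP", "amount": 75.0},
]
columns, table = pivot_sum(sample, "region", "currency", "amount")
print(columns)         # ['EUR', 'GBP', 'USD']
print(table["North"])  # {'EUR': 0.0, 'GBP': 0.0, 'USD': 451.25}
```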

--------------------------------------------------------

Data Export:

    Write the final transformed DataFrame (after all cleaning and feature engineering, before aggregations) to a Parquet file, partitioned by sale_year and sale_month.
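With `df.write.partitionBy('sale_year', 'sale_month').parquet(path)`, Spark writes Hive-style `column=value` subdirectories, nested in the order the columns are listed, and leaves the partition columns out of the Parquet files themselves. The sketch below only computes the directory a row would land in; the helper `partition_dir` is hypothetical and does no I/O. The base path is the one used in the notebook.

```python
from datetime import date
from pathlib import PurePosixPath


def partition_dir(base, sale_date):
    """Build the Hive-style directory for a row partitioned by sale_year, then sale_month."""
    return str(PurePosixPath(base) / f"sale_year={sale_date.year}" / f"sale_month={sale_date.month}")


print(partition_dir("/content/spark-warehouse/agg_data", date(2024, 1, 15)))
# /content/spark-warehouse/agg_data/sale_year=2024/sale_month=1
```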

### **Solution**

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('SparkChallengeGemini').getOrCreate()

In [2]:
spark

In [3]:
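# The wildcard import shadows Python built-ins such as sum, round, max and min;
# later cells rely on the PySpark versions, e.g. round(sum(col('amount')),2)
from pyspark.sql.functions import *
from pyspark.sql.types import *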

#### Loading data from CSV file

In [4]:
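df=spark.read.format('csv').option('header',True).option('inferSchema',True).load('/content/drive/MyDrive/Abc/SalesData/sales_data.csv')
# Enforce the required types explicitly; with inferSchema these casts may be no-ops
df=df.withColumn('sale_date',col('sale_date').cast(DateType())).withColumn('amount',col('amount').cast(DoubleType()))
df.show(5)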

+--------------+----------+-----------+----------+------+--------+------+
|transaction_id|product_id|customer_id| sale_date|amount|currency|region|
+--------------+----------+-----------+----------+------+--------+------+
|          1001|      P001|       C101|2024-01-15|150.75|     USD| North|
|          1002|      P002|       C102|2024-01-15| 200.0|     EUR| South|
|          1003|      P003|       C103|2024-01-16| 50.25|     USD|  East|
|          1004|      P001|       C101|2024-01-16| 300.5|     USD| North|
|          1005|      P004|       C104|2024-01-17|  75.0|     GBP|  West|
+--------------+----------+-----------+----------+------+--------+------+
only showing top 5 rows



In [5]:
df.columns

['transaction_id',
 'product_id',
 'customer_id',
 'sale_date',
 'amount',
 'currency',
 'region']

In [6]:
df.dtypes

[('transaction_id', 'int'),
 ('product_id', 'string'),
 ('customer_id', 'string'),
 ('sale_date', 'date'),
 ('amount', 'double'),
 ('currency', 'string'),
 ('region', 'string')]

#### Counting null values per column

In [7]:
null_cnt=df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns])
null_cnt.show()

+--------------+----------+-----------+---------+------+--------+------+
|transaction_id|product_id|customer_id|sale_date|amount|currency|region|
+--------------+----------+-----------+---------+------+--------+------+
|             0|         0|          0|        0|     0|       0|     1|
+--------------+----------+-----------+---------+------+--------+------+



#### Filling null region values with 'Unknown'

In [8]:
df_fill=df.fillna("Unknown",subset=['region'])

In [9]:
null_cnt=df_fill.select([count(when(col(c).isNull(),c)).alias(c) for c in df_fill.columns])
null_cnt.show()

+--------------+----------+-----------+---------+------+--------+------+
|transaction_id|product_id|customer_id|sale_date|amount|currency|region|
+--------------+----------+-----------+---------+------+--------+------+
|             0|         0|          0|        0|     0|       0|     0|
+--------------+----------+-----------+---------+------+--------+------+



#### Filtering out non-positive amounts

In [10]:
# Keep only strictly positive amounts (removes records with amount <= 0)
df_drop=df_fill.filter(col('amount')>0)

In [11]:
df_drop.filter(df_drop.amount<=0).count()

0

In [12]:
df_drop.show(5)

+--------------+----------+-----------+----------+------+--------+------+
|transaction_id|product_id|customer_id| sale_date|amount|currency|region|
+--------------+----------+-----------+----------+------+--------+------+
|          1001|      P001|       C101|2024-01-15|150.75|     USD| North|
|          1002|      P002|       C102|2024-01-15| 200.0|     EUR| South|
|          1003|      P003|       C103|2024-01-16| 50.25|     USD|  East|
|          1004|      P001|       C101|2024-01-16| 300.5|     USD| North|
|          1005|      P004|       C104|2024-01-17|  75.0|     GBP|  West|
+--------------+----------+-----------+----------+------+--------+------+
only showing top 5 rows



#### Adding Features

In [13]:
# Fixed 20% margin rounded to 2 decimals (the requirements name this column daily_profit_margin)
df_profit=df_drop.withColumn('profit_margin',round(col('amount')*0.20,2))
df_profit.show(5)

+--------------+----------+-----------+----------+------+--------+------+-------------+
|transaction_id|product_id|customer_id| sale_date|amount|currency|region|profit_margin|
+--------------+----------+-----------+----------+------+--------+------+-------------+
|          1001|      P001|       C101|2024-01-15|150.75|     USD| North|        30.15|
|          1002|      P002|       C102|2024-01-15| 200.0|     EUR| South|         40.0|
|          1003|      P003|       C103|2024-01-16| 50.25|     USD|  East|        10.05|
|          1004|      P001|       C101|2024-01-16| 300.5|     USD| North|         60.1|
|          1005|      P004|       C104|2024-01-17|  75.0|     GBP|  West|         15.0|
+--------------+----------+-----------+----------+------+--------+------+-------------+
only showing top 5 rows



In [14]:
# The file stores dates as yyyy-MM-dd, so the pattern must match that layout
df_sale=df_profit.withColumn('sale_date',to_date(col('sale_date'),'yyyy-MM-dd'))
df_mon=df_sale.withColumn('sale_Month',month(col('sale_date')))
df_year=df_mon.withColumn('sale_Year',year(col('sale_date')))
df_year.show(4)

+--------------+----------+-----------+----------+------+--------+------+-------------+----------+---------+
|transaction_id|product_id|customer_id| sale_date|amount|currency|region|profit_margin|sale_Month|sale_Year|
+--------------+----------+-----------+----------+------+--------+------+-------------+----------+---------+
|          1001|      P001|       C101|2024-01-15|150.75|     USD| North|        30.15|         1|     2024|
|          1002|      P002|       C102|2024-01-15| 200.0|     EUR| South|         40.0|         1|     2024|
|          1003|      P003|       C103|2024-01-16| 50.25|     USD|  East|        10.05|         1|     2024|
|          1004|      P001|       C101|2024-01-16| 300.5|     USD| North|         60.1|         1|     2024|
+--------------+----------+-----------+----------+------+--------+------+-------------+----------+---------+
only showing top 4 rows



In [15]:
# when() branches are evaluated in order, so the Medium branch only sees amount >= 100
df_final=df_year.withColumn('sales_volume_category',when(col('amount')<100,'Low').when(col('amount')<300,'Medium').otherwise('High'))
df_final.show(4)

+--------------+----------+-----------+----------+------+--------+------+-------------+----------+---------+---------------------+
|transaction_id|product_id|customer_id| sale_date|amount|currency|region|profit_margin|sale_Month|sale_Year|sales_volume_category|
+--------------+----------+-----------+----------+------+--------+------+-------------+----------+---------+---------------------+
|          1001|      P001|       C101|2024-01-15|150.75|     USD| North|        30.15|         1|     2024|               Medium|
|          1002|      P002|       C102|2024-01-15| 200.0|     EUR| South|         40.0|         1|     2024|               Medium|
|          1003|      P003|       C103|2024-01-16| 50.25|     USD|  East|        10.05|         1|     2024|                  Low|
|          1004|      P001|       C101|2024-01-16| 300.5|     USD| North|         60.1|         1|     2024|                 High|
+--------------+----------+-----------+----------+------+--------+------+-------------+----------+---------+---------------------+
only showing top 4 rows

#### Aggregations

In [16]:
df_region=df_final.groupBy('region').agg(round(sum(col('amount')),2).alias('region_sales'))
df_region.show()

+-------+------------+
| region|region_sales|
+-------+------------+
|Unknown|       250.0|
|  South|      1815.0|
|Central|       900.0|
|   East|      745.25|
|   West|     2920.49|
|  North|     3236.74|
+-------+------------+



In [17]:
df_product=df_final.groupBy('product_id').agg(round(avg(col('amount')),2).alias('avg_sales_per_product')).orderBy(col('avg_sales_per_product').desc())
df_product.show()

+----------+---------------------+
|product_id|avg_sales_per_product|
+----------+---------------------+
|      P009|               1100.0|
|      P008|                500.0|
|      P006|               483.33|
|      P011|                450.0|
|      P012|                250.0|
|      P002|               196.67|
|      P010|                127.5|
|      P001|                112.6|
|      P003|               110.05|
|      P005|                108.1|
|      P004|                100.1|
|      P007|                 47.5|
+----------+---------------------+



In [18]:
df_cus=df_final.groupBy('customer_id').agg(round(sum(col('amount')),2).alias('total_sales_per_cus')).orderBy(col('total_sales_per_cus').desc())
df_cus.show(5)

+-----------+-------------------+
|customer_id|total_sales_per_cus|
+-----------+-------------------+
|       C111|             2200.0|
|       C101|            1751.74|
|       C108|             1455.0|
|       C113|              900.0|
|       C102|              685.0|
+-----------+-------------------+
only showing top 5 rows



In [19]:
df_trend=df_final.groupBy(['sale_Month','sale_Year']).agg(round(sum(col('amount')),2).alias('total_sales_per_MY'))
df_trend.show()

+----------+---------+------------------+
|sale_Month|sale_Year|total_sales_per_MY|
+----------+---------+------------------+
|         3|     2024|            890.49|
|         1|     2024|           2216.99|
|         5|     2024|            1055.0|
|         2|     2024|            2380.0|
|         4|     2024|            1310.0|
|         6|     2024|            2015.0|
+----------+---------+------------------+



In [20]:
# pivot() names the output columns after the currency values; fillna(0) fills region/currency pairs with no sales
df_sal_R_c=df_final.groupBy('region').pivot('currency').agg(round(sum('amount'),2)).fillna(0)
df_sal_R_c.show()

+-------+------+------+-------+
| region|   EUR|   GBP|    USD|
+-------+------+------+-------+
|Unknown|   0.0|   0.0|  250.0|
|  South|1680.0|   0.0|  135.0|
|Central|   0.0|   0.0|  900.0|
|   East| 270.0|   0.0| 475.25|
|   West|   0.0|674.99| 2245.5|
|  North|   0.0|  35.0|3201.74|
+-------+------+------+-------+



In [21]:
df_out=df_final.withColumnRenamed('sale_Year','sale_year').withColumnRenamed('sale_Month','sale_month')
df_out.write.mode('overwrite').partitionBy('sale_year','sale_month').parquet('/content/spark-warehouse/agg_data')

In [22]:
!ls -R /content/spark-warehouse/agg_data


**END OF CODE**