In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('SparkAssignment.com').getOrCreate()

In [3]:
# spark.stop()

1. Basic DataFrame Operations

In [4]:
# Loading the sales.csv into separate DataFrame.
sales_df = spark.read.option("header", "true").option("inferSchema","true").csv("sales.txt")
sales_df.show(truncate=False)

+--------+-----------+-------+------+----------+------+
|sales_id|customer_id|product|amount|sale_date |region|
+--------+-----------+-------+------+----------+------+
|1       |101        |Laptop |50000 |2023-01-15|North |
|2       |102        |Mobile |15000 |2023-02-10|South |
|3       |103        |Tablet |20000 |2023-03-05|West  |
|4       |104        |Laptop |55000 |2023-03-15|East  |
|5       |105        |Desktop|40000 |2023-04-20|North |
|6       |101        |Mobile |15000 |2023-05-10|South |
|7       |102        |Laptop |60000 |2023-06-15|East  |
|8       |103        |Tablet |20000 |2023-07-05|North |
|9       |104        |Desktop|45000 |2023-08-10|West  |
|10      |105        |Laptop |70000 |2023-09-25|North |
+--------+-----------+-------+------+----------+------+



In [5]:
# Loading the customer.csv into separate DataFrame.
customer_df = spark.read.option("header", "true").option("inferSchema","true").csv("customer.txt")
customer_df.show(truncate=False)

+-----------+-------------+---------------------+---+---------+
|customer_id|customer_name|email                |age|city     |
+-----------+-------------+---------------------+---+---------+
|101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi    |
|102        |Meena Verma  |meena.verma@email.com|34 |Mumbai   |
|103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore|
|104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad|
|105        |Sneha Reddy  |sneha.reddy@email.com|29 |Hyderabad|
|106        |Vikas Jain   |vikas.jain@email.com |31 |Chennai  |
|107        |Amit Roy     |amit.roy@email.com   |35 |Kolkata  |
+-----------+-------------+---------------------+---+---------+



In [6]:
# Display the schema of both DataFrames.
sales_df.printSchema()
customer_df.printSchema()

root
 |-- sales_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product: string (nullable = true)
 |-- amount: integer (nullable = true)
 |-- sale_date: date (nullable = true)
 |-- region: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



In [7]:
# Show the first 5 rows from the sales DataFrame.
sales_df.show(5, truncate=False)

+--------+-----------+-------+------+----------+------+
|sales_id|customer_id|product|amount|sale_date |region|
+--------+-----------+-------+------+----------+------+
|1       |101        |Laptop |50000 |2023-01-15|North |
|2       |102        |Mobile |15000 |2023-02-10|South |
|3       |103        |Tablet |20000 |2023-03-05|West  |
|4       |104        |Laptop |55000 |2023-03-15|East  |
|5       |105        |Desktop|40000 |2023-04-20|North |
+--------+-----------+-------+------+----------+------+
only showing top 5 rows



In [8]:
# Count the number of rows and columns in the customer DataFrame.
row_count = customer_df.count()
print("No. of rows in customers DataFrame are: ", row_count)

col_count = len(customer_df.columns)
print("No. of columns in customers DataFrame are: ", col_count)

No. of rows in customers DataFrame are:  7
No. of columns in customers DataFrame are:  5


2. Data Cleaning

In [9]:
# Remove duplicate rows from the sales DataFrame based on 
# customer_id,product,amount,sale_date,region columns
sales_df = sales_df.dropDuplicates(["customer_id", "product", "amount", "sale_date", "region"])
sales_df.show(truncate=False)

+--------+-----------+-------+------+----------+------+
|sales_id|customer_id|product|amount|sale_date |region|
+--------+-----------+-------+------+----------+------+
|9       |104        |Desktop|45000 |2023-08-10|West  |
|3       |103        |Tablet |20000 |2023-03-05|West  |
|7       |102        |Laptop |60000 |2023-06-15|East  |
|6       |101        |Mobile |15000 |2023-05-10|South |
|1       |101        |Laptop |50000 |2023-01-15|North |
|10      |105        |Laptop |70000 |2023-09-25|North |
|4       |104        |Laptop |55000 |2023-03-15|East  |
|5       |105        |Desktop|40000 |2023-04-20|North |
|8       |103        |Tablet |20000 |2023-07-05|North |
|2       |102        |Mobile |15000 |2023-02-10|South |
+--------+-----------+-------+------+----------+------+



In [10]:
# Drop rows where any column in the customer DataFrame has null values.
customer_df = customer_df.dropna()
customer_df.show(truncate=False)

+-----------+-------------+---------------------+---+---------+
|customer_id|customer_name|email                |age|city     |
+-----------+-------------+---------------------+---+---------+
|101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi    |
|102        |Meena Verma  |meena.verma@email.com|34 |Mumbai   |
|103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore|
|104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad|
|105        |Sneha Reddy  |sneha.reddy@email.com|29 |Hyderabad|
|106        |Vikas Jain   |vikas.jain@email.com |31 |Chennai  |
|107        |Amit Roy     |amit.roy@email.com   |35 |Kolkata  |
+-----------+-------------+---------------------+---+---------+



In [11]:
# Replace null values in the amount column of the sales DataFrame with 0.
sales_df = sales_df.fillna({'amount': 0})
sales_df.show(truncate=False)

+--------+-----------+-------+------+----------+------+
|sales_id|customer_id|product|amount|sale_date |region|
+--------+-----------+-------+------+----------+------+
|9       |104        |Desktop|45000 |2023-08-10|West  |
|3       |103        |Tablet |20000 |2023-03-05|West  |
|7       |102        |Laptop |60000 |2023-06-15|East  |
|6       |101        |Mobile |15000 |2023-05-10|South |
|1       |101        |Laptop |50000 |2023-01-15|North |
|10      |105        |Laptop |70000 |2023-09-25|North |
|4       |104        |Laptop |55000 |2023-03-15|East  |
|5       |105        |Desktop|40000 |2023-04-20|North |
|8       |103        |Tablet |20000 |2023-07-05|North |
|2       |102        |Mobile |15000 |2023-02-10|South |
+--------+-----------+-------+------+----------+------+



In [12]:
# Replace null values in the email column of the customer DataFrame with the value "unknown".
customer_df = customer_df.fillna({'email': 'unknown'})
customer_df.show(truncate=False)

+-----------+-------------+---------------------+---+---------+
|customer_id|customer_name|email                |age|city     |
+-----------+-------------+---------------------+---+---------+
|101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi    |
|102        |Meena Verma  |meena.verma@email.com|34 |Mumbai   |
|103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore|
|104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad|
|105        |Sneha Reddy  |sneha.reddy@email.com|29 |Hyderabad|
|106        |Vikas Jain   |vikas.jain@email.com |31 |Chennai  |
|107        |Amit Roy     |amit.roy@email.com   |35 |Kolkata  |
+-----------+-------------+---------------------+---+---------+



3. Column Manipulation

In [13]:
from pyspark.sql.functions import col

# Add a new column discounted_amount to the sales DataFrame that applies a 10% discount on amount.
sales_df = sales_df.withColumn('discounted_amount', col('amount') * 0.9)
sales_df.show(truncate=False)

+--------+-----------+-------+------+----------+------+-----------------+
|sales_id|customer_id|product|amount|sale_date |region|discounted_amount|
+--------+-----------+-------+------+----------+------+-----------------+
|9       |104        |Desktop|45000 |2023-08-10|West  |40500.0          |
|3       |103        |Tablet |20000 |2023-03-05|West  |18000.0          |
|7       |102        |Laptop |60000 |2023-06-15|East  |54000.0          |
|6       |101        |Mobile |15000 |2023-05-10|South |13500.0          |
|1       |101        |Laptop |50000 |2023-01-15|North |45000.0          |
|10      |105        |Laptop |70000 |2023-09-25|North |63000.0          |
|4       |104        |Laptop |55000 |2023-03-15|East  |49500.0          |
|5       |105        |Desktop|40000 |2023-04-20|North |36000.0          |
|8       |103        |Tablet |20000 |2023-07-05|North |18000.0          |
|2       |102        |Mobile |15000 |2023-02-10|South |13500.0          |
+--------+-----------+-------+------+-

In [14]:
# Rename the city column in the customer DataFrame to customer_city.
customer_df = customer_df.withColumnRenamed('city', 'customer_city')
customer_df.show()

+-----------+-------------+--------------------+---+-------------+
|customer_id|customer_name|               email|age|customer_city|
+-----------+-------------+--------------------+---+-------------+
|        101|  Arun Sharma|arun.sharma@email...| 28|        Delhi|
|        102|  Meena Verma|meena.verma@email...| 34|       Mumbai|
|        103|  Rahul Yadav|rahul.yadav@email...| 30|    Bangalore|
|        104|  Priya Patel|priya.patel@email...| 27|    Ahmedabad|
|        105|  Sneha Reddy|sneha.reddy@email...| 29|    Hyderabad|
|        106|   Vikas Jain|vikas.jain@email.com| 31|      Chennai|
|        107|     Amit Roy|  amit.roy@email.com| 35|      Kolkata|
+-----------+-------------+--------------------+---+-------------+



In [15]:
# Drop the region column from the sales DataFrame.
drop_region_df = sales_df.drop('region')
drop_region_df.show()

+--------+-----------+-------+------+----------+-----------------+
|sales_id|customer_id|product|amount| sale_date|discounted_amount|
+--------+-----------+-------+------+----------+-----------------+
|       9|        104|Desktop| 45000|2023-08-10|          40500.0|
|       3|        103| Tablet| 20000|2023-03-05|          18000.0|
|       7|        102| Laptop| 60000|2023-06-15|          54000.0|
|       6|        101| Mobile| 15000|2023-05-10|          13500.0|
|       1|        101| Laptop| 50000|2023-01-15|          45000.0|
|      10|        105| Laptop| 70000|2023-09-25|          63000.0|
|       4|        104| Laptop| 55000|2023-03-15|          49500.0|
|       5|        105|Desktop| 40000|2023-04-20|          36000.0|
|       8|        103| Tablet| 20000|2023-07-05|          18000.0|
|       2|        102| Mobile| 15000|2023-02-10|          13500.0|
+--------+-----------+-------+------+----------+-----------------+



In [16]:
from pyspark.sql.functions import when, col

# Create a new column customer_age_category in the customer DataFrame based on age:
    # a. "Youth" for age < 30
    # b. "Adult" for 30 <= age < 50
    # c. "Senior" for age >= 50

age_category_df = customer_df.withColumn('customer_age_category', when(col('age') < 30, 'Youth').when((col('age') >= 30) & (col('age') < 50), 'Adult').otherwise('Senior'))
age_category_df.show()

+-----------+-------------+--------------------+---+-------------+---------------------+
|customer_id|customer_name|               email|age|customer_city|customer_age_category|
+-----------+-------------+--------------------+---+-------------+---------------------+
|        101|  Arun Sharma|arun.sharma@email...| 28|        Delhi|                Youth|
|        102|  Meena Verma|meena.verma@email...| 34|       Mumbai|                Adult|
|        103|  Rahul Yadav|rahul.yadav@email...| 30|    Bangalore|                Adult|
|        104|  Priya Patel|priya.patel@email...| 27|    Ahmedabad|                Youth|
|        105|  Sneha Reddy|sneha.reddy@email...| 29|    Hyderabad|                Youth|
|        106|   Vikas Jain|vikas.jain@email.com| 31|      Chennai|                Adult|
|        107|     Amit Roy|  amit.roy@email.com| 35|      Kolkata|                Adult|
+-----------+-------------+--------------------+---+-------------+---------------------+



4. Filtering

In [17]:
# Filter the sales DataFrame to show only rows where amount is greater than 50,000.
filtered_amount_df = sales_df.filter(sales_df['amount'] > 50000)
filtered_amount_df.show(truncate=False)

+--------+-----------+-------+------+----------+------+-----------------+
|sales_id|customer_id|product|amount|sale_date |region|discounted_amount|
+--------+-----------+-------+------+----------+------+-----------------+
|7       |102        |Laptop |60000 |2023-06-15|East  |54000.0          |
|10      |105        |Laptop |70000 |2023-09-25|North |63000.0          |
|4       |104        |Laptop |55000 |2023-03-15|East  |49500.0          |
+--------+-----------+-------+------+----------+------+-----------------+



In [18]:
# Filter the customer DataFrame to show customers aged between 25 and 30.
filtered_age_df = customer_df.filter(customer_df['age'].between(25, 30))
filtered_age_df.show(truncate=False)

+-----------+-------------+---------------------+---+-------------+
|customer_id|customer_name|email                |age|customer_city|
+-----------+-------------+---------------------+---+-------------+
|101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore    |
|104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad    |
|105        |Sneha Reddy  |sneha.reddy@email.com|29 |Hyderabad    |
+-----------+-------------+---------------------+---+-------------+



In [19]:
from pyspark.sql.functions import countDistinct

# Identify all customers who have made purchases in more than one region.
customer_region_count = sales_df.groupBy('customer_id').agg(countDistinct('region').alias('region_count'))
customer_region_count.show()

+-----------+------------+
|customer_id|region_count|
+-----------+------------+
|        101|           2|
|        103|           2|
|        102|           2|
|        105|           1|
|        104|           2|
+-----------+------------+



In [20]:
# Filter the top 3 sales based on amount for each product.
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

window = Window.partitionBy('product').orderBy(col('amount').desc())
sales_df.withColumn('rank',dense_rank().over(window)).filter(col('rank') <= 3).show()

+--------+-----------+-------+------+----------+------+-----------------+----+
|sales_id|customer_id|product|amount| sale_date|region|discounted_amount|rank|
+--------+-----------+-------+------+----------+------+-----------------+----+
|       9|        104|Desktop| 45000|2023-08-10|  West|          40500.0|   1|
|       5|        105|Desktop| 40000|2023-04-20| North|          36000.0|   2|
|      10|        105| Laptop| 70000|2023-09-25| North|          63000.0|   1|
|       7|        102| Laptop| 60000|2023-06-15|  East|          54000.0|   2|
|       4|        104| Laptop| 55000|2023-03-15|  East|          49500.0|   3|
|       6|        101| Mobile| 15000|2023-05-10| South|          13500.0|   1|
|       2|        102| Mobile| 15000|2023-02-10| South|          13500.0|   1|
|       3|        103| Tablet| 20000|2023-03-05|  West|          18000.0|   1|
|       8|        103| Tablet| 20000|2023-07-05| North|          18000.0|   1|
+--------+-----------+-------+------+----------+----

5. Joins

In [21]:
# 17. Perform an inner join between sales and customer DataFrames on customer_id.
sales_df.join(customer_df, sales_df.customer_id == customer_df.customer_id, "inner").show(truncate=False)

+--------+-----------+-------+------+----------+------+-----------------+-----------+-------------+---------------------+---+-------------+
|sales_id|customer_id|product|amount|sale_date |region|discounted_amount|customer_id|customer_name|email                |age|customer_city|
+--------+-----------+-------+------+----------+------+-----------------+-----------+-------------+---------------------+---+-------------+
|9       |104        |Desktop|45000 |2023-08-10|West  |40500.0          |104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad    |
|3       |103        |Tablet |20000 |2023-03-05|West  |18000.0          |103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore    |
|7       |102        |Laptop |60000 |2023-06-15|East  |54000.0          |102        |Meena Verma  |meena.verma@email.com|34 |Mumbai       |
|6       |101        |Mobile |15000 |2023-05-10|South |13500.0          |101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|1       |101       

In [22]:
#  Perform a left join to include all records from sales and matching records from customer
sales_df.join(customer_df, sales_df.customer_id == customer_df.customer_id, "left").show(truncate=False)

+--------+-----------+-------+------+----------+------+-----------------+-----------+-------------+---------------------+---+-------------+
|sales_id|customer_id|product|amount|sale_date |region|discounted_amount|customer_id|customer_name|email                |age|customer_city|
+--------+-----------+-------+------+----------+------+-----------------+-----------+-------------+---------------------+---+-------------+
|9       |104        |Desktop|45000 |2023-08-10|West  |40500.0          |104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad    |
|3       |103        |Tablet |20000 |2023-03-05|West  |18000.0          |103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore    |
|7       |102        |Laptop |60000 |2023-06-15|East  |54000.0          |102        |Meena Verma  |meena.verma@email.com|34 |Mumbai       |
|6       |101        |Mobile |15000 |2023-05-10|South |13500.0          |101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|1       |101       

In [23]:
# Perform a full outer join between sales and customer DataFrames.
sales_df.join(customer_df, sales_df.customer_id == customer_df.customer_id, "full").show(truncate=False)

+--------+-----------+-------+------+----------+------+-----------------+-----------+-------------+---------------------+---+-------------+
|sales_id|customer_id|product|amount|sale_date |region|discounted_amount|customer_id|customer_name|email                |age|customer_city|
+--------+-----------+-------+------+----------+------+-----------------+-----------+-------------+---------------------+---+-------------+
|6       |101        |Mobile |15000 |2023-05-10|South |13500.0          |101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|1       |101        |Laptop |50000 |2023-01-15|North |45000.0          |101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|7       |102        |Laptop |60000 |2023-06-15|East  |54000.0          |102        |Meena Verma  |meena.verma@email.com|34 |Mumbai       |
|2       |102        |Mobile |15000 |2023-02-10|South |13500.0          |102        |Meena Verma  |meena.verma@email.com|34 |Mumbai       |
|3       |103       

In [24]:
# Identify customers who have not made any purchases by performing an anti-join.
customer_df.join(sales_df, customer_df.customer_id == sales_df.customer_id, "leftanti").show(truncate=False)

+-----------+-------------+--------------------+---+-------------+
|customer_id|customer_name|email               |age|customer_city|
+-----------+-------------+--------------------+---+-------------+
|106        |Vikas Jain   |vikas.jain@email.com|31 |Chennai      |
|107        |Amit Roy     |amit.roy@email.com  |35 |Kolkata      |
+-----------+-------------+--------------------+---+-------------+



6. Aggregations

In [25]:
from pyspark.sql.functions import sum

#  Calculate the total sales amount for each product.
total_sales_df = sales_df.groupBy('product').agg(sum('amount').alias('total_sales_amount'))
total_sales_df.show()

+-------+------------------+
|product|total_sales_amount|
+-------+------------------+
| Laptop|            235000|
| Mobile|             30000|
| Tablet|             40000|
|Desktop|             85000|
+-------+------------------+



In [26]:
from pyspark.sql.functions import avg

# Find the average age of customers in the customer DataFrame.
average_age_df = customer_df.agg(avg('age').alias('average_age'))
average_age_df.show()

+------------------+
|       average_age|
+------------------+
|30.571428571428573|
+------------------+



In [27]:
from pyspark.sql.functions import max, min

# Calculate the maximum and minimum sales amounts in the sales DataFrame.
max_sales_amount = sales_df.agg(max('amount').alias('max_amount'))
min_sales_amount = sales_df.agg(min('amount').alias('min_amount'))

max_sales_amount.show()
min_sales_amount.show()

+----------+
|max_amount|
+----------+
|     70000|
+----------+

+----------+
|min_amount|
+----------+
|     15000|
+----------+



In [28]:
from pyspark.sql.functions import count

#  Group the customer DataFrame by customer_city and count the number of customers in 
# each city.

city_customer_count_df = customer_df.groupBy('customer_city').agg(count('customer_id').alias('customer_count'))
city_customer_count_df.show()

+-------------+--------------+
|customer_city|customer_count|
+-------------+--------------+
|    Bangalore|             1|
|      Chennai|             1|
|       Mumbai|             1|
|    Ahmedabad|             1|
|      Kolkata|             1|
|        Delhi|             1|
|    Hyderabad|             1|
+-------------+--------------+



7. Sorting

In [29]:
# Sort the sales DataFrame by amount in descending order.
sorted_sales_df = sales_df.orderBy(col('amount').desc())
sorted_sales_df.show()

+--------+-----------+-------+------+----------+------+-----------------+
|sales_id|customer_id|product|amount| sale_date|region|discounted_amount|
+--------+-----------+-------+------+----------+------+-----------------+
|      10|        105| Laptop| 70000|2023-09-25| North|          63000.0|
|       7|        102| Laptop| 60000|2023-06-15|  East|          54000.0|
|       4|        104| Laptop| 55000|2023-03-15|  East|          49500.0|
|       1|        101| Laptop| 50000|2023-01-15| North|          45000.0|
|       9|        104|Desktop| 45000|2023-08-10|  West|          40500.0|
|       5|        105|Desktop| 40000|2023-04-20| North|          36000.0|
|       3|        103| Tablet| 20000|2023-03-05|  West|          18000.0|
|       8|        103| Tablet| 20000|2023-07-05| North|          18000.0|
|       6|        101| Mobile| 15000|2023-05-10| South|          13500.0|
|       2|        102| Mobile| 15000|2023-02-10| South|          13500.0|
+--------+-----------+-------+------+-

In [30]:
# Sort the customer DataFrame by age in ascending order.
sorted_customer_df = customer_df.orderBy(col('age').asc())
sorted_customer_df.show(truncate=False)

+-----------+-------------+---------------------+---+-------------+
|customer_id|customer_name|email                |age|customer_city|
+-----------+-------------+---------------------+---+-------------+
|104        |Priya Patel  |priya.patel@email.com|27 |Ahmedabad    |
|101        |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|105        |Sneha Reddy  |sneha.reddy@email.com|29 |Hyderabad    |
|103        |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore    |
|106        |Vikas Jain   |vikas.jain@email.com |31 |Chennai      |
|102        |Meena Verma  |meena.verma@email.com|34 |Mumbai       |
|107        |Amit Roy     |amit.roy@email.com   |35 |Kolkata      |
+-----------+-------------+---------------------+---+-------------+



8. Union Operations

In [31]:
new_customer_df = spark.read.option("header", "true").option("inferSchema","true").csv("new_customer_data.csv")
new_customer_df.show(truncate=False)

new_sales_df = spark.read.option("header", "true").option("inferSchema","true").csv("new_sales_data.csv")
new_sales_df.show(truncate=False)

+-----------+--------------+------------------------+---+-------------+
|customer_id|customer_name |email                   |age|customer_city|
+-----------+--------------+------------------------+---+-------------+
|108        |Pooja Joshi   |pooja.joshi@email.com   |33 |Chennai      |
|109        |Pooja Joshi   |pooja.joshi@email.com   |35 |Kolkata      |
|110        |Vikram Chauhan|vikram.chauhan@email.com|34 |Pune         |
|111        |Neha Gupta    |neha.gupta@email.com    |35 |Kolkata      |
|112        |Sunita Rao    |sunita.rao@email.com    |28 |Lucknow      |
|113        |Sunita Rao    |sunita.rao@email.com    |33 |Lucknow      |
|114        |Rakesh Bansal |rakesh.bansal@email.com |28 |Lucknow      |
|115        |Sunita Rao    |sunita.rao@email.com    |25 |Chennai      |
+-----------+--------------+------------------------+---+-------------+

+--------+-----------+-------+------+----------+------+
|sales_id|customer_id|product|amount|sale_date |region|
+--------+-----------+-

In [32]:
# perform a union operation with the customer DataFrame.
union_df_customer=customer_df.union(new_customer_df)
union_df_customer.show(truncate=False)

+-----------+--------------+------------------------+---+-------------+
|customer_id|customer_name |email                   |age|customer_city|
+-----------+--------------+------------------------+---+-------------+
|101        |Arun Sharma   |arun.sharma@email.com   |28 |Delhi        |
|102        |Meena Verma   |meena.verma@email.com   |34 |Mumbai       |
|103        |Rahul Yadav   |rahul.yadav@email.com   |30 |Bangalore    |
|104        |Priya Patel   |priya.patel@email.com   |27 |Ahmedabad    |
|105        |Sneha Reddy   |sneha.reddy@email.com   |29 |Hyderabad    |
|106        |Vikas Jain    |vikas.jain@email.com    |31 |Chennai      |
|107        |Amit Roy      |amit.roy@email.com      |35 |Kolkata      |
|108        |Pooja Joshi   |pooja.joshi@email.com   |33 |Chennai      |
|109        |Pooja Joshi   |pooja.joshi@email.com   |35 |Kolkata      |
|110        |Vikram Chauhan|vikram.chauhan@email.com|34 |Pune         |
|111        |Neha Gupta    |neha.gupta@email.com    |35 |Kolkata

9. Window Functions

In [33]:
# Rank the sales records based on the amount column.
from pyspark.sql.window import Window
window=window.orderBy('amount')
rank_sales = sales_df.withColumn("sales_rank", dense_rank().over(window))
rank_sales.show()

+--------+-----------+-------+------+----------+------+-----------------+----------+
|sales_id|customer_id|product|amount| sale_date|region|discounted_amount|sales_rank|
+--------+-----------+-------+------+----------+------+-----------------+----------+
|       5|        105|Desktop| 40000|2023-04-20| North|          36000.0|         1|
|       9|        104|Desktop| 45000|2023-08-10|  West|          40500.0|         2|
|       1|        101| Laptop| 50000|2023-01-15| North|          45000.0|         1|
|       4|        104| Laptop| 55000|2023-03-15|  East|          49500.0|         2|
|       7|        102| Laptop| 60000|2023-06-15|  East|          54000.0|         3|
|      10|        105| Laptop| 70000|2023-09-25| North|          63000.0|         4|
|       6|        101| Mobile| 15000|2023-05-10| South|          13500.0|         1|
|       2|        102| Mobile| 15000|2023-02-10| South|          13500.0|         1|
|       3|        103| Tablet| 20000|2023-03-05|  West|          

In [34]:
# Add a cumulative sum of amount for each product in the sales DataFrame.
window = Window.partitionBy("product").orderBy("amount").rowsBetween(Window.unboundedPreceding, 0)
cum_sales = sales_df.withColumn("cum_sum", sum("amount").over(window))
cum_sales.show()

+--------+-----------+-------+------+----------+------+-----------------+-------+
|sales_id|customer_id|product|amount| sale_date|region|discounted_amount|cum_sum|
+--------+-----------+-------+------+----------+------+-----------------+-------+
|       5|        105|Desktop| 40000|2023-04-20| North|          36000.0|  40000|
|       9|        104|Desktop| 45000|2023-08-10|  West|          40500.0|  85000|
|       1|        101| Laptop| 50000|2023-01-15| North|          45000.0|  50000|
|       4|        104| Laptop| 55000|2023-03-15|  East|          49500.0| 105000|
|       7|        102| Laptop| 60000|2023-06-15|  East|          54000.0| 165000|
|      10|        105| Laptop| 70000|2023-09-25| North|          63000.0| 235000|
|       6|        101| Mobile| 15000|2023-05-10| South|          13500.0|  15000|
|       2|        102| Mobile| 15000|2023-02-10| South|          13500.0|  30000|
|       3|        103| Tablet| 20000|2023-03-05|  West|          18000.0|  20000|
|       8|      

In [35]:
#  Add a column that calculates the difference between each customer's amount and the 
# average amount within their product group.
window = Window.partitionBy("product").orderBy("amount").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
diff = sales_df.withColumn("difference", col("amount") - avg("amount").over(window))
diff.show()

+--------+-----------+-------+------+----------+------+-----------------+----------+
|sales_id|customer_id|product|amount| sale_date|region|discounted_amount|difference|
+--------+-----------+-------+------+----------+------+-----------------+----------+
|       5|        105|Desktop| 40000|2023-04-20| North|          36000.0|   -2500.0|
|       9|        104|Desktop| 45000|2023-08-10|  West|          40500.0|    2500.0|
|       1|        101| Laptop| 50000|2023-01-15| North|          45000.0|   -8750.0|
|       4|        104| Laptop| 55000|2023-03-15|  East|          49500.0|   -3750.0|
|       7|        102| Laptop| 60000|2023-06-15|  East|          54000.0|    1250.0|
|      10|        105| Laptop| 70000|2023-09-25| North|          63000.0|   11250.0|
|       6|        101| Mobile| 15000|2023-05-10| South|          13500.0|       0.0|
|       2|        102| Mobile| 15000|2023-02-10| South|          13500.0|       0.0|
|       3|        103| Tablet| 20000|2023-03-05|  West|          

10. Partitioning

In [36]:
# Write the sales DataFrame to a partitioned Parquet file by region.
sales_df.write.partitionBy("region").mode("overwrite").parquet("sales.parquet")

In [37]:
# Partition the customer DataFrame by customer_city and save it as a CSV file
customer_df.write.partitionBy("customer_city").mode("overwrite").csv("customers.csv")

11. Real-World Scenarios

In [48]:
# Calculate the percentage contribution of each product to the total sales.
total_sales=sales_df.groupBy().sum('amount').collect()[0][0]
perc_cont = sales_df.groupBy('product').agg((sum('amount')*100/total_sales).alias('contribution_perc'))

perc_cont.show()

+-------+------------------+
|product| contribution_perc|
+-------+------------------+
| Laptop|60.256410256410255|
| Mobile|7.6923076923076925|
| Tablet|10.256410256410257|
|Desktop|21.794871794871796|
+-------+------------------+



In [39]:
from pyspark.sql.functions import year, sum

# Extract the year from sale_date and group by year to calculate total sales
sales_df = sales_df.withColumn("Year", year("sale_date"))
df_grouped_by_year = sales_df.groupBy("Year").agg(sum("amount").alias("total_sales")) \
    .show()

+----+-----------+
|Year|total_sales|
+----+-----------+
|2023|     390000|
+----+-----------+



In [40]:
# Identify the most purchased product in each region.

sales_df.createOrReplaceTempView("sales")

df_most_purchased =  spark.sql("""
    SELECT region, product
    FROM (
        SELECT region, product, SUM(amount) as TotalAmount,
               ROW_NUMBER() OVER (PARTITION BY region ORDER BY SUM(amount) DESC) as Rank
        FROM sales
        GROUP BY region, product
    ) tmp
    WHERE Rank = 1
""")

df_most_purchased.show()

+------+-------+
|region|product|
+------+-------+
|  East| Laptop|
| North| Laptop|
| South| Mobile|
|  West|Desktop|
+------+-------+



In [47]:
#  Add a column to show the difference between the highest and lowest sales for each 
# product.

grouped_sales_df = sales_df.groupBy("product").agg(max("amount").alias("MaxAmount"),
                                             min("amount").alias("MinAmount"))

sales_diff_df = grouped_sales_df.withColumn("sale_diff", col("MaxAmount") - col("MinAmount"))
sales_diff_df.show()

+-------+---------+---------+---------+
|product|MaxAmount|MinAmount|sale_diff|
+-------+---------+---------+---------+
| Laptop|    70000|    50000|    20000|
| Mobile|    15000|    15000|        0|
| Tablet|    20000|    20000|        0|
|Desktop|    45000|    40000|     5000|
+-------+---------+---------+---------+



In [42]:
# Write the result of the join between sales and customer to parquet file

customer_df_renamed = customer_df.withColumnRenamed("customer_id", "cust_id")

sale_customer_joined = sales_df.join(customer_df_renamed, sales_df['customer_id'] == customer_df_renamed['cust_id'], 'inner')

sale_customer_joined.show(truncate=False)

sale_customer_joined.write.mode('overwrite').parquet('sales_customer_joined.parquet')

+--------+-----------+-------+------+----------+------+-----------------+----+-------+-------------+---------------------+---+-------------+
|sales_id|customer_id|product|amount|sale_date |region|discounted_amount|Year|cust_id|customer_name|email                |age|customer_city|
+--------+-----------+-------+------+----------+------+-----------------+----+-------+-------------+---------------------+---+-------------+
|9       |104        |Desktop|45000 |2023-08-10|West  |40500.0          |2023|104    |Priya Patel  |priya.patel@email.com|27 |Ahmedabad    |
|3       |103        |Tablet |20000 |2023-03-05|West  |18000.0          |2023|103    |Rahul Yadav  |rahul.yadav@email.com|30 |Bangalore    |
|7       |102        |Laptop |60000 |2023-06-15|East  |54000.0          |2023|102    |Meena Verma  |meena.verma@email.com|34 |Mumbai       |
|6       |101        |Mobile |15000 |2023-05-10|South |13500.0          |2023|101    |Arun Sharma  |arun.sharma@email.com|28 |Delhi        |
|1       |101

In [45]:
from pyspark.sql.functions import months_between, current_date

filtered_sales = sales_df.withColumn('months_diff', months_between(current_date(), col('sale_date'))) \
    .filter(col('months_diff') <= 6)

filtered_sales.show()

+--------+-----------+-------+------+---------+------+-----------------+----+-----------+
|sales_id|customer_id|product|amount|sale_date|region|discounted_amount|Year|months_diff|
+--------+-----------+-------+------+---------+------+-----------------+----+-----------+
+--------+-----------+-------+------+---------+------+-----------------+----+-----------+



In [46]:
# Calculate the average sales amount per customer.
avg_sales = spark.sql("select customer_id, avg(amount) as avg_amount from sales group by customer_id order by customer_id")
avg_sales.show()

+-----------+----------+
|customer_id|avg_amount|
+-----------+----------+
|        101|   32500.0|
|        102|   37500.0|
|        103|   20000.0|
|        104|   50000.0|
|        105|   55000.0|
+-----------+----------+

