In [3]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config('spark.shuffle.useOldFetchProtocol', 'true'). \
config("spark.sql.warehouse.dir",f"/user/itv016269/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [53]:
cust_trans_schema = "customer_id int, purchase_date date, product_id int, amount double"

In [54]:
cust_transf_df=spark.read \
.format('csv') \
.option("dateFormat","dd/mm/yyyy") \
.schema(cust_trans_schema) \
.load("/public/trendytech/datasets/cust_transf.csv")

A.1 Your marketing team wants to identify the top-selling products based on
revenue for a given time period. The query is expected to be executed
frequently, and the results need to be returned quickly. Design a caching
strategy that efficiently retrieves the top-selling products by revenue.
Additionally, demonstrate the impact of caching by comparing the retrieval
time for Top 10 best-selling products from start_date = "2023-05-01" to
end_date = "2023-06-08" before and after implementing the caching strategy.
[Note : Strategize your caching in such a way that the right Dataframes are
cached at the right time for maximal performance gains]

In [4]:
cust_transf_df.count()

87498290

In [6]:
start_date="2023-05-01"
end_date="2023-06-08"

In [7]:
filtered_df = cust_transf_df.filter((cust_transf_df.purchase_date >= start_date) & (cust_transf_df.purchase_date <= end_date)).cache()

In [15]:
from pyspark.sql.functions import round

revenue_df = filtered_df.groupBy('product_id').sum('amount').withColumnRenamed("sum(amount)", "revenue")
revenue_new_df = revenue_df.select("*",round("revenue",2).alias("new_revenue")).drop("revenue")


In [16]:
revenue_new_df.show()

+----------+--------------+
|product_id|   new_revenue|
+----------+--------------+
|      1005| 2.782856412E8|
|      1008|       5222.91|
|      1010|       7312.91|
|      1002|4.2938362439E8|
|      1015|      12537.91|
|      1001|5.5668264119E8|
|      1006|       3132.91|
|      1007|       4177.91|
|      1003|5.7255922439E8|
|      1014|      11492.91|
|      1004| 2.862080244E8|
|      1011|       8357.91|
|      1012|       9402.91|
|      1013|      10447.91|
|      1009|       6267.91|
+----------+--------------+



In [19]:
top_products = revenue_df.sort("revenue",ascending=False).limit(10)
top_products.show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|      1003| 5.725592243903786E8|
|      1001|  5.56682641192824E8|
|      1002|4.2938362439486474E8|
|      1004|2.8620802440276194E8|
|      1005| 2.782856412021384E8|
|      1015|  12537.909999999963|
|      1014|  11492.909999999963|
|      1013|  10447.909999999963|
|      1012|   9402.909999999965|
|      1011|   8357.909999999967|
+----------+--------------------+



A.2 Find the top 10 customers with maximum transaction amount for the same
date range of start_date = "2023-05-01" to end_date = "2023-06-08"

In [20]:
top_customers = filtered_df.sort("amount",ascending=False).limit(10)
top_customers.show()

+-----------+-------------+----------+------+
|customer_id|purchase_date|product_id|amount|
+-----------+-------------+----------+------+
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
|       1015|   2023-05-31|      1015| 59.99|
+-----------+-------------+----------+------+



In [4]:
spark.sql("use itv016269_databases")
spark.sql("show tables")

database,tableName,isTemporary
itv016269_databases,cust_transf_ext,False


In [33]:
spark.sql("drop table itv016269_databases.cust_transf_ext ")

In [35]:
spark.sql("create table itv016269_databases.cust_transf_ext ( customer_id int, purchase_date date, product_id int, amount double) using csv location '/public/trendytech/datasets/cust_transf.csv'")

In [5]:
spark.sql("cache table itv016269_databases.cust_transf_ext")

In [58]:
spark.sql("uncache table itv016269_databases.cust_transf_ext")

In [7]:
spark.sql("select * from cust_transf_ext where purchase_date >= '2023-05-01' and purchase_date >= '2023-06-08'")

customer_id,purchase_date,product_id,amount
1005,2023-06-08,1002,29.99
1008,2023-06-08,1004,19.99
1009,2023-06-09,1005,24.99
1010,2023-06-10,1001,49.99
1006,2023-06-11,1003,39.99
1007,2023-06-12,1004,19.99
1008,2023-06-13,1005,24.99
1009,2023-06-14,1001,49.99
1010,2023-06-15,1002,29.99
1013,2023-06-08,1004,19.99


In [8]:
spark.sql("select product_id, sum(amount) as revenue from cust_transf_ext where purchase_date >= '2023-05-01' and purchase_date >= '2023-06-08' group by product_id order by revenue desc limit 10 ").show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|      1001|3.1810436640042645E8|
|      1005|1.5902036640107167E8|
|      1002| 1.431278748007746E8|
|      1003|1.2723538320049205E8|
|      1004|1.2720356640098402E8|
+----------+--------------------+



In [12]:
spark.sql("select customer_id, max(amount) as max_transaction from cust_transf_ext where purchase_date >= '2023-05-01' and purchase_date >= '2023-06-08' group by customer_id  limit 10 ").show()

+-----------+---------------+
|customer_id|max_transaction|
+-----------+---------------+
|       1015|          49.99|
|       1006|          39.99|
|       1007|          19.99|
|       1014|          49.99|
|       1009|          49.99|
|       1010|          49.99|
|       1008|          24.99|
|       1005|          29.99|
|       1013|          24.99|
|       1011|          39.99|
+-----------+---------------+



In [16]:
cust_transf_df.show()

+-----------+-------------+----------+------+
|customer_id|purchase_date|product_id|amount|
+-----------+-------------+----------+------+
|       1001|   2023-05-15|      1001| 49.99|
|       1002|   2023-05-16|      1002| 29.99|
|       1003|   2023-05-17|      1003| 39.99|
|       1004|   2023-05-18|      1004| 19.99|
|       1005|   2023-05-19|      1005| 24.99|
|       1001|   2023-05-20|      1002| 29.99|
|       1002|   2023-05-21|      1003| 39.99|
|       1003|   2023-05-22|      1004| 19.99|
|       1004|   2023-05-23|      1005| 24.99|
|       1005|   2023-05-24|      1001| 49.99|
|       1001|   2023-05-25|      1003| 39.99|
|       1002|   2023-05-26|      1004| 19.99|
|       1003|   2023-05-27|      1005| 24.99|
|       1004|   2023-05-28|      1001| 49.99|
|       1005|   2023-05-29|      1002| 29.99|
|       1001|   2023-05-30|      1003| 39.99|
|       1002|   2023-05-31|      1004| 19.99|
|       1003|   2023-06-01|      1005| 24.99|
|       1004|   2023-06-02|      1

A.4 Find the top 10 regular customers based on the number of distinct months
in which they made purchases

In [19]:
from pyspark.sql.functions import year, month

new_cust_df = cust_transf_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))
new_cust_df.show()

+-----------+-------------+----------+------+-------------+--------------+
|customer_id|purchase_date|product_id|amount|purchase_year|purchase_month|
+-----------+-------------+----------+------+-------------+--------------+
|       1001|   2023-05-15|      1001| 49.99|         2023|             5|
|       1002|   2023-05-16|      1002| 29.99|         2023|             5|
|       1003|   2023-05-17|      1003| 39.99|         2023|             5|
|       1004|   2023-05-18|      1004| 19.99|         2023|             5|
|       1005|   2023-05-19|      1005| 24.99|         2023|             5|
|       1001|   2023-05-20|      1002| 29.99|         2023|             5|
|       1002|   2023-05-21|      1003| 39.99|         2023|             5|
|       1003|   2023-05-22|      1004| 19.99|         2023|             5|
|       1004|   2023-05-23|      1005| 24.99|         2023|             5|
|       1005|   2023-05-24|      1001| 49.99|         2023|             5|
|       1001|   2023-05-2

In [20]:
from pyspark.sql.functions import countDistinct

distinct_months_df = new_cust_df.groupBy("customer_id","purchase_year","purchase_month").agg(countDistinct("purchase_month").alias("distinct_months"))
distinct_months_df.show()

+-----------+-------------+--------------+---------------+
|customer_id|purchase_year|purchase_month|distinct_months|
+-----------+-------------+--------------+---------------+
|       1015|         2023|             6|              1|
|       1010|         2023|             5|              1|
|       1015|         2023|             5|              1|
|       1008|         2023|             6|              1|
|       1004|         2023|             5|              1|
|       1007|         2023|             6|              1|
|       1009|         2023|             5|              1|
|       1002|         2023|             6|              1|
|       1011|         2023|             5|              1|
|       1014|         2023|             5|              1|
|       1003|         2023|             6|              1|
|       1006|         2023|             6|              1|
|       1001|         2023|             6|              1|
|       1010|         2023|             6|              

In [21]:
regular_customers = distinct_months_df.filter("distinct_months = 1").groupBy("customer_id").count().orderBy("count",ascending = False).limit(10)
regular_customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1005|    2|
|       1007|    2|
|       1004|    2|
|       1009|    2|
|       1003|    2|
|       1001|    2|
|       1011|    2|
|       1008|    2|
|       1002|    2|
|       1006|    2|
+-----------+-----+



Without Caching the query took 1.5 mins to execute

Demonstrate the changes in the performance of the query A.4 for the
following persist storage levels.
- MEMORY_ONLY
- MEMORY_ONLY_SER
- MEMORY_AND_DISK
- MEMORY_AND_DISK_SER
- DISK_ONLY

In [28]:
from pyspark.sql.functions import countDistinct, year, month

new_cust_df = cust_transf_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))

distinct_months_df = new_cust_df.groupBy("customer_id","purchase_year","purchase_month"). \
                    agg(countDistinct("purchase_month").alias("distinct_months")).cache()

In [29]:
regular_customers = distinct_months_df.filter("distinct_months = 1").groupBy("customer_id").count(). \
                    orderBy("count",ascending = False).limit(10)

regular_customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1006|    2|
|       1013|    2|
|       1003|    2|
|       1015|    2|
|       1001|    2|
|       1004|    2|
|       1008|    2|
|       1014|    2|
|       1007|    2|
|       1009|    2|
+-----------+-----+



After Caching the query took 0.9 secs to execute

In [30]:
from pyspark.sql.functions import countDistinct, year, month
from pyspark.storagelevel import StorageLevel

new_cust_df = cust_transf_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))

distinct_months_df = new_cust_df.groupBy("customer_id","purchase_year","purchase_month"). \
                    agg(countDistinct("purchase_month").alias("distinct_months")).persist(StorageLevel.MEMORY_ONLY)

In [31]:
regular_customers = distinct_months_df.filter("distinct_months = 1").groupBy("customer_id").count(). \
                    orderBy("count",ascending = False).limit(10)

regular_customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1001|    2|
|       1004|    2|
|       1014|    2|
|       1013|    2|
|       1010|    2|
|       1011|    2|
|       1007|    2|
|       1012|    2|
|       1002|    2|
|       1003|    2|
+-----------+-----+



Persist with Memory only took 1 sec to execute

In [33]:
from pyspark.sql.functions import countDistinct, year, month
from pyspark.storagelevel import StorageLevel

new_cust_df = cust_transf_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))

distinct_months_df = new_cust_df.groupBy("customer_id","purchase_year","purchase_month"). \
                    agg(countDistinct("purchase_month").alias("distinct_months")).persist(StorageLevel.MEMORY_AND_DISK)

In [34]:
regular_customers = distinct_months_df.filter("distinct_months = 1").groupBy("customer_id").count(). \
                    orderBy("count",ascending = False).limit(10)

regular_customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1010|    2|
|       1009|    2|
|       1002|    2|
|       1007|    2|
|       1012|    2|
|       1005|    2|
|       1011|    2|
|       1015|    2|
|       1006|    2|
|       1004|    2|
+-----------+-----+



Persist with Memory and Disk took 1 sec to execute

In [36]:
from pyspark.sql.functions import countDistinct, year, month
from pyspark.storagelevel import StorageLevel

new_cust_df = cust_transf_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))

distinct_months_df = new_cust_df.groupBy("customer_id","purchase_year","purchase_month"). \
                    agg(countDistinct("purchase_month").alias("distinct_months")).persist(StorageLevel.DISK_ONLY)

In [37]:
regular_customers = distinct_months_df.filter("distinct_months = 1").groupBy("customer_id").count(). \
                    orderBy("count",ascending = False).limit(10)

regular_customers.show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1014|    2|
|       1007|    2|
|       1011|    2|
|       1008|    2|
|       1006|    2|
|       1010|    2|
|       1012|    2|
|       1005|    2|
|       1002|    2|
|       1009|    2|
+-----------+-----+



Persist with Disk only took 1 sec to execute

In [55]:
def get_customer_history(customer_id):
    customer_history_df = cust_transf_df.filter(cust_transf_df.customer_id == customer_id).cache()
    return customer_history_df

In [56]:
customer_id=1001
customer_history_df = get_customer_history(customer_id)
customer_history_df.show()

+-----------+-------------+----------+------+
|customer_id|purchase_date|product_id|amount|
+-----------+-------------+----------+------+
|       1001|   2023-05-15|      1001| 49.99|
|       1001|   2023-05-20|      1002| 29.99|
|       1001|   2023-05-25|      1003| 39.99|
|       1001|   2023-05-30|      1003| 39.99|
|       1001|   2023-06-04|      1003| 39.99|
|       1001|   2023-05-15|      1001| 49.99|
|       1001|   2023-05-20|      1002| 29.99|
|       1001|   2023-05-25|      1003| 39.99|
|       1001|   2023-05-30|      1003| 39.99|
|       1001|   2023-06-04|      1003| 39.99|
|       1001|   2023-05-15|      1001| 49.99|
|       1001|   2023-05-20|      1002| 29.99|
|       1001|   2023-05-25|      1003| 39.99|
|       1001|   2023-05-30|      1003| 39.99|
|       1001|   2023-06-04|      1003| 39.99|
|       1001|   2023-05-15|      1001| 49.99|
|       1001|   2023-05-20|      1002| 29.99|
|       1001|   2023-05-25|      1003| 39.99|
|       1001|   2023-05-30|      1

In [57]:
customer_history_df.unpersist()

customer_id,purchase_date,product_id,amount
1001,2023-05-15,1001,49.99
1001,2023-05-20,1002,29.99
1001,2023-05-25,1003,39.99
1001,2023-05-30,1003,39.99
1001,2023-06-04,1003,39.99
1001,2023-05-15,1001,49.99
1001,2023-05-20,1002,29.99
1001,2023-05-25,1003,39.99
1001,2023-05-30,1003,39.99
1001,2023-06-04,1003,39.99


In [12]:
hotels_schema = "booking_id int, guest_name string, checkin_date date, checkout_date date, room_type string, total_price float"

In [13]:
hotels_df = spark.read. \
format("csv"). \
schema(hotels_schema). \
load("/public/trendytech/datasets/hotel_data.csv")

In [14]:
hotels_df.show()

+----------+-----------------+------------+-------------+---------+-----------+
|booking_id|       guest_name|checkin_date|checkout_date|room_type|total_price|
+----------+-----------------+------------+-------------+---------+-----------+
|         1|         John Doe|  2023-05-01|   2023-05-05| Standard|      400.0|
|         2|       Jane Smith|  2023-05-02|   2023-05-06|   Deluxe|      600.0|
|         3|     Mark Johnson|  2023-05-03|   2023-05-08| Standard|      450.0|
|         4|     Sarah Wilson|  2023-05-04|   2023-05-07|Executive|      750.0|
|         5|      Emily Brown|  2023-05-06|   2023-05-09|   Deluxe|      550.0|
|         6|    Michael Davis|  2023-05-07|   2023-05-10| Standard|      400.0|
|         7|Samantha Thompson|  2023-05-08|   2023-05-12|   Deluxe|      600.0|
|         8|      William Lee|  2023-05-10|   2023-05-13| Standard|      450.0|
|         9|    Amanda Harris|  2023-05-11|   2023-05-16|Executive|      750.0|
|        10|  David Rodriguez|  2023-05-

In [16]:
spark.sql("create table itv016269_databases.hotels(booking_id int, guest_name string, checkin_date date, checkout_date date, room_type string, total_price float) using csv location '/public/trendytech/datasets/hotel_data.csv'")

In [17]:
spark.sql("select * from itv016269_databases.hotels")

booking_id,guest_name,checkin_date,checkout_date,room_type,total_price
1,John Doe,2023-05-01,2023-05-05,Standard,400.0
2,Jane Smith,2023-05-02,2023-05-06,Deluxe,600.0
3,Mark Johnson,2023-05-03,2023-05-08,Standard,450.0
4,Sarah Wilson,2023-05-04,2023-05-07,Executive,750.0
5,Emily Brown,2023-05-06,2023-05-09,Deluxe,550.0
6,Michael Davis,2023-05-07,2023-05-10,Standard,400.0
7,Samantha Thompson,2023-05-08,2023-05-12,Deluxe,600.0
8,William Lee,2023-05-10,2023-05-13,Standard,450.0
9,Amanda Harris,2023-05-11,2023-05-16,Executive,750.0
10,David Rodriguez,2023-05-12,2023-05-15,Deluxe,550.0


In [18]:
spark.sql("select count(booking_id) as total_bookings from itv016269_databases.hotels ") #Execution time 0.2 secs

total_bookings
107


In [20]:
spark.sql("select room_type, round(avg(total_price),2) as total_bookings from itv016269_databases.hotels group by room_type ") #Execution time 0.8 secs

room_type,total_bookings
Executive,750.0
Deluxe,575.58
Standard,425.0


In [23]:
spark.sql("cache lazy table itv016269_databases.hotels ")

# Caching on External Tables

In [24]:
spark.sql("select count(booking_id) as total_bookings from itv016269_databases.hotels ") #Execution time 32 msecs

total_bookings
107


In [25]:
spark.sql("select room_type, round(avg(total_price),2) as total_bookings from itv016269_databases.hotels group by room_type ") #Execution time 0.2 secs

room_type,total_bookings
Executive,750.0
Deluxe,575.58
Standard,425.0
