In [77]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
appName("Sneha Spark Session").\
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [78]:
spark

In [79]:
! hadoop fs -cat /public/trendytech/datasets/cust_transf.csv |head

1001,2023-05-15,1001,49.99
1002,2023-05-16,1002,29.99
1003,2023-05-17,1003,39.99
1004,2023-05-18,1004,19.99
1005,2023-05-19,1005,24.99
1001,2023-05-20,1002,29.99
1002,2023-05-21,1003,39.99
1003,2023-05-22,1004,19.99
1004,2023-05-23,1005,24.99
1005,2023-05-24,1001,49.99
cat: Unable to write to output stream.


In [80]:
trans_schema = 'customer_id long,purchase_date date,product_id integer,amount double'

In [81]:
trans_df = spark.read.format("csv").schema(trans_schema).load('/public/trendytech/datasets/cust_transf.csv')

In [82]:
trans_df.show()

+-----------+-------------+----------+------+
|customer_id|purchase_date|product_id|amount|
+-----------+-------------+----------+------+
|       1001|   2023-05-15|      1001| 49.99|
|       1002|   2023-05-16|      1002| 29.99|
|       1003|   2023-05-17|      1003| 39.99|
|       1004|   2023-05-18|      1004| 19.99|
|       1005|   2023-05-19|      1005| 24.99|
|       1001|   2023-05-20|      1002| 29.99|
|       1002|   2023-05-21|      1003| 39.99|
|       1003|   2023-05-22|      1004| 19.99|
|       1004|   2023-05-23|      1005| 24.99|
|       1005|   2023-05-24|      1001| 49.99|
|       1001|   2023-05-25|      1003| 39.99|
|       1002|   2023-05-26|      1004| 19.99|
|       1003|   2023-05-27|      1005| 24.99|
|       1004|   2023-05-28|      1001| 49.99|
|       1005|   2023-05-29|      1002| 29.99|
|       1001|   2023-05-30|      1003| 39.99|
|       1002|   2023-05-31|      1004| 19.99|
|       1003|   2023-06-01|      1005| 24.99|
|       1004|   2023-06-02|      1

In [83]:
start_date = "2023-05-01"

In [84]:
end_date = "2023-06-30"

In [85]:
filtered_df = trans_df.filter((trans_df.purchase_date>=start_date) & (trans_df.purchase_date<=end_date))

In [86]:
revenue_df =filtered_df.groupBy("product_id").sum("amount").withColumnRenamed("sum(amount)", "revenue")

In [87]:
cust_df = filtered_df.groupBy("customer_id").sum("amount").withColumnRenamed("sum(amount)", "tran_amount")

In [88]:
top_products =revenue_df.sort("revenue",ascending = False).limit(10).show()

+----------+-------------------+
|product_id|            revenue|
+----------+-------------------+
|      1001|8.747870076028482E8|
|      1003|6.997946075949881E8|
|      1002|5.248022075897805E8|
|      1005|4.373060075933379E8|
|      1004|3.498098075985674E8|
|      1015| 12537.909999999963|
|      1014| 11492.909999999963|
|      1013| 10447.909999999963|
|      1012|  9402.909999999965|
|      1011|  8357.909999999967|
+----------+-------------------+



In [89]:
top_cust = cust_df.sort("tran_amount", ascending = False).limit(10).show()

+-----------+--------------------+
|customer_id|         tran_amount|
+-----------+--------------------+
|       1001| 3.180884580005335E8|
|       1004|3.1013425800086874E8|
|       1005|2.6240905800151232E8|
|       1003|2.1468385800145328E8|
|       1002|2.0672965800144076E8|
|       1011|1.9086143271084765E8|
|       1006|1.9085620771084762E8|
|       1015|1.6700301271081635E8|
|       1010|1.6699778771081638E8|
|       1014|1.5109356771079004E8|
+-----------+--------------------+



In [90]:
cached_filtered_df = trans_df.filter((trans_df.purchase_date>=start_date) & (trans_df.purchase_date<=end_date)).cache()

In [91]:
cache_revenue_df =cached_filtered_df.groupBy("product_id").sum("amount").withColumnRenamed("sum(amount)", "revenue")

In [16]:
cached_top_products =cache_revenue_df.sort("revenue",ascending = False).limit(10).show()

+----------+-------------------+
|product_id|            revenue|
+----------+-------------------+
|      1001|8.747870076028482E8|
|      1003| 6.99794607594988E8|
|      1002|5.248022075897805E8|
|      1005|4.373060075933379E8|
|      1004|3.498098075985674E8|
|      1015| 12537.909999999963|
|      1014| 11492.909999999963|
|      1013| 10447.909999999963|
|      1012|  9402.909999999965|
|      1011|  8357.909999999967|
+----------+-------------------+



In [19]:
spark.sql("drop database itv017244_week7assignment_cust_transaction")

In [20]:
spark.sql("create database itv017244_week7assignment_cust_transaction")

In [18]:
spark.sql("drop table itv017244_week7assignment_cust_transaction.transaction_ext")

In [21]:
spark.sql("create table itv017244_week7assignment_cust_transaction.transaction_ext(customer_id long,purchase_date date,product_id integer,transaction_amount double) USING csv location '/user/itv017244/warehouse/assignments_week7/cust_transf.csv'")


In [92]:
spark.sql("select * from itv017244_week7assignment_cust_transaction.transaction_ext limit(10)").show()

+-----------+-------------+----------+------------------+
|customer_id|purchase_date|product_id|transaction_amount|
+-----------+-------------+----------+------------------+
|       1004|   2023-05-28|      1001|             49.99|
|       1005|   2023-05-29|      1002|             29.99|
|       1001|   2023-05-30|      1003|             39.99|
|       1002|   2023-05-31|      1004|             19.99|
|       1003|   2023-06-01|      1005|             24.99|
|       1004|   2023-06-02|      1001|             49.99|
|       1005|   2023-06-03|      1002|             29.99|
|       1001|   2023-06-04|      1003|             39.99|
|       1002|   2023-06-05|      1004|             19.99|
|       1003|   2023-06-06|      1005|             24.99|
+-----------+-------------+----------+------------------+



In [93]:
spark.sql("select product_id, sum(transaction_amount) as revenue from itv017244_week7assignment_cust_transaction.transaction_ext where purchase_date>= '2023-05-01' and purchase_date<= '2023-06-30' group by product_id order by revenue desc limit(10)").show()

+----------+-------------------+
|product_id|            revenue|
+----------+-------------------+
|      1001|8.747870076028482E8|
|      1003| 6.99794607594988E8|
|      1002|5.248022075897805E8|
|      1005|4.373060075933379E8|
|      1004|3.498098075985673E8|
|      1015| 12537.909999999963|
|      1014| 11492.909999999963|
|      1013| 10447.909999999963|
|      1012|  9402.909999999965|
|      1011|  8357.909999999967|
+----------+-------------------+



In [94]:
spark.sql("select customer_id, sum(transaction_amount) as revenue from itv017244_week7assignment_cust_transaction.transaction_ext where purchase_date>= '2023-05-01' and purchase_date<= '2023-06-30' group by customer_id order by revenue desc limit(10)").show()

+-----------+--------------------+
|customer_id|             revenue|
+-----------+--------------------+
|       1001| 3.180884580005336E8|
|       1004| 3.101342580008687E8|
|       1005| 2.624090580015123E8|
|       1003|2.1468385800145325E8|
|       1002| 2.067296580014408E8|
|       1011|1.9086143271084768E8|
|       1006|1.9085620771084768E8|
|       1015|1.6700301271081635E8|
|       1010|1.6699778771081635E8|
|       1014|   1.5109356771079E8|
+-----------+--------------------+



In [95]:
spark.sql("cache table itv017244_week7assignment_cust_transaction.transaction_ext")

In [96]:
spark.sql("select product_id, sum(transaction_amount) as revenue from itv017244_week7assignment_cust_transaction.transaction_ext where purchase_date>= '2023-05-01' and purchase_date<= '2023-06-30' group by product_id order by revenue desc limit(10)").show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|      1001| 8.747870076028483E8|
|      1003| 6.997946075949881E8|
|      1002| 5.248022075897805E8|
|      1005|4.3730600759333783E8|
|      1004| 3.498098075985674E8|
|      1015|  12537.909999999963|
|      1014|  11492.909999999963|
|      1013|  10447.909999999963|
|      1012|   9402.909999999965|
|      1011|   8357.909999999967|
+----------+--------------------+



In [97]:
spark.sql("select customer_id, sum(transaction_amount) as revenue from itv017244_week7assignment_cust_transaction.transaction_ext where purchase_date>= '2023-05-01' and purchase_date<= '2023-06-30' group by customer_id order by revenue desc limit(10)").show()

+-----------+--------------------+
|customer_id|             revenue|
+-----------+--------------------+
|       1001| 3.180884580005336E8|
|       1004|3.1013425800086874E8|
|       1005|2.6240905800151226E8|
|       1003|2.1468385800145328E8|
|       1002|2.0672965800144076E8|
|       1011|1.9086143271084768E8|
|       1006|1.9085620771084768E8|
|       1015|1.6700301271081635E8|
|       1010|1.6699778771081635E8|
|       1014|1.5109356771079004E8|
+-----------+--------------------+



In [98]:
from pyspark.sql.functions import year, month

In [99]:
new_df = trans_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))


In [100]:
from pyspark.sql.functions import countDistinct

In [101]:
customer_month_counts = new_df.groupBy("customer_id", "purchase_year","purchase_month").agg(countDistinct("purchase_month").alias("distinct_months"))

In [102]:
customer_month_counts.show()

+-----------+-------------+--------------+---------------+
|customer_id|purchase_year|purchase_month|distinct_months|
+-----------+-------------+--------------+---------------+
|       1011|         2023|             5|              1|
|       1002|         2023|             5|              1|
|       1004|         2023|             6|              1|
|       1014|         2023|             5|              1|
|       1004|         2023|             5|              1|
|       1009|         2023|             6|              1|
|       1013|         2023|             6|              1|
|       1002|         2023|             6|              1|
|       1013|         2023|             5|              1|
|       1001|         2023|             6|              1|
|       1007|         2023|             6|              1|
|       1007|         2023|             5|              1|
|       1015|         2023|             6|              1|
|       1006|         2023|             5|              

In [103]:
regular_customers = customer_month_counts.filter("distinct_months = 1").groupBy("customer_id").count().orderBy("count", ascending=False).limit(10).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1014|    2|
|       1009|    2|
|       1015|    2|
|       1012|    2|
|       1006|    2|
|       1007|    2|
|       1011|    2|
|       1002|    2|
|       1003|    2|
|       1001|    2|
+-----------+-----+



In [104]:
customer_month_counts = new_df.groupBy("customer_id", "purchase_year","purchase_month").agg(countDistinct("purchase_month").alias("distinct_months")).cache()

In [105]:
regular_customers = customer_month_counts.filter("distinct_months = 1").groupBy("customer_id").count().orderBy("count", ascending=False).limit(10).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1005|    2|
|       1010|    2|
|       1009|    2|
|       1004|    2|
|       1011|    2|
|       1007|    2|
|       1001|    2|
|       1002|    2|
|       1003|    2|
|       1013|    2|
+-----------+-----+



In [106]:
from pyspark.sql.functions import month

In [107]:
from pyspark.sql.functions import year

In [108]:
from pyspark.storagelevel import StorageLevel


In [109]:
new_df = trans_df.withColumn("purchase_year",year("purchase_date")).withColumn("purchase_month",month("purchase_date"))


In [110]:
customer_month_counts = new_df.groupBy("customer_id", "purchase_year","purchase_month").agg(countDistinct("purchase_month").alias("distinct_months")).persist(StorageLevel.MEMORY_AND_DISK)

In [111]:
regular_customers = customer_month_counts.filter("distinct_months = 1").groupBy("customer_id").count().orderBy("count", ascending=False).limit(10).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1010|    2|
|       1011|    2|
|       1007|    2|
|       1003|    2|
|       1014|    2|
|       1008|    2|
|       1002|    2|
|       1013|    2|
|       1005|    2|
|       1009|    2|
+-----------+-----+



In [112]:
customer_month_counts = new_df.groupBy("customer_id", "purchase_year","purchase_month").agg(countDistinct("purchase_month").alias("distinct_months")).persist(StorageLevel.MEMORY_ONLY)

In [113]:
regular_customers = customer_month_counts.filter("distinct_months = 1").groupBy("customer_id").count().orderBy("count", ascending=False).limit(10).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1005|    2|
|       1001|    2|
|       1010|    2|
|       1008|    2|
|       1002|    2|
|       1004|    2|
|       1012|    2|
|       1003|    2|
|       1014|    2|
|       1015|    2|
+-----------+-----+



In [114]:
customer_month_counts = new_df.groupBy("customer_id", "purchase_year","purchase_month").agg(countDistinct("purchase_month").alias("distinct_months")).persist(StorageLevel.DISK_ONLY)

In [115]:
regular_customers = customer_month_counts.filter("distinct_months = 1").groupBy("customer_id").count().orderBy("count", ascending=False).limit(10).show()

+-----------+-----+
|customer_id|count|
+-----------+-----+
|       1015|    2|
|       1013|    2|
|       1007|    2|
|       1011|    2|
|       1009|    2|
|       1004|    2|
|       1012|    2|
|       1006|    2|
|       1001|    2|
|       1005|    2|
+-----------+-----+



In [116]:
def get_customer_history(customer_id):
    customer_history_df = trans_df.filter(trans_df.customer_id ==customer_id).cache()
    return customer_history_df

In [117]:
customer_history_df = get_customer_history(1012)
customer_history_df.show()

+-----------+-------------+----------+------+
|customer_id|purchase_date|product_id|amount|
+-----------+-------------+----------+------+
|       1012|   2023-06-02|      1002| 29.99|
|       1012|   2023-06-07|      1003| 39.99|
|       1012|   2023-06-12|      1004| 19.99|
|       1012|   2023-06-02|      1002| 29.99|
|       1012|   2023-06-07|      1003| 39.99|
|       1012|   2023-06-12|      1004| 19.99|
|       1012|   2023-06-02|      1002| 29.99|
|       1012|   2023-06-07|      1003| 39.99|
|       1012|   2023-06-12|      1004| 19.99|
|       1012|   2023-06-02|      1002| 29.99|
|       1012|   2023-06-07|      1003| 39.99|
|       1012|   2023-06-12|      1004| 19.99|
|       1012|   2023-06-02|      1002| 29.99|
|       1012|   2023-06-07|      1003| 39.99|
|       1012|   2023-06-12|      1004| 19.99|
|       1012|   2023-06-02|      1002| 29.99|
|       1012|   2023-06-07|      1003| 39.99|
|       1012|   2023-06-12|      1004| 19.99|
|       1012|   2023-06-02|      1

In [118]:
cached_filtered_df.unpersist()

customer_id,purchase_date,product_id,amount
1001,2023-05-15,1001,49.99
1002,2023-05-16,1002,29.99
1003,2023-05-17,1003,39.99
1004,2023-05-18,1004,19.99
1005,2023-05-19,1005,24.99
1001,2023-05-20,1002,29.99
1002,2023-05-21,1003,39.99
1003,2023-05-22,1004,19.99
1004,2023-05-23,1005,24.99
1005,2023-05-24,1001,49.99


In [119]:
spark.sql("uncache table itv017244_week7assignment_cust_transaction.transaction_ext")

In [56]:
spark.sql("create database itv017244_week7assignment_hotel")

In [120]:
spark.sql("drop table itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext")

In [121]:
spark.sql("create table itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext(booking_id INT,guest_name STRING, checkin_date DATE, checkout_date DATE, room_type STRING, total_price DOUBLE) USING csv location '/user/itv017244/warehouse/assignments_week7/hotel/hotel_data.csv'")

In [122]:
spark.sql("select * from itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext limit 5").show()

+----------+------------+------------+-------------+---------+-----------+
|booking_id|  guest_name|checkin_date|checkout_date|room_type|total_price|
+----------+------------+------------+-------------+---------+-----------+
|         1|    John Doe|  2023-05-01|   2023-05-05| Standard|      400.0|
|         2|  Jane Smith|  2023-05-02|   2023-05-06|   Deluxe|      600.0|
|         3|Mark Johnson|  2023-05-03|   2023-05-08| Standard|      450.0|
|         4|Sarah Wilson|  2023-05-04|   2023-05-07|Executive|      750.0|
|         5| Emily Brown|  2023-05-06|   2023-05-09|   Deluxe|      550.0|
+----------+------------+------------+-------------+---------+-----------+



In [124]:
spark.sql("select room_type,avg(total_price)as avg_total_price from itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext where booking_id<=100 group by room_type limit 5").show()

+---------+----------------+
|room_type| avg_total_price|
+---------+----------------+
|Executive|           750.0|
| Standard|424.390243902439|
|   Deluxe|           575.0|
+---------+----------------+



In [125]:
spark.sql("cache table itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext")

In [126]:
spark.sql("select room_type,avg(total_price)as avg_total_price from itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext where booking_id<=100 group by room_type limit 5").show()

+---------+----------------+
|room_type| avg_total_price|
+---------+----------------+
|   Deluxe|           575.0|
| Standard|424.390243902439|
|Executive|           750.0|
+---------+----------------+



In [127]:
spark.sql("uncache table itv017244_week7assignment_hotel.itb017244_hotel_transaction_ext ")