In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config('spark.sql.warehouse.dir',f'/user/{username}/warehouse'). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
!hadoop fs -head /public/trendytech/datasets/cust_transf.csv

1001,2023-05-15,1001,49.99
1002,2023-05-16,1002,29.99
1003,2023-05-17,1003,39.99
1004,2023-05-18,1004,19.99
1005,2023-05-19,1005,24.99
1001,2023-05-20,1002,29.99
1002,2023-05-21,1003,39.99
1003,2023-05-22,1004,19.99
1004,2023-05-23,1005,24.99
1005,2023-05-24,1001,49.99
1001,2023-05-25,1003,39.99
1002,2023-05-26,1004,19.99
1003,2023-05-27,1005,24.99
1004,2023-05-28,1001,49.99
1005,2023-05-29,1002,29.99
1001,2023-05-30,1003,39.99
1002,2023-05-31,1004,19.99
1003,2023-06-01,1005,24.99
1004,2023-06-02,1001,49.99
1005,2023-06-03,1002,29.99
1001,2023-06-04,1003,39.99
1002,2023-06-05,1004,19.99
1003,2023-06-06,1005,24.99
1004,2023-06-07,1001,49.99
1005,2023-06-08,1002,29.99
1006,2023-06-01,1001,49.99
1007,2023-06-02,1002,29.99
1008,2023-06-03,1003,39.99
1009,2023-06-04,1004,19.99
1010,2023-06-05,1005,24.99
1006,2023-06-06,1002,29.99
1007,2023-06-07,1003,39.99
1008,2023-06-08,1004,19.99
1009,2023-06-09,1005,24.99
1010,2023-06-10,1001,49.99
1006,2023-06-11,1003,39.99
1007,2023-06-12,1004,19.99
1

In [2]:
orders_schema = 'cust_id long, purchase_date date, product_id long, amount float'
orders_df = spark.read. \
format('csv'). \
schema(orders_schema). \
load('/public/trendytech/datasets/cust_transf.csv')

In [4]:
orders_df.show()

+-------+-------------+----------+------+
|cust_id|purchase_date|product_id|amount|
+-------+-------------+----------+------+
|   1001|   2023-05-15|      1001| 49.99|
|   1002|   2023-05-16|      1002| 29.99|
|   1003|   2023-05-17|      1003| 39.99|
|   1004|   2023-05-18|      1004| 19.99|
|   1005|   2023-05-19|      1005| 24.99|
|   1001|   2023-05-20|      1002| 29.99|
|   1002|   2023-05-21|      1003| 39.99|
|   1003|   2023-05-22|      1004| 19.99|
|   1004|   2023-05-23|      1005| 24.99|
|   1005|   2023-05-24|      1001| 49.99|
|   1001|   2023-05-25|      1003| 39.99|
|   1002|   2023-05-26|      1004| 19.99|
|   1003|   2023-05-27|      1005| 24.99|
|   1004|   2023-05-28|      1001| 49.99|
|   1005|   2023-05-29|      1002| 29.99|
|   1001|   2023-05-30|      1003| 39.99|
|   1002|   2023-05-31|      1004| 19.99|
|   1003|   2023-06-01|      1005| 24.99|
|   1004|   2023-06-02|      1001| 49.99|
|   1005|   2023-06-03|      1002| 29.99|
+-------+-------------+----------+

### Q.1) Find top selling products.

In [3]:
start_date= "2023-05-01"
end_date = "2023-06-08"

In [4]:
filt_df = orders_df.filter((orders_df.purchase_date >= start_date) & (orders_df.purchase_date <= end_date)).cache()

In [7]:
filt_df.count()

65226530

In [8]:
revenue_df = filt_df.groupBy("product_id").sum("amount").withColumnRenamed("sum(amount)", "revenue")

In [9]:
revenue_df.show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|      1010|   7312.910350799561|
|      1002| 4.293836211229706E8|
|      1012|    9402.91035079956|
|      1009|   6267.909952163696|
|      1013|   10447.91035079956|
|      1007|   4177.909952163696|
|      1011|    8357.91035079956|
|      1005|2.7828563865119934E8|
|      1001| 5.566826598912048E8|
|      1015|   12537.91035079956|
|      1014|   11492.91035079956|
|      1008|   5222.909952163696|
|      1004| 2.862080211229706E8|
|      1006|  3132.9099521636963|
|      1003| 5.725592484315491E8|
+----------+--------------------+



In [10]:
top_products = revenue_df.sort("revenue",ascending = False).limit(10).show()

+----------+--------------------+
|product_id|             revenue|
+----------+--------------------+
|      1003| 5.725592484315491E8|
|      1001| 5.566826598912048E8|
|      1002| 4.293836211229706E8|
|      1004| 2.862080211229706E8|
|      1005|2.7828563865119934E8|
|      1015|   12537.91035079956|
|      1014|   11492.91035079956|
|      1013|   10447.91035079956|
|      1012|    9402.91035079956|
|      1011|    8357.91035079956|
+----------+--------------------+



### Q2. Find the top 10 customers.

In [17]:
filt_cust = filt_df.groupBy("cust_id").sum("amount").withColumnRenamed("sum(amount)", "cust_amount")

In [18]:
filt_cust.show()

+-------+--------------------+
|cust_id|         cust_amount|
+-------+--------------------+
|   1010| 3.976240414623642E7|
|   1002| 2.067296592137146E8|
|   1012|1.1133638841640854E8|
|   1009|3.1807159145837784E7|
|   1013| 9.542903341640854E7|
|   1007| 1.113311634160099E8|
|   1011|1.2724374341640854E8|
|   1005| 2.624090592137146E8|
|   1001| 3.180884683165741E8|
|   1015| 3.976762914623642E7|
|   1014| 3.181238414623642E7|
|   1008|  9.54238084160099E7|
|   1004| 3.101342652822876E8|
|   1006| 1.272385184160099E8|
|   1003| 2.146838592137146E8|
+-------+--------------------+



In [19]:
top_cust = filt_cust.sort("cust_amount",ascending = False).limit(10)

In [20]:
top_cust.show()

+-------+--------------------+
|cust_id|         cust_amount|
+-------+--------------------+
|   1001| 3.180884683165741E8|
|   1004| 3.101342652822876E8|
|   1005| 2.624090592137146E8|
|   1003| 2.146838592137146E8|
|   1002| 2.067296592137146E8|
|   1011|1.2724374341640854E8|
|   1006| 1.272385184160099E8|
|   1012|1.1133638841640854E8|
|   1007| 1.113311634160099E8|
|   1013| 9.542903341640854E7|
+-------+--------------------+

