In [None]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=99c7cd52131698f2665fcfbb9c383ea606bde8cdbbff27f6e0610e46c1943bbf
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Explicit column layout for the sales CSV: every field is nullable, and the
# column names are exactly the ones later cells refer to.
schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("customer_id", StringType(), True)
    .add("order_date", DateType(), True)
    .add("loaction", StringType(), True)
    .add("source_order", StringType(), True)
)


In [None]:
from pyspark.sql import SparkSession


In [None]:
# Start (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("aiht_project")
    .getOrCreate()
)

# Load the sales CSV with the explicit schema defined above. The "inferschema"
# option is intentionally not set: Spark skips inference whenever a schema is
# supplied, so the option was a no-op that obscured where the types come from.
sales_df = spark.read.format("csv").schema(schema).load("/content/sales.csv.txt")

display(sales_df)


DataFrame[product_id: int, customer_id: string, order_date: date, loaction: string, source_order: string]

Deriving year

In [None]:
from pyspark.sql.functions import month, quarter, year

# Derive the calendar parts of order_date.
# The saved outputs of the following cells list order_month and order_quarter,
# but no cell in the notebook created them, and the values shown were copies of
# the year (e.g. 2023). Deriving all three here lets a fresh
# "Restart & Run All" reproduce those columns with real month/quarter values.
sales_df = (
    sales_df
    .withColumn("order_year", year("order_date"))
    .withColumn("order_month", month("order_date"))
    .withColumn("order_quarter", quarter("order_date"))
)

In [None]:
# Display the DataFrame object; the saved output of this cell is its
# column/type summary, not the rows themselves.
display(sales_df)


DataFrame[product_id: int, customer_id: string, order_date: date, loaction: string, source_order: string, order_year: int, order_month: int, order_quarter: int]

In [None]:

# display() shows the DataFrame's column/type summary; show(n=5) prints the
# first five rows as a text table.
display(sales_df)
sales_df.show(n=5)


DataFrame[product_id: int, customer_id: string, order_date: date, loaction: string, source_order: string, order_year: int, order_month: int, order_quarter: int]

+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|product_id|customer_id|order_date|loaction|source_order|order_year|order_month|order_quarter|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
|         1|          A|2023-01-01|   India|      Swiggy|      2023|       2023|         2023|
|         2|          A|2022-01-01|   India|      Swiggy|      2022|       2022|         2022|
|         2|          A|2023-01-07|   India|      Swiggy|      2023|       2023|         2023|
|         3|          A|2023-01-10|   India|  Restaurant|      2023|       2023|         2023|
|         3|          A|2022-01-11|   India|      Swiggy|      2022|       2022|         2022|
+----------+-----------+----------+--------+------------+----------+-----------+-------------+
only showing top 5 rows



Menu dataframe

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

# Column layout for the menu CSV. This rebinds the notebook-level name
# `schema` that the sales load used earlier.
# NOTE(review): price is declared as a string; the sum(price) aggregations
# further down come back as double, so they rely on Spark casting it.
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", StringType(), True),
])

# The explicit schema supplies the column types, so the "inferschema" option is
# not set: Spark ignores it whenever a schema is given.
menu_df = spark.read.format("csv").schema(schema).load("/content/menu.csv.txt")
display(menu_df)



DataFrame[product_id: int, product_name: string, price: string]

In [None]:
# Print the first five menu rows as a text table.
menu_df.show(n=5)


+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         1|       PIZZA|  100|
|         2|     Chowmin|  150|
|         3|    sandwich|  120|
|         4|        Dosa|  110|
|         5|     Biryani|   80|
+----------+------------+-----+
only showing top 5 rows



Total amount spent by each customer

In [None]:
# Match every sale to its menu entry, add up the price per customer, and list
# the customers in customer_id order.
total_amount_spent = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('customer_id')
    .agg({'price': 'sum'})
    .orderBy('customer_id')
)

display(total_amount_spent)
total_amount_spent.show(5)


DataFrame[customer_id: string, sum(price): double]

+-----------+----------+
|customer_id|sum(price)|
+-----------+----------+
|          A|    4260.0|
|          B|    4440.0|
|          C|    2400.0|
|          D|    1200.0|
|          E|    2040.0|
+-----------+----------+



Total amount of sales in each year

In [None]:
# Match every sale to its menu entry and add up the price per order_year.
df1 = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('order_year')
    .agg({'price': 'sum'})
)

display(df1)
df1.show(2)


DataFrame[order_year: int, sum(price): double]

+----------+----------+
|order_year|sum(price)|
+----------+----------+
|      2023|    9990.0|
|      2022|    4350.0|
+----------+----------+



How many times each product was purchased

In [None]:
from pyspark.sql.functions import count
# Count the sales rows for each product (id + name) and list the products from
# most to least purchased.
most_df = (
    sales_df
    .join(menu_df, on='product_id')
    .groupBy('product_id', 'product_name')
    .agg(count('product_id').alias('product_count'))
    .orderBy('product_count', ascending=False)
)

display(most_df)
most_df.show(6)


DataFrame[product_id: int, product_name: string, product_count: bigint]

+----------+------------+-------------+
|product_id|product_name|product_count|
+----------+------------+-------------+
|         3|    sandwich|           48|
|         2|     Chowmin|           24|
|         1|       PIZZA|           21|
|         4|        Dosa|           12|
|         5|     Biryani|            6|
|         6|       Pasta|            6|
+----------+------------+-------------+



Top 5 ordered items

In [None]:
from pyspark.sql.functions import count
# Top five products by number of sales rows.
most_df = (
    sales_df
    .join(menu_df, 'product_id')
    .groupBy('product_id', 'product_name')
    .agg(count('product_id').alias('product_count'))
    # Counts can tie (Biryani and Pasta both have 6 in the per-product table),
    # so sorting on product_count alone leaves the fifth row up to Spark.
    # Breaking ties on product_id makes the top five deterministic.
    .orderBy(['product_count', 'product_id'], ascending=[False, True])
    .drop('product_id')
    .limit(5)
)

display(most_df)
most_df.show(n=5)


DataFrame[product_name: string, product_count: bigint]

+------------+-------------+
|product_name|product_count|
+------------+-------------+
|    sandwich|           48|
|     Chowmin|           24|
|       PIZZA|           21|
|        Dosa|           12|
|     Biryani|            6|
+------------+-------------+



Frequency of customer visits to the restaurant

In [None]:
from pyspark.sql.functions import countDistinct

# Keep only the orders whose source_order is 'Restaurant', then count the
# distinct order dates for each customer.
freq = (
    sales_df
    .filter(sales_df['source_order'] == 'Restaurant')
    .groupBy('customer_id')
    .agg(countDistinct('order_date'))
)

display(freq)
freq.show(5)


DataFrame[customer_id: string, count(DISTINCT order_date): bigint]

+-----------+--------------------------+
|customer_id|count(DISTINCT order_date)|
+-----------+--------------------------+
|          E|                         5|
|          B|                         6|
|          D|                         1|
|          C|                         3|
|          A|                         6|
+-----------+--------------------------+

