In [1]:
pip install pyspark

Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9
    Uninstalling py4j-0.10.9:
      Successfully uninstalled py4j-0.10.9
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
pyspark-stubs 3.0.0.post3 requires pyspark<3.1.0,>=3.0.0.dev0, but you have pyspark 3.5.0 which is incompatible.[0m[31m
[0mSuccessfully installed py4j-0.10.9.7
Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install pyspark-stubs

Collecting pyspark<3.1.0,>=3.0.0.dev0 (from pyspark-stubs)
  Using cached pyspark-3.0.3-py2.py3-none-any.whl
Collecting py4j==0.10.9 (from pyspark<3.1.0,>=3.0.0.dev0->pyspark-stubs)
  Using cached py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.0
    Can't uninstall 'pyspark'. No files were found to uninstall.
Successfully installed py4j-0.10.9 pyspark-3.0.3
Note: you may need to restart the kernel to use updated packages.


In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import os

In [4]:
from pyspark.sql.functions import *

In [5]:
spark = SparkSession.builder \
        .master("local") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.executor.memory", "500mb") \
        .appName("Exercise1") \
        .getOrCreate()

In [6]:
products_table = spark.read.parquet("./data/products_parquet")

In [7]:
sales_table = spark.read.parquet("./data/sales_parquet")

In [8]:
sellers_table = spark.read.parquet("./data/sellers_parquet")

In [9]:
print("Number of orders: {}".format(products_table.count()))

Number of orders: 75000000


In [10]:
print("Number of orders: {}".format(sellers_table.count()))

Number of orders: 10


In [11]:
print("Number of orders: {}".format(sales_table.count()))

Number of orders: 994971


In [12]:
print("Number of products sold at least once")
sales_table.agg(countDistinct(col("product_id"))).show()

Number of products sold at least once
+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                    988403|
+--------------------------+



In [13]:
print("Product present in more orders")
sales_table.groupBy(col("product_id")).agg(count("*").alias("cnt")).orderBy(col("cnt").desc()).limit(1).show()

Product present in more orders
+----------+---+
|product_id|cnt|
+----------+---+
|  28592106|  3|
+----------+---+



In [14]:
sales_table.groupBy(col("date")).agg(countDistinct(col("product_id")).alias("distinct_products_sold")).orderBy(col("distinct_products_sold").desc()).show()

+----------+----------------------+
|      date|distinct_products_sold|
+----------+----------------------+
|2020-07-06|                100300|
|2020-07-09|                 99946|
|2020-07-01|                 99831|
|2020-07-03|                 99490|
|2020-07-04|                 99301|
|2020-07-02|                 99296|
|2020-07-05|                 99294|
|2020-07-07|                 99213|
|2020-07-08|                 99203|
|2020-07-10|                 98460|
+----------+----------------------+



In [15]:
products_table

DataFrame[product_id: string, product_name: string, price: string]

In [16]:
sales_table

DataFrame[order_id: string, product_id: string, seller_id: string, date: string, num_pieces_sold: string, bill_raw_text: string]

In [17]:
sellers_table

DataFrame[seller_id: string, seller_name: string, daily_target: string]

In [18]:
from pyspark.sql.functions import monotonically_increasing_id

In [19]:
from pyspark.sql.functions import col, concat, lit

In [20]:
joined_table = sales_table.join(products_table, 
                                "product_id", 
                                "inner")

In [21]:
joined_table = joined_table.withColumn("revenue", col("num_pieces_sold") * col("price"))

In [22]:
average_revenue = (joined_table.groupBy().agg({"revenue":"avg"}).withColumnRenamed("avg(revenue)", "average_revenue"))

In [23]:
average_revenue.show()

+-----------------+
|  average_revenue|
+-----------------+
|3814.589524719816|
+-----------------+



In [24]:
# Step 1: Join tables
merged_table = sales_table.join(sellers_table, "seller_id", "inner").join(products_table, "product_id", "inner")

# Step 2: Calculate contribution percentage for each order
merged_table = merged_table.withColumn("contribution_percentage", (col("num_pieces_sold") / col("daily_target")) * 100)

# Step 3: Calculate average contribution percentage for each seller
average_contribution = merged_table.groupBy("seller_id", "seller_name").agg(avg("contribution_percentage").alias("average_contribution"))

# Display the result
average_contribution.show()


+---------+-----------+--------------------+
|seller_id|seller_name|average_contribution|
+---------+-----------+--------------------+
|        6|   seller_6|0.004781768623614526|
|        7|   seller_7|0.002595167082595...|
|        5|   seller_5| 0.00421090304838121|
|        2|   seller_2|0.006691129824818656|
|        1|   seller_1| 0.01963972876684036|
|        3|   seller_3| 0.01628585525508843|
|        9|   seller_9|0.003838322005051...|
|        8|   seller_8|0.009214312305192011|
|        4|   seller_4|0.003295439904807...|
+---------+-----------+--------------------+



In [29]:
# Step 1: Join tables
merged_table = sales_table.join(sellers_table, "seller_id", "inner").join(products_table, "product_id", "inner")

# Step 2: Calculate the number of pieces sold for each seller and product
sales_count = merged_table.groupBy("product_id", "seller_id").agg({"num_pieces_sold": "sum"}).withColumnRenamed("sum(num_pieces_sold)", "total_pieces_sold")

# Step 3: Use window functions to rank sellers based on sales for each product
windowSpec = Window.partitionBy("product_id").orderBy(desc("total_pieces_sold"))

sales_count = sales_count.withColumn("rank", dense_rank().over(windowSpec))

# Step 4: Filter for the second most and least selling sellers for each product
second_most_selling = sales_count.filter(col("rank") == 2)
least_selling = sales_count.filter(col("rank") == 1)

# Display the results
second_most_selling.show()
least_selling.show()

+----------+---------+-----------------+----+
|product_id|seller_id|total_pieces_sold|rank|
+----------+---------+-----------------+----+
|   1015908|        8|             86.0|   2|
|  10277686|        4|             30.0|   2|
|  10282881|        7|             30.0|   2|
|  10646888|        8|             38.0|   2|
|  10669968|        3|             64.0|   2|
|  10707485|        7|             11.0|   2|
|  10851278|        6|             47.0|   2|
|  10989157|        4|              9.0|   2|
|  11191322|        7|             22.0|   2|
|  11201560|        1|             36.0|   2|
|   1121644|        3|             51.0|   2|
|  11230312|        6|              2.0|   2|
|  11374026|        8|              8.0|   2|
|  11408756|        7|             12.0|   2|
|   1143343|        7|             32.0|   2|
|  11476908|        3|             37.0|   2|
|  11650447|        4|             39.0|   2|
|  11809907|        6|             45.0|   2|
|  11853953|        4|            

In [30]:
# Filter for product_id = 0 in second_most_selling DataFrame
filtered_second_most_selling = second_most_selling.filter(col("product_id") == 0)

# Filter for product_id = 0 in least_selling DataFrame
filtered_least_selling = least_selling.filter(col("product_id") == 0)

# Display the filtered results
filtered_second_most_selling.show()
filtered_least_selling.show()


+----------+---------+-----------------+----+
|product_id|seller_id|total_pieces_sold|rank|
+----------+---------+-----------------+----+
+----------+---------+-----------------+----+

+----------+---------+-----------------+----+
|product_id|seller_id|total_pieces_sold|rank|
+----------+---------+-----------------+----+
+----------+---------+-----------------+----+



In [28]:
from pyspark.sql.window import Window

In [34]:
# Define the UDF function
def algo(order_id, bill_text):
    order_id = int(order_id)  # Cast order_id to an integer
    if order_id % 2 == 0:
        # Count the number of capital 'A' in the bill text
        count_A = bill_text.upper().count('A')
        # Iteratively apply MD5
        hashed_result = hashlib.md5(bill_text.encode()).hexdigest()
        for _ in range(count_A):
            hashed_result = hashlib.md5(hashed_result.encode()).hexdigest()
    else:
        # Apply SHA256
        hashed_result = hashlib.sha256(bill_text.encode()).hexdigest()
    
    return hashed_result

# Register the UDF with Spark session
algo_udf = udf(algo, StringType())
spark.udf.register("algo", algo_udf)

# Apply the UDF to create a new column 'hashed_bill'
sales_table = sales_table.withColumn("hashed_bill", algo_udf(col("order_id"), col("bill_raw_text")))

# Show the resulting DataFrame
sales_table.show()

+--------+----------+---------+----------+---------------+--------------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|         hashed_bill|
+--------+----------+---------+----------+---------------+--------------------+--------------------+
|  475442|  59216308|        4|2020-07-07|             11|bdfihbvlcxeqioqgm...|0e3869182df64cff0...|
|  475460|  64349211|        9|2020-07-05|             93|shwfatjysteuxvjfl...|e496b80b6895ecc77...|
|  475745|   6147551|        6|2020-07-07|             33|czhqinijvbhlsljac...|c736acb79325722c4...|
|  475748|  18524928|        1|2020-07-09|              3|nfjxfasnrnivfxfdt...|61ab57bccf538f7d5...|
|  475765|  44920059|        6|2020-07-08|             77|kqhmsiqtofuowcwfv...|6c570d87e88d70f51...|
|  476233|  53991698|        4|2020-07-10|             10|gfsgwwnduxpmevgxc...|449f6dd5d426b81aa...|
|  476247|   9083087|        8|2020-07-02|             45|qbvwimpyblobdeeas...|50bffee08801

In [32]:
import hashlib