reference https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-24

In [68]:
# Download from source
# import pandas as pd
# from tqdm import tqdm
# import csv
# import random
# import string
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import *

# random.seed(1999)

# letters = string.ascii_lowercase
# letters_upper = string.ascii_uppercase
# for _i in range(0, 10):
#     letters += letters

# for _i in range(0, 10):
#     letters += letters_upper


# def random_string(stringLength=10):
#     """Generate a random string of fixed length """
#     return ''.join(random.sample(letters, stringLength))


# print("Products between {} and {}".format(1, 75000000))
# product_ids = [x for x in range(1, 75000000)]
# dates = ['2020-07-01', '2020-07-02', '2020-07-03', '2020-07-04', '2020-07-05', '2020-07-06', '2020-07-07', '2020-07-08',
#          '2020-07-09', '2020-07-10']
# seller_ids = [x for x in range(1, 10)]


# #   Generate products
# products = [[0, "product_0", 22]]
# for p in tqdm(product_ids):
#     products.append([p, "product_{}".format(p), random.randint(1, 150)])
# #   Save dataframe
# df = pd.DataFrame(products)
# df.columns = ["product_id", "product_name", "price"]
# df.to_csv("products.csv", index=False)
# del df
# del products

# #   Generate sellers
# sellers = [[0, "seller_0", 2500000]]
# for s in tqdm(seller_ids):
#     sellers.append([s, "seller_{}".format(s), random.randint(12000, 2000000)])
# #   Save dataframe
# df = pd.DataFrame(sellers)
# df.columns = ["seller_id", "seller_name", "daily_target"]
# df.to_csv("sellers.csv", index=False)

# #   Generate sales
# total_rows = 500000
# prod_zero = int(total_rows * 0.95)
# prod_others = total_rows - prod_zero + 1
# df_array = [["order_id", "product_id", "seller_id", "date", "num_pieces_sold", "bill_raw_text"]]
# with open('sales.csv', 'w', newline='') as f:
#     csvwriter = csv.writer(f)
#     csvwriter.writerows(df_array)

# order_id = 0
# for i in tqdm(range(0, 40)):
#     df_array = []

#     for i in range(0, prod_zero):
#         order_id += 1
#         df_array.append([order_id, 0, 0, random.choice(dates), random.randint(1, 100), random_string(500)])

#     with open('sales.csv', 'a', newline='') as f:
#         csvwriter = csv.writer(f)
#         csvwriter.writerows(df_array)

#     df_array = []
#     for i in range(0, prod_others):
#         order_id += 1
#         df_array.append(
#             [order_id, random.choice(product_ids), random.choice(seller_ids), random.choice(dates),
#              random.randint(1, 100), random_string(500)])

#     with open('sales.csv', 'a', newline='') as f:
#         csvwriter = csv.writer(f)
#         csvwriter.writerows(df_array)

# print("Done")

# spark = SparkSession.builder \
#     .master("local") \
#     .config("spark.sql.autoBroadcastJoinThreshold", -1) \
#     .appName("Exercise1") \
#     .getOrCreate()

# products = spark.read.csv(
#     "products.csv", header=True, mode="DROPMALFORMED"
# )
# products.show()
# products.write.parquet("products_parquet", mode="overwrite")

# sales = spark.read.csv(
#     "sales.csv", header=True, mode="DROPMALFORMED"
# )
# sales.show()
# sales.repartition(200, col("product_id")).write.parquet("sales_parquet", mode="overwrite")

# sellers = spark.read.csv(
#     "sellers.csv", header=True, mode="DROPMALFORMED"
# )
# sellers.show()
# sellers.write.parquet("sellers_parquet", mode="overwrite")

In [69]:
# Create the SparkSession that every later cell reads through the name `spark`,
# or reuse the session that is already active (for example when the kernel was
# launched through `pyspark`). No other active cell assigns `spark`, so without
# this the notebook raises NameError on a fresh kernel.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Exercise1").getOrCreate()

# Bare last expression: display the session summary as the cell output.
spark

In [70]:
# spark = SparkSession.builder \
#     .master("local") \
#     .config("spark.sql.autoBroadcastJoinThreshold", -1) \
#     .config("spark.executor.memory", "500mb") \
#     .appName("Exercise1") \
#     .getOrCreate()

#### Warm-Up #1

- Find out how many orders, how many products and how many sellers are in the data.
- How many products have been sold at least once? Which product is contained in the most orders?

In [71]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [72]:
# read from parquet
# NOTE(review): `spark.read.parquet` is not called here, so this binds the
# reader method itself to `product_table` and no data is read. The name
# `product_table` (singular) is not used by any other cell visible in this
# notebook; the cells below work on `products_table`, loaded from CSV.
# TODO confirm whether a call such as spark.read.parquet(<path>) was intended.
product_table = spark.read.parquet

In [73]:
# Read the products CSV; the first line of the file is used as the header.
# No schema is supplied and inferSchema is not enabled.
products_table = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('./products.csv')
)

In [74]:
# Read the sales CSV; the first line of the file is used as the header.
sales_table = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('./sales.csv')
)

In [75]:
# Read the sellers CSV; the first line of the file is used as the header.
sellers_table = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .load('./sellers.csv')
)

In [76]:
# Count the rows of the sales table and report the total as the number of orders
order_count = sales_table.count()
print("Number of Orders: {}".format(order_count))

Number of Orders: 20000040


In [77]:
# Count the rows of the sellers table and report the total
seller_count = sellers_table.count()
print("Number of sellers: {}".format(seller_count))

Number of sellers: 10


In [78]:
# Count the rows of the products table and report the total
product_count = products_table.count()
print("Number of products: {}".format(product_count))

Number of products: 75000000


In [79]:
# A product has been sold at least once if its id appears in the sales table,
# so count the distinct product_id values found there.
print("Number of products sold at least once")
distinct_sold_products = countDistinct(col("product_id")).alias("products_sold_at_least_one_count")
sales_table.agg(distinct_sold_products).show()

Number of products sold at least once
+--------------------------------+
|products_sold_at_least_one_count|
+--------------------------------+
|                          993299|
+--------------------------------+



In [80]:
# Count the sales rows per product, then keep only the product with the highest count
print("Product present in more orders")
rows_per_product = sales_table.groupBy(col("product_id")).agg(count("*").alias("cnt"))
rows_per_product.orderBy(col("cnt").desc()).limit(1).show()

Product present in more orders
+----------+--------+
|product_id|     cnt|
+----------+--------+
|         0|19000000|
+----------+--------+



#### Warm-up #2

How many distinct products have been sold on each day?

In [81]:
# For every date, count the distinct product ids sold, and list the dates
# from the highest count to the lowest.
distinct_products_by_date = (
    sales_table
    .groupby(col("date"))
    .agg(countDistinct(col("product_id")).alias("distinct_products_sold"))
)
distinct_products_by_date.orderBy(col("distinct_products_sold").desc()).show()

+----------+----------------------+
|      date|distinct_products_sold|
+----------+----------------------+
|2020-07-04|                100294|
|2020-07-03|                100224|
|2020-07-10|                100218|
|2020-07-08|                100048|
|2020-07-05|                 99991|
|2020-07-06|                 99869|
|2020-07-09|                 99801|
|2020-07-02|                 99768|
|2020-07-01|                 99755|
|2020-07-07|                 99453|
+----------+----------------------+



#### Exercise #1

##### What is the average revenue of the orders?

In [82]:
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
# revenue = price * quantity

In [83]:
# Join every sale to its product on product_id, then print the average of
# price * num_pieces_sold over all joined rows.
same_product = sales_table["product_id"] == products_table["product_id"]
order_revenue = products_table["price"] * sales_table["num_pieces_sold"]
sales_table.join(products_table, same_product, "inner").agg(avg(order_revenue)).show()

+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1245.9236386027228|
+------------------------------+



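- The join above is skewed: `product_id` 0 alone accounts for 19,000,000 of the 20,000,040 orders. The cells below repeat the same computation using key salting.
- The important thing to observe here is that we are NOT salting ALL the products, but only those that drive the skew (in this example, the 100 most frequent products).
- Salting the whole dataset would be problematic, since the number of rows would grow linearly with the “salting factor”.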

In [84]:
# Step 1 - Find the skewed keys.
# Count the sales rows per product_id and bring the 100 most frequent ids to
# the driver: these will be the only salted keys.
sales_rows_per_product = sales_table.groupby(sales_table["product_id"]).count()
results = sales_rows_per_product.sort(col("count").desc()).limit(100).collect()



In [85]:
# Step 2 - The plan:
#  a. In the dimension (products) table, duplicate the rows of the most common
#     products, e.g. product_0 becomes product_0-1, product_0-2, product_0-3, ...
#  b. In the sales table, replace "product_0" with one of those duplicates picked
#     at random (some rows become product_0-1, others product_0-2, etc.)
# Joining on the new "salted" key will unskew the join.

# Number of replicas created for each skewed product (replication indexes
# 0 .. REPLICATION_FACTOR - 1).
REPLICATION_FACTOR = 101

# Ids of the skewed products selected in Step 1.
replicated_products = [skewed_row["product_id"] for skewed_row in results]

# One (product_id, replication) pair for every replica of every skewed product.
replication_pairs = [
    (skewed_product_id, replica_index)
    for skewed_product_id in replicated_products
    for replica_index in range(REPLICATION_FACTOR)
]

# Turn the pairs into a DataFrame with columns product_id and replication.
replication_rows = spark.sparkContext.parallelize(replication_pairs).map(
    lambda pair: Row(product_id=pair[0], replication=int(pair[1]))
)
replicated_df = spark.createDataFrame(replication_rows)

In [86]:
# Step 3: Generate the salted key
#
# NOTE: this cell rebinds `products_table` and `sales_table`, so it is meant to
# be run once per kernel session; running it again joins the already-replicated
# products table a second time.

# Products side: a left join against the (small, broadcast) replication table
# yields one row per replica for the skewed products and a single row, with a
# null `replication`, for every other product. The salted key is
# "<product_id>-<replication>" for replicated rows and the plain product_id
# otherwise.
products_table = products_table.join(
    broadcast(replicated_df),
    products_table["product_id"] == replicated_df["product_id"],
    "left",
).withColumn(
    "salted_join_key",
    when(replicated_df["replication"].isNull(), products_table["product_id"]).otherwise(
        concat(replicated_df["product_id"], lit("-"), replicated_df["replication"])
    ),
)

# Sales side: every row of a skewed product gets a random replica index.
# floor(rand() * REPLICATION_FACTOR) is uniform over 0 .. REPLICATION_FACTOR - 1,
# the same index range generated for the products side. The previous
# round(rand() * (REPLICATION_FACTOR - 1)) covered the same range but sent only
# half as many rows to the first and last replica as to the others.
random_replica = floor(rand() * REPLICATION_FACTOR).cast(IntegerType())

sales_table = sales_table.withColumn(
    "salted_join_key",
    when(
        sales_table["product_id"].isin(replicated_products),
        concat(sales_table["product_id"], lit("-"), random_replica),
    ).otherwise(sales_table["product_id"]),
)

In [87]:
#   Step 4: Finally let's do the join
# Join sales and products on the salted key and print the average of
# price * num_pieces_sold.
# show() prints the table itself and returns None, so it is called directly;
# wrapping it in print() only added a stray "None" line to the output.
salted_join = sales_table.join(
    products_table,
    sales_table["salted_join_key"] == products_table["salted_join_key"],
    "inner",
)
salted_join.agg(avg(products_table["price"] * sales_table["num_pieces_sold"])).show()

print("Ok")

+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1245.9236386027228|
+------------------------------+

None
Ok
