# PySpark Exercises

This notebook works through a set of PySpark exercises using both RDDs and Spark DataFrames. The exercises are taken from [Six Spark Exercises to Rule Them All](https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565).

## Dataset

The schema of the dataset is shown in the image below.

![dataset](https://miro.medium.com/max/700/1*wA4xJu3LMcm_vR5pFJkLpA.png)

The three tables contain the following columns:

- Sales
    - `order_id`: The order ID
    - `product_id`: The single product sold in the order (every order contains exactly one product)
    - `seller_id`: The ID of the employee who sold the product
    - `num_pieces_sold`: The number of units of the product sold in the order
    - `bill_raw_text`: A string holding the raw text of the bill associated with the order
    - `date`: The date of the order
- Products
    - `product_id`: The product ID
    - `product_name`: The product name
    - `price`: The product price
- Sellers
    - `seller_id`: The seller ID
    - `seller_name`: The seller name
    - `daily_target`: The number of items (regardless of product type) that the seller must sell to hit their daily quota. For example, if the daily target is 100,000, the seller can hit the quota by selling 100,000 units of product_0, or by selling 30,000 units of product_1 and 70,000 units of product_2
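
As a quick check of the `daily_target` rule, the sketch below sums units across products and compares the total with the target. The helper `hits_quota` and the sample dictionaries are hypothetical names used only for illustration; they are not part of the dataset or the original article.

```python
def hits_quota(units_by_product, daily_target):
    """Return True if the total units sold, across all products, reach the target."""
    return sum(units_by_product.values()) >= daily_target


daily_target = 100_000

# Hypothetical daily sales for one seller (product name -> units sold)
single_product = {"product_0": 100_000}
mixed_products = {"product_1": 30_000, "product_2": 70_000}
short_day = {"product_1": 30_000, "product_2": 60_000}

print(hits_quota(single_product, daily_target))  # True: 100,000 units of one product
print(hits_quota(mixed_products, daily_target))  # True: 30,000 + 70,000 units
print(hits_quota(short_day, daily_target))       # False: only 90,000 units
```

The product mix does not matter, only the total number of units.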

This data is generated with the following script.

In [2]:
import pandas as pd
from tqdm import tqdm
import csv
import random
import string
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

random.seed(1999)

letters = string.ascii_lowercase
letters_upper = string.ascii_uppercase
for _i in range(0, 10):
    letters += letters

for _i in range(0, 10):
    letters += letters_upper


def random_string(stringLength=10):
    """Generate a random string of fixed length """
    return ''.join(random.sample(letters, stringLength))


print("Products between {} and {}".format(1, 75000000))
product_ids = [x for x in range(1, 75000000)]
dates = ['2020-07-01', '2020-07-02', '2020-07-03', '2020-07-04', '2020-07-05', '2020-07-06', '2020-07-07', '2020-07-08',
         '2020-07-09', '2020-07-10']
seller_ids = [x for x in range(1, 10)]


#   Generate products
products = [[0, "product_0", 22]]
for p in tqdm(product_ids):
    products.append([p, "product_{}".format(p), random.randint(1, 150)])
#   Save dataframe
df = pd.DataFrame(products)
df.columns = ["product_id", "product_name", "price"]
df.to_csv("data/products.csv", index=False)
del df
del products

#   Generate sellers
sellers = [[0, "seller_0", 2500000]]
for s in tqdm(seller_ids):
    sellers.append([s, "seller_{}".format(s), random.randint(12000, 2000000)])
#   Save dataframe
df = pd.DataFrame(sellers)
df.columns = ["seller_id", "seller_name", "daily_target"]
df.to_csv("data/sellers.csv", index=False)

#   Generate sales
total_rows = 500000
prod_zero = int(total_rows * 0.95)
prod_others = total_rows - prod_zero + 1
df_array = [["order_id", "product_id", "seller_id", "date", "num_pieces_sold", "bill_raw_text"]]
with open('data/sales.csv', 'w', newline='') as f:
    csvwriter = csv.writer(f)
    csvwriter.writerows(df_array)

order_id = 0
for _batch in tqdm(range(0, 40)):
    df_array = []

    for _ in range(0, prod_zero):
        order_id += 1
        df_array.append([order_id, 0, 0, random.choice(dates), random.randint(1, 100), random_string(500)])

    with open('data/sales.csv', 'a', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerows(df_array)

    df_array = []
    for _ in range(0, prod_others):
        order_id += 1
        df_array.append(
            [order_id, random.choice(product_ids), random.choice(seller_ids), random.choice(dates),
             random.randint(1, 100), random_string(500)])

    with open('data/sales.csv', 'a', newline='') as f:
        csvwriter = csv.writer(f)
        csvwriter.writerows(df_array)

print("Done")

Products between 1 and 75000000


100%|██████████| 74999999/74999999 [09:43<00:00, 128453.74it/s]
100%|██████████| 9/9 [00:00<00:00, 28706.26it/s]
100%|██████████| 40/40 [1:29:58<00:00, 134.96s/it]

Done
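
The number of sales rows the script writes can be worked out from its constants. A minimal sketch, assuming the values shown in the script above; the names `num_batches`, `rows_per_batch`, `total_sales_rows`, and `product_zero_share` are introduced here for illustration only.

```python
total_rows = 500_000
num_batches = 40  # iterations of the outer loop in the script

# Rows per batch that are hard-coded to product_id 0 and seller_id 0
prod_zero = int(total_rows * 0.95)
# Rows per batch with a randomly chosen product and seller (note the +1)
prod_others = total_rows - prod_zero + 1

rows_per_batch = prod_zero + prod_others
total_sales_rows = num_batches * rows_per_batch
product_zero_share = prod_zero / rows_per_batch

print(f"Rows per batch: {rows_per_batch:,d}")
print(f"Total sales rows: {total_sales_rows:,d}")
print(f"Share of rows for product 0: {product_zero_share:.2%}")
```

About 95% of all sales rows therefore reference `product_id` 0 and `seller_id` 0, so the sales table is heavily skewed toward a single key.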





The CSV files are then converted to Parquet with the following script.

In [3]:
spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .appName("GenerateData") \
    .getOrCreate()

products = spark.read.csv("data/products.csv", header=True, mode="DROPMALFORMED")
products.show()
products.write.parquet("data/products_parquet", mode="overwrite")

sales = spark.read.csv("data/sales.csv", header=True, mode="DROPMALFORMED")
sales.show()
sales.repartition(200, col("product_id")).write.parquet("data/sales_parquet", mode="overwrite")

sellers = spark.read.csv("data/sellers.csv", header=True, mode="DROPMALFORMED")
sellers.show()
sellers.write.parquet("data/sellers_parquet", mode="overwrite")

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
|         5|   product_5|  128|
|         6|   product_6|   66|
|         7|   product_7|  145|
|         8|   product_8|   51|
|         9|   product_9|   44|
|        10|  product_10|   53|
|        11|  product_11|   13|
|        12|  product_12|  104|
|        13|  product_13|  102|
|        14|  product_14|   24|
|        15|  product_15|   14|
|        16|  product_16|   38|
|        17|  product_17|   72|
|        18|  product_18|   16|
|        19|  product_19|   46|
+----------+------------+-----+
only showing top 20 rows



                                                                                

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-03|             98|frlnwjcoaxsaubnat...|
|       2|         0|        0|2020-07-07|             23|zsnrbwrlflvqqmbcz...|
|       3|         0|        0|2020-07-02|             79|gmxnirkafafnohboh...|
|       4|         0|        0|2020-07-07|              5|xrgknaskXkfcxcnzj...|
|       5|         0|        0|2020-07-10|             79|tzkqoynsqnfomkpbt...|
|       6|         0|        0|2020-07-05|             87|qoluiczrckaygkzbi...|
|       7|         0|        0|2020-07-08|             14|ivwpwrpuhrjgjdauj...|
|       8|         0|        0|2020-07-02|             64|hoalxshwHpqgyvqtm...|
|       9|         0|        0|2020-07-02|             45|vysrvsdfvekabcmwo...|
|      10|         0|        0|2020-07-0

                                                                                

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|     1375559|
|        2|   seller_2|      205349|
|        3|   seller_3|       71546|
|        4|   seller_4|     1315668|
|        5|   seller_5|      627802|
|        6|   seller_6|     1997104|
|        7|   seller_7|      593329|
|        8|   seller_8|       24388|
|        9|   seller_9|      348255|
+---------+-----------+------------+



## Read data into Spark

Import libraries

In [18]:
from pyspark.sql import SparkSession as sqlSparkSession
from pyspark.sql.functions import *
import pandas as pd

Read data into Spark

In [8]:
# Create session
spark = sqlSparkSession.builder.getOrCreate()

# Read as Spark DF
products_df = spark.read.parquet('./data/products_parquet')
sales_df = spark.read.parquet('./data/sales_parquet')
sellers_df = spark.read.parquet('./data/sellers_parquet')

# Spark RDD
products_rdd = products_df.rdd
sales_rdd = sales_df.rdd
sellers_rdd = sellers_df.rdd

#### 1. Find out how many orders, how many products and how many sellers are in the data.

In [16]:
# Spark DF
print(f"""
Products: {products_df.count():,d}
Sales: {sales_df.count():,d}
Sellers: {sellers_df.count():,d}
""")

                                                                                


Products: 75,000,000
Sales: 20,000,040
Sellers: 10



In [13]:
# RDD
products_count = products_rdd.count()
sales_count = sales_rdd.count()
sellers_count = sellers_rdd.count()

print(f"""
Products: {products_count:,d}
Sales: {sales_count:,d}
Sellers: {sellers_count:,d}
""")




Products: 69,998,598
Sales: 20,000,040
Sellers: 10



                                                                                



#### 2. How many products have been sold at least once?

In [35]:
# DataFrame, option 1: drop duplicate product IDs and count the remaining rows
products_sold = sales_df.drop_duplicates(['product_id']).count()
print(f"Products sold at least once: {products_sold:,d}")

Products sold at least once: 993,299


                                                                                



In [36]:
# DataFrame, option 2: aggregate with countDistinct
sales_df.agg(countDistinct('product_id')).show()

+-----------------+
|count(product_id)|
+-----------------+
|           993299|
+-----------------+



                                                                                



In [42]:
sales_df.show(5)

In [40]:
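# RDD: extract the product IDs, de-duplicate them, then count
products_sold_rdd = sales_rdd.map(lambda row: row['product_id']).distinct().count()
print(f"Products sold at least once: {products_sold_rdd:,d}")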


#### 3. Which product appears in the most orders?

#### 4. How many distinct products were sold on each day?

In [18]:
sales_df.show(5)

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-03|             98|frlnwjcoaxsaubnat...|
|       2|         0|        0|2020-07-07|             23|zsnrbwrlflvqqmbcz...|
|       3|         0|        0|2020-07-02|             79|gmxnirkafafnohboh...|
|       4|         0|        0|2020-07-07|              5|xrgknaskXkfcxcnzj...|
|       5|         0|        0|2020-07-10|             79|tzkqoynsqnfomkpbt...|
+--------+----------+---------+----------+---------------+--------------------+
only showing top 5 rows



                                                                                

In [21]:
sales_df.groupby('date').agg({'*': 'count'}).show()

+----------+--------+
|      date|count(1)|
+----------+--------+
|2020-07-03| 2001301|
|2020-07-07| 2000202|
|2020-07-01| 2001066|
|2020-07-08| 2001015|
|2020-07-04| 2001640|
|2020-07-10| 1998316|
|2020-07-09| 1999799|
|2020-07-06| 2000492|
|2020-07-02| 1999256|
|2020-07-05| 1996953|
+----------+--------+



                                                                                

