In [249]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import hashlib

In [2]:
spark = SparkSession \
    .builder \
    .appName('Spark++ Application') \
    .getOrCreate()

## Load datasets

In [49]:
df_products = spark.read.parquet('hdfs:///R07/products_parquet/')
df_sellers = spark.read.parquet('hdfs:///R07/sellers_parquet/')
df_sales = spark.read.parquet('hdfs:///R07/sales_parquet/')

In [50]:
datasets = [df_products, df_sellers, df_sales]

In [51]:
for dataset in datasets:
    dataset.printSchema()
    dataset.show(3)

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
+----------+------------+-----+
only showing top 3 rows

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      257237|
|        2|   seller_2|      754188|
+---------+-----------+------------+
only showing top 3 rows

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable =

## How many distinct products are sold each day?

In [52]:
distinct_products = df_sales.groupby(col("date")).agg(countDistinct(col("product_id")).alias("distinct_products_sold")).orderBy(col("distinct_products_sold").desc())

In [53]:
distinct_products.printSchema()
distinct_products.show()

root
 |-- date: string (nullable = true)
 |-- distinct_products_sold: long (nullable = false)

+----------+----------------------+
|      date|distinct_products_sold|
+----------+----------------------+
|2020-07-06|                100765|
|2020-07-09|                100501|
|2020-07-01|                100337|
|2020-07-03|                100017|
|2020-07-02|                 99807|
|2020-07-05|                 99796|
|2020-07-04|                 99791|
|2020-07-07|                 99756|
|2020-07-08|                 99662|
|2020-07-10|                 98973|
+----------+----------------------+



## What is the average revenue of the orders?

__Join `products` and `sales` column with `product id`__

In [74]:
joinExpression = df_products["product_id"] == df_sales['product_id']
product_id_products_sales = df_sales.join(df_products, joinExpression).drop(df_products['product_id'])

In [100]:
product_id_products_sales.printSchema()
product_id_products_sales.show()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

+--------+----------+---------+----------+---------------+--------------------+----------------+-----+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|    product_name|price|
+--------+----------+---------+----------+---------------+--------------------+----------------+-----+
|12478308|  10005243|        6|2020-07-04|             98|qfvpgiscflyjxphcq...|product_10005243|   44|
| 8996776|  10023464|        9|2020-07-03|             59|jjbyqkzcimBfoehbv...|product_10023464|   19|
|10476976|  10050363|        6|2020-07-03|             18|xqhlvkpxtzrfdadry...|product_10050363|   98|
| 5977582|  10089524|        2|2020-07-01|  

__Multiply `num pieces sold` with `price` for the revenue__

In [58]:
df_revenue = joined_products_sales.withColumn('revenue', joined_products_sales['price'] * joined_products_sales['num_pieces_sold'])

In [59]:
df_revenue.printSchema()
df_revenue.show(5)

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- revenue: double (nullable = true)

+--------+----------+---------+----------+---------------+--------------------+----------------+-----+-------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|    product_name|price|revenue|
+--------+----------+---------+----------+---------------+--------------------+----------------+-----+-------+
|12478308|  10005243|        6|2020-07-04|             98|qfvpgiscflyjxphcq...|product_10005243|   44| 4312.0|
| 8996776|  10023464|        9|2020-07-03|             59|jjbyqkzcimBfoehbv...|product_10023464|   19| 1121.0|
|10476976|  10050363|        6|2020-07-03|             18|xqhlvkpxtz

__Find the average value of the orders__

In [65]:
avg_revenue = df_revenue.groupBy().mean('revenue')

In [66]:
avg_revenue.printSchema()
avg_revenue.show()

root
 |-- avg(revenue): double (nullable = true)

+------------------+
|      avg(revenue)|
+------------------+
|1246.1338560822878|
+------------------+



__Average revenue of the orders: $1246.13__

## What is the average daily revenue of each product?

__Get `revenue` per `product id`__

In [67]:
product_revenue = df_revenue.groupBy('date', 'product_id').agg((sum('revenue')).alias('product_revenue')).orderBy(col('date').asc())

In [68]:
product_revenue.printSchema()
product_revenue.show()

root
 |-- date: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_revenue: double (nullable = true)

+----------+----------+---------------+
|      date|product_id|product_revenue|
+----------+----------+---------------+
|2020-07-01|  11641075|         2010.0|
|2020-07-01|  12852991|         4224.0|
|2020-07-01|  11666191|         1800.0|
|2020-07-01|  10226294|        11440.0|
|2020-07-01|  11696381|         3230.0|
|2020-07-01|   1040523|          884.0|
|2020-07-01|  12122064|         4180.0|
|2020-07-01|  10862581|         1485.0|
|2020-07-01|  12300725|          495.0|
|2020-07-01|   1101513|         9956.0|
|2020-07-01|  12306462|         9984.0|
|2020-07-01|  11099735|         2790.0|
|2020-07-01|  12367671|          240.0|
|2020-07-01|  11365713|         3572.0|
|2020-07-01|  12519264|         7830.0|
|2020-07-01|  10374427|        12615.0|
|2020-07-01|  12570494|          946.0|
|2020-07-01|  10964827|          737.0|
|2020-07-01|  12741537|      

__Find average revenue by date__

In [79]:
avg_daily_revenue_per_product = product_revenue.groupBy('date', 'product_id').agg({'product_revenue':'mean'}).orderBy(col('date').asc())

In [80]:
avg_daily_revenue_per_product.printSchema()
avg_daily_revenue_per_product.show()

root
 |-- date: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- avg(product_revenue): double (nullable = true)

+----------+----------+--------------------+
|      date|product_id|avg(product_revenue)|
+----------+----------+--------------------+
|2020-07-01|  10089524|              5300.0|
|2020-07-01|  10150439|              5112.0|
|2020-07-01|  10160884|              4628.0|
|2020-07-01|  10749424|                63.0|
|2020-07-01|  10867431|              9702.0|
|2020-07-01|  11023830|                64.0|
|2020-07-01|   1111881|             11997.0|
|2020-07-01|  11228690|               588.0|
|2020-07-01|  11304131|              4026.0|
|2020-07-01|  11768624|               221.0|
|2020-07-01|  11790842|              7866.0|
|2020-07-01|  11872001|              6555.0|
|2020-07-01|  11885051|              5723.0|
|2020-07-01|  11923737|               651.0|
|2020-07-01|  12061585|              8888.0|
|2020-07-01|  12105556|              2178.0|
|2020-07-

## For each seller, what is the average % contribution of an order to the sellers daily quota?

__Join `Revenue dataframe` and `Sellers dataframe` with `Seller id`__

In [72]:
joinExpression = df_revenue['seller_id'] == df_sellers['seller_id']
seller_id_revenue_seller = df_revenue.join(df_sellers, joinExpression).drop(df_sellers['seller_id'])

In [73]:
seller_id_revenue_seller.printSchema()
seller_id_revenue_seller.show()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)

+--------+----------+---------+----------+---------------+--------------------+----------------+-----+-------+-----------+------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|    product_name|price|revenue|seller_name|daily_target|
+--------+----------+---------+----------+---------------+--------------------+----------------+-----+-------+-----------+------------+
|12478308|  10005243|        6|2020-07-04|             98|qfvpgiscflyjxphcq...|product_10005243|   44| 4312.0|   seller_6|     1

__Get order quota by getting average of the orders, and the `daily target` for each seller__ 

In [77]:
order_quota_per_seller = seller_id_revenue_seller.groupBy('seller_id').agg(mean('revenue').alias('avg_order'), max('daily_target').alias('daily_target')).orderBy(col('seller_id').asc())

In [78]:
order_quota_per_seller.printSchema()
order_quota_per_seller.show()

root
 |-- seller_id: string (nullable = true)
 |-- avg_order: double (nullable = true)
 |-- daily_target: string (nullable = true)

+---------+------------------+------------+
|seller_id|         avg_order|daily_target|
+---------+------------------+------------+
|        0|1110.9372444210526|     2500000|
|        1|3818.4719823112678|      257237|
|        2| 3819.833871243246|      754188|
|        3| 3824.481469172176|      310462|
|        4|3807.6749784110534|     1532808|
|        5|3803.7337879033857|     1199693|
|        6|3811.2040280996785|     1055915|
|        7| 3816.358735590778|     1946998|
|        8|3801.7181057340235|      547320|
|        9| 3829.333327348463|     1318051|
+---------+------------------+------------+



__Calculate the average contribution percentage by dividing `Average order` by `daily target`__

In [86]:
seller_avg_contribution_pct = df_orders.withColumn('avg_contribution_pct', df_orders['avg_order']/df_orders['daily_target']*100)

In [87]:
seller_avg_contribution_pct.printSchema()
seller_avg_contribution_pct.show()

root
 |-- seller_id: string (nullable = true)
 |-- avg_order: double (nullable = true)
 |-- daily_target: string (nullable = true)
 |-- avg_contribution_pct: double (nullable = true)

+---------+------------------+------------+--------------------+
|seller_id|         avg_order|daily_target|avg_contribution_pct|
+---------+------------------+------------+--------------------+
|        0|1110.9372444210526|     2500000| 0.04443748977684211|
|        1|3818.4719823112678|      257237|  1.4844178645806272|
|        2| 3819.833871243246|      754188|  0.5064829818617169|
|        3| 3824.481469172176|      310462|  1.2318678193054788|
|        4|3807.6749784110534|     1532808| 0.24841173704802255|
|        5|3803.7337879033857|     1199693| 0.31705892990151524|
|        6|3811.2040280996785|     1055915|  0.3609385251748179|
|        7| 3816.358735590778|     1946998| 0.19601246306317613|
|        8|3801.7181057340235|      547320|  0.6946060998563954|
|        9| 3829.333327348463|     1

In [96]:
seller_avg_contribution_pct.select('*', round(col('avg_contribution_pct'),2)).show()

+---------+------------------+------------+--------------------+------------------------------+
|seller_id|         avg_order|daily_target|avg_contribution_pct|round(avg_contribution_pct, 2)|
+---------+------------------+------------+--------------------+------------------------------+
|        0|1110.9372444210526|     2500000| 0.04443748977684211|                          0.04|
|        1|3818.4719823112678|      257237|  1.4844178645806272|                          1.48|
|        2| 3819.833871243246|      754188|  0.5064829818617169|                          0.51|
|        3| 3824.481469172176|      310462|  1.2318678193054788|                          1.23|
|        4|3807.6749784110534|     1532808| 0.24841173704802255|                          0.25|
|        5|3803.7337879033857|     1199693| 0.31705892990151524|                          0.32|
|        6|3811.2040280996785|     1055915|  0.3609385251748179|                          0.36|
|        7| 3816.358735590778|     19469

## Who are the second most selling and the least selling persons (sellers) for each product?

### a. Least selling sellers per each product

In [119]:
seller_id_revenue_seller.show()

+--------+----------+---------+----------+---------------+--------------------+----------------+-----+-------+-----------+------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|    product_name|price|revenue|seller_name|daily_target|
+--------+----------+---------+----------+---------------+--------------------+----------------+-----+-------+-----------+------------+
|12478308|  10005243|        6|2020-07-04|             98|qfvpgiscflyjxphcq...|product_10005243|   44| 4312.0|   seller_6|     1055915|
| 8996776|  10023464|        9|2020-07-03|             59|jjbyqkzcimBfoehbv...|product_10023464|   19| 1121.0|   seller_9|     1318051|
|10476976|  10050363|        6|2020-07-03|             18|xqhlvkpxtzrfdadry...|product_10050363|   98| 1764.0|   seller_6|     1055915|
| 5977582|  10089524|        2|2020-07-01|             53|jchvhzbeaicqitpvx...|product_10089524|  100| 5300.0|   seller_2|      754188|
| 1482892|  10122266|        2|2020-07-07|      

In [121]:
products_sales = seller_id_revenue_seller.withColumn('num_pieces_sold', seller_id_revenue_seller['num_pieces_sold'].cast('int'))
products_sales.dtypes

[('order_id', 'string'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('date', 'string'),
 ('num_pieces_sold', 'int'),
 ('bill_raw_text', 'string'),
 ('product_name', 'string'),
 ('price', 'string'),
 ('revenue', 'double'),
 ('seller_name', 'string'),
 ('daily_target', 'string')]

__Tried pivot table method, but wasn't too helpful__

In [129]:
pivot_product_seller = products_sales.groupBy('product_id').pivot('seller_id').sum("num_pieces_sold")

In [130]:
pivot_product_seller.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- 0: long (nullable = true)
 |-- 1: long (nullable = true)
 |-- 2: long (nullable = true)
 |-- 3: long (nullable = true)
 |-- 4: long (nullable = true)
 |-- 5: long (nullable = true)
 |-- 6: long (nullable = true)
 |-- 7: long (nullable = true)
 |-- 8: long (nullable = true)
 |-- 9: long (nullable = true)



In [131]:
pivot_product_seller.show()

+----------+----+----+----+----+----+----+----+----+----+----+
|product_id|   0|   1|   2|   3|   4|   5|   6|   7|   8|   9|
+----------+----+----+----+----+----+----+----+----+----+----+
|  10005243|null|null|null|null|null|null|  98|null|null|null|
|  10023464|null|null|null|null|null|null|null|null|null|  59|
|  10050363|null|null|null|null|null|null|  18|null|null|null|
|  10089524|null|null|  53|null|null|null|null|null|null|null|
|  10122266|null|null|  25|null|null|null|null|null|null|null|
|  10134574|null|null|null|  28|null|null|null|null|null|null|
|  10150439|null|null|null|null|  72|null|null|null|null|null|
|  10158822|null|null|null|null|null|null|null|null|null|  86|
|  10160884|null|null|null|null|null|null|null|  89|null|null|
|  10172594|null|null|  60|null|null|null|null|null|null|null|
|  10175294|null|null|null|null|null|null|null|  29|null|null|
|   1017716|null|null|null|null|null|null|null|null|null|  82|
|  10200802|null|null|  42|null|null|null|null|null|nul

__Find total sold by sellers per product__

In [158]:
df_total_sold = products_sales.groupby('product_id','seller_id').agg(sum('num_pieces_sold').alias('total_sold')).sort('product_id', 'seller_id')

In [159]:
df_total_sold.printSchema()
df_total_sold.show()

root
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- total_sold: long (nullable = true)

+----------+---------+----------+
|product_id|seller_id|total_sold|
+----------+---------+----------+
|         0|        0| 959445802|
|  10000006|        4|        39|
|  10000047|        9|        29|
|  10000271|        5|        51|
|   1000030|        5|        89|
|   1000042|        9|        47|
|  10000429|        6|         6|
|  10000601|        8|         6|
|  10000607|        2|        30|
|   1000065|        7|        33|
|  10000712|        7|        59|
|  10000715|        6|        51|
|  10000865|        1|        67|
|  10000971|        9|        50|
|   1000101|        5|        61|
|  10001082|        1|         4|
|  10001097|        3|        92|
|  10001174|        9|        71|
|  10001234|        5|        68|
|   1000124|        9|        73|
+----------+---------+----------+
only showing top 20 rows



__Find the minimum total sold per product_id__

In [160]:
min_sold = df_total_sold.groupby('product_id').agg(min('total_sold').alias('min_sold'))

In [161]:
min_sold.printSchema()
min_sold.show()

root
 |-- product_id: string (nullable = true)
 |-- min_sold: long (nullable = true)

+----------+--------+
|product_id|min_sold|
+----------+--------+
|  10005243|      98|
|  10023464|      59|
|  10050363|      18|
|  10089524|      53|
|  10122266|      25|
|  10134574|      28|
|  10150439|      72|
|  10158822|      86|
|  10160884|      89|
|  10172594|      60|
|  10175294|      29|
|   1017716|      82|
|  10200802|      42|
|  10215353|      22|
|  10218345|      98|
|  10220977|      76|
|  10255853|      26|
|  10288525|      63|
|  10304712|      48|
|  10324080|       5|
+----------+--------+
only showing top 20 rows



__Check the least sold per product by comparing between `minimum sold` from min sold dataframe and `total sold` from total sold dataframe.__

In [165]:
joinExpression = [min_sold['product_id'] == df_total_sold['product_id'], min_sold['min_sold'] == df_total_sold['total_sold']]
least_sold_per_product = min_sold.join(df_total_sold, joinExpression).drop(df_total_sold['product_id'])

In [166]:
least_sold_per_product.printSchema()

root
 |-- min_sold: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- total_sold: long (nullable = true)



In [168]:
least_sold_per_product.orderBy('product_id').drop('total_sold').show()

+---------+----------+---------+
| min_sold|product_id|seller_id|
+---------+----------+---------+
|959445802|         0|        0|
|       39|  10000006|        4|
|       29|  10000047|        9|
|       51|  10000271|        5|
|       89|   1000030|        5|
|       47|   1000042|        9|
|        6|  10000429|        6|
|        6|  10000601|        8|
|       30|  10000607|        2|
|       33|   1000065|        7|
|       59|  10000712|        7|
|       51|  10000715|        6|
|       67|  10000865|        1|
|       50|  10000971|        9|
|       61|   1000101|        5|
|        4|  10001082|        1|
|       92|  10001097|        3|
|       71|  10001174|        9|
|       68|  10001234|        5|
|       73|   1000124|        9|
+---------+----------+---------+
only showing top 20 rows



### b. Second most selling sellers per each product

__Find the maximum sold amount__

In [169]:
max_sold = df_total_sold.groupby('product_id').agg(max('total_sold').alias('max_sold'))

In [170]:
max_sold.printSchema()
max_sold.show()

root
 |-- product_id: string (nullable = true)
 |-- max_sold: long (nullable = true)

+----------+--------+
|product_id|max_sold|
+----------+--------+
|  10005243|      98|
|  10023464|      59|
|  10050363|      18|
|  10089524|      53|
|  10122266|      25|
|  10134574|      28|
|  10150439|      72|
|  10158822|      86|
|  10160884|      89|
|  10172594|      60|
|  10175294|      29|
|   1017716|      82|
|  10200802|      42|
|  10215353|      22|
|  10218345|      98|
|  10220977|      76|
|  10255853|      26|
|  10288525|      63|
|  10304712|      48|
|  10324080|       5|
+----------+--------+
only showing top 20 rows



__Create a new dataframe by getting rid of the `maximum sold` in the `total sold dataframe`__

In [199]:
joinExpression = [max_sold['product_id'] == df_total_sold['product_id'], max_sold['max_sold'] != df_total_sold['total_sold']]
total_sold_max_dropped = max_sold.join(df_total_sold, joinExpression).drop(df_total_sold['product_id'])

In [200]:
total_sold_max_dropped.printSchema()
total_sold_max_dropped.show()

root
 |-- max_sold: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- total_sold: long (nullable = true)

+--------+----------+---------+----------+
|max_sold|product_id|seller_id|total_sold|
+--------+----------+---------+----------+
|     100|  12194170|        3|        85|
|      49|  14700981|        1|        29|
|      99|  16113262|        7|        16|
|     100|  17806195|        5|        81|
|      99|  19121475|        3|        53|
|      57|  19351340|        6|        40|
|      77|  22001601|        4|        70|
|      66|  22357251|        3|        25|
|      22|  24279067|        3|         6|
|      57|  24401456|        6|         4|
|      80|  27293252|        6|        64|
|      73|  29343804|        4|        46|
|      76|    307787|        1|        62|
|      80|   3086568|        1|         2|
|      36|  31598409|        3|        29|
|      78|  32858319|        9|         4|
|      93|  3391

__Get the max sold from the new dataFrame__

In [203]:
second_max_sold_prep = total_sold_max_dropped.groupby('product_id').agg(max('total_sold').alias('second_max_sold'))

In [204]:
second_max_sold_prep.printSchema()
second_max_sold_prep.show()

root
 |-- product_id: string (nullable = true)
 |-- second_max_sold: long (nullable = true)

+----------+---------------+
|product_id|second_max_sold|
+----------+---------------+
|  12194170|             85|
|  14700981|             29|
|  16113262|             16|
|  17806195|             81|
|  19121475|             53|
|  19351340|             40|
|  22001601|             70|
|  22357251|             25|
|  24279067|              6|
|  24401456|              4|
|  27293252|             64|
|  29343804|             46|
|    307787|             62|
|   3086568|              2|
|  31598409|             29|
|  32858319|              4|
|  33915630|             48|
|  36993002|             10|
|  39748505|              1|
|   3984141|             56|
+----------+---------------+
only showing top 20 rows



__Get the maximum number of sold by comparing with `total sold` in total sold dataframe, which is the 2nd maximum sold number__

In [206]:
joinExpression = [second_max_sold_prep['product_id'] == df_total_sold['product_id'], second_max_sold_prep['second_max_sold'] == df_total_sold['total_sold']]
second_max_sold = second_max_sold_prep.join(df_total_sold, joinExpression).drop(df_total_sold['product_id'])

In [210]:
second_max_sold.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- second_max_sold: long (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- total_sold: long (nullable = true)



In [209]:
second_max_sold.orderBy('product_id').drop('total_sold').show()

+----------+---------------+---------+
|product_id|second_max_sold|seller_id|
+----------+---------------+---------+
|    100142|             47|        9|
|  10030330|             68|        5|
|  10031766|             76|        5|
|  10059280|             23|        7|
|  10063288|             61|        7|
|  10067998|              9|        9|
|  10074531|              2|        9|
|  10079600|             21|        9|
|  10090404|             81|        6|
|  10099820|             11|        3|
|   1011418|             50|        4|
|  10140476|             50|        4|
|   1015828|             61|        4|
|   1015908|             86|        8|
|   1016707|              9|        4|
|  10176212|             36|        2|
|  10189576|             74|        5|
|  10196252|             41|        7|
|  10208871|             39|        7|
|  10220712|             30|        3|
+----------+---------------+---------+
only showing top 20 rows



__(CS Students Only)__

## Create a new column called "hashed_bill" defined as follows:

- If the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, once for each 'A' (capital 'A') present in the text. E.g. if the bill text is 'nbAAnllA', you would apply hashing three times iteratively (only if the order number is even)

- If the order_id is odd: apply SHA256 hashing to the bill text

In [247]:
df_sales.show()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|       9|         0|        0|2020-07-10|             25|jnykelwjjebgkwgmu...|
|      10|         0|        0|2020-07-0

In [256]:
df_hashing = df_sales.withColumn('order_id', df_sales['order_id'].cast('int'))
df_hashing.dtypes

[('order_id', 'int'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('date', 'string'),
 ('num_pieces_sold', 'string'),
 ('bill_raw_text', 'string')]

__Create a custom hasing function, and apply to the dataframe__

In [269]:
def hash_function(order_id, bill_raw_text):
    if order_id%2 == 0:
        count_A = bill_raw_text.count('A')
        if count_A > 0:
            for c in range(count_A):
                md = hashlib.md5(bill_raw_text.encode()).hexdigest()
            return md
        else: 
            return bill_raw_text
    else:
        sha_value = hashlib.sha256(bill_raw_text.encode()).hexdigest()
        return sha_value

In [270]:
spark_udf = udf(hash_function, StringType())
final_df = df_hashing.withColumn('hashed_bill', spark_udf('order_id', 'bill_raw_text'))
final_df.show()

+--------+----------+---------+----------+---------------+--------------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|         hashed_bill|
+--------+----------+---------+----------+---------------+--------------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|f6fa2a8be04a4ead6...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|416376a64cd652e7b...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|787d361b162a6aa1a...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|4540f452a7c4

## Finally, check if there are any duplicates in the new column

In [271]:
final_df.select('bill_raw_text').count()

20000040

In [272]:
final_df.select('hashed_bill').distinct().count()

20000040

__Comparing the number of `bill_raw_text` and unique number of `hashed_bill`, there is no duplicates in the `hashed_bill` column.__