In [1]:
import os
#pyspark_submit_args ='--master local[*] --executor-memory 2G --driver-memory 2G --num-executors 8 pyspark-shell' 
pyspark_submit_args ='--master local[*] --executor-memory 2G --driver-memory 2G --num-executors 4 pyspark-shell' 
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

In [2]:
import pandas as pd
from tqdm import tqdm
import csv
import random
import string
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os

from typing import List

In [3]:
import pyspark
from pyspark.sql.functions import when, col, mean, desc, round,avg,round,count
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

In [4]:
class NAMES :
    
    __DIR_CSV : str
    __DIR_PARQUET: str
    
    def __init__(self,csv='csv',parquet='parquet') :
        self.__DIR_CSV = csv
        self.__DIR_PARQUET=parquet
       
    def __getnames(self,dir : str, ext: str) :
        name : List[str] = ['products','sellers','seles']
        try:
            os.mkdir(dir)
        except OSError as error:
            print(f"{dir} already exists")
        return (os.path.join(dir,name[0] + '.' + ext),os.path.join(dir,name[1] + '.' + ext),os.path.join(dir,name[2] + '.' + ext))
                
    def getcsv(self) : 
        return self.__getnames(self.__DIR_CSV,'csv')
                                                            
    def getparquet(self) : 
        return self.__getnames(self.__DIR_PARQUET,'parquet')


In [5]:
class READDF:
    
    def __init__(self,N) :
        self.N = N
        
    def readdf(self) :
        productsname,sellersname,salesname = self.N.getparquet()
        products = spark.read.parquet(productsname)
        sales = spark.read.parquet(salesname)
        sellers = spark.read.parquet(sellersname)
        products.createOrReplaceTempView("products")
        sales.createOrReplaceTempView("sales")
        sellers.createOrReplaceTempView("sellers")
        return (products,sales,sellers)


In [6]:
N = NAMES('/tmp/csv')
D = READDF(N)
products,sales,sellers = D.readdf()

parquet already exists


# Warm-up #1
Find out how many orders, how many products and how many sellers are in the data.

In [7]:
print('Number of products {0}'.format(products.count()))
print('Number of sales {0}'.format(sales.count()))
print('Number of sellers {0}'.format(sellers.count()))

Number of products 7500
Number of sales 1098838
Number of sellers 10


In [8]:
print("Products:")
spark.sql("SELECT COUNT(*) FROM products").show()
print("Sales:")
spark.sql("SELECT COUNT(*) FROM sales").show()
print("Sellers:")
spark.sql("SELECT COUNT(*) FROM sellers").show()

Products:
+--------+
|count(1)|
+--------+
|    7500|
+--------+

Sales:
+--------+
|count(1)|
+--------+
| 1098838|
+--------+

Sellers:
+--------+
|count(1)|
+--------+
|      10|
+--------+



__How many products have been sold at least once?__

In [9]:
sales.select('product_id').distinct().count()

7500

In [10]:
spark.sql("SELECT COUNT(DISTINCT product_id), MIN(product_id), MAX(product_id) from sales").show()

+--------------------------+---------------+---------------+
|count(DISTINCT product_id)|min(product_id)|max(product_id)|
+--------------------------+---------------+---------------+
|                      7500|              0|           7499|
+--------------------------+---------------+---------------+



__How many products haven't been sold at all?__

In [14]:
spark.sql("SELECT COUNT(DISTINCT products.product_id) from products LEFT OUTER JOIN sales USING(product_id) WHERE products.product_id IS NULL").show()

+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                         0|
+--------------------------+



In [22]:
products.join(sales,products.product_id == sales.product_id,'left_outer').select(products.product_id).filter(products.product_id.isNull()).count()

0

__Top sold products in number of orders__

In [45]:
sdf = sales.groupBy('product_id').agg(count('order_id').alias('count'))
sdf.join(products,products.product_id == sdf.product_id).select(products.product_id,products.product_name,'count').orderBy(col('count').desc()).show()

+----------+------------+-----+
|product_id|product_name|count|
+----------+------------+-----+
|      6962|product_6962|  198|
|      1250|product_1250|  191|
|      3207|product_3207|  188|
|      1433|product_1433|  187|
|      5523|product_5523|  185|
|      3705|product_3705|  185|
|      2106|product_2106|  185|
|      6231|product_6231|  185|
|      3432|product_3432|  185|
|       958| product_958|  185|
|      4758|product_4758|  184|
|      6687|product_6687|  183|
|      5370|product_5370|  183|
|      1310|product_1310|  183|
|       811| product_811|  182|
|      1425|product_1425|  182|
|      6838|product_6838|  182|
|      2464|product_2464|  182|
|      5939|product_5939|  182|
|      3813|product_3813|  182|
+----------+------------+-----+
only showing top 20 rows



In [46]:
spark.sql("WITH P as (SELECT product_id, COUNT(distinct order_id) as count FROM sales group by product_id) select products.product_id,products.product_name,count from products JOIN P USING(product_id) ORDER BY count desc").show()

+----------+------------+-----+
|product_id|product_name|count|
+----------+------------+-----+
|      6962|product_6962|  198|
|      1250|product_1250|  191|
|      3207|product_3207|  188|
|      1433|product_1433|  187|
|      3432|product_3432|  185|
|       958| product_958|  185|
|      5523|product_5523|  185|
|      3705|product_3705|  185|
|      2106|product_2106|  185|
|      4758|product_4758|  184|
|      6231|product_6231|  184|
|      1310|product_1310|  183|
|      6687|product_6687|  183|
|      5370|product_5370|  183|
|      1425|product_1425|  182|
|      3813|product_3813|  182|
|       811| product_811|  182|
|      2464|product_2464|  182|
|      5939|product_5939|  182|
|      6838|product_6838|  182|
+----------+------------+-----+
only showing top 20 rows



__How many distinct products have been sold in each date__

In [47]:
spark.sql("select date,count(distinct product_id) as number from sales group by date").show()

+----------+------+
|      date|number|
+----------+------+
|2020-07-03|  7500|
|2020-07-07|  7500|
|2020-07-01|  7500|
|2020-07-08|  7500|
|2020-07-04|  7500|
|2020-07-10|  7500|
|2020-07-09|  7500|
|2020-07-06|  7500|
|2020-07-02|  7500|
|2020-07-05|  7500|
+----------+------+

