In [1]:
import findspark
findspark.init()
import pyspark
import csv
import random
from faker import Faker
from datetime import datetime
import random
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import *
from pyspark.sql import Window
from pyspark.sql.functions import col

# Spark configuration for a local session that uses every available core.
conf = (
    pyspark.SparkConf()
    .setAppName("App")
    .setMaster('local[*]')
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '4G')
    # NOTE(review): the result-size cap (10G) is larger than the driver
    # memory (4G) configured above — confirm this is intended.
    .set('spark.driver.maxResultSize', '10G')
)

# getOrCreate() returns the already-running context when there is one, so
# re-executing this cell in the same kernel no longer raises
# "Cannot run multiple SparkContexts at once". In that case the existing
# context (and its original configuration) is reused.
sc = pyspark.SparkContext.getOrCreate(conf=conf)


In [3]:
def create_test_data(path="orders.csv", num_orders=100000):
    """Write a synthetic order-items CSV used by the rest of the notebook.

    Every order gets one random customer id (1-999) and 1-4 line items.
    Each line item picks a random category, a random product of that
    category (with its fixed cost) and a quantity of 1-3.  One CSV row is
    written per line item with the columns::

        order_id, order_item_id, customer_id, creation_date,
        completion_date, product_id, qnt, cost, category_id

    Args:
        path: Destination file; it is created or overwritten.
        num_orders: Number of orders to generate; order ids run from 1 to
            ``num_orders``.

    Returns:
        None.  The only effect is the file written to ``path``.
    """
    categories = {'HTC':{'htcone':100, 'htc2':110, 'htc3':220, 'htc4':1000},
                  'Samsung':{'sam1':21, 'sam2':50, 'sam3':20},
                  'Apple':{'app1':300, 'app2':10, 'app3':20}}

    # random.choice() needs an indexable sequence.  dict.items() is only a
    # view on Python 3, so materialise the pairs once, up front, instead of
    # on every loop iteration.
    category_choices = [(category_id, list(products.items()))
                        for category_id, products in categories.items()]

    fake = Faker('en_GB')

    # The context manager guarantees the file is flushed and closed even if
    # a write fails part-way through.
    with open(path, "w") as out_file:
        writer = csv.writer(out_file)
        writer.writerow(('order_id','order_item_id','customer_id','creation_date', 'completion_date',
                         'product_id','qnt','cost','category_id'))

        for order_index in range(num_orders):
            customer = random.randrange(1, 1000, 1)
            items_in_order = random.randrange(1, 5, 1)
            for item_index in range(items_in_order):
                qnt = random.choice([1, 2, 3])
                category_id, products = random.choice(category_choices)
                product_id, cost = random.choice(products)
                writer.writerow((order_index + 1,
                                 item_index + 1,
                                 customer,
                                 # creation: requested with before_now=True / after_now=False
                                 fake.date_time_this_month(before_now=True, after_now=False, tzinfo=None),
                                 # completion: requested with before_now=False / after_now=True
                                 fake.date_time_this_month(before_now=False, after_now=True, tzinfo=None),
                                 product_id,
                                 qnt,
                                 cost,
                                 category_id))
# Generate the synthetic orders CSV that the following cells read back.
create_test_data()

In [4]:
# Sanity-check the generated file with pandas: load it fully into memory and
# print column dtypes, non-null counts and memory usage.
df = pd.read_csv('orders.csv')
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 250238 entries, 0 to 250237
Data columns (total 9 columns):
order_id           250238 non-null int64
order_item_id      250238 non-null int64
customer_id        250238 non-null int64
creation_date      250238 non-null object
completion_date    250238 non-null object
product_id         250238 non-null object
qnt                250238 non-null int64
cost               250238 non-null int64
category_id        250238 non-null object
dtypes: int64(5), object(4)
memory usage: 17.2+ MB


## Read the data

In [5]:
%%time

# SQL entry point bound to the SparkContext created earlier in the notebook.
sqlContext = SQLContext(sc)

# Load the generated orders CSV: the first line is the header and column
# types are inferred from the data.
# NOTE(review): despite its name, `taxi_df` holds the orders data; the name is
# kept because later cells reference it.
taxi_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load('orders.csv')
)

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 4.01 s


In [6]:
# Print a preview of the loaded orders rows to confirm the CSV parsed correctly.
taxi_df.show()

+--------+-------------+-----------+-------------------+-------------------+----------+---+----+-----------+
|order_id|order_item_id|customer_id|      creation_date|    completion_date|product_id|qnt|cost|category_id|
+--------+-------------+-----------+-------------------+-------------------+----------+---+----+-----------+
|       1|            1|        985|2018-01-08 22:10:13|2018-01-18 04:41:36|      app2|  3|  10|      Apple|
|       1|            2|        985|2018-01-09 06:51:27|2018-01-25 05:36:40|      sam1|  3|  21|    Samsung|
|       1|            3|        985|2018-01-06 14:28:38|2018-01-28 15:43:54|      app1|  3| 300|      Apple|
|       2|            1|        741|2018-01-03 19:46:18|2018-01-13 23:11:08|      app1|  3| 300|      Apple|
|       3|            1|        384|2018-01-06 05:26:44|2018-01-12 09:29:07|      htc3|  3| 220|        HTC|
|       3|            2|        384|2018-01-05 21:03:29|2018-01-12 08:31:25|      app1|  2| 300|      Apple|
|       3|         

In [7]:
# Expose the DataFrame to Spark SQL under the table name "orders".
# NOTE(review): registerTempTable is deprecated from Spark 2.0 in favour of
# createOrReplaceTempView — switch once the target Spark version is confirmed.
taxi_df.registerTempTable("orders")

In [8]:
# Show the column types that schema inference assigned to each CSV column.
taxi_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- creation_date: timestamp (nullable = true)
 |-- completion_date: timestamp (nullable = true)
 |-- product_id: string (nullable = true)
 |-- qnt: integer (nullable = true)
 |-- cost: integer (nullable = true)
 |-- category_id: string (nullable = true)



In [9]:
# Revenue per product: sum cost * quantity over all order items, grouped by
# product and category, highest revenue first. Reads the "orders" temp table.
df2 = sqlContext.sql('''SELECT product_id,
                          category_id,
                          sum(cost * qnt) as revenue
                        FROM orders
                      group by product_id, category_id order by revenue desc''')

In [10]:
# Display the per-product revenue table computed above.
df2.show()

+----------+-----------+--------+
|product_id|category_id| revenue|
+----------+-----------+--------+
|      htc4|        HTC|41508000|
|      app1|      Apple|16788600|
|      htc3|        HTC| 9144300|
|      htc2|        HTC| 4595690|
|    htcone|        HTC| 4244000|
|      sam2|    Samsung| 2799850|
|      sam1|    Samsung| 1161615|
|      sam3|    Samsung| 1114920|
|      app3|      Apple| 1105620|
|      app2|      Apple|  557930|
+----------+-----------+--------+



In [17]:
# Keep the N best-selling products (by revenue) inside each category.
N = 3

# Rank products within their category, highest revenue first.
# NOTE(review): `window` may shadow a function of the same name pulled in by
# `from pyspark.sql.functions import *` — verify nothing later relies on it.
window = (
    Window
    .partitionBy("category_id")
    .orderBy(col("revenue").desc())
)

# dense_rank leaves no gaps after ties, so tied products share a rank and
# more than N rows per category can pass the filter.
(
    df2
    .withColumn("dense_rank", dense_rank().over(window))
    .filter(col("dense_rank") <= N)
    .show()
)


+----------+-----------+--------+----------+
|product_id|category_id| revenue|dense_rank|
+----------+-----------+--------+----------+
|      sam2|    Samsung| 2799850|         1|
|      sam1|    Samsung| 1161615|         2|
|      sam3|    Samsung| 1114920|         3|
|      htc4|        HTC|41508000|         1|
|      htc3|        HTC| 9144300|         2|
|      htc2|        HTC| 4595690|         3|
|      app1|      Apple|16788600|         1|
|      app3|      Apple| 1105620|         2|
|      app2|      Apple|  557930|         3|
+----------+-----------+--------+----------+

