In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()

# Create (or reuse, via getOrCreate) a Hive-enabled Spark session running on YARN.
# Each user's managed tables live under their own /user/<name>/warehouse directory.
# NOTE(review): spark.ui.port='0' presumably asks for an ephemeral UI port so
# concurrent users on a shared gateway do not collide — confirm.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# Raw order records as comma-separated text lines, e.g.
# '1,2013-07-25 00:00:00.0,11599,CLOSED' -> field 2 is the customer id, field 3 the status.
# Cached because every later cell derives a new pipeline from this RDD; without
# it, each action (take/collect/count) would re-read the input files.
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*").cache()

In [3]:
# Emit one (status, 1) pair per order; status is the 4th comma-separated field.
mapped_rdd = orders_rdd.map(lambda line: (line.split(",")[3], 1))

In [4]:
# Peek at the first 10 (status, 1) pairs.
mapped_rdd.take(num=10)

[('CLOSED', 1),
 ('PENDING_PAYMENT', 1),
 ('COMPLETE', 1),
 ('CLOSED', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('COMPLETE', 1),
 ('PROCESSING', 1),
 ('PENDING_PAYMENT', 1),
 ('PENDING_PAYMENT', 1)]

In [5]:
# Sum the 1s per status, giving the number of orders in each status.
reduced_rdd = mapped_rdd.reduceByKey(lambda left, right: left + right)

In [6]:
# Bring all (status, order_count) pairs to the driver for display.
reduced_rdd.collect()

[('CLOSED', 7556),
 ('CANCELED', 1428),
 ('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('SUSPECTED_FRAUD', 1558),
 ('PENDING', 7610),
 ('ON_HOLD', 3798),
 ('PROCESSING', 8275),
 ('PAYMENT_REVIEW', 729)]

In [7]:
# Order statuses from most to least frequent (sort on the count, descending).
reduced_sorted = reduced_rdd.sortBy(lambda pair: pair[1], ascending=False)

In [8]:
# Show all statuses ranked by order count, highest first.
reduced_sorted.collect()

[('COMPLETE', 22899),
 ('PENDING_PAYMENT', 15030),
 ('PROCESSING', 8275),
 ('PENDING', 7610),
 ('CLOSED', 7556),
 ('ON_HOLD', 3798),
 ('SUSPECTED_FRAUD', 1558),
 ('CANCELED', 1428),
 ('PAYMENT_REVIEW', 729)]

In [9]:
# Emit one (customer_id, 1) pair per order; the customer id is the 3rd field.
customers_mapped = orders_rdd.map(lambda line: (line.split(",")[2], 1))

In [10]:
# Peek at the first 5 (customer_id, 1) pairs.
customers_mapped.take(num=5)

[('11599', 1), ('256', 1), ('12111', 1), ('8827', 1), ('11318', 1)]

In [11]:
# Total orders placed by each customer id.
customers_aggregated = customers_mapped.reduceByKey(lambda left, right: left + right)

In [12]:
# Sample 20 (customer_id, order_count) pairs (unsorted).
customers_aggregated.take(num=20)

[('256', 10),
 ('12111', 6),
 ('11318', 6),
 ('7130', 7),
 ('2911', 6),
 ('5657', 12),
 ('9149', 4),
 ('9842', 7),
 ('7276', 5),
 ('9488', 7),
 ('2711', 3),
 ('333', 6),
 ('656', 5),
 ('6983', 6),
 ('4189', 3),
 ('4840', 2),
 ('5863', 6),
 ('8214', 5),
 ('7776', 8),
 ('1549', 4)]

In [13]:
# Rank customers by number of orders, highest first.
# Note: customers with equal counts have no secondary sort key.
customers_sorted = customers_aggregated.sortBy(lambda pair: pair[1], ascending=False)

In [14]:
# Top 10 customers by order count.
customers_sorted.take(num=10)

[('5897', 16),
 ('6316', 16),
 ('12431', 16),
 ('569', 16),
 ('4320', 15),
 ('221', 15),
 ('5624', 15),
 ('5283', 15),
 ('12284', 15),
 ('5654', 15)]

In [15]:
# Unique customer ids (3rd field) that appear in at least one order.
distinct_customers = (
    orders_rdd
    .map(lambda line: line.split(",")[2])
    .distinct()
)

In [16]:
# Number of distinct customer ids that placed at least one order.
distinct_customers.count()

12405

In [17]:
# Total number of lines in the orders input (one line per order record).
orders_rdd.count()

68883

In [18]:
# Keep only orders whose status (4th field) is exactly 'CLOSED'.
filtered_orders = orders_rdd.filter(lambda line: line.split(",")[3] == 'CLOSED')

In [19]:
# Peek at the first 20 CLOSED order lines.
filtered_orders.take(num=20)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '12,2013-07-25 00:00:00.0,1837,CLOSED',
 '18,2013-07-25 00:00:00.0,1205,CLOSED',
 '24,2013-07-25 00:00:00.0,11441,CLOSED',
 '25,2013-07-25 00:00:00.0,9503,CLOSED',
 '37,2013-07-25 00:00:00.0,5863,CLOSED',
 '51,2013-07-25 00:00:00.0,12271,CLOSED',
 '57,2013-07-25 00:00:00.0,7073,CLOSED',
 '61,2013-07-25 00:00:00.0,4791,CLOSED',
 '62,2013-07-25 00:00:00.0,9111,CLOSED',
 '87,2013-07-25 00:00:00.0,3065,CLOSED',
 '90,2013-07-25 00:00:00.0,9131,CLOSED',
 '101,2013-07-25 00:00:00.0,5116,CLOSED',
 '116,2013-07-26 00:00:00.0,8763,CLOSED',
 '129,2013-07-26 00:00:00.0,9937,CLOSED',
 '133,2013-07-26 00:00:00.0,10604,CLOSED',
 '191,2013-07-26 00:00:00.0,16,CLOSED',
 '201,2013-07-26 00:00:00.0,9055,CLOSED',
 '211,2013-07-26 00:00:00.0,10372,CLOSED']

In [20]:
# One (customer_id, 1) pair per CLOSED order.
filtered_mapped = filtered_orders.map(lambda line: (line.split(",")[2], 1))

In [21]:
# Peek at the first 10 (customer_id, 1) pairs for CLOSED orders.
filtered_mapped.take(num=10)

[('11599', 1),
 ('8827', 1),
 ('1837', 1),
 ('1205', 1),
 ('11441', 1),
 ('9503', 1),
 ('5863', 1),
 ('12271', 1),
 ('7073', 1),
 ('4791', 1)]

In [22]:
# Number of CLOSED orders per customer id.
filtered_aggregated = filtered_mapped.reduceByKey(lambda left, right: left + right)

In [23]:
# Sample 20 (customer_id, closed_order_count) pairs (unsorted).
filtered_aggregated.take(num=20)

[('5863', 1),
 ('12271', 2),
 ('7073', 1),
 ('3065', 2),
 ('5116', 2),
 ('8763', 1),
 ('10604', 2),
 ('16', 1),
 ('9055', 3),
 ('10372', 3),
 ('11715', 1),
 ('5925', 1),
 ('8309', 3),
 ('948', 1),
 ('5191', 1),
 ('7650', 2),
 ('4199', 2),
 ('6989', 1),
 ('5011', 4),
 ('11394', 1)]

In [24]:
# Rank customers by number of CLOSED orders, highest first.
# Note: equal counts have no secondary sort key.
filtered_sorted = filtered_aggregated.sortBy(lambda pair: pair[1], ascending=False)

In [25]:
# Top 10 customers by CLOSED-order count.
filtered_sorted.take(num=10)

[('1833', 6),
 ('1363', 5),
 ('1687', 5),
 ('5493', 5),
 ('5011', 4),
 ('8974', 4),
 ('2321', 4),
 ('3736', 4),
 ('8368', 4),
 ('2236', 4)]