In [1]:
from pyspark.sql import SparkSession

import getpass

username = getpass.getuser()

spark = SparkSession.\
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

spark

In [2]:
orders_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")
order_items_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [3]:
order_items_total = order_items_rdd.map(lambda x:(x.split(",")[0],float(x.split(",")[4])))
order_items_total.take(5)

[('1', 299.98), ('2', 199.99), ('3', 250.0), ('4', 129.99), ('5', 49.98)]

In [4]:
orders_customer = orders_rdd.map(lambda x:(x.split(",")[0],x.split(",")[2]))
orders_customer.take(5)

[('1', '11599'), ('2', '256'), ('3', '12111'), ('4', '8827'), ('5', '11318')]

In [5]:
orders_joined = orders_customer.join(order_items_total)
orders_joined.take(5)

[('34566', ('3066', 199.99)),
 ('34568', ('1271', 79.98)),
 ('34569', ('11083', 399.98)),
 ('34577', ('7733', 129.99)),
 ('34583', ('1558', 199.99))]

In [6]:
mapped_rdd=orders_joined.map(lambda x:(x[1][0],x[1][1]))
mapped_rdd.take(5)

[('3066', 199.99),
 ('1271', 79.98),
 ('11083', 399.98),
 ('7733', 129.99),
 ('1558', 199.99)]

In [7]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x,y:x+y ).sortBy(lambda x:x[1],False)
reduced_rdd.take(10)
# reduced_rdd.sortBy()

[('9639', 4299.68),
 ('5897', 4279.610000000001),
 ('6316', 3679.55),
 ('9056', 3559.7000000000003),
 ('5004', 3519.55),
 ('11689', 3440.75),
 ('5146', 3434.75),
 ('898', 3369.75),
 ('5582', 3359.5200000000004),
 ('10235', 3289.6200000000003)]

In [8]:
order_items_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [9]:
id_quantity = order_items_rdd.map(lambda x:(int(x.split(",")[2]),int(x.split(",")[3])))
reduced_rdd = id_quantity.reduceByKey(lambda x,y : x+y)
reduced_rdd.sortBy(lambda x:x[1],False).take(10)

[(365, 73698),
 (502, 62956),
 (1014, 57803),
 (191, 36680),
 (627, 31735),
 (403, 22246),
 (1004, 17325),
 (1073, 15500),
 (957, 13729),
 (977, 998)]

In [10]:
customers_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")
customers_rdd.take(5)

['1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521',
 '2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126',
 '3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725',
 '4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069',
 '5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,"10 Crystal River Mall ",Caguas,PR,00725']

In [11]:
filtered_rdd = customers_rdd.filter(lambda x:x.split(",")[-3]=="Caguas")
filtered_rdd.take(10)

['3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725',
 '5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,"10 Crystal River Mall ",Caguas,PR,00725',
 '7,Melissa,Wilcox,XXXXXXXXX,XXXXXXXXX,9453 High Concession,Caguas,PR,00725',
 '9,Mary,Perez,XXXXXXXXX,XXXXXXXXX,3616 Quaking Street,Caguas,PR,00725',
 '11,Mary,Huffman,XXXXXXXXX,XXXXXXXXX,3169 Stony Woods,Caguas,PR,00725',
 '13,Mary,Baldwin,XXXXXXXXX,XXXXXXXXX,7922 Iron Oak Gardens,Caguas,PR,00725',
 '16,Tiffany,Smith,XXXXXXXXX,XXXXXXXXX,6651 Iron Port,Caguas,PR,00725',
 '19,Stephanie,Mitchell,XXXXXXXXX,XXXXXXXXX,3543 Red Treasure Bay,Caguas,PR,00725',
 '21,William,Zimmerman,XXXXXXXXX,XXXXXXXXX,"3323 Old Willow Mall ",Caguas,PR,00725',
 '24,Mary,Smith,XXXXXXXXX,XXXXXXXXX,9417 Emerald Towers,Caguas,PR,00725']

In [12]:
filtered_rdd_2 = filtered_rdd.map(lambda x:x.split(",")[0]).distinct()
filtered_rdd_2.count()

4584

In [13]:
customers_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

In [14]:
states_rdd = customers_rdd.map(lambda x : (x.split(",")[-2],1))
state_cnt = states_rdd.reduceByKey(lambda x,y:x+y)
state_cnt.sortBy(lambda x:x[1],False).take(3)

[('PR', 4771), ('CA', 2012), ('NY', 775)]

In [15]:
orders_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")
order_items_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [16]:
order_items_map=order_items_rdd.map(lambda x:((int(x.split(',')[1]),float(x.split(',')[4]))))
orders_map=orders_rdd.map(lambda x:(int(x.split(',')[0]),(int(x.split(',')[2]))))

In [17]:
join_rdd=order_items_map.join(orders_map)

In [18]:
join_rdd.take(5)

[(4, (49.98, 8827)),
 (4, (299.95, 8827)),
 (4, (150.0, 8827)),
 (4, (199.92, 8827)),
 (8, (179.97, 2911))]

In [19]:
cust_amount = join_rdd.map(lambda x:(x[1][1],x[1][0]))
reduced_cust = cust_amount.reduceByKey(lambda x,y:x+y)
filtered_rdd = reduced_cust.filter(lambda x : x[-1]>1000)
result = filtered_rdd.map(lambda x:x[0]).distinct().count()

In [20]:
result

11148

In [21]:
orders_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")
customers_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

In [22]:
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [23]:
customers_rdd.take(5)

['1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521',
 '2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126',
 '3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725',
 '4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069',
 '5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,"10 Crystal River Mall ",Caguas,PR,00725']

In [24]:
#pick customerid,status from orders
orders_base = orders_rdd.map(lambda x : (x.split(",")[2],x.split(",")[-1]))
orders_base.take(5)

[('11599', 'CLOSED'),
 ('256', 'PENDING_PAYMENT'),
 ('12111', 'COMPLETE'),
 ('8827', 'CLOSED'),
 ('11318', 'COMPLETE')]

In [25]:
#pick customer_id,state from customers_rdd
customers_base = customers_rdd.map(lambda x : (x.split(",")[0],x.split(",")[-2]))
customers_base.take(5)

[('1', 'TX'), ('2', 'CO'), ('3', 'PR'), ('4', 'CA'), ('5', 'PR')]

In [26]:
joined_rdd = orders_base.join(customers_base)
joined_rdd.take(5)

[('2248', ('PROCESSING', 'PR')),
 ('2248', ('ON_HOLD', 'PR')),
 ('2248', ('CLOSED', 'PR')),
 ('2248', ('COMPLETE', 'PR')),
 ('7733', ('CANCELED', 'CA'))]

In [32]:
closed_filter = joined_rdd.filter(lambda x : x[1][0]=="CLOSED")
closed_filter_new = closed_filter.map(lambda x : (x[1][1],1))
reduced_state=closed_filter_new.reduceByKey(lambda x,y:x+y)
res = reduced_state.sortBy(lambda x : x[1],False)
res.take(1)

[('PR', 2891)]

In [34]:
orders_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")
orders_rdd.take(10)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']

In [35]:
orders_rdd.map(lambda x : x.split(",")[-2]).distinct().count()

12405

In [36]:
customers_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")
orders_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")
order_items_rdd=spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [40]:
#from orders, pick order_id,customer_id & from customers pick customer_id & state

orders_base1 = orders_rdd.map(lambda x : (int(x.split(",")[0]),int(x.split(",")[2])))
customers_base1 = customers_rdd.map(lambda x : (int(x.split(",")[0]),x.split(",")[-2]))
customers_base1.take(10)

[(1, 'TX'),
 (2, 'CO'),
 (3, 'PR'),
 (4, 'CA'),
 (5, 'PR'),
 (6, 'NJ'),
 (7, 'PR'),
 (8, 'MA'),
 (9, 'PR'),
 (10, 'VA')]

In [42]:
order_cust_join = orders_base1.join(customers_base1)
order_cust_join.take(10)

[(6244, (5638, 'FL')),
 (6248, (11150, 'CA')),
 (6252, (9091, 'TX')),
 (6256, (3713, 'NJ')),
 (6260, (9628, 'PR')),
 (6264, (12177, 'IL')),
 (6268, (5853, 'PR')),
 (6272, (1666, 'NJ')),
 (6276, (4831, 'IL')),
 (6280, (9186, 'PR'))]

In [45]:
mapped_rdd_1 = order_cust_join.map(lambda x:(x[1][0],x[1][1]))
mapped_rdd_1.take(10)

[(5638, 'FL'),
 (11150, 'CA'),
 (9091, 'TX'),
 (3713, 'NJ'),
 (9628, 'PR'),
 (12177, 'IL'),
 (5853, 'PR'),
 (1666, 'NJ'),
 (4831, 'IL'),
 (9186, 'PR')]

In [46]:
order_items_base1 = order_items_rdd.map(lambda x:(int(x.split(",")[0]),float(x.split(",")[-2])))
order_items_base1.take(10)

[(1, 299.98),
 (2, 199.99),
 (3, 250.0),
 (4, 129.99),
 (5, 49.98),
 (6, 299.95),
 (7, 150.0),
 (8, 199.92),
 (9, 299.98),
 (10, 299.95)]

In [48]:
joined_rdd_2 = mapped_rdd_1.join(order_items_base1)
joined_rdd_2.take(10)

[(8136, ('NY', 299.98)),
 (2256, ('MD', 124.95)),
 (2256, ('NY', 124.95)),
 (2256, ('NJ', 124.95)),
 (8220, ('PR', 119.98)),
 (8220, ('MD', 119.98)),
 (8220, ('PR', 119.98)),
 (6174, ('MD', 159.96)),
 (618, ('FL', 399.98)),
 (618, ('PR', 399.98))]

In [50]:
fin_mapped = joined_rdd_2.map(lambda x : (x[1][0],x[1][1]))
fin_mapped.take(10)

[('PR', 199.99),
 ('PR', 399.98),
 ('CA', 299.98),
 ('PR', 299.98),
 ('FL', 299.98),
 ('PR', 299.98),
 ('CA', 129.99),
 ('PR', 129.99),
 ('CA', 129.99),
 ('PR', 49.98)]

In [53]:
res = fin_mapped.reduceByKey(lambda x,y:x+y)
res_fin = res.sortBy(lambda x:x[1],False)
res_fin.collect()

[('PR', 937988.2600000022),
 ('CA', 392986.0700000002),
 ('NY', 156973.85000000003),
 ('TX', 124398.84999999998),
 ('IL', 104542.49999999993),
 ('FL', 71457.25999999995),
 ('PA', 51268.68999999999),
 ('OH', 50620.839999999975),
 ('MI', 49637.96999999999),
 ('NJ', 42829.529999999984),
 ('AZ', 41158.62999999998),
 ('GA', 33905.109999999986),
 ('NC', 32799.19999999999),
 ('MD', 31671.08999999999),
 ('VA', 26247.26),
 ('CO', 25473.97),
 ('OR', 23570.35),
 ('MA', 21894.190000000002),
 ('TN', 21723.369999999995),
 ('NV', 21072.19),
 ('MO', 19135.82),
 ('HI', 17558.59),
 ('UT', 14525.44),
 ('NM', 14247.43),
 ('WA', 14171.509999999998),
 ('CT', 13898.109999999999),
 ('LA', 12721.39),
 ('WI', 12432.670000000002),
 ('SC', 8484.970000000001),
 ('DC', 8074.110000000001),
 ('MN', 7713.240000000001),
 ('IN', 6853.18),
 ('KY', 6690.89),
 ('KS', 5884.3),
 ('DE', 4882.4400000000005),
 ('RI', 3179.5900000000006),
 ('OK', 3105.6400000000003),
 ('WV', 2879.6800000000003),
 ('AR', 2719.69),
 ('ND', 2426.72