In [1]:
import getpass
import string

from pyspark.sql import SparkSession
username = getpass.getuser()

# Build (or reuse) a YARN-backed SparkSession with Hive support.
# spark.ui.port='0' lets Spark pick a free port for its web UI, and the Hive
# warehouse lives under /user/<username>/warehouse for the current OS user.
spark = (
    SparkSession.builder
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .master('yarn')
    .getOrCreate()
)

In [2]:
# Echo the SparkSession object so the cell output confirms the session exists
spark

#### 1. Find the top 10 customers who have spent the most (premium customers)

###### Loading the data into RDDs

In [3]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [4]:
orders_rdd.take(5)

['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE']

In [5]:
order_items_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [6]:
order_items_rdd.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

###### Taking the order_id and order_item_subtotal from order_items_rdd

In [7]:
order_items_map = order_items_rdd.map(lambda x: ((int(x.split(',')[1]),float(x.split(',')[4]))))

In [8]:
order_items_map.take(5)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99), (4, 49.98)]

###### Taking the order_id and order_customer_id from orders_rdd

In [9]:
orders_map = orders_rdd.map(lambda x: (int(x.split(',')[0]),(int(x.split(',')[2]))))

In [10]:
orders_map.take(6)

[(1, 11599), (2, 256), (3, 12111), (4, 8827), (5, 11318), (6, 7130)]

###### Joining the 2 rdd’s using the common column – order_id

In [11]:
join_rdd = order_items_map.join(orders_map)

In [12]:
mapped_rdd = join_rdd.map(lambda x: (x[1][1],x[1][0]))

In [13]:
mapped_rdd.take(6)

[(8774, 49.98),
 (8774, 299.97),
 (8774, 249.9),
 (8774, 49.98),
 (8774, 149.94),
 (2533, 399.98)]

###### Now we need to sum up the amount for each customer and then sort it in descending order

In [14]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x,y : x+y).sortBy(lambda x: x[1], ascending=False)
reduced_rdd.take(10)

[(791, 10524.169999999996),
 (9371, 9299.029999999999),
 (8766, 9296.14),
 (1657, 9223.71),
 (2641, 9130.92),
 (1288, 9019.11),
 (3710, 9019.099999999999),
 (4249, 8918.85),
 (5654, 8904.95),
 (5624, 8761.98)]

##### 2. Top 10 product IDs with the most quantity sold

######  Loading data to RDD

In [15]:
order_items_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [16]:
order_items_rdd.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

###### We are taking order_item_product_id, order_item_quantity columns and then adding the quantities of each product item and sorting it.

In [17]:
mapped_rdd = order_items_rdd.map(lambda x: ((int(x.split(",")[2])),(int(x.split(",")[3]))))

In [18]:
mapped_rdd.take(5)

[(957, 1), (1073, 1), (502, 5), (403, 1), (897, 2)]

In [19]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x, y: x + y)

In [20]:
reduced_rdd.take(5)

[(502, 62956), (1014, 57803), (926, 930), (134, 801), (276, 919)]

In [21]:
top_products = reduced_rdd.sortBy(lambda x: x[1], ascending=False)

In [22]:
top_products.take(5)

[(365, 73698), (502, 62956), (1014, 57803), (191, 36680), (627, 31735)]

##### 3. How many customers are from Caguas city?

###### Loading data to RDD

In [23]:
customers_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

In [24]:
customers_rdd.take(5)

['1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521',
 '2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126',
 '3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725',
 '4,Mary,Jones,XXXXXXXXX,XXXXXXXXX,8324 Little Common,San Marcos,CA,92069',
 '5,Robert,Hudson,XXXXXXXXX,XXXXXXXXX,"10 Crystal River Mall ",Caguas,PR,00725']

###### Taking customer_city and filtering based on city names = Caguas

In [25]:
mapped_rdd = customers_rdd.map(lambda x: x.split(",")[6])

In [26]:
mapped_rdd.take(6)

['Brownsville', 'Littleton', 'Caguas', 'San Marcos', 'Caguas', 'Passaic']

In [27]:
filtered_rdd = mapped_rdd.filter(lambda x: x == 'Caguas')

In [28]:
filtered_rdd.take(6)

['Caguas', 'Caguas', 'Caguas', 'Caguas', 'Caguas', 'Caguas']

###### Calling count() action

In [29]:
filtered_rdd.count()

4584

##### 4. Top 3 states with maximum customers

###### Loading data to RDD

In [30]:
customers_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

###### Taking customer_states from customers_rdd and adding and sorting to get top 3 states

In [31]:
mapped_rdd = customers_rdd.map(lambda x: (x.split(",")[7],1))

In [32]:
mapped_rdd.take(10)

[('TX', 1),
 ('CO', 1),
 ('PR', 1),
 ('CA', 1),
 ('PR', 1),
 ('NJ', 1),
 ('PR', 1),
 ('MA', 1),
 ('PR', 1),
 ('VA', 1)]

In [33]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x, y: x + y)

In [34]:
reduced_rdd.take(10)

[('TX', 635),
 ('CO', 122),
 ('PR', 4771),
 ('CA', 2012),
 ('VA', 136),
 ('NM', 73),
 ('LA', 63),
 ('NY', 775),
 ('OH', 276),
 ('NC', 150)]

In [35]:
sorted_rdd = reduced_rdd.sortBy(lambda x: x[1], ascending=False)

In [36]:
sorted_rdd.take(5)

[('PR', 4771), ('CA', 2012), ('NY', 775), ('TX', 635), ('IL', 523)]

##### 5. How many customers have spent more than $1000 in total?

###### Loading the data to RDD’s

In [37]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [38]:
order_items_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

###### Taking the order_id and order_item_subtotal from order_items_rdd

In [39]:
order_items_map = order_items_rdd.map(lambda x: ((int(x.split(',')[1]),float(x.split(',')[4]))))

In [40]:
order_items_map.take(4)

[(1, 299.98), (2, 199.99), (2, 250.0), (2, 129.99)]

###### Taking the order_id and order_customer_id from orders_rdd

In [41]:
orders_map = orders_rdd.map(lambda x: (int(x.split(',')[0]),(int(x.split(',')[2]))))

In [42]:
orders_map.take(7)

[(1, 11599), (2, 256), (3, 12111), (4, 8827), (5, 11318), (6, 7130), (7, 4530)]

###### Joining the 2 rdd’s using the common column – order_id

In [43]:
join_rdd = order_items_map.join(orders_map)

In [45]:
join_rdd.take(3)

[(4, (49.98, 8827)), (4, (299.95, 8827)), (4, (150.0, 8827))]

######  taking order_customer_id and order_item_subtotal.

In [46]:
mapped_rdd = join_rdd.map(lambda x: (x[1][1],x[1][0]))

In [47]:
mapped_rdd.take(3)

[(8827, 49.98), (8827, 299.95), (8827, 150.0)]

######  sum up the amount for each customer

In [48]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x,y : x+y)

######  Keep the customers whose total spend exceeds 1000 and count them. The result is cached so that any further actions on final_rdd do not recompute the join.

In [49]:
final_rdd = reduced_rdd.filter(lambda x: x[1] > 1000).cache()

In [50]:
final_rdd.count()

11148

##### 6. Which state has the most orders in CLOSED status?

###### Load data to RDD

In [51]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [52]:
customers_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

###### Taking order_customer_id and order_status columns and extracting only CLOSED orders from orders_rdd

In [53]:
orders_map = orders_rdd.map(lambda x: ((int(x.split(',')[2])),(x.split(',')[3]))).filter(lambda x: x[1] ==
'CLOSED')

###### Taking customer_id and customer_state columns from customer_rdd

In [54]:
customers_map = customers_rdd.map(lambda x: (int(x.split(',')[0]),x.split(',')[7]))

In [55]:
customers_map.take(5)

[(1, 'TX'), (2, 'CO'), (3, 'PR'), (4, 'CA'), (5, 'PR')]

###### Joining 2 Rdd’s by common column – customer_id

In [56]:
join_data = orders_map.join(customers_map)

In [57]:
join_data.take(5)

[(1492, ('CLOSED', 'CA')),
 (1492, ('CLOSED', 'CA')),
 (6000, ('CLOSED', 'PR')),
 (1352, ('CLOSED', 'MA')),
 (1352, ('CLOSED', 'MA'))]

In [58]:
mapped_rdd = join_data.map(lambda x: (x[1][1],1))

In [61]:
mapped_rdd.take(4)

[('MA', 1), ('MA', 1), ('NC', 1), ('NC', 1)]

In [62]:
final_rdd = mapped_rdd.reduceByKey(lambda x,y:x+y).sortBy(lambda x: x[1], ascending=False)

In [63]:
final_rdd.take(1)

[('PR', 2891)]

##### 7. How many customers are active? (Active customers are the ones who placed at least one order.)

###### Load data to RDD

In [65]:
orders_rdd= spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [66]:
mapped_rdd = orders_rdd.map(lambda x: (int(x.split(",")[2]),1))

In [67]:
mapped_rdd.take(5)

[(11599, 1), (256, 1), (12111, 1), (8827, 1), (11318, 1)]

######  It then reduces the RDD by key, summing up the values

In [68]:
reduced_rdd = mapped_rdd.reduceByKey(lambda x,y: x + y)

###### Finally, it filters the RDD to include only customers who have made at least one order

In [69]:
filtered_rdd = reduced_rdd.filter(lambda x: x[1] >= 1)

In [70]:
filtered_rdd.count()

12405

##### 8. What is the revenue generated by each state, sorted in descending order?

###### Load data to RDD

In [4]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [6]:
customers_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

In [7]:
order_items_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

###### Taking order_customer_id and order_id from orders_rdd

In [8]:
orders_map = orders_rdd.map(lambda x: ((int(x.split(',')[2])),int(x.split(',')[0])))

###### Taking customer_id and customer_state from customers_rdd

In [9]:
customers_map = customers_rdd.map(lambda x: ((int(x.split(',')[0])),x.split(',')[7]))

###### Join the 2 rdds using the common column – customer_id

In [10]:
join_rdd = orders_map.join(customers_map)

In [11]:
mapped_rdd = join_rdd.map(lambda x : (x[1][0],x[1][1]))

###### Taking order_id and order_item_subtotal from order_items_rdd

In [12]:
order_items_map = order_items_rdd.map(lambda x: (int(x.split(',')[1]),float(x.split(',')[4])))

###### Now join the 2 rdds with the common column – order_id

In [13]:
join_new_rdd = mapped_rdd.join(order_items_map)

In [14]:
reduced_rdd = join_new_rdd.map(lambda x : (x[1][0],x[1][1])).reduceByKey(lambda x,y:x+y)

In [15]:
final_rdd = reduced_rdd.sortBy(lambda x: x[1], ascending=False)

In [16]:
final_rdd.collect()

[('PR', 13208867.68999928),
 ('CA', 5542722.999999759),
 ('NY', 2152706.739999984),
 ('TX', 1731407.4899999977),
 ('IL', 1457225.8300000038),
 ('FL', 1048609.7700000028),
 ('OH', 773804.1100000018),
 ('MI', 730078.9700000018),
 ('PA', 724375.9300000018),
 ('NJ', 606550.9900000009),
 ('AZ', 566459.290000001),
 ('GA', 467765.18000000046),
 ('MD', 456100.42000000045),
 ('NC', 378877.64),
 ('CO', 358310.60000000027),
 ('VA', 344824.3500000002),
 ('OR', 315239.5100000001),
 ('MA', 306025.73000000016),
 ('TN', 297614.4100000002),
 ('NV', 276364.97000000015),
 ('MO', 260417.2800000002),
 ('HI', 238208.11000000013),
 ('CT', 211264.2400000001),
 ('UT', 184356.8300000001),
 ('LA', 182954.74000000005),
 ('NM', 181366.6500000001),
 ('WA', 172982.4500000001),
 ('WI', 164058.32000000004),
 ('MN', 133183.97),
 ('SC', 129411.78999999995),
 ('IN', 110015.13999999996),
 ('DC', 108841.47999999995),
 ('KY', 94894.74999999996),
 ('KS', 85865.06999999996),
 ('DE', 48873.249999999985),
 ('RI', 47439.34999999

##### 1. Find the top 10 states with the highest number of positive cases.

In [17]:
rdd = spark.sparkContext.textFile("/public/trendytech/covid19/cases/covid_dataset_cases.csv")

###### Take state and positive columns from rdd

In [18]:
filtered_rdd = rdd.map(lambda x : (x.split(",")[1],int(x.split(",")[2])))

In [19]:
positive_cases = filtered_rdd.reduceByKey(lambda x,y : x + y).sortBy(lambda x: x[1], ascending=False)

In [21]:
positive_cases.take(10)

[('WA', 1701),
 ('GA', 1017),
 ('MH', 730),
 ('MI', 61),
 ('CA', 53),
 ('GJ', 35),
 ('BR', 23),
 ('JH', 13),
 ('CG', 8),
 ('RI', 6)]

##### 2. Find the total number of people currently in ICU.

In [24]:
rdd_2 = spark.sparkContext.textFile("/public/trendytech/covid19/cases/covid_dataset_cases.csv")

In [25]:
icu_count = rdd_2.map(lambda x: int(x.split(",")[7])).sum()

In [27]:
print(icu_count)

PythonRDD[54] at RDD at PythonRDD.scala:53


##### 3. Find the top 15 states with the highest number of recoveries.

In [28]:
rdd_3 = spark.sparkContext.textFile("/public/trendytech/covid19/cases/covid_dataset_cases.csv")

In [29]:
recovered_rdd = rdd_3.map(lambda x: (x.split(",")[1],int(x.split(",")[11]))).reduceByKey(lambda x,y:
x+y)

In [30]:
recovered_rdd.take(7)

[('MH', 165),
 ('RI', 72),
 ('CA', 23),
 ('AP', 84),
 ('HP', 19),
 ('AS', 30),
 ('CG', 17)]

In [31]:
sorted_rdd = recovered_rdd.sortBy(lambda x: x[1], ascending=False)

In [32]:
sorted_rdd.take(17)

[('WA', 451),
 ('MH', 165),
 ('MI', 101),
 ('GA', 87),
 ('AP', 84),
 ('RI', 72),
 ('BR', 68),
 ('JH', 50),
 ('KA', 43),
 ('AZ', 38),
 ('AS', 30),
 ('GJ', 27),
 ('CA', 23),
 ('HR', 20),
 ('HP', 19),
 ('CG', 17)]

##### 4. Find the 3 states with the fewest deaths.

In [33]:
rdd_4 = spark.sparkContext.textFile("/public/trendytech/covid19/cases/covid_dataset_cases.csv")

In [34]:
death_rdd = rdd_4.map(lambda x: (x.split(",")[1],int(x.split(",")[23]))).reduceByKey(lambda x,y: x+y)

In [35]:
least_rdd = death_rdd.sortBy(lambda x: x[1])

In [36]:
least_rdd.take(3)

[('AS', 9), ('JH', 10), ('CG', 31)]

##### 5. Find the total number of people currently hospitalized.

In [37]:
rdd_5 = spark.sparkContext.textFile("/public/trendytech/covid19/cases/covid_dataset_cases.csv")

In [38]:
hosp_rdd = rdd_5.map(lambda x: int(x.split(",")[5])).sum()

In [39]:
print(hosp_rdd)

1319


##### 6. List the Twitter handle and FIPS code for the top 15 states with the highest number of total cases.

In [41]:
cases_rdd = spark.sparkContext.textFile("/public/trendytech/covid19/cases/covid_dataset_cases.csv")

In [42]:
states_rdd = spark.sparkContext.textFile("/public/trendytech/covid19/states/covid_dataset_states.csv")

###### Taking state, twitter and fips columns from states_rdd

In [43]:
states_mapped =states_rdd.map(lambda x: (x.split(",")[0],(x.split(",")[5],int(x.split(",")[8]))))

###### Taking state, total columns from cases_rdd

In [44]:
rdd1 = cases_rdd.map(lambda x: (x.split(",")[1],int(x.split(",")[28])))

###### Summing up total cases for each states

In [45]:
total_cases = rdd1.reduceByKey(lambda x, y: x + y)

###### Joining the 2 rdds based on common column -> state

In [46]:
joined_rdd = total_cases.join(states_mapped)

In [47]:
final_rdd = joined_rdd.sortBy(lambda x: x[1][0], ascending=False)

In [48]:
final_rdd.take(15)

[('WA', (2100, ('@WACovid', 44))),
 ('GA', (1034, ('@GACovid', 44))),
 ('MH', (730, ('@MHCovid', 26))),
 ('CA', (515, ('@CACovid', 4))),
 ('MI', (61, ('@MICovid', 53))),
 ('GJ', (35, ('@GJCovid', 44))),
 ('AZ', (34, ('@AZCovid', 53))),
 ('BR', (23, ('@BRCovid', 53))),
 ('RI', (16, ('@RICovid', 26))),
 ('JH', (13, ('@JHCovid', 53))),
 ('CG', (8, ('@CGCovid', 53))),
 ('KA', (5, ('@KACovid', 53))),
 ('HP', (4, ('@HPCovid', 53))),
 ('AS', (2, ('@ASCovid', 6))),
 ('HR', (2, ('@HRCovid', 9)))]

##### 1. Find the top 20 words in TrendyTech students' Google reviews, excluding the boring words.

In [3]:
rdd1 = spark.sparkContext.textFile("/public/trendytech/reviews/trendytech-student-reviews.csv")

In [4]:
rdd1.take(2)

['I got to know about this course by recommendation from one of my senior and i would say that the course has Excellent lectures(content) that explored my mind.',
 'The Sumit is very much passionate about teaching Big Data and his style of teaching is very unique and engaging the flow of course content.']

######  converting all the words to lowercase

In [5]:
rdd2 = rdd1.flatMap(lambda x:x.split(" ")).map(lambda x:x.lower())

In [6]:
rdd2.take(5)

['i', 'got', 'to', 'know', 'about']

In [13]:
rdd3 = rdd2.map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y)

In [14]:
rdd3.take(5)

[('was', 61), ('learning', 31), ('journey', 6), ('sumit', 109), ('i', 215)]

###### Load the boringwords

In [9]:
boring_words=spark.sparkContext.textFile("/user/itv006334/TT/boringwords.txt")

In [10]:
boring_words.take(6)

['shouldnt', 'worrying', 'simplify', 'tidy', 'shouldnt', 'yep']

##### broadcasting the boring words to all worker nodes

In [11]:
broadcast_bw = spark.sparkContext.broadcast(boring_words.collect())

##### Filtering the words which are not in broadcast_bw( boringwords).

In [15]:
rdd4 = rdd3.filter(lambda x : x[0] not in broadcast_bw.value)

In [16]:
rdd5 = rdd4.reduceByKey(lambda x,y : x+y).sortBy(lambda x: x[1], ascending=False)

In [17]:
rdd5.take(20)

[('data', 201),
 ('sumit', 109),
 ('trendytech', 67),
 ('', 64),
 ('data.', 34),
 ('course.', 33),
 ("sir's", 23),
 ('trendy', 14),
 ('course,', 13),
 ("master's", 13),
 ('domain.', 12),
 ("trendytech's", 12),
 ('sir.', 11),
 ('program.', 9),
 ('field.', 9),
 ('concepts.', 9),
 ('hands-on', 8),
 ('fresher', 8),
 ('amazing.', 8),
 ('career.', 7)]