In [1]:
from pyspark.sql import SparkSession
import getpass

username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port','0'). \
    config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
    enableHiveSupport(). \
    master('yarn'). \
    getOrCreate()

In [2]:
# Question 1

In [3]:
orders_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/orders/*")

In [4]:
order_items_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/order_items/*")

In [5]:
rdd1 = orders_rdd.map(lambda x: (int(x.split(",")[0]), int(x.split(",")[2])))

In [6]:
rdd_1 = order_items_rdd.map(lambda x: (int(x.split(",")[1]), float(x.split(",")[4])))

In [7]:
rdd_joined = rdd_1.join(rdd1)

In [8]:
rdd_joined.take(5)

[(4, (49.98, 8827)),
 (4, (299.95, 8827)),
 (4, (150.0, 8827)),
 (4, (199.92, 8827)),
 (8, (179.97, 2911))]

In [9]:
rdd_mapped = rdd_joined.map(lambda x: (x[1][1], x[1][0]))

In [10]:
rdd_reduced = rdd_mapped.reduceByKey(lambda x, y: x + y)

In [11]:
rdd_sorted = rdd_reduced.sortBy(lambda x: x[1], ascending = False)

In [12]:
rdd_sorted.take(10)

[(791, 10524.169999999998),
 (9371, 9299.029999999999),
 (8766, 9296.14),
 (1657, 9223.710000000001),
 (2641, 9130.92),
 (1288, 9019.11),
 (3710, 9019.099999999999),
 (4249, 8918.85),
 (5654, 8904.95),
 (5624, 8761.98)]

In [13]:
rdd2 = order_items_rdd.map(lambda x: (x.split(",")[2], int(x.split(",")[3])))

In [14]:
rdd2.reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending = False).take(10)

[('365', 73698),
 ('502', 62956),
 ('1014', 57803),
 ('191', 36680),
 ('627', 31735),
 ('403', 22246),
 ('1004', 17325),
 ('1073', 15500),
 ('957', 13729),
 ('977', 998)]

In [15]:
customers_rdd = spark.sparkContext.textFile("/public/trendytech/retail_db/customers/*")

In [16]:
customers_rdd.filter(lambda x: x.split(",")[6].lower() == "caguas").count()

4584

In [17]:
customers_rdd.map(lambda x: (x.split(",")[7], 1)).reduceByKey(lambda x, y: x+ y).sortBy(lambda x: x[1], ascending = False).take(3)

[('PR', 4771), ('CA', 2012), ('NY', 775)]

In [18]:
rdd_sorted.filter(lambda x: float(x[1]) > 1000).count()

11148

In [19]:
rdd6 = customers_rdd.map(lambda x: (x.split(",")[0], x.split(",")[7]))

In [20]:
rdd_6 = orders_rdd.map(lambda x: (x.split(",")[2], x.split(",")[3]))

In [21]:
joined_rdd_6 = rdd6.join(rdd_6)

In [22]:
joined_rdd_6.take(10)

[('6241', ('IL', 'COMPLETE')),
 ('6241', ('IL', 'CLOSED')),
 ('6241', ('IL', 'ON_HOLD')),
 ('6243', ('IL', 'ON_HOLD')),
 ('6243', ('IL', 'PENDING')),
 ('6243', ('IL', 'PENDING_PAYMENT')),
 ('6245', ('PR', 'COMPLETE')),
 ('6245', ('PR', 'COMPLETE')),
 ('6245', ('PR', 'PENDING')),
 ('6252', ('TX', 'CLOSED'))]

In [23]:
joined_rdd_6.map(lambda x: x[1]).filter(lambda x: x[1] == "CLOSED").map(lambda x: (x[0], 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], False).take(5)

[('PR', 2891), ('CA', 1232), ('NY', 450), ('TX', 403), ('IL', 313)]

In [24]:
orders_rdd.map(lambda x: x.split(",")[2]).distinct().count()

12405

In [25]:
rdd8 = orders_rdd.map(lambda x: (x.split(",")[2], x.split(",")[0]))

In [26]:
rdd_8 = customers_rdd.map(lambda x: (x.split(",")[0], x.split(",")[7]))

In [27]:
rdd8_joined = rdd8.join(rdd_8)

In [28]:
rdd8_joined.take(5)

[('256', ('2', 'IL')),
 ('256', ('9467', 'IL')),
 ('256', ('13037', 'IL')),
 ('256', ('23971', 'IL')),
 ('256', ('24394', 'IL'))]

In [29]:
rdd8_mapped = rdd8_joined.map(lambda x: x[1])

In [30]:
rdd__8 = order_items_rdd.map(lambda x: (x.split(",")[1], float(x.split(",")[4])))

In [31]:
rdd__8_joined = rdd8_mapped.join(rdd__8)

In [32]:
rdd__8_joined.take(5)

[('4', ('TX', 49.98)),
 ('4', ('TX', 299.95)),
 ('4', ('TX', 150.0)),
 ('4', ('TX', 199.92)),
 ('16', ('PR', 119.98))]

In [33]:
rdd__8_joined.map(lambda x: x[1]).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], False).collect()

[('PR', 13208867.689999282),
 ('CA', 5542722.999999759),
 ('NY', 2152706.739999986),
 ('TX', 1731407.4899999993),
 ('IL', 1457225.8300000036),
 ('FL', 1048609.7700000026),
 ('OH', 773804.1100000018),
 ('MI', 730078.9700000015),
 ('PA', 724375.9300000014),
 ('NJ', 606550.9900000012),
 ('AZ', 566459.2900000012),
 ('GA', 467765.18000000087),
 ('MD', 456100.4200000006),
 ('NC', 378877.6400000001),
 ('CO', 358310.6000000003),
 ('VA', 344824.35000000015),
 ('OR', 315239.51000000024),
 ('MA', 306025.7300000003),
 ('TN', 297614.4100000002),
 ('NV', 276364.9700000002),
 ('MO', 260417.28000000017),
 ('HI', 238208.1100000001),
 ('CT', 211264.2400000001),
 ('UT', 184356.83000000007),
 ('LA', 182954.7400000001),
 ('NM', 181366.65000000002),
 ('WA', 172982.45000000007),
 ('WI', 164058.32),
 ('MN', 133183.97),
 ('SC', 129411.79000000001),
 ('IN', 110015.13999999996),
 ('DC', 108841.47999999995),
 ('KY', 94894.74999999996),
 ('KS', 85865.06999999995),
 ('DE', 48873.249999999985),
 ('RI', 47439.3499999

In [34]:
# Question 3

In [2]:
stdent_reviews_rdd = spark.sparkContext.textFile("/public/trendytech/reviews/trendytech-student-reviews.csv")

In [3]:
words_rdd = stdent_reviews_rdd.flatMap(lambda x: x.split(" ")).map(lambda x: x.lower())

In [4]:
boring_words_rdd = spark.sparkContext.textFile("/user/itv019463/data/boringwords.txt").map(lambda x: x.lower())

In [5]:
subtracted_rdd = words_rdd.subtract(boring_words_rdd)

In [6]:
subtracted_rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+ y).sortBy(lambda x: x[1], False).take(20)

[('data', 201),
 ('sumit', 109),
 ('trendytech', 67),
 ('', 64),
 ('data.', 34),
 ('course.', 33),
 ("sir's", 23),
 ('trendy', 14),
 ("master's", 13),
 ('course,', 13),
 ('domain.', 12),
 ("trendytech's", 12),
 ('sir.', 11),
 ('concepts.', 9),
 ('program.', 9),
 ('field.', 9),
 ('hands-on', 8),
 ('fresher', 8),
 ('amazing.', 8),
 ('career.', 7)]

In [7]:
spark.stop()