In [1]:
import findspark
# NOTE(review): init() runs before the pyspark import in the next cell,
# presumably so that pyspark is importable — keep this cell first.
findspark.init()

In [2]:
from pyspark import SparkConf, SparkContext

In [3]:
# Spark settings for this notebook: local master 'local[4]' and the app name.
conf = (
    SparkConf()
    .setAppName('BroadcastVariablesOps')
    .setMaster('local[4]')
)

In [4]:
# Use the class-level factory so re-running this cell reuses the active
# context. Constructing SparkContext(conf=conf) directly raises ValueError
# when a context already exists in this kernel.
sc = SparkContext.getOrCreate(conf=conf)

In [5]:
def read_products(path='products.csv'):
    """Build a product-id -> product-name lookup from a products CSV file.

    Each data line is split on commas; column 0 is parsed as the integer
    product id and column 2 is taken as the product name. Any line that
    contains the text 'productCategoryId' is treated as the header and
    skipped, as are blank lines.

    Args:
        path: Location of the CSV file. Defaults to 'products.csv' in the
            current working directory.

    Returns:
        dict mapping int product id to str product name.

    Raises:
        OSError: if the file cannot be opened.
        ValueError: if column 0 of a data line is not an integer.
        IndexError: if a data line has fewer than three columns.
    """
    product_id_name = {}

    # The context manager closes the file even if a malformed line raises.
    with open(path, 'r') as products_file:
        for line in products_file:
            # Skip the header row and empty lines (e.g. a trailing newline).
            if 'productCategoryId' in line or not line.strip():
                continue
            # Drop the line terminator so it cannot leak into the last column.
            fields = line.rstrip('\n').split(',')
            product_id_name[int(fields[0])] = fields[2]

    return product_id_name

In [6]:
# Load the {product_id: product_name} dict in the notebook process.
products = read_products()

In [7]:
# Wrap the lookup dict in a broadcast variable; it is read back via `.value`.
broadcast_products = sc.broadcast(products)

In [8]:
# Sanity check: look up one product id through the broadcast value.
broadcast_products.value.get(114)

"Nike Men's Fly Shorts 2.0"

# Read orders and create an RDD

In [9]:
# Read the order items file as an RDD of raw CSV lines and drop the header
# (any line that mentions 'orderItemOrderId').
order_items_rdd = (
    sc.textFile('order_items.csv')
    .filter(lambda line: 'orderItemOrderId' not in line)
)

In [10]:
# Preview the first five raw lines.
order_items_rdd.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

# Make Pair RDD

In [16]:
def make_order_items_pair_rdd(line):
    """Turn one order-items CSV line into a (product_id, sub_total) pair.

    Column 2 is parsed as the integer product id and column 4 as the float
    subtotal.
    """
    fields = line.split(',')
    product_id = int(fields[2])
    sub_total = float(fields[4])
    return product_id, sub_total

In [17]:
# Convert every raw line into a (product_id, sub_total) pair.
order_items_pair_rdd = order_items_rdd.map(make_order_items_pair_rdd)

In [18]:
# Preview the first five (product_id, sub_total) pairs.
order_items_pair_rdd.take(5)

[(957, 299.98), (1073, 199.99), (502, 250.0), (403, 129.99), (897, 49.98)]

# Query to find the products with the highest turnover

In [23]:
# Sum the subtotals per product id, then order the (product_id, total) pairs
# by total, highest first. sortBy on the total avoids swapping key and value
# around sortByKey, and the parenthesised chain needs no line-continuation
# backslashes.
sorted_orders = (
    order_items_pair_rdd
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda id_total: id_total[1], ascending=False)
)

# Join orders with product names using the broadcast variable

In [24]:
# Replace each product id with its name from the broadcast lookup.
# dict.get yields None for an id that is missing from the lookup.
sorted_orders_with_product_name = sorted_orders.map(
    lambda id_total: (broadcast_products.value.get(id_total[0]), id_total[1])
)

In [25]:
# Show the first ten (product_name, total) pairs of the sorted result.
sorted_orders_with_product_name.take(10)

[('Field & Stream Sportsman 16 Gun Fire Safe', 6929653.499999708),
 ('Perfect Fitness Perfect Rip Deck', 4421143.019999639),
 ("Diamondback Women's Serene Classic Comfort Bi", 4118425.419999785),
 ("Nike Men's Free 5.0+ Running Shoe", 3667633.1999997487),
 ("Nike Men's Dri-FIT Victory Golf Polo", 3147800.0),
 ('Pelican Sunstream 100 Kayak', 3099844.999999871),
 ("Nike Men's CJ Elite 2 TD Football Cleat", 2891757.5399998166),
 ("O'Brien Men's Neoprene Life Vest", 2888993.939999649),
 ("Under Armour Girls' Toddler Spine Surge Runni", 1269082.649999932),
 ('adidas Youth Germany Black/Red Away Match Soc', 67830.0)]