In [1]:
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[4]").setAppName("BroadcastVariablesOps")
# getOrCreate is a classmethod: calling it on the class reuses an already-active
# context (e.g. when this cell is re-run) instead of constructing a second
# SparkContext first, which PySpark does not allow in one process.
sc = SparkContext.getOrCreate(conf=conf)

# products.csv dosyasını okuyup {ürün_id: ürün_adı} sözlüğü döndüren fonksiyon

In [2]:
def read_products(path="products.csv"):
    """Read a products CSV file into a ``{product_id: product_name}`` dict.

    The product id is taken from column 0 (parsed as ``int``) and the product
    name from column 2. The header row (any row containing
    ``productCategoryId``) and blank lines are skipped.

    Args:
        path: CSV file to read. Defaults to ``"products.csv"`` in the current
            working directory.

    Returns:
        dict mapping int product id -> product name (str). If an id appears
        more than once, the last name read wins.

    Raises:
        ValueError: if a data row's first column is not an integer.
        IndexError: if a data row has fewer than 3 columns.
    """
    import csv  # local import keeps this cell self-contained

    product_id_name = {}
    # `with` guarantees the file is closed; newline="" is what csv.reader
    # expects so quoted fields (e.g. names containing commas) parse correctly.
    with open(path, "r", encoding="utf-8", newline="") as products_file:
        for row in csv.reader(products_file):
            # blank lines yield [] from csv.reader; the header names its columns
            if not row or any("productCategoryId" in field for field in row):
                continue
            product_id_name[int(row[0])] = row[2]
    return product_id_name

In [3]:
# Build the product-id -> product-name lookup in the notebook (driver) process.
products = read_products()

In [4]:
# Wrap the lookup dict in a Spark broadcast variable; RDD tasks read it through
# `broadcast_products.value` rather than each capturing their own copy.
broadcast_products = sc.broadcast(products)

In [6]:
# Sanity check: name for product id 13 (dict.get returns None if the id is absent).
broadcast_products.value.get(13)

"Under Armour Men's Renegade D Mid Football Cl"

# order_items.csv okuma ve başlık satırını atarak RDD oluşturma

In [7]:
# Raw order_items lines, with the header row (the one naming orderItemOrderId) dropped.
order_items_rdd = (
    sc.textFile("order_items.csv")
    .filter(lambda raw_line: "orderItemOrderId" not in raw_line)
)

In [9]:
# Peek at the first 5 header-filtered order_items lines.
order_items_rdd.take(5)

['1,1,957,1,299.98,299.98',
 '2,2,1073,1,199.99,199.99',
 '3,2,502,5,250.0,50.0',
 '4,2,403,1,129.99,129.99',
 '5,4,897,2,49.98,24.99']

# order_items satırlarından (ürün_id, ara_toplam) çiftlerinden oluşan pair RDD oluşturma

In [10]:
def make_order_items_pair_rdd(line):
    """Turn one order_items CSV line into a ``(product_id, sub_total)`` pair.

    Column 2 is parsed as the int product id and column 4 as the float
    subtotal, e.g. ``"3,2,502,5,250.0,50.0"`` -> ``(502, 250.0)``.

    Args:
        line: one comma-separated order_items record (header already removed).

    Returns:
        tuple ``(int, float)``, usable as a key/value pair for ``reduceByKey``.

    Raises:
        ValueError: if the id or subtotal column is not numeric.
        IndexError: if the line has fewer than 5 columns.
    """
    # split once rather than once per extracted field
    fields = line.split(",")
    return int(fields[2]), float(fields[4])


In [11]:
# Map every order_items line to a (product_id, sub_total) pair.
order_items_pair_rdd = order_items_rdd.map(make_order_items_pair_rdd)

In [13]:
# Inspect the first 10 (product_id, sub_total) pairs.
order_items_pair_rdd.take(10)

[(957, 299.98),
 (1073, 199.99),
 (502, 250.0),
 (403, 129.99),
 (897, 49.98),
 (365, 299.95),
 (502, 150.0),
 (1014, 199.92),
 (957, 299.98),
 (365, 299.95)]

In [14]:
# Top 10 products by total subtotal, highest first, as (product_id, total).
# takeOrdered ranks on the value directly: no key/value swapping, and no full
# sort of the RDD just to keep 10 rows. Totals are float sums, so expect small
# rounding noise in the trailing digits.
(
    order_items_pair_rdd
    .reduceByKey(lambda total, sub_total: total + sub_total)
    .takeOrdered(10, key=lambda product_total: -product_total[1])
)

[(1004, 6929653.499999708),
 (365, 4421143.019999639),
 (957, 4118425.419999785),
 (191, 3667633.1999997487),
 (502, 3147800.0),
 (1073, 3099844.999999871),
 (403, 2891757.5399998166),
 (1014, 2888993.9399996493),
 (627, 1269082.649999932),
 (565, 67830.0)]

In [15]:
# RDD of (product_id, total subtotal), sorted by total in descending order.
# sortBy orders on the value directly, so no key/value swap is needed; the
# parenthesised chain avoids backslash continuations, which break if the
# line after a trailing backslash is removed.
sorted_orders = (
    order_items_pair_rdd
    .reduceByKey(lambda total, sub_total: total + sub_total)
    .sortBy(lambda product_total: product_total[1], ascending=False)
)

# Sıralı order_items toplamlarındaki ürün_id'leri, broadcast değişkeni olan products sözlüğünden ürün adlarıyla değiştirme

In [16]:
# Swap each product id for its name from the broadcast dict, keeping the total.
# dict.get yields None for ids that have no entry in the products lookup.
sorted_orders_with_product_name = sorted_orders.map(
    lambda id_and_total: (
        broadcast_products.value.get(id_and_total[0]),
        id_and_total[1],
    )
)

In [17]:
# Top 10 products by total subtotal, now labelled with their product names.
sorted_orders_with_product_name.take(10)

[('Field & Stream Sportsman 16 Gun Fire Safe', 6929653.499999708),
 ('Perfect Fitness Perfect Rip Deck', 4421143.019999639),
 ("Diamondback Women's Serene Classic Comfort Bi", 4118425.419999785),
 ("Nike Men's Free 5.0+ Running Shoe", 3667633.1999997487),
 ("Nike Men's Dri-FIT Victory Golf Polo", 3147800.0),
 ('Pelican Sunstream 100 Kayak', 3099844.999999871),
 ("Nike Men's CJ Elite 2 TD Football Cleat", 2891757.5399998166),
 ("O'Brien Men's Neoprene Life Vest", 2888993.9399996493),
 ("Under Armour Girls' Toddler Spine Surge Runni", 1269082.649999932),
 ('adidas Youth Germany Black/Red Away Match Soc', 67830.0)]