In [None]:
!pip install pyspark py4j

**Initialization**

In [2]:
import sys

from pyspark.sql import SparkSession
from pyspark.context import SparkContext

ss = SparkSession \
    .builder \
    .appName("BigData") \
    .master("local")\
    .getOrCreate()

**Matrix-Vector Multiplication**

In [5]:
rddA = ss.read.csv("A.csv", header= True).rdd
rddv = ss.read.csv("v.csv", header= True).rdd

rA = rddA.map(lambda x: ((int(x[0]), int(x[1])), (float(x[2]))))
rv = rddv.flatMap(lambda x: [((j,int(x[0])), (float(x[1]))) for j in range(1,3,1)])
r = rA+rv
r = r.reduceByKey(lambda x,y: x*y).map(lambda x: (x[0][0],x[1])).reduceByKey(lambda x,y: x+y)

print(rA.collect(), rv.collect(), r.collect(), sep='\n')

[((1, 1), 5.0), ((1, 2), 5.0), ((2, 1), 1.0), ((2, 2), 1.0)]
[((1, 1), 5.0), ((2, 1), 5.0), ((1, 2), 6.0), ((2, 2), 6.0)]
[(2, 11.0), (1, 55.0)]


**Relational-Algebra Operations**

In [6]:
rddL1 = ss.read.csv("Links1.csv", header= True).rdd
rddL2 = ss.read.csv("Links2.csv", header= True).rdd

rL1 = rddL1.map(lambda x: ((x[1]), (x[0], x[1])))
rL2 = rddL2.map(lambda x: ((x[0]), (x[0], x[1])))

r = rL1 + rL2 

r = r.reduceByKey(lambda x, y: x + y)

r.collect()

[('url12', ('url46', 'url12')),
 ('url48', ('url47', 'url48', 'url10', 'url48')),
 ('url26', ('url44', 'url26', 'url26', 'url43')),
 ('url3', ('url25', 'url3')),
 ('url36', ('url46', 'url36', 'url36', 'url48')),
 ('url42', ('url43', 'url42')),
 ('url37', ('url31', 'url37')),
 ('url10', ('url10', 'url13', 'url10', 'url25')),
 ('url8', ('url8', 'url47')),
 ('url29', ('url29', 'url31')),
 ('url16', ('url16', 'url48', 'url16', 'url4')),
 ('url43', ('url43', 'url25')),
 ('url27', ('url27', 'url49')),
 ('url38', ('url38', 'url37')),
 ('url11', ('url11', 'url43')),
 ('url18', ('url18', 'url37')),
 ('url5', ('url5', 'url45')),
 ('url6', ('url6', 'url1')),
 ('url47', ('url16', 'url47', 'url47', 'url17', 'url47', 'url23')),
 ('url34', ('url34', 'url34')),
 ('url45', ('url13', 'url45')),
 ('url19', ('url38', 'url19', 'url7', 'url19')),
 ('url50', ('url19', 'url50')),
 ('url41', ('url2', 'url41', 'url41', 'url11')),
 ('url25', ('url42', 'url25')),
 ('url28', ('url41', 'url28')),
 ('url23', ('url20

In [7]:
r = r.flatMap(lambda x: [((y[j], y[j+1], y[k+1])) for y in x if type(y) != str and len(y) >= 4 
                         for j in range(0, len(y), 2) 
                         for k in range(j+2, len(y), 2) if y[j] != y[k] and y[j+1] == y[k]])
r.collect()

[('url44', 'url26', 'url43'),
 ('url46', 'url36', 'url48'),
 ('url16', 'url47', 'url17'),
 ('url16', 'url47', 'url23'),
 ('url2', 'url41', 'url11'),
 ('url12', 'url32', 'url14')]

**Matrix Multiplication**

In [8]:
rddA = ss.read.csv("A.csv", header= True).rdd
rddB = ss.read.csv("B.csv", header= True).rdd

rA = rddA.map(lambda x: ((int(x[1]),('A', int(x[0]), (float(x[2]))))))
rB = rddB.map(lambda x: ((int(x[0]),('B', int(x[1]), (float(x[2]))))))
r = rA + rB

print(r.collect())

r = r.reduceByKey(lambda x,y: x+y).map(lambda x: [((int(x[1][i-1]), int(x[1][j-1])), x[1][i]*x[1][j]) 
                                                                        for i in range(2, int(len(x[1])/2)+2, 3) 
                                                                        for j in range(int(len(x[1])/2)+2, len(x[1])+2, 3)])

print(r.collect())

r = r.flatMap(lambda x: x).reduceByKey(lambda x, y: x+y).sortBy(lambda x: x[0])

r.collect()

[(1, ('A', 1, 5.0)), (2, ('A', 1, 5.0)), (1, ('A', 2, 1.0)), (2, ('A', 2, 1.0)), (1, ('B', 1, 3.0)), (1, ('B', 2, 2.0)), (2, ('B', 1, 5.0)), (2, ('B', 2, 2.0))]
[[((1, 1), 25.0), ((1, 2), 10.0), ((2, 1), 5.0), ((2, 2), 2.0)], [((1, 1), 15.0), ((1, 2), 10.0), ((2, 1), 3.0), ((2, 2), 2.0)]]


[((1, 1), 40.0), ((1, 2), 20.0), ((2, 1), 8.0), ((2, 2), 4.0)]

**Matrix Multiplication with One MapReduce Step**

In [9]:
rddA = ss.read.csv("A.csv", header= True).rdd
rddB = ss.read.csv("B.csv", header= True).rdd

a = rddA.count()
b = rddB.count()

rA = rddA.flatMap(lambda x: [((int(x[0]), k), ('A', int(x[1]), float(x[2]))) for k in range(1, int(a/2)+1)])
rB = rddB.flatMap(lambda x: [((k, int(x[1])), ('B', int(x[1]), float(x[2]))) for k in range(1, int(b/2)+1)])

r = (rA + rB).sortBy(lambda x: x[0])

r.collect()

[((1, 1), ('A', 1, 5.0)),
 ((1, 1), ('A', 2, 5.0)),
 ((1, 1), ('B', 1, 3.0)),
 ((1, 1), ('B', 1, 5.0)),
 ((1, 2), ('A', 1, 5.0)),
 ((1, 2), ('A', 2, 5.0)),
 ((1, 2), ('B', 2, 2.0)),
 ((1, 2), ('B', 2, 2.0)),
 ((2, 1), ('A', 1, 1.0)),
 ((2, 1), ('A', 2, 1.0)),
 ((2, 1), ('B', 1, 3.0)),
 ((2, 1), ('B', 1, 5.0)),
 ((2, 2), ('A', 1, 1.0)),
 ((2, 2), ('A', 2, 1.0)),
 ((2, 2), ('B', 2, 2.0)),
 ((2, 2), ('B', 2, 2.0))]

In [10]:
r = r.reduceByKey(lambda x, y: x + y)
r.collect()

[((1, 1), ('A', 1, 5.0, 'A', 2, 5.0, 'B', 1, 3.0, 'B', 1, 5.0)),
 ((2, 2), ('A', 1, 1.0, 'A', 2, 1.0, 'B', 2, 2.0, 'B', 2, 2.0)),
 ((1, 2), ('A', 1, 5.0, 'A', 2, 5.0, 'B', 2, 2.0, 'B', 2, 2.0)),
 ((2, 1), ('A', 1, 1.0, 'A', 2, 1.0, 'B', 1, 3.0, 'B', 1, 5.0))]

In [11]:
r = r.map(lambda x: [(x[0], x[1][i]*x[1][j]) 
                                           for i in range(2, int(len(x[1])/2)+2, 3) 
                                           for j in range(int(len(x[1])/2)+2, len(x[1])+2, 3) 
                                           if x[1][i-1] == x[1][j-1]]).flatMap(lambda x: x).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[0])
r.collect()

[((1, 1), 40.0), ((1, 2), 20.0), ((2, 1), 8.0), ((2, 2), 4.0)]