In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
# Local Spark session for the examples below ("local[1]" master, fixed app name).
# getOrCreate() hands back an already-active session instead of building a second one.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName('Pyspark-Examples')
    .getOrCreate()
)

In [3]:
# Employee rows. Column order:
# ["emp_id","name","superior_emp_id","year_joined", "emp_dept_id","gender","salary"]
# Note that emp_dept_id is stored as a string here.
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]

In [4]:
# Department rows. Column order:
# ["dept_name","dept_id"]
# dept_id is an int here.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]

In [6]:
# Hand the driver-side Python lists to Spark as RDDs.
empRDD = spark.sparkContext.parallelize(emp)
deptRDD = spark.sparkContext.parallelize(dept)

# collect() brings every element back to the driver; each RDD is printed on its own line.
print(empRDD.collect(), deptRDD.collect(), sep="\n")

[(1, 'Smith', -1, '2018', '10', 'M', 3000), (2, 'Rose', 1, '2010', '20', 'M', 4000), (3, 'Williams', 1, '2010', '10', 'M', 1000), (4, 'Jones', 2, '2005', '10', 'F', 2000), (5, 'Brown', 2, '2010', '40', '', -1), (6, 'Brown', 2, '2010', '50', '', -1)]
[('Finance', 10), ('Marketing', 20), ('Sales', 30), ('IT', 40)]


In [7]:
# A notebook cell echoes only its final expression, so a bare `type(empRDD)` on
# its own line would be evaluated and thrown away. Returning both types as one
# tuple makes the cell display the type of each RDD.
type(empRDD), type(deptRDD)

pyspark.rdd.RDD

In [11]:
# Re-key every employee by department: (int(emp_dept_id), <remaining six fields>).
# Slicing around index 4 removes the department column from the value tuple, and
# int() converts the string ids in `emp` so they can match the int ids in `dept`.
empRdd1 = empRDD.map(lambda row: (int(row[4]), row[:4] + row[5:]))
print(empRdd1.collect())

[(10, (1, 'Smith', -1, '2018', 'M', 3000)), (20, (2, 'Rose', 1, '2010', 'M', 4000)), (10, (3, 'Williams', 1, '2010', 'M', 1000)), (10, (4, 'Jones', 2, '2005', 'F', 2000)), (40, (5, 'Brown', 2, '2010', '', -1)), (50, (6, 'Brown', 2, '2010', '', -1))]


In [17]:
# Swap each department row into (dept_id, dept_name) so the id becomes the pair key.
deptRdd1 = deptRDD.map(
    lambda dept_row: (int(dept_row[1]), dept_row[0])
)
print(deptRdd1.collect())

[(10, 'Finance'), (20, 'Marketing'), (30, 'Sales'), (40, 'IT')]


# Inner Join

In [23]:
# Inner join on dept_id: only keys present in both RDDs are kept.
# Each result has the shape (dept_id, (employee_fields, dept_name)).
empJoin = empRdd1.join(deptRdd1)

# Print one joined record per line.
for joined in empJoin.collect():
    print(joined)

[(10, ((1, 'Smith', -1, '2018', 'M', 3000), 'Finance')), (10, ((3, 'Williams', 1, '2010', 'M', 1000), 'Finance')), (10, ((4, 'Jones', 2, '2005', 'F', 2000), 'Finance')), (20, ((2, 'Rose', 1, '2010', 'M', 4000), 'Marketing')), (40, ((5, 'Brown', 2, '2010', '', -1), 'IT'))]
<class 'pyspark.rdd.PipelinedRDD'>
(10, ((1, 'Smith', -1, '2018', 'M', 3000), 'Finance'))
(10, ((3, 'Williams', 1, '2010', 'M', 1000), 'Finance'))
(10, ((4, 'Jones', 2, '2005', 'F', 2000), 'Finance'))
(20, ((2, 'Rose', 1, '2010', 'M', 4000), 'Marketing'))
(40, ((5, 'Brown', 2, '2010', '', -1), 'IT'))


In [26]:
#formatting the output
resjoin = empJoin.map(lambda x: (x[0],x[1][0][0],x[1][0][1],x[1][1]))
print(resjoin.collect())

[(10, 1, 'Smith', 'Finance'), (10, 3, 'Williams', 'Finance'), (10, 4, 'Jones', 'Finance'), (20, 2, 'Rose', 'Marketing'), (40, 5, 'Brown', 'IT')]


# Left Outer Join

In [24]:
# Left outer join: every employee is kept; when no department shares its key,
# the department side of the value pair is None.
empLeftJoin = empRdd1.leftOuterJoin(deptRdd1)

for joined in empLeftJoin.collect():
    print(joined)

(10, ((1, 'Smith', -1, '2018', 'M', 3000), 'Finance'))
(10, ((3, 'Williams', 1, '2010', 'M', 1000), 'Finance'))
(10, ((4, 'Jones', 2, '2005', 'F', 2000), 'Finance'))
(20, ((2, 'Rose', 1, '2010', 'M', 4000), 'Marketing'))
(40, ((5, 'Brown', 2, '2010', '', -1), 'IT'))
(50, ((6, 'Brown', 2, '2010', '', -1), None))


# Right Outer Join

In [25]:
# Right outer join: every department is kept; when no employee shares its key,
# the employee side of the value pair is None.
empRightJoin = empRdd1.rightOuterJoin(deptRdd1)

for joined in empRightJoin.collect():
    print(joined)

(10, ((1, 'Smith', -1, '2018', 'M', 3000), 'Finance'))
(10, ((3, 'Williams', 1, '2010', 'M', 1000), 'Finance'))
(10, ((4, 'Jones', 2, '2005', 'F', 2000), 'Finance'))
(20, ((2, 'Rose', 1, '2010', 'M', 4000), 'Marketing'))
(40, ((5, 'Brown', 2, '2010', '', -1), 'IT'))
(30, (None, 'Sales'))


# Full Outer Join

In [27]:
# Full outer join: keys from either RDD are kept; whichever side has no match
# for a key is filled with None.
empFullJoin = empRdd1.fullOuterJoin(deptRdd1)

for joined in empFullJoin.collect():
    print(joined)

(10, ((1, 'Smith', -1, '2018', 'M', 3000), 'Finance'))
(10, ((3, 'Williams', 1, '2010', 'M', 1000), 'Finance'))
(10, ((4, 'Jones', 2, '2005', 'F', 2000), 'Finance'))
(20, ((2, 'Rose', 1, '2010', 'M', 4000), 'Marketing'))
(40, ((5, 'Brown', 2, '2010', '', -1), 'IT'))
(50, ((6, 'Brown', 2, '2010', '', -1), None))
(30, (None, 'Sales'))


# Total Salary per Department

In [32]:
# Reduce every left-joined record to a (dept_id, salary) pair; salary is the
# sixth field (index 5) of the employee tuple.
# NOTE(review): the rows with salary -1 look like "unknown" placeholders and are
# summed as-is — confirm that is intended.
topSalDept = empLeftJoin.map(lambda rec: (rec[0], int(rec[1][0][5])))

# Sum the salaries that share a dept_id.
computeSalbyDept = topSalDept.reduceByKey(lambda total, salary: total + salary)

# Order the totals by dept_id before printing.
salByTop = computeSalbyDept.sortByKey()
print(salByTop.collect())

[(10, 6000), (20, 4000), (40, -1), (50, -1)]
