***Problem Scenario*** 

Given three CSV files:

1.   EmployeeName.csv - fields (id, name)
2.   EmployeeManager.csv - fields (id, managerName)
3.   EmployeeSalary.csv - fields (id, salary)

Using the Spark RDD API, generate a joined output with the fields
(id, name, salary, managerName) and save it as a local CSV file.




In [2]:
!pip install --quiet pyspark


In [11]:
from pyspark import SparkConf, SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

# 1) load EmployeeManager.csv as RDD
manager = sc.textFile('/content/EmployeeManager.csv')
managerPairRDD = manager.map(lambda x: (x.split(',')[0], x.split(',')[1]))
managerPairRDD.take(5)

[('E01', 'Vishnu'),
 ('E02', 'Satyam'),
 ('E03', 'Shiv'),
 ('E04', 'Sundar'),
 ('E05', 'John')]
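The `split(',')` parsing used in these cells assumes that no field contains a comma or a quote character. If the files might contain quoted fields, a minimal sketch using Python's standard `csv` module could be passed to `map` instead. `parse_pair` is a hypothetical helper, and the quoted example row below is made up for illustration; it is not part of the data set.

```python
import csv


def parse_pair(line):
    """Parse one CSV line into an (id, value) pair, honouring quoted fields.

    Hypothetical helper: csv.reader accepts any iterable of strings,
    so a one-element list parses exactly one line.
    """
    fields = next(csv.reader([line]))
    return (fields[0].strip(), fields[1].strip())


# In the notebook this could replace the split-based lambda, e.g.:
# managerPairRDD = manager.map(parse_pair)
print(parse_pair('E01,Vishnu'))
print(parse_pair('E11,"Rao, Kiran"'))  # hypothetical row with a quoted comma
```

Splitting the line once per record (rather than calling `split` twice in the lambda) also avoids redundant work.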

In [12]:
# 2) load EmployeeName.csv as RDD
name = sc.textFile('/content/EmployeeName.csv')
namePairRDD = name.map(lambda x: (x.split(',')[0], x.split(',')[1]))
namePairRDD.take(5)

[('E01', 'Lokesh'),
 ('E02', 'Bhupesh'),
 ('E03', 'Amit'),
 ('E04', 'Ratan'),
 ('E05', 'Dinesh')]

In [13]:
# 3) load EmployeeSalary.csv as RDD
salary = sc.textFile('/content/EmployeeSalary.csv')
salaryPairRDD = salary.map(lambda x: (x.split(',')[0], x.split(',')[1]))
salaryPairRDD.take(5)

[('E01', '50000'),
 ('E02', '50000'),
 ('E03', '45000'),
 ('E04', '45000'),
 ('E05', '50000')]

In [14]:
joinedRDD = namePairRDD.join(salaryPairRDD).join(managerPairRDD)
joinedRDD.take(5)

[('E09', (('Kumar', '10000'), 'Vinod')),
 ('E04', (('Ratan', '45000'), 'Sundar')),
 ('E07', (('Tejas', '50000'), 'Tanvir')),
 ('E01', (('Lokesh', '50000'), 'Vishnu')),
 ('E05', (('Dinesh', '50000'), 'John'))]
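Each pair-RDD `join` wraps the two matching values in a tuple, so chaining two joins nests the first pair inside the second; that is why the records above have the shape `(id, ((name, salary), managerName))`. The plain-Python sketch below does not use Spark; it emulates the inner-join semantics on small in-memory lists taken from the outputs above. `pair_join` is a hypothetical helper, not a PySpark function.

```python
from collections import defaultdict


def pair_join(left, right):
    """Emulate an inner join of two (key, value) lists.

    Yields (key, (left_value, right_value)) for every matching pair,
    mirroring the shape produced by a pair-RDD join. Keys missing
    from either side are dropped.
    """
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


names = [('E01', 'Lokesh'), ('E02', 'Bhupesh')]
salaries = [('E01', '50000'), ('E02', '50000')]
managers = [('E01', 'Vishnu'), ('E02', 'Satyam')]

joined = pair_join(pair_join(names, salaries), managers)
print(joined)
# [('E01', (('Lokesh', '50000'), 'Vishnu')), ('E02', (('Bhupesh', '50000'), 'Satyam'))]
```

Because this is an inner join, an employee id that is absent from any one of the three files would silently disappear from the result. If that matters, PySpark's `leftOuterJoin` keeps every key from the left side and fills missing right-hand values with `None`.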

In [15]:
# flatten the nested join result (id, ((name, salary), managerName)) into (id, name, salary, managerName)
finalData = joinedRDD.map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][1]))
finalData.take(5)

[('E09', 'Kumar', '10000', 'Vinod'),
 ('E04', 'Ratan', '45000', 'Sundar'),
 ('E07', 'Tejas', '50000', 'Tanvir'),
 ('E01', 'Lokesh', '50000', 'Vishnu'),
 ('E05', 'Dinesh', '50000', 'John')]
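The index-based lambda works but is hard to read. Python 3 lambdas cannot unpack nested tuples in their parameters, so a small named function with tuple unpacking is one readable alternative. This is a sketch; `flatten` is a hypothetical name.

```python
def flatten(record):
    """Turn (id, ((name, salary), managerName)) into (id, name, salary, managerName)."""
    emp_id, ((name, salary), manager) = record
    return (emp_id, name, salary, manager)


# In the notebook: finalData = joinedRDD.map(flatten)
print(flatten(('E09', (('Kumar', '10000'), 'Vinod'))))
# ('E09', 'Kumar', '10000', 'Vinod')
```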

In [0]:
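# Optionally sort the flattened rows by any column: pass that column's index in the lambda
# (e.g. x[0] for id). Sorting finalData rather than joinedRDD keeps every column at the top level.
# When I ran this cell it failed with Py4JJavaError
# (org.apache.spark.api.python.PythonRDD.collectAndServe), apparently an environment issue;
# it should run once the Java version is one supported by the installed PySpark.
sortedRDD = finalData.sortBy(lambda x: x[0])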
sortedRDD.take(5)
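The key function passed to `sortBy` behaves like the `key` argument of Python's built-in `sorted`, so the idea can be checked on a few rows from the output above without Spark. One pitfall: salary is still a string, so it should be converted to `int` before sorting, otherwise string comparison ranks `'9000'` above `'10000'`. In PySpark the salary sort would look like `finalData.sortBy(lambda x: int(x[2]), ascending=False)`.

```python
rows = [
    ('E08', 'Sheela', '10000', 'Shekhar'),
    ('E03', 'Amit', '45000', 'Shiv'),
    ('E05', 'Dinesh', '50000', 'John'),
]

# Sort by id (column 0), like finalData.sortBy(lambda x: x[0])
by_id = sorted(rows, key=lambda row: row[0])

# Sort by salary (column 2), highest first; int() gives numeric rather than string order
by_salary = sorted(rows, key=lambda row: int(row[2]), reverse=True)

print([row[0] for row in by_id])      # ['E03', 'E05', 'E08']
print([row[0] for row in by_salary])  # ['E05', 'E03', 'E08']
```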

In [0]:
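# saveAsTextFile writes a directory named result.csv containing part-* files;
# each line is the tuple's str() representation, not a comma-separated row
finalData.saveAsTextFile('/content/result.csv')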

In [19]:
! cat /content/result.csv/part-0000*

('E09', 'Kumar', '10000', 'Vinod')
('E04', 'Ratan', '45000', 'Sundar')
('E07', 'Tejas', '50000', 'Tanvir')
('E01', 'Lokesh', '50000', 'Vishnu')
('E05', 'Dinesh', '50000', 'John')
('E06', 'Pavan', '45000', 'Pallavi')
('E08', 'Sheela', '10000', 'Shekhar')
('E10', 'Venkat', '10000', 'Jitendra')
('E03', 'Amit', '45000', 'Shiv')
('E02', 'Bhupesh', '50000', 'Satyam')
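The lines printed above are each tuple's Python `str()` representation rather than CSV rows, and they are spread across `part-*` files inside a directory. A minimal sketch, assuming a single output file is wanted, is to format each tuple as a CSV line before saving and to call `coalesce(1)` so that Spark writes one part file. `to_csv_line` and the path `/content/result_csv` are hypothetical; note that `saveAsTextFile` fails if the target directory already exists.

```python
import csv
import io


def to_csv_line(row):
    """Format one tuple as a single CSV line without a trailing newline.

    Hypothetical helper: csv.writer quotes any field containing a comma
    or a double quote, which a plain ','.join(row) would not do.
    """
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator='\n').writerow(row)
    return buffer.getvalue().rstrip('\n')


# In the notebook this could be applied as (output path is hypothetical):
# finalData.map(to_csv_line).coalesce(1).saveAsTextFile('/content/result_csv')
print(to_csv_line(('E09', 'Kumar', '10000', 'Vinod')))
# E09,Kumar,10000,Vinod
```

`coalesce(1)` funnels all rows through a single task, which is fine for a small file like this one but would become a bottleneck for large outputs.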
