<a href="https://colab.research.google.com/github/vaddanki001/Spark-Mini-Project/blob/master/Spark_Mini_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### **Hadoop Installation Part**

Hadoop is a Java-based framework for storing and processing extremely large datasets on a cluster of inexpensive machines. It was the first major open source project in the big data field and is sponsored by the Apache Software Foundation.

### **Pre-Installation Steps (Clone from Git)**

In [1]:
# Clean up leftovers from a previous run; -f keeps rm quiet when nothing exists yet
!rm -rf Spark-Mini-Project
!rm -f spark-3.0.2-bin-hadoop2.7-hive1.2.tgz


In [2]:
!git clone https://github.com/vaddanki001/Spark-Mini-Project

Cloning into 'Spark-Mini-Project'...
remote: Enumerating objects: 6, done.
remote: Counting objects: 100% (6/6), done.
remote: Compressing objects: 100% (4/4), done.
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (6/6), done.


### **Step 1: Installing Hadoop and Spark**



In [3]:
# Older Spark releases such as 3.0.2 are served from the Apache archive
!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop2.7-hive1.2.tgz

In [4]:
!tar xf spark-3.0.2-bin-hadoop2.7-hive1.2.tgz

!pip install -q findspark

!pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
     |████████████████████████████████| 204.8MB 73kB/s 
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
     |████████████████████████████████| 204kB 20.1MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=917f883b0f8250495b90f8a3b592d9f2b1dc15a51fbac9ea1f2cc9137164a4a4
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


### **Step 2: Install JDK**
Hadoop/Spark requires that you set the path to Java, either as an environment variable or in the Hadoop configuration file.

In [5]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
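# Show the system default Java path. It can differ from the JDK 8 installed above
# (the output below reports Java 11); Step 3 sets JAVA_HOME to the JDK 8 directory explicitly.
!readlink -f /usr/bin/java | sed "s:bin/java::"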

/usr/lib/jvm/java-11-openjdk-amd64/


### Step 3: **Set Java and Spark Home**

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop2.7-hive1.2"

In [7]:
!echo $SPARK_HOME

/content/spark-3.0.2-bin-hadoop2.7-hive1.2
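
Because Spark fails with fairly opaque errors when either variable points at a missing directory, it can help to validate both before starting a session. The following is a minimal sketch; `check_homes` is a hypothetical helper written for this notebook, not part of Spark or findspark.

```python
import os

def check_homes(names=("JAVA_HOME", "SPARK_HOME"), env=None):
    """Return {variable: problem} for each variable that is unset or not a directory."""
    env = os.environ if env is None else env
    problems = {}
    for name in names:
        value = env.get(name)
        if not value:
            problems[name] = "not set"
        elif not os.path.isdir(value):
            problems[name] = f"{value} is not a directory"
    return problems

# An empty dict means both variables point at existing directories.
print(check_homes())
```

If the dict is not empty, the usual cause is a JDK or Spark directory name that does not match what was actually installed or unpacked.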


### Step 4: **Initiate Spark Session**

In [8]:
import findspark
findspark.init()

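# Start (or reuse) a local Spark session that uses all available cores
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Reuse the SparkContext that belongs to the session for the RDD API
from pyspark import SparkContext
sc = SparkContext.getOrCreate()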

### Step 5: **Check Datafile exists**

In [9]:
!cat /content/Spark-Mini-Project/data.csv

1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors
2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors
3,A,VXIO456XLBB630221,,,,2014-07-02,Head on collision
4,R,VXIO456XLBB630221,,,,2014-08-05,Repair transmission
5,I,VOME254OOXW344325,Mercedes,E350,2015,2014-02-01,Sold from Carmax
6,R,VOME254OOXW344325,,,,2015-02-06,Wheel allignment service
7,R,VXIO456XLBB630221,,,,2015-01-01,Replace right head light
8,I,EXOA00341AB123456,Mercedes,SL550,2016,2015-01-01,Sold from AceCars
9,A,VOME254OOXW344325,,,,2015-10-01,Side collision
10,R,VOME254OOXW344325,,,,2015-09-01,Changed tires
11,R,EXOA00341AB123456,,,,2015-05-01,Repair engine
12,A,EXOA00341AB123456,,,,2015-05-03,Vehicle rollover
13,R,VOME254OOXW344325,,,,2015-09-01,Replace passenger side door
14,I,UXIA769ABCC447906,Toyota,Camery,2017,2016-05-08,Initial sales from Carmax
15,R,UXIA769ABCC447906,,,,2020-01-02,Initial sales from Carmax
16,A,INU45KIOOPA343980,,,,2020-05-01,Side collision
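
Printing the file with `cat` works for 16 rows but does not scale and does not flag malformed lines. As a rough alternative, the sketch below checks that the file exists and that every non-blank line has the expected number of comma-separated fields. `summarize_csv` and its `expected_fields=8` default are assumptions based on the sample shown above.

```python
import os

def summarize_csv(path, expected_fields=8):
    """Return (non_blank_line_count, line_numbers_with_wrong_field_count)."""
    if not os.path.isfile(path):
        raise FileNotFoundError(path)
    count = 0
    bad = []
    with open(path, encoding="utf-8") as handle:
        for number, line in enumerate(handle, start=1):
            if not line.strip():
                continue  # ignore blank lines
            count += 1
            if len(line.rstrip("\n").split(",")) != expected_fields:
                bad.append(number)
    return count, bad
```

In the notebook it could be called as `summarize_csv("/content/Spark-Mini-Project/data.csv")`; an empty second element means every row has eight fields.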


### Step 6: **Read the Datafile**

In [10]:
raw_rdd = sc.textFile("/content/Spark-Mini-Project/data.csv")

### Step 7: **Split each row on the comma delimiter**

In [160]:
# Each line becomes a list of 8 string fields
records_rdd = raw_rdd.map(lambda line: line.split(","))

records_rdd.first()

['1',
 'I',
 'VXIO456XLBB630221',
 'Nissan',
 'Altima',
 '2003',
 '2002-05-08',
 'Initial sales from TechMotors']
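
A plain `split(",")` is enough for this file, but it would break a row whose description contains a quoted comma. The sketch below shows one possible alternative using Python's standard `csv` module; `parse_line` is a hypothetical helper and the quoted example row is invented for illustration, it is not in `data.csv`.

```python
import csv

def parse_line(line):
    """Split one CSV line into fields, honouring double-quoted values."""
    rows = list(csv.reader([line]))
    return rows[0] if rows else []

# Invented example row with a comma inside a quoted description
sample = '3,A,VXIO456XLBB630221,,,,2014-07-02,"Head on collision, driver side"'
print(len(sample.split(",")))   # naive split miscounts the fields
print(len(parse_line(sample)))  # csv-aware split keeps the description intact
```

Since `parse_line` takes a single string, it could be passed to the existing step as `raw_rdd.map(parse_line)`.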

### Step 8: **Key the records by VIN and sort by key**

In [162]:
# Build (vin, full_record) pairs, then sort by VIN
sortedRDD = records_rdd.map(lambda line: (line[2], line)).sortByKey()

sortedRDD.take(5)

[('EXOA00341AB123456',
  ['8',
   'I',
   'EXOA00341AB123456',
   'Mercedes',
   'SL550',
   '2016',
   '2015-01-01',
   'Sold from AceCars']),
 ('EXOA00341AB123456',
  ['11', 'R', 'EXOA00341AB123456', '', '', '', '2015-05-01', 'Repair engine']),
 ('EXOA00341AB123456',
  ['12',
   'A',
   'EXOA00341AB123456',
   '',
   '',
   '',
   '2015-05-03',
   'Vehicle rollover']),
 ('INU45KIOOPA343980',
  ['2',
   'I',
   'INU45KIOOPA343980',
   'Mercedes',
   'C300',
   '2015',
   '2014-01-01',
   'Sold from EuroMotors']),
 ('INU45KIOOPA343980',
  ['16',
   'A',
   'INU45KIOOPA343980',
   '',
   '',
   '',
   '2020-05-01',
   'Side collision'])]
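
Sorting by VIN alone leaves the order of records within one VIN unspecified. If a chronological history per vehicle is wanted, one option is a composite key of VIN and date. The sketch below illustrates the idea in plain Python on four rows copied from `data.csv`; `vin_then_date` is a hypothetical helper.

```python
records = [
    ["10", "R", "VOME254OOXW344325", "", "", "", "2015-09-01", "Changed tires"],
    ["9", "A", "VOME254OOXW344325", "", "", "", "2015-10-01", "Side collision"],
    ["8", "I", "EXOA00341AB123456", "Mercedes", "SL550", "2016", "2015-01-01", "Sold from AceCars"],
    ["5", "I", "VOME254OOXW344325", "Mercedes", "E350", "2015", "2014-02-01", "Sold from Carmax"],
]

def vin_then_date(record):
    """Composite sort key: VIN first, then the ISO date (which sorts correctly as text)."""
    return (record[2], record[6])

ordered = sorted(records, key=vin_then_date)
ids = [r[0] for r in ordered]
print(ids)  # ['8', '5', '10', '9']
```

On the RDD, the same key function could be used with `records_rdd.sortBy(vin_then_date)`.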

### Step 9: **Filter the initial-sale records and key them by VIN**

In [161]:
# Only initial-sale rows (type 'I') carry make, model and year
vinMakeRDD = records_rdd.filter(lambda x: x[1] == 'I')
vinKV = vinMakeRDD.map(lambda line: (line[2], (line[3], line[4], line[5])))

vinKV.take(5)


[('VXIO456XLBB630221', ('Nissan', 'Altima', '2003')),
 ('INU45KIOOPA343980', ('Mercedes', 'C300', '2015')),
 ('VOME254OOXW344325', ('Mercedes', 'E350', '2015')),
 ('EXOA00341AB123456', ('Mercedes', 'SL550', '2016')),
 ('UXIA769ABCC447906', ('Toyota', 'Camery', '2017'))]

### Step 10: **Join the sorted RDD with vinKV using VIN as the key**

In [163]:
# Inner join on VIN: each value becomes (full_record, (make, model, year))
finalRDD = sortedRDD.join(vinKV)

# Rebuild each record with make, model and year taken from the initial-sale row
replacedRDD = finalRDD.map(lambda x: [x[1][0][0], x[1][0][1], x[1][0][2],
                                      x[1][1][0], x[1][1][1], x[1][1][2],
                                      x[1][0][6], x[1][0][7]])
for i in replacedRDD.take(20):
    print(i)

['8', 'I', 'EXOA00341AB123456', 'Mercedes', 'SL550', '2016', '2015-01-01', 'Sold from AceCars']
['11', 'R', 'EXOA00341AB123456', 'Mercedes', 'SL550', '2016', '2015-05-01', 'Repair engine']
['12', 'A', 'EXOA00341AB123456', 'Mercedes', 'SL550', '2016', '2015-05-03', 'Vehicle rollover']
['14', 'I', 'UXIA769ABCC447906', 'Toyota', 'Camery', '2017', '2016-05-08', 'Initial sales from Carmax']
['15', 'R', 'UXIA769ABCC447906', 'Toyota', 'Camery', '2017', '2020-01-02', 'Initial sales from Carmax']
['5', 'I', 'VOME254OOXW344325', 'Mercedes', 'E350', '2015', '2014-02-01', 'Sold from Carmax']
['6', 'R', 'VOME254OOXW344325', 'Mercedes', 'E350', '2015', '2015-02-06', 'Wheel allignment service']
['9', 'A', 'VOME254OOXW344325', 'Mercedes', 'E350', '2015', '2015-10-01', 'Side collision']
['10', 'R', 'VOME254OOXW344325', 'Mercedes', 'E350', '2015', '2015-09-01', 'Changed tires']
['13', 'R', 'VOME254OOXW344325', 'Mercedes', 'E350', '2015', '2015-09-01', 'Replace passenger side door']
['2', 'I', 'INU45KIOO
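
To make the effect of the join easier to follow, the sketch below reproduces the same fill-in logic in plain Python with a dictionary lookup, using three rows copied from `data.csv`. `fill_vehicle` and `vehicle_by_vin` are hypothetical names; this illustrates the idea and is not a replacement for the RDD join.

```python
records = [
    ["1", "I", "VXIO456XLBB630221", "Nissan", "Altima", "2003", "2002-05-08", "Initial sales from TechMotors"],
    ["3", "A", "VXIO456XLBB630221", "", "", "", "2014-07-02", "Head on collision"],
    ["4", "R", "VXIO456XLBB630221", "", "", "", "2014-08-05", "Repair transmission"],
]

# Lookup table built from the initial-sale rows: VIN -> (make, model, year)
vehicle_by_vin = {r[2]: (r[3], r[4], r[5]) for r in records if r[1] == "I"}

def fill_vehicle(record, lookup):
    """Return a copy of the record with make, model and year filled from the lookup."""
    vehicle = lookup.get(record[2])
    if vehicle is None:
        return None  # like an inner join, rows without an initial sale are dropped
    return record[:3] + list(vehicle) + record[6:]

filled = [row for row in (fill_vehicle(r, vehicle_by_vin) for r in records) if row is not None]
for row in filled:
    print(row)
```

Every output row now carries `Nissan`, `Altima` and `2003`, which is what the join achieves for accident and repair records.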

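### Step 11: **Map the records to a make-year key and count records per key**

In [171]:
# Drop initial-sale rows, key each remaining record (accident or repair) by "make-year", then count per key
finalRDD = (replacedRDD.filter(lambda x: x[1] != 'I')
            .map(lambda x: (x[3] + '-' + x[5], 1))
            .reduceByKey(lambda x, y: x + y))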

finalRDD.take(20)

[('Nissan-2003', 3),
 ('Toyota-2017', 1),
 ('Mercedes-2015', 5),
 ('Mercedes-2016', 2)]
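
As a sanity check on the result above, the same counts can be recomputed without Spark. The sketch below embeds the 16 rows of `data.csv` and counts events per make-year with `collections.Counter`. `count_events` and its `event_types` parameter are hypothetical; the default counts accidents and repairs together, matching the `!= 'I'` filter used in this step.

```python
from collections import Counter

DATA = """\
1,I,VXIO456XLBB630221,Nissan,Altima,2003,2002-05-08,Initial sales from TechMotors
2,I,INU45KIOOPA343980,Mercedes,C300,2015,2014-01-01,Sold from EuroMotors
3,A,VXIO456XLBB630221,,,,2014-07-02,Head on collision
4,R,VXIO456XLBB630221,,,,2014-08-05,Repair transmission
5,I,VOME254OOXW344325,Mercedes,E350,2015,2014-02-01,Sold from Carmax
6,R,VOME254OOXW344325,,,,2015-02-06,Wheel allignment service
7,R,VXIO456XLBB630221,,,,2015-01-01,Replace right head light
8,I,EXOA00341AB123456,Mercedes,SL550,2016,2015-01-01,Sold from AceCars
9,A,VOME254OOXW344325,,,,2015-10-01,Side collision
10,R,VOME254OOXW344325,,,,2015-09-01,Changed tires
11,R,EXOA00341AB123456,,,,2015-05-01,Repair engine
12,A,EXOA00341AB123456,,,,2015-05-03,Vehicle rollover
13,R,VOME254OOXW344325,,,,2015-09-01,Replace passenger side door
14,I,UXIA769ABCC447906,Toyota,Camery,2017,2016-05-08,Initial sales from Carmax
15,R,UXIA769ABCC447906,,,,2020-01-02,Initial sales from Carmax
16,A,INU45KIOOPA343980,,,,2020-05-01,Side collision
"""

def count_events(csv_text, event_types=("A", "R")):
    """Count records of the given types per 'make-year', resolved through the VIN."""
    rows = [line.split(",") for line in csv_text.strip().splitlines()]
    make_year = {r[2]: f"{r[3]}-{r[5]}" for r in rows if r[1] == "I"}
    return Counter(
        make_year[r[2]] for r in rows
        if r[1] in event_types and r[2] in make_year
    )

print(count_events(DATA))                       # accidents and repairs together
print(count_events(DATA, event_types=("A",)))   # accidents only
```

The first call should agree with the Spark output above; the second shows how the totals change if only accident records (`A`) are of interest.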