# Group By Example using RDD

In this notebook we create an RDD from a text file containing customer data and then count the customers in each city.

RDDs are the basic building blocks of Spark, and we will explore a few of their methods for data analysis.

# Setup the Spark Context
## Set the Java path

In [1]:
# This notebook was run with Java 8 (Spark 2.x runs on Java 8; Spark 3.x also supports newer Java releases), so point JAVA_HOME at the Java 8 install
import os
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_202'

## Setup SparkContext

In [2]:
# SparkContext will be the starting point for any execution
from pyspark import SparkContext, SparkConf

# Specify the host IP for the Spark driver
conf = SparkConf().set('spark.driver.host', '127.0.0.1')

# Specify the SparkContext based on above configuration
sc = SparkContext(master='local', appName='myAppName', conf=conf)

# Get Input data

Usually the data would come from HDFS; here it is read from a path on the local machine.
Spark can read text, CSV, JSON and many other data sources.

In [3]:
# Read the comma-separated text file into an RDD named input (note: this name shadows Python's built-in input())
input=sc.textFile('group_by_test.txt')

## Check the RDD

In [4]:
input

group_by_test.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

An RDD is a distributed object, not a pandas DataFrame, so we need RDD actions such as top and take to view the data in it.

In [5]:
# View the 3 largest records (top sorts in descending order; it does not return the first rows)
input.top(3)

['Cust_9999997,,India,east,Jharkand,Ranchi,2020-01-10 15:10:38.562000',
 'Cust_9999995,,India,south,tamil nadu,Chennai,2020-01-10 15:10:38.562000',
 'Cust_9999964,,India,west,Maharashtra,Mumbai,2020-01-10 15:10:38.562000']

In [6]:
# View the first 5 records (take returns them in stored order, not at random)
input.take(5)

['Cust_2337980,,India,south,karnataka,Bangalore,2020-01-10 15:10:38.562000',
 'Cust_5132227,,India,north,Haryana,Rohtak,2020-01-10 15:10:38.562000',
 'Cust_3758463,,India,west,Gujarat,Ahmedabad,2020-01-10 15:10:38.562000',
 'Cust_5782791,,India,north,UttarPradesh,Kanpur,2020-01-10 15:10:38.562000',
 'Cust_6471658,,India,north,Haryana,Rohtak,2020-01-10 15:10:38.562000']
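The difference between the two actions used above can be sketched in plain Python, without a Spark session. This is only a local analogy: `local_take` and `local_top` are hypothetical helper names, and the four records are made-up samples in the same layout as the file.

```python
import heapq

# A small, made-up sample in the same comma-separated layout as the file.
records = [
    "Cust_2,,India,south,karnataka,Bangalore,2020-01-10 15:10:38.562000",
    "Cust_9,,India,east,Jharkand,Ranchi,2020-01-10 15:10:38.562000",
    "Cust_5,,India,west,Gujarat,Ahmedabad,2020-01-10 15:10:38.562000",
    "Cust_7,,India,north,Haryana,Rohtak,2020-01-10 15:10:38.562000",
]

def local_take(items, n):
    """Mimic RDD.take(n): the first n elements in their existing order."""
    return items[:n]

def local_top(items, n):
    """Mimic RDD.top(n): the n largest elements, in descending order."""
    return heapq.nlargest(n, items)

first_two = local_take(records, 2)
largest_two = local_top(records, 2)
print(first_two)    # the Cust_2 and Cust_9 records, in file order
print(largest_two)  # the Cust_9 and Cust_7 records, largest string first
```

Because the records are strings, "largest" means last in string ordering, which is why `top` on the real data returns the highest customer ids.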

# Bring out City value

RDDs are immutable: once an RDD has been created, its contents cannot be changed in place.

Hence every transformation returns a new RDD.

In [7]:
input.map(lambda x:x.split(',')[5])

PythonRDD[4] at RDD at PythonRDD.scala:53

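The output shows PythonRDD[4]. The number is the id Spark assigned to this RDD; ids increase with every RDD created in the SparkContext, most likely including the intermediate RDDs created by the earlier top and take calls, so it is not simply a count of the transformations applied to input.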

In [8]:
# Make city a separate RDD
city=input.map(lambda x:x.split(',')[5])
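The lambda above assumes every line has at least six comma-separated fields. A slightly more defensive version could look like the sketch below; `extract_city` is a hypothetical helper, and the guard for short lines is an assumption that is not part of the original notebook.

```python
def extract_city(line, city_index=5):
    """Return the city field of one comma-separated customer record.

    city_index=5 matches the split(',')[5] used in the notebook.
    Returns None when the line has too few fields (hypothetical guard).
    """
    fields = line.split(',')
    if len(fields) <= city_index:
        return None
    return fields[city_index].strip()

sample = 'Cust_2337980,,India,south,karnataka,Bangalore,2020-01-10 15:10:38.562000'
city_name = extract_city(sample)
short_line = extract_city('Cust_1,,India')
print(city_name)   # Bangalore
print(short_line)  # None
```

A named function like this can be passed to map in place of the lambda, for example `input.map(extract_city)`, with a following filter to drop the None values if malformed lines are expected.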

In [9]:
# top(4) returns the 4 largest values in descending order, so the same city can repeat
city.top(4)

['Vadodara', 'Vadodara', 'Vadodara', 'Vadodara']

# Check cities and their counts

In [10]:
counts=city.countByValue()

In [11]:
counts

defaultdict(int,
            {'Bangalore': 6153,
             'Rohtak': 6403,
             'Ahmedabad': 6216,
             'Kanpur': 6323,
             'Ranchi': 6250,
             'Mysore': 6255,
             'Lucknow': 6258,
             'Vadodara': 6307,
             'Chennai': 6127,
             'Pune': 6327,
             'Madhurai': 6271,
             'Gurgaon': 6205,
             'Jamshedpur': 6266,
             'Darjeeling': 6285,
             'Kolkata': 6204,
             'Mumbai': 6150})
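The result of countByValue is an ordinary Python dictionary on the driver, so it can be handled with standard library tools. The sketch below shows the local equivalent with `collections.Counter` on a made-up list of cities, and then ranks three of the counts copied from the output above; the variable names are illustrative only.

```python
from collections import Counter

# Local equivalent of countByValue on a small, made-up list of cities.
cities = ['Pune', 'Rohtak', 'Pune', 'Mumbai', 'Rohtak', 'Pune']
local_counts = Counter(cities)
ranked = local_counts.most_common()
print(ranked)  # [('Pune', 3), ('Rohtak', 2), ('Mumbai', 1)]

# Ranking a dictionary shaped like the countByValue result
# (three entries copied from the notebook output).
subset = {'Bangalore': 6153, 'Rohtak': 6403, 'Pune': 6327}
by_count = sorted(subset.items(), key=lambda kv: kv[1], reverse=True)
print(by_count)  # [('Rohtak', 6403), ('Pune', 6327), ('Bangalore', 6153)]
```

Since countByValue collects every distinct value to the driver, this approach suits columns with a small number of distinct values, such as the 16 cities here.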

# Stop the SparkContext

In [12]:
sc.stop()

# Summary

1. SparkContext is the starting point for any execution.
2. We can read data from multiple data sources.
3. RDDs are immutable, so we get a new RDD each time a transformation is run.
4. 'top' returns the n largest elements in descending order; 'take' returns the first n elements, similar to head in pandas.
5. We can use 'map' with a 'lambda', similar to apply in pandas, to work on all elements.
6. 'countByValue' is similar to value_counts() in pandas and gives the count for each value.
7. A SparkContext, once started, should be stopped at the end.