In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Build the SparkSession for this notebook: application name "RDDApp", master "local[4]".
# getOrCreate() hands back an already-running session if there is one, otherwise starts a new one.
spark = SparkSession.builder.appName("RDDApp").master("local[4]").getOrCreate()

23/08/06 11:51:19 WARN Utils: Your hostname, nishi-ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.1.40 instead (on interface enp0s31f6)
23/08/06 11:51:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/06 11:51:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Bare expression: as the last line of the cell, the SparkSession is shown as the cell's output.
spark

In [3]:
# Create a variable for the SparkContext (used below through parallelize() and textFile())
sc = spark.sparkContext

In [4]:
# Create an RDD from an in-memory Python list using parallelize()
# NOTE(review): `numbresRDD` is a misspelling of `numbersRDD`; the name is kept
# because the following cells refer to it.
numbresRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])

In [5]:
# Check how many partitions the RDD is split into
# (presumably the default parallelism of the local[4] master — TODO confirm)
numbresRDD.getNumPartitions()

4

In [6]:
# Get results from the RDD: collect() returns all of its elements as a Python list
output = numbresRDD.collect()
print(output)

[1, 2, 3, 4, 5, 6, 7, 8, 9]


In [7]:
# Get a list holding the first 2 records
numbresRDD.take(2)

                                                                                

[1, 2]

In [8]:
# Get the first record of the RDD
# first() returns a single element, while take(1) returns a list with one element
numbresRDD.first()

1

In [9]:
# Create an RDD with a complex type: each record is itself a list
# (presumably [employee id, name, salary] — the columns are not named here)
employeeRDD = sc.parallelize([
    [1, "Neha", 10000],
    [2, "Steve", 20000],
    [3, "Kari", 30000],
    [4, "Ivan", 40000],
    [5, "Mohit", 50000],
])

In [10]:
# Get the first element of the RDD (a whole record, returned as a list)
employeeRDD.first()

[1, 'Neha', 10000]

In [11]:
# Get a list of the first two elements
employeeRDD.take(2)

[[1, 'Neha', 10000], [2, 'Steve', 20000]]

In [12]:
# Read the Taxi Zone data; textFile() yields one string per line of the file
# NOTE(review): absolute, machine-specific path — this cell only runs where that file exists.
taxiZoneDataRDD = sc.textFile("/home/nishi/PycharmProjects/pythonProject/pySpark/DataFiles/TaxiZones.csv")

In [13]:
# Get the first 10 records (raw, unsplit CSV lines)
taxiZoneDataRDD.take(10)

['1,EWR,Newark Airport,EWR',
 '2,Queens,Jamaica Bay,Boro Zone',
 '3,Bronx,Allerton/Pelham Gardens,Boro Zone',
 '4,Manhattan,Alphabet City,Yellow Zone',
 '5,Staten Island,Arden Heights,Boro Zone',
 '6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone',
 '7,Queens,Astoria,Boro Zone',
 '8,Queens,Astoria Park,Boro Zone',
 '9,Queens,Auburndale,Boro Zone',
 '10,Queens,Baisley Park,Boro Zone']

In [14]:
# Check how many partitions textFile() created when no partition count was given
taxiZoneDataRDD.getNumPartitions()

2

In [15]:
# Re-read the Taxi Zone data, passing 4 as textFile()'s second argument (minPartitions)
# This rebinds taxiZoneDataRDD, replacing the RDD created earlier from the same file.
taxiZoneDataRDD = sc.textFile("/home/nishi/PycharmProjects/pythonProject/pySpark/DataFiles/TaxiZones.csv", 4)
taxiZoneDataRDD.getNumPartitions()

4

In [16]:
# Derive a new RDD from an existing one with a lambda:
# every CSV line is split on "," into a list of column values.
taxiZoneWithColsRDD = taxiZoneDataRDD.map(lambda line: line.split(","))
taxiZoneWithColsRDD.take(5)

[['1', 'EWR', 'Newark Airport', 'EWR'],
 ['2', 'Queens', 'Jamaica Bay', 'Boro Zone'],
 ['3', 'Bronx', 'Allerton/Pelham Gardens', 'Boro Zone'],
 ['4', 'Manhattan', 'Alphabet City', 'Yellow Zone'],
 ['5', 'Staten Island', 'Arden Heights', 'Boro Zone']]

In [17]:
# Apply a filter operation
# Keep rows whose Borough (column 1) is exactly 'Manhattan' and whose zone name
# (column 2) starts with 'central', compared case-insensitively.
# Reuses taxiZoneWithColsRDD (the already-split rows) instead of splitting the raw
# lines a second time, in line with the even-location filter below.
filterZoneRDD = taxiZoneWithColsRDD.filter(
    lambda zoneRow: zoneRow[1] == "Manhattan"
    and zoneRow[2].lower().startswith("central")
)
filterZoneRDD.take(5)

[['41', 'Manhattan', 'Central Harlem', 'Boro Zone'],
 ['42', 'Manhattan', 'Central Harlem North', 'Boro Zone'],
 ['43', 'Manhattan', 'Central Park', 'Yellow Zone']]

In [18]:
# Zones whose location ID (column 0, parsed as an integer) is even
evenLocationRDD = taxiZoneWithColsRDD.filter(
    lambda zoneRow: int(zoneRow[0]) % 2 == 0
)
evenLocationRDD.take(5)

[['2', 'Queens', 'Jamaica Bay', 'Boro Zone'],
 ['4', 'Manhattan', 'Alphabet City', 'Yellow Zone'],
 ['6', 'Staten Island', 'Arrochar/Fort Wadsworth', 'Boro Zone'],
 ['8', 'Queens', 'Astoria Park', 'Boro Zone'],
 ['10', 'Queens', 'Baisley Park', 'Boro Zone']]

In [19]:
# collect() returns every element of the RDD, so the full list is printed as the cell output.
# NOTE(review): take(n) (as used above) is enough for a preview and keeps the output short.
evenLocationRDD.collect()

[['2', 'Queens', 'Jamaica Bay', 'Boro Zone'],
 ['4', 'Manhattan', 'Alphabet City', 'Yellow Zone'],
 ['6', 'Staten Island', 'Arrochar/Fort Wadsworth', 'Boro Zone'],
 ['8', 'Queens', 'Astoria Park', 'Boro Zone'],
 ['10', 'Queens', 'Baisley Park', 'Boro Zone'],
 ['12', 'Manhattan', 'Battery Park', 'Yellow Zone'],
 ['14', 'Brooklyn', 'Bay Ridge', 'Boro Zone'],
 ['16', 'Queens', 'Bayside', 'Boro Zone'],
 ['18', 'Bronx', 'Bedford Park', 'Boro Zone'],
 ['20', 'Bronx', 'Belmont', 'Boro Zone'],
 ['22', 'Brooklyn', 'Bensonhurst West', 'Boro Zone'],
 ['24', 'Manhattan', 'Bloomingdale', 'Yellow Zone'],
 ['26', 'Brooklyn', 'Borough Park', 'Boro Zone'],
 ['28', 'Queens', 'Briarwood/Jamaica Hills', 'Boro Zone'],
 ['30', 'Queens', 'Broad Channel', 'Boro Zone'],
 ['32', 'Bronx', 'Bronxdale', 'Boro Zone'],
 ['34', 'Brooklyn', 'Brooklyn Navy Yard', 'Boro Zone'],
 ['36', 'Brooklyn', 'Bushwick North', 'Boro Zone'],
 ['38', 'Queens', 'Cambria Heights', 'Boro Zone'],
 ['40', 'Brooklyn', 'Carroll Gardens', 'B

In [20]:
# Pair RDD: an RDD whose elements are (key, value) tuples
import math

numbersRDD = sc.parallelize([2, 3, 4, 5, 6])

# Build the pair RDD: key is the number itself, value is its square root
numberWithSquareRootRDD = numbersRDD.map(lambda n: (n, math.sqrt(n)))
numberWithSquareRootRDD.collect()

[(2, 1.4142135623730951),
 (3, 1.7320508075688772),
 (4, 2.0),
 (5, 2.23606797749979),
 (6, 2.449489742783178)]

In [21]:
# Create a Pair RDD keyed by Borough (column 1) with the constant 1 as the value,
# i.e. one (borough, 1) tuple per zone row.
taxoZonesPairRdd = taxiZoneWithColsRDD.map(
    lambda cols: (cols[1], 1)  # (key = Borough, value = 1)
)
taxoZonesPairRdd.take(10)

[('EWR', 1),
 ('Queens', 1),
 ('Bronx', 1),
 ('Manhattan', 1),
 ('Staten Island', 1),
 ('Staten Island', 1),
 ('Queens', 1),
 ('Queens', 1),
 ('Queens', 1),
 ('Queens', 1)]

In [22]:
# Count the records per Borough: reduceByKey() adds up the values that share a key
boroughCountRDD = taxoZonesPairRdd.reduceByKey(lambda left, right: left + right)
boroughCountRDD.collect()

                                                                                

[('Bronx', 43),
 ('Staten Island', 20),
 ('EWR', 1),
 ('Manhattan', 69),
 ('Brooklyn', 61),
 ('Unknown', 2),
 ('Queens', 69)]