### Demonstrate various methods to perform joins in Spark ###

**1. Perform join using transformation on RDD**<br>
**2. Perform join using Spark SQL on data frames**<br>
**3. Perform join using SQL on temporary tables**<br>
**4. Perform join using SQL on tables**<br>

In [105]:
# Import package(s).
import findspark
findspark.init()

from pyspark import SparkContext
from pyspark.sql import SQLContext

import os, json

try:
    sc
    sc.stop()
except NameError:
    pass
finally:
    print("Spinning up Spark cluster ...")
    sc = SparkContext(appName = 'JoinsDemo', master = 'local[8]')
    spark = SQLContext(sc)

sc

Spinning up Spark cluster ...


Get working directory. Get configurations from configuration file. Use configurations to set location(s) of data file(s).

In [106]:
# Get current working directory.
currentWorkingDirectory = os.getcwd()

# Load configuration file.
with open(currentWorkingDirectory + '\configuration.json', 'r') as configurationFile:
    dictConfigurations = json.load(configurationFile)

# Get path part for data file(s) from configuration file.
zipDataPathPart = dictConfigurations['zipDataLocation']

# Get current working directory's parent.
currentWorkingDirectoryParent = os.path.dirname(currentWorkingDirectory)

# Get full path for data file(s).
zipDataPath = os.path.abspath(os.path.join(currentWorkingDirectoryParent, zipDataPathPart))

Import and preview zip code data.

In [107]:
# Import zip data file.
rddZip = sc.textFile(zipDataPath)

# Display line count.
print('rddZip line count:', rddZip.count())

# Display first 10 lines.
rddZip.take(10)

rddZip line count: 81832


['"RecordNumber","Zipcode","ZipCodeType","City","State","LocationType","Lat","Long","Xaxis","Yaxis","Zaxis","WorldRegion","Country","LocationText","Location","Decommisioned","TaxReturnsFiled","EstimatedPopulation","TotalWages","Notes"',
 '1,"00704","STANDARD","PARC PARQUE","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Parc Parque, PR","NA-US-PR-PARC PARQUE","false",,,,',
 '2,"00704","STANDARD","PASEO COSTA DEL SUR","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Paseo Costa Del Sur, PR","NA-US-PR-PASEO COSTA DEL SUR","false",,,,',
 '3,"00704","STANDARD","SECT LANAUSSE","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Sect Lanausse, PR","NA-US-PR-SECT LANAUSSE","false",,,,',
 '4,"00704","STANDARD","URB EUGENE RICE","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Urb Eugene Rice, PR","NA-US-PR-URB EUGENE RICE","false",,,,',
 '5,"00704","STANDARD","URB GONZALEZ","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Urb Gonza

For the purposes of demonstrating joins between two datasets, vertically split rddZip into two RDDs:

1. **rddZipCodeCity:** ZipCode, City
2. **rddZipCodeState:** ZipCode, State

Create rddZipCodeCity.

In [108]:
# Create function to parse out Zip Code & City from rddZip.
def parseOutZipCodeCity(line):
    fields = line.split(',')
    zipCode = fields[1].replace('"', '')
    city = fields[3].replace('"', '')
    
    return(zipCode, city)
    
# Test function on single line.
parseOutZipCodeCity('1,"00704","STANDARD","PARC PARQUE","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Parc Parque, PR","NA-US-PR-PARC PARQUE","false",,,,')


('00704', 'PARC PARQUE')

In [109]:
# Create rddZipCodeCity RDD.
rddZipCodeCity = rddZip.map(parseOutZipCodeCity)

# Filter out header line.
rddZipCodeCity = rddZipCodeCity.filter(lambda x: x != ('Zipcode', 'City'))

# Display first 10 lines.
rddZipCodeCity.take(10)

[('00704', 'PARC PARQUE'),
 ('00704', 'PASEO COSTA DEL SUR'),
 ('00704', 'SECT LANAUSSE'),
 ('00704', 'URB EUGENE RICE'),
 ('00704', 'URB GONZALEZ'),
 ('00704', 'URB LA FABRICA'),
 ('00704', 'URB MONTE SORIA 2'),
 ('00704', 'VILLAS DEL COQUI'),
 ('00705', 'AIBONITO'),
 ('00705', 'BDA SAN LUIS')]

Create rddZipCodeState.

In [110]:
# Create function to parse out Zip Code & State from rddZip.
def parseOutZipCodeState(line):
    fields = line.split(',')
    zipCode = fields[1].replace('"', '')
    state = fields[4].replace('"', '')
    
    return(zipCode, state)
    
# Test function on single line.
parseOutZipCodeState('1,"00704","STANDARD","PARC PARQUE","PR","NOT ACCEPTABLE",17.96,-66.22,0.38,-0.87,0.30,"NA","US","Parc Parque, PR","NA-US-PR-PARC PARQUE","false",,,,')


('00704', 'PR')

In [111]:
# Create rddZipCodeState RDD. Use distinct values.
rddZipCodeState = rddZip.map(parseOutZipCodeState).distinct()

# Filter out header line.
rddZipCodeState = rddZipCodeState.filter(lambda x: x != ('Zipcode', 'State'))

# Display first 10 lines.
rddZipCodeState.take(10)

[('09790', 'AE'),
 ('65079', 'MO'),
 ('57709', 'SD'),
 ('27321', 'NC'),
 ('00678', 'PR'),
 ('75001', 'TX'),
 ('08219', 'NJ'),
 ('40252', 'KY'),
 ('03303', 'NH'),
 ('74047', 'OK')]

### Perform join using transformation on RDD ###

Since *Zip Code* is the key in both rddZipCodeCity and rddZipCodState, use *Zip Code* as join key.

In [112]:
rddZipCodeCityState = rddZipCodeCity.join(rddZipCodeState)

# Display first 10 lines.
rddZipCodeCityState.take(10)

[('67056', ('HALSTEAD', 'KS')),
 ('11973', ('UPTON', 'NY')),
 ('98117', ('SEATTLE', 'WA')),
 ('98117', ('CROWN HILL', 'WA')),
 ('46407', ('GARY', 'IN')),
 ('12451', ('LEEDS', 'NY')),
 ('20656', ('LOVEVILLE', 'MD')),
 ('17365', ('WELLSVILLE', 'PA')),
 ('75860', ('TEAGUE', 'TX')),
 ('75860', ('CLAY HILL', 'TX'))]

### Perform join using Spark SQL on data frames ###

Convert rddZipCodeCity and rddZipCodeState to data frames.

In [113]:
# Create dfZipCodeCity data frame.
dfZipCodeCity = rddZipCodeCity.toDF(['ZipCode', 'City'])

# Display first 10 lines.
dfZipCodeCity.show(10)

+-------+-------------------+
|ZipCode|               City|
+-------+-------------------+
|  00704|        PARC PARQUE|
|  00704|PASEO COSTA DEL SUR|
|  00704|      SECT LANAUSSE|
|  00704|    URB EUGENE RICE|
|  00704|       URB GONZALEZ|
|  00704|     URB LA FABRICA|
|  00704|  URB MONTE SORIA 2|
|  00704|   VILLAS DEL COQUI|
|  00705|           AIBONITO|
|  00705|       BDA SAN LUIS|
+-------+-------------------+
only showing top 10 rows



In [114]:
# Create dfZipCodeState data frame.
dfZipCodeState = rddZipCodeState.toDF(['ZipCode', 'State'])

# Display first 10 lines.
dfZipCodeState.show(10)

+-------+-----+
|ZipCode|State|
+-------+-----+
|  09790|   AE|
|  65079|   MO|
|  57709|   SD|
|  27321|   NC|
|  00678|   PR|
|  75001|   TX|
|  08219|   NJ|
|  40252|   KY|
|  03303|   NH|
|  74047|   OK|
+-------+-----+
only showing top 10 rows



Perform inner join of dfZipCodeCity and dfZipCodeState using *ZipCode* field.

In [115]:
dfZipCodeCityState = dfZipCodeCity.join(dfZipCodeState, dfZipCodeCity['ZipCode'] == dfZipCodeState['ZipCode'])

# Display first 20 lines.
dfZipCodeCityState.show()

+-------+--------------------+-------+-----+
|ZipCode|                City|ZipCode|State|
+-------+--------------------+-------+-----+
|  02053|              MEDWAY|  02053|   MA|
|  02070|        SHELDONVILLE|  02070|   MA|
|  02070|            WRENTHAM|  02070|   MA|
|  02090|            WESTWOOD|  02090|   MA|
|  02090|           ISLINGTON|  02090|   MA|
|  03442|          BENNINGTON|  03442|   NH|
|  03904|             KITTERY|  03904|   ME|
|  04438|           FRANKFORT|  04438|   ME|
|  06382|          UNCASVILLE|  06382|   CT|
|  06518|              HAMDEN|  06518|   CT|
|  06518|           NEW HAVEN|  06518|   CT|
|  06518|CENTERVILLE-MOUNT...|  06518|   CT|
|  06518|        MOUNT CARMEL|  06518|   CT|
|  06518|             N HAVEN|  06518|   CT|
|  07820|           ALLAMUCHY|  07820|   NJ|
|  08648|   LAWRENCE TOWNSHIP|  08648|   NJ|
|  08648|            LAWRENCE|  08648|   NJ|
|  08648|        LAWRENCE TWP|  08648|   NJ|
|  08648|       LAWRENCEVILLE|  08648|   NJ|
|  08648| 

The join above returns *ZipCode* twice, since *ZipCode* exists in each dataframe.

The join method below returns *ZipCode* only once.

In [116]:
dfZipCodeCityState2 = dfZipCodeCity.join(dfZipCodeState, ['ZipCode'])

# Display first 10 lines.
dfZipCodeCityState2.show()

+-------+--------------------+-----+
|ZipCode|                City|State|
+-------+--------------------+-----+
|  02053|              MEDWAY|   MA|
|  02070|        SHELDONVILLE|   MA|
|  02070|            WRENTHAM|   MA|
|  02090|            WESTWOOD|   MA|
|  02090|           ISLINGTON|   MA|
|  03442|          BENNINGTON|   NH|
|  03904|             KITTERY|   ME|
|  04438|           FRANKFORT|   ME|
|  06382|          UNCASVILLE|   CT|
|  06518|              HAMDEN|   CT|
|  06518|           NEW HAVEN|   CT|
|  06518|CENTERVILLE-MOUNT...|   CT|
|  06518|        MOUNT CARMEL|   CT|
|  06518|             N HAVEN|   CT|
|  07820|           ALLAMUCHY|   NJ|
|  08648|   LAWRENCE TOWNSHIP|   NJ|
|  08648|            LAWRENCE|   NJ|
|  08648|        LAWRENCE TWP|   NJ|
|  08648|       LAWRENCEVILLE|   NJ|
|  08648|             TRENTON|   NJ|
+-------+--------------------+-----+
only showing top 20 rows



### Perform join using SQL on temporary tables ###

Create temporary tables from dfZipCodeCity and dfZipCodeState.

In [117]:
# Create temporary table from dfZipCodeCity data frame.
dfZipCodeCity.registerTempTable('tblZipCodeCity')

# Display first 20 lines.
dfZipCodeCityFromTempTable = spark.sql('select * from tblZipCodeCity')
dfZipCodeCityFromTempTable.show()

+-------+--------------------+
|ZipCode|                City|
+-------+--------------------+
|  00704|         PARC PARQUE|
|  00704| PASEO COSTA DEL SUR|
|  00704|       SECT LANAUSSE|
|  00704|     URB EUGENE RICE|
|  00704|        URB GONZALEZ|
|  00704|      URB LA FABRICA|
|  00704|   URB MONTE SORIA 2|
|  00704|    VILLAS DEL COQUI|
|  00705|            AIBONITO|
|  00705|        BDA SAN LUIS|
|  00705|           BO LLANOS|
|  00705|  BRISAS DE AIBONITO|
|  00705|COLINAS DE SAN FR...|
|  00705|       EST DEL LLANO|
|  00705|     EXT BELLA VISTA|
|  00705|        EXT SAN LUIS|
|  00705|   EXT VILLA ROSALES|
|  00705|        REPTO ROBLES|
|  00705|     URB BELLA VISTA|
|  00705|     URB BUENA VISTA|
+-------+--------------------+
only showing top 20 rows



In [118]:
# Create temporary table from dfZipCodeState data frame.
dfZipCodeState.registerTempTable('tblZipCodeState')

dfZipCodeStateFromTempTable = spark.sql('select * from tblZipCodeState')

# Display first 20 lines.
dfZipCodeStateFromTempTable.show()

+-------+-----+
|ZipCode|State|
+-------+-----+
|  09790|   AE|
|  65079|   MO|
|  57709|   SD|
|  27321|   NC|
|  00678|   PR|
|  75001|   TX|
|  08219|   NJ|
|  40252|   KY|
|  03303|   NH|
|  74047|   OK|
|  48091|   MI|
|  55379|   MN|
|  45821|   OH|
|  12803|   NY|
|  16257|   PA|
|  15675|   PA|
|  21521|   MD|
|  04645|   ME|
|  67880|   KS|
|  22850|   VA|
+-------+-----+
only showing top 20 rows



Perform inner join of tblZipCodeCity and tblZipCodeState using *ZipCode* field.

In [119]:
dfZipCodeCityStateFromTempTables = spark.sql('select zcc.ZipCode, zcc.City, zcs.State from tblZipCodeCity zcc \
                                                inner join tblZipCodeState zcs \
                                                on zcc.ZipCode = zcs.ZipCode')

# Display first 20 lines.
dfZipCodeCityStateFromTempTables.show()

+-------+--------------------+-----+
|ZipCode|                City|State|
+-------+--------------------+-----+
|  02053|              MEDWAY|   MA|
|  02070|        SHELDONVILLE|   MA|
|  02070|            WRENTHAM|   MA|
|  02090|            WESTWOOD|   MA|
|  02090|           ISLINGTON|   MA|
|  03442|          BENNINGTON|   NH|
|  03904|             KITTERY|   ME|
|  04438|           FRANKFORT|   ME|
|  06382|          UNCASVILLE|   CT|
|  06518|              HAMDEN|   CT|
|  06518|           NEW HAVEN|   CT|
|  06518|CENTERVILLE-MOUNT...|   CT|
|  06518|        MOUNT CARMEL|   CT|
|  06518|             N HAVEN|   CT|
|  07820|           ALLAMUCHY|   NJ|
|  08648|   LAWRENCE TOWNSHIP|   NJ|
|  08648|            LAWRENCE|   NJ|
|  08648|        LAWRENCE TWP|   NJ|
|  08648|       LAWRENCEVILLE|   NJ|
|  08648|             TRENTON|   NJ|
+-------+--------------------+-----+
only showing top 20 rows



### Perform join using SQL on tables ###

Create tables from dfZipCodeCity and dfZipCodeState.

In [123]:
# Create table from dfZipCodeCity data frame.
dfZipCodeCity.write.mode('overwrite').saveAsTable('tblZipCodeCity2')

# Display first 20 lines.
dfZipCodeCityFromTable = spark.sql('select * from tblZipCodeCity2')
dfZipCodeCityFromTable.show()

+-------+--------------------+
|ZipCode|                City|
+-------+--------------------+
|  00704|         PARC PARQUE|
|  00704| PASEO COSTA DEL SUR|
|  00704|       SECT LANAUSSE|
|  00704|     URB EUGENE RICE|
|  00704|        URB GONZALEZ|
|  00704|      URB LA FABRICA|
|  00704|   URB MONTE SORIA 2|
|  00704|    VILLAS DEL COQUI|
|  00705|            AIBONITO|
|  00705|        BDA SAN LUIS|
|  00705|           BO LLANOS|
|  00705|  BRISAS DE AIBONITO|
|  00705|COLINAS DE SAN FR...|
|  00705|       EST DEL LLANO|
|  00705|     EXT BELLA VISTA|
|  00705|        EXT SAN LUIS|
|  00705|   EXT VILLA ROSALES|
|  00705|        REPTO ROBLES|
|  00705|     URB BELLA VISTA|
|  00705|     URB BUENA VISTA|
+-------+--------------------+
only showing top 20 rows



In [124]:
# Create table from dfZipCodeState data frame.
dfZipCodeState.write.mode('overwrite').saveAsTable('tblZipCodeState2')

# Display first 20 lines.
dfZipCodeStateFromTable = spark.sql('select * from tblZipCodeState2')
dfZipCodeStateFromTable.show()

+-------+-----+
|ZipCode|State|
+-------+-----+
|  05829|   VT|
|  59903|   MT|
|  59535|   MT|
|  14470|   NY|
|  36576|   AL|
|  33183|   FL|
|  46721|   IN|
|  76487|   TX|
|  28656|   NC|
|  85392|   AZ|
|  71266|   LA|
|  23112|   VA|
|  16853|   PA|
|  04217|   ME|
|  49427|   MI|
|  07605|   NJ|
|  03071|   NH|
|  76934|   TX|
|  54152|   WI|
|  80721|   CO|
+-------+-----+
only showing top 20 rows



Perform inner join of tblZipCodeCity2 and tblZipCodeState2 using *ZipCode* field.

In [122]:
dfZipCodeCityStateFromTables = spark.sql('select zcc2.ZipCode, zcc2.City, zcs2.State from tblZipCodeCity2 zcc2 \
                                                inner join tblZipCodeState2 zcs2 \
                                                on zcc2.ZipCode = zcs2.ZipCode')

# Display first 20 lines.
dfZipCodeCityStateFromTables.show()

+-------+--------------------+-----+
|ZipCode|                City|State|
+-------+--------------------+-----+
|  00704|         PARC PARQUE|   PR|
|  00704| PASEO COSTA DEL SUR|   PR|
|  00704|       SECT LANAUSSE|   PR|
|  00704|     URB EUGENE RICE|   PR|
|  00704|        URB GONZALEZ|   PR|
|  00704|      URB LA FABRICA|   PR|
|  00704|   URB MONTE SORIA 2|   PR|
|  00704|    VILLAS DEL COQUI|   PR|
|  00705|            AIBONITO|   PR|
|  00705|        BDA SAN LUIS|   PR|
|  00705|           BO LLANOS|   PR|
|  00705|  BRISAS DE AIBONITO|   PR|
|  00705|COLINAS DE SAN FR...|   PR|
|  00705|       EST DEL LLANO|   PR|
|  00705|     EXT BELLA VISTA|   PR|
|  00705|        EXT SAN LUIS|   PR|
|  00705|   EXT VILLA ROSALES|   PR|
|  00705|        REPTO ROBLES|   PR|
|  00705|     URB BELLA VISTA|   PR|
|  00705|     URB BUENA VISTA|   PR|
+-------+--------------------+-----+
only showing top 20 rows

