#### Installing required packages

In [1]:

#!pip3 install sodapy
#!pip3 install pyspark
#!pip3 install pyspark psycopg2-binary

#### Import required packages in the notebook

In [2]:
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext,SparkSession

import pyspark.sql.functions as f
from pyspark.sql.types  import IntegerType, DecimalType

from sodapy import Socrata
import os


In [3]:
!export SPARK_CLASSPATH=/Users/sakshimehta/Documents/Installations/spark/jars/postgresql-42.6.0.jar

#### Setting the Spark app 

In [4]:
os.environ['SPARK_CLASSPATH'] = "/Users/sakshimehta/Documents/Installations/spark/jars/postgresql-42.6.0.jar"

print('\n Job Begins')

sc = SparkContext().getOrCreate()
config = sc.getConf()
config.set('spark.cores.max','4')
config.set('spark.executor.memory', '8G')
config.set('spark.driver.maxResultSize', '8g')
config.set('spark.kryoserializer.buffer.max', '512m')
config.set("spark.driver.cores", "4")

sc.stop()

sc = SparkContext(conf = config) 
sqlContext = SQLContext(sc)

spark = SparkSession.builder \
                    .appName('ETL-Pipeline') \
                    .config("spark.jars", "/Users/sakshimehta/Documents/Installations/spark/jars/postgresql-42.6.0.jar") \
                    .getOrCreate()




 Job Begins


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/28 00:40:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/04/28 00:40:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


#### Extract the Restaurant violations data from the NYC Open Data API using sodapy

In [5]:
# The Host Name for the API endpoint (the https:// part will be added automatically)
data_url='data.cityofnewyork.us'    

# The data set at the API endpoint (311 data in this case)
data_set='43nn-pn8j'                

# Create the client to point to the API endpoint
client = Socrata(data_url, None)

# The results are returned as JSON from API / converted to Python list of dictionaries by sodapy.
result = client.get("43nn-pn8j", select="COUNT(*)")

print("\n The count of the raw Restaurant Inspection dataset is:", result[0]['COUNT'])

# Using pagination to retrieve the whole raw dataset for the NYC Open Data API to load it into a dataframe
start = 0             
chunk_size = 10000     

results = []           

while True:
     
     results.extend( client.get(data_set, offset = start, limit = chunk_size))
     start = start + chunk_size
     if (start > int(result[0]['COUNT']) ):
        break

print("\n Total results retrieved are:", len(results))
print("\n Printing first two results:\n")
print(results[:2])

print("\n NYC open data API data extract completed")






 The count of the raw Restaurant Inspection dataset is: 202109

 Total results retrieved are: 202109

 Printing first two results:

[{'camis': '50122300', 'dba': 'JACK DIAMONDS', 'boro': 'Manhattan', 'building': '140', 'street': 'EAST   27 STREET', 'zipcode': '10016', 'phone': '6463988157', 'inspection_date': '1900-01-01T00:00:00.000', 'critical_flag': 'Not Applicable', 'record_date': '2023-04-27T06:00:14.000', 'latitude': '40.741848594155', 'longitude': '-73.982533732212', 'community_board': '106', 'council_district': '02', 'census_tract': '006800', 'bin': '1018150', 'bbl': '1008820059', 'nta': 'MN21'}, {'camis': '50123470', 'dba': 'Lydig Pizza Pizza Inc', 'boro': 'Manhattan', 'building': '84', 'street': 'HESTER STREET', 'zipcode': '10002', 'phone': '6466435434', 'inspection_date': '1900-01-01T00:00:00.000', 'critical_flag': 'Not Applicable', 'record_date': '2023-04-27T06:00:14.000', 'latitude': '40.716306876678', 'longitude': '-73.99230192632', 'community_board': '103', 'council_dis

#### Reading results (list of dictionaries) into Pyspark dataframe

In [6]:
dataDF = spark.createDataFrame(results)

dataDF.cache()

# display dataframe count
print("\n Count of total rows in the Spark dataframe:\n") 
dataDF.count()

print("\n Printing the first few rows of the dataDF dataframe:\n")
dataDF.show(10, truncate = False)

print("\n Printing the schema of the dataDF dataframe:\n")
dataDF.printSchema()

print("\n Intial spark df create completed")


23/04/28 00:41:04 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.



 Count of total rows in the Spark dataframe:



23/04/28 00:41:06 WARN TaskSetManager: Stage 0 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.
                                                                                


 Printing the first few rows of the dataDF dataframe:



23/04/28 00:41:14 WARN TaskSetManager: Stage 3 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.


+----------+-------+---------+--------+--------+------------+---------------+----------------+--------------+------------------------+-----------------------+---------------+----------------+----+----------+-----------------------+----------------+-------+------+-------------------+-----+----------+---------------+-----+--------------+---------------------+
|bbl       |bin    |boro     |building|camis   |census_tract|community_board|council_district|critical_flag |dba                     |inspection_date        |latitude       |longitude       |nta |phone     |record_date            |street          |zipcode|action|cuisine_description|grade|grade_date|inspection_type|score|violation_code|violation_description|
+----------+-------+---------+--------+--------+------------+---------------+----------------+--------------+------------------------+-----------------------+---------------+----------------+----+----------+-----------------------+----------------+-------+------+-----------------

### Data Cleaning Process: 

##### Step 1: Remove special characters from the phone number data

In [7]:
dataDF = dataDF.withColumn('phone', f.regexp_replace('phone', ' ', '')) \
       .withColumn('phone', f.regexp_replace('phone', '_', '')) \
       .withColumn('phone', f.regexp_replace('phone', '.', '') ) \
       .withColumn('phone', f.regexp_replace('phone', '__', '') )

##### Step 2: Store the boro_id instead of the complete Borough name in the boro attribute in postgresql tables

In [8]:
dataDF = dataDF.withColumn('boro_id', f.when(f.lower(dataDF.boro) == "manhattan", 1)
                                      .when(f.lower(dataDF.boro) == "bronx", 2)
                                      .when(f.lower(dataDF.boro) == "brooklyn", 3)
                                      .when(f.lower(dataDF.boro) == "queens", 4)
                                      .otherwise(5))

##### Step 3: Cast some of the data columns into their relevant data types

In [9]:
dataDF = dataDF.withColumn("camis", dataDF["camis"].cast(IntegerType())) \
                .withColumn("zipcode", dataDF["zipcode"].cast(IntegerType())) \
                .withColumn("phone", dataDF["phone"].cast(IntegerType())) \
                .withColumn("score", dataDF["score"].cast(IntegerType())) \
                .withColumn("latitude", dataDF["latitude"].cast(DecimalType(18, 14))) \
                .withColumn("longitude", dataDF["longitude"].cast(DecimalType(18, 14)))

dataDF.show(10, truncate = False)


23/04/28 00:41:50 WARN TaskSetManager: Stage 4 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.


+----------+-------+---------+--------+--------+------------+---------------+----------------+--------------+------------------------+-----------------------+-----------------+------------------+----+-----+-----------------------+----------------+-------+------+-------------------+-----+----------+---------------+-----+--------------+---------------------+-------+
|bbl       |bin    |boro     |building|camis   |census_tract|community_board|council_district|critical_flag |dba                     |inspection_date        |latitude         |longitude         |nta |phone|record_date            |street          |zipcode|action|cuisine_description|grade|grade_date|inspection_type|score|violation_code|violation_description|boro_id|
+----------+-------+---------+--------+--------+------------+---------------+----------------+--------------+------------------------+-----------------------+-----------------+------------------+----+-----+-----------------------+----------------+-------+------+----

In [10]:
dataDF = dataDF.withColumn("inspection_date", f.to_timestamp("inspection_date", "yyyy-MM-dd'T'HH:mm:ss.SSS")) \
                .withColumn("grade_date", f.to_timestamp("grade_date", "yyyy-MM-dd'T'HH:mm:ss.SSS")) \
                .withColumn("record_date", f.to_timestamp("record_date", "yyyy-MM-dd'T'HH:mm:ss.SSS"))

dataDF.show(10, truncate = False)


print("\n Data cleaning completed")


+----------+-------+---------+--------+--------+------------+---------------+----------------+--------------+------------------------+-------------------+-----------------+------------------+----+-----+-------------------+----------------+-------+------+-------------------+-----+----------+---------------+-----+--------------+---------------------+-------+
|bbl       |bin    |boro     |building|camis   |census_tract|community_board|council_district|critical_flag |dba                     |inspection_date    |latitude         |longitude         |nta |phone|record_date        |street          |zipcode|action|cuisine_description|grade|grade_date|inspection_type|score|violation_code|violation_description|boro_id|
+----------+-------+---------+--------+--------+------------+---------------+----------------+--------------+------------------------+-------------------+-----------------+------------------+----+-----+-------------------+----------------+-------+------+-------------------+-----+--

23/04/28 00:42:01 WARN TaskSetManager: Stage 5 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.


##### Step 4: Getting dataset ready for the restaurant_inspection_raw table

In [11]:
FinalDF_raw = dataDF.select(["camis", "dba", "boro", "building", "street", "zipcode", "phone", "cuisine_description", "inspection_date", "action", "violation_code", "violation_description", "critical_flag", "score", "grade" , "grade_date", "record_date", "inspection_type", "latitude", "longitude", "community_board", "council_district", "census_tract", "bin", "bbl", "nta"])

FinalDF_raw.cache()

FinalDF_raw.show(10, truncate = False)
FinalDF_raw.printSchema()

print("\n Number of rows in the FinalDF_raw dataframe:")
FinalDF_raw.count()

23/04/28 00:42:13 WARN TaskSetManager: Stage 6 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+------------------------+---------+--------+----------------+-------+-----+-------------------+-------------------+------+--------------+---------------------+--------------+-----+-----+----------+-------------------+---------------+-----------------+------------------+---------------+----------------+------------+-------+----------+----+
|camis   |dba                     |boro     |building|street          |zipcode|phone|cuisine_description|inspection_date    |action|violation_code|violation_description|critical_flag |score|grade|grade_date|record_date        |inspection_type|latitude         |longitude         |community_board|council_district|census_tract|bin    |bbl       |nta |
+--------+------------------------+---------+--------+----------------+-------+-----+-------------------+-------------------+------+--------------+---------------------+--------------+-----+-----+----------+-------------------+---------------+-----------------+------------------+---------------+--

23/04/28 00:42:15 WARN TaskSetManager: Stage 7 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

202109

##### Step 5: Getting dataset ready for the restaurants table

In [12]:
FinalDF_restaurants = dataDF.select(["camis", "dba", "boro_id", "building", "street", "zipcode", "phone", "cuisine_description", "latitude", "longitude"]).distinct()

FinalDF_restaurants = FinalDF_restaurants.withColumnRenamed("boro_id","boro")

FinalDF_restaurants.cache()

FinalDF_restaurants.show(10, truncate = False)
FinalDF_restaurants.printSchema()

print("\n Number of rows in the FinalDF_restaurants dataframe:")
FinalDF_restaurants.count()


23/04/28 00:42:38 WARN TaskSetManager: Stage 10 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+-----------------------+----+--------+--------------------------------+-------+-----+-------------------+-----------------+------------------+
|camis   |dba                    |boro|building|street                          |zipcode|phone|cuisine_description|latitude         |longitude         |
+--------+-----------------------+----+--------+--------------------------------+-------+-----+-------------------+-----------------+------------------+
|50106031|KAL                    |4   |3405    |30TH AVE                        |11103  |null |Korean             |40.76570338323900|-73.91883928211500|
|50056212|VIA ROMA PIZZA BAR     |3   |445     |COURT STREET                    |11231  |null |Pizza              |40.67803401508100|-73.99797023494800|
|50109305|SIN FRONTERAS          |3   |3913    |13 AVENUE                       |11218  |null |null               |40.64048191060400|-73.98642303016000|
|50116144|null                   |1   |2374    |ADAM CLAYTON POWELL JR BOULEVARD|n

                                                                                

28324

##### Step 6: Getting dataset ready for the inspection_data table

In [13]:
FinalDF_inspections= dataDF.select(["camis", "inspection_date", "inspection_type", "action", "violation_code", "violation_description", "critical_flag", "score", "grade" , "grade_date", "community_board", "council_district", "census_tract", "bin", "bbl", "nta", "record_date"]).distinct()

FinalDF_inspections.cache()

FinalDF_inspections.show(10, truncate = False)
FinalDF_inspections.printSchema()

print("\n Number of rows in the FinalDF_inspections dataframe:")
FinalDF_inspections.count()




23/04/28 00:42:58 WARN TaskSetManager: Stage 17 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------+-------------------+---------------------------------------------+-----------------------------------------------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------+-----+-----+-------------------+---------------+----------------+------------+-------+----------+----+-------------------+
|camis   |inspection_date    |inspection_type                              |action                                         |violation_code|violation_description                                                                                                                                                                                                                                              

                                                                                

202109

### Data Writing : Write data into postgresql database tables using Pyspark dataframes

In [14]:
url = "jdbc:postgresql://localhost:5432/postgres_db"
properties = {"user": "postgres", "password": "123", "driver": "org.postgresql.Driver"}

print("\n Data write to postgresql begins")



 Data write to postgresql begins


In [16]:
# Increase the number of partitions
FinalDF_raw = FinalDF_raw.repartition(100)

# Use a more efficient partitioning scheme
FinalDF_raw.write.option("batchsize", "2000").jdbc(url=url, table="restaurant_inspection_raw", mode="append", properties=properties)

print("\n restaurant_inspection_data raw table updated.")

23/04/28 00:44:12 WARN TaskSetManager: Stage 24 contains a task of very large size (12101 KiB). The maximum recommended task size is 1000 KiB.


 restaurant_inspection_data raw table updated.


                                                                                

In [17]:
# Increase the number of partitions
FinalDF_inspections = FinalDF_inspections.repartition(100)

# Use a more efficient partitioning scheme
FinalDF_inspections.write.option("batchsize", "2000").jdbc(url=url, table="inspection_data", mode="overwrite", properties=properties)

print("\n inspection_data table updated.")

                                                                                


 inspection_data table updated.


In [18]:

# Increase the number of partitions
FinalDF_restaurants = FinalDF_restaurants.repartition(100)

# Use a more efficient partitioning scheme
FinalDF_restaurants.write.option("batchsize", "2000").jdbc(url=url, table="restaurants", mode="overwrite", properties=properties)

print("\n Restaurant table updated.")

print('\n Job Finishes')




 Restaurant table updated.

 Job Finishes


                                                                                