<a href="https://colab.research.google.com/github/kalona/net.jgp.books.spark.ch03/blob/master/src/main/python/ch03_majesctic_df.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# noinspection PyUnresolvedReferences
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ch03").getOrCreate()

In [2]:
wake_path = "../../../data/Restaurants_in_Wake_County_NC.csv"
durham_path = "../../../data/Restaurants_and_Services_(Feb_21_2017).xlsx"
wake_url = "https://shortener.manning.com/Jz2P"
durham_url = "https://www.arcgis.com/sharing/rest/content/items/7f37ceecd9fc4b7bb4b26f46b13cdfce/data"

In [4]:
import os
import requests

# Check if the files exist
for file_path, url in [(wake_path, wake_url), (durham_path, durham_url)]:
    if not os.path.exists(file_path):
        print(f"File '{file_path}' does not exist. We will download it.")

        # Create the directory if it does not exist
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        # Download the file
        response = requests.get(url, stream=True)
        response.raise_for_status()

        with open(file_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)

        print(f"File '{file_path}' downloaded successfully.")

    else:
        print(f"File '{file_path}' already exists.")

File '../../../data/Restaurants_in_Wake_County_NC.csv' already exists.
File '../../../data/Restaurants_and_Services_(Feb_21_2017).xlsx' does not exist. We will download it.
File '../../../data/Restaurants_and_Services_(Feb_21_2017).xlsx' downloaded successfully.


In [3]:
df_wake_county = (
    spark.read.format("csv").option("header", "true").load(wake_path)
)

In [4]:
# print("*** Right after ingestion")
df_wake_county.show(10)

+--------+-----------+--------------------+--------------------+--------------------+-----------+-----+----------+--------------+--------------------+-----------------+--------+------------+-----------+-------------+
|OBJECTID|     HSISID|                NAME|            ADDRESS1|            ADDRESS2|       CITY|STATE|POSTALCODE|   PHONENUMBER|  RESTAURANTOPENDATE|     FACILITYTYPE|PERMITID|           X|          Y|GEOCODESTATUS|
+--------+-----------+--------------------+--------------------+--------------------+-----------+-----+----------+--------------+--------------------+-----------------+--------+------------+-----------+-------------+
|    1001|04092016024|                WABA|2502 1/2 HILLSBOR...|                NULL|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18T00:00:...|       Restaurant|    6952|-78.66818477|35.78783803|            M|
|    1002|04092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|                NULL|       CARY|   NC|     27518|(919) 852-6651|201

In [8]:
df_wake_county.schema

StructType([StructField('OBJECTID', StringType(), True), StructField('HSISID', StringType(), True), StructField('NAME', StringType(), True), StructField('ADDRESS1', StringType(), True), StructField('ADDRESS2', StringType(), True), StructField('CITY', StringType(), True), StructField('STATE', StringType(), True), StructField('POSTALCODE', StringType(), True), StructField('PHONENUMBER', StringType(), True), StructField('RESTAURANTOPENDATE', StringType(), True), StructField('FACILITYTYPE', StringType(), True), StructField('PERMITID', StringType(), True), StructField('X', StringType(), True), StructField('Y', StringType(), True), StructField('GEOCODESTATUS', StringType(), True)])

In [5]:
df_wake_county.printSchema()
print(f"We have {df_wake_county.count()} records.")

root
 |-- OBJECTID: string (nullable = true)
 |-- HSISID: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- ADDRESS1: string (nullable = true)
 |-- ADDRESS2: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- POSTALCODE: string (nullable = true)
 |-- PHONENUMBER: string (nullable = true)
 |-- RESTAURANTOPENDATE: string (nullable = true)
 |-- FACILITYTYPE: string (nullable = true)
 |-- PERMITID: string (nullable = true)
 |-- X: string (nullable = true)
 |-- Y: string (nullable = true)
 |-- GEOCODESTATUS: string (nullable = true)

We have 3440 records.


In [17]:
    # Let's transform our dataframe
from pyspark.sql import functions as F

df_wake_county = (
    df_wake_county.withColumn("county", F.lit("Wake"))
    .withColumnRenamed("HSISID", "datasetId")
    .withColumnRenamed("NAME", "name")
    .withColumnRenamed("ADDRESS1", "address1")
    .withColumnRenamed("ADDRESS2", "address2")
    .withColumnRenamed("CITY", "city")
    .withColumnRenamed("STATE", "state")
    .withColumnRenamed("POSTALCODE", "zip")
    .withColumnRenamed("PHONENUMBER", "tel")
    .withColumnRenamed("RESTAURANTOPENDATE", "dateStart")
    .withColumn(
        "dateEnd", F.lit("null"))
    .withColumnRenamed("FACILITYTYPE", "type")
    .withColumnRenamed("X", "geoX")
    .withColumnRenamed("Y", "geoY")
    .drop("OBJECTID", "PERMITID", "GEOCODESTATUS")
)

df_wake_county = df_wake_county.withColumn(
    "id",
    F.concat(
        F.col("state"), F.lit("_"), F.col("county"), F.lit("_"), F.col("datasetId")
    ),
)

In [18]:
df_wake_county.show(10)

+-----------+--------------------+--------------------+--------------------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|  datasetId|                name|            address1|            address2|       city|state|       zip|           tel|           dateStart|             type|        geoX|       geoY|county|dateEnd|                 id|
+-----------+--------------------+--------------------+--------------------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|04092016024|                WABA|2502 1/2 HILLSBOR...|                NULL|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18T00:00:...|       Restaurant|-78.66818477|35.78783803|  Wake|   null|NC_Wake_04092016024|
|04092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|                NULL|       CARY|   NC|     27518|(919) 852-6651|

In [8]:
df_durham_county = (
    spark.read.format("json")
    .load("../../../data/Restaurants_in_Durham_County_NC.json")
)

In [9]:
df_durham_county.show(10)

+----------------+--------------------+--------------------+--------------------+--------------------+
|       datasetid|              fields|            geometry|    record_timestamp|            recordid|
+----------------+--------------------+--------------------+--------------------+--------------------+
|restaurants-data|{NULL, Full-Servi...|{[-78.9573299, 35...|2017-07-13T09:15:...|1644654b953d1802c...|
|restaurants-data|{NULL, Nursing Ho...|{[-78.8895483, 36...|2017-07-13T09:15:...|93573dbf8c9e799d8...|
|restaurants-data|{NULL, Fast Food ...|{[-78.9593263, 35...|2017-07-13T09:15:...|0d274200c7cef50d0...|
|restaurants-data|{NULL, Full-Servi...|{[-78.9060312, 36...|2017-07-13T09:15:...|cf3e0b175a6ebad2a...|
|restaurants-data|{NULL, NULL, [36....|{[-78.9135175, 36...|2017-07-13T09:15:...|e796570677f7c39cc...|
|restaurants-data|{NULL, NULL, [35....|{[-78.8077969, 35...|2017-07-13T09:15:...|90cdb7722ea7d4ffd...|
|restaurants-data|{NULL, NULL, [35....|{[-78.8865863, 35...|2017-07-13T09

In [9]:
df_durham_county.printSchema()

root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-- sewage: string (nullable = true)
 |   

In [None]:
import pyspark.sql.functions as F

df_durham_county_normalized = (
    df_durham_county.withColumn("county", F.lit("Durham"))
    .withColumn("datasetId", F.col("fields.id"))
    .withColumn("name", F.col("fields.premise_name"))
    .withColumn("address1", F.col("fields.premise_address1"))
    .withColumn("address2", F.col("fields.premise_address2"))
    .withColumn("city", F.col("fields.premise_city"))
    .withColumn("state", F.col("fields.premise_state"))
    .withColumn("zip", F.col("fields.premise_zip"))
    .withColumn("tel", F.col("fields.premise_phone"))
    .withColumn("dateStart", F.col("fields.opening_date"))
    .withColumn("dateEnd", F.col("fields.closing_date"))
    .withColumn("type", F.split(F.col("fields.type_description"), " - ").getItem(1))
    .withColumn("geoX", F.col("fields.geolocation").getItem(0))
    .withColumn("geoY", F.col("fields.geolocation").getItem(1))
)

df_durham_county_normalized = df_durham_county_normalized.withColumn("id", F.concat(F.col("state"), F.lit("_"), F.col("county"), F.lit("_"),
                                                  F.col("datasetId")))

df_durham_county_normalized.printSchema()
print("Before dropping nested fields")
df_durham_county_normalized = df_durham_county_normalized.drop("fields", "geometry", "record_timestamp", "recordid")
df_durham_county_normalized.count()

root
 |-- datasetId: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- closing_date: string (nullable = true)
 |    |-- est_group_desc: string (nullable = true)
 |    |-- geolocation: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- hours_of_operation: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- insp_freq: long (nullable = true)
 |    |-- opening_date: string (nullable = true)
 |    |-- premise_address1: string (nullable = true)
 |    |-- premise_address2: string (nullable = true)
 |    |-- premise_city: string (nullable = true)
 |    |-- premise_name: string (nullable = true)
 |    |-- premise_phone: string (nullable = true)
 |    |-- premise_state: string (nullable = true)
 |    |-- premise_zip: string (nullable = true)
 |    |-- risk: long (nullable = true)
 |    |-- rpt_area_desc: string (nullable = true)
 |    |-- seats: long (nullable = true)
 |    |-- sewage: string (nullable = true)
 |   

2463

In [6]:
df_durham_county_normalized.printSchema()

root
 |-- datasetId: string (nullable = true)
 |-- county: string (nullable = false)
 |-- name: string (nullable = true)
 |-- address1: string (nullable = true)
 |-- address2: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- tel: string (nullable = true)
 |-- dateStart: string (nullable = true)
 |-- dateEnd: string (nullable = true)
 |-- type: string (nullable = true)
 |-- geoX: double (nullable = true)
 |-- geoY: double (nullable = true)
 |-- id: string (nullable = true)



In [19]:
df_durham_county_normalized.show(10)

+---------+------+--------------------+--------------------+--------+------+-----+-----+--------------+----------+-------+--------------------+----------+-----------+---------------+
|datasetId|county|                name|            address1|address2|  city|state|  zip|           tel| dateStart|dateEnd|                type|      geoX|       geoY|             id|
+---------+------+--------------------+--------------------+--------+------+-----+-----+--------------+----------+-------+--------------------+----------+-----------+---------------+
|    56060|Durham|    WEST 94TH ST PUB| 4711 HOPE VALLEY RD|SUITE 6C|DURHAM|   NC|27707|(919) 403-0025|1994-09-01|   NULL|          Restaurant|35.9207272|-78.9573299|NC_Durham_56060|
|    58123|Durham|BROOKDALE DURHAM IFS|4434 BEN FRANKLIN...|    NULL|DURHAM|   NC|27704|(919) 479-9966|2003-10-15|   NULL|Institutional Foo...|36.0467802|-78.8895483|NC_Durham_58123|
|    70266|Durham|       SMOOTHIE KING|1125 W. NC HWY 54...|    NULL|DURHAM|   NC|277

In [31]:
df_final = df_wake_county.unionByName(df_durham_county_normalized)

In [34]:
df_final.show(10)

+-----------+--------------------+--------------------+--------------------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|  datasetId|                name|            address1|            address2|       city|state|       zip|           tel|           dateStart|             type|        geoX|       geoY|county|dateEnd|                 id|
+-----------+--------------------+--------------------+--------------------+-----------+-----+----------+--------------+--------------------+-----------------+------------+-----------+------+-------+-------------------+
|04092016024|                WABA|2502 1/2 HILLSBOR...|                NULL|    RALEIGH|   NC|     27607|(919) 833-1710|2011-10-18T00:00:...|       Restaurant|-78.66818477|35.78783803|  Wake|   null|NC_Wake_04092016024|
|04092021693|  WALMART DELI #2247|2010 KILDAIRE FAR...|                NULL|       CARY|   NC|     27518|(919) 852-6651|

In [33]:
df_final.filter(df_final.county == "Durham").show(10)

+---------+--------------------+--------------------+--------+------+-----+-----+--------------+----------+--------------------+----------+-----------+------+-------+---------------+
|datasetId|                name|            address1|address2|  city|state|  zip|           tel| dateStart|                type|      geoX|       geoY|county|dateEnd|             id|
+---------+--------------------+--------------------+--------+------+-----+-----+--------------+----------+--------------------+----------+-----------+------+-------+---------------+
|    56060|    WEST 94TH ST PUB| 4711 HOPE VALLEY RD|SUITE 6C|DURHAM|   NC|27707|(919) 403-0025|1994-09-01|          Restaurant|35.9207272|-78.9573299|Durham|   NULL|NC_Durham_56060|
|    58123|BROOKDALE DURHAM IFS|4434 BEN FRANKLIN...|    NULL|DURHAM|   NC|27704|(919) 479-9966|2003-10-15|Institutional Foo...|36.0467802|-78.8895483|Durham|   NULL|NC_Durham_58123|
|    70266|       SMOOTHIE KING|1125 W. NC HWY 54...|    NULL|DURHAM|   NC|27707|(919