# Data Pipeline using PySpark/Delta Lake
Joins publicly available geography datasets (GeoNames postal codes and the USGS Gazetteer/GNIS) and publishes the result as a Delta Lake table.

References:
- [Geonames zipcodes](http://download.geonames.org/export/zip/)
- [US Gazetteer](https://www.usgs.gov/core-science-systems/ngp/board-on-geographic-names/download-gnis-data)
- [US Gazetteer Spec](https://geonames.usgs.gov/docs/pubs/Nat_State_Topic_File_formats.pdf)

In [None]:
from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr
from delta.tables import DeltaTable

In [None]:
%%bash

# Show the contents of the local input directory.
# NOTE(review): the read cells below load from s3a://condesa/input_data/,
# not this local path; confirm both locations hold the same files.
ls /opt/spark/jupyter-lib/input_data/

## Build Spark session with Delta Table bindings

In [None]:
# Local Spark session with the Delta Lake package, SQL extension and catalog
# registered on the builder.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("example-3-geonames")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

In [None]:
# Raw inputs: GeoNames postal codes and USGS Gazetteer files, read as directories.
zipcode_raw_path = "s3a://condesa/input_data/geonames/"
geography_raw_path = "s3a://condesa/input_data/gazetteer/"
# Root under which the published Delta table is written.
output_root_path = "s3a://condesa/output_data"

## Define Input Schemas

In [None]:
# DDL schemas for the two raw inputs, passed to spark.read.csv(schema=...).
# Spark's CSV reader assigns these columns by position, so their order must
# follow the source file layout.
#
# Coordinates use scale 7. Spark rounds CSV decimal values to the declared
# scale, and the Gazetteer publishes decimal degrees with up to 7 fractional
# digits, so the former DECIMAL(12, 2) (~1 km resolution) silently discarded
# precision.
# NOTE: a Delta table previously written with DECIMAL(12, 2) coordinates will
# reject appends of the wider type; rewrite it once (overwrite +
# overwriteSchema).
GEONAMES = """
    country_code STRING,
    postal_code  STRING,
    place_name   STRING,
    admin_name1  STRING,  -- order subdivision (state)
    admin_code1  STRING,
    admin_name2  STRING,  -- order subdivision (county/province)
    admin_code2  STRING,
    admin_name3  STRING,  -- order subdivision (community)
    admin_code3  STRING,
    latitude     DECIMAL(12, 7),
    longitude    DECIMAL(12, 7),
    -- of lat/lng: 1=estimated, 4=geonameid, 6=centroid of addresses or shape
    accuracy     INT
"""

GAZETTEER = """
    feature_id      INT,
    feature_name    STRING,
    feature_class   STRING,
    state_alpha     STRING,
    state_numeric   STRING,
    county_name     STRING,
    county_numeric  STRING,
    primary_lat_dms STRING,
    prim_long_dms   STRING,
    prim_lat_dec    DECIMAL(12, 7),
    prim_long_dec   DECIMAL(12, 7),
    source_lat_dms  STRING,
    source_long_dms STRING,
    source_lat_dec  DECIMAL(12, 7),
    source_long_dec DECIMAL(12, 7),
    elev_in_m       INT,
    elev_in_ft      INT,
    map_name        STRING,
    date_created    DATE,
    date_edited     DATE
"""

## Read zipcodes CSV into DataFrame
TODO:
- Read a specific file from within the GeoNames ZIP archive (the reader currently loads every file under `zipcode_raw_path`).

In [None]:
# GeoNames postal codes: tab-separated text with no header row, typed by the
# GEONAMES schema. The generic admin levels are renamed to state/county,
# unused columns are dropped, and rows with no state code are discarded
# because they can never satisfy the state_code join condition downstream.
zipcode = (
    spark.read
    .schema(GEONAMES)
    .option("sep", "\t")
    .option("header", False)
    .csv(zipcode_raw_path)
    .withColumnRenamed("admin_name1", "state_name")
    .withColumnRenamed("admin_code1", "state_code")
    .withColumnRenamed("admin_name2", "county_name")
    .withColumnRenamed("admin_code2", "county_code")
    .drop("accuracy", "admin_name3", "admin_code3")
    .where(col("state_code").isNotNull())
)

## Read Gazetteer geographies from CSV

In [None]:
# USGS Gazetteer (GNIS) features: pipe-delimited with a header row and
# MM/dd/yyyy dates. FAILFAST aborts the read on any malformed record instead
# of silently nulling fields.
geography = (
    spark
    .read
    .csv(
        path=geography_raw_path,
        schema=GAZETTEER,
        sep="|",
        dateFormat="MM/dd/yyyy",
        mode="FAILFAST",
        header=True
    )
    # Align column names with the zipcode DataFrame for the join.
    .withColumnRenamed('state_alpha', 'state_code')
    .withColumnRenamed('county_numeric', 'county_code')
    .withColumnRenamed('prim_lat_dec', 'latitude')
    .withColumnRenamed('prim_long_dec', 'longitude')
    # Names must match GAZETTEER exactly. drop() is a silent no-op for unknown
    # names, and the former 'primary_lat_DMS' only matched through
    # case-insensitive resolution (spark.sql.caseSensitive=false).
    .drop('state_numeric',
          'primary_lat_dms', 'prim_long_dms',
          'source_lat_dms', 'source_long_dms',
          'source_lat_dec', 'source_long_dec',
          'elev_in_ft', 'date_edited'
         )
    # Rows without a state code cannot join to zipcodes.
    .filter(col('state_code').isNotNull())
)

## Stage unrefined data

In [None]:
# Stage raw zipcodes as Parquet under a per-day directory (YYYY/MM/DD),
# partitioned by state; re-running on the same day overwrites that day's stage.
# date.today() is called exactly once: separate calls for year, month and day
# could straddle midnight and yield a mismatched path.
# NOTE(review): this stage root is a hard-coded local path, while the publish
# step writes under output_root_path; confirm the split is intentional.
zipcode_path = (
        "{root}/stage/zipcode/parquet/{day:%Y/%m/%d}"
        .format(root='/opt/spark/jupyter-lib/output_data',
                day=date.today())
    )

(
    zipcode
    .write
    .parquet(
        zipcode_path,
        mode='overwrite',
        partitionBy='state_code'
    )
)

In [None]:
# Stage raw Gazetteer geographies as Parquet under a per-day directory
# (YYYY/MM/DD), partitioned by state; same-day re-runs overwrite the stage.
# date.today() is called exactly once so the path cannot mix dates across a
# midnight boundary.
# NOTE(review): this stage root is a hard-coded local path, while the publish
# step writes under output_root_path; confirm the split is intentional.
geography_path = (
        "{root}/stage/geography/parquet/{day:%Y/%m/%d}"
        .format(root='/opt/spark/jupyter-lib/output_data',
                day=date.today())
    )
(
    geography
    .write
    .parquet(
        geography_path,
        mode='overwrite',
        partitionBy='state_code'
    )
)

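## Transform data
Join Gazetteer geographies to zipcodes on place name, county code, and state code, keeping only `Populated Place` features. A place matching several postal codes yields one row per postal code.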

In [None]:
# Reload today's staged Parquet snapshots and register them as temp views
# for the Spark SQL transform.
zipcode_stage = spark.read.parquet(zipcode_path)
zipcode_stage.createOrReplaceTempView('vw_zipcode')

geography_stage = spark.read.parquet(geography_path)
geography_stage.createOrReplaceTempView('vw_geography')

In [None]:
# Conform Gazetteer features to zipcodes: one output row per matching zipcode
# row, joined on place name, county code and state code. The INNER JOIN drops
# features with no matching zipcode, and only 'Populated Place' features are
# kept.
geography_conformed = spark.sql("""
    SELECT  g.feature_id,
            g.feature_name AS place_name,
            g.county_name,
            g.county_code,
            g.state_code,
            zip.postal_code,
            g.latitude,
            g.longitude,
            g.elev_in_m,
            g.map_name,
            g.date_created
    FROM    vw_geography AS g
        INNER JOIN vw_zipcode AS zip
            ON g.feature_name = zip.place_name
                AND g.county_code = zip.county_code
                AND g.state_code = zip.state_code
    WHERE   g.feature_class IN ('Populated Place')
""")

## Publish to Delta Table

In [None]:
# Publish the conformed geography rows as a Delta table partitioned by state.
delta_path = (
    "{root}/public/geography/delta"
    .format(root=output_root_path)
)

# Each run rebuilds geography_conformed from the complete raw inputs, so the
# table is replaced rather than appended to; 'append' duplicated every row on
# each re-run. Earlier table versions stay available via Delta time travel
# until vacuumed. overwriteSchema lets the table follow column/type changes
# in the pipeline instead of failing the write.
(
    geography_conformed
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .partitionBy('state_code')
    .save(delta_path)
)

In [None]:
# Open the published table and show its row count as the cell output.
geography_delta = DeltaTable.forPath(spark, delta_path)
geography_delta.toDF().count()