### Loading data to Lakehouse

There is no single best way to load data into the a lakehouse.  This workbook will use these techniques: 

- Simple drag and drop csv or Excel files using "OneLake File Explorer" or the Upload feature
- From a notebook use Python to read a csv file:  Raw data will be stored in  


We are not dealing with large amounts of data, and when we update our enriched data, we will replace the full dataset each time. We are using public data sources, so we need to be prepared if they change the source format.  The public data sources used in this workbook are
- U.S. Post Office
- U.S. Census 2020 (States, Counties, CBSA, ZCTAS, Places, Blocks, Address Counts)
- 

In [10]:
# Parameters used 
# RawLakehouse and CuratedLakehouse
# directory used for storing files

rawlakehousefiles = "abfss://45516964-41e4-478d-bf96-0628e0339cc3@onelake.dfs.fabric.microsoft.com/241a8454-878c-4fa0-82c5-3996a743b2ef/Files"
enrichedlakehousefiles = "abfss://45516964-41e4-478d-bf96-0628e0339cc3@onelake.dfs.fabric.microsoft.com/a2db409c-43eb-4218-a351-cf2bc3c98d50/Files"
directory = "/Geography/"
rawlakehouse = "ub_raw_engineering"
enrichedlakehouse = "ub_enriched_engineering"

StatementMeta(, deda66e5-7007-484a-b661-ad4b23a8fa99, 12, Finished, Available)

### U.S. Postal Service

Postal Codes:  US Postal Service  (https://postalpro.usps.com/ZIP_Locale_Detail)
<br>A File is downloaded in Excel xlsx format and saved to the Geography directory using OneLake File Explorer 
<br>
<br>Pandas is used to read the raw excel file, update the headers, specify data types  (pdzips)
<br>A Spark DataFrame is used to write the data in delta format (zips)
<br>
<br>Note: Excel 


In [13]:
import pandas as pd
import numpy as np

filename = "ZIP_Locale_Detail.xlsx"
sheetname = "ZIP_DETAIL"
deltaTableName = enrichedlakehouse+".geo_zips"

# Returns a Pandas DataFrame reading data from Excel
pdzips = pd.read_excel(rawlakehousefiles + directory + filename, sheet_name=sheetname, \
header=0,names=['AreaName', 'AreaCode', 'DistrictName', 'DistrictNo', 'DeliveryZipCode', 'LocaleName', 'PhysicalDelvAddr', 'PhysicalCity', \
'PhysicalState', 'PhysicalZip', 'PhysicalZip4'], skiprows=1, dtype={'DistrictNo': int, 'DeliveryZipCode': int, 'PhysicalZip': int, 'PhysicalZip4': str})

# Returns a Spark DataFrame from the Pandas DataFrame
zips = spark.createDataFrame(pdzips)

# Only needed when writing the initial code... commented out for production
# zips.show()
# zips.printSchema()

# Write the data in delta format to the enrichedlakehouse
zips.write.format("delta").mode("overwrite")\
.saveAsTable(deltaTableName)

StatementMeta(, deda66e5-7007-484a-b661-ad4b23a8fa99, 15, Finished, Available)

  warn("""Cannot parse header or footer so it will be ignored""")


#### U.S. Census Bureau

##### State Codes
Source Data:  US Census TIGER Data Products (https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html)
<br>File is downloaded in csv format and saved to the Geography directory using OneLake File Explorer 
<br>A Spark DataFrame is used to read the csv file using a custom schema, then write the data in delta format (statedf)

In [14]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

filename = rawlakehousefiles+directory+"TigerState.csv"
deltaTableName = enrichedlakehouse+".geo_state"

stateschema = StructType([
    StructField("region",IntegerType() ,True),
    StructField("division",IntegerType(),True),
    StructField("statefp",IntegerType(),True),
    StructField("statens",IntegerType(),True),
    StructField("geoid",IntegerType(),True),
    StructField("geofq",IntegerType(),True),
    StructField("stusps",StringType(),True),
    StructField("name",StringType(),True),
    StructField("lsad",StringType(),True),
    StructField("mtfcc",StringType(),True),
    StructField("funcstat",StringType(),True),
    StructField("aland",IntegerType(),True),
    StructField("awater",IntegerType(),True),
    StructField("intptlat",FloatType(),True),
    StructField("intptlon",FloatType(),True)    
])

# Returns a Spark DataFrame reading data from csv 
statedf = spark.read.csv(filename, schema=stateschema, header=True) \
.write.format("delta").mode("overwrite") \
.saveAsTable(deltaTableName) 


StatementMeta(, deda66e5-7007-484a-b661-ad4b23a8fa99, 16, Finished, Available)

In [15]:
# Creates County Geography Table
# Source data:  Topologically Integrated Geographic Encoding and Referencing system 
#    State Codes:  US Census TIGER Data Products (https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html)
#    File is downloaded in csv format and saved to the Geography directory using OneLake File Explorer 
#
# A Spark DataFrame is used to read the csv file using a custom schema, then write the data in delta format (statedf)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

filename = rawlakehousefiles+directory+"TigerCounty.csv"
deltaTableName = enrichedlakehouse+".geo_county"

countyschema = StructType([
    StructField("statefp",IntegerType(),True),
    StructField("countyfp",IntegerType(),True),
    StructField("countyns",IntegerType(),True),
    StructField("geoid",IntegerType(),True),
    StructField("geofq",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("namelsad",StringType(),True),
    StructField("lsad",StringType(),True),
    StructField("classfp",StringType(),True),
    StructField("mtfcc",StringType(),True),
    StructField("csafp",IntegerType(),True),
    StructField("cbsafp",IntegerType(),True),
    StructField("metdivfp",IntegerType(),True),  
    StructField("funcstat",StringType(),True),
    StructField("aland",IntegerType(),True),
    StructField("awater",IntegerType(),True),
    StructField("intptlat",FloatType(),True),
    StructField("intptlon",FloatType(),True)    
])


# Returns a Spark DataFrame reading data from csv 
countydf = spark.read.csv(filename, schema=countyschema, header=True) \
.write.format("delta").mode("overwrite") \
.saveAsTable(deltaTableName) 


StatementMeta(, deda66e5-7007-484a-b661-ad4b23a8fa99, 17, Finished, Available)

In [16]:
# Creates CBSA Table (Core Based Statistical Area)
# Source data:  Topologically Integrated Geographic Encoding and Referencing system 
#    State Codes:  US Census TIGER Data Products (https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html)
#    File is downloaded in csv format and saved to the Geography directory using OneLake File Explorer 
#
# A Spark DataFrame is used to read the csv file using a custom schema, then write the data in delta format (statedf)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

filename = rawlakehousefiles+directory+"TigerCbsa.csv"
deltaTableName = enrichedlakehouse+".geo_cbsa"

cbsaschema = StructType([
    StructField("csafp",IntegerType(),True),
    StructField("cbsafp",IntegerType(),True),
    StructField("geoid",IntegerType(),True),
    StructField("geofq",IntegerType(),True),
    StructField("name",StringType(),True),
    StructField("namelsad",StringType(),True),
    StructField("memi",StringType(),True),
    StructField("mtfcc",StringType(),True),
    StructField("aland",IntegerType(),True),
    StructField("awater",IntegerType(),True),
    StructField("intptlat",FloatType(),True),
    StructField("intptlon",FloatType(),True)    
])


# Returns a Spark DataFrame reading data from csv 
countydf = spark.read.csv(filename, schema=cbsaschema, header=True) \
.write.format("delta").mode("overwrite") \
.saveAsTable(deltaTableName) 


StatementMeta(, deda66e5-7007-484a-b661-ad4b23a8fa99, 18, Finished, Available)

In [17]:
# Creates ZCTA Table (Zip Code Tabulation Area)
# Source data:  Topologically Integrated Geographic Encoding and Referencing system 
#    State Codes:  US Census TIGER Data Products (https://www.census.gov/programs-surveys/geography/guidance/tiger-data-products-guide.html)
#    File is downloaded in csv format and saved to the Geography directory using OneLake File Explorer 
#
# A Spark DataFrame is used to read the csv file using a custom schema, then write the data in delta format (statedf)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

filename = rawlakehousefiles+directory+"TigerZCTA.csv"
deltaTableName = enrichedlakehouse+".geo_zcta"

zctaschema = StructType([
    StructField("zcta",IntegerType(),True),
    StructField("geoid",IntegerType(),True),
    StructField("geofq",IntegerType(),True),
    StructField("classfp",StringType(),True),
    StructField("mtfcc",StringType(),True),
    StructField("funcstat",StringType(),True),
    StructField("aland",IntegerType(),True),
    StructField("awater",IntegerType(),True),
    StructField("intptlat",FloatType(),True),
    StructField("intptlon",FloatType(),True)    
])


# Returns a Spark DataFrame reading data from csv 
zctadf = spark.read.csv(filename, schema=zctaschema, header=True) \
.write.format("delta").mode("overwrite") \
.saveAsTable(deltaTableName) 


StatementMeta(, deda66e5-7007-484a-b661-ad4b23a8fa99, 19, Finished, Available)