# Raw Data Ingestion

This workshop will use data from the City of Chicago Open Data Portal: <https://data.cityofchicago.org>. The following datasets will be used:

1. Business license data: <https://data.cityofchicago.org/Community-Economic-Development/Business-Licenses/r5kz-chrr>
2. Food inspections: <https://data.cityofchicago.org/Health-Human-Services/Food-Inspections/4ijn-s7e5>

## Setup

In [None]:
import os
import requests
import re
from urllib.parse import urlencode

import pandas as pd
from sqlalchemy import create_engine, text

In [None]:
# Jupyter / Pandas settings
pd.options.display.max_columns = 999

In [None]:
# Set up postgresql connection
db_user = "posit"
db_password = os.environ["CONF23_DB_PASSWORD"]
db_host = os.environ["CONF23_DB_HOST"]
db_port = 5432
db_database = "conf23_python"
engine = create_engine(f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_database}")
engine
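
If the database password contains characters such as `@` or `/`, interpolating it directly into the connection string can produce a URL that is parsed incorrectly. One way to guard against this is to percent-encode the password with `urllib.parse.quote_plus` first. The sketch below assumes a made-up password and host; both are placeholders, not the workshop's real settings.

```python
from urllib.parse import quote_plus

# Placeholder values for illustration only; not the workshop's real settings.
db_user = "posit"
db_password = "p@ss/word"      # hypothetical password with special characters
db_host = "db.example.com"     # hypothetical host
db_port = 5432
db_database = "conf23_python"

# Percent-encode the password so "@" and "/" are not read as URL delimiters.
safe_password = quote_plus(db_password)

connection_url = (
    f"postgresql+psycopg2://{db_user}:{safe_password}"
    f"@{db_host}:{db_port}/{db_database}"
)
print(connection_url)
```

The resulting string can be passed to `create_engine` in the same way as the unescaped one.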

Set dynamic variables. To avoid overloading the database or the server, only the instructor's scripts run on the full data set; all other users are limited to 10,000 rows.

In [None]:
connect_username = requests.get(
    f"{os.environ['CONNECT_SERVER']}/__api__/v1/user",
    headers={"Authorization": f"Key {os.environ['CONNECT_API_KEY']}"}
).json()["username"]

connect_username

In [None]:
if connect_username == "sam.edwardes":
    max_rows = 99_999_999
else:
    max_rows = 10_000

max_rows

## Data set (1): Food inspections

<https://data.cityofchicago.org/Health-Human-Services/Food-Inspections/4ijn-s7e5>

**Step 1:** Get the raw data from the data portal

In [None]:
base_url = "https://data.cityofchicago.org/resource/4ijn-s7e5.csv"
params = {"$order": "inspection_date", "$limit": max_rows}
url = f"{base_url}?{urlencode(params)}"
url
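
To see what `urlencode` does with the `$`-prefixed query parameters, the sketch below builds the same URL and then decodes the query string again. It assumes `max_rows = 10_000` (the notebook sets this per user) and makes no network request.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

base_url = "https://data.cityofchicago.org/resource/4ijn-s7e5.csv"
max_rows = 10_000  # assumed value; the notebook sets this per user
params = {"$order": "inspection_date", "$limit": max_rows}

# urlencode percent-encodes the "$" in each key as "%24".
query = urlencode(params)
url = f"{base_url}?{query}"
print(url)

# Decoding the query string recovers the original parameter names.
decoded = parse_qs(urlsplit(url).query)
print(decoded)
```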

In [None]:
# Read everything as a string so that the formatting is preserved. We will use 
# pandera later to convert everything to the correct type.
food_inspection_data = pd.read_csv(url, dtype=str)
food_inspection_data
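
The effect of `dtype=str` is easiest to see on values with leading zeros. The sketch below uses a made-up two-row CSV (the values are not real portal data) and compares the default type inference with reading everything as strings.

```python
from io import StringIO

import pandas as pd

# Hypothetical two-row extract; the values are made up for illustration.
csv_text = "license_,zip\n0012345,60607\n0000042,06010\n"

# Default behaviour: pandas infers integers and drops the leading zeros.
as_inferred = pd.read_csv(StringIO(csv_text))

# dtype=str keeps every value exactly as it appears in the file.
as_strings = pd.read_csv(StringIO(csv_text), dtype=str)

print(as_inferred["license_"].tolist())
print(as_strings["license_"].tolist())
```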

**Step 2:** Save the data to Postgres

Determine a prefix for the table name so that we do not overwrite each other's data.

In [None]:
# determine the table name
if connect_username == "sam.edwardes":
    table_name_prefix = ""
else:
    table_name_prefix = re.sub('[^0-9a-zA-Z]+', '_', connect_username) + "_"

table_name = f"{table_name_prefix}food_inspection_raw"

table_name
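
As a rough illustration of the naming scheme, the sketch below wraps the same regular expression in a hypothetical helper, `make_table_name` (not part of the notebook), and applies it to a made-up username. Every run of characters that is not a letter or digit becomes a single underscore.

```python
import re


def make_table_name(username: str, base_name: str, instructor: str = "sam.edwardes") -> str:
    """Hypothetical helper: prefix base_name with a sanitized username.

    The instructor's tables get no prefix, mirroring the cell above.
    """
    if username == instructor:
        return base_name
    # Replace each run of non-alphanumeric characters with one underscore.
    prefix = re.sub("[^0-9a-zA-Z]+", "_", username) + "_"
    return f"{prefix}{base_name}"


instructor_table = make_table_name("sam.edwardes", "food_inspection_raw")
attendee_table = make_table_name("jane.doe@example.com", "food_inspection_raw")
print(instructor_table)
print(attendee_table)
```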

In [None]:
# Insert the data into postgres. Inserting large amounts of data can be slow, so
# iterate over 10,000 rows at a time.

n_rows = food_inspection_data.shape[0]
step_size = 10_000

for i in range(0, n_rows, step_size):
    index_start = i
    # .loc slices by label and includes both endpoints
    index_end = min(n_rows - 1, i + step_size - 1)
    
    if i == 0:
        if_exists = "replace"
    else:
        if_exists = "append"

    print(f"Inserting rows: {index_start:,} - {index_end:,}")

    food_inspection_data \
        .loc[index_start:index_end, :] \
        .to_sql(table_name, engine, if_exists=if_exists, index=False)
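
Because `.loc` slices by label and includes both endpoints, each chunk runs from `i` to `i + step_size - 1`. The sketch below shows where the chunk boundaries fall for a small example, assuming the default `0..n-1` index that `pd.read_csv` produces. The helper name `chunk_bounds` is hypothetical.

```python
def chunk_bounds(n_rows: int, step_size: int) -> list[tuple[int, int]]:
    """Return inclusive (start, end) label pairs covering rows 0..n_rows-1."""
    bounds = []
    for start in range(0, n_rows, step_size):
        # Clamp the end label to the last row so the final chunk is not overstated.
        end = min(n_rows - 1, start + step_size - 1)
        bounds.append((start, end))
    return bounds


bounds = chunk_bounds(25, 10)
print(bounds)  # [(0, 9), (10, 19), (20, 24)]
```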

In [None]:
# Confirm number of rows
with engine.begin() as conn:
    query = text(f"SELECT COUNT(*) FROM {table_name}")
    data_from_sql = pd.read_sql_query(query, conn)

print(data_from_sql)
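
An alternative to the manual loop is the `chunksize` argument of `DataFrame.to_sql`, which asks pandas to write the rows in batches. The sketch below is not the notebook's method: it swaps in an in-memory SQLite database for Postgres so that it runs anywhere, and it uses made-up rows. It then confirms the row count in the same way as the cell above.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for Postgres here (an assumption for portability).
conn = sqlite3.connect(":memory:")

# Made-up data: 25 rows with a single string column.
df = pd.DataFrame({"inspection_id": [str(i) for i in range(25)]})

# chunksize=10 asks pandas to write the rows in batches of 10.
df.to_sql("food_inspection_raw", conn, if_exists="replace", index=False, chunksize=10)

# Confirm that every row arrived.
row_count = pd.read_sql_query("SELECT COUNT(*) AS n FROM food_inspection_raw", conn)
print(row_count)

conn.close()
```

The explicit loop in the notebook has the advantage of printing progress for each batch, which `chunksize` does not do by itself.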

## Data set (2): Business License Data

<https://data.cityofchicago.org/Community-Economic-Development/Business-Licenses/r5kz-chrr>

**Step 1:** Get the raw data from the data portal

In [None]:
base_url = "https://data.cityofchicago.org/resource/r5kz-chrr.csv"
params = {"$order": "id", "$limit": max_rows}
url = f"{base_url}?{urlencode(params)}"
url

In [None]:
# Read everything as a string so that the formatting is preserved. We will use 
# pandera later to convert everything to the correct type.
business_license_data = pd.read_csv(url, dtype=str)
business_license_data

In [None]:
# To speed things up, we will only keep the license data for establishments
# that have food inspection data.
business_license_data = pd.merge(
    business_license_data,
    food_inspection_data[["license_"]].drop_duplicates(),
    how="inner",
    left_on="license_id",
    right_on="license_"
).drop(
    columns="license_"
)

business_license_data.info()
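
The inner merge above acts as a filter: it keeps only the license rows whose key also appears in the inspection data. The sketch below uses made-up rows to show why `drop_duplicates()` matters, and compares the result with an equivalent `isin` filter. The column name `legal_name` and all values are assumptions for illustration.

```python
import pandas as pd

# Made-up rows for illustration.
licenses = pd.DataFrame({"license_id": ["1", "2", "3"], "legal_name": ["A", "B", "C"]})
inspections = pd.DataFrame({"license_": ["2", "2", "3", "9"]})

# Deduplicate the keys first so each license row matches at most once.
keys = inspections[["license_"]].drop_duplicates()
filtered = pd.merge(
    licenses, keys, how="inner", left_on="license_id", right_on="license_"
).drop(columns="license_")

# Without drop_duplicates, license "2" is repeated once per inspection.
not_deduplicated = pd.merge(
    licenses, inspections, how="inner", left_on="license_id", right_on="license_"
)

# An equivalent filter that needs no merge.
same_rows = licenses[licenses["license_id"].isin(inspections["license_"])]

print(filtered)
print(len(not_deduplicated))
```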

**Step 2:** Save data to Postgres

In [None]:
table_name = f"{table_name_prefix}business_license_raw"
table_name

In [None]:
# Insert the data into postgres. Inserting large amounts of data can be slow, so
# iterate over 10,000 rows at a time.

# TODO: dynamically insert user name into table if not PROD

n_rows = business_license_data.shape[0]
step_size = 10_000

for i in range(0, n_rows, step_size):
    index_start = i
    # .loc slices by label and includes both endpoints
    index_end = min(n_rows - 1, i + step_size - 1)
    
    if i == 0:
        if_exists = "replace"
    else:
        if_exists = "append"

    print(f"Inserting rows: {index_start:,} - {index_end:,}")
    
    business_license_data \
        .loc[index_start:index_end, :] \
        .to_sql(table_name, engine, if_exists=if_exists, index=False)

In [None]:
# Confirm number of rows
with engine.begin() as conn:
    query = text(f"SELECT COUNT(*) FROM {table_name}")
    data_from_sql = pd.read_sql_query(query, conn)

print(data_from_sql)