In [None]:
# Register the notebook widgets through which a job supplies its parameters
dbutils.widgets.text("catalog", "mk_fiddles")
dbutils.widgets.text("schema", "detroit_911")

# Read the parameter values and assemble the three-part table name
catalog, schema = dbutils.widgets.get("catalog"), dbutils.widgets.get("schema")
table_name = "incidents_bronze"
table = ".".join((catalog, schema, table_name))

# Ensure the target schema exists
spark.sql("CREATE SCHEMA IF NOT EXISTS {}.{}".format(catalog, schema))

In [0]:
import datetime as dt
import time

import pandas as pd
import requests


# Timestamp captured once, when this cell is executed
TODAY = dt.datetime.now()

# Query template for the ArcGIS feature service. The single `{}` placeholder
# receives the value that `called_at` is compared against.
RAW_CALLS_ROUTE: str = (
    "https://services2.arcgis.com/qvkbeam7Wirps6zC/arcgis/rest/services/"
    "Police_Serviced_911_Calls/FeatureServer/0/query"
    "?where=called_at>'{}'"
    "&outFields=*"
    "&returnGeometry=false"
    "&f=json"
)

def _get_latest_date_in_dataset(table: str) -> str:
    """
    Get the latest `called_at` date in the dataset minus one day.

    The result determines the start date for the API call.

    Args:
        table: Name of the table to query for its maximum `called_at`.

    Returns:
        The day before the latest call, formatted as "%Y-%m-%d".

    Raises:
        ValueError: If the table holds no non-null `called_at` value.
    """
    row = spark.sql(f"SELECT max(called_at) as latest_call_date FROM {table}").first()
    latest_call = row["latest_call_date"]
    if latest_call is None:
        # max() over an empty or all-NULL column is NULL; fail with a clear
        # message instead of an opaque error further down.
        raise ValueError(f"No called_at values found in {table}")

    # called_at is interpreted as milliseconds since the Unix epoch
    latest_datetime = pd.to_datetime(latest_call, unit="ms") - dt.timedelta(days=1)
    return latest_datetime.strftime("%Y-%m-%d")


def pull_crime_data(
    num_records: int, latest_call_date: str, *, timeout: float = 60.0
) -> pd.DataFrame:
    """
    Pull data from the Detroit Police Department's 911 Call for Service API.
    API call generated here: https://data.detroitmi.gov/datasets/detroitmi::911-calls-for-service-last-30-days/about

    Requests pages at `resultOffset` steps of 2000 and stops early as soon as
    a page contributes no rows that were not already collected.

    Args:
        num_records: Record count used to bound the offsets that are requested.
        latest_call_date: Value substituted into the `called_at` filter of
            RAW_CALLS_ROUTE.
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        The de-duplicated `attributes` of every feature received, one row per
        record, with a fresh 0..n-1 index.

    Raises:
        requests.HTTPError: If a request returns an unsuccessful status code.
        RuntimeError: If a response body carries no "features" entry.
    """
    # NOTE(review): the offset step assumes the service returns up to 2000
    # records per request -- confirm against the layer's max record count.
    page_size = 2000
    base_url = RAW_CALLS_ROUTE.format(latest_call_date)

    raw_data = pd.DataFrame()
    # One extra offset past num_records is requested, matching the original range
    for i, offset in enumerate(range(0, num_records + page_size, page_size)):
        response = requests.get(f"{base_url}&resultOffset={offset}", timeout=timeout)
        response.raise_for_status()
        payload = response.json()

        features = payload.get("features")
        if features is None:
            # A body without "features" cannot be paged; surface what came back
            detail = str(payload.get("error", payload))[:500]
            raise RuntimeError(f"API response at offset {offset} has no features: {detail}")

        page = pd.DataFrame([row["attributes"] for row in features])
        # Keep the accumulated frame duplicate-free so repeated rows are never returned
        combined = pd.concat((raw_data, page), axis=0, ignore_index=True).drop_duplicates()
        if len(combined) == len(raw_data):
            print("DataFrame is the same size as before. Exiting loop...")
            break
        raw_data = combined

        if i % 10 == 0:
            print(f"Data Pulled for {offset} records")
            # be nice or pay the price
            time.sleep(1)

    return raw_data.reset_index(drop=True)


def main():
    """
    Fetch 911 call records that are not yet present in the target table.

    Steps:
      1. Read the start date and the existing `incident_id` values from
         `table`; if that fails for any reason, fall back to a 90 day lookback
         and a placeholder set of existing ids.
      2. Ask the API how many records match, then pull them.
      3. Add a `day` column and drop rows whose `incident_id` already exists.

    Returns:
        A Spark DataFrame of new records. When the API yields no rows, an
        empty Spark DataFrame is returned so callers can test `.count()`.

    Raises:
        requests.HTTPError: If the count request returns an unsuccessful status.
    """
    try:
        latest_date = _get_latest_date_in_dataset(table)
        existing_records = spark.sql(f"SELECT incident_id from {table}")
    except Exception as e:
        # Deliberate best-effort fallback (e.g. first run, before the table
        # exists) -- but report the cause instead of hiding it.
        print(f"Could not read {table} ({e!r}); falling back to a 90 day lookback")
        latest_date = (dt.datetime.now() - dt.timedelta(days=90)).strftime("%Y-%m-%d")
        existing_records = spark.createDataFrame(data=pd.DataFrame({"incident_id": [0]}))

    count_url = RAW_CALLS_ROUTE.format(latest_date) + "&returnCountOnly=true"
    record_count = requests.get(count_url, timeout=60)
    record_count.raise_for_status()
    num_records = int(record_count.json()["count"])
    print(f"Total Records since last pull: {num_records}")

    detroit_crime_raw = pull_crime_data(num_records, latest_date)
    if detroit_crime_raw.empty:
        # With no rows there is no incident_id column to anti-join on, so hand
        # back an empty Spark frame rather than failing in the conversion/join.
        print("No records returned by the API; nothing to load")
        return spark.createDataFrame([], schema="incident_id long, day string")

    # NOTE(review): this stamps tomorrow's date (now + 1 day) -- confirm intended
    detroit_crime_raw["day"] = (dt.datetime.now() + pd.DateOffset(days=1)).strftime("%Y-%m-%d")

    detroit_crime_raw = spark.createDataFrame(detroit_crime_raw)
    # Keep only the rows whose incident_id is not already in the table
    detroit_crime_raw = detroit_crime_raw.join(existing_records, on="incident_id", how="leftanti")

    return detroit_crime_raw

# Run the ingestion when this cell executes; the result is read by the write step in the next cell.
detroit_crime_raw = main()


In [0]:

# Count once: every count() call triggers its own Spark job.
new_record_count = detroit_crime_raw.count()
if new_record_count > 0:
    print(f"Writing {new_record_count} records to {table}")
    # Append to the parameterised table (the same one that was read from and
    # that the message above reports), not a hard-coded catalog/schema.
    detroit_crime_raw.write.mode("append").saveAsTable(table)