## This notebook automatically populates the clean collection whenever an update is detected from the DataKC API.

In [None]:
from sodapy import Socrata
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType
from pymongo import MongoClient, UpdateOne
import sys
import json
import logging
import io
import os

In [None]:
# Source - https://stackoverflow.com/a
# Posted by Till Hoffmann
# Retrieved 2025-11-21, License - CC BY-SA 4.0

# Lets log records reach the terminal that launched the kernel, not just the notebook output.
def console_handler(stream='stdout'):
    """
    Create a logging handler that writes to the process's original console stream.

    Inside an ipykernel, ``sys.stdout``/``sys.stderr`` are replaced by notebook-forwarding
    objects that may keep a copy of the original file descriptor in
    ``_original_stdstream_copy``. When that descriptor is available, the handler writes
    to it directly.

    Args:
        stream: Which standard stream to target: ``'stdout'`` or ``'stderr'``.

    Returns:
        A ``logging.StreamHandler``. If the original descriptor is unavailable (attribute
        missing or None, e.g. outside ipykernel), the handler writes to ``sys.<stream>``.

    Raises:
        ValueError: If ``stream`` is not ``'stdout'`` or ``'stderr'``.
    """
    if stream not in {'stdout', 'stderr'}:
        raise ValueError("stream must be one of 'stdout' or 'stderr'")
    std_stream = getattr(sys, stream)
    # Get the file descriptor of the original std stream, if the kernel kept one.
    fd = getattr(std_stream, '_original_stdstream_copy', None)
    if fd is None:
        return logging.StreamHandler(std_stream)
    # closefd=False: the descriptor belongs to the kernel; closing or garbage-collecting
    # this wrapper must not close it.
    return logging.StreamHandler(io.TextIOWrapper(io.FileIO(fd, 'w', closefd=False)))

In [None]:
# Configure the root logger. basicConfig attaches a handler on sys.stderr (inside a
# Jupyter kernel that is the cell's output); console_handler() adds a second handler
# that writes to the original console with the default '%(message)s' formatting.
logging.basicConfig(
    stream=sys.stderr,   # send to console directly
    level=logging.INFO,  # or DEBUG, WARNING, etc.
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger()

# Tag the console handler by name so that re-running this cell does not stack another
# copy (each extra copy would print every log line one more time).
_CONSOLE_HANDLER_NAME = 'original_console'
if not any(h.get_name() == _CONSOLE_HANDLER_NAME for h in logger.handlers):
    _console = console_handler()
    _console.set_name(_CONSOLE_HANDLER_NAME)
    logger.addHandler(_console)

In [None]:
# Notebook-wide early-exit switch: each later cell runs only while this is True;
# setting it to False makes every subsequent guarded cell a no-op.
CONTINUE_FLAG: bool = True

In [None]:
# Credentials come from the environment so they never live in the notebook itself.
DATAKC_APP_TOKEN = os.environ.get('DATAKC_APP_TOKEN')
CONN_STR = os.environ.get('MONGO_CONN_STRING')

# Fail fast: MongoClient(None) silently falls back to localhost, so a missing variable
# would read and write the wrong database instead of raising an error.
if not CONN_STR:
    raise RuntimeError("MONGO_CONN_STRING environment variable is not set.")

# The Socrata app token may be None (anonymous requests, subject to stricter throttling).
client = Socrata('data.kcmo.org', DATAKC_APP_TOKEN, timeout=60)
conn = CONN_STR

In [None]:
# Read the dataset's last-modified time from DataKC so it can be compared with the last ingest.
if CONTINUE_FLAG:
    try:
        dataset_meta = client.get_metadata("d4px-6rwg")
    except Exception as e:
        # Chain the original error so the traceback still shows the root cause.
        raise RuntimeError(f"Unable to connect to DataKC API: {e}") from e
    # Kept outside the try so a schema change (missing key) is not misreported as a
    # connection failure. Treated downstream as epoch seconds (see datetime.fromtimestamp use).
    last_mod = dataset_meta['viewLastModified']

In [None]:
# Connect to Mongo and load the most recent ingest-run record.
if CONTINUE_FLAG:
    mongo = None
    try:
        mongo = MongoClient(conn)
        db = mongo.get_database("kc_311_db")
        md = db.get_collection("ingest_metadata")

        # Most recent run by notebook_timestamp. find_one returns None on an empty
        # collection instead of raising StopIteration like cursor.next().
        latest_md = md.find_one(sort=[('notebook_timestamp', -1)])
    except Exception as e:
        if mongo is not None:
            mongo.close()
        raise RuntimeError(f"Unable to connect to Mongo successfully: {e}") from e

    if latest_md is None:
        mongo.close()
        raise RuntimeError(
            "ingest_metadata collection is empty; seed it with an initial "
            "api_last_modified document before running this notebook."
        )

In [None]:
# Stop here unless DataKC reports a modification newer than the last recorded ingest.
if CONTINUE_FLAG and not (latest_md['api_last_modified'] < last_mod):
    # Nothing new upstream: log it, release the Mongo connection, and disable later cells.
    logger.info("INFO - No update available from DataKC. Exiting.")
    mongo.close()
    CONTINUE_FLAG = False

In [None]:
# Reaching this cell means DataKC has new data to pull: start a local single-core Spark session.
if CONTINUE_FLAG:
    logger.info("INFO - New data available, continuing import.")
    spark = (
        SparkSession.builder
        .master('local[1]')
        .appName('testapp')
        .getOrCreate()
    )
    # Keep Spark's own log output down to errors only.
    spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Pull every record updated on or after the date of the last recorded API modification.
if CONTINUE_FLAG:
    # NOTE(review): fromtimestamp() uses this kernel's local timezone; confirm it matches
    # the timezone of the dataset's last_updated values, or boundary-day rows may be missed.
    last_ingest = datetime.fromtimestamp(latest_md['api_last_modified']).strftime("%Y-%m-%d")

    # Page through the API in chunks of page_size until a short page signals the end.
    # The Socrata system field :id breaks ties so offset paging stays stable when many
    # rows share the same last_updated value.
    page_size = 10_000
    offset = 0
    results_json = []  # list of dicts, one per API record
    while True:
        page = client.get(dataset_identifier="d4px-6rwg",
                          select='',
                          limit=page_size,
                          offset=offset,
                          # SoQL string literals use single quotes.
                          where=f"date_trunc_ymd(last_updated) >= '{last_ingest}'",
                          order='last_updated desc, :id'
                          )
        results_json.extend(page)
        if len(page) < page_size:
            break
        offset += page_size

In [None]:
# Parse the expected schema (the JSON form of a Spark StructType: string columns plus a
# string->string map for lat_long) and apply it to the raw API records.
if CONTINUE_FLAG:
    schema_json = json.loads('{"fields":[{"metadata":{},"name":"additional_questions","nullable":true,"type":"string"},{"metadata":{},"name":"council_district","nullable":true,"type":"string"},{"metadata":{},"name":"current_status","nullable":true,"type":"string"},{"metadata":{},"name":"days_to_close","nullable":true,"type":"string"},{"metadata":{},"name":"department_work_group","nullable":true,"type":"string"},{"metadata":{},"name":"incident_address","nullable":true,"type":"string"},{"metadata":{},"name":"issue_sub_type","nullable":true,"type":"string"},{"metadata":{},"name":"issue_type","nullable":true,"type":"string"},{"metadata":{},"name":"last_updated","nullable":true,"type":"string"},{"metadata":{},"name":"lat_long","nullable":true,"type":{"keyType":"string","type":"map","valueContainsNull":true,"valueType":"string"}},{"metadata":{},"name":"latitude","nullable":true,"type":"string"},{"metadata":{},"name":"longitude","nullable":true,"type":"string"},{"metadata":{},"name":"open_date_time","nullable":true,"type":"string"},{"metadata":{},"name":"report_source","nullable":true,"type":"string"},{"metadata":{},"name":"reported_issue","nullable":true,"type":"string"},{"metadata":{},"name":"resolved_date","nullable":true,"type":"string"},{"metadata":{},"name":"source_category","nullable":true,"type":"string"},{"metadata":{},"name":"workorder_","nullable":true,"type":"string"}],"type":"struct"}')
    schema = StructType.fromJson(schema_json)
    data = spark.createDataFrame(data=results_json, schema=schema)

Processing and transformation begin here; every cell before this point must handle the case where no updates are available.

In [None]:
# Clean and reshape the raw API records into the dashboard layout.
if CONTINUE_FLAG:
    # Drop columns the dashboard does not use.
    data = data.drop('workorder_', 'incident_address', 'lat_long', 'additional_questions')

    # Rename for clarity: reported_issue becomes issue_id.
    data = data.withColumnRenamed('reported_issue', 'issue_id')

    # Split each timestamp column into a date column and a time-of-day column (easier
    # to use in visualizations), then drop the original column.
    # 'HH' is the 24-hour clock; the 12-hour 'hh' without an AM/PM marker would make
    # 02:00 and 14:00 indistinguishable.
    # Unresolved requests keep null resolved_date_* values.
    for ts_col in ('last_updated', 'resolved_date', 'open_date_time'):
        data = (
            data.withColumn(f'{ts_col}_ymd', F.date_format(ts_col, 'yyyy-MM-dd'))
                .withColumn(f'{ts_col}_hms', F.date_format(ts_col, 'HH:mm:ss'))
                .drop(ts_col)
        )

    # Lowercase status values so differently-cased variants compare equal.
    data = data.withColumn('current_status', F.lower('current_status'))

    # Stamp every record with the ingest time (Unix seconds).
    data = data.withColumn('ingest_timestamp', F.unix_timestamp(F.current_timestamp()))

In [None]:
# Number of records pulled from the API in this run; used in the metadata and the final report.
if CONTINUE_FLAG:
    added_rows = int(data.count())

In [None]:
# Upsert the cleaned records into the dashboard collection, keyed on issue_id, so records
# pulled again update in place instead of duplicating.
if CONTINUE_FLAG:
    dash_collection = db.get_collection('dashboard_data')
    # NOTE(review): without an index on issue_id, each upsert filter may scan the
    # collection; consider dash_collection.create_index('issue_id').
    operations = [
        UpdateOne({"issue_id": row["issue_id"]}, {"$set": row}, upsert=True)
        for row in (r.asDict() for r in data.toLocalIterator())
    ]

    if operations:
        result = dash_collection.bulk_write(operations)
    else:
        # bulk_write([]) raises InvalidOperation, which would happen when the API reports a
        # modification but no rows match the date filter. Use an all-zero result so the
        # metadata and reporting cells still run and record this check.
        from pymongo.results import BulkWriteResult
        result = BulkWriteResult(
            {
                "writeErrors": [],
                "writeConcernErrors": [],
                "nInserted": 0,
                "nUpserted": 0,
                "nMatched": 0,
                "nModified": 0,
                "nRemoved": 0,
                "upserted": [],
            },
            acknowledged=True,
        )

In [None]:
# Record this run so the next execution can tell whether DataKC has anything newer.
# Note: inserted_records holds added_rows, i.e. every row pulled from the API this run.
if CONTINUE_FLAG:
    doc = dict(
        api_last_modified=last_mod,
        notebook_timestamp=datetime.now().timestamp(),
        inserted_records=added_rows,
    )
    md.insert_one(doc)

InsertOneResult(ObjectId('691ff059ec21ebef69be9346'), acknowledged=True)

In [None]:
# Report the outcome of the upsert, then release the Mongo and Spark resources.
if CONTINUE_FLAG:
    counts = result.bulk_api_result
    # %-style logging arguments avoid nesting same-type quotes inside an f-string,
    # which is a SyntaxError before Python 3.12 (PEP 701).
    logger.info(
        'INFO - Import successfully completed. %s rows from the API, %s new and %s changed to "%s".',
        added_rows, counts['nUpserted'], counts['nModified'], dash_collection.name,
    )
    mongo.close()
    spark.stop()