Notebook contains 2 pipelines and 1 REST Api scirpt


In [0]:
#Imports to run API Data Scraping script

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from datetime import datetime
import logging
import traceback
import json
import os
import sys

In [0]:
#Set up functions to house each section of the API data scraping script

#Setup basic logging function for tracking and troubleshooting

def setup_logging():
    logging.basicConfig(
        level = logging.INFO,
        format = '%(asctime)s - %(levelname)s - %(message)s',
        handlers = [logging.FileHandler(f'logs/api_data_pull_run_{datetime.now().strftime('%Y%m%d')}.log'),
                    logging.StreamHandler(sys.stdout)
                   ]
    )

    return logging.getLogger(__name__)

#Setup retry function incase there is a timeout for the original data request pull

def retry_get_data_http_url_timeout(logger, url, retries = 5, backoff_factor = 1, timeout = 10): 

    retry = Retry( #retry rules 
        total = retries, 
        connect = retries,
        read = retries,
        backoff_factor = backoff_factor,
        status_forcelist = [429, 500, 502, 503, 504], #HTTP status codes worth retrying
        allowed_methods = ['GET'], #only allow data pulls
        raise_on_status= False )

    adapter = HTTPAdapter(max_retries=retry) #use an adapter to handle the HTTP network
    session = requests.Session() #use session to automatically apply retry rules
    session.mount('http://', adapter) #only want http/https urls.
    session.mount('https://', adapter)
    try:
        logger.info(f'Reattempting to pull data {retries} times with 10 second intervals')
        global data
        response = session.get(url, timeout=timeout)
        data = response.json()
        logger.info(f'Data stored in object')
        logger.info(f'Ending API data pull script.')
        
    except requests.exceptions.HTTPError as e:
        logger.error(f'Failed to pull data')
        logger.error(f'HTTP error: {e}')
        logger.error(traceback.format_exc())
        logger.info(f'Ending API data pull script.')
        return
  
    except requests.exceptions.RequestException as e:
        logger.error(f'Failed to pull data')
        logger.error(f'Other error: {e}')
        logger.error(traceback.format_exc())
        logger.info(f'Ending API data pull script.')
        return
    
#Setup function to do 1 data pull

def get_data_from_url(logger, url, retries, backoff_factor, params=None, timeout=10):
    try:
        response = requests.get(url, params) #used requests.get for a single try
        response.raise_for_status()
        logger.info(f'Response code {response}')
        global data
        data = response.json()
        logger.info(f'Data successfully pulled')
        logger.info(f'Ending API data pull script.')

    except requests.exceptions.Timeout as e:
        logger.error(f'Request timed out; 10 second wait before attempting to repull.')
        logger.error(traceback.format_exc())
        retry_get_data_http_url_timeout(logger, url, retries, backoff_factor, timeout)
        
    except requests.exceptions.HTTPError as e:
        logger.error(f'HTTP error: {e}')
        logger.error(traceback.format_exc())
        logger.info(f'Ending API data pull script.')
        return
        
    except requests.exceptions.RequestException as e:
        logger.error(f'Other error: {e}')
        logger.error(traceback.format_exc())
        logger.info(f'Ending API data pull script.')
        return

def save_json_file(logger, file_name):
    if not data:
        logger.info(f'url did not contain text')
    else:
        with open(f'{file_name}.json', 'w') as f:
            json.dump(data, f)

def main(url, file_name, params, retries, backoff_factor, timeout):
    
    data = {}
    file_name = file_name
    url = url
    params = params
    retries = retries
    backoff_factor = backoff_factor
    timeout = timeout
    
    os.makedirs('logs', exist_ok = True)
    os.makedirs('data/api_data_pull/logs', exist_ok = True)
    logger = setup_logging()
    logger.info(f'Starting API data pull...')
    try:
        get_data_from_url(logger, url, params, timeout)

    except requests.exceptions.Timeout as e:
        logger.error(f'Request timed out; 10 second wait before attempting to repull.')
        logger.error(traceback.format_exc())
        retry_get_data_http_url_timeout(logger, url, retries, backoff_factor, timeout)

    save_json_file(logger, file_name)
    logger.info(f'Session finished')
                

In [0]:
#API data pull was done outside of Databricks; having issue running requests library in the Free environment

url = 'https://data.lacity.org/resource/2nrs-mtv8.json'
file_name = 'la_crime_data_2020_2026'
params = None
retries = 3
backoff_factor = 1
timeout = 10
main(url, file_name, None, retries, backoff_factor, timeout)

In [0]:
#imports to build la_crime_data_pipeline

from pypdf import PdfReader
from datetime import datetime
import re
import logging
import traceback
import json
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import row_number
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pathlib import Path

In [0]:
%sql
USE CATALOG api_crime_data_2020_to_2026_01_30;
USE SCHEMA default;


In [0]:
#Discovery work > ignore cell

spark = SparkSession.builder.appName('la_crime_api_data_pipeline').getOrCreate()
df = spark.read.text('/Volumes/api_crime_data_2020_to_2026_01_30/default/full_crime_data/la_crime_data_2020_2026.json')
#display(df)

In [0]:
#Discovery work > ignore cell

#values of 1 sample cell 

{"dr_no": "211507896", "date_rptd": "2021-04-11T00:00:00.000", "date_occ": "2020-11-07T00:00:00.000", "time_occ": "0845", "area": "15", "area_name": "N Hollywood", "rpt_dist_no": "1502", "part_1_2": "2", "crm_cd": "354", "crm_cd_desc": "THEFT OF IDENTITY", "mocodes": "0377", "vict_age": "31", "vict_sex": "M", "vict_descent": "H", "premis_cd": "501", "premis_desc": "SINGLE FAMILY DWELLING", "status": "IC", "status_desc": "Invest Cont", "crm_cd_1": "354", "location": "7800    BEEMAN                       AV", "lat": "34.2124", "lon": "-118.4092"}

In [0]:
#Discovery work > ignore cell

crime_df = spark.read.json('/Volumes/api_crime_data_2020_to_2026_01_30/default/full_crime_data/la_crime_data_2020_2026.json')
inferred_crime_schema = crime_df.printSchema()

In [0]:
#Discovery work > ignore cell

inferred_crime_schema = '''
root
 |-- area: string (nullable = true)
 |-- area_name: string (nullable = true)
 |-- crm_cd: string (nullable = true)
 |-- crm_cd_1: string (nullable = true)
 |-- crm_cd_2: string (nullable = true)
 |-- crm_cd_3: string (nullable = true)
 |-- crm_cd_4: string (nullable = true)
 |-- crm_cd_desc: string (nullable = true)
 |-- cross_street: string (nullable = true)
 |-- date_occ: string (nullable = true)
 |-- date_rptd: string (nullable = true)
 |-- dr_no: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- location: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- mocodes: string (nullable = true)
 |-- part_1_2: string (nullable = true)
 |-- premis_cd: string (nullable = true)
 |-- premis_desc: string (nullable = true)
 |-- rpt_dist_no: string (nullable = true)
 |-- status: string (nullable = true)
 |-- status_desc: string (nullable = true)
 |-- time_occ: string (nullable = true)
 |-- vict_age: string (nullable = true)
 |-- vict_descent: string (nullable = true)
 |-- vict_sex: string (nullable = true)
 |-- weapon_desc: string (nullable = true)
 |-- weapon_used_cd: string (nullable = true)
 '''

In [0]:
#Discovery work > ignore cell

#function to parse the printSchema output into easier format to use
def inferred_schema_formatter(inferred_schema_text, type_map):
    
    inferred_schema_fields = []
    
    for raw_line in inferred_schema_text.split('\n'):
        line = raw_line.strip()
        if line.startswith('|--'):
            line = line[4:]
            line_2_parts = line.split(':', 2)
            col_name = line_2_parts[0]
    
            line_2nd_2_parts = line_2_parts[1].strip().split(' ', 2)
            data_type = line_2nd_2_parts[0].lower()
            if data_type in type_map:
                spark_type = type_map[data_type]
            else:
                raise ValueError(f"Unsupported type: {data_type}")
    
            is_nullable = 'true' in line_2nd_2_parts[2].lower()
    
            inferred_schema_fields.append(StructField(col_name, spark_type, is_nullable))
            
    return inferred_schema_fields

In [0]:
#Discovery work > ignore cell

#converts into a copy-paste format for a json structure
def pretty_print_inferred_schema(inferred_schema_fields):
    
    schema_output = ['StructType([']

    for line in inferred_schema_fields:
        schema_output.append(f"    StructField('{line.name}', {line.dataType}, {line.nullable}),")
        
    schema_output.append("])")
    
    return '\n'.join(schema_output)

In [0]:
#Discovery work > ignore cell

#main function to house everything together
def main_inferred_schema_print(inferred_schema_text):

    type_map = {
        "string": StringType(),
        "int": IntegerType(),
        "integer": IntegerType(),
        "double": DoubleType(),
        "boolean": BooleanType(),
        "timestamp": TimestampType()
    }

    inferred_schema_fields = inferred_schema_formatter(inferred_schema_text, type_map)
    
    schema_str = pretty_print_inferred_schema(inferred_schema_fields)
    
    print(schema_str)

In [0]:
#Discovery work > ignore cell

#main_inferred_schema_print(inferred_crime_schema)

StructType([
    StructField('area', StringType(), True),
    StructField('area_name', StringType(), True),
    StructField('crm_cd', StringType(), True),
    StructField('crm_cd_1', StringType(), True),
    StructField('crm_cd_2', StringType(), True),
    StructField('crm_cd_3', StringType(), True),
    StructField('crm_cd_4', StringType(), True),
    StructField('crm_cd_desc', StringType(), True),
    StructField('cross_street', StringType(), True),
    StructField('date_occ', StringType(), True),
    StructField('date_rptd', StringType(), True),
    StructField('dr_no', StringType(), True),
    StructField('lat', StringType(), True),
    StructField('location', StringType(), True),
    StructField('lon', StringType(), True),
    StructField('mocodes', StringType(), True),
    StructField('part_1_2', StringType(), True),
    StructField('premis_cd', StringType(), True),
    StructField('premis_desc', StringType(), True),
    StructField('rpt_dist_no', StringType(), True),
    StructF

In [0]:
#Discovery work > ignore cell

crime_df = spark.read.json('/Volumes/api_crime_data_2020_to_2026_01_30/default/full_crime_data/la_crime_data_2020_2026.json', schema=crime_schema)
#display(crime_df) 

In [0]:
#looked up API documents and need to make data type adjustments to fields:
#date_occ & date_rptd should be dates
#lat & lon should be double, but getting nulls after conversion
#part_1_2 & premis_cd should be integers
table_schema_bronze = StructType([
    StructField('area', StringType(), True),
    StructField('area_name', StringType(), True),
    StructField('crm_cd', StringType(), True),
    StructField('crm_cd_1', StringType(), True),
    StructField('crm_cd_2', StringType(), True),
    StructField('crm_cd_3', StringType(), True),
    StructField('crm_cd_4', StringType(), True),
    StructField('crm_cd_desc', StringType(), True),
    StructField('cross_street', StringType(), True),
    StructField('date_occ', DateType(), True),
    StructField('date_rptd', DateType(), True),
    StructField('dr_no', StringType(), True),
    StructField('lat', StringType(), True),
    StructField('location', StringType(), True),
    StructField('lon', StringType(), True),
    StructField('mocodes', StringType(), True),
    StructField('part_1_2', IntegerType(), True),
    StructField('premis_cd', IntegerType(), True),
    StructField('premis_desc', StringType(), True),
    StructField('rpt_dist_no', StringType(), True),
    StructField('status', StringType(), True),
    StructField('status_desc', StringType(), True),
    StructField('time_occ', StringType(), True),
    StructField('vict_age', StringType(), True),
    StructField('vict_descent', StringType(), True),
    StructField('vict_sex', StringType(), True),
    StructField('weapon_desc', StringType(), True),
    StructField('weapon_used_cd', StringType(), True),
])

In [0]:
    table_schema_silver = StructType([
        StructField('id', IntegerType(), True),
        StructField('area', StringType(), True),
        StructField('area_name', StringType(), True),
        StructField('crm_cd', StringType(), True),
        StructField('crm_cd_1', StringType(), True),
        StructField('crm_cd_2', StringType(), True),
        StructField('crm_cd_3', StringType(), True),
        StructField('crm_cd_4', StringType(), True),
        StructField('crm_cd_desc', StringType(), True),
        StructField('cross_street', StringType(), True),
        StructField('date_occ', DateType(), True),
        StructField('date_rptd', DateType(), True),
        StructField('dr_no', StringType(), True),
        StructField('latitude', DoubleType(), True),
        StructField('location', StringType(), True),
        StructField('longitude', DoubleType(), True),
        StructField('mocodes', StringType(), True),
        StructField('part_1_2', IntegerType(), True),
        StructField('premis_cd', IntegerType(), True),
        StructField('premis_desc', StringType(), True),
        StructField('rpt_dist_no', StringType(), True),
        StructField('status', StringType(), True),
        StructField('status_desc', StringType(), True),
        StructField('time_occ', StringType(), True),
        StructField('vict_age', StringType(), True),
        StructField('vict_descent', StringType(), True),
        StructField('vict_sex', StringType(), True),
        StructField('weapon_desc', StringType(), True),
        StructField('weapon_used_cd', StringType(), True),
])


In [0]:
#Set-up logging for all steps in the pipeline

def setup_logging():
    os.makedirs('logs', exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(f'logs/api_la_crime_data_pipe_line_setup_{datetime.now().strftime('%Y%m%d')}.log'),
            logging.StreamHandler(sys.stdout)
        ]
    )
    
    return logging.getLogger(__name__)

In [0]:
#Function to identify correct files and select the latest one for transformation

def get_correct_file(logger):
    mo_codes_files_count = 0
    volume = []
    directory_path = '/Volumes/api_crime_data_2020_to_2026_01_30/default/full_crime_data'
    for dirpath,_,filenames in os.walk(directory_path):
        for file in filenames:
            if 'MO_CODES' in file:
                mo_codes_files_count += 1
                logger.info(f'{file} is in the directory')
                volume.append(os.path.join(dirpath, file))
            else:
                pass
            logger.info(f'{mo_codes_files_count} MO code files.')
    volume_sorted = sorted(volume, reverse=True)
    upload_mo_codes_file = volume_sorted[0]
    logger.info(f'Loading lastest MO codes file: {upload_mo_codes_file}')
    logger.info(f'Operation complete.')
    return upload_mo_codes_file

In [0]:
def appended_mo_codes_pages(logger, reader):

    logger.info(f'Combining all MO pages.')

    full_text = []
    for page in reader.pages:
        page_text = page.extract_text()
        full_text.append(page_text)
    logger.info(f'Transformation complete.')
    return full_text

In [0]:
def extract_only_mo_codes_with_regex(logger, page_text_in_lines):

    logger.info(f'Removing junk text.')

    pattern = r'^[0-9][0-9][0-9][0-9]'
    parsed_mo_codes = []

    for page_text in page_text_in_lines:
        if re.match(pattern, page_text):
            parsed_mo_codes.append(page_text)
    #print(parsed_mo_codes)

    logger.info(f'Transformation complete.')

    return parsed_mo_codes


In [0]:
def repackage_mo_codes_into_2_column_structure(logger, parsed_mo_codes):

    logger.info(f'Reformatting MO codes and MO descriptions into usable format.')

    mo_code_list = []
    mo_code_description_list = []

    for line in parsed_mo_codes:
        mo_code, mo_code_description = line.split(' ', 1)
        mo_code_list.append(mo_code)
        mo_code_description_list.append(mo_code_description)
    
    mo_code_desc_structure = list(zip(mo_code_list, mo_code_description_list))

    logger.info(f'Transformation complete.')

    return mo_code_desc_structure


In [0]:
def separate_mo_codes_pages(logger, all_pages_text):

    logger.info(f'Separating MO codes and MO descriptions from text.')

    full_text_combined = ''.join(all_pages_text)
    full_text_new_line_removed = re.sub(r'\n',',', full_text_combined)
    full_text_split = full_text_new_line_removed.split(',')

    logger.info(f'Transformation complete.')

    return full_text_split

In [0]:
def create_spark_dataframe(logger, mo_code_desc_structure):

    logger.info(f'Turning object into Spark dataframe')

    spark = SparkSession.builder.appName('mo_codes').getOrCreate()

    df = spark.createDataFrame(mo_code_desc_structure,['mo_codes', 'mo_descriptions'])

    logger.info(f'Transformation complete.')

    return df

In [0]:
def create_or_replace_table(logger, df, table_name, schema_path):

    #logger.info(f'Checking to see if {table_name} exists.')
    
    if spark.catalog.tableExists(f'{schema_path}.{table_name}'):
        logger.info("Table exists; overwriting existing table with updated data.")
        df.write.mode("overwrite").saveAsTable(f'{schema_path}.{table_name}')
    else:
        logger.info('Table does not exist; table will be created.')
        #table = spark.createDataFrame([], schema)
        df.write.format('delta').mode('ignore').saveAsTable(f'{schema_path}.{table_name}')
        logger.info(f'Created {table_name} table.')


In [0]:
def copy_as_consumption_view(logger, table_name, view_name, schema_path):

    logger.info(f'Cloning main table into consumable view.')

    spark.sql(f"""CREATE OR REPLACE TABLE {schema_path}.{view_name} SHALLOW CLONE {schema_path}.{table_name}""")

    logger.info(f'Cloning complete.')
    
    return

In [0]:
#Check if a table exists in a schema; create an empty table if it does not exist.

def check_or_create_table(logger, table_name, schema_path, schema):

    logger.info(f'Checking to see if {table_name} exists.')
    
    if spark.catalog.tableExists(f'{schema_path}.{table_name}'):
        logger.info("Table exists; table creation skipped.")
    else:
        logger.info('Table does not exist; table will be created.')
        table = spark.createDataFrame([], schema)
        table.write.format('delta').mode('ignore').saveAsTable(f'{schema_path}.{table_name}')
        logger.info(f'Created {table_name} table.')


In [0]:
#check if a silver table exists; if it does, start batch unique id # from existing count; otherwise start at 1

def check_silver_table(logger, silver_table_name, schema_path):

    flag = 0
    logger.info(f'Checking to see if {silver_table_name} exists.')
    
    if spark.catalog.tableExists(f'{schema_path}.{silver_table_name}'):
        logger.info("Table exists; continue unique id # from existing count.")
        flag += 1
    else:
        logger.info('Table does not exist; start unique id # from 1.')

    return flag


In [0]:
#Function to identify correct files to load into bronze level table

def get_correct_files(logger, volume_path, volume_identifier):
    volume = []
    files_count = 0
    for vol_path,_,filenames in os.walk(volume_path):
        for file in filenames:
            if volume_identifier in file:
                files_count += 1
                logger.info(f'{file} is in the volume folder')
                volume.append(os.path.join(vol_path, file))
            else:
                pass
    logger.info(f'{files_count} files to load')
    return volume

In [0]:
#Function to read a json data file for the bronze level table

def json_data_extractor(logger, spark, volume_path, table_schema_bronze):
    
    full_df = spark.createDataFrame([], table_schema_bronze)

    for file in volume_path:

        logger.info(f'Extracting data from {file}')

        df = spark.read.option('multiline', True).json(file, schema = table_schema_bronze)
        total_row_count = df.count()
        total_column_count = len(df.columns)

        logger.info(f'Extracted {total_row_count} rows with {total_column_count} columns')
        
        full_df = full_df.union(df)
    
    logger.info(f'Extracted {full_df.count()} rows with {len(full_df.columns)} columns')

    return full_df


In [0]:
#Function to write/append data to bronze level table

def write_to_table(logger, df, schema_path, table_name):

    logger.info(f'Writing data to {schema_path}')

    df.write.format('delta').mode('append').saveAsTable(f'{schema_path}.{table_name}')

    total_row_count = df.count()

    logger.info(f'Wrote {total_row_count} rows to {schema_path}')
    
    return


In [0]:
#Function to read a SQL data table as a dataframe

def read_table_as_df(logger, spark, schema_path, table_name):

    logger.info(f'Reading data from {schema_path}.{table_name}')

    df = spark.read.table(f'{schema_path}.{table_name}')
    total_row_count = df.count()

    logger.info(f'Read {total_row_count} rows from {schema_path}.{table_name}')
    
    return df


In [0]:
#function that removes excess spacing in between character strings

def remove_excess_spaces_in_between_strings(logger, df):

   logger.info(f'Removing excess spaces from location')

   df = df.withColumn('location', F.trim(F.regexp_replace('location', r'\s+', ' ')))

   logger.info(f'Removing excess spaces from cross_street')

   df = df.withColumn('cross_street', F.trim(F.regexp_replace('cross_street', r'\s+', ' ')))

   logger.info(f'Transformations complete.')

   return df
   

In [0]:
#function that converts StringType to DoubleType for the lat & lon columns
#Could not initially set as DoubleType() in Schema, would produce nulls

def convert_to_double(logger, df):
    logger.info(f'Converting lat from StringType into DoubleType.')
    df = df.withColumn('latitude', F.trim(df['lat']).cast(DoubleType()))
    logger.info(f'Converting lon from StringType into DoubleType.')
    df = df.withColumn('longitude', F.trim(df['lon']).cast(DoubleType()))
    logger.info(f'Dropping old lat & lon columns.')
    df = df.drop('lat', 'lon')
    logger.info(f'Transformations complete.')
    return df


In [0]:
#an a unique id to each record based on their dr_no and utilizing the flag created earlier

def add_unique_id(logger, df, schema_path, table_name, flag):

    window_spec = Window.orderBy('dr_no')

    logger.info(f'Checking to see if {schema_path}.{table_name} exists as a silver-level table.')

    if flag == 1:

        logger.info('Table exists; continuing unique_id off latest number.')

        latest_unique_id = spark.sql(f"SELECT MAX(id) FROM {schema_path}.{table_name}").collect()[0][0]
        df_with_unique_id = df.withColumn('id', row_number().over(window_spec) + latest_unique_id)

        logger.info(f'Transformation complete.')
    else:
        logger.info('Table does not exist; starting unique_id from 1.')

        df_with_unique_id = df.withColumn('id', row_number().over(window_spec))

        logger.info(f'Transformation complete')
        
    return df_with_unique_id


In [0]:
def separate_mo_codes(logger, df):
    
    logger.info(f'Splitting MO codes into separate records based on dr_no.')

    df_with_split = df.withColumn('array_mocodes',F.split(F.col('mocodes'), ' '))
    df_with_split_exploded = df_with_split.withColumn('array_mocode', F.explode('array_mocodes'))

    logger.info(f'Transformations complete.')

    logger.info(f'Dropping old mocodes column.')

    df_with_split_exploded = df_with_split_exploded.drop('mocodes', 'array_mocodes')

    logger.info(f'Renaming array_mocode to mocode.')

    df_with_split_exploded = df_with_split_exploded.withColumnRenamed('array_mocode', 'mocodes')

    logger.info(f'Transformations complete.')

    return df_with_split_exploded


In [0]:
def rearrange_columns(logger, df):

    logger.info(f'Rearranging columns.')

    df = df.select('id',
                   'area', 
                   'area_name', 
                   'crm_cd', 
                   'crm_cd_1', 
                   'crm_cd_2', 
                   'crm_cd_3', 
                   'crm_cd_4', 
                   'crm_cd_desc', 
                   'cross_street', 
                   'date_occ', 
                   'date_rptd', 
                   'dr_no', 
                   'latitude', 
                   'location', 
                   'longitude', 
                   'mocodes', 
                   'part_1_2', 
                   'premis_cd', 
                   'premis_desc', 
                   'rpt_dist_no', 
                   'status', 
                   'status_desc', 
                   'time_occ', 
                   'vict_age', 
                   'vict_descent', 
                   'vict_sex', 
                   'weapon_desc', 
                   'weapon_used_cd')
    
    logger.info(f'Transformations complete.')

    return df

In [0]:
#function that converts dataframe's columns into a readible copy-paste format.
def pretty_print_df_columns(df):
    columns_output = []
    for column in df.columns:
        columns_output.append(f"'{column},'")
    return '\n'.join(columns_output)

In [0]:
#function that makes a gold table by joining 2 silver tables into 1

def produce_gold_table(logger,spark, schema_path, main_table_name, sub_table_name, gold_table_name):

    logger.info(f'Preparing {main_table_name} to {schema_path} as a gold-level table.')

    gold_table_name = 'la_crime_data_vw'
    spark = SparkSession.builder.appName('join_la_crime_data_and_mo_code_data.').getOrCreate()
    
    logger.info(f'Reading {main_table_name} & {sub_table_name} as a gold-level table.')
    la_crime_data_df = read_table_as_df(logger, spark, schema_path, main_table_name)
    mo_codes_df = read_table_as_df(logger, spark, schema_path, sub_table_name)

    logger.info(f'Joining sub tables to {main_table_name}.')

    full_la_crime_data_df = la_crime_data_df.join(mo_codes_df, la_crime_data_df['mocodes'] == mo_codes_df['mo_codes'], how='left')

    logger.info(f'Dropping duplicate columns in the dataframe.')
    
    full_la_crime_data_df = full_la_crime_data_df.drop('mocodes')

    logger.info(f'Saving table as {gold_table_name}.')

    full_la_crime_data_df.write.mode('overwrite').saveAsTable(f'{schema_path}.{gold_table_name}')

    logger.info(f'Operation complete.')

    return


In [0]:
def la_crime_data_pipeline_main(table_schema_bronze, table_schema_silver):

    logger = setup_logging()

    latest_mo_codes_file = get_correct_file(logger)

    reader = PdfReader(latest_mo_codes_file)

    all_pages_text = appended_mo_codes_pages(logger, reader)

    page_text_in_lines = separate_mo_codes_pages(logger, all_pages_text)
    
    parsed_mo_codes = extract_only_mo_codes_with_regex(logger, page_text_in_lines)

    mo_code_desc_structure = repackage_mo_codes_into_2_column_structure(logger, parsed_mo_codes)

    df = create_spark_dataframe(logger, mo_code_desc_structure)

    mo_codes_schema_path = 'api_crime_data_2020_to_2026_01_30.default'
    table_name = 'mo_codes_mt'
    view_name  = 'mo_codes_vw'
    create_or_replace_table(logger, df, table_name, mo_codes_schema_path)
    copy_as_consumption_view(logger,table_name, view_name, mo_codes_schema_path)

    logger.info('MO Codes successfully loaded into table.')

    logger.info('Creating main la_crime_data table.')

    bronze_schema = table_schema_bronze
    schema_path = 'api_crime_data_2020_to_2026_01_30.default'
    bronze_table_name = 'la_crime_data_raw'
    check_or_create_table(logger, bronze_table_name, schema_path, bronze_schema)

    volume_path = '/Volumes/api_crime_data_2020_to_2026_01_30/default/crime_data'
    volume_identifier = 'chunk'
    volume = get_correct_files(logger, volume_path, volume_identifier)

    la_crime_bronze_df = json_data_extractor(logger, spark, volume, bronze_schema)

    write_to_table(logger, la_crime_bronze_df, schema_path, bronze_table_name)

    silver_schema = table_schema_silver
    silver_table_name = 'la_crime_data_mt'
    flag = check_silver_table(logger, silver_table_name, schema_path)
    
    la_crime_silver_df = la_crime_bronze_df
    excess_la_crime_silver_df = remove_excess_spaces_in_between_strings(logger, la_crime_silver_df)
    double_la_crime_silver_df = convert_to_double(logger, excess_la_crime_silver_df)
    unique_la_crime_silver_df = add_unique_id(logger, double_la_crime_silver_df, schema_path, silver_table_name, flag)
    separated_la_crime_silver_df = separate_mo_codes(logger, unique_la_crime_silver_df)
    clean_la_crime_silver_df = rearrange_columns(logger, separated_la_crime_silver_df)

    write_to_table(logger, clean_la_crime_silver_df, schema_path, silver_table_name)

    main_table_name = 'la_crime_data_mt'
    sub_table_name = 'mo_codes_vw'
    gold_table_name = 'la_crime_data_vw'
    produce_gold_table(logger,spark, schema_path, main_table_name, sub_table_name, gold_table_name)

    logger.info("LA crime data successfully loaded into table.")

    return


In [0]:
la_crime_data_pipeline_main(table_schema_bronze, table_schema_silver)

2026-02-11 05:36:18,195 - INFO - MO_CODES_Numerical_20180627.pdf is in the directory
2026-02-11 05:36:18,196 - INFO - 1 MO code files.
2026-02-11 05:36:18,197 - INFO - MO_CODES_Numerical_20191119.pdf is in the directory
2026-02-11 05:36:18,198 - INFO - 2 MO code files.
2026-02-11 05:36:18,198 - INFO - 2 MO code files.
2026-02-11 05:36:18,216 - INFO - 2 MO code files.
2026-02-11 05:36:18,217 - INFO - Loading lastest MO codes file: /Volumes/api_crime_data_2020_to_2026_01_30/default/full_crime_data/MO_CODES_Numerical_20191119.pdf
2026-02-11 05:36:18,217 - INFO - Operation complete.
2026-02-11 05:36:18,233 - INFO - Combining all MO pages.
2026-02-11 05:36:18,516 - INFO - Transformation complete.
2026-02-11 05:36:18,517 - INFO - Separating MO codes and MO descriptions from text.
2026-02-11 05:36:18,518 - INFO - Transformation complete.
2026-02-11 05:36:18,519 - INFO - Removing junk text.
2026-02-11 05:36:18,521 - INFO - Transformation complete.
2026-02-11 05:36:18,521 - INFO - Reformatting 



2026-02-11 05:36:42,003 - INFO - Wrote 2870 rows to api_crime_data_2020_to_2026_01_30.default
2026-02-11 05:36:42,004 - INFO - Preparing la_crime_data_mt to api_crime_data_2020_to_2026_01_30.default as a gold-level table.
2026-02-11 05:36:42,006 - INFO - Reading la_crime_data_mt & mo_codes_vw as a gold-level table.
2026-02-11 05:36:42,007 - INFO - Reading data from api_crime_data_2020_to_2026_01_30.default.la_crime_data_mt
2026-02-11 05:36:42,367 - INFO - Read 2870 rows from api_crime_data_2020_to_2026_01_30.default.la_crime_data_mt
2026-02-11 05:36:42,368 - INFO - Reading data from api_crime_data_2020_to_2026_01_30.default.mo_codes_vw
2026-02-11 05:36:42,857 - INFO - Read 824 rows from api_crime_data_2020_to_2026_01_30.default.mo_codes_vw
2026-02-11 05:36:42,858 - INFO - Joining sub tables to la_crime_data_mt.
2026-02-11 05:36:42,860 - INFO - Dropping duplicate columns in the dataframe.
2026-02-11 05:36:42,861 - INFO - Saving table as la_crime_data_vw.
2026-02-11 05:36:46,798 - INFO -

In [0]:
%sql
SELECT * FROM api_crime_data_2020_to_2026_01_30.default.la_crime_data_vw

id,area,area_name,crm_cd,crm_cd_1,crm_cd_2,crm_cd_3,crm_cd_4,crm_cd_desc,cross_street,date_occ,date_rptd,dr_no,latitude,location,longitude,part_1_2,premis_cd,premis_desc,rpt_dist_no,status,status_desc,time_occ,vict_age,vict_descent,vict_sex,weapon_desc,weapon_used_cd,mo_codes,mo_descriptions
1,11,Northeast,812,812,860.0,,,CRM AGNST CHLD (13 OR UNDER) (14-15 & SUSP 10 YRS OLDER),,2020-07-07,2020-10-29,201115217,34.1107,3000 ACRESITE ST,-118.2589,,,YARD (RESIDENTIAL/BUSINESS),1133,AO,Adult Other,1400,14,H,F,UNKNOWN WEAPON/OTHER WEAPON,500.0,1259.0,Victim is 14 years old thru 17 years old
1,11,Northeast,812,812,860.0,,,CRM AGNST CHLD (13 OR UNDER) (14-15 & SUSP 10 YRS OLDER),,2020-07-07,2020-10-29,201115217,34.1107,3000 ACRESITE ST,-118.2589,,,YARD (RESIDENTIAL/BUSINESS),1133,AO,Adult Other,1400,14,H,F,UNKNOWN WEAPON/OTHER WEAPON,500.0,448.0,Grabbed
1,11,Northeast,812,812,860.0,,,CRM AGNST CHLD (13 OR UNDER) (14-15 & SUSP 10 YRS OLDER),,2020-07-07,2020-10-29,201115217,34.1107,3000 ACRESITE ST,-118.2589,,,YARD (RESIDENTIAL/BUSINESS),1133,AO,Adult Other,1400,14,H,F,UNKNOWN WEAPON/OTHER WEAPON,500.0,304.0,Ate/drank on premises
1,11,Northeast,812,812,860.0,,,CRM AGNST CHLD (13 OR UNDER) (14-15 & SUSP 10 YRS OLDER),,2020-07-07,2020-10-29,201115217,34.1107,3000 ACRESITE ST,-118.2589,,,YARD (RESIDENTIAL/BUSINESS),1133,AO,Adult Other,1400,14,H,F,UNKNOWN WEAPON/OTHER WEAPON,500.0,1822.0,Stranger
1,11,Northeast,812,812,860.0,,,CRM AGNST CHLD (13 OR UNDER) (14-15 & SUSP 10 YRS OLDER),,2020-07-07,2020-10-29,201115217,34.1107,3000 ACRESITE ST,-118.2589,,,YARD (RESIDENTIAL/BUSINESS),1133,AO,Adult Other,1400,14,H,F,UNKNOWN WEAPON/OTHER WEAPON,500.0,500.0,Sex related acts
1,11,Northeast,812,812,860.0,,,CRM AGNST CHLD (13 OR UNDER) (14-15 & SUSP 10 YRS OLDER),,2020-07-07,2020-10-29,201115217,34.1107,3000 ACRESITE ST,-118.2589,,,YARD (RESIDENTIAL/BUSINESS),1133,AO,Adult Other,1400,14,H,F,UNKNOWN WEAPON/OTHER WEAPON,500.0,522.0,Touched
2,13,Newton,930,930,,,,CRIMINAL THREATS - NO WEAPON DISPLAYED,,2020-04-28,2020-04-28,201310056,33.9921,1300 E 56TH ST,-118.2521,,,"MULTI-UNIT DWELLING (APARTMENT, DUPLEX, ETC)",1375,IC,Invest Cont,5,26,H,F,VERBAL THREAT,511.0,443.0,Threaten to harm victim (other than kill)
3,13,Newton,210,210,,,,ROBBERY,45TH ST,2020-04-27,2020-04-27,201310061,34.0025,AVALON BL,-118.2653,,,GAS STATION,1353,IC,Invest Cont,1540,17,B,M,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",400.0,1822.0,Stranger
3,13,Newton,210,210,,,,ROBBERY,45TH ST,2020-04-27,2020-04-27,201310061,34.0025,AVALON BL,-118.2653,,,GAS STATION,1353,IC,Invest Cont,1540,17,B,M,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",400.0,344.0,Removes vict property
3,13,Newton,210,210,,,,ROBBERY,45TH ST,2020-04-27,2020-04-27,201310061,34.0025,AVALON BL,-118.2653,,,GAS STATION,1353,IC,Invest Cont,1540,17,B,M,"STRONG-ARM (HANDS, FIST, FEET OR BODILY FORCE)",400.0,371.0,Gang affiliation questions asked/made gang statement


In [0]:
%sql
DROP TABLE api_crime_data_2020_to_2026_01_30.default.la_crime_data_raw;
DROP TABLE api_crime_data_2020_to_2026_01_30.default.la_crime_data_mt;
DROP TABLE api_crime_data_2020_to_2026_01_30.default.la_crime_data_vw;
DROP TABLE api_crime_data_2020_to_2026_01_30.default.mo_codes_mt;
DROP TABLE api_crime_data_2020_to_2026_01_30.default.mo_codes_vw;