# Bronze Data Load

**Purpose:**  
Ingest raw data from all sources into the Bronze layer with **no business logic** or feature engineering—only the bare minimum of cleaning required for schema alignment.

**What this notebook does:**  
1. **Reads** data from:  
   - San Jose API (JSON → DataFrame)  
   - Dallas CSV  
   - SoCo CSV  
2. **Renames** columns to a common, lowercase, snake_case style  
3. **Tags** each row with its `source` (provenance)  
4. **Deduplicates** any exact‐duplicate records  
5. **Preserves** every original field in the per-source DataFrames (e.g. SoCo’s `Date of Birth`, Dallas’s `Kennel_Number`) so downstream layers can decide what to keep; the merged Bronze table retains only the shared `FINAL_COLUMNS`

For more on Medallion Architecture, see [Databricks Glossary: Medallion Architecture](https://www.databricks.com/glossary/medallion-architecture) (Databricks, n.d.).

---

### References  
Databricks. (n.d.). *Medallion Architecture*. Retrieved May 10, 2025, from https://www.databricks.com/glossary/medallion-architecture


# Table of Contents

1. [Setup](#setup)  
   Install required packages and import libraries.  

2. [Configuration](#configuration)  
   Define all file paths, API parameters, and date-column lists in one place for reproducibility.  

3. [Data Loading](#data-loading)  
   Fetch data from the San Jose API and read the Dallas + SoCo CSVs into pandas DataFrames.  

4. [Data Cleaning & Standardization](#data-cleaning--standardization)  
   Rename columns, lowercase them, and drop duplicate column names so all sources share the same naming convention.  

5. [Data Merging & Harmonization](#data-merging--harmonization)  
   Tag each row by origin, stack the three sources into one “bronze” table, and drop exact duplicate rows.  

6. [Quick Exploratory Checks](#quick-exploratory-checks)  
   Check missing values, unique counts, distributions, date ranges, and monthly trends.  

-----

## 1. Setup

**Purpose:**  
Ensure the environment has all necessary libraries installed and imported.  
- `%pip install ...` installs dependencies.  

In [1]:
%pip install -r ../../requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import subprocess
import requests
import json
import pandas as pd
from sodapy import Socrata

-----

## 2. Configuration

**Purpose:**  
Centralize all “magic” values (file paths, API endpoints, request parameters, and date-column names) in one place so everything is easy to load locally.
- Makes the notebook reproducible.  
- Keeps the loading cells concise.

In [None]:
# ─── Configuration ───

# File paths for the CSV files
DATA_DIR = "../../data-assets/bronze"
CSV_PATHS = {
    "dallas": os.path.join(
        DATA_DIR,
        "Dallas_Animal_Shelter_Data_Fiscal_Year_2023_-_2025_20250516.csv"
    ),
    "soco": os.path.join(
        DATA_DIR,
        "SoCo_Animal_Shelter_Intake_and_Outcome_20250519.csv"
    ),
}

# San Jose API settings (the resource ID is defined once and reused)
SAN_JOSE_RESOURCE_ID = "f3354a37-7e03-41f8-a94d-3f720389a68a"
SAN_JOSE = {
    "base_url":    "https://data.sanjoseca.gov/api/3/action/datastore_search",
    "resource_id": SAN_JOSE_RESOURCE_ID,
    "params":      {"resource_id": SAN_JOSE_RESOURCE_ID, "limit": 10000},
}

# ─── Date columns to parse ───
DATE_COLS = {
    "san_jose": ["IntakeDate", "OutcomeDate"],
    "dallas":   ["Intake_Date", "Outcome_Date"],
    "soco":     ["Intake Date", "Outcome Date"],
}
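
Because every path lives in one dictionary, it is cheap to confirm the files exist before any loading starts. The following is a minimal sketch, not part of the original notebook: `find_missing_paths` is a hypothetical helper, and the demo uses temporary placeholder files rather than the real `CSV_PATHS`.

```python
import os
import tempfile


def find_missing_paths(paths: dict) -> dict:
    """Return the entries of `paths` whose files are not present on disk."""
    return {name: path for name, path in paths.items() if not os.path.isfile(path)}


# Demo with placeholder files; in the notebook the argument would be CSV_PATHS.
with tempfile.TemporaryDirectory() as tmp_dir:
    present = os.path.join(tmp_dir, "present.csv")
    with open(present, "w") as fh:
        fh.write("a,b\n1,2\n")

    demo_paths = {
        "present": present,
        "absent": os.path.join(tmp_dir, "absent.csv"),  # never created
    }
    missing = find_missing_paths(demo_paths)

print("Missing files:", list(missing))
```

Running such a check right after the configuration cell would surface a mistyped file name as a short list instead of a `FileNotFoundError` partway through loading.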

-----

## 3. Data Loading

**Purpose:**  
Pull raw data into pandas DataFrames:  
1. Call the San Jose API and parse its JSON response.  
2. Read the Dallas + SoCo CSV files, converting date strings to `datetime64`.  

This cell remains focused on *how* to load, not *where* (that’s in Configuration).

In [4]:
# ─── Data Loading ───

# San Jose API
resp = requests.get(SAN_JOSE["base_url"], params=SAN_JOSE["params"])
resp.raise_for_status()
san_jose_df = pd.DataFrame(resp.json()["result"]["records"])
# Parse the date columns as datetimes; unparseable values become NaT
for col in DATE_COLS["san_jose"]:
    san_jose_df[col] = pd.to_datetime(san_jose_df[col], errors="coerce")

# Dallas and SoCo CSVs
dallas_df = pd.read_csv(
    CSV_PATHS["dallas"],
    parse_dates=DATE_COLS["dallas"],
    low_memory=False
)
soco_df = pd.read_csv(
    CSV_PATHS["soco"],
    parse_dates=DATE_COLS["soco"],
    low_memory=False
)
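
The San Jose request above asks for at most 10,000 rows in a single call, so a larger dataset would come back incomplete. The sketch below shows one way to page through results, assuming the endpoint follows CKAN's `datastore_search` conventions (`limit` and `offset` parameters, rows returned under `records`). `fetch_all_records` and `fake_fetch_page` are hypothetical names, and the fake fetcher stands in for the real HTTP call so the example runs offline.

```python
from typing import Callable, Dict, List


def fetch_all_records(
    fetch_page: Callable[[int, int], Dict], page_size: int = 1000
) -> List[dict]:
    """Collect records page by page until a short or empty page is returned."""
    records: List[dict] = []
    offset = 0
    while True:
        result = fetch_page(offset, page_size)
        page = result.get("records", [])
        records.extend(page)
        if len(page) < page_size:  # last page reached
            break
        offset += page_size
    return records


# Stand-in data source (assumption: 2,500 fake rows, no network access).
FAKE_ROWS = [{"_id": i} for i in range(2500)]


def fake_fetch_page(offset: int, limit: int) -> Dict:
    """Mimic the shape of a paged response: a dict with a `records` list."""
    return {"records": FAKE_ROWS[offset:offset + limit]}


all_rows = fetch_all_records(fake_fetch_page, page_size=1000)
print(len(all_rows))
```

A real `fetch_page` would wrap `requests.get` with `limit` and `offset` added to the request parameters and return the `result` object of the JSON response; that wiring is left out here because it depends on the live API.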

-----

## 4. Data Cleaning & Standardization

**Purpose:**  
Clean up and harmonize column names across sources:  
- Apply a single `COLUMN_MAP` dict.  
- Lowercase everything for consistency.  
- Drop duplicate column names created when several originals map to the same target.  

This ensures downstream steps can assume a uniform schema.

In [None]:
# ─── Data Cleaning ───

# Full column mapping: raw header (exact, case-sensitive match) -> standardized name
COLUMN_MAP = {
    # Animal ID
    "AnimalID":      "animal_id",
    "Animal_Id":     "animal_id",
    "Animal ID":     "animal_id",
    # Animal Type
    "AnimalType":    "animal_type",
    "Animal_Type":   "animal_type",
    "Type":          "animal_type",
    # Breed
    "PrimaryBreed":  "breed",
    "Animal_Breed":  "breed",
    "Breed":         "breed",
    # Color
    "PrimaryColor":  "primary_color",
    "Color":         "primary_color",
    # Age
    "Age":           "age",
    # Sex
    "Sex":           "sex",
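    # Intake fields
    "IntakeType":        "intake_type",
    "Intake_type":       "intake_type",
    "Intake Type":       "intake_type",
    "IntakeCondition":   "intake_condition",
    "Intake_Condition":  "intake_condition",
    "Intake Condition":  "intake_condition",
    "IntakeReason":      "intake_reason",
    # NOTE: keys are case-sensitive. The Dallas column list printed below still
    # contains `reason`, so this key did not match the raw Dallas header.
    "reason":            "intake_reason",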
    "IntakeDate":        "intake_date",
    "Intake_Date":       "intake_date",
    "Intake Date":       "intake_date",
    # Outcome fields
    "OutcomeType":       "outcome_type",
    "outcome_type":      "outcome_type",
    "Outcome Type":      "outcome_type",
    "OutcomeDate":       "outcome_date",
    "Outcome_Date":      "outcome_date",
    "Outcome Date":      "outcome_date",
}

# Function to apply the column mapping 
def standardize_columns(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """
    Standardize DataFrame column names.

    Parameters
    ----------
    df : pandas.DataFrame
        The raw DataFrame whose columns need standardization to enable better
        analysis.
    mapping : dict
        A dict where keys are original column names (exact match) and
        values are the desired standardized names (snake_case).

    Returns
    -------
    pandas.DataFrame
        A copy of `df` with:
        1. Columns renamed according to `mapping`.
        2. All column names converted to lowercase.
        3. Any duplicate column names (arising when multiple originals map
           to the same new name) removed—only the first occurrence is kept.

    Notes
    -----
    - Columns not present in `mapping` are left unchanged (apart from lowercasing).
    - Renaming happens before lowercasing, so mapping keys are case-sensitive.
    - Dropping duplicate columns avoids collisions in downstream code.
    """
    # Apply the renaming mapping
    df = df.rename(columns=mapping)
    # Convert all column names to lowercase
    df.columns = df.columns.str.lower()
    # Remove duplicate columns, keeping the first occurrence; copy so that
    # later column assignments do not trigger SettingWithCopyWarning
    df = df.loc[:, ~df.columns.duplicated()].copy()
    return df


# Apply the column mapping to each DataFrame
san_jose_clean = standardize_columns(san_jose_df, COLUMN_MAP)
dallas_clean = standardize_columns(dallas_df, COLUMN_MAP)
soco_clean = standardize_columns(soco_df, COLUMN_MAP)

# Quick check of the standardized column names
print("San Jose columns:", san_jose_clean.columns.tolist())
print("Dallas   columns:", dallas_clean.columns.tolist())
print("SoCo     columns:", soco_clean.columns.tolist())

San Jose columns: ['_id', 'animal_id', 'animalname', 'animal_type', 'primary_color', 'secondarycolor', 'breed', 'sex', 'dob', 'age', 'intake_date', 'intake_condition', 'intake_type', 'intakesubtype', 'intake_reason', 'outcome_date', 'outcome_type', 'outcomesubtype', 'outcomecondition', 'crossing', 'jurisdiction', 'lastupdate']
Dallas   columns: ['animal_id', 'animal_type', 'breed', 'kennel_number', 'kennel_status', 'tag_type', 'activity_number', 'activity_sequence', 'source_id', 'census_tract', 'council_district', 'intake_type', 'intake_subtype', 'intake_total', 'reason', 'staff_id', 'intake_date', 'intake_time', 'due_out', 'intake_condition', 'hold_request', 'outcome_type', 'outcome_subtype', 'outcome_date', 'outcome_time', 'receipt_number', 'impound_number', 'service_request_number', 'outcome_condition', 'chip_status', 'animal_origin', 'additional_information', 'month', 'year']
SoCo     columns: ['name', 'animal_type', 'breed', 'primary_color', 'sex', 'size', 'date of birth', 'impoun
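
The rename-then-lowercase order has two consequences that are easy to miss: a mapping key whose case differs from the raw header is skipped, and when two raw columns map to the same name only the first one survives. The toy example below illustrates both, using a compact re-implementation of the same three steps and made-up column names and values (all hypothetical).

```python
import pandas as pd


def standardize_columns(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """Rename via `mapping`, lowercase all names, drop repeated column names."""
    df = df.rename(columns=mapping)
    df.columns = df.columns.str.lower()
    return df.loc[:, ~df.columns.duplicated()].copy()


# Toy data: header casing and values are invented for illustration.
toy = pd.DataFrame(
    {
        "Animal_Id": ["A1"],
        "Reason": ["STRAY"],         # capital R: will NOT match the key "reason"
        "Breed": ["LAB"],            # first column that ends up named "breed"
        "Animal_Breed": ["LABRADOR"],  # second "breed": dropped as a duplicate
    }
)
toy_map = {
    "Animal_Id": "animal_id",
    "reason": "intake_reason",
    "Breed": "breed",
    "Animal_Breed": "breed",
}

toy_clean = standardize_columns(toy, toy_map)
print(toy_clean.columns.tolist())
print(toy_clean["breed"].iloc[0])
```

Here `Reason` ends up as `reason` rather than `intake_reason`, and the surviving `breed` value comes from whichever column appeared first in the raw data.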

-----

## 5. Data Merging & Harmonization

**Purpose:**  
Combine the cleaned DataFrames into one Bronze-layer table:  
- Tag each row with its source (`san_jose`, `dallas`, or `soco`).  
- Reindex to a common `FINAL_COLUMNS` list.  
- Drop exact duplicate rows.  

Result: a single `bronze_df` ready for analysis or Silver-layer transforms.

In [None]:
# ─── Data Merging ───

# Tag each DataFrame with its source to allow for tracking
san_jose_clean["source"] = "san_jose"
dallas_clean["source"] = "dallas"
soco_clean["source"] = "soco"

# Define the final column order for the Bronze layer
FINAL_COLUMNS = [
    "animal_id",
    "animal_type",
    "breed",
    "primary_color",
    "age",
    "sex",
    "intake_type",
    "intake_condition",
    "intake_reason",
    "intake_date",
    "outcome_type",
    "outcome_date",
    "source",
]

# Concatenate the DataFrames and keep only the FINAL_COLUMNS
def merge_bronze(*dfs: pd.DataFrame, final_cols: list) -> pd.DataFrame:
    """
    Stack multiple source DataFrames into a single Bronze DataFrame.

    Parameters
    ----------
    dfs : one or more pandas.DataFrame
        Cleaned DataFrames to merge (must share standardized column names).
    final_cols : list of str
        Desired column order for the merged Bronze layer.

    Returns
    -------
    pandas.DataFrame
        The combined Bronze DataFrame with only `final_cols`, in that order.
    """
    combined = pd.concat(dfs, ignore_index=True, sort=False)
    # Keep only the columns we care about in the specified order
    combined = combined.reindex(columns=final_cols)
    return combined

bronze_df = merge_bronze(
    san_jose_clean,
    dallas_clean,
    soco_clean,
    final_cols=FINAL_COLUMNS
)

# Drop exact duplicate rows. Because `source` is part of every row,
# this removes repeats within a source, not matches across sources.
bronze_df = bronze_df.drop_duplicates()

# Display the final DataFrame shape and a count of records by source
print("Final Bronze shape:", bronze_df.shape)
print(bronze_df["source"].value_counts())

Final Bronze shape: (105597, 13)
source
dallas      65063
soco        30550
san_jose     9984
Name: count, dtype: int64
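
The effect of the concat, reindex, and deduplicate sequence is easier to see on a tiny example. The sketch below uses two made-up frames (hypothetical names and values) to show that columns outside the final list are dropped, columns a source lacks are filled with `NaN`, and exact-duplicate removal compares the `source` tag along with everything else.

```python
import pandas as pd

# Toy inputs: source "a" has an extra column and a repeated row;
# source "b" lacks the `sex` column entirely.
frame_a = pd.DataFrame(
    {
        "animal_id": ["A1", "A1"],
        "sex": ["F", "F"],
        "extra": [1, 1],
        "source": ["a", "a"],
    }
)
frame_b = pd.DataFrame({"animal_id": ["A1"], "source": ["b"]})

final_cols = ["animal_id", "sex", "source"]

# Stack the frames, then keep only the agreed columns in the agreed order.
combined = pd.concat([frame_a, frame_b], ignore_index=True, sort=False)
combined = combined.reindex(columns=final_cols)

# Remove rows that are identical in every remaining column.
deduped = combined.drop_duplicates()

print(combined)
print(deduped)
```

The repeated row from source `a` collapses to one, while the `A1` row from source `b` is kept because its `source` and `sex` values differ.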


-----

## 6. Quick Exploratory Checks

**Purpose:**  
Perform lightweight diagnostics to understand the data:  
- **Missing values:** Which columns will need imputation or exclusion?  
- **Unique counts:** Which columns are constant and which have high cardinality?  
- **Distributions:** How do key fields like `intake_type` or `outcome_type` break down?  
- **Date range:** Does the data cover the expected time span?  
- **Trends:** Monthly intake counts to spot gaps or seasonality.  

Use these insights to guide further cleaning or deeper EDA.

In [21]:
# ─── Quick Exploratory Checks ───

# Missing values per column
print("Missing values:")
print(bronze_df.isna().sum(), "\n")

# Cardinality (distinct counts)
print("Unique values per column:")
print(bronze_df.nunique(), "\n")

# Outcome‐type distribution
print("Outcome types (%):")
print(bronze_df["outcome_type"]
      .value_counts(normalize=True)
      .mul(100)
      .round(1)
      .astype(str) + "%", "\n")

# Intake‐type distribution
print("Intake types (%):")
print(bronze_df["intake_type"]
      .value_counts(normalize=True)
      .mul(100)
      .round(1)
      .astype(str) + "%", "\n")

# Time span of intake dates
print("Intake date range:",
      bronze_df["intake_date"].min().date(),
      "to",
      bronze_df["intake_date"].max().date(),
      "\n")

# Age summary (if numeric)
if "age" in bronze_df.columns:
    # coerce to numeric (some age values may be strings)
    bronze_df["age_num"] = pd.to_numeric(bronze_df["age"], errors="coerce")
    print("Age summary:")
    print(bronze_df["age_num"].describe(), "\n")

# Monthly counts by source (to check seasonality or data gaps)
bronze_df["year_month"] = bronze_df["intake_date"].dt.to_period("M")
monthly_counts = (
    bronze_df
    .groupby(["year_month", "source"])
    .size()
    .unstack(fill_value=0)
)
print("Monthly intake counts by source:")
print(monthly_counts.tail())

Missing values:
animal_id                0
animal_type              0
breed                   39
primary_color        65063
age                  95613
sex                  65063
intake_type              0
intake_condition         0
intake_reason       105139
intake_date              0
outcome_type           370
outcome_date          1447
source                   0
dtype: int64 

Unique values per column:
animal_id           85442
animal_type             6
breed                1221
primary_color         394
age                    61
sex                    10
intake_type            23
intake_condition       26
intake_reason          25
intake_date          3979
outcome_type           29
outcome_date         3593
source                  3
dtype: int64 

Outcome types (%):
outcome_type
ADOPTION             28.6%
TRANSFER             14.8%
FOSTER               11.7%
RETURN TO OWNER       9.6%
EUTHANIZED            8.2%
RETURNED TO OWNER     5.2%
EUTHANIZE             4.0%
LOST EXP          
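
In the missing-value counts above, `primary_color` and `sex` are each missing 65,063 times, which equals the Dallas row count. That pattern suggests a whole source lacks the field rather than values being scattered at random. A per-source breakdown makes this visible directly; the sketch below uses a small made-up frame (hypothetical values) and would apply to `bronze_df` in the same way.

```python
import pandas as pd

# Toy stand-in for the Bronze table; values are invented for illustration.
toy = pd.DataFrame(
    {
        "source": ["dallas", "dallas", "soco", "san_jose"],
        "sex": [None, None, "Male", "Female"],
        "age": [None, None, None, "2 YEARS"],
    }
)

# Share of missing values per column, split by source (0.0 to 1.0).
missing_share = (
    toy.drop(columns="source")
    .isna()
    .groupby(toy["source"])
    .mean()
)
print(missing_share)
```

A share of 1.0 for a source means the column is absent for that source entirely, which is a schema gap to resolve in mapping rather than something to impute.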

-----