# Project Overview

In this project, we'll build a data pipeline that extracts police incident data for Cambridge, MA on a regular (tentatively daily) basis and publishes it to a dashboard.
The data will be extracted from the Cambridge Open Data Portal using Socrata's Open Data API.
The dashboard will help people see trends in police incident data in Cambridge, MA.

This notebook is intended to be used during the development process to develop/test individual components of the project.

## Extract

We'll extract Cambridge daily police data using the Socrata Open Data API.

Dataset information:
* https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data

Socrata Open Data API:
* https://dev.socrata.com/foundry/data.cambridgema.gov/3gki-wyrb

In [20]:
import pandas as pd
from sodapy import Socrata
from dotenv import load_dotenv
import os

When making the API request, we'll include an app token, which lets Socrata identify our application and avoids request throttling.
Without the app token, we'll be limited to 2000 rows in our fetched results; including it will allow us to get all the data.
Additionally, we'll increase the timeout period to avoid timeout errors.

For more information:
* https://dev.socrata.com/docs/app-tokens.html
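
The `get_all` call used in `extract_data` hides the paging that a plain `get` call would need. As a rough illustration of what such paging looks like, the sketch below requests fixed-size pages until a short page comes back. `fetch_page` is a hypothetical stand-in for something like `client.get(DATASET_ID, limit=..., offset=...)`, and the page size is an arbitrary choice; this is not a description of sodapy's internals.

```python
from typing import Callable, Dict, List


def fetch_all_pages(
    fetch_page: Callable[[int, int], List[Dict]],
    page_size: int = 1000,
) -> List[Dict]:
    """Collect every record by requesting fixed-size pages until one comes back short."""
    records: List[Dict] = []
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        records.extend(page)
        if len(page) < page_size:
            # a short (or empty) page means there is nothing left to fetch
            break
        offset += page_size
    return records


# Stand-in for the API: 2,500 fake rows served in slices (no network involved).
fake_rows = [{"id": str(i)} for i in range(2500)]


def fake_fetch(limit: int, offset: int) -> List[Dict]:
    return fake_rows[offset:offset + limit]


all_rows = fetch_all_pages(fake_fetch, page_size=1000)
print(len(all_rows))
```

Passing the fetcher in as a callable keeps the loop testable without touching the real API.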

In [35]:
def extract_data():
    '''
    Extract incident data reported to the Cambridge Police Department using the Socrata Open Data API.
    Return the incident data as a Pandas DataFrame.
    '''
    # fetch Socrata app token from .env
    # include this app token when interacting with the Socrata API to avoid request throttling, so we can fetch all the incidents
    load_dotenv()
    APP_TOKEN = os.getenv("SOCRATA_APP_TOKEN") 

    # create Socrata client to interact with the Socrata API (https://github.com/afeld/sodapy)
    client = Socrata(
        "data.cambridgema.gov", 
        APP_TOKEN, 
        timeout=30 # increase timeout from the default - sometimes, it takes longer to fetch all the results
    )

    # fetch all data, paginating over results
    DATASET_ID = "3gki-wyrb" # unique identifier for Cambridge Police Log data (https://data.cambridgema.gov/Public-Safety/Daily-Police-Log/3gki-wyrb/about_data)
    results = client.get_all(DATASET_ID)

    # Convert to pandas DataFrame
    results_df = pd.DataFrame.from_records(results)

    return results_df

results_df = extract_data()
results_df

Unnamed: 0,date_time,id,type,subtype,location,description,last_updated
0,2025-03-06T01:07:00.000,25001925,INCIDENT,LARCENY FROM BUILDING C266 S20,NEWPORT RD,Police responded to the 0 block of Newport Rd ...,2025-03-11T09:35:52.000
1,2025-03-06T01:56:00.000,25001926,INCIDENT,"MOTOR VEH, LARCENY OF C266 S28",MASSACHUSETTS AVE,Police responded to the 400 block of Massachus...,2025-03-11T09:35:52.000
2,2025-03-06T09:15:00.000,25001928,INCIDENT,B&E MV / BOAT NIGHTTIME FOR FELONY C266 S16,IRVING ST,Police responded to the 0 block of Irving Stre...,2025-03-11T09:35:52.000
3,2025-03-06T10:01:00.000,25001929,INCIDENT,LARCENY UNDER $1200 C266 S30(1),MASSACHUSETTS AVE & WESTERN AVE,A Cambridge resident came to the Central Squar...,2025-03-11T09:35:52.000
4,2025-03-06T10:02:00.000,25001930,INCIDENT,SHOPLIFTING BY ASPORTATION C266 S30A,MASSACHUSETTS AVE,Police responded to the 500 block of Massachus...,2025-03-11T09:35:52.000
...,...,...,...,...,...,...,...
7271,2026-01-15T20:07:00.000,26000428,INCIDENT,LARCENY FROM BUILDING C266 S20,WALDEN ST,A Cambridge resident reported a package theft ...,2026-01-16T16:00:07.000
7272,2026-01-15T20:25:00.000,26000430,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,CENTRAL SQ,"David Powell, 57, of Roslindale was placed und...",2026-01-16T16:00:07.000
7273,2026-01-15T21:12:00.000,26000432,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,ALEWIFE BROOK PKWY,A Cambridge resident will be summonsed to Camb...,2026-01-16T16:00:07.000
7274,2026-01-15T21:26:00.000,26000433,ARREST,WARRANT ARREST REPORT,GREEN ST & PEARL ST,"Krystal Scott, 43, of Malden was placed under ...",2026-01-16T16:00:07.000


In [22]:
results_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7276 entries, 0 to 7275
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   date_time     7276 non-null   object
 1   id            7276 non-null   object
 2   type          7276 non-null   object
 3   subtype       7271 non-null   object
 4   location      7276 non-null   object
 5   description   7183 non-null   object
 6   last_updated  7276 non-null   object
dtypes: object(7)
memory usage: 398.0+ KB


In [23]:
# fetch rows that contain any missing values
results_df[results_df.isna().any(axis=1)]

Unnamed: 0,date_time,id,type,subtype,location,description,last_updated
553,2025-01-01T11:41:00.000,25000010,INCIDENT,,MASSACHUSETTS AVE,Police responded to the 600th block of Massach...,2025-03-20T13:41:07.000
557,2025-01-01T13:47:00.000,25000014,INCIDENT,,HUNTING ST,Police responded to the 0th block of Hunting S...,2025-03-20T13:41:07.000
627,2025-01-05T12:09:00.000,25000117,INCIDENT,,CEDAR ST,Police responded to the 0 block on Cedar Stree...,2025-03-20T13:41:07.000
650,2025-01-06T12:11:00.000,25000143,INCIDENT,,SIXTH ST,Police were dispatched by the Emergency Commun...,2025-03-20T13:41:07.000
670,2025-01-07T23:16:00.000,25000187,INCIDENT,,NORTH FIRST ST,Police responded to a reported Breaking & Ente...,2025-03-20T13:41:07.000
...,...,...,...,...,...,...,...
6618,2025-12-07T12:25:00.000,25011431,INCIDENT,LARCENY OVER $1200 BY FALSE PRETENSE C266 S34,AMES ST ...,,2025-12-12T11:00:09.000
6760,2025-12-15T07:38:00.000,25011667,INCIDENT,TRAFFIC INVESTIGATIONS,CONCORD AVE ...,,2025-12-19T16:00:05.000
6825,2025-12-18T09:23:00.000,25011767,ARREST,WARRANT ARREST REPORT,MASSACHUSETTS AVE ...,,2025-12-23T15:00:07.000
6978,2025-12-28T16:19:00.000,25012032,INCIDENT,LARCENY OVER $1200 C266 S30,MT VERNON ST ...,,2026-01-01T15:00:07.000


In [24]:
from collections import Counter 

schema = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
print(Counter(schema) == Counter(list(results_df.columns)))


True


In [25]:
results_df['type'].value_counts()

type
INCIDENT          6904
ARREST             238
TRAFFIC            121
SP                   8
ASSIST               4
INVESTIGATIONS       1
Name: count, dtype: int64

In [26]:
# results_df['datetime'] = pd.to_datetime(results_df['date_time'])

# min_date = results_df['datetime'].min()
# max_date = results_df['datetime'].max()

# print(f"Minimum Date: {min_date}")
# print(f"Maximum Date: {max_date}")

In [27]:
print(f"Unique ID count: {results_df['id'].nunique()}")
print(f"Number of rows: {len(results_df)}")

Unique ID count: 7229
Number of rows: 7276
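
The gap between the two numbers (7276 rows, 7229 unique IDs) means 47 rows repeat an ID that already appears elsewhere. Before deciding how to deduplicate, it can help to look at the repeated groups side by side. The sketch below uses a small made-up frame rather than the real extract; the column values are illustrative only.

```python
import pandas as pd

# Made-up sample: id "2" appears twice and id "3" three times
sample = pd.DataFrame({
    "id": ["1", "2", "2", "3", "3", "3"],
    "last_updated": [
        "2025-01-01", "2025-01-01", "2025-01-02",
        "2025-01-01", "2025-01-02", "2025-01-03",
    ],
})

# keep=False marks every member of a duplicated group, not just the repeats
dupes = sample[sample.duplicated(subset=["id"], keep=False)]

# rows that would be dropped if only the first row per id were kept
extra_rows = len(sample) - sample["id"].nunique()

print(dupes.sort_values(["id", "last_updated"]))
print(f"Rows beyond the first per id: {extra_rows}")
```

Sorting by `last_updated` within each `id` shows whether the repeats are identical copies or later revisions of the same incident, which affects whether keeping the first occurrence is the right rule.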


## Data Validation & Transformation

We'll now perform some basic data quality checks to verify the data meets certain requirements before transforming and loading it to our database.

Data quality checks include:
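- checking that the schema matches the expected columns
- checking that IDs are numeric (duplicate IDs are handled in the transformation step)
- verifying that datetimes follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html)
- checking that required columns have no missing values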

Data transformations could include:
- deduplicating records (keep the first)
- converting date strings to datetimes
- splitting the datetime into year, month, day, hour, minute, and second columns

Other possible transformations include:
- dealing with missing values depending on column
- checking whether columns fall within certain ranges/values

Many other transformations are worth considering, depending on your use case. We'll proceed with these basic ones for now.
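
As one example of checking that a column falls within an expected set of values, the sketch below flags rows whose `type` is not in an allow-list. The allow-list is an assumption: it simply copies the categories observed in this notebook's extract, which may not be the full set the source can produce. `find_unexpected_values` is a hypothetical helper name.

```python
import pandas as pd

# Assumption: the categories seen in this extract are the complete valid set.
ALLOWED_TYPES = {"INCIDENT", "ARREST", "TRAFFIC", "SP", "ASSIST", "INVESTIGATIONS"}


def find_unexpected_values(df: pd.DataFrame, col: str, allowed: set) -> pd.DataFrame:
    """Return the rows whose value in `col` is not in `allowed`.

    Missing values are not in the allow-list, so they are returned as well.
    """
    return df[~df[col].isin(allowed)]


# Made-up sample: one valid value, one wrong-case value, one missing value
sample = pd.DataFrame({"id": [1, 2, 3], "type": ["INCIDENT", "arrest", None]})
unexpected = find_unexpected_values(sample, "type", ALLOWED_TYPES)
print(unexpected)
```

Returning the offending rows, rather than raising immediately, leaves the decision about what to do with them to the caller.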

Considerations:
* How to deal with data quality failures? Raise or catch errors? EXPLAIN.
* Separate transform and data quality checks?
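
One possible answer to the raise-or-catch question is to keep each check in a raise-on-failure style but have the caller collect the failures, so a single run reports every problem instead of stopping at the first. The sketch below assumes checks that raise `ValueError`; `run_checks` and the two sample checks are hypothetical names used only for illustration.

```python
import pandas as pd


def run_checks(df, checks):
    """Run each check, collecting error messages instead of stopping at the first failure."""
    failures = []
    for check in checks:
        try:
            check(df)
        except ValueError as exc:
            failures.append(f"{check.__name__}: {exc}")
    return failures


# Two hypothetical checks written in a raise-on-failure style
def check_has_id_column(df):
    if "id" not in df.columns:
        raise ValueError("missing 'id' column")


def check_no_missing_type(df):
    if df["type"].isnull().any():
        raise ValueError("missing values in 'type'")


# Made-up sample that fails both checks
sample = pd.DataFrame({"type": ["INCIDENT", None]})
failures = run_checks(sample, [check_has_id_column, check_no_missing_type])
print(failures)
```

The caller can then decide whether any failure should abort the run or whether some failures are only worth logging.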

In [28]:
from datetime import datetime
from collections import Counter
import pandas as pd

### UTILITIES
def check_valid_schema(df):
    '''
    Check whether the DataFrame content contains the expected columns for the Cambridge Police dataset. 
    Otherwise, raise an error.
    '''
    SCHEMA_COLS = ['date_time', 'id', 'type', 'subtype', 'location', 'last_updated', 'description']
    if Counter(df.columns) != Counter(SCHEMA_COLS):
        raise ValueError("Schema does not match with the expected schema.")
    
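def check_numeric_id(df):
    '''
    Convert 'id' values to numeric and return the result as a new DataFrame.
    Non-numeric 'id' values become NaN. Note that check_missing_values() treats 'id' as required,
    so any NaN produced here raises a ValueError during validation.
    '''
    df = df.copy()  # avoid mutating the caller's DataFrame
    df['id'] = pd.to_numeric(df['id'], errors='coerce')
    return df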

def verify_datetime(df):
    '''
    Verify 'date_time' values follow ISO 8601 format (https://www.iso.org/iso-8601-date-and-time-format.html).
    Raise a ValueError if any of the 'date_time' values are invalid.
    '''
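    for value in df['date_time']:
        try:
            datetime.fromisoformat(value)
        except (TypeError, ValueError) as exc:
            # TypeError covers missing (non-string) values; report both cases as ValueError
            raise ValueError(f"Invalid 'date_time' value: {value!r}") from exc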
    
def check_missing_values(df):
    '''
    Check whether there are any missing values in columns that require data.
    For police logs, each incident should have a datetime, ID, incident type, and location.
    '''
    REQUIRED_COLS = ['date_time', 'id', 'type', 'location']
    for col in REQUIRED_COLS:
        if df[col].isnull().sum() > 0:
            raise ValueError(f"Missing values are present in the '{col}' attribute.")

### VALIDATION LOGIC
def validate_data(df):
    """
    Check the data satisfies the following data quality checks:
    - schema is valid
    - IDs are numeric
    - datetime follows ISO 8601 format
    - no missing values in columns that require data

    TODO: Use Great Expectations? (https://greatexpectations.io/)
    """
    check_valid_schema(df)

    df = check_numeric_id(df)

    verify_datetime(df)
    
    check_missing_values(df)
        
    return df

validated_df = validate_data(results_df)
print(validated_df.shape)
validated_df

(7276, 7)


Unnamed: 0,date_time,id,type,subtype,location,description,last_updated
0,2025-03-06T01:07:00.000,25001925,INCIDENT,LARCENY FROM BUILDING C266 S20,NEWPORT RD,Police responded to the 0 block of Newport Rd ...,2025-03-11T09:35:52.000
1,2025-03-06T01:56:00.000,25001926,INCIDENT,"MOTOR VEH, LARCENY OF C266 S28",MASSACHUSETTS AVE,Police responded to the 400 block of Massachus...,2025-03-11T09:35:52.000
2,2025-03-06T09:15:00.000,25001928,INCIDENT,B&E MV / BOAT NIGHTTIME FOR FELONY C266 S16,IRVING ST,Police responded to the 0 block of Irving Stre...,2025-03-11T09:35:52.000
3,2025-03-06T10:01:00.000,25001929,INCIDENT,LARCENY UNDER $1200 C266 S30(1),MASSACHUSETTS AVE & WESTERN AVE,A Cambridge resident came to the Central Squar...,2025-03-11T09:35:52.000
4,2025-03-06T10:02:00.000,25001930,INCIDENT,SHOPLIFTING BY ASPORTATION C266 S30A,MASSACHUSETTS AVE,Police responded to the 500 block of Massachus...,2025-03-11T09:35:52.000
...,...,...,...,...,...,...,...
7271,2026-01-15T20:07:00.000,26000428,INCIDENT,LARCENY FROM BUILDING C266 S20,WALDEN ST,A Cambridge resident reported a package theft ...,2026-01-16T16:00:07.000
7272,2026-01-15T20:25:00.000,26000430,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,CENTRAL SQ,"David Powell, 57, of Roslindale was placed und...",2026-01-16T16:00:07.000
7273,2026-01-15T21:12:00.000,26000432,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,ALEWIFE BROOK PKWY,A Cambridge resident will be summonsed to Camb...,2026-01-16T16:00:07.000
7274,2026-01-15T21:26:00.000,26000433,ARREST,WARRANT ARREST REPORT,GREEN ST & PEARL ST,"Krystal Scott, 43, of Malden was placed under ...",2026-01-16T16:00:07.000


In [29]:
import pandas as pd

### UTILITIES
def remove_duplicates(df):
    '''
    Remove duplicate rows from dataframe based on 'id' column. Keep the first occurrence.
    '''
    return df.drop_duplicates(subset=["id"], keep='first')

def remove_invalid_rows(df):
    '''
    Remove rows where the 'id' is NaN, as these IDs were identified as non-numeric.
    '''
    return df.dropna(subset=['id'])

def split_datetime(df):
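    '''
    Split the date_time column into separate year, month, day, hour, minute, and second columns.
    '''
    df = df.copy()  # work on a copy so the filtered input frame is not modified in place

    # convert to datetime
    df['date_time'] = pd.to_datetime(df['date_time'])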

    # extract year/month/day/time
    df['year'] = df['date_time'].dt.year
    df['month'] = df['date_time'].dt.month
    df['day'] = df['date_time'].dt.day
    df['hour'] = df['date_time'].dt.hour
    df['minute'] = df['date_time'].dt.minute
    df['second'] = df['date_time'].dt.second

    return df

### TRANSFORMATION LOGIC
def transform_data(df):
    '''
    Apply the following transformations to the passed in dataframe:
    - deduplicate records (keep the first)
    - remove invalid rows
    - split datetime into year, month, day, hour, minute, and second columns
    ''' 

    df = remove_duplicates(df)

    df = remove_invalid_rows(df)

    df = split_datetime(df)

    return df

transformed_df = transform_data(validated_df)
print(transformed_df.shape)
transformed_df

(7229, 13)


Unnamed: 0,date_time,id,type,subtype,location,description,last_updated,year,month,day,hour,minute,second
0,2025-03-06 01:07:00,25001925,INCIDENT,LARCENY FROM BUILDING C266 S20,NEWPORT RD,Police responded to the 0 block of Newport Rd ...,2025-03-11T09:35:52.000,2025,3,6,1,7,0
1,2025-03-06 01:56:00,25001926,INCIDENT,"MOTOR VEH, LARCENY OF C266 S28",MASSACHUSETTS AVE,Police responded to the 400 block of Massachus...,2025-03-11T09:35:52.000,2025,3,6,1,56,0
2,2025-03-06 09:15:00,25001928,INCIDENT,B&E MV / BOAT NIGHTTIME FOR FELONY C266 S16,IRVING ST,Police responded to the 0 block of Irving Stre...,2025-03-11T09:35:52.000,2025,3,6,9,15,0
3,2025-03-06 10:01:00,25001929,INCIDENT,LARCENY UNDER $1200 C266 S30(1),MASSACHUSETTS AVE & WESTERN AVE,A Cambridge resident came to the Central Squar...,2025-03-11T09:35:52.000,2025,3,6,10,1,0
4,2025-03-06 10:02:00,25001930,INCIDENT,SHOPLIFTING BY ASPORTATION C266 S30A,MASSACHUSETTS AVE,Police responded to the 500 block of Massachus...,2025-03-11T09:35:52.000,2025,3,6,10,2,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
7271,2026-01-15 20:07:00,26000428,INCIDENT,LARCENY FROM BUILDING C266 S20,WALDEN ST,A Cambridge resident reported a package theft ...,2026-01-16T16:00:07.000,2026,1,15,20,7,0
7272,2026-01-15 20:25:00,26000430,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,CENTRAL SQ,"David Powell, 57, of Roslindale was placed und...",2026-01-16T16:00:07.000,2026,1,15,20,25,0
7273,2026-01-15 21:12:00,26000432,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,ALEWIFE BROOK PKWY,A Cambridge resident will be summonsed to Camb...,2026-01-16T16:00:07.000,2026,1,15,21,12,0
7274,2026-01-15 21:26:00,26000433,ARREST,WARRANT ARREST REPORT,GREEN ST & PEARL ST,"Krystal Scott, 43, of Malden was placed under ...",2026-01-16T16:00:07.000,2026,1,15,21,26,0


## Load

Load the transformed data into Postgres/SQLite.
* Create the table
* Execute INSERT query to load dataframe data into Postgres
* Verify results
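
The three steps above can be sketched end to end with SQLite, using Python's built-in `sqlite3` module and an in-memory database. The table here is a cut-down version with only four columns, chosen for illustration, and the sample rows are copied from the extract shown earlier. `INSERT OR REPLACE` is one way to make re-runs idempotent; it is an assumption of this sketch, not a decision the project has made.

```python
import sqlite3

import pandas as pd

# Two rows taken from the extract shown earlier in this notebook
sample = pd.DataFrame({
    "id": [25001925, 25001926],
    "date_time": ["2025-03-06 01:07:00", "2025-03-06 01:56:00"],
    "type": ["INCIDENT", "INCIDENT"],
    "location": ["NEWPORT RD", "MASSACHUSETTS AVE"],
})

conn = sqlite3.connect(":memory:")

# 1. Create the table (cut-down column list for illustration)
conn.execute("""
    CREATE TABLE IF NOT EXISTS cpd_incidents (
        id INTEGER PRIMARY KEY,
        date_time TEXT,
        type TEXT,
        location TEXT
    )
""")

# 2. Insert the rows; OR REPLACE overwrites a row whose id already exists
insert_sql = """
    INSERT OR REPLACE INTO cpd_incidents (id, date_time, type, location)
    VALUES (?, ?, ?, ?)
"""
rows = [
    (int(r.id), r.date_time, r.type, r.location)
    for r in sample.itertuples(index=False)
]
conn.executemany(insert_sql, rows)
conn.executemany(insert_sql, rows)  # simulate a second run with the same ids
conn.commit()

# 3. Verify: the row count should match the DataFrame despite the repeated load
row_count = conn.execute("SELECT COUNT(*) FROM cpd_incidents").fetchone()[0]
print(row_count)

conn.close()
```

Because the primary key is `id`, loading the same rows twice leaves the table with one row per incident.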

In [30]:
# transformed_df.iloc[0]['description']
text_cols = ['type', 'subtype', 'location', 'description']
for col in text_cols:
    lengths = transformed_df[col].astype(str).str.len()
    print(f"Col: {col} | Max length: {lengths.max()}")


Col: type | Max length: 14
Col: subtype | Max length: 63
Col: location | Max length: 670
Col: description | Max length: 1484


## Visualize

Dashboard displaying trends in total incidents, incident types, and location:
- Incident count by day/week/month
- Incident type breakdown by day/week/month
- Incident count by location
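
The aggregations behind these views could be computed with pandas before any charting is done. The sketch below is only one possible approach, run on a tiny made-up frame with the same `date_time`, `type`, and `location` columns as the transformed data; the variable names are illustrative.

```python
import pandas as pd

# Made-up sample with the same column names as the transformed data
sample = pd.DataFrame({
    "date_time": pd.to_datetime([
        "2025-03-06 01:07:00", "2025-03-06 09:15:00",
        "2025-03-07 10:01:00", "2025-04-01 12:00:00",
    ]),
    "type": ["INCIDENT", "ARREST", "INCIDENT", "INCIDENT"],
    "location": ["NEWPORT RD", "MASSACHUSETTS AVE", "MASSACHUSETTS AVE", "IRVING ST"],
})

# Incident count by day and by month (use "W" for weekly periods)
daily_counts = sample.groupby(sample["date_time"].dt.to_period("D")).size()
monthly_counts = sample.groupby(sample["date_time"].dt.to_period("M")).size()

# Incident type breakdown by month (rows: month, columns: type)
type_by_month = pd.crosstab(sample["date_time"].dt.strftime("%Y-%m"), sample["type"])

# Incident count by location
location_counts = sample["location"].value_counts()

print(monthly_counts)
print(type_by_month)
print(location_counts)
```

The same aggregations could instead be written as SQL `GROUP BY` queries against the database if the dashboard reads from it directly.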


## Scheduling

*Work on this before/after scheduling*

Schedule data pipeline to run on regular schedule:
* Daily/weekly/monthly? Data updated daily.

Schedule webpage(?) that holds dashboard to query database for updated statistics:
* Probably same frequency as data pipeline schedule - doesn't make sense to query DB multiple times before data updated, but also want to keep dashboard as up to date as possible.
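
Whichever scheduler ends up being used (none has been chosen yet), it will need a single entry point that runs the whole pipeline once. The sketch below shows one possible shape for that entry point. `run_pipeline` is a hypothetical name, and the steps are passed in as callables so the example runs on its own with stubs.

```python
from typing import Callable

import pandas as pd


def run_pipeline(
    extract: Callable[[], pd.DataFrame],
    validate: Callable[[pd.DataFrame], pd.DataFrame],
    transform: Callable[[pd.DataFrame], pd.DataFrame],
    load: Callable[[pd.DataFrame], None],
) -> int:
    """Run extract -> validate -> transform -> load once; return the number of rows loaded."""
    raw_df = extract()
    validated_df = validate(raw_df)
    transformed_df = transform(validated_df)
    load(transformed_df)
    return len(transformed_df)


# Stubs so the sketch runs on its own; in this notebook the arguments would be
# extract_data, validate_data, transform_data, and load_into_postgres.
loaded_frames = []
rows_loaded = run_pipeline(
    extract=lambda: pd.DataFrame({"id": ["1", "1", "2"]}),
    validate=lambda df: df,
    transform=lambda df: df.drop_duplicates(subset=["id"]),
    load=loaded_frames.append,
)
print(rows_loaded)
```

Returning the loaded row count gives the scheduler something simple to log or alert on after each run.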

Create the table in Postgres.

In [31]:
import psycopg2

def create_postgres_table():
    '''
    Create the 'cpd_incidents' table in the 'cpd_db' Postgres DB if it doesn't exist.
    '''
    # establish connection to DB
    conn = psycopg2.connect(
        host="localhost",
        port="5433",
        database="cpd_db",
        user="jiminkang",
        password="password"
    )

    # create cursor object to execute SQL
    cur = conn.cursor()
    
    # execute query
    create_table_query = '''
        CREATE TABLE IF NOT EXISTS cpd_incidents (
            date_time TIMESTAMP,
            id INTEGER PRIMARY KEY,
            type TEXT,
            subtype TEXT,
            location TEXT,
            description TEXT,
            last_updated TIMESTAMP,
            year INTEGER,
            month INTEGER,
            day INTEGER,
            hour INTEGER, 
            minute INTEGER,
            second INTEGER
        )
    '''
    cur.execute(create_table_query)

    # commit changes
    conn.commit()

    # close cursor and connection
    cur.close()
    conn.close()

create_postgres_table()

Insert transformed data into Postgres table.

In [32]:
from sqlalchemy import create_engine

def load_into_postgres(df):
    '''
    Load the transformed data into the Postgres DB.
    '''
    # create Engine object with Postgres connection parameters to connect to DB
    engine = create_engine("postgresql://jiminkang:password@localhost:5433/cpd_db")
        
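    # insert data into the 'cpd_incidents' table
    # note: if_exists='replace' drops and recreates the table, discarding the schema defined in
    # create_postgres_table(), and the DataFrame index is written as an extra 'index' column
    df.to_sql('cpd_incidents', engine, if_exists='replace')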

load_into_postgres(transformed_df)

Double-check that the data was loaded into Postgres.

In [None]:
# establish connection to DB
conn = psycopg2.connect(
    host="localhost",
    port="5433",
    database="cpd_db",
    user="jiminkang",
    password="password"
)

# execute query & read results into a DataFrame (pd.read_sql runs the query itself, so no cursor is needed)
select_query = '''
    SELECT * FROM cpd_incidents;
'''
fetched_df = pd.read_sql(select_query, conn)

# close conn
conn.close()

fetched_df






Unnamed: 0,index,date_time,id,type,subtype,location,description,last_updated,year,month,day,hour,minute,second
0,0,2025-03-06 01:07:00,25001925,INCIDENT,LARCENY FROM BUILDING C266 S20,NEWPORT RD,Police responded to the 0 block of Newport Rd ...,2025-03-11T09:35:52.000,2025,3,6,1,7,0
1,1,2025-03-06 01:56:00,25001926,INCIDENT,"MOTOR VEH, LARCENY OF C266 S28",MASSACHUSETTS AVE,Police responded to the 400 block of Massachus...,2025-03-11T09:35:52.000,2025,3,6,1,56,0
2,2,2025-03-06 09:15:00,25001928,INCIDENT,B&E MV / BOAT NIGHTTIME FOR FELONY C266 S16,IRVING ST,Police responded to the 0 block of Irving Stre...,2025-03-11T09:35:52.000,2025,3,6,9,15,0
3,3,2025-03-06 10:01:00,25001929,INCIDENT,LARCENY UNDER $1200 C266 S30(1),MASSACHUSETTS AVE & WESTERN AVE,A Cambridge resident came to the Central Squar...,2025-03-11T09:35:52.000,2025,3,6,10,1,0
4,4,2025-03-06 10:02:00,25001930,INCIDENT,SHOPLIFTING BY ASPORTATION C266 S30A,MASSACHUSETTS AVE,Police responded to the 500 block of Massachus...,2025-03-11T09:35:52.000,2025,3,6,10,2,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7224,7271,2026-01-15 20:07:00,26000428,INCIDENT,LARCENY FROM BUILDING C266 S20,WALDEN ST,A Cambridge resident reported a package theft ...,2026-01-16T16:00:07.000,2026,1,15,20,7,0
7225,7272,2026-01-15 20:25:00,26000430,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,CENTRAL SQ,"David Powell, 57, of Roslindale was placed und...",2026-01-16T16:00:07.000,2026,1,15,20,25,0
7226,7273,2026-01-15 21:12:00,26000432,INCIDENT,SHOPLIFTING BY CONCEALING MDSE C266 S30A,ALEWIFE BROOK PKWY,A Cambridge resident will be summonsed to Camb...,2026-01-16T16:00:07.000,2026,1,15,21,12,0
7227,7274,2026-01-15 21:26:00,26000433,ARREST,WARRANT ARREST REPORT,GREEN ST & PEARL ST,"Krystal Scott, 43, of Malden was placed under ...",2026-01-16T16:00:07.000,2026,1,15,21,26,0
