# Data Loading, Cleaning, and Normalization
Now that we have a better idea of what the data contains, we're going to load it into a format that is more efficient for analysis. Changes still needed:
 - The note-splitting regex still has issues. `SELECT descr, COUNT(*) FROM note_author GROUP BY descr ORDER BY COUNT(*)` will show the odd ones.
 
We'll load each table in a two-step process. First, we scan the table and accumulate a set of distinct values for each associated lookup table, then load those lookup tables. Second, we load the main table. This is less complicated than trying to build the lookup tables during the chunked load of the main table.

Main table: call
Lookup tables: call_unit, city, nature, beat, district, sector, zip_code, priority
Lookup tables loaded separately: call_source

Main table: note
Lookup tables: note_author

Main table: shift
Lookup tables: officer, call_unit (update)

Main table: call_log
Lookup tables: transaction
Lookup tables loaded separately: close_code

Main table: incident
Lookup tables: city, ucr_descr, ucr_code, beat, district, sector, zip_code
Lookup tables loaded separately: premise, weapon, bureau, division, unit, investigation_status, case_status

Main table: modus_operandi
Lookup tables: mo_item

Main table: out_of_service
Lookup tables: call_unit (update), oos_code

Main table: shift
Lookup tables: call_unit (update), officer_name

Other tables: squad (referenced by call_unit, but since we're constantly updating call_unit, we won't load it until the end)

We'll use the `dataset` library to load the data into PostgreSQL.

In [1]:
import dataset
import datetime as dt
import pandas as pd
from sqlalchemy.exc import IntegrityError
from sqlalchemy import create_engine

We need to create the tables before touching the data so they have all the proper constraints.

# Database DDL

Code to create the database schema is below.

In [23]:
import os

# CHANGE CREDENTIALS AS APPROPRIATE
# Set the CFS_DB_URI environment variable to point at another database (for example
# postgresql://<user>:<password>@freyja.rtp.rti.org:5432/cfs_v3) so that passwords never
# have to be written into the notebook. Without it we fall back to the local database.
sqlalchemy_uri = os.environ.get('CFS_DB_URI', 'postgresql://jnance:@localhost:5432/cfs')

# `db` (dataset) is used for table access, raw queries and transactions;
# `engine` (SQLAlchemy) is what pandas' DataFrame.to_sql writes through.
db = dataset.connect(sqlalchemy_uri)
engine = create_engine(sqlalchemy_uri)

  (attype, name))
  (attype, name))
  (attype, name))


In [24]:
def reset_db():
    """
    Drop and recreate every table in the schema so the database can be reloaded from scratch.

    Each table is dropped with CASCADE, so constraints that depend on it go with it. Because the
    tables are recreated, their serial primary-key sequences start again at 1; the lookup
    loaders later in this notebook rely on that when they precompute database ids.

    Uses the module-level ``db`` connection.
    """
    # Same drop order as before; CASCADE makes the order largely irrelevant anyway.
    # These names are our own constants, so building the statement with % is safe.
    tables_to_drop = (
        'note', 'note_author', 'call', 'call_source', 'call_unit', 'priority', 'city',
        'beat', 'zip_code', 'district', 'sector', 'officer', 'shift', 'shift_unit',
        'call_log', 'transaction', 'close_code', 'ucr_descr', 'ucr_code', 'incident',
        'incident_mo_item', 'mo_item', 'mo_group', 'bureau', 'case_status', 'division',
        'unit', 'investigation_status', 'weapon', 'weapon_group', 'premise',
        'premise_group', 'nature', 'oos_code', 'out_of_service', 'squad',
    )
    for table in tables_to_drop:
        db.query("DROP TABLE IF EXISTS %s CASCADE;" % table)

    db.query("""
    CREATE TABLE ucr_code
    (
      ucr_code_id serial NOT NULL,
      descr text,
      CONSTRAINT ucr_code_pk PRIMARY KEY (ucr_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE ucr_descr
    (
      ucr_descr_id serial NOT NULL,
      short_descr text,
      long_descr text,
      ucr_code_id int,
      CONSTRAINT ucr_descr_pk PRIMARY KEY (ucr_descr_id),
      CONSTRAINT ucr_code_ucr_descr_fk FOREIGN KEY (ucr_code_id) REFERENCES ucr_code (ucr_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE bureau
    (
      bureau_id serial NOT NULL,
      descr text,
      CONSTRAINT bureau_pk PRIMARY KEY (bureau_id)
    );
    """)
    
    db.query("""
    CREATE TABLE division
    (
      division_id serial NOT NULL,
      descr text,
      CONSTRAINT division_pk PRIMARY KEY (division_id)
    );
    """)
    
    db.query("""
    CREATE TABLE investigation_status
    (
      investigation_status_id serial NOT NULL,
      descr text,
      CONSTRAINT investigation_status_pk PRIMARY KEY (investigation_status_id)
    );
    """)
    
    db.query("""
    CREATE TABLE case_status
    (
      case_status_id serial NOT NULL,
      descr text,
      CONSTRAINT case_status_pk PRIMARY KEY (case_status_id)
    );
    """)
    
    db.query("""
    CREATE TABLE unit
    (
      unit_id serial NOT NULL,
      descr text,
      CONSTRAINT unit_pk PRIMARY KEY (unit_id)
    );
    """)
    
    db.query("""
    CREATE TABLE weapon_group
    (
      weapon_group_id serial NOT NULL,
      descr text,
      CONSTRAINT weapon_group_pk PRIMARY KEY (weapon_group_id)
    );
    """)
    
    db.query("""
    CREATE TABLE premise_group
    (
      premise_group_id serial NOT NULL,
      descr text,
      CONSTRAINT premise_group_pk PRIMARY KEY (premise_group_id)
    );
    """)
    
    db.query("""
    CREATE TABLE weapon
    (
      weapon_id serial NOT NULL,
      descr text,
      weapon_group_id int,
      CONSTRAINT weapon_pk PRIMARY KEY (weapon_id),
      CONSTRAINT weapon_group_weapon_fk FOREIGN KEY (weapon_group_id) REFERENCES weapon_group (weapon_group_id)
    );
    """)
    
    db.query("""
    CREATE TABLE premise
    (
      premise_id serial NOT NULL,
      descr text,
      premise_group_id int,
      CONSTRAINT premise_pk PRIMARY KEY (premise_id),
      CONSTRAINT premise_group_premise_fk FOREIGN KEY (premise_group_id) REFERENCES premise_group (premise_group_id)
    );
    """)
    
    db.query("""
    CREATE TABLE city
    (
      city_id serial NOT NULL,
      descr text,
      CONSTRAINT city_pk PRIMARY KEY (city_id)
    );
    """)
    
    db.query("""
    CREATE TABLE sector
    (
      sector_id serial NOT NULL,
      descr text,
      CONSTRAINT sector_pk PRIMARY KEY (sector_id)
    );
    """)
    
    db.query("""
    CREATE TABLE district
    (
      district_id serial NOT NULL,
      sector_id int,
      descr text,
      CONSTRAINT district_pk PRIMARY KEY (district_id),
      CONSTRAINT sector_district_fk FOREIGN KEY (sector_id) REFERENCES sector (sector_id)
    );
    """)
    
    db.query("""
    CREATE TABLE beat
    (
      beat_id serial NOT NULL,
      district_id int,
      sector_id int,
      descr text,
      CONSTRAINT beat_pk PRIMARY KEY (beat_id),
      CONSTRAINT district_beat_fk FOREIGN KEY (district_id) REFERENCES district (district_id),
      CONSTRAINT sector_beat_fk FOREIGN KEY (sector_id) REFERENCES sector (sector_id)
    );
    """)
    
    db.query("""
    CREATE TABLE zip_code
    (
      zip_code_id serial NOT NULL,
      descr text,
      CONSTRAINT zip_code_pk PRIMARY KEY (zip_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE incident
    (
      incident_id bigint NOT NULL,
      case_id bigint UNIQUE,
      time_filed timestamp without time zone,
      month_filed int,
      week_filed int,
      year_filed int,
      dow_filed int,
      street_num int,
      street_name text,
      city_id int,
      zip_code_id int,
      geox double precision,
      geoy double precision,
      beat_id int,
      district_id int,
      sector_id int,
      premise_id int,
      weapon_id int,
      domestic boolean,
      juvenile boolean,
      gang_related boolean,
      emp_bureau_id int,
      emp_division_id int,
      emp_unit_id int,
      num_officers int,
      investigation_status_id int,
      investigator_unit_id int,
      case_status_id int,
      ucr_descr_id int,
      committed boolean,
      
      CONSTRAINT incident_pk PRIMARY KEY (incident_id),
      
      CONSTRAINT case_status_incident_fk
        FOREIGN KEY (case_status_id) REFERENCES case_status (case_status_id),
      CONSTRAINT bureau_incident_fk
        FOREIGN KEY (emp_bureau_id) REFERENCES bureau (bureau_id),
      CONSTRAINT division_incident_fk
        FOREIGN KEY (emp_division_id) REFERENCES division (division_id),
      CONSTRAINT unit_incident_emp_fk
        FOREIGN KEY (emp_unit_id) REFERENCES unit (unit_id),
      CONSTRAINT unit_incident_investigator_fk
        FOREIGN KEY (investigator_unit_id) REFERENCES unit (unit_id),
      CONSTRAINT investigation_status_incident_fk
        FOREIGN KEY (investigation_status_id) REFERENCES investigation_status (investigation_status_id),
      CONSTRAINT premise_incident_fk
        FOREIGN KEY (premise_id) REFERENCES premise (premise_id),
      CONSTRAINT weapon_incident_fk
        FOREIGN KEY (weapon_id) REFERENCES weapon (weapon_id),
      CONSTRAINT city_incident_fk
        FOREIGN KEY (city_id) REFERENCES city (city_id),
      CONSTRAINT ucr_descr_incident_fk
        FOREIGN KEY (ucr_descr_id) REFERENCES ucr_descr (ucr_descr_id),
      CONSTRAINT beat_incident_fk
        FOREIGN KEY (beat_id) REFERENCES beat (beat_id),
      CONSTRAINT district_incident_fk
        FOREIGN KEY (district_id) REFERENCES district (district_id),
      CONSTRAINT sector_incident_fk
        FOREIGN KEY (sector_id) REFERENCES sector (sector_id),
      CONSTRAINT zip_code_incident_fk
        FOREIGN KEY (zip_code_id) REFERENCES zip_code (zip_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE mo_group
    (
      mo_group_id serial NOT NULL,
      descr text,
      CONSTRAINT mo_group_pk PRIMARY KEY (mo_group_id)
    );
    """)
    
    db.query("""
    CREATE TABLE mo_item
    (
      mo_item_id serial NOT NULL,
      descr text,
      mo_group_id int NOT NULL,
      CONSTRAINT mo_item_pk PRIMARY KEY (mo_item_id),
      CONSTRAINT mo_group_mo_item_fk FOREIGN KEY (mo_group_id) REFERENCES mo_group (mo_group_id)
    );
    """)
    
    db.query("""
    CREATE TABLE incident_mo_item
    (
      incident_mo_item_id bigint NOT NULL,
      incident_id bigint,
      mo_item_id int,
      
      CONSTRAINT incident_mo_item_pk PRIMARY KEY (incident_mo_item_id),
      
      CONSTRAINT incident_incident_mo_item_fk FOREIGN KEY (incident_id) REFERENCES incident (incident_id),
      CONSTRAINT mo_item_incident_mo_item_fk FOREIGN KEY (mo_item_id) REFERENCES mo_item (mo_item_id)
    );
    """)
    
    db.query("""
    CREATE TABLE call_source
    (
      call_source_id serial NOT NULL,
      descr text,
      CONSTRAINT call_source_pk PRIMARY KEY (call_source_id)
    );
    """)
    
    db.query("""
    CREATE TABLE squad
    (
      squad_id serial NOT NULL,
      descr text,
      CONSTRAINT squad_pk PRIMARY KEY (squad_id)
    );
    """)
    
    db.query("""
    CREATE TABLE call_unit
    (
      call_unit_id serial NOT NULL,
      descr text,
      squad_id int,
      CONSTRAINT call_unit_pk PRIMARY KEY (call_unit_id),
      CONSTRAINT squad_call_unit_fk FOREIGN KEY (squad_id) REFERENCES squad (squad_id)
    );
    """)
    
    db.query("""
    CREATE TABLE close_code
    (
      close_code_id serial NOT NULL,
      descr text,
      CONSTRAINT close_code_pk PRIMARY KEY (close_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE nature
    (
      nature_id serial NOT NULL,
      descr text,
      CONSTRAINT nature_pk PRIMARY KEY (nature_id)
    );
    """)
    
    db.query("""
    CREATE TABLE priority
    (
      priority_id serial NOT NULL,
      descr text,
      CONSTRAINT priority_pk PRIMARY KEY (priority_id)
    );
    """)
    
    db.query("""
    CREATE TABLE call
    (
      call_id bigint NOT NULL,
      year_received int,
      month_received int,
      week_received int,
      dow_received int,
      hour_received int,
      case_id bigint,
      call_source_id int,
      primary_unit_id int,
      first_dispatched_id int,
      reporting_unit_id int,
      street_num int,
      street_name text,
      city_id int,
      zip_code_id int,
      crossroad1 text,
      crossroad2 text,
      geox double precision,
      geoy double precision,
      beat_id int,
      district_id int,
      sector_id int,
      business text,
      nature_id int,
      priority_id int,
      report_only boolean,
      cancelled boolean,
      time_received timestamp without time zone,
      time_routed timestamp without time zone,
      time_finished timestamp without time zone,
      first_unit_dispatch timestamp without time zone,
      first_unit_enroute timestamp without time zone,
      first_unit_arrive timestamp without time zone,
      first_unit_transport timestamp without time zone,
      last_unit_clear timestamp without time zone,
      time_closed timestamp without time zone,
      overall_response_time interval,
      officer_response_time interval,
      close_code_id int,
      close_comments text,
      
      CONSTRAINT call_pk PRIMARY KEY (call_id),
      
      CONSTRAINT call_source_call_fk
        FOREIGN KEY (call_source_id) REFERENCES call_source (call_source_id),
      CONSTRAINT call_unit_call_primary_unit_fk
        FOREIGN KEY (primary_unit_id) REFERENCES call_unit (call_unit_id),
      CONSTRAINT call_unit_call_first_dispatched_fk
        FOREIGN KEY (first_dispatched_id) REFERENCES call_unit (call_unit_id),
      CONSTRAINT call_unit_call_reporting_unit_fk
        FOREIGN KEY (reporting_unit_id) REFERENCES call_unit (call_unit_id),
      CONSTRAINT city_call_fk
        FOREIGN KEY (city_id) REFERENCES city (city_id),
      CONSTRAINT close_code_call_fk
        FOREIGN KEY (close_code_id) REFERENCES close_code (close_code_id),
      --There is some mismatch here that might be valid; no constraint for now
      --CONSTRAINT incident_call_fk
      --  FOREIGN KEY (case_id) REFERENCES incident (case_id),
      CONSTRAINT nature_call_fk
        FOREIGN KEY (nature_id) REFERENCES nature (nature_id),
      CONSTRAINT beat_call_fk
        FOREIGN KEY (beat_id) REFERENCES beat (beat_id),
      CONSTRAINT district_call_fk
        FOREIGN KEY (district_id) REFERENCES district (district_id),
      CONSTRAINT sector_call_fk
        FOREIGN KEY (sector_id) REFERENCES sector (sector_id),
      CONSTRAINT priority_call_fk
        FOREIGN KEY (priority_id) REFERENCES priority (priority_id),
      CONSTRAINT zip_code_call_fk
        FOREIGN KEY (zip_code_id) REFERENCES zip_code (zip_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE note_author
    (
      note_author_id serial NOT NULL,
      descr text,
      CONSTRAINT note_author_pk PRIMARY KEY (note_author_id)
    );
    """)
    
    db.query("""
    CREATE TABLE note
    (
      note_id serial NOT NULL,
      body text,
      time_recorded timestamp without time zone,
      note_author_id int,
      call_id bigint,
      CONSTRAINT note_pk PRIMARY KEY (note_id),
      
      CONSTRAINT call_note_fk FOREIGN KEY (call_id) REFERENCES call (call_id),
      CONSTRAINT note_author_note_fk FOREIGN KEY (note_author_id) REFERENCES note_author (note_author_id)
    );
    """)
    
    db.query("""
    CREATE TABLE officer
    (
        officer_id bigint NOT NULL,
        name text,
        name_aka text,
        
        CONSTRAINT officer_pk PRIMARY KEY (officer_id)
    );
    """)
    
    db.query("""
    CREATE TABLE shift
    (
      shift_id bigint NOT NULL,
      
      CONSTRAINT shift_pk PRIMARY KEY (shift_id)
    );
    """)
    
    # officer_id is bigint to match officer.officer_id (it was int, which cannot hold every
    # value the referenced bigint column can).
    db.query("""
    CREATE TABLE shift_unit
    (
        shift_unit_id bigint NOT NULL,
        call_unit_id int,
        officer_id bigint,
        in_time timestamp without time zone,
        out_time timestamp without time zone,
        bureau_id int,
        division_id int,
        unit_id int,
        shift_id bigint,
        
        CONSTRAINT shift_unit_pk PRIMARY KEY (shift_unit_id),
        CONSTRAINT shift_shift_unit_fk FOREIGN KEY (shift_id) REFERENCES shift (shift_id),
        CONSTRAINT call_unit_shift_unit_fk FOREIGN KEY (call_unit_id) REFERENCES call_unit (call_unit_id),
        CONSTRAINT officer_shift_unit_fk FOREIGN KEY (officer_id) REFERENCES officer (officer_id),
        CONSTRAINT bureau_shift_unit_fk FOREIGN KEY (bureau_id) REFERENCES bureau (bureau_id),
        CONSTRAINT division_shift_unit_fk FOREIGN KEY (division_id) REFERENCES division (division_id),
        CONSTRAINT unit_shift_unit_fk FOREIGN KEY (unit_id) REFERENCES unit (unit_id)
    );
    """)

    db.query("""
    CREATE TABLE transaction
    (
      transaction_id serial NOT NULL,
      descr text,
      CONSTRAINT transaction_pk PRIMARY KEY (transaction_id)
    );
    """)
    
    db.query("""
    CREATE TABLE call_log
    (
      call_log_id bigint NOT NULL,
      transaction_id int,
      shift_id bigint,
      time_recorded timestamp without time zone,
      call_id bigint,
      call_unit_id int,
      close_code_id int,
      
      CONSTRAINT call_log_pk PRIMARY KEY (call_log_id),
      
      CONSTRAINT shift_call_log_fk FOREIGN KEY (shift_id) REFERENCES shift (shift_id),
      CONSTRAINT call_unit_call_log_fk FOREIGN KEY (call_unit_id) REFERENCES call_unit (call_unit_id),
      CONSTRAINT call_call_log_fk FOREIGN KEY (call_id) REFERENCES call (call_id),
      CONSTRAINT close_code_call_log_fk FOREIGN KEY (close_code_id) REFERENCES close_code (close_code_id),
      CONSTRAINT transaction_call_log_fk FOREIGN KEY (transaction_id) REFERENCES transaction (transaction_id)
    );
    """)
    
    db.query("""
    CREATE TABLE oos_code
    (
      oos_code_id serial NOT NULL,
      descr text,
      
      CONSTRAINT oos_code_pk PRIMARY KEY (oos_code_id)
    );
    """)
    
    db.query("""
    CREATE TABLE out_of_service
    (
      oos_id bigint NOT NULL,
      call_unit_id int,
      shift_id bigint,
      oos_code_id int,
      location text,
      comments text,
      start_time timestamp without time zone,
      end_time timestamp without time zone,
      duration interval,
      
      CONSTRAINT oos_pk PRIMARY KEY (oos_id),
      
      CONSTRAINT shift_out_of_service_fk FOREIGN KEY (shift_id) REFERENCES shift (shift_id),
      CONSTRAINT call_unit_oos_fk FOREIGN KEY (call_unit_id) REFERENCES call_unit (call_unit_id),
      CONSTRAINT oos_code_oos_fk FOREIGN KEY (oos_code_id) REFERENCES oos_code (oos_code_id)
    );
    """)


reset_db()

# Small lookup tables
case_status, division, unit, bureau, investigation_status, call_source, close_code, and oos_code

The nested lookup tables are weapon/weapon_group and premise/premise_group

In [25]:
# There are a million of these, so let's make life easier and reuse all that code

# We need to save the mapping between DPD's short codes and our database ids so we can apply it to the records
# in the main tables
#
# These have the DPD's codes as keys and our internal database PKs as values
case_status_code_mapping = {}
division_code_mapping = {}
unit_code_mapping = {}
bureau_code_mapping = {}
investigation_status_code_mapping = {}
call_source_code_mapping = {}
close_code_mapping = {}
oos_code_mapping = {}

# Each job describes one lookup file:
#   file         - name under ../csv_data (.csv is comma separated, .tsv is tab separated)
#   table        - destination table
#   mapping      - {source column: destination column}; every other column is discarded
#   code_column  - DPD's code column, recorded into code_mapping rather than stored
#   code_mapping - dict to fill with {DPD code: database id}
lookup_jobs = [
    {
        "file": "LWMAIN.CSSTATUS.csv",
        "table": "case_status",
        "mapping": {"descriptn": "descr"},
        "code_column": "code_agcy",
        "code_mapping": case_status_code_mapping
    },
    {
        "file": "LWMAIN.EMDIVISION.csv",
        "table": "division",
        "mapping": {"descriptn": "descr"},
        "code_column": "code_agcy",
        "code_mapping": division_code_mapping
    },
    {
        "file": "LWMAIN.EMSECTION.csv",
        "table": "unit",
        "mapping": {"descriptn": "descr"},
        "code_column": "code_agcy",
        "code_mapping": unit_code_mapping
    },
    {
        "file": "LWMAIN.EMUNIT.csv",
        "table": "bureau",
        "mapping": {"descriptn": "descr"},
        "code_column": "code_agcy",
        "code_mapping": bureau_code_mapping
    },
    {
        "file": "LWMAIN.INVSTSTATS.csv",
        "table": "investigation_status",
        "mapping": {"descriptn": "descr"},
        "code_column": "code_agcy",
        "code_mapping": investigation_status_code_mapping
    },
    {
        "file": "inmain.callsource.tsv",
        "table": "call_source",
        "mapping": {"Description": "descr"},
        "code_column": "code_agcy",
        "code_mapping": call_source_code_mapping
    },
    {
        "file": "inmain.closecode.tsv",
        "table": "close_code",
        "mapping": {"Description": "descr"},
        "code_column": "code_agcy",
        "code_mapping": close_code_mapping
    },
    {
        "file": "outserv.oscode.tsv",
        "table": "oos_code",
        "mapping": {"Description": "descr"},
        "code_column": "Code",
        "code_mapping": oos_code_mapping
    }
]

for job in lookup_jobs:
    print("loading %s into %s" % (job['file'], job['table']))

    path = "../csv_data/%s" % (job['file'])
    if job['file'].endswith(".csv"):
        data = pd.read_csv(path)
    elif job['file'].endswith(".tsv"):
        data = pd.read_csv(path, sep='\t')
    else:
        # Without this, `data` would still hold the previous job's file and be reloaded silently.
        raise ValueError("unsupported lookup file type: %s" % job['file'])

    # The rows go, in file order, into a freshly created table (see reset_db), so the serial
    # primary key hands out 1, 2, 3, ... in that same order. Record DPD code -> database id.
    for id_, code in enumerate(data[job['code_column']], start=1):
        job['code_mapping'][code] = id_

    # Keep only the desired columns (in file order), rename them to our names and insert
    keep_columns = set(job['mapping'])
    data = data[[c for c in data.columns if c in keep_columns]]
    data = data.rename(columns=job['mapping'])
    data.to_sql(job['table'], engine, index=False, if_exists='append')

# They neglected to give us this code which is frequently in the database
investigation_status_code_mapping['CBA'] = None

# Some more that are in the db but not the lookup table they gave us
for bogus_code in ('15:13.0', 'SLFIN', 'EYE', 'WALK', '911', 'A'):
    call_source_code_mapping[bogus_code] = None

loading LWMAIN.CSSTATUS.csv into case_status
loading LWMAIN.EMDIVISION.csv into division
loading LWMAIN.EMSECTION.csv into unit
loading LWMAIN.EMUNIT.csv into bureau
loading LWMAIN.INVSTSTATS.csv into investigation_status
loading inmain.callsource.tsv into call_source
loading inmain.closecode.tsv into close_code
loading outserv.oscode.tsv into oos_code


In [26]:
# Premise/premise_group and weapon/weapon_group each need a group table plus a detail table.
# One loop handles both pairs, driven by the job descriptions below.

# DPD code -> database id for the detail tables, filled in by the loop
weapon_code_mapping = {}
premise_code_mapping = {}

# Per job:
#   file        - source file under ../csv_data (descriptn_a = group, descriptn_b = detail)
#   outer_table - detail table; inner_table - group table
#   outer_cols  - column names given to (descriptn_a, descriptn_b) for the detail table
#   inner_col   - description column of the group table
#   inner_id    - primary key column of the group table
#   code_mapping- dict to fill with {DPD code_agcy: detail-table id}
nested_lookup_jobs = [
    {
        "file": "LWMAIN.PREMISE.csv",
        "outer_table": "premise",
        "inner_table": "premise_group",
        "outer_cols": ["premise_group_id","descr"],
        "inner_col": "descr",
        "inner_id": "premise_group_id",
        "code_mapping": premise_code_mapping
    },
    {
        "file": "LWMAIN.WEAPON.csv",
        "outer_table": "weapon",
        "inner_table": "weapon_group",
        "outer_cols": ["weapon_group_id","descr"],
        "inner_col": "descr",
        "inner_id": "weapon_group_id",
        "code_mapping": weapon_code_mapping
    }
]

for job in nested_lookup_jobs:
    group_table = job['inner_table']
    group_col = job['inner_col']
    group_pk = job['inner_id']

    print("loading %s into %s and %s" % (job['file'], job['outer_table'], group_table))
    raw = pd.read_csv("../csv_data/%s" % (job['file']))

    # Group table gets one row per distinct group description
    group_names = raw['descriptn_a'].drop_duplicates()
    group_names.name = group_col
    group_names.to_sql(group_table, engine, index=False, if_exists='append')

    # Read back which id the database gave each group name
    group_ids = {rec[group_col]: rec[group_pk]
                 for rec in db.query("SELECT * FROM %s" % (group_table))}

    # Detail rows are inserted in file order, so their serial ids will be 1, 2, 3, ...
    for next_id, (_, rec) in enumerate(raw.iterrows(), start=1):
        job['code_mapping'][rec['code_agcy']] = next_id

    # Two-column frame (group, description), with group names swapped for group ids
    details = pd.concat([raw['descriptn_a'], raw['descriptn_b']], axis=1, keys=job['outer_cols'])
    details[group_pk] = details[group_pk].map(lambda name: group_ids[name])

    details.to_sql(job['outer_table'], engine, index=False, if_exists='append')

loading LWMAIN.PREMISE.csv into premise and premise_group
loading LWMAIN.WEAPON.csv into weapon and weapon_group


# cfs_2014_lwmain.csv

### Lookup tables
city, beat, district, sector, zip_code, ucr_code, and ucr_descr

In [27]:
# Row count per chunk for chunked loading. NOTE(review): not referenced by the loaders
# below; confirm whether anything still uses it.
chunksize = 20000

def safe_strip(str_):
    """
    Return ``str_`` with leading/trailing whitespace removed.

    Values that have no ``strip`` method (e.g. ``None``) come back unchanged.
    """
    try:
        strip = str_.strip
    except AttributeError:
        return str_
    return strip()

print("loading lookup tables")

import csv

# Distinct values found in cfs_2014_lwmain.csv, one set per lookup table
cities = set()
beats = set()
districts = set()
sectors = set()
zip_codes = set()
# (ucr_short_descr, ucr_long_descr) -> ucr_code; the first occurrence of a pair wins
ucr_descr_pairs = {}

# (column index in cfs_2014_lwmain.csv, set collecting that column's distinct values)
location_columns = ((9, cities), (10, zip_codes), (13, beats), (14, districts), (15, sectors))

with open('../csv_data/cfs_2014_lwmain.csv', 'r', encoding='ISO-8859-1') as f:
    reader = csv.reader(f)
    header = next(reader, None)
    for row in reader:
        # Strip whitespace and convert empty strings to None
        row = [x if x else None for x in map(safe_strip, row)]

        for column, values in location_columns:
            if row[column]:
                values.add(row[column])

        # ucr_code: 30, ucr_short_descr: 31, ucr_long_descr: 32
        ucr_descr_pairs.setdefault((row[31], row[32]), row[30])

location_tables = ((cities, db['city']),
                   (beats, db['beat']),
                   (districts, db['district']),
                   (sectors, db['sector']),
                   (zip_codes, db['zip_code']))
ucr_code = db['ucr_code']
ucr_descr = db['ucr_descr']

# A single transaction covers every insert. Previously only the location tables were inside
# one, so a failure while loading the UCR tables left already-committed rows behind.
db.begin()
try:
    for values, table_ref in location_tables:
        # Sorted so the ids assigned by the serial key are the same from run to run
        for s in sorted(values):
            table_ref.insert({'descr': s})

    for u in set(ucr_descr_pairs.values()):
        ucr_code.insert({'descr': u})

    # Read back the ids assigned to each UCR code (visible here: same connection/transaction)
    ucr_code_mapping = {}
    for row in db.query("SELECT * FROM ucr_code;"):
        ucr_code_mapping[row['descr']] = row['ucr_code_id']

    for (short_descr, long_descr), code in ucr_descr_pairs.items():
        ucr_descr.insert({'short_descr': short_descr, 'long_descr': long_descr,
                          'ucr_code_id': ucr_code_mapping[code]})

    db.commit()
except Exception:
    db.rollback()
    raise

print("lookup tables loaded")

loading lookup tables
lookup tables loaded


### Main table
incident

In [28]:
import csv    

def combine_date_time(str_date, str_time):
    """
    Merge a DPD date string ("%m/%d/%y") and 12-hour time string ("%I:%M %p") into one
    naive datetime (minute precision).
    """
    day = dt.datetime.strptime(str_date, "%m/%d/%y").date()
    clock = dt.datetime.strptime(str_time, "%I:%M %p").time()
    return dt.datetime.combine(day, clock)

# We'll use this to ensure we either map to a value of the foreign key or null
def safe_map(m, d):
    """
    Translate a DPD code ``d`` into our database id using the mapping ``m``.

    Missing values (``None`` or the empty string) become ``None`` (SQL NULL). Every other
    value is looked up, including falsy codes such as ``0``. Previously a code of 0 was
    passed through unmapped, which broke the foreign key and got the row silently skipped.

    Raises:
        KeyError: if ``d`` is a non-missing code that ``m`` does not contain.
    """
    if d is None or d == '':
        return None
    return m[d]

# Several columns must become ints but may also be missing
def safe_int(x):
    """Return ``int(x)``, or ``None`` when ``x`` is falsy (``None``, ``''``, ``0``)."""
    if not x:
        return None
    return int(x)

att_com_mapping = {
    'COM': True,
    'ATT': False,
    '': None
}


def _descr_to_id(table):
    """Return ``{descr: <table>_id}`` built from every row of the lookup ``table``."""
    # `table` is always one of our own hard-coded table names, never external input.
    return {row['descr']: row['%s_id' % table]
            for row in db.query("SELECT * FROM %s;" % table)}


def _yes_no(value, yes, no):
    """Return True for ``yes``, False for ``no`` and None for anything else (including None)."""
    if value == yes:
        return True
    if value == no:
        return False
    return None


# Populate the mappings from the database
city_code_mapping = _descr_to_id('city')
beat_code_mapping = _descr_to_id('beat')
district_code_mapping = _descr_to_id('district')
sector_code_mapping = _descr_to_id('sector')
zip_code_mapping = _descr_to_id('zip_code')

# the pairs of short/long_descr are unique, so that needs to be our key
ucr_descr_code_mapping = {(row['short_descr'], row['long_descr']): row['ucr_descr_id']
                          for row in db.query("SELECT * FROM ucr_descr;")}

start = dt.datetime.now()
j = 0        # rows read from the file
skipped = 0  # rows rejected by an integrity error (e.g. duplicate incident_id)

incident = db['incident']
db.begin()

try:
    # Use the same encoding as the lookup pass over this file; otherwise non-ASCII descriptions
    # decode differently and no longer match the lookup-table keys.
    with open('../csv_data/cfs_2014_lwmain.csv', 'r', encoding='ISO-8859-1') as f:
        reader = csv.reader(f)
        header = next(reader, None)
        for row in reader:
            # Strip whitespace and convert empty strings to None
            row = [x if x else None for x in map(safe_strip, row)]
            time_filed = combine_date_time(row[2], row[3])
            db_row = {
                'incident_id': safe_int(row[0]),
                'case_id': safe_int(row[1]),
                'time_filed': time_filed,
                'year_filed': time_filed.year,
                'month_filed': time_filed.month,
                'week_filed': time_filed.isocalendar()[1],
                'dow_filed': time_filed.weekday(),
                'street_num': row[7],
                'street_name': row[8],
                'city_id': safe_map(city_code_mapping, row[9]),
                'zip_code_id': safe_map(zip_code_mapping, row[10]),
                'geox': row[11],
                'geoy': row[12],
                'beat_id': safe_map(beat_code_mapping, row[13]),
                'district_id': safe_map(district_code_mapping, row[14]),
                'sector_id': safe_map(sector_code_mapping, row[15]),
                'premise_id': safe_map(premise_code_mapping, safe_int(row[16])),
                'weapon_id': safe_map(weapon_code_mapping, safe_int(row[17])),
                'domestic': _yes_no(row[18], 'Y', 'N'),
                'juvenile': _yes_no(row[19], 'Y', 'N'),
                'gang_related': _yes_no(row[20], 'YES', 'NO'),
                'emp_bureau_id': safe_map(bureau_code_mapping, row[21]),
                'emp_division_id': safe_map(division_code_mapping, row[22]),
                'emp_unit_id': safe_map(unit_code_mapping, row[23]),
                'num_officers': safe_int(row[24]),
                'investigation_status_id': safe_map(investigation_status_code_mapping, row[25]),
                'investigator_unit_id': safe_map(unit_code_mapping, row[26]),
                'case_status_id': safe_map(case_status_code_mapping, safe_int(row[27])),
                'ucr_descr_id': safe_map(ucr_descr_code_mapping, (row[31], row[32])),
                'committed': safe_map(att_com_mapping, row[33])
            }

            try:
                # have to insert one by one to properly handle the duplicate PKs in the data
                db.query("SAVEPOINT integrity_checkpoint;")
                incident.insert(db_row, ensure=False)  # we know the right columns are already there
            except IntegrityError:
                # ignore the duplicate pks; the lower chrgid comes first, so we already have the record we want.
                # postgres refuses further statements in an aborted transaction, hence the rollback.
                db.query("ROLLBACK TO SAVEPOINT integrity_checkpoint;")
                skipped += 1

            db.query("RELEASE SAVEPOINT integrity_checkpoint;")

            j += 1
            if j % 10000 == 0:
                # total_seconds(): timedelta.seconds alone would wrap around after one day
                elapsed = int((dt.datetime.now() - start).total_seconds())
                print('{} seconds: completed {} rows'.format(elapsed, j))

    db.commit()

except Exception:
    db.rollback()
    raise

print('{} rows read, {} skipped by integrity errors'.format(j, skipped))

14 seconds: completed 10000 rows
29 seconds: completed 20000 rows


#cfs_2014_lwmodop.csv

###Lookup tables
mo_group and mo_item

In [29]:
def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()
    
# (mo_item descr, mo_group descr) pairs in first-seen order. The companion set gives an
# O(1) membership test; testing `in` against the list itself was O(n) per CSV row.
mo_items_pairs = []
seen_mo_items_pairs = set()

print("loading lookup tables")

import csv

with open('../csv_data/cfs_2014_lwmodop.csv', 'r') as f:
    reader = csv.reader(f)
    first_row = True
    for row in reader:
        if first_row:
            header = row
            first_row = False
            continue

        # Strip whitespace and convert empty strings to None
        row = list(map(lambda x: x if x else None, map(safe_strip, row)))

        pair = (row[5], row[3])  # (mo_item, mo_group)
        if pair not in seen_mo_items_pairs:
            seen_mo_items_pairs.add(pair)
            mo_items_pairs.append(pair)

try:
    mo_group = db['mo_group']
    db.begin()
    # one mo_group row per distinct group description
    for g in {p[1] for p in mo_items_pairs}:
        mo_group.insert({'descr': g})

    mo_group_mapping = {}
    for row in db.query("SELECT * FROM mo_group;"):
        mo_group_mapping[row['descr']] = row['mo_group_id']

    # one mo_item row per distinct (item, group) pair, linked to its group's id
    mo_item = db['mo_item']
    for i, g in mo_items_pairs:
        mo_item.insert({'descr': i, 'mo_group_id': mo_group_mapping[g]})

    db.commit()
except Exception as e:
    db.rollback()
    raise e

print("lookup tables loaded")

loading lookup tables
lookup tables loaded


###Main table
incident_mo_item

In [30]:
import csv    

# We'll use this to ensure we either map to a value of the foreign key or null
def safe_map(m,d):
    """
    Look up ``d`` in the foreign-key mapping ``m``.

    Falsy keys (None, '') map to None -- the same contract as the other ``safe_map``
    helpers in this notebook -- instead of leaking the falsy value itself into an id column.
    A non-empty key that is missing from ``m`` still raises KeyError.
    """
    return m[d] if d else None

def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()

# Populate the mappings from the database.
# mo_item rows are unique per (item descr, group descr) pair, not per item descr, so the
# lookup key has to be that pair; keying on descr alone lets items that share a
# description across groups overwrite each other and map rows to the wrong mo_item.
mo_item_code_mapping = {}
for row in db.query("""
    SELECT i.mo_item_id, i.descr AS item_descr, g.descr AS group_descr
    FROM mo_item AS i
    LEFT JOIN mo_group AS g ON g.mo_group_id = i.mo_group_id;
"""):
    mo_item_code_mapping[(row['item_descr'], row['group_descr'])] = row['mo_item_id']

start = dt.datetime.now()
j = 0

mo = db['incident_mo_item']
# rows are buffered and written with insert_many every 10000 CSV rows
db_rows = []
db.begin()

try:
    with open('../csv_data/cfs_2014_lwmodop.csv', 'r') as f:
        reader = csv.reader(f)
        first_row = True
        for row in reader:
            if first_row:
                header = row
                first_row = False
                continue

            # Strip whitespace and convert empty strings to None
            row = list(map(lambda x: x if x else None, map(safe_strip, row)))
            db_rows.append({
                'incident_id': row[0],
                'incident_mo_item_id': row[1],
                # row[5] is the mo_item, row[3] its mo_group; a blank item maps to null
                'mo_item_id': mo_item_code_mapping[(row[5], row[3])] if row[5] else None
            })

            j += 1
            if j % 10000 == 0:
                mo.insert_many(db_rows, chunk_size=10000, ensure=False)
                db_rows = []
                print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j))

    # flush the final partial batch
    mo.insert_many(db_rows, ensure=False)
    db.commit()

except Exception as e:
    db.rollback()
    raise e

3 seconds: completed 10000 rows
7 seconds: completed 20000 rows
11 seconds: completed 30000 rows
15 seconds: completed 40000 rows


#cfs_2014_inmain.csv

###Lookup tables
We did close_code and call_source earlier with the other tables that come directly from a .csv file.

We'll need to do a full load of nature, priority, and call_unit, and we need to update city with the new cities in this file even though we loaded it previously.  We also need to make a pass through all the notes to collect the note_authors.

Also update beat, district, sector, and zip_code.

In [31]:
import re

def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()
    
# "<text>[MM/DD/YY HH:MM:SS author]" note marker (2- or 4-digit year). Raw string so the
# regex escapes are not parsed as (invalid) Python string escapes.
timestamp_expr = re.compile(r"(.*?)\[(\d{2}/\d{2}/(?:\d{2}|\d{4}) \d{2}:\d{2}:\d{2}) (.*?)\]")

def split_notes(notes):
    """
    Split a raw notes field into individual notes.

    Returns a list of ``(text, timestamp, author)`` tuples, one per ``[timestamp author]``
    marker: ``text`` is the stripped text preceding the marker (None if blank),
    ``timestamp`` is a ``datetime.datetime`` and ``author`` is the note-taker (None if
    blank).  ``notes`` of None yields an empty list.
    """
    if notes is None:
        return []
    tuples = []
    for raw_text, raw_timestamp, raw_author in timestamp_expr.findall(notes):
        # strip(), not split(): split() returned a list of words instead of the text
        text = raw_text.strip() or None  # turn blanks into null
        try:
            timestamp = dt.datetime.strptime(raw_timestamp, "%m/%d/%y %H:%M:%S")
        except ValueError:  # 4 digit year
            timestamp = dt.datetime.strptime(raw_timestamp, "%m/%d/%Y %H:%M:%S")
        author = raw_author or None
        tuples.append((text, timestamp, author))
    return tuples
    
natures = set()
call_units = set()
priorities = set()
note_authors = set()

# We already have most of the cities/beats/districts/sectors/zip codes from the incident
# data, but this file has more.  The db_* sets hold what is already loaded so that only
# new values are inserted below.
cities = set()
db_cities = {row['descr'] for row in db.query("SELECT * FROM city;")}

beats = set()
db_beats = {row['descr'] for row in db.query("SELECT * FROM beat;")}

districts = set()
db_districts = {row['descr'] for row in db.query("SELECT * FROM district;")}

sectors = set()
db_sectors = {row['descr'] for row in db.query("SELECT * FROM sector;")}

zip_codes = set()
db_zip_codes = {row['descr'] for row in db.query("SELECT * FROM zip_code;")}

print("loading lookup tables")

import csv

with open('../csv_data/cfs_2014_inmain.csv', 'r', encoding='ISO-8859-1') as f:
    reader = csv.reader(f)
    first_row = True
    for row in reader:
        if first_row:
            header = row
            first_row = False
            continue

        # Strip whitespace and convert empty strings to None
        row = list(map(lambda x: x if x else None, map(safe_strip, row)))

        if row[23]:
            natures.add(row[23])
        # primary unit, first dispatched unit and reporting unit all reference call_unit;
        # skip blanks so no call_unit row with a NULL descr gets inserted
        for d in (row[5], row[6], row[50]):
            if d:
                call_units.add(d)
        if row[27]:
            for note in split_notes(row[27]):
                if note[2] is not None:
                    note_authors.add(note[2])
        if row[10] and row[10] not in db_cities:
            cities.add(row[10])

        if row[11] and row[11] not in db_zip_codes:
            zip_codes.add(row[11])

        if row[18] and row[18] not in db_beats:
            beats.add(row[18])

        if row[19] and row[19] not in db_districts:
            districts.add(row[19])

        if row[20] and row[20] not in db_sectors:
            sectors.add(row[20])

        if row[24]:
            priorities.add(row[24])

# Each lookup table is inserted and committed in its own transaction; on error only the
# table currently in progress is rolled back.
try:
    for set_, table in ((natures, 'nature'),
                        (priorities, 'priority'),
                        (call_units, 'call_unit'),
                        (note_authors, 'note_author'),
                        (cities, 'city'),
                        (beats, 'beat'),
                        (districts, 'district'),
                        (sectors, 'sector'),
                        (zip_codes, 'zip_code')):
        table_ref = db[table]
        db.begin()
        for s in set_:
            table_ref.insert({'descr': s})
        db.commit()

except Exception as e:
    db.rollback()
    raise e

print("lookup tables loaded")

loading lookup tables
lookup tables loaded


###Main table
call

In [32]:
import csv
import re

# "<text>[MM/DD/YY HH:MM:SS author]" note marker (2- or 4-digit year). Raw string so the
# regex escapes are not parsed as (invalid) Python string escapes, which newer Pythons warn about.
timestamp_expr = re.compile(r"(.*?)\[(\d{2}/\d{2}/(?:\d{2}|\d{4}) \d{2}:\d{2}:\d{2}) (.*?)\]")

# We'll use this to ensure we either map to a value of the foreign key or null
def safe_map(m,d):
    """Return ``m[d]``, or None when ``d`` is falsy (None or empty); missing keys raise KeyError."""
    if not d:
        return None
    return m[d]

# We have several columns we need to convert to int that can also be None
def safe_int(x):
    """Convert ``x`` to ``int``; falsy input (None, '') yields None."""
    if not x:
        return None
    return int(x)

def safe_float(x):
    """Convert ``x`` to ``float``; falsy input (None, '') yields None."""
    if not x:
        return None
    return float(x)

def safe_bool(x):
    """Map the flag strings '1'/'0' to True/False; any other value becomes None."""
    if x == '1':
        return True
    if x == '0':
        return False
    return None

def safe_datetime(x):
    """
    Parse ``x`` into a plain ``datetime.datetime``.

    None and the literal string 'NULL' map to None.
    """
    if x in ('NULL', None):
        return None
    # pd.to_datetime returns a pandas Timestamp; to_pydatetime() gives a vanilla datetime.
    # (Timestamp.to_datetime() was deprecated and later removed from pandas.)
    return pd.to_datetime(x).to_pydatetime()

def clean_case_id(c):
    """
    Normalize a case id to an ``int`` by removing dashes and spaces.

    Returns None for falsy input or when the remaining characters are not an integer.
    """
    if not c:
        return None
    digits = str(c).replace('-', '').replace(' ', '')
    try:
        return int(digits)
    except ValueError:
        # some rows carry non-digit characters; those can't map back to an incident anyway
        return None

def split_notes(notes):
    """
    Split a raw notes field into individual notes.

    Returns a list of ``(text, timestamp, author)`` tuples, one per ``[timestamp author]``
    marker matched by ``timestamp_expr``: ``text`` is the stripped text preceding the
    marker (None if blank), ``timestamp`` is a ``datetime.datetime`` (2- or 4-digit year)
    and ``author`` is the note-taker (None if blank).  ``notes`` of None yields [].
    """
    if notes is None:
        return []
    tuples = []
    for raw_text, raw_timestamp, raw_author in re.findall(timestamp_expr, notes):
        text = raw_text.strip() or None  # turn blanks into null
        try:
            timestamp = dt.datetime.strptime(raw_timestamp, "%m/%d/%y %H:%M:%S")
        except ValueError:  # 4 digit year
            timestamp = dt.datetime.strptime(raw_timestamp, "%m/%d/%Y %H:%M:%S")
        # blank author -> None; blank authors are never added to note_author
        author = raw_author or None
        tuples.append((text, timestamp, author))
    return tuples

nature_code_mapping = {}
call_unit_code_mapping = {}
note_author_mapping = {}
city_code_mapping = {}
beat_mapping = {}
district_mapping = {}
sector_mapping = {}
zip_code_mapping = {}
priority_mapping = {}

# Populate the descr -> id mappings from the database
for row in db.query("SELECT * FROM nature;"):
    nature_code_mapping[row['descr']] = row['nature_id']

for row in db.query("SELECT * FROM call_unit;"):
    call_unit_code_mapping[row['descr']] = row['call_unit_id']

for row in db.query("SELECT * FROM note_author;"):
    note_author_mapping[row['descr']] = row['note_author_id']

for row in db.query("SELECT * FROM city;"):
    city_code_mapping[row['descr']] = row['city_id']

for row in db.query("SELECT * FROM beat;"):
    beat_mapping[row['descr']] = row['beat_id']

for row in db.query("SELECT * FROM district;"):
    district_mapping[row['descr']] = row['district_id']

for row in db.query("SELECT * FROM sector;"):
    sector_mapping[row['descr']] = row['sector_id']

for row in db.query("SELECT * FROM zip_code;"):
    zip_code_mapping[row['descr']] = row['zip_code_id']

for row in db.query("SELECT * FROM priority;"):
    priority_mapping[row['descr']] = row['priority_id']

start = dt.datetime.now()
j = 0

# call and note rows are buffered and written with insert_many every 10000 CSV rows
call_rows = []
note_rows = []
call = db['call']
note = db['note']
db.begin()

try:
    with open('../csv_data/cfs_2014_inmain.csv', 'r', encoding='ISO-8859-1') as f:
        reader = csv.reader(f)
        first_row = True
        for row in reader:
            if first_row:
                header = row
                first_row = False
                continue

            # Strip whitespace and convert empty strings to None
            row = list(map(lambda x: x if x else None, map(safe_strip, row)))
            time_received = safe_datetime(row[1])
            first_unit_dispatch = safe_datetime(row[32])
            first_unit_arrive = safe_datetime(row[39])
            if first_unit_arrive is not None and time_received is not None:
                overall_response_time = first_unit_arrive - time_received
            else:
                overall_response_time = None

            if first_unit_arrive is not None and first_unit_dispatch is not None:
                officer_response_time = first_unit_arrive - first_unit_dispatch
            else:
                officer_response_time = None

            # Calendar breakdowns of time_received; all null when the call has no receive
            # time, instead of raising AttributeError and aborting the whole load.
            if time_received is not None:
                year_received = time_received.year
                hour_received = time_received.hour
                month_received = time_received.month
                week_received = time_received.isocalendar()[1]
                dow_received = time_received.weekday()
            else:
                year_received = hour_received = month_received = None
                week_received = dow_received = None

            call_id = safe_int(row[0])  # reused as the foreign key on this call's notes
            db_row = {
                'call_id': call_id,
                'time_received': time_received,
                'year_received': year_received,
                'hour_received': hour_received,
                'month_received': month_received,
                'week_received': week_received,
                'dow_received': dow_received,
                'case_id': clean_case_id(row[3]),
                'call_source_id': safe_map(call_source_code_mapping, row[4]),
                'primary_unit_id': safe_map(call_unit_code_mapping, row[5]),
                'first_dispatched_id': safe_map(call_unit_code_mapping, row[6]),
                'reporting_unit_id': safe_map(call_unit_code_mapping, row[50]),
                'street_num': safe_int(row[7]),
                'street_name': row[8],
                'city_id': safe_map(city_code_mapping, row[10]),
                'zip_code_id': safe_map(zip_code_mapping, row[11]),
                'crossroad1': row[12],
                'crossroad2': row[13],
                'geox': safe_float(row[14]),
                'geoy': safe_float(row[15]),
                'beat_id': safe_map(beat_mapping, row[18]),
                'district_id': safe_map(district_mapping, row[19]),
                'sector_id': safe_map(sector_mapping, row[20]),
                'business': row[21],
                'nature_id': safe_map(nature_code_mapping, row[23]),
                'priority_id': safe_map(priority_mapping, row[24]),
                'report_only': safe_bool(row[25]),
                'cancelled': safe_bool(row[26]),
                'time_routed': safe_datetime(row[28]),
                'time_finished': safe_datetime(row[30]),
                'first_unit_dispatch': first_unit_dispatch,
                'first_unit_enroute': safe_datetime(row[36]),
                'first_unit_arrive': first_unit_arrive,
                'first_unit_transport': safe_datetime(row[42]),
                'last_unit_clear': safe_datetime(row[45]),
                'time_closed': safe_datetime(row[49]),
                'overall_response_time': overall_response_time,
                'officer_response_time': officer_response_time,
                'close_code_id': safe_map(close_code_mapping, row[51]),
                'close_comments': row[52]
            }
            call_rows.append(db_row)

            # row[27] holds all of this call's notes; each becomes its own note row
            for body, time_recorded, author in split_notes(row[27]):
                note_rows.append({'body': body,
                                  'time_recorded': time_recorded,
                                  'call_id': call_id,
                                  'note_author_id': safe_map(note_author_mapping, author)})

            j += 1
            if j % 10000 == 0:
                call.insert_many(call_rows, chunk_size=10000, ensure=False)
                note.insert_many(note_rows, chunk_size=10000, ensure=False)
                call_rows = []
                note_rows = []
                print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j))

    # flush the final partial batch
    call.insert_many(call_rows, ensure=False)
    note.insert_many(note_rows, ensure=False)
    db.commit()

except Exception as e:
    db.rollback()
    raise e

21 seconds: completed 10000 rows
43 seconds: completed 20000 rows
64 seconds: completed 30000 rows
86 seconds: completed 40000 rows
107 seconds: completed 50000 rows
129 seconds: completed 60000 rows
151 seconds: completed 70000 rows
173 seconds: completed 80000 rows
195 seconds: completed 90000 rows
217 seconds: completed 100000 rows
239 seconds: completed 110000 rows
262 seconds: completed 120000 rows
284 seconds: completed 130000 rows
307 seconds: completed 140000 rows
330 seconds: completed 150000 rows
353 seconds: completed 160000 rows
376 seconds: completed 170000 rows
399 seconds: completed 180000 rows
421 seconds: completed 190000 rows
444 seconds: completed 200000 rows
467 seconds: completed 210000 rows
489 seconds: completed 220000 rows
512 seconds: completed 230000 rows
534 seconds: completed 240000 rows
556 seconds: completed 250000 rows
579 seconds: completed 260000 rows
601 seconds: completed 270000 rows
623 seconds: completed 280000 rows
646 seconds: completed 290000 row

#cfs_2014_unitper.csv

##Lookup tables
Need to update call_unit and load shift and officer.

In [33]:
import csv

def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()
    
officers = {}
shifts = set()

call_units = set()
db_call_units = set()
# we already have most of the call_units from the call data, but there are more
for row in db.query("SELECT * FROM call_unit;"):
    db_call_units.add(row['descr'])

print("loading lookup tables")

with open('../csv_data/cfs_2014_unitper.csv', 'r', encoding='ISO-8859-1') as f:
    reader = csv.reader(f)
    first_row = True
    for row in reader:

        if first_row:
            header = row
            first_row = False
            continue

        # Strip whitespace and convert empty strings to None
        row = list(map(lambda x: x if x else None, map(safe_strip, row)))

        if row[1]:
            shifts.add(row[1])

        if row[2] and row[2] not in db_call_units:
            call_units.add(row[2])

        cur_id = row[3]
        # Clean the officer's name: strip whitespace around each comma-separated part.
        # cur_name is always a str ('' when the name is missing).
        cur_name = ','.join([t.strip() for t in row[4].split(',')]) if row[4] else ''

        # officers[id] = {'name': primary name (absent until a non-numeric name is seen),
        #                 'name_aka': other distinct names recorded for that id}
        if cur_id not in officers:
            if cur_name.isdigit():
                officers[cur_id] = {'name_aka': [cur_name]}
            else:
                officers[cur_id] = {'name': cur_name, 'name_aka': []}
        else:
            officer_rec = officers[cur_id]
            # .get(): an officer first seen under a numeric name has no 'name' key yet;
            # indexing it directly raised KeyError on that officer's next new numeric name.
            if ('name' in officer_rec or cur_name.isdigit()) \
                    and cur_name not in officer_rec['name_aka'] \
                    and cur_name != officer_rec.get('name'):
                officer_rec['name_aka'].append(cur_name)
            elif 'name' not in officer_rec and not cur_name.isdigit():
                officer_rec['name'] = cur_name

try:
    call_unit = db['call_unit']
    db.begin()
    for c in call_units:
        call_unit.insert({'descr': c})

    officer = db['officer']
    for o_id, o_info in officers.items():
        officer.insert({
            'officer_id': o_id,
            # None for officers only ever recorded under numeric names
            'name': o_info.get('name'),
            # aliases stored as a comma-separated list of double-quoted names
            'name_aka': ','.join(['"' + n + '"' for n in o_info['name_aka']]),
        })

    shift = db['shift']
    for s in shifts:
        shift.insert({'shift_id': s})

    db.commit()
except Exception as e:
    db.rollback()
    raise e

print("lookup tables loaded")

loading lookup tables
lookup tables loaded


###Main table
shift

In [34]:
import csv

def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()
    
def safe_map(m,d):
    """Return ``m[d]``, or None when ``d`` is falsy (None or empty); missing keys raise KeyError."""
    if not d:
        return None
    return m[d]

def safe_int(x):
    """Convert ``x`` to ``int``; falsy input (None, '') yields None."""
    if not x:
        return None
    return int(x)

def safe_datetime(x):
    """
    Parse ``x`` into a plain ``datetime.datetime``.

    None and the literal string 'NULL' map to None.
    """
    if x in ('NULL', None):
        return None
    # pd.to_datetime returns a pandas Timestamp; to_pydatetime() gives a vanilla datetime.
    # (Timestamp.to_datetime() was deprecated and later removed from pandas.)
    return pd.to_datetime(x).to_pydatetime()

# Populate the mappings from the database
call_unit_mapping = {}

for row in db.query("SELECT * FROM call_unit;"):
    call_unit_mapping[row['descr']] = row['call_unit_id']

start = dt.datetime.now()
j = 0

shift_unit = db['shift_unit']
# rows are buffered and written with insert_many every 10000 CSV rows
db_rows = []
db.begin()

try:
    # Same encoding as the lookup-table pass over this file, so the call unit strings
    # decode identically (and the platform default codec can't choke on non-ASCII bytes).
    with open('../csv_data/cfs_2014_unitper.csv', 'r', encoding='ISO-8859-1') as f:
        reader = csv.reader(f)
        first_row = True
        for row in reader:
            if first_row:
                header = row
                first_row = False
                continue

            # Strip whitespace and convert empty strings to None
            row = list(map(lambda x: x if x else None, map(safe_strip, row)))

            db_rows.append({
                'shift_unit_id': safe_int(row[0]),
                'shift_id': safe_int(row[1]),
                'call_unit_id': safe_map(call_unit_mapping, row[2]),
                'officer_id': safe_int(row[3]),
                'in_time': safe_datetime(row[6]),
                'out_time': safe_datetime(row[7]),
                'bureau_id': safe_map(bureau_code_mapping, row[8]),
                'division_id': safe_map(division_code_mapping, row[9]),
                'unit_id': safe_map(unit_code_mapping, row[10])
            })

            j += 1
            if j % 10000 == 0:
                shift_unit.insert_many(db_rows, chunk_size=10000, ensure=False)
                db_rows = []
                print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j))

    # flush the final partial batch
    shift_unit.insert_many(db_rows, ensure=False)
    db.commit()

except Exception as e:
    db.rollback()
    raise e

9 seconds: completed 10000 rows
18 seconds: completed 20000 rows
27 seconds: completed 30000 rows
37 seconds: completed 40000 rows
46 seconds: completed 50000 rows
55 seconds: completed 60000 rows
65 seconds: completed 70000 rows


#cfs_xxx2014_incilog.csv

###Lookup tables
Need to update call_unit and do a full load of transaction.

In [35]:
import csv

# Month prefixes of the per-month incilog file names
months = tuple("jan feb mar apr may jun jul aug sep oct nov dec".split())

def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()

transactions = set()

call_units = set()
# we already have most of the call_units from the call data, but there are more;
# only units not yet in the database are collected for insertion
db_call_units = set(row['descr'] for row in db.query("SELECT * FROM call_unit;"))

print("loading lookup tables")

for month in months:
    incilog_path = '../csv_data/cfs_%s2014_incilog.csv' % (month)
    with open(incilog_path, 'r', encoding='ISO-8859-1') as f:
        for row_num, raw_row in enumerate(csv.reader(f)):
            if row_num == 0:
                header = raw_row
                continue

            # Strip whitespace and convert empty strings to None
            row = [value if value else None for value in map(safe_strip, raw_row)]

            transaction_descr = row[2]
            if transaction_descr:
                transactions.add(transaction_descr)

            unit_descr = row[6]
            if unit_descr and unit_descr not in db_call_units:
                call_units.add(unit_descr)

# call_unit and transaction are each inserted and committed in their own transaction
try:
    for lookup_values, table_name in ((call_units, 'call_unit'), (transactions, 'transaction')):
        table_ref = db[table_name]
        db.begin()
        for descr in lookup_values:
            table_ref.insert({'descr': descr})
        db.commit()
except Exception as e:
    db.rollback()
    raise e

print("lookup tables loaded")

loading lookup tables
lookup tables loaded


###Main table
call_log

In [36]:
import csv

months = ("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")

# We'll use this to ensure we either map to a value of the foreign key or null
def safe_map(m,d):
    """
    Look up ``d`` in the foreign-key mapping ``m``.

    Falsy keys (None, '') map to None -- the same contract as the other ``safe_map``
    helpers in this notebook -- rather than passing the falsy value itself through.
    A non-empty key missing from ``m`` still raises KeyError.
    """
    return m[d] if d else None

def safe_strip(str_):
    """Return ``str_`` without surrounding whitespace; values lacking ``.strip`` pass through unchanged."""
    strip = getattr(str_, 'strip', None)
    if strip is None:
        # None (empty CSV cell) or any non-string value
        return str_
    return strip()
    
def safe_int(x):
    """Convert ``x`` to ``int``; falsy input (None, '') yields None."""
    if not x:
        return None
    return int(x)

def safe_datetime(x):
    """
    Parse ``x`` into a plain ``datetime.datetime``.

    None and the literal string 'NULL' map to None.
    """
    if x in ('NULL', None):
        return None
    # pd.to_datetime returns a pandas Timestamp; to_pydatetime() gives a vanilla datetime.
    # (Timestamp.to_datetime() was deprecated and later removed from pandas.)
    return pd.to_datetime(x).to_pydatetime()

# Populate the mappings from the database
call_unit_mapping = {}
transaction_code_mapping = {}

for row in db.query("SELECT * FROM call_unit;"):
    call_unit_mapping[row['descr']] = row['call_unit_id']

for row in db.query("SELECT * FROM transaction;"):
    transaction_code_mapping[row['descr']] = row['transaction_id']

# We have fire and EMS call_log data, which we don't have calls for.  We need to ignore this data.
valid_call_ids = set()
for row in db.query("SELECT call_id FROM call;"):
    valid_call_ids.add(row['call_id'])

# Same deal for shifts, but in this case, we'll just set the shift_ids to null in the offending records instead of
# dropping them.
valid_shift_ids = set()
for row in db.query("SELECT shift_id FROM shift;"):
    valid_shift_ids.add(row['shift_id'])

start = dt.datetime.now()
j = 0

call_log = db['call_log']
# rows are buffered and written with insert_many every 10000 kept rows
db_rows = []
db.begin()

try:
    for month in months:
        print("loading data for %s" % (month))
        # Same encoding as the lookup-table pass over these files, so transaction and call
        # unit strings decode identically (and the platform default codec can't choke on
        # non-ASCII bytes).
        with open('../csv_data/cfs_%s2014_incilog.csv' % (month), 'r', encoding='ISO-8859-1') as f:
            reader = csv.reader(f)
            first_row = True
            for row in reader:
                if first_row:
                    header = row
                    first_row = False
                    continue

                # Strip whitespace and convert empty strings to None
                row = list(map(lambda x: x if x else None, map(safe_strip, row)))

                call_id = safe_int(row[5])
                shift_id = safe_int(row[8])
                if call_id in valid_call_ids:
                    db_rows.append({
                        'call_log_id': safe_int(row[0]),
                        'transaction_id': safe_map(transaction_code_mapping, row[2]),
                        'time_recorded': safe_datetime(row[3]),
                        'call_id': call_id,
                        'call_unit_id': safe_map(call_unit_mapping, row[6]),
                        'shift_id': shift_id if shift_id in valid_shift_ids else None,
                        'close_code_id': safe_map(close_code_mapping, row[9])
                    })
                    j += 1
                    if j % 10000 == 0:
                        call_log.insert_many(db_rows, chunk_size=10000, ensure=False)
                        db_rows = []
                        print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j))

    # flush the final partial batch
    call_log.insert_many(db_rows, ensure=False)
    db.commit()

except Exception as e:
    db.rollback()
    raise e

loading data for jan
3 seconds: completed 10000 rows
7 seconds: completed 20000 rows
11 seconds: completed 30000 rows
14 seconds: completed 40000 rows
18 seconds: completed 50000 rows
22 seconds: completed 60000 rows
25 seconds: completed 70000 rows
29 seconds: completed 80000 rows
33 seconds: completed 90000 rows
36 seconds: completed 100000 rows
40 seconds: completed 110000 rows
44 seconds: completed 120000 rows
47 seconds: completed 130000 rows
51 seconds: completed 140000 rows
55 seconds: completed 150000 rows
59 seconds: completed 160000 rows
62 seconds: completed 170000 rows
loading data for feb
66 seconds: completed 180000 rows
70 seconds: completed 190000 rows
73 seconds: completed 200000 rows
77 seconds: completed 210000 rows
81 seconds: completed 220000 rows
84 seconds: completed 230000 rows
88 seconds: completed 240000 rows
92 seconds: completed 250000 rows
95 seconds: completed 260000 rows
99 seconds: completed 270000 rows
103 seconds: completed 280000 rows
106 seconds: com

#cfs_2014_outserv.csv

##Lookup tables
Need to update call_unit.

In [37]:
import csv

call_units = set()
# only call units not already in the database are collected for insertion
db_call_units = set(row['descr'] for row in db.query("SELECT * FROM call_unit;"))

print("loading lookup tables")

with open('../csv_data/cfs_2014_outserv.csv', 'r', encoding='ISO-8859-1') as f:
    for row_num, raw_row in enumerate(csv.reader(f)):
        if row_num == 0:
            header = raw_row
            continue

        # Strip whitespace and convert empty strings to None
        row = [value if value else None for value in map(safe_strip, raw_row)]

        unit_descr = row[1]
        if unit_descr and unit_descr not in db_call_units:
            call_units.add(unit_descr)

try:
    call_unit = db['call_unit']
    db.begin()
    for descr in call_units:
        call_unit.insert({'descr': descr})
    db.commit()
except Exception as e:
    db.rollback()
    raise e

print("lookup tables loaded")

loading lookup tables
lookup tables loaded


##Main table
out_of_service

In [38]:
import csv

# We have several columns we need to convert to int that can also be None
def safe_int(x):
    """Convert ``x`` to ``int``; falsy input (None, '') yields None."""
    if not x:
        return None
    return int(x)

def safe_datetime(x):
    """
    Parse ``x`` into a plain ``datetime.datetime``.

    None and the literal string 'NULL' map to None.
    """
    if x in ('NULL', None):
        return None
    # pd.to_datetime returns a pandas Timestamp; to_pydatetime() gives a vanilla datetime.
    # (Timestamp.to_datetime() was deprecated and later removed from pandas.)
    return pd.to_datetime(x).to_pydatetime()

# We'll use this to ensure we either map to a value of the foreign key or null
def safe_map(m,d):
    """Return ``m[d]``, or None when ``d`` is falsy (None or empty); missing keys raise KeyError."""
    if not d:
        return None
    return m[d]

call_unit_mapping = {}

for row in db.query("SELECT * FROM call_unit;"):
    call_unit_mapping[row['descr']] = row['call_unit_id']

# Set shift_ids to null if they violate the fk constraint
valid_shift_ids = set()
for row in db.query("SELECT shift_id FROM shift;"):
    valid_shift_ids.add(row['shift_id'])

start = dt.datetime.now()
j = 0

# rows are buffered and written with insert_many every 10000 CSV rows
oos_rows = []

oos = db['out_of_service']
# Open a transaction, as the other loaders do, so the commit/rollback below cover the
# whole file instead of being called with no transaction begun.
db.begin()

try:
    with open('../csv_data/cfs_2014_outserv.csv', 'r', encoding='ISO-8859-1') as f:
        reader = csv.reader(f)
        first_row = True
        for row in reader:
            if first_row:
                header = row
                first_row = False
                continue

            # Strip whitespace and convert empty strings to None
            row = list(map(lambda x: x if x else None, map(safe_strip, row)))
            start_time = safe_datetime(row[5])
            end_time = safe_datetime(row[6])
            if start_time is not None and end_time is not None:
                duration = end_time - start_time
            else:
                # subtracting with a missing endpoint would raise TypeError
                duration = None
            shift_id = safe_int(row[8])
            db_row = {
                'oos_id': safe_int(row[0]),
                'call_unit_id': safe_map(call_unit_mapping, row[1]),
                'oos_code_id': safe_map(oos_code_mapping, row[2]),
                'location': row[3],
                'comments': row[4],
                'start_time': start_time,
                'end_time': end_time,
                'duration': duration,
                'shift_id': shift_id if shift_id in valid_shift_ids else None
            }
            oos_rows.append(db_row)

            j += 1
            if j % 10000 == 0:
                oos.insert_many(oos_rows, chunk_size=10000, ensure=False)
                oos_rows = []
                print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j))
    # flush the final partial batch
    oos.insert_many(oos_rows, ensure=False)
    db.commit()

except Exception as e:
    db.rollback()
    raise e

7 seconds: completed 10000 rows
14 seconds: completed 20000 rows
21 seconds: completed 30000 rows
27 seconds: completed 40000 rows
34 seconds: completed 50000 rows
41 seconds: completed 60000 rows
48 seconds: completed 70000 rows


#Other tables
##squad

In [39]:
# squad descr -> pattern matched against call_unit.descr with PostgreSQL's ~ operator below
call_unit_squad_regexes = {
    'A': '^A[1-5][0-9]{2}$',
    'B': '^B[1-5][0-9]{2}$',
    'C': '^C[1-5][0-9]{2}$',
    'D': '^D[1-5][0-9]{2}$',
    'BIKE': '^L5[0-9]{2}$',
    'HEAT': '^H[1-4][0-9]{2}$',
    'K9': '^K[0-9]{2}$',
    'MOTORS': '^MTR[2-8]$',
    'TACT': '^T[2-8]$',
    'VIR': '^ED6[0-6]$'
}

squad_table = db['squad']

for squad in call_unit_squad_regexes:
    squad_table.insert({'descr': squad})

squad_mapping = {}
for row in db.query("SELECT * FROM squad;"):
    squad_mapping[row['descr']] = row['squad_id']

# The loop variable is `pattern`, not `re`: naming it `re` rebinds the `re` module
# to a string for every later cell in the notebook.
# The patterns are the hard-coded constants above, so formatting them into the SQL is safe.
for squad, pattern in call_unit_squad_regexes.items():
    db.query("""
    UPDATE call_unit
    SET squad_id = %s
    WHERE descr ~ '%s'
    """ % (squad_mapping[squad], pattern))


#Enable GIS extensions (PostGIS)

In [None]:
# Enable PostGIS and related extensions. IF NOT EXISTS makes this cell safe to
# re-run against a database that already has them installed. The original
# order is kept, presumably because later extensions depend on earlier ones.
for extension in ('postgis', 'postgis_topology', 'fuzzystrmatch', 'postgis_tiger_geocoder'):
    db.query("CREATE EXTENSION IF NOT EXISTS %s;" % extension)

Now use PostGIS to convert the North Carolina State Plane coordinates (SRID 2264) to latitude and longitude (SRID 4326).

In [40]:
# Rebuild call_latlong from scratch: one row per call with its state-plane
# coordinates (SRID 2264) reprojected to SRID 4326 longitude/latitude, then add
# a primary key and a foreign key back to call. Statements run in order.
call_latlong_statements = (
    """DROP TABLE IF EXISTS call_latlong CASCADE;""",
    """
CREATE TABLE call_latlong AS (
    SELECT call_id, st_x(point) AS longitude, st_y(point) AS latitude, point
    FROM (
        SELECT call_id, 
        st_Transform(ST_SetSRID(ST_MakePoint(geox, geoy), 2264), 4326)::geometry(Point, 4326) AS point
        FROM call
    ) AS a
);
""",
    """
ALTER TABLE call_latlong
ADD CONSTRAINT call_latlong_pk PRIMARY KEY (call_id);
""",
    """
ALTER TABLE call_latlong
ADD CONSTRAINT call_call_latlong_fk FOREIGN KEY (call_id) REFERENCES call (call_id);
""",
)

for statement in call_latlong_statements:
    db.query(statement)


# Rebuild incident_latlong from scratch: one row per incident with its
# state-plane coordinates (SRID 2264) reprojected to SRID 4326
# longitude/latitude, then add a primary key and a foreign key back to incident.
# Incident coordinates are stored scaled by 100. The divisor is written as 100.0
# so the result stays fractional even if geox/geoy are integer columns
# (integer / integer truncates in PostgreSQL).
db.query("""DROP TABLE IF EXISTS incident_latlong CASCADE;""")

db.query("""
CREATE TABLE incident_latlong AS (
    SELECT incident_id, st_x(point) AS longitude, st_y(point) AS latitude, point
    FROM (
        SELECT incident_id,
        -- Incident x and y are stored scaled by 100; divide by 100.0 (not 100)
        -- so integer columns are not truncated to whole units.
        st_Transform(ST_SetSRID(ST_MakePoint(geox/100.0, geoy/100.0), 2264), 4326)::geometry(Point, 4326) AS point
        FROM incident
    ) AS a
);
""")

db.query("""
ALTER TABLE incident_latlong
ADD CONSTRAINT incident_latlong_pk PRIMARY KEY (incident_id);
""")

db.query("""
ALTER TABLE incident_latlong
ADD CONSTRAINT incident_incident_latlong_fk FOREIGN KEY (incident_id) REFERENCES incident (incident_id);
""")

<dataset.persistence.util.ResultIter at 0x111f92208>

Load the beat shapefiles via PostGIS. The `shapefile.sql` script creates the `beat_temp` staging table, which is used to build the final `beat_geom` table.

In [41]:
# create the beat_temp table with the exact info from the shapefile; we'll only be keeping part of it
# Execute the shapefile.sql script, which creates the beat_temp table holding
# the raw shapefile contents; only part of it is kept afterwards.
shapefile_sql_path = "../shapefiles/beats_districts/shapefile.sql"
with open(shapefile_sql_path, 'r') as sql_file:
    shapefile_sql = sql_file.read()
db.query(shapefile_sql)

In [42]:
# Build beat_geom from the staging table beat_temp. Keep only the geometry and
# the contiguous flag, and link each shape to beat via beat_temp.cad =
# beat.descr. Shapes whose cad value matches no beat are dropped by the inner
# join.
db.query("""DROP TABLE IF EXISTS beat_geom CASCADE;""")

# beat_geom_id is a surrogate key. Ordering the window by beat_id makes the
# numbering follow beat order instead of whatever row order the planner picks.
# Ties between shapes of the same beat are still numbered in unspecified order.
db.query("""
CREATE TABLE beat_geom AS
SELECT
  row_number() OVER (ORDER BY beat.beat_id) AS beat_geom_id,
  beat.beat_id,
  -- Convert the text flag to a boolean; any value other than 'yes'/'no' becomes NULL.
  CASE
    WHEN beat_temp.contiguous = 'yes' THEN TRUE
    WHEN beat_temp.contiguous = 'no' THEN FALSE
    ELSE NULL
  END AS contiguous,
  beat_temp.geom
FROM beat_temp
JOIN beat ON beat_temp.cad = beat.descr;
""")

db.query("""
ALTER TABLE beat_geom
ADD CONSTRAINT beat_geom_pk PRIMARY KEY (beat_geom_id);
""")

db.query("""
ALTER TABLE beat_geom
ADD CONSTRAINT beat_beat_geom_fk FOREIGN KEY (beat_id) REFERENCES beat (beat_id);
""")

# beat_temp only served as a staging table for the shapefile import.
db.query("""
DROP TABLE beat_temp;
""")

<dataset.persistence.util.ResultIter at 0x1069e1e80>

#Misc. Other Changes

Adding the incident_id column to call:

In [43]:
# Execute call_incident_id.sql, which adds the incident_id column to call.
call_incident_sql_path = "../scripts/call_incident_id.sql"
with open(call_incident_sql_path, 'r') as sql_file:
    call_incident_sql = sql_file.read()
db.query(call_incident_sql)

Setting up the relationships between beats, districts, and sectors:

In [44]:
# Assign each beat to a district: the district whose descr is 'D' followed by
# the first character of the beat's descr. Beats 'DSO' and 'OOJ' are explicitly
# excluded (presumably they do not belong to a numbered district — confirm).
db.query("""
UPDATE beat
SET district_id = (
  SELECT district_id
  FROM district
  WHERE district.descr = 'D' || SUBSTRING(beat.descr::text FROM 1 FOR 1)
)
WHERE beat.descr NOT IN ('DSO', 'OOJ');
""")

# Which districts belong to which sector. Keeping this in one place ensures the
# beat and district updates below always agree.
sector_districts = {
    'NTH': ('D2', 'D1', 'D5'),
    'STH': ('D3', 'D4'),
}

for sector_descr, district_descrs in sector_districts.items():
    # Both values come from the literal mapping above, so plain string
    # interpolation into the SQL is safe here.
    district_list = ", ".join("'%s'" % descr for descr in district_descrs)

    # Beats inherit the sector of the district they were just assigned to.
    db.query("""
UPDATE beat
SET sector_id = (
  SELECT sector_id
  FROM sector
  WHERE sector.descr = '%s')
WHERE beat.district_id IN (
  SELECT district_id
  FROM district
  WHERE district.descr IN (%s)
);
""" % (sector_descr, district_list))

    db.query("""
UPDATE district
SET sector_id = (
SELECT sector_id
  FROM sector
  WHERE sector.descr = '%s')
WHERE district.descr IN (%s);
""" % (sector_descr, district_list))

<dataset.persistence.util.ResultIter at 0x101fb8be0>

Indexes: create indexes on the lookup/foreign-key columns of `call` and `incident`:

In [45]:
# (index_name, table_name, column_name)
# Secondary indexes on the lookup/foreign-key columns of call and incident.
# Each entry is (index_name, table_name, column_name).
ndx_jobs = [
    ('call_call_source_id_ndx', 'call', 'call_source_id'),
    ('call_primary_unit_id_ndx', 'call', 'primary_unit_id'),
    # NOTE(review): the index name says first_dispatched_unit_id but the column
    # is first_dispatched_id — confirm the column name against the call DDL.
    ('call_first_dispatched_unit_id_ndx', 'call', 'first_dispatched_id'),
    ('call_reporting_unit_id_ndx', 'call', 'reporting_unit_id'),
    ('call_city_id_ndx', 'call', 'city_id'),
    ('call_beat_id_ndx', 'call', 'beat_id'),
    ('call_district_id_ndx', 'call', 'district_id'),
    ('call_sector_id_ndx', 'call', 'sector_id'),
    ('call_nature_id_ndx', 'call', 'nature_id'),
    ('call_close_code_id_ndx', 'call', 'close_code_id'),
    ('call_zip_code_id_ndx', 'call', 'zip_code_id'),
    # call_incident_id_ndx is created with the incident_id column
    ('incident_city_id_ndx', 'incident', 'city_id'),
    ('incident_beat_id_ndx', 'incident', 'beat_id'),
    ('incident_district_id_ndx', 'incident', 'district_id'),
    ('incident_sector_id_ndx', 'incident', 'sector_id'),
    ('incident_premise_id_ndx', 'incident', 'premise_id'),
    ('incident_weapon_id_ndx', 'incident', 'weapon_id'),
    ('incident_emp_bureau_id_ndx', 'incident', 'emp_bureau_id'),
    ('incident_emp_division_id_ndx', 'incident', 'emp_division_id'),
    ('incident_emp_unit_id_ndx', 'incident', 'emp_unit_id'),
    ('incident_investigation_status_id_ndx', 'incident', 'investigation_status_id'),
    ('incident_investigator_unit_id_ndx', 'incident', 'investigator_unit_id'),
    ('incident_case_status_id_ndx', 'incident', 'case_status_id'),
    ('incident_ucr_descr_id_ndx', 'incident', 'ucr_descr_id'),
    ('incident_zip_code_id_ndx', 'incident', 'zip_code_id')
]

for ndx_name, ndx_table, ndx_column in ndx_jobs:
    create_sql = "CREATE INDEX {} ON {} ({});".format(ndx_name, ndx_table, ndx_column)
    db.query(create_sql)