In [233]:
import pandas as pd
import re
import sqlite3

In [234]:
conn = sqlite3.Connection('data/database.db')
cur = conn.cursor()

In [205]:
cur.execute('drop table if exists complaint_summary')
cur.execute('drop table if exists violation_summary')
cur.execute('drop table if exists complaint_history')
cur.execute('drop table if exists violation_history')
cur.execute('drop table if exists rental_registrations')
cur.execute('drop table if exists owners')
cur.execute('drop table if exists parcels')

conn.commit()

In [206]:
def clean_string(string):
    if pd.isna(string):
        return string
    
    string = re.sub(r"[^A-Za-z0-9]+", " ", string)
    
    string = (string
        .upper()
        .replace('LLC', '')
        .replace('LTD', '')
    )

    string = re.sub(r"\s\s+", " ", string)

    return string.strip()
    

In [207]:
pd.set_option('display.max_columns', None)

In [208]:
# Aggregate owners
owners = pd.read_csv('data/combined_cleveland_parcels.csv', low_memory=False)
owners = owners[[
    'parcelpin', 'transfer_date', 'deeded_owner', 
    'grantor', 'grantee', 'par_addr_all'
]].copy()
owners['transfer_date'] = pd.to_datetime(owners['transfer_date']).apply(lambda x: x.date())

TO_CLEAN = [
    'deeded_owner', 'grantor', 'grantee', 'par_addr_all'
]

for col in TO_CLEAN:
    owners[col + '_clean'] = owners[col].apply(clean_string)

owners = owners[
    ~owners['deeded_owner_clean'].isna() &
    (owners['deeded_owner_clean'] != '')
]

owners = (owners
    .groupby('deeded_owner_clean')
    .agg({
        'deeded_owner': list,
        'parcelpin': list
    })
    .reset_index()
)

owners['parcelpin'] = owners.parcelpin.apply(lambda x: sorted(list(set(x))))
owners['deeded_owner'] = owners.deeded_owner.apply(lambda x: sorted(list(set(x))))
owners['deeded_owner_main_alias'] = owners.deeded_owner.apply(lambda x: x[0])
owners['parcels_owned'] = owners.parcelpin.apply(lambda x: len(x))

owners_crosswalk = owners.explode('parcelpin')[['parcelpin', 'deeded_owner_clean', 'deeded_owner_main_alias']].copy()

owners = owners[['deeded_owner_clean', 'deeded_owner_main_alias', 'parcels_owned']].copy()

In [209]:
# Attach complaints, violations, and rentals to parcels
complaint_history = pd.read_csv('data/complaint_history.csv', low_memory=False)
complaint_history['FILE_DATE'] = pd.to_datetime(complaint_history['FILE_DATE'], utc=True)
complaint_history['TASK_DATE'] = pd.to_datetime(complaint_history['TASK_DATE'], utc=True)
complaint_history[~complaint_history.PERMIT_ID.isna()]
complaint_history['PERMIT_ID'] = complaint_history['PERMIT_ID'].apply(lambda x: x.strip())

violation_history = pd.read_csv('data/violation_history.csv', low_memory=False)
violation_history['FILE_DATE'] = pd.to_datetime(violation_history['FILE_DATE'], utc=True)
violation_history['TASK_DATE'] = pd.to_datetime(violation_history['TASK_DATE'], utc=True)
violation_history['ISSUE_DATE'] = pd.to_datetime(violation_history['ISSUE_DATE'], utc=True)
violation_history[~violation_history.RECORD_ID.isna()]
violation_history['RECORD_ID'] = violation_history['RECORD_ID']

rentals = pd.read_csv('data/rental_registrations.csv', low_memory=False)
rentals['FileDate'] = pd.to_datetime(rentals['FileDate'], utc=True)
rentals['StatusDate'] = pd.to_datetime(rentals['StatusDate'], utc=True)

parcels = pd.read_csv('data/combined_cleveland_parcels.csv', low_memory=False)
parcels = parcels[['parcelpin', 'transfer_date', 'par_addr_all']].copy()
parcels['transfer_date'] = pd.to_datetime(parcels['transfer_date'], utc=True)

survey = pd.read_csv('data/survey_2022_parcels.csv', low_memory=False)
survey = survey[['Survey Parcel Number', 'Property Category', 'Property Use', 'Photo Link', 'Survey Grade Result']].copy()
survey = survey.rename(columns={'Survey Parcel Number': 'parcelpin'})

  violation_history['ISSUE_DATE'] = pd.to_datetime(violation_history['ISSUE_DATE'], utc=True)


In [210]:
parcel_rentals = parcels.merge(
    rentals, 
    left_on='parcelpin',
    right_on='DW_Parcel',
    how='inner'
)

total_rentals = parcel_rentals.groupby('parcelpin').size().reset_index().rename(columns={0: 'total_rentals'})

same_owner_parcel_rentals = parcel_rentals[parcel_rentals.transfer_date <= parcel_rentals.FileDate].copy()
same_owner_rentals = same_owner_parcel_rentals.groupby('parcelpin').size().reset_index().rename(columns={0: 'same_owner_rentals'})

In [211]:
violations = violation_history.groupby('RECORD_ID').agg({'FILE_DATE': list, 'DW_Parcel': list}).reset_index()
violations['FILE_DATE'] = violations['FILE_DATE'].apply(lambda x: x[0])
violations['DW_Parcel'] = violations['DW_Parcel'].apply(lambda x: list(set(x)))
violations = violations.explode('DW_Parcel')

In [212]:
parcel_violations = parcels.merge(
    violations, 
    left_on='parcelpin',
    right_on='DW_Parcel',
    how='inner'
)

total_violations = parcel_violations.groupby('parcelpin').size().reset_index().rename(columns={0: 'total_violations'})

same_owner_parcel_violations = parcel_violations[parcel_violations.transfer_date <= parcel_violations.FILE_DATE].copy()
same_owner_violations = same_owner_parcel_violations.groupby('parcelpin').size().reset_index().rename(columns={0: 'same_owner_violations'})

In [213]:
complaints = complaint_history.groupby('PERMIT_ID').agg({'FILE_DATE': list, 'DW_Parcel': list}).reset_index()
complaints['FILE_DATE'] = complaints['FILE_DATE'].apply(lambda x: x[0])
complaints['DW_Parcel'] = complaints['DW_Parcel'].apply(lambda x: list(set(x)))
complaints = complaints.explode('DW_Parcel')

In [214]:
parcel_complaints = parcels.merge(
    complaints, 
    left_on='parcelpin',
    right_on='DW_Parcel',
    how='inner'
)

total_complaints = parcel_complaints.groupby('parcelpin').size().reset_index().rename(columns={0: 'total_complaints'})

same_owner_parcel_complaints = parcel_complaints[parcel_complaints.transfer_date <= parcel_complaints.FILE_DATE].copy()
same_owner_complaints = same_owner_parcel_complaints.groupby('parcelpin').size().reset_index().rename(columns={0: 'same_owner_complaints'})

In [215]:
parcel_stats = (parcels
    .merge(total_rentals, on='parcelpin', how='left')
    .merge(total_complaints, on='parcelpin', how='left')                
    .merge(total_violations, on='parcelpin', how='left')

    .merge(same_owner_rentals, on='parcelpin', how='left')                
    .merge(same_owner_complaints, on='parcelpin', how='left')                
    .merge(same_owner_violations, on='parcelpin', how='left')

    .merge(survey, on='parcelpin', how='left')  

    .merge(owners_crosswalk, on='parcelpin', how='left')
)

COLS_TO_IMPUTE = [
    'total_rentals', 'total_complaints', 'total_violations', 
    'same_owner_rentals', 'same_owner_complaints', 'same_owner_violations'
]

for col in COLS_TO_IMPUTE:
    parcel_stats[col] = parcel_stats[col].fillna(0)

parcel_stats['par_addr_all_clean'] = parcel_stats.par_addr_all.apply(clean_string)

parcel_stats = parcel_stats.rename(columns={
    'Property Category': 'property_category', 
    'Property Use': 'property_use', 
    'Photo Link': 'photo_link', 
    'Survey Grade Result': 'survey_grade_result'
})

DATE_COLS = ['transfer_date']
for col in DATE_COLS:
    parcel_stats[col] = parcel_stats[col].dt.strftime("%Y-%m-%d %H:%M:%S")

print(len(parcel_stats))
parcel_stats = parcel_stats.drop_duplicates('parcelpin')
print(len(parcel_stats))

163157
163156


In [216]:

rentals = rentals[['b1_alt_ID', 'FileDate', 'StatusDate', 'Status', 'Units', 'ACCELA_CITIZEN_ACCESS_URL', 'DW_Parcel']].copy()
rentals = rentals.rename(columns={'b1_alt_ID': 'record_id', 'DW_Parcel': 'parcelpin'})
rentals = (rentals
    .merge(parcel_stats[[
        'parcelpin', 'par_addr_all', 'deeded_owner_clean',
        'deeded_owner_main_alias', 'par_addr_all_clean',
        'photo_link'
    ]], on='parcelpin', how='left')
    .rename(columns={
        'FileDate': 'file_date', 
        'StatusDate': 'status_date', 
        'Status': 'status', 
        'Units': 'units', 
        'ACCELA_CITIZEN_ACCESS_URL': 'aca_url'
    })
)

DATE_COLS = ['file_date', 'status_date']
for col in DATE_COLS:
    rentals[col] = rentals[col].dt.strftime("%Y-%m-%d %H:%M:%S")

In [217]:
violation_history = violation_history.rename(columns={
    'RECORD_ID': 'record_id',
    'FILE_DATE': 'file_date',
    'TASK_NAME': 'task_name',
    'TASK_STATUS': 'task_status',
    'TASK_DATE': 'task_date',
    'TYPE_OF_VIOLATION': 'type_of_violation',
    'OCCUPANCY_OR_USE': 'occupancy_or_use',
    'ISSUE_DATE': 'issue_date',
    'ACCELA_CITIZEN_ACCESS_URL': 'aca_url',
    'DW_Parcel': 'parcelpin'
})

violation_history = violation_history[[
    'record_id', 'file_date', 'task_name',
    'task_status', 'task_date', 'type_of_violation',
    'occupancy_or_use', 'issue_date', 'aca_url',
    'parcelpin'
]].copy()

violation_history = (violation_history
    .merge(parcel_stats[[
        'parcelpin', 'par_addr_all', 'deeded_owner_clean', 
        'deeded_owner_main_alias', 'par_addr_all_clean',
        'photo_link'
    ]], on='parcelpin', how='left')
)

DATE_COLS = ['file_date', 'task_date', 'issue_date']
for col in DATE_COLS:
    violation_history[col] = violation_history[col].dt.strftime("%Y-%m-%d %H:%M:%S")

In [218]:
complaint_history = complaint_history.rename(columns={
    'PERMIT_ID': 'record_id',
    'SOURCE': 'source',
    'FILE_DATE': 'file_date',
    'CURRENT_TASK': 'current_task',
    'CURRENT_TASK_STATUS': 'current_task_status',
    'TASK_DATE': 'task_date',
    'TYPE_OF_COMPLAINT': 'type_of_complaint',
    'ACCELA_CITIZEN_ACCESS_URL': 'aca_url',
    'DW_Parcel': 'parcelpin'
})

complaint_history = complaint_history[[
    'record_id',
    'source',
    'file_date',
    'current_task',
    'current_task_status',
    'task_date',
    'type_of_complaint',
    'aca_url',
    'parcelpin'
]].copy()

complaint_history = (complaint_history
    .merge(parcel_stats[[
        'parcelpin', 'par_addr_all', 'deeded_owner_clean', 
        'deeded_owner_main_alias', 'par_addr_all_clean',
        'photo_link'
    ]], on='parcelpin', how='left')
)

DATE_COLS = ['file_date', 'task_date']
for col in DATE_COLS:
    complaint_history[col] = complaint_history[col].dt.strftime("%Y-%m-%d %H:%M:%S")

In [219]:
cur.execute('drop table if exists owners')
conn.commit()

cur.execute("""
    create table if not exists owners (
        deeded_owner_clean text primary key,
        deeded_owner_main_alias text,
        parcels_owned text
    )
""")
conn.commit()

cur.executemany("""
    insert into owners (
        deeded_owner_clean,
        deeded_owner_main_alias,
        parcels_owned
    ) values (
        ?, ?, ? 
    )
""", [
    tuple(map(
        lambda val: val
        if not pd.isna(val) 
        else None
    , row))
    for row in owners.values
])
conn.commit()

In [220]:
cur.execute('drop table if exists parcels')
conn.commit()

cur.execute("""
    create table if not exists parcels (
        parcelpin text primary key, 
        transfer_date text, 
        par_addr_all text,
        total_rentals integer, 
        total_complaints integer, 
        total_violations integer, 
        same_owner_rentals integer, 
        same_owner_complaints integer, 
        same_owner_violations integer, 
        property_category text, 
        property_use text, 
        photo_link text, 
        survey_grade_result text,
        deeded_owner_clean text,
        deeded_owner_main_alias text,
        par_addr_all_clean text,
            
        foreign key (deeded_owner_clean)
            references owners (deeded_owner_clean)
    )
""")
conn.commit()

cur.executemany("""
    insert into parcels (
        parcelpin, 
        transfer_date, 
        par_addr_all,
        total_rentals, 
        total_complaints, 
        total_violations, 
        same_owner_rentals, 
        same_owner_complaints, 
        same_owner_violations, 
        property_category, 
        property_use, 
        photo_link, 
        survey_grade_result,
        deeded_owner_clean,
        deeded_owner_main_alias,
        par_addr_all_clean
    ) values (
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?
    )
""", [
    tuple(map(
        lambda val: val
        if not pd.isna(val) 
        else None
    , row))
    for row in parcel_stats.values
])
conn.commit()

In [222]:
cur.execute('drop table if exists rental_registrations')
conn.commit()

cur.execute("""
    create table if not exists rental_registrations (
        record_id text primary key, 
        file_date text, 
        status_date text, 
        status text, 
        units integer, 
        aca_url text, 
        parcelpin text, 
        par_addr_all text, 
        deeded_owner_clean text, 
        deeded_owner_main_alias text,
        par_addr_all_clean text,
        photo_link text,
            
        foreign key (parcelpin)
            references parcels(parcelpin)

        foreign key (deeded_owner_clean)
            references owners(deeded_owner_clean)
    )
""")
conn.commit()

cur.executemany("""
    insert into rental_registrations (
        record_id, 
        file_date, 
        status_date, 
        status, 
        units, 
        aca_url, 
        parcelpin, 
        par_addr_all, 
        deeded_owner_clean, 
        deeded_owner_main_alias,
        par_addr_all_clean,
        photo_link
    ) values (
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?
    )
""", [
    tuple(map(
        lambda val: val
        if not pd.isna(val) 
        else None
    , row))
    for row in rentals.values
])
conn.commit()

In [223]:
cur.execute('drop table if exists violation_history')
conn.commit()

cur.execute("""
    create table if not exists violation_history (
        id integer primary key,
        record_id text, 
        file_date text,
        task_name text,
        task_status text,
        task_date text,
        type_of_violation text,
        occupancy_or_use text,
        issue_date text,
        aca_url text,
        parcelpin text,
        par_addr_all text,
        deeded_owner_clean text,
        deeded_owner_main_alias text,
        par_addr_all_clean text,
        photo_link text,
            
        foreign key (parcelpin)
            references parcels(parcelpin)

        foreign key (deeded_owner_clean)
            references owners(deeded_owner_clean)
    )
""")
conn.commit()

cur.executemany("""
    insert into violation_history (
        record_id, 
        file_date,
        task_name,
        task_status,
        task_date,
        type_of_violation,
        occupancy_or_use,
        issue_date,
        aca_url,
        parcelpin,
        par_addr_all,
        deeded_owner_clean,
        deeded_owner_main_alias,
        par_addr_all_clean,
        photo_link
    ) values (
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?
    )
""", [
    tuple(map(
        lambda val: val
        if not pd.isna(val) 
        else None
    , row))
    for row in violation_history.values
])
conn.commit()

In [224]:
cur.execute('drop table if exists complaint_history')
conn.commit()

cur.execute("""
    create table if not exists complaint_history (
        id integer primary key,
        record_id text,
        source text,
        file_date text,
        current_task text,
        current_task_status text,
        task_date text,
        type_of_complaint text,
        aca_url text,
        parcelpin text,
        par_addr_all text,
        deeded_owner_clean text,
        deeded_owner_main_alias text,
        par_addr_all_clean text,
        photo_link text,
            
        foreign key (parcelpin)
            references parcels(parcelpin)

        foreign key (deeded_owner_clean)
            references owners(deeded_owner_clean)
    )
""")
conn.commit()

cur.executemany("""
    insert into complaint_history (
        record_id,
        source,
        file_date,
        current_task,
        current_task_status,
        task_date,
        type_of_complaint,
        aca_url,
        parcelpin,
        par_addr_all,
        deeded_owner_clean,
        deeded_owner_main_alias,
        par_addr_all_clean,
        photo_link
    ) values (
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?, ?,
        ?, ?, ?, ?
    )
""", [
    tuple(map(
        lambda val: val
        if not pd.isna(val) 
        else None
    , row))
    for row in complaint_history.values
])
conn.commit()

In [225]:
# Make summary tables.
cur.execute("""
    create table complaint_summary as 
        select
            record_id,
            source,
            file_date,
            current_task,
            current_task_status,
            task_date as last_task_date,
            type_of_complaint,
            aca_url,
            parcelpin,
            par_addr_all,
            deeded_owner_clean,
            deeded_owner_main_alias,
            par_addr_all_clean,
            photo_link
        from complaint_history ch
        join (
            select id, row_number() over (partition by record_id order by task_date asc) as row_num
            from complaint_history
            where task_date is not null
        ) as sub
            on ch.id == sub.id and sub.row_num == 1
""")
conn.commit()

cur.execute("""
    create table violation_summary as 
        select 
            record_id,
            file_date,
            task_name as last_task_name,
            task_status as last_task_status,
            task_date as last_task_date,
            type_of_violation,
            occupancy_or_use,
            issue_date,
            aca_url, 
            parcelpin, 
            par_addr_all, 
            deeded_owner_clean, 
            deeded_owner_main_alias, 
            par_addr_all_clean,
            photo_link
        from violation_history vh
        join (
            select id, row_number() over (partition by record_id order by task_date asc) as row_num
            from violation_history
            where task_date is not null
        ) as sub
            on vh.id == sub.id and sub.row_num == 1
""")
conn.commit()

In [235]:
TO_INDEX = {
    "parcels": [
        "par_addr_all_clean",
        "deeded_owner_clean",
        "deeded_owner_main_alias",
        "parcelpin",
        "transfer_date",
    ],
    "owners": [
        "deeded_owner_clean",
        "deeded_owner_main_alias",
        "parcels_owned",
    ],
    "violation_summary": [
        "record_id",
        "deeded_owner_clean",
        "deeded_owner_main_alias",
        "file_date",
        "parcelpin",
    ],
    "complaint_summary": [
        "record_id",
        "deeded_owner_clean",
        "deeded_owner_main_alias",
        "file_date",
        "parcelpin",
    ],
    "rental_registrations": [
        "record_id",
        "deeded_owner_clean",
        "deeded_owner_main_alias",
        "file_date",
        "parcelpin",
    ],
    "complaint_history": [
        "record_id",
        "task_date",
    ],
    "violation_history": [
        "record_id",
        "task_date",
    ],
}

def create_indexes(to_index, conn):
    cur = conn.cursor()

    for table, columns in to_index.items():
        for column in columns:
            idx_name = f"idx_{table}_{column}"
            sql = f"CREATE INDEX IF NOT EXISTS {idx_name} ON {table}({column});"
            cur.execute(sql)

    conn.commit()

create_indexes(TO_INDEX, conn)