In [16]:
import pandas as pd
import time
from prefect import flow, task

In [17]:
#The dataset is very big and pandas has trouble assigning dtypes to the columns automatically because of bad data quality. We assign dtype=str and cast manually.
@task(retries=2, log_prints=True)
def load(rows):
    """Read the first `rows` records of the 311 service-request CSV.

    Only a fixed subset of the file's columns is read, and every column is
    requested with dtype=str so that later steps can cast values explicitly.

    Args:
        rows: Number of data rows to read from the top of the file
            (forwarded to `nrows`).

    Returns:
        A DataFrame containing the selected columns.
    """
    source_path = './311_Service_Requests_from_2010_to_Present.csv'
    wanted_columns = [
        "Unique Key",
        "Created Date",
        "Closed Date",
        "Agency",
        "Agency Name",
        "Complaint Type",
        "Descriptor",
        "Location Type",
        "Incident Zip",
        "Incident Address",
        "Borough",
        "Status",
        "Bridge Highway Direction",
        "Taxi Company Borough",
        "Vehicle Type",
        "School or Citywide Complaint",
        "Intersection Street 1",
        "Ferry Direction",
        "Ferry Terminal Name",
        "Bridge Highway Segment",
        "Taxi Pick Up Location",
        "Bridge Highway Name",
        "Garage Lot Name",
        "Intersection Street 2",
        "Road Ramp",
        "Landmark",
    ]
    # Note: the message is emitted before the file is actually read.
    print('Loaded sample of the total dataset, with some of the total columns only')
    return pd.read_csv(source_path, dtype=str, nrows=rows, usecols=wanted_columns)
    


In [18]:
# Remove columns that have too many nulls; e.g. max_nulls = 0.5 drops columns that are more than 50% null.
@task(retries=2, log_prints=True)
def remove_too_empty_cols(df, max_nulls):
    """Drop columns whose share of missing values exceeds `max_nulls`.

    Args:
        df: Input DataFrame; it is not modified.
        max_nulls: Maximum tolerated fraction of nulls per column, between
            0 and 1 (0.5 keeps every column that is at most 50% null).

    Returns:
        A new DataFrame holding only the columns that are full enough.
    """
    # `max_nulls` is a ceiling on nulls, so it cannot be handed straight to
    # dropna(thresh=...), which is a floor on NON-null values; the two only
    # coincide at 0.5. Comparing null counts against the budget is correct
    # for any fraction and keeps all columns of an empty frame.
    null_counts = df.isna().sum()
    # Positional (numpy) mask so duplicated column labels cannot misalign.
    keep = (null_counts <= max_nulls * len(df)).to_numpy()
    ds = df.loc[:, keep]
    difference = set(df.columns) - set(ds.columns)
    if len(difference) != 0:
        print(f'==>Removing too empty columns: removed {difference}')
    else:
        print('==>No too empty columns to remove')
    return ds
    


In [19]:
# Check that IDs are unique; remove duplicate entries if any exist.
@task(retries=2, log_prints=True)
def remove_duplicated_IDs(df):
    """Keep only the first row for each 'Unique Key' value.

    Args:
        df: Input DataFrame with a 'Unique Key' column; it is not modified.

    Returns:
        A new DataFrame without repeated 'Unique Key' values.
    """
    # duplicated() flags every repeat after the first occurrence, which is
    # exactly the set of rows drop_duplicates() discards. Comparing the key
    # sets before/after cannot work: the surviving first occurrence keeps
    # every key present, so that difference is always empty.
    repeated = df['Unique Key'].duplicated()
    ds = df.drop_duplicates(subset=['Unique Key'])
    if repeated.any():
        repeated_keys = set(df.loc[repeated, 'Unique Key'])
        print(f'==>Removing duplicated keys IDs: {repeated_keys}')
    else:
        print('==>No duplicated IDs to remove')
    return ds

        

In [20]:
@task(retries=2, log_prints=True)
def dates_to_datetime(df):
    """Parse 'Created Date' and 'Closed Date' into datetime columns.

    Values are parsed with the 12-hour layout "%m/%d/%Y %I:%M:%S %p";
    anything that does not parse becomes NaT (errors='coerce'). Both columns
    are overwritten on the DataFrame that was passed in.

    Args:
        df: DataFrame containing the two date columns.

    Returns:
        The same DataFrame object, with both columns converted.
    """
    timestamp_layout = "%m/%d/%Y %I:%M:%S %p"
    for column in ('Created Date', 'Closed Date'):
        df[column] = pd.to_datetime(df[column], errors='coerce', format=timestamp_layout)
    print('==>Casting dates as datetime format')
    return df
    


In [21]:
@task(retries=2, log_prints=True)
def remove_invalid_zip_codes(df):
    """Keep only rows whose 'Incident Zip' looks like a valid 5-digit ZIP.

    A ZIP is accepted when, after trimming surrounding whitespace, it is
    exactly five ASCII digits and its numeric value lies between 501 and
    99950 inclusive. Missing values are rejected.

    Args:
        df: DataFrame with a string 'Incident Zip' column; it is not modified.

    Returns:
        A new DataFrame with the accepted rows and the trimmed ZIP values.
    """
    zips = df['Incident Zip'].str.strip()
    # na=False gives a plain boolean mask: nulls and non-digit text are False.
    is_five_digits = zips.str.fullmatch(r'[0-9]{5}', na=False)
    # errors='coerce' maps unparseable text to NaN (which between() rejects)
    # instead of raising on the very values this step is meant to remove.
    in_range = pd.to_numeric(zips, errors='coerce').between(501, 99950)
    condition = is_five_digits & in_range
    # Computed outside the f-string: nesting the same quote type inside an
    # f-string is a syntax error before Python 3.12.
    rejected = zips[~condition].unique()
    if len(rejected) != 0:
        print(f'==>Step check zip codes: removed incident zip codes {rejected}')
    else:
        print('==>No removed Incident zip codes')
    # assign() returns a fresh frame, so the caller's data is left untouched.
    return df[condition].assign(**{'Incident Zip': zips[condition]})


In [22]:
@task(retries=2, log_prints=True)
def remove_invalid_status(df):
    """Normalise the Status column and drop rows with an unknown status.

    Status values are trimmed and lower-cased on the DataFrame passed in;
    only rows whose status is 'closed', 'open' or 'assigned' are returned.

    Args:
        df: DataFrame with a string 'Status' column.

    Returns:
        The subset of rows with a recognised status.
    """
    allowed = ['closed', 'open', 'assigned']
    df.Status = df.Status.str.strip().str.lower()
    is_valid = df.Status.isin(allowed)
    n_invalid = df[~is_valid].shape[0]
    if n_invalid == 0:
        print('==>No invalid status to remove')
    else:
        print(f'==>Step Status check: Removed {n_invalid}')
    return df[is_valid]

    
    

In [23]:
@task(retries=2, log_prints=True)
def polish_strings(df):
    """Trim and lower-case the free-text columns, leaving missing values missing.

    Columns from the fixed list that are not present in `df` are skipped
    (an earlier step may have dropped them for being too empty).

    Args:
        df: DataFrame to standardise; the affected columns are overwritten
            on this object.

    Returns:
        The same DataFrame object with the text columns standardised.
    """
    text_columns = ['Borough', 'Agency', 'Agency Name', 'Complaint Type',
                    'Descriptor', 'Location Type', 'Incident Address']
    for col in text_columns:
        if col not in df.columns:
            continue
        values = df[col]
        cleaned = values.astype(str).str.strip().str.lower()
        # astype(str) turns NaN into the literal text 'nan'; restore the
        # original missing markers so nulls stay detectable downstream.
        df[col] = cleaned.where(values.notna(), values)
    print('==>All string columns have been standarized')
    return df
        

In [24]:
@task(retries=2, log_prints=True)
def final_validation(df):
    """Check the invariants the cleaned dataset must satisfy.

    Verifies that 'Unique Key' has no repeats, that both date columns hold
    datetimes, and that every status is 'closed', 'open' or 'assigned'.

    Args:
        df: The cleaned DataFrame.

    Returns:
        `df` unchanged, when every check passes.

    Raises:
        AssertionError: On the first failed check. Raised explicitly rather
            than via `assert` so the checks still run under `python -O`.
    """
    if df['Unique Key'].duplicated().any():
        raise AssertionError('Duplicated ID')
    # Accept any datetime64 dtype: comparing against 'datetime64[ns]' fails
    # for other resolutions (e.g. [us]) and for timezone-aware columns.
    if not pd.api.types.is_datetime64_any_dtype(df['Created Date']):
        raise AssertionError('Created Date is not a date')
    if not pd.api.types.is_datetime64_any_dtype(df['Closed Date']):
        raise AssertionError('Closed Date is not a date')
    if not df['Status'].isin(['closed', 'open', 'assigned']).all():
        raise AssertionError('Unrecognized status')
    print('==>All validations passed')
    return df

In [25]:
@flow()
def main():
    """Run the cleaning pipeline on a 20000-row sample and save the result.

    Loads the sample, drops columns that are too empty (threshold 0.5), then
    applies the remaining cleaning and validation steps in order. The elapsed
    wall-clock time and the final shape are printed, and the cleaned frame is
    written to '311_NYC_requests_clean.csv'.
    """
    started = time.time()
    cleaned = load(20000).pipe(remove_too_empty_cols, 0.5)
    # The remaining stages take only the frame, so they can be applied in a loop.
    for stage in (
        remove_duplicated_IDs,
        dates_to_datetime,
        remove_invalid_zip_codes,
        remove_invalid_status,
        polish_strings,
        final_validation,
    ):
        cleaned = cleaned.pipe(stage)
    elapsed = round(time.time() - started, 2)
    print(f'Your data is ready, processed in {elapsed} seconds, resulting in {cleaned.shape[0]} rows and {cleaned.shape[1]} columns')
    cleaned.to_csv('311_NYC_requests_clean.csv')

main()

Your data is ready, processed in 2.65 seconds, resulting in 19859 rows and 12 columns
