# NYC Parking Violations
This example demonstrates ETL operations for transforming New York City parking summons data to create maps. 

The original example can be found [here](https://github.com/JBlumstein/NYCParking/blob/master/NYC_Parking_Violations_Mapping_Example.ipynb).

### Notes on running these queries:

Bodo is used by default; it distributes data chunks across cores automatically.

The example uses the 2016 and 2017 datasets found [here](https://www.kaggle.com/new-york-city/nyc-parking-tickets) (~4GB).


To run the code:
1. Make sure you [add your AWS account credentials to Saturn Cloud](https://saturncloud.io/docs/examples/python/load-data/qs-load-data-s3/#create-aws-credentials) to access the data.
2. If you want to run a query in regular pandas:
    1. Comment out the lines with the Jupyter parallel magic (`%%px`) and the Bodo decorator (`@bodo.jit`) in all the code cells.
    2. Then, re-run cells from the beginning.

### Start an IPyParallel cluster
Run the following code in a cell to start an IPyParallel cluster. The cluster uses one engine per physical core, up to a maximum of 8.

In [None]:
import ipyparallel as ipp

import psutil

n = min(psutil.cpu_count(logical=False), 8)
rc = ipp.Cluster(engines="mpi", n=n).start_and_connect_sync(activate=True)

### Verifying your setup
Run the following code to verify that your IPyParallel cluster is set up correctly:

In [None]:
%%px
import bodo

print(f"Hello World from rank {bodo.get_rank()}. Total ranks={bodo.get_size()}")

## Importing the Packages

These are the main packages we are going to work with:
 - Bodo to parallelize Python code automatically
 - Pandas to work with data
 - Numpy to work with arrays

In [None]:
%%px
import warnings

warnings.filterwarnings("ignore")

import time

import bodo
import numpy as np
import pandas as pd

## Data Loading
In this section, parking ticket data for each year is loaded from an S3 bucket, aggregated by day, county, police precinct, and violation type, and placed in a dataframe.

The per-year dataframes are then concatenated into a single dataframe named `main_df`.

In addition, the violation codes and precinct information are loaded.

In [None]:
%%px


@bodo.jit(distributed=["many_year_df"], cache=True)
def load_parking_tickets():
    start = time.time()
    year_2016_df = pd.read_csv(
        "s3://bodo-examples-data/nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2016.csv",
        parse_dates=["Issue Date"],
    )
    year_2016_df = year_2016_df.groupby(
        ["Issue Date", "Violation County", "Violation Precinct", "Violation Code"],
        as_index=False,
    )["Summons Number"].count()

    year_2017_df = pd.read_csv(
        "s3://bodo-examples-data/nyc-parking-tickets/Parking_Violations_Issued_-_Fiscal_Year_2017.csv",
        parse_dates=["Issue Date"],
    )
    year_2017_df = year_2017_df.groupby(
        ["Issue Date", "Violation County", "Violation Precinct", "Violation Code"],
        as_index=False,
    )["Summons Number"].count()

    # concatenate all dataframes into one dataframe
    many_year_df = pd.concat([year_2016_df, year_2017_df])
    end = time.time()
    print("Reading Time: ", end - start)
    print(many_year_df.head())
    return many_year_df


main_df = load_parking_tickets()
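
The aggregation step above can be tried in plain pandas on a few rows. This is a minimal sketch, assuming made-up sample rows that reuse the dataset's column names; it is not the Bodo code path and the values are hypothetical.

```python
import pandas as pd

# Hypothetical sample rows; only the column names come from the dataset above.
tickets = pd.DataFrame(
    {
        "Summons Number": [1001, 1002, 1003, 1004],
        "Issue Date": pd.to_datetime(
            ["2016-07-01", "2016-07-01", "2016-07-01", "2016-07-02"]
        ),
        "Violation County": ["NY", "NY", "K", "NY"],
        "Violation Precinct": [14, 14, 70, 14],
        "Violation Code": [21, 21, 38, 21],
    }
)

keys = ["Issue Date", "Violation County", "Violation Precinct", "Violation Code"]

# After the count, "Summons Number" holds the number of tickets in each group,
# not an individual summons identifier.
daily_counts = tickets.groupby(keys, as_index=False)["Summons Number"].count()
print(daily_counts)
```

Note that from this point on the `Summons Number` column is a count per group, which is why later steps multiply it by a dollar amount.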

In [None]:
%%px
@bodo.jit
def load_violation_precincts_codes():
    start = time.time()
    violation_codes = pd.read_csv("./DOF_Parking_Violation_Codes.csv")
    violation_codes.columns = [
        "Violation Code",
        "Definition",
        "manhattan_96_and_below",
        "all_other_areas",
    ]
    nyc_precincts_df = pd.read_csv("./nyc_precincts.csv", index_col="index")
    end = time.time()
    print("Violation and precincts load Time: ", end - start)
    return violation_codes, nyc_precincts_df


violation_codes, nyc_precincts_df = load_violation_precincts_codes()

## Data Cleaning

1. Remove summonses with undefined violations (violation code 36).
2. Remove entries whose issue dates fall outside the date range of the dataset.

In [None]:
%%px
@bodo.jit(distributed=["main_df"], cache=True)
def elim_code_36(main_df):
    """function to take out all violations with code 36 (other)"""
    start = time.time()
    main_df = main_df[main_df["Violation Code"] != 36].sort_values(
        "Summons Number", ascending=False
    )
    end = time.time()
    print("Eliminate undefined violations time: ", end - start)
    print(main_df.head())
    return main_df


main_df = elim_code_36(main_df)

In [None]:
%%px
@bodo.jit(distributed=["main_df"], cache=True)
def remove_outliers(main_df):
    start = time.time()
    main_df = main_df[
        (main_df["Issue Date"] >= "2016-01-01")
        & (main_df["Issue Date"] <= "2017-12-31")
    ]
    end = time.time()
    print("Remove outliers time: ", (end - start))
    print(main_df.head())
    return main_df


main_df = remove_outliers(main_df)
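
The two cleaning filters can also be combined into a single boolean mask. The following is a small plain-pandas sketch on hypothetical rows, not the notebook's Bodo functions; the sample values are invented for illustration.

```python
import pandas as pd

# Hypothetical aggregated rows; the 2031 date stands in for a data-entry error.
counts = pd.DataFrame(
    {
        "Issue Date": pd.to_datetime(
            ["2016-03-15", "2017-06-01", "2031-01-01", "2016-08-20"]
        ),
        "Violation Code": [21, 36, 14, 38],
        "Summons Number": [120, 900, 1, 75],
    }
)

# Keep rows whose violation code is not 36 ...
not_code_36 = counts["Violation Code"] != 36
# ... and whose issue date lies inside the 2016-2017 window (inclusive).
in_range = counts["Issue Date"].between(
    pd.Timestamp("2016-01-01"), pd.Timestamp("2017-12-31")
)

cleaned = counts[not_code_36 & in_range]
print(cleaned)
```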

## Collect More Information
Data on each violation type, such as ticket cost and violation description, is added to the dataset by joining `main_df` with the violation-code lookup table.

In [None]:
%%px
@bodo.jit(distributed=["main_df"], cache=True)
def merge_violation_code(main_df):
    start = time.time()
    # left join main_df and violation_codes df so that there's more info on violation in main_df
    main_df = pd.merge(main_df, violation_codes, on="Violation Code", how="left")
    # cast precincts as integers from floats (inadvertent type change by merge)
    main_df["Violation Precinct"] = main_df["Violation Precinct"].astype(int)
    end = time.time()
    print("Merge time: ", (end - start))
    print(main_df.head())
    return main_df


main_df = merge_violation_code(main_df)
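
To see what the left join does to rows without a matching violation code, here is a minimal plain-pandas sketch. The definitions and dollar amounts below are hypothetical placeholders, not values from the real lookup file.

```python
import pandas as pd

# Hypothetical counts; code 99 has no entry in the lookup table below.
counts = pd.DataFrame(
    {"Violation Code": [21, 38, 99], "Summons Number": [120, 75, 3]}
)

# Hypothetical lookup table using the column names assigned in the notebook.
codes = pd.DataFrame(
    {
        "Violation Code": [21, 38],
        "Definition": ["example definition A", "example definition B"],
        "manhattan_96_and_below": [65, 65],
        "all_other_areas": [45, 35],
    }
)

# A left join keeps every row of `counts`; unmatched codes get NaN
# in the columns that come from `codes`.
merged = pd.merge(counts, codes, on="Violation Code", how="left")
print(merged)
print(merged.dtypes)
```

Because the unmatched row introduces NaN, the integer fine columns come back as floats, so any integer cast after a left join has to account for missing values.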

## Compute Cost of Summons for Each Precinct

1. Most violations have different ticket prices depending on whether they occur in Manhattan below 96th St. or elsewhere in New York City. The daily revenue for each violation type in each precinct is determined by multiplying the number of offenses by the average cost of the offense (based on how much of the precinct is in Manhattan below 96th St.).

In [None]:
%%px
# calculate the total summonses in dollars for a violation in a precinct on a day
@bodo.jit(distributed=["main_df"], cache=True)
def calculate_total_summons(main_df):
    start = time.time()
    # create column for portion of precinct 96th st. and below
    n = len(main_df)
    # float array so the fractional portions (0.75, 0.5) are not truncated to 0
    portion_manhattan_96_and_below = np.empty(n, np.float64)
    # NOTE: To run pandas, use this loop.
    # for i in range(n):
    for i in bodo.prange(n):
        x = main_df["Violation Precinct"].iat[i]
        if x < 22 or x == 23:
            portion_manhattan_96_and_below[i] = 1.0
        elif x == 22:
            portion_manhattan_96_and_below[i] = 0.75
        elif x == 24:
            portion_manhattan_96_and_below[i] = 0.5
        else:  # other
            portion_manhattan_96_and_below[i] = 0
    main_df["portion_manhattan_96_and_below"] = portion_manhattan_96_and_below

    # create column for average dollar amount of summons based on location
    main_df["average_summons_amount"] = (
        main_df["portion_manhattan_96_and_below"] * main_df["manhattan_96_and_below"]
        + (1 - main_df["portion_manhattan_96_and_below"]) * main_df["all_other_areas"]
    )

    # get total summons dollars by multiplying average dollar amount by number of summons given
    main_df["total_summons_dollars"] = (
        main_df["Summons Number"] * main_df["average_summons_amount"]
    )
    main_df = main_df.sort_values(by=["total_summons_dollars"], ascending=False)
    end = time.time()
    print("Calculate Total Summons Time: ", (end - start))
    print(main_df.head())
    return main_df


main_df = calculate_total_summons(main_df)
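
The weighted-average calculation can be checked by hand on a few rows. The sketch below is a vectorized plain-pandas version using `np.select` in place of the explicit loop; the fine amounts (65 and 45) and summons counts are hypothetical. The portions are stored as floats so that 0.75 and 0.5 are kept as fractions.

```python
import numpy as np
import pandas as pd

# Hypothetical rows: one precinct per branch of the portion rule.
df = pd.DataFrame(
    {
        "Violation Precinct": [14, 22, 24, 70],
        "Summons Number": [10, 4, 2, 5],
        "manhattan_96_and_below": [65.0, 65.0, 65.0, 65.0],
        "all_other_areas": [45.0, 45.0, 45.0, 45.0],
    }
)

precinct = df["Violation Precinct"]

# Share of each precinct that lies in Manhattan at or below 96th St.
portion = np.select(
    [(precinct < 22) | (precinct == 23), precinct == 22, precinct == 24],
    [1.0, 0.75, 0.5],
    default=0.0,
)

# Weighted average of the two fine amounts, then total dollars per row.
df["average_summons_amount"] = (
    portion * df["manhattan_96_and_below"]
    + (1 - portion) * df["all_other_areas"]
)
df["total_summons_dollars"] = df["Summons Number"] * df["average_summons_amount"]
print(df[["Violation Precinct", "average_summons_amount", "total_summons_dollars"]])
```

For precinct 22, for example, the average is 0.75 × 65 + 0.25 × 45 = 60, so 4 summonses contribute 240 dollars.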

2. The `aggregate` function aggregates `main_df` by precinct. After the data is run through this function, it has a single row per precinct containing the precinct number, the number of summonses, and the combined dollar value of the summonses.

In [None]:
%%px


@bodo.jit(distributed=["main_df", "precinct_offenses_df"], cache=True)
def aggregate(main_df):
    """function that aggregates and filters data
    e.g. total violations by precinct
    """
    start = time.time()
    filtered_dataset = main_df[
        ["Violation Precinct", "Summons Number", "total_summons_dollars"]
    ]
    precinct_offenses_df = (
        filtered_dataset.groupby(by=["Violation Precinct"])
        .sum()
        .reset_index()
        .fillna(0)
    )
    end = time.time()
    print("Aggregate code time: ", (end - start))
    print(precinct_offenses_df.head())
    return precinct_offenses_df


precinct_offenses_df = aggregate(main_df)
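
The shape of the per-precinct result can be seen in a small plain-pandas sketch. The input rows are hypothetical and only mirror the three columns selected by `aggregate`.

```python
import pandas as pd

# Hypothetical per-day rows for two precincts.
df = pd.DataFrame(
    {
        "Violation Precinct": [14, 14, 70],
        "Summons Number": [10, 5, 8],
        "total_summons_dollars": [650.0, 325.0, 360.0],
    }
)

# Sum both numeric columns within each precinct, then turn the group key
# back into an ordinary column and replace any missing totals with 0.
by_precinct = (
    df.groupby(by=["Violation Precinct"]).sum().reset_index().fillna(0)
)
print(by_precinct)
```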

In [None]:
# To stop the cluster run the following command.
rc.cluster.stop_cluster_sync()