# Part 1 - Data Download

To make an API call in Python, we use the `requests` library. Before we start coding, we read the documentation to find out whether any configuration (such as authentication) must be set up first.

Below is a copy of the Motor Vehicle Collisions - Crashes (MVCC) [API Documentation](https://dev.socrata.com/foundry/data.cityofnewyork.us/h9gi-nx95), between the START and END headers.
#### START

### API Documentation 

#### Getting Started
All communication with the API is done through HTTPS, and errors are communicated through HTTP response codes. Available response types include JSON (including GeoJSON), XML, and CSV, which are selectable by the "extension" (.json, etc.) on the API endpoint or through content-negotiation with HTTP Accepts headers.

This documentation also includes inline, runnable examples. Click on any link that contains a gear symbol next to it to run that example live against the Motor Vehicle Collisions - Crashes API. If you just want to grab the API endpoint and go, you'll find it below.

#### Tokens
All requests should include an app token that identifies your application, and each application should have its own unique app token. A limited number of requests can be made without an app token, but they are subject to much lower throttling limits than requests that do include one. With an app token, your application is guaranteed access to its own pool of requests. If you don't have an app token yet, click the button to the right to sign up for one.

Once you have an app token, you can include it with your request either by using the `X-App-Token` HTTP header, or by passing it via the `$$app_token` parameter on your URL.

#### END

**The above tells us a few important points:**
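1. All API calls are made over HTTPS, and errors are communicated through HTTP response codes, which tell the client what happened. Codes in the 2xx range (e.g. 200 OK, 201 Created) indicate success; 4xx codes indicate a client-side problem (e.g. 401 Unauthorized or 403 Forbidden for missing or invalid credentials), and 5xx codes indicate a server-side error.

2. We can pass our app token either as the `{'X-App-Token': APP_TOKEN}` header on the GET request or as the `$$app_token` URL parameter. The documentation does not say where (or whether) the app *secret* should be sent, and it states that a limited number of requests can be made without a token, so the token is optional but strongly recommended to avoid the much lower throttling limits.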
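A minimal sketch of the two ways the documentation describes for sending the app token, plus a coarse reading of the response code. The helper names (`build_headers`, `build_url_with_token`, `describe_status`) are hypothetical; only the `X-App-Token` header and the `$$app_token` parameter come from the documentation above.

```python
from urllib.parse import urlencode

ENDPOINT = "https://data.cityofnewyork.us/resource/h9gi-nx95.json"


def build_headers(app_token):
    """Option 1: send the app token in the X-App-Token HTTP header (omitted if empty)."""
    return {"X-App-Token": app_token} if app_token else {}


def build_url_with_token(endpoint, app_token, params=None):
    """Option 2: send the app token as the $$app_token query parameter."""
    query = {"$$app_token": app_token, **(params or {})}
    # safe="$" keeps the "$" prefixes readable instead of encoding them as %24
    return f"{endpoint}?{urlencode(query, safe='$')}"


def describe_status(code):
    """Coarse interpretation of an HTTP status code."""
    if 200 <= code < 300:
        return "success"
    if 400 <= code < 500:
        return "client error"  # e.g. 401/403 for auth problems, 429 when rate-limited
    if 500 <= code < 600:
        return "server error"
    return "other"


print(build_headers("MY_TOKEN"))
print(build_url_with_token(ENDPOINT, "MY_TOKEN", {"$limit": 5}))
```

With `requests`, either form can be used, e.g. `requests.get(ENDPOINT, headers=build_headers(token))` or `requests.get(build_url_with_token(ENDPOINT, token))`, followed by `describe_status(response.status_code)`.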

### Things to Know / Plans

We have multiple ways to query the underlying data: a basic cURL request, the NYC OpenData [Socrata API](https://dev.socrata.com/docs/queries/), and/or a SQL query against the Google [bigquery-public-data project](https://cloud.google.com/bigquery/public-data).

1. The Socrata API documentation tells us that we need to use [Paging Through Data](https://dev.socrata.com/docs/paging.html) to pull all of the ~1.8 million records from the table, because the API returns only 1,000 records per request by default. Paging lets us set an `$offset`, which tells the API where in the result set to start, together with a `$limit` on the page size. The query must also be ordered on a unique column (here `collision_id`, via `$order`) so that page boundaries stay stable as we page through the dataset; otherwise records can be skipped or duplicated between pages.
2. The dataset has ~1.83 million rows.
3. There are several noticeable data quality issues, which we will discuss later.
4. After we have built the simple sequential client, we will parallelize the I/O by increasing the number of worker threads, so the data downloads faster.
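To make the paging idea concrete without calling the API, here is a minimal sketch against a hypothetical in-memory `fake_fetch` that mimics `$order`, `$offset`, and `$limit`. It shows the loop pattern used later (advance the offset by the number of rows returned, stop on a short page), and it sorts on the unique key before slicing, which is what keeps page boundaries stable between requests.

```python
import random

# Hypothetical stand-in for the API: 2,345 records with a unique collision_id,
# stored in arbitrary (shuffled) order, as a table may be stored server-side.
RECORDS = [{"collision_id": i} for i in range(1, 2346)]
random.Random(0).shuffle(RECORDS)


def fake_fetch(offset, limit, order_by="collision_id"):
    """Mimic `?$order=...&$offset=...&$limit=...`: sort, then slice one page."""
    ordered = sorted(RECORDS, key=lambda r: r[order_by])
    return ordered[offset:offset + limit]


def fetch_all(limit=1000):
    pages, offset = [], 0
    while True:
        page = fake_fetch(offset, limit)
        pages.append(page)
        offset += len(page)
        if len(page) < limit:  # a short (or empty) page means we reached the end
            break
    return [row for page in pages for row in page]


rows = fetch_all()
ids = [r["collision_id"] for r in rows]
print(len(rows), len(set(ids)))  # 2345 2345: every record exactly once
```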


## Step 1: sys.path and Imports

We add the parent of the current working directory (the project root) to `sys.path` so that our personal `common` library can be imported from this notebook. Next, we import the libraries used to query the API.

In [31]:
import os
import sys
from pathlib import Path
CWD = str(Path.cwd().parent)
sys.path.append(CWD)
import requests
from requests.auth import HTTPBasicAuth
import json
import base64 
import traceback
from datetime import datetime, timedelta
import warnings

import pandas as pd
from sodapy import Socrata # Alternative 

from tqdm import tqdm
# personal common library
from common.utilities import decorators 
from secrets_manager import get_secret
from concurrent.futures import ThreadPoolExecutor

warnings.filterwarnings('ignore')
print(CWD)

/Users/jordancarson/Projects/JPM/data-engineering-nyc


We keep all secrets and application tokens in AWS Secrets Manager, which encrypts them with keys managed by AWS Key Management Service (KMS). The local `secret.json` file read below, which stores the name of the secret, will **not** be published to git. [Link to AWS KMS](https://us-east-1.console.aws.amazon.com/kms/home?region=us-east-1#/kms/keys/000d47a4-e093-4ffc-8b99-8f6f71381fcc)

In [44]:
with open(os.path.join(CWD, 'secret.json')) as fh:
    secret_config = json.load(fh)[0]  # local, unpublished config holding the secret's name
secret_name = os.getenv('AWS_SECRET_NAME_NYC_OPEN_DATA') or secret_config.get('AWS_SECRET_NAME_NYC_OPEN_DATA')
SECRET_DATA = json.loads(get_secret(secret_name))
assert isinstance(SECRET_DATA, dict), 'SECRET_DATA was not loaded properly!'
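`get_secret` comes from a personal `secrets_manager` module that is not shown here. A minimal sketch of what such a helper might look like, assuming `boto3` and a Secrets Manager secret whose `SecretString` is a JSON object containing the four `NYC_OPEN_DATA_*` keys read in the next cell. `get_secret` and `load_nyc_secrets` here are hypothetical stand-ins; the client is injectable so the parsing logic can be exercised without AWS.

```python
import json

# Keys the next cell reads from SECRET_DATA
REQUIRED_KEYS = (
    "NYC_OPEN_DATA_API_KEY",
    "NYC_OPEN_DATA_API_SECRET",
    "NYC_OPEN_DATA_APP_TOKEN",
    "NYC_OPEN_DATA_APP_SECRET",
)


def get_secret(secret_name, client=None):
    """Hypothetical stand-in for secrets_manager.get_secret: return the raw SecretString."""
    if client is None:
        import boto3  # imported lazily; needs AWS credentials and a region at call time
        client = boto3.client("secretsmanager")
    return client.get_secret_value(SecretId=secret_name)["SecretString"]


def load_nyc_secrets(secret_name, client=None):
    """Parse the secret JSON and fail early if an expected key is missing."""
    data = json.loads(get_secret(secret_name, client))
    missing = [key for key in REQUIRED_KEYS if key not in data]
    if missing:
        raise KeyError(f"Secret {secret_name!r} is missing keys: {missing}")
    return data
```

Failing with the list of missing keys is more informative than the `KeyError` on the first missing constant that the next cell would otherwise raise.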

In [45]:
# API Constants
API_LIMIT = 50000 # we want to pull 50,000 records at each iteration
NYC_OPEN_DATA_API_ENDPOINT = 'https://data.cityofnewyork.us/resource/h9gi-nx95.json'
NYC_OPEN_DATA_API_KEY = SECRET_DATA["NYC_OPEN_DATA_API_KEY"]
NYC_OPEN_DATA_API_SECRET = SECRET_DATA['NYC_OPEN_DATA_API_SECRET']
NYC_OPEN_DATA_APP_TOKEN = SECRET_DATA['NYC_OPEN_DATA_APP_TOKEN']
NYC_OPEN_DATA_APP_SECRET = SECRET_DATA['NYC_OPEN_DATA_APP_SECRET']

## Step 2: Query the Data

Next, we create a function that calls the API using the offset and limit parameters in our request URL. The function below downloads all of the data and returns a single pandas DataFrame. The problem with this implementation is that it takes **a lot of time** to query all the data.

We are using the [paging](https://dev.socrata.com/docs/paging.html) option, which lets us set the `$offset` and `$limit` parameters. To query all of the data, we build a SoQL query by appending the `$limit`, `$offset`, and `$order` parameters to the query string of the endpoint URL.

**Iteration 0**
```python
API_LIMIT   = 50000
OFFSET      = 0
ID          = 'collision_id'
ENDPOINT    = f'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit={API_LIMIT}&$offset={OFFSET}&$order={ID}'
```

**Iteration >0**
```python
API_LIMIT   = 50000
OFFSET     += len(pd.read_json(response.text)) # equals API_LIMIT for every page except the last, shorter one
ID          = 'collision_id'
ENDPOINT    = f'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit={API_LIMIT}&$offset={OFFSET}&$order={ID}'
```

After the first iteration, `OFFSET` is advanced by the number of records returned, which equals `API_LIMIT` for every full page. We keep incrementing the offset until a request returns fewer records than the limit, which means we have reached the last page. This logic is implemented in the function below. After each response arrives, we parse `response.text` into a temporary DataFrame and append it to a list; we then concatenate the list once, outside of the loop. Concatenating inside the loop would re-copy all previously accumulated rows on every iteration, so the total work grows quadratically with the number of pages, as explained [here](https://stackoverflow.com/questions/36489576/why-does-concatenation-of-dataframes-get-exponentially-slower).
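A back-of-the-envelope model of why the list-then-concat pattern matters. It assumes, as a simplification, that every `pd.concat` inside the loop re-copies all rows accumulated so far, while a single concat at the end copies each row once. `rows_copied` is a hypothetical helper, not a pandas function.

```python
import math


def rows_copied(total_rows, page_size, concat_inside_loop):
    """Rough count of row copies made while assembling `total_rows` from pages."""
    if not concat_inside_loop:
        return total_rows  # one concat at the end copies every row once
    pages = math.ceil(total_rows / page_size)
    copied = accumulated = 0
    for page in range(pages):
        accumulated += min(page_size, total_rows - page * page_size)
        copied += accumulated  # each in-loop concat re-copies everything gathered so far
    return copied


once = rows_copied(1_830_000, 50_000, concat_inside_loop=False)
repeated = rows_copied(1_830_000, 50_000, concat_inside_loop=True)
print(once, repeated, round(repeated / once, 1))  # 1830000 35130000 19.2
```

Under this model, ~1.83 million rows in 50,000-row pages means about 19 times as many row copies when concatenating inside the loop, and the ratio keeps growing as the page count increases.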

In [29]:
@decorators.timeit
def api_pagination_results(orient = 'records'):
    """
    Sequentially page through the MVCC endpoint with $limit/$offset,
    ordered by collision_id, and return all records as one DataFrame.
    """
    ID = 'collision_id'
    finished = False
    offset = 0
    out_frames = list()
    while not finished:
        ENDPOINT = f'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit={API_LIMIT}&$offset={offset}&$order={ID}'
        response = requests.get(ENDPOINT, auth=HTTPBasicAuth(NYC_OPEN_DATA_API_KEY, NYC_OPEN_DATA_API_SECRET))
        response.raise_for_status()  # fail loudly on 4xx/5xx instead of parsing an error body

        temp_df = pd.read_json(response.text, orient=orient)
        length = len(temp_df)
        out_frames.append(temp_df)
        offset += length

        # a short (or empty) page means we have reached the end of the table
        if length < API_LIMIT:
            finished = True

    # concatenate the list into one master dataframe, do not do this inside the loop!
    return pd.concat(out_frames, ignore_index=True)

In [30]:
data = api_pagination_results()

api_pagination_results => 1101231.782913208 ms


### Pagination Results

The sequential function `api_pagination_results()` took 1,101,231.78 ms to return all of the data, which is about 18.4 minutes:
```
print(1101231.782913208 / 1000 / 60)
>> 18.353863048553468
```
### Why does this take so long?
A call over the network is I/O bound: most of the time is spent waiting for the server, not computing. Performance bottlenecks are typically either I/O bound or CPU bound. CPython has a Global Interpreter Lock (GIL) that allows only one thread at a time to execute Python bytecode, but a thread releases the GIL while it waits on blocking I/O such as a socket read. Multi-threading therefore helps with I/O-bound tasks like ours, even though it does not speed up CPU-bound work. Using the `concurrent.futures` module, we can create a `ThreadPoolExecutor` to run many requests on a pool of worker threads.
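A small, self-contained illustration of the point above, using `time.sleep` as a stand-in for waiting on the network (like a blocking socket read, `sleep` releases the GIL). `fake_request` is hypothetical: eight 0.2-second "requests" take about 1.6 s one after another, but only about 0.2 s on eight threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(page):
    """Hypothetical I/O-bound task: wait 0.2 s, then return the page number."""
    time.sleep(0.2)  # the GIL is released while the thread waits
    return page


def run_sequential(pages):
    start = time.perf_counter()
    results = [fake_request(p) for p in pages]
    return results, time.perf_counter() - start


def run_threaded(pages, workers=8):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(fake_request, pages))  # results keep input order
    return results, time.perf_counter() - start


pages = list(range(8))
seq_results, seq_seconds = run_sequential(pages)
thr_results, thr_seconds = run_threaded(pages)
print(f"sequential: {seq_seconds:.2f}s, threaded: {thr_seconds:.2f}s")
```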

Another option is *asynchronous programming*. The standard-library `asyncio` module provides an event loop that interleaves many tasks on a single thread, switching between them whenever one is waiting on I/O, and the third-party `aiohttp` library provides an HTTP client built on top of it. This style is common in web servers and database drivers, and it is also useful for speeding up I/O-bound tasks.
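For comparison, a minimal `asyncio` sketch of the same idea using only the standard library: `asyncio.sleep` stands in for an HTTP call (with `aiohttp` it would be an awaited request on a client session), and an `asyncio.Semaphore` caps how many "requests" are in flight, much like `max_workers`. `fake_fetch_page` and `fetch_pages` are hypothetical.

```python
import asyncio
import time


async def fake_fetch_page(offset, semaphore):
    """Hypothetical async request: wait 0.2 s without blocking the event loop."""
    async with semaphore:
        await asyncio.sleep(0.2)
        return offset


async def fetch_pages(offsets, max_in_flight=10):
    semaphore = asyncio.Semaphore(max_in_flight)
    tasks = [fake_fetch_page(o, semaphore) for o in offsets]
    return await asyncio.gather(*tasks)  # results come back in the order of `offsets`


offsets = list(range(0, 500_000, 50_000))  # 10 pages of 50,000 rows
start = time.perf_counter()
results = asyncio.run(fetch_pages(offsets))
elapsed = time.perf_counter() - start
print(f"{len(results)} pages in {elapsed:.2f}s on a single thread")
```

Inside Jupyter, which already runs an event loop, `asyncio.run(...)` raises an error; there you would write `results = await fetch_pages(offsets)` directly in a cell instead.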

In the next example, we address the performance bottleneck with the `concurrent.futures` module, which requires a few new functions.

Let's see what this looks like.

In [59]:
def get_one(url):
    """
    Make one GET request to `url` and return the raw JSON body as text.
    Raises requests.HTTPError on a 4xx/5xx response.
    """
    response = requests.get(
        url,
        auth=HTTPBasicAuth(NYC_OPEN_DATA_API_KEY, NYC_OPEN_DATA_API_SECRET),
        timeout=60,  # per-request timeout in seconds
    )
    response.raise_for_status()
    return response.text

We now parallelize the requests using the `ThreadPoolExecutor` from the `concurrent.futures` module. The idea is to write a function that fetches one page from the endpoint, and then use the executor to map that function over a list of URLs. This means we need the full list of URLs ahead of time.

In [60]:
@decorators.timeit
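def get_all(urls, workers=15):
    # https://gist.github.com/rednafi/3334a9cce2d7f24226f6fe1231b5ac5f
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # executor.map returns results in the same order as `urls`.
        # Its `timeout` argument is a deadline for the whole batch (measured from the
        # map() call), not a per-request limit, so it is omitted here; a per-request
        # `timeout` on the requests call is the safer guard.
        results = list(
            tqdm(executor.map(get_one, urls), total=len(urls))
        )
    return results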

Next, we create a function that builds the full list of page URLs up front, using the offset and limit parameters of the API. However, this approach has a drawback, described in the *Issues with Parallelization for Pagination* section below.

In [61]:
def create_urls(order_by='collision_id', total_records=2_000_000, limit=50_000):
    # One URL per page: offsets 0, limit, 2*limit, ... below total_records.
    # total_records is a hard-coded upper bound (the table has ~1.83M rows),
    # because the API responses do not report the total record count.
    return [
        f'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit={limit}&$offset={offset}&$order={order_by}'
        for offset in range(0, total_records, limit)
    ]

Lastly, we create our `parallel_pagination_api` function. It builds the list of URLs with `create_urls()`, downloads every page with `get_all()` (which returns a list of response bodies in URL order), converts each response body to a DataFrame in a list comprehension, and concatenates all the DataFrames at the end.

In [65]:
def parallel_pagination_api():
    urls = create_urls()
    text_results = get_all(urls)
    dfs = [pd.read_json(response, orient='records') for response in text_results]
    return pd.concat(dfs, ignore_index=True)

df_parallel = parallel_pagination_api()

100%|██████████| 40/40 [00:58<00:00,  1.46s/it]


get_all => 58675.67992210388 ms


### Output

Increasing the number of threads speeds up our retrieval considerably. The sequential program takes ~18 minutes, while the parallel version takes ~1-3 minutes: `get_all()` downloads all 40 pages in about 1 minute, and converting the responses to pandas DataFrames takes roughly another 2 minutes. This is a big improvement; however, it requires us to hard-code a critically important number.
```
print(58675.67992210388 / 1000 / 60)
>> 0.9779279987017314
```
### Issues with Parallelization for Pagination

This solution **requires us** to hard-code the number of records, i.e. the upper bound of the range in `create_urls()`. The NYC OpenData responses do not tell us the next page, the next offset, or the total number of records in the MVCC table; many APIs return this information in `response.headers` or in a metadata envelope, but this one does not. **This is one downside of this approach.**
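One possible workaround, sketched under assumptions: SoQL supports aggregate functions such as `count(*)`, so a single request with `$select=count(*) AS total` should return the row count, from which the page URLs can be generated instead of hard-coding 2,000,000. The response shape (`[{"total": "..."}]`, with the number possibly serialized as a string) and the helper names `count_url`, `parse_count`, and `page_urls` are hypothetical and should be checked against a real response.

```python
import json
from urllib.parse import quote, urlencode

BASE = "https://data.cityofnewyork.us/resource/h9gi-nx95.json"


def count_url(base=BASE):
    """URL asking the API for the row count (assumes SoQL count(*) with an alias)."""
    query = urlencode({"$select": "count(*) AS total"}, safe="$()*", quote_via=quote)
    return f"{base}?{query}"


def parse_count(body):
    """Read the count from a body like '[{"total": "1830092"}]'."""
    return int(json.loads(body)[0]["total"])  # int() accepts "123" and 123 alike


def page_urls(total, limit=50_000, order_by="collision_id", base=BASE):
    """One URL per page, covering offsets 0 .. total-1."""
    return [
        f"{base}?$limit={limit}&$offset={offset}&$order={order_by}"
        for offset in range(0, total, limit)
    ]
```

Usage would be along the lines of `total = parse_count(requests.get(count_url(), auth=...).text)` followed by `urls = page_urls(total)`. Since the table keeps growing, rows added between the count request and the downloads could still be missed, so re-checking the final row count is worthwhile.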

#### Next Steps

- Given the issue above, it might be worth looking into asynchronous programming.
- Lastly, this data is also available in the BigQuery table `bigquery-public-data.new_york_mv_collisions.nypd_mv_collisions`. While researching this project, I noticed that the number of records increased from 1,829,000 to 1,830,092. When I immediately queried the table in GCP, it already reflected the update, so BigQuery could be a way to remove the bottleneck on the API side.

## Output

We are pushing the output to a parquet file, compressed using gzip, to be consumed by our next notebook.


In [75]:
try:
    df_parallel.to_parquet('output.parquet', compression='gzip')
except Exception as err:
    print(f'Error occurred writing parquet, using CSV. Error: {err}')
    df_parallel.to_csv('output.csv')

### BigQuery example

The query below counts the distinct records in the public dataset `bigquery-public-data.new_york_mv_collisions.nypd_mv_collisions`. Its columns are the same as those returned by the API, except that `collision_id` is called `unique_key` in this table.

In [None]:
from bigquery import get_client
from time import sleep
SQL = """
#standardSQL
SELECT COUNT(distinct unique_key) FROM `bigquery-public-data.new_york_mv_collisions.nypd_mv_collisions` a
"""