# Socrata to Parquet Initial Data Pull

In [1]:
import pandas as pd
from sodapy import Socrata
from configparser import ConfigParser

## Configuration
Setting up domain of dataset and dataset identifier

In [2]:
# Setup configuration
config = ConfigParser()
config.read('./config.ini')
app_token = config['socrata']['APP_TOKEN']

# Create client to Socrata
client = Socrata(domain='data.cityofnewyork.us', app_token=app_token, timeout=60)

# NYC 311 Calls (2010-Present)
dataset = 'erm2-nwe9'

In [3]:
def get_query(dataset_identifier: str, query: str, return_df: bool=False) -> pd.DataFrame | list:
    '''
    Function to query Socrata 'data.cityofnewyork.us' domain.

    Parameters
    ----------
    dataset_identifier: name of the dataset
    query: SQL-like query string
    return_df: Whether to return DataFrame (True) or list of results (False)

    Returns
    -------
    DataFrame or list of results
    '''
    results = client.get(dataset_identifier=dataset_identifier, query=query)
    if return_df:
        return pd.DataFrame.from_records(results)
    else:
        return results

## Initial Data Pull
Retrieving the initial dataset, converting datatypes, and writing to Parquet file

In [4]:
query = (
    """
    SELECT
        unique_key,
        created_date,
        descriptor,
        incident_zip,
        community_board,
        latitude,
        longitude
    WHERE
        complaint_type = "Dead Animal"
        AND incident_zip IS NOT NULL
        AND community_board IS NOT NULL
        AND latitude IS NOT NULL
        AND longitude IS NOT NULL
    ORDER BY
        created_date DESC,
        unique_key DESC
    LIMIT
        20000
    """
)
df = get_query(dataset_identifier=dataset, query=query, return_df=True)

In [5]:
df.head(3)

Unnamed: 0,unique_key,created_date,descriptor,incident_zip,community_board,latitude,longitude
0,56531911,2023-01-15T23:12:46.000,Cat,11206,04 BROOKLYN,40.702889289111376,-73.93339315118288
1,56534747,2023-01-15T21:23:23.000,Bird,11102,01 QUEENS,40.77271127561967,-73.92181339007267
2,56535166,2023-01-15T21:19:15.000,Raccoon,11102,01 QUEENS,40.77171101962946,-73.92416850407255


### Convert Datetypes
Converting dates to datetime and latitude/longitude to numeric (float)

In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12107 entries, 0 to 12106
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   unique_key       12107 non-null  object
 1   created_date     12107 non-null  object
 2   descriptor       12107 non-null  object
 3   incident_zip     12107 non-null  object
 4   community_board  12107 non-null  object
 5   latitude         12107 non-null  object
 6   longitude        12107 non-null  object
dtypes: object(7)
memory usage: 662.2+ KB


In [7]:
def convert_datatypes(df: pd.DataFrame) -> pd.DataFrame:
    # Convert created date to datetime object
    df['created_date'] = pd.to_datetime(df['created_date'])

    # Convert latitude and longitude to double
    df['latitude'] = pd.to_numeric(df['latitude'])
    df['longitude'] = pd.to_numeric(df['longitude'])
    
    return df

In [8]:
df = convert_datatypes(df)

In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12107 entries, 0 to 12106
Data columns (total 7 columns):
 #   Column           Non-Null Count  Dtype         
---  ------           --------------  -----         
 0   unique_key       12107 non-null  object        
 1   created_date     12107 non-null  datetime64[ns]
 2   descriptor       12107 non-null  object        
 3   incident_zip     12107 non-null  object        
 4   community_board  12107 non-null  object        
 5   latitude         12107 non-null  float64       
 6   longitude        12107 non-null  float64       
dtypes: datetime64[ns](1), float64(2), object(4)
memory usage: 662.2+ KB


### Writing Parquet and JSON metadata file

In [10]:
# Easy!
df.to_parquet(dataset + '.parquet')

In [11]:
import json
from datetime import datetime

In [12]:
def write_metadata(current_series: pd.Series, dataset_identifier: str) -> None:
    temp = {
        'last_date': current_series['created_date'].isoformat(),
        'last_pull': datetime.now().isoformat()
    }
    date_obj = json.dumps(temp, indent=4)
    filename = dataset_identifier + '.metadata.json'
    with open(filename, 'w') as outfile:
        outfile.write(date_obj)

In [13]:
write_metadata(df.iloc[0], dataset)

## Subsequent Data Pulls
After the initial data pull, metadata will determine if new 311 calls for dead animals have been made before rewriting a new Parquet file

In [14]:
def read_metadata(dataset_identifier: str) -> str:
    filename = dataset_identifier + '.metadata.json'
    with open(filename, 'r') as file:
        metadict = json.load(file)
    
    return metadict['last_date']

In [15]:
md_date = read_metadata(dataset)

In [16]:
md_date

'2023-01-15T23:12:46'

In [17]:
last_complaint_query = (
    """
    SELECT created_date
    WHERE
        complaint_type = "Dead Animal"
        AND incident_zip IS NOT NULL
        AND community_board IS NOT NULL
        AND latitude IS NOT NULL
        AND longitude IS NOT NULL
    ORDER BY
        created_date DESC,
        unique_key DESC
    LIMIT 1
    """
)
lc_date = get_query(dataset, last_complaint_query)[0]['created_date']

In [18]:
lc_date

'2023-01-15T23:12:46.000'

In [19]:
def should_pull(metadata_date: str, last_complaint_date: str) -> bool:
    md = datetime.fromisoformat(metadata_date)
    ld = datetime.fromisoformat(last_complaint_date)

    return True if md != ld else False

In [20]:
should_pull(md_date, lc_date)

False

In [21]:
# Possible SQL injection but not front facing
latest_query = (
    """
    SELECT
        unique_key,
        created_date,
        descriptor,
        incident_zip,
        community_board,
        latitude,
        longitude
    WHERE
        created_date > '{}'
        AND complaint_type = "Dead Animal"
        AND incident_zip IS NOT NULL
        AND community_board IS NOT NULL
        AND latitude IS NOT NULL
        AND longitude IS NOT NULL
    ORDER BY
        created_date DESC,
        unique_key DESC
    LIMIT
        20000
    """.format(md_date)
)
latest = get_query(dataset, latest_query, True)

In [22]:
get_query(dataset, latest_query)    # Should be empty list since notebook run on same day

[]

## Check PySpark

In [23]:
from pyspark.sql import SparkSession

In [24]:
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/17 13:57:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [25]:
new_df = spark.read.parquet(dataset + '.parquet')

In [26]:
new_df.printSchema()
new_df.show(n=5, truncate=10)

root
 |-- unique_key: string (nullable = true)
 |-- created_date: timestamp (nullable = true)
 |-- descriptor: string (nullable = true)
 |-- incident_zip: string (nullable = true)
 |-- community_board: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

+----------+------------+----------+------------+---------------+----------+----------+
|unique_key|created_date|descriptor|incident_zip|community_board|  latitude| longitude|
+----------+------------+----------+------------+---------------+----------+----------+
|  56531911|  2023-01...|       Cat|       11206|     04 BROO...|40.7028...|-73.933...|
|  56534747|  2023-01...|      Bird|       11102|      01 QUEENS|40.7727...|-73.921...|
|  56535166|  2023-01...|   Raccoon|       11102|      01 QUEENS|40.7717...|-73.924...|
|  56533271|  2023-01...|   Opossum|       10305|     02 STAT...|40.6032...|-74.073...|
|  56532657|  2023-01...|       Cat|       11229|     15 BROO...|40.6017...