In [25]:
import os
import time
from datetime import datetime, timedelta

import mysql.connector
import numpy as np
import pandas as pd
import requests
import schedule

In [26]:
# MariaDB/MySQL connection settings passed to mysql.connector.connect(**config).
# Each value can be overridden with an environment variable so real credentials
# do not have to be stored in the notebook; the literal fallbacks are the
# previous development values and keep existing setups working.
# NOTE(review): '0.0.0.0' is normally a bind-all server address rather than a
# client target -- confirm whether 'localhost' / '127.0.0.1' was intended.
config = {
  'user': os.environ.get('MYSQL_USER', 'newuser'),
  'password': os.environ.get('MYSQL_PASSWORD', 'newpassword'),
  'host': os.environ.get('MYSQL_HOST', '0.0.0.0'),
  'database': os.environ.get('MYSQL_DATABASE', 'test'),
  # Turn server warnings (e.g. truncated values) into exceptions.
  'raise_on_warnings': True
}

In [27]:
# Get data through the NYC Open Data (Socrata) API.
# The app token is read from a local file so it is not committed with the notebook.
with open("app_token.txt", encoding="utf-8") as token_file:
    app_token = token_file.readline().rstrip()

# HTTPS endpoints; the trailing '?' lets query parameters be appended directly.
url_311 = "https://data.cityofnewyork.us/resource/erm2-nwe9.json?"
url_dhs = "https://data.cityofnewyork.us/resource/k46n-sa2m.json?"
url_acc = "https://data.cityofnewyork.us/resource/h9gi-nx95.json?"
url_events = "https://data.cityofnewyork.us/resource/tvpp-9vvx.json?"

# Query window, defined once instead of being repeated inside every string.
# WINDOW_END is pinned for reproducibility; for a rolling "past week" use e.g.
#   WINDOW_END = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
# These strings are built when this cell runs, so a long-running scheduler keeps
# reusing the same window until the cell is executed again.
WINDOW_END = datetime(2019, 11, 9)
WINDOW_START = WINDOW_END - timedelta(days=7)
CUMULATIVE_SINCE = datetime(2019, 1, 1)  # lower bound for collisions and events
ROW_LIMIT = 50000  # Socrata $limit; no $offset paging is done, so further rows are not fetched
SOCRATA_TS = "%Y-%m-%dT%H:%M:%S"


def _socrata_ts(moment):
    """Format a datetime as the 'YYYY-MM-DDTHH:MM:SS' literal used in SoQL filters."""
    return moment.strftime(SOCRATA_TS)


def _socrata_filter(where, limit=ROW_LIMIT):
    """Return a '$where=<where>&$limit=<limit>' query fragment."""
    return "$where=" + where + "&$limit=" + str(limit)


def _with_token(base_url, query):
    """Append the $$app_token parameter and ``query`` to ``base_url``."""
    return base_url + "$$app_token=" + app_token + "&" + query


_window = "between '{}' and '{}'".format(_socrata_ts(WINDOW_START), _socrata_ts(WINDOW_END))

query_dates_311 = _socrata_filter("created_date " + _window)
query_311 = _with_token(url_311, query_dates_311)
query_dates_dhs = _socrata_filter("date_of_census " + _window)
query_dhs = _with_token(url_dhs, query_dates_dhs)
query_acc = _socrata_filter("accident_date > '{}'".format(_socrata_ts(CUMULATIVE_SINCE)))
query_url_acc = _with_token(url_acc, query_acc)
query_events = _socrata_filter("start_date_time > '{}'".format(_socrata_ts(CUMULATIVE_SINCE)))
query_url_events = _with_token(url_events, query_events)

In [28]:
# --- Field selections ---------------------------------------------------------
# API field names kept from each dataset. The extraction loops copy exactly these
# keys into each row dict and use None for any field a record does not contain.

keys_311 = [
    "unique_key",
    "created_date",
    "closed_date",
    "agency",
    "agency_name",
    "complaint_type",
    "descriptor",
    "incident_zip",
    "borough",
    "facility_type",
    "location_type",
    "status",
    "due_date",
    "resolution_action_updated_date",
    "x_coordinate_state_plane",
    "y_coordinate_state_plane",
    "open_data_channel_type",
    "latitude",
    "longitude",
]

keys_dhs = [
    "date_of_census",
    "total_adults_in_shelter",
    "total_children_in_shelter",
    "total_individuals_in_shelter",
    "single_adult_men_in_shelter",
    "single_adult_women_in_shelter",
    "total_single_adults_in_shelter",
    "families_with_children_in_shelter",
    "adults_in_families_with_children_in_shelter",
    "children_in_families_with_children_in_shelter",
    # Trailing underscore is part of the field name as written here -- keep as is.
    "total_individuals_in_families_with_children_in_shelter_",
    "adult_families_in_shelter",
    "individuals_in_adult_families_in_shelter",
]

keys_acc = [
    "collision_id",
    "accident_date",
    "accident_time",
    "borough",
    "zip_code",
    "latitude",
    "longitude",
    "number_of_persons_injured",
    "number_of_persons_killed",
    # NOTE(review): every other count uses the "number_of_" prefix; this name may
    # never match an API field and so always be None. Renaming it also requires
    # changing the matching %(...)s placeholder in the accidents INSERT.
    "number_pedestrians_injured",
    "number_of_pedestrians_killed",
    "number_of_cyclists_injured",
    "number_of_cyclists_killed",
    "number_of_motorists_injured",
    "number_of_motorists_killed",
    "contributing_factor_vehicle_1",
    "contributing_factor_vehicle_2",
]

keys_events = [
    "event_id",
    "event_name",
    "start_date_time",
    "end_date_time",
    "event_agency",
    "event_type",
    "event_borough",
    "event_location",
]

In [29]:
# Parameterized INSERT statements for the MariaDB tables, in mysql.connector's
# pyformat style: each %(name)s placeholder is filled from the dict passed to
# cursor.execute(). Columns and placeholders are generated from one tuple per
# table so the two lists cannot drift out of step.
# TODO: re-running the load appends duplicate rows; the tables need a replace or
# upsert strategy (e.g. a primary key plus INSERT ... ON DUPLICATE KEY UPDATE).


def build_insert_sql(table, columns):
    """Return an INSERT statement for ``table`` with one %(col)s placeholder per column.

    The table name is backtick-quoted defensively because ``311_calls`` starts
    with a digit.
    """
    column_list = ", ".join(columns)
    placeholders = ", ".join("%({})s".format(column) for column in columns)
    return "INSERT INTO `{}` ({}) VALUES ({})".format(table, column_list, placeholders)


calls_sql = build_insert_sql("311_calls", (
    "unique_key", "created_date", "closed_date", "agency", "agency_name", "complaint_type",
    "descriptor", "location_type", "incident_zip", "borough", "facility_type",
    "status", "due_date", "resolution_action_updated_date", "x_coordinate_state_plane",
    "y_coordinate_state_plane", "open_data_channel_type", "latitude", "longitude",
))

dhs_sql = build_insert_sql("dhs", (
    "date_of_census", "total_adults_in_shelter", "total_children_in_shelter",
    "total_individuals_in_shelter", "single_adult_men_in_shelter",
    "single_adult_women_in_shelter", "total_single_adults_in_shelter",
    "families_with_children_in_shelter", "adults_in_families_with_children_in_shelter",
    "children_in_families_with_children_in_shelter",
    "total_individuals_in_families_with_children_in_shelter_",
    "adult_families_in_shelter", "individuals_in_adult_families_in_shelter",
))

acc_sql = build_insert_sql("accidents", (
    "collision_id", "accident_date", "accident_time", "borough", "zip_code", "latitude", "longitude",
    "number_of_persons_injured", "number_of_persons_killed",
    "number_pedestrians_injured", "number_of_pedestrians_killed",
    "number_of_cyclists_injured", "number_of_cyclists_killed",
    "number_of_motorists_injured", "number_of_motorists_killed",
    "contributing_factor_vehicle_1", "contributing_factor_vehicle_2",
))

events_sql = build_insert_sql("events", (
    "event_id", "event_name", "start_date_time", "end_date_time",
    "event_agency", "event_type", "event_borough", "event_location",
))

In [30]:
def _fetch_records(url, timeout=60):
    """GET a Socrata endpoint and return the decoded JSON list of records.

    ``timeout`` (seconds) keeps a stalled request from blocking the scheduler
    loop indefinitely. HTTP error statuses raise ``requests.HTTPError``; a body
    that is not a JSON list raises ``ValueError`` instead of being iterated as rows.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    records = response.json()
    if not isinstance(records, list):
        # The URL embeds the app token, so it is deliberately left out of the message.
        raise ValueError(
            "expected a JSON list of records, got {}".format(type(records).__name__))
    return records


def _select_fields(records, id_key, keys):
    """Return one dict per record holding ``id_key`` followed by every field in ``keys``.

    Fields a record lacks are set to None. Raises KeyError if a record has no
    ``id_key``.
    """
    rows = []
    for record in records:
        row = {id_key: record[id_key]}
        # Dict lookup instead of scanning every record key for each field.
        row.update((key, record.get(key)) for key in keys)
        rows.append(row)
    return rows


def job():
    """Fetch the four NYC Open Data extracts and insert them into MariaDB.

    Reads the module-level query URLs (query_311, query_dhs, query_url_acc,
    query_url_events), field lists (keys_*), INSERT statements (*_sql) and
    connection settings (config). All inserts run in one transaction that is
    committed only if every row is written; otherwise it is rolled back and the
    exception propagates. The cursor and connection are always closed.
    """
    print("I'm working...")

    # Get the data selected by the module-level query strings.
    calls_311 = _fetch_records(query_311)
    dhs = _fetch_records(query_dhs)
    acc = _fetch_records(query_url_acc)
    events = _fetch_records(query_url_events)

    # Extract relevant fields; each row dict supplies the %(name)s parameters
    # of the matching INSERT statement.
    batches = [
        (calls_sql, _select_fields(calls_311, 'unique_key', keys_311)),
        (dhs_sql, _select_fields(dhs, 'date_of_census', keys_dhs)),
        (acc_sql, _select_fields(acc, 'collision_id', keys_acc)),
        (events_sql, _select_fields(events, 'event_id', keys_events)),
    ]

    cnx = mysql.connector.connect(**config)
    try:
        cursor = cnx.cursor()
        try:
            for sql, rows in batches:
                for row in rows:
                    cursor.execute(sql, row)
            cnx.commit()
        except Exception:
            # Discard the partial load (assuming transactional tables, e.g. InnoDB).
            cnx.rollback()
            raise
        finally:
            cursor.close()
    finally:
        cnx.close()

    print('done')

In [31]:
# Run one ingestion pass immediately instead of waiting for the scheduler.
job()

I'm working...
done


In [24]:
# Connectivity check: open and release a connection using `config`.
# The cell's value reports whether the connection was live; try/finally
# closes the cursor and connection even if a call raises.
cnx = mysql.connector.connect(**config)
try:
    cursor = cnx.cursor()
    cursor.close()
    connected = cnx.is_connected()
finally:
    cnx.close()
connected

In [None]:
# Run `job` once a day at INGEST_TIME (HH:MM, the kernel's local time).
INGEST_TIME = "12:17"

# The schedule module keeps one global scheduler, and every re-run of a
# scheduling cell adds another registration, so the job would fire several
# times per day. Clearing first leaves exactly one registered job.
schedule.clear()
schedule.every().day.at(INGEST_TIME).do(job)

# This loop blocks the kernel until interrupted (Kernel > Interrupt); cells
# below it will not run under "Run All" while it is active.
try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("scheduler stopped")

In [6]:
# Scratch: fetch the 311 and DHS extracts outside of `job` for inspection.
# The timeout stops a stalled request from hanging the kernel, and
# raise_for_status raises on HTTP error statuses instead of treating the error
# body as data.
response = requests.get(query_311, timeout=60)
response.raise_for_status()
calls_311 = response.json()

response = requests.get(query_dhs, timeout=60)
response.raise_for_status()
dhs = response.json()

In [None]:
# Project each raw 311 record onto keys_311; fields a record lacks become None.
# Membership is checked with a dict lookup rather than by scanning every key of
# the record once per field.
data_311 = []
for call in calls_311:
    row = {'unique_key': call['unique_key']}  # KeyError if a record has no unique_key
    row.update((key, call.get(key)) for key in keys_311)
    data_311.append(row)

In [31]:
# Project each raw DHS record onto keys_dhs; fields a record lacks become None
# (inserted as SQL NULL). A dict lookup replaces the per-field scan of all keys.
data_dhs = []
for record in dhs:
    row = {'date_of_census': record['date_of_census']}  # KeyError if the date is missing
    row.update((key, record.get(key)) for key in keys_dhs)
    data_dhs.append(row)

In [None]:
# Insert the scratch 311 rows using `calls_sql` from the INSERT statements cell.
# Commit only if every row succeeds; always release the cursor and connection.
cnx = mysql.connector.connect(**config)
try:
    cursor = cnx.cursor()
    try:
        for call in data_311:
            cursor.execute(calls_sql, call)
        cnx.commit()
    except Exception:
        cnx.rollback()
        raise
    finally:
        cursor.close()
finally:
    cnx.close()

In [33]:
# Insert the scratch DHS rows using `dhs_sql` from the INSERT statements cell.
# Commit only if every row succeeds; always release the cursor and connection.
cnx = mysql.connector.connect(**config)
try:
    cursor = cnx.cursor()
    try:
        for day in data_dhs:
            cursor.execute(dhs_sql, day)
        cnx.commit()
    except Exception:
        cnx.rollback()
        raise
    finally:
        cursor.close()
finally:
    cnx.close()

In [None]:
def heartbeat_job():
    """Print a marker line; used only to check that the scheduler fires.

    Named heartbeat_job rather than job so it does not shadow the ingestion
    job() used by the daily scheduler.
    """
    print("I'm working...")


# Every run of a scheduling cell adds a registration to the global scheduler;
# stacked registrations are the likely reason the job seemed to run more than
# once. Clearing first leaves only this job registered.
schedule.clear()
schedule.every(2).seconds.do(heartbeat_job)

# Blocks the kernel until interrupted (Kernel > Interrupt).
try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("scheduler stopped")