In [375]:
# Import libraries
import boto3
import botocore

from datetime import datetime
import json
import requests
from pandas import json_normalize
import pandas as pd 
import os
import zipfile

In [None]:
'''
--- OVERVIEW ---

Given the vast amount of data accessible through the Zomato API, the pipeline that I am creating
in this project will be configured on a per city basis. After all, from a user's perspective, you can 
only be in one city at a time.

- The pipeline can eventually be made more robust to analyze price, cuisine, ratings, and other fields 
across cities.



In Part I (Zomato - Processing Files), we processed the raw data that was originally uploaded to 
Kaggle here: https://www.kaggle.com/shrutimehta/zomato-restaurants-data#file1.json

- Many duplicates were found in the raw data. This is likely due to the same parameters being passed 
to the `/search` endpoint multiple times. 



Zomato maintains a 100-result limit for search results. Thus, a simple way to decrease duplication 
upstream of data processing is to add parameters with high cardinality: each distinct value (e.g., each
establishment type) gets its own set of up to 100 results.

- These parameters can be acquired through other Zomato endpoints, such as /categories, /collections, 
and /establishments, which are covered in the first part of this notebook.

- Once this data is collected, we hit the `/search` endpoint and save results for each respective param
(e.g., each establishment) to S3.



The pipeline will be structured as follows:
(1) Various API calls to collect metadata about the restaurants we want to query (This Notebook)
(2) API calls to `/search` endpoint that pass in desired metadata (This Notebook)
(3) Save raw results to S3 (This Notebook)
(4) Read results from S3, process logs (Part I)
(5) Store processed logs in RDS or another database (To Be Completed)



APIs that we can hit before using the /search API.
- One-off or infrequently required information can be stored in a database

- Regular API calls (/search) can pass in metadata that is read from DB tables

(1) /cities --> for id of city (once per city; store new results in database)
(2) /locations --> for entity_id (once per city/location; store new results in database)
(3) /categories --> for category_id (one-off; store in database)
(4) /collections --> for collections in a city (requires city_id; once; store in DB)
(5) /establishments --> for restaurant types (requires city_id; infrequently updated; store in DB)
(6) /cuisines --> for cuisine ids in a city (requires city_id; infrequently updated; store in DB)


NOTE: passing in conflicting `categories`, `establishments`, and `cuisines` fields can lead to a limited
or nonexistent result set (e.g., 'Cafe' category and 'BBQ' cuisine)

- For now, we are going to focus on the `establishments` param (the process for gathering metadata from other
endpoints is still completed below)

Once metadata is obtained, we pass various params into the /search API
- For this exercise, we are going to configure a pipeline for NYC-based restaurants
'''
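The fan-out described above (one paginated `/search` query per establishment type, so the 100-result cap applies per establishment rather than to the whole city) can be sketched as a pure function that only builds the parameter dicts. `build_search_params` is a hypothetical helper; the param names mirror those used in the `/search` loop later in this notebook, and `first_start=1` mirrors its `range(1, 100, 20)` start values.

```python
from itertools import product


def build_search_params(entity_id, entity_type, establishment_ids,
                        count=20, max_results=100, first_start=1):
    """Return one /search params dict per (establishment, page) pair.

    Hypothetical helper: establishments vary slowest, page offsets fastest.
    With count=20 and max_results=100 the offsets are 1, 21, 41, 61, 81.
    """
    starts = range(first_start, max_results, count)
    return [
        {
            'entity_id': entity_id,
            'entity_type': entity_type,
            'start': start,
            'count': count,
            'establishment_type': est_id,
        }
        for est_id, start in product(establishment_ids, starts)
    ]
```

Keeping the parameter construction separate from the HTTP call makes it easy to count requests against the daily quota before sending any of them.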

In [219]:
## Configuring a request for various Zomato endpoints
# Can build this out more if needed
def send_request(params=None, endpoint=None):
    """Send a GET request to a Zomato v2.1 endpoint and return the parsed JSON."""
    base_url = "https://developers.zomato.com/api/v2.1/"
    new_url = base_url + str(endpoint)
    
    headers = {
        'user-key': '<YOUR_API_KEY_HERE>',
        'Content-Type': 'application/json'
    }
    
    r = requests.get(url=new_url, headers=headers, params=params)
    # Raise an HTTPError on 4xx/5xx responses instead of parsing an error body
    r.raise_for_status()
    return r.json()
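Hard-coding the key in `send_request` makes it easy to commit by accident. A minimal sketch, assuming the key is exported in an environment variable (the name `ZOMATO_API_KEY` is hypothetical), could build the headers like this; `send_request` would then call `build_headers()` instead of defining `headers` inline.

```python
import os


def build_headers(env_var='ZOMATO_API_KEY'):
    """Build Zomato request headers with the key read from the environment.

    ZOMATO_API_KEY is a hypothetical variable name chosen for this sketch.
    """
    api_key = os.environ.get(env_var)
    if not api_key:
        # Fail early rather than sending a request that will be rejected
        raise RuntimeError(f'Set the {env_var} environment variable first')
    return {
        'user-key': api_key,
        'Content-Type': 'application/json',
    }
```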

In [273]:
## /cities endpoint
params = {
    'q': 'New York City'
}
r = send_request(params, endpoint='cities')
r.keys()

dict_keys(['location_suggestions', 'status', 'has_more', 'has_total', 'user_has_addresses'])

In [222]:
# viewing one item in results to conceptualize logic for metadata storage
location_suggestions = r['location_suggestions']
nyc = location_suggestions[0]
nyc

{'id': 280,
 'name': 'New York City, NY',
 'country_id': 216,
 'country_name': 'United States',
 'country_flag_url': 'https://b.zmtcdn.com/images/countries/flags/country_216.png',
 'should_experiment_with': 0,
 'has_go_out_tab': 0,
 'discovery_enabled': 1,
 'has_new_ad_format': 0,
 'is_state': 0,
 'state_id': 103,
 'state_name': 'New York State',
 'state_code': 'NY'}

In [239]:
'''
Creating a defaultdict for the cities metadata
- This can be stored in S3 and later normalized for storage in RDS

- We can have a smaller metadata table for city/city ID to reference when new cities are queried. That allows
us to use existing metadata instead of an API call when a city was already used.

- This assumes that data does not go stale, and it may not be worth the effort if the process is
not cost prohibitive.

- We are assuming that we do want to store metadata when collected to get into the habit of using databases
'''
# Import defaultdict
from collections import defaultdict


# Add cities to defaultdict
# Can add in logic to append city names if not found in an existing metadata store 
cities_metadata = defaultdict(list,{
        location['id']: {
            'name': location['name'],
            'country_id': location['country_id'],
            'country_name': location['country_name'],
            'state_id': location['state_id'],
            'state_name': location['state_name'],
            'state_code': location['state_code']
        } for location in location_suggestions
    }
)
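The reuse idea in the cell above (check a small city/city-ID table before calling `/cities`) can be sketched as a cache-or-fetch helper. `get_city_id`, `city_index`, and `fetch_city` are hypothetical names; in practice `city_index` would be the metadata table and `fetch_city` a wrapper around `send_request(params, endpoint='cities')` that returns the first location suggestion.

```python
def get_city_id(city_name, city_index, fetch_city):
    """Return a city ID, calling the API only when the city is not cached.

    city_index: hypothetical {query name: city id} store (stands in for a DB table).
    fetch_city: hypothetical callable returning one /cities location dict.
    """
    if city_name in city_index:
        return city_index[city_name]
    location = fetch_city(city_name)
    # Cache under the query string so the same query never hits the API twice
    city_index[city_name] = location['id']
    return location['id']
```

This follows the stated assumption that city IDs do not go stale; if they might, the cached entries would need a timestamp and an expiry check.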

In [241]:
# Get NYC id, name for future API calls
nyc_city_id = nyc['id']
nyc_city_name = nyc['name']

print(nyc_city_id)
print(nyc_city_name)

280
New York City, NY


In [276]:
## /locations endpoint
params = {
    'query': nyc_city_name
}

locations_resp = send_request(params, endpoint='locations')
locations_resp.keys()

dict_keys(['location_suggestions', 'status', 'has_more', 'has_total', 'user_has_addresses'])

In [243]:
# Limit results to suggested location
location_suggestion = locations_resp['location_suggestions'][0]

# Add location data to metadata store if city_id is found in metadata keys
if nyc_city_id in cities_metadata.keys():
    
    cities_metadata[nyc_city_id]['entity_id'] = location_suggestion['entity_id']
    cities_metadata[nyc_city_id]['entity_type'] = location_suggestion['entity_type']
    cities_metadata[nyc_city_id]['latitude'] = location_suggestion['latitude']
    cities_metadata[nyc_city_id]['longitude'] = location_suggestion['longitude']

In [244]:
# Get entity_type, entity_id for NYC
entity_type = location_suggestion['entity_type']
entity_id = location_suggestion['entity_id']

print(entity_type)
print(entity_id)

city
280


In [274]:
## /categories endpoint
params=None

# Send request
categories_resp = send_request(params, endpoint='categories')
categories_resp.keys()

dict_keys(['categories'])

In [250]:
# Limit results to `categories` key
categories = categories_resp['categories']
categories

[{'categories': {'id': 1, 'name': 'Delivery'}},
 {'categories': {'id': 2, 'name': 'Dine-out'}},
 {'categories': {'id': 3, 'name': 'Nightlife'}},
 {'categories': {'id': 4, 'name': 'Catching-up'}},
 {'categories': {'id': 5, 'name': 'Takeaway'}},
 {'categories': {'id': 6, 'name': 'Cafes'}},
 {'categories': {'id': 7, 'name': 'Daily Menus'}},
 {'categories': {'id': 8, 'name': 'Breakfast'}},
 {'categories': {'id': 9, 'name': 'Lunch'}},
 {'categories': {'id': 10, 'name': 'Dinner'}},
 {'categories': {'id': 11, 'name': 'Pubs & Bars'}},
 {'categories': {'id': 13, 'name': 'Pocket Friendly Delivery'}},
 {'categories': {'id': 14, 'name': 'Clubs & Lounges'}}]

In [252]:
# Store category metadata in defaultdict
category_metadata = defaultdict(list, {
    category['categories']['id']: category['categories']['name'] for category in categories
})
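The categories, establishments, and cuisines responses all wrap each item in a single key, so the dict comprehensions in this notebook repeat the same pattern. A hypothetical helper, `flatten_metadata`, could factor it out. It returns a plain `dict` rather than a `defaultdict(list)`, so looking up an unknown ID raises `KeyError` instead of silently returning an empty list.

```python
def flatten_metadata(items, wrapper_key, id_key='id', name_key='name'):
    """Turn Zomato's wrapped list format into a flat {id: name} dict.

    Example input item: {'categories': {'id': 1, 'name': 'Delivery'}}
    """
    return {
        item[wrapper_key][id_key]: item[wrapper_key][name_key]
        for item in items
    }
```

Usage for the other endpoints would be `flatten_metadata(establishments, 'establishment')` and `flatten_metadata(cuisines, 'cuisine', 'cuisine_id', 'cuisine_name')`.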

In [275]:
## /collections endpoint
params={
    'city_id': nyc_city_id
}

# Send request
collections_resp = send_request(params, endpoint='collections')
collections_resp.keys()

dict_keys(['collections', 'has_more', 'share_url', 'display_text', 'has_total', 'user_has_addresses'])

In [285]:
# Limit results to `collections` key
collections = collections_resp['collections']

# Inspect first result
collections[0]

{'collection': {'collection_id': 1,
  'res_count': 30,
  'image_url': 'https://b.zmtcdn.com/data/collections/b53772a204429cb9b42313d6dc22bf3c_1556018415.jpg',
  'url': 'https://www.zomato.com/new-york-city/top-restaurants?utm_source=api_basic_user&utm_medium=api&utm_campaign=v2.1',
  'title': 'Trending This Week',
  'description': 'Most popular restaurants in town this week',
  'share_url': 'http://www.zoma.to/c-280/1'}}

In [258]:
# Store collections metadata in defaultdict
collections_metadata = defaultdict(list, {
    collection['collection']['collection_id']: {
        'title': collection['collection']['title'],
        'description': collection['collection']['description'],
        'num_restaurants': collection['collection']['res_count']
    } for collection in collections
})

In [271]:
'''
Get collection ID for title `Trending This Week`
- Choosing this collection as it provides a frame of reference for how often this DAG can run (weekly). It also
suggests that the dataset will need to be refreshed regularly (more so than for other collection IDs)

- However, this could limit our results to 30 restaurants. If we want to include more results in our data, we can
extend the calls passed to /search to other cities

- This could also simply be a parameter that we do not want to include in our search if it is too restrictive 
on the result set
'''

# Hard coding - assumption is that it will not change
trending_collection_id = 1
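If the assumption behind the hard-coded ID ever breaks, a lookup by title against `collections_metadata` fails softly instead of silently passing the wrong ID. `find_collection_id` is a hypothetical helper sketched under that assumption.

```python
def find_collection_id(collections_metadata, title):
    """Return the collection_id whose title matches, or None if absent."""
    for collection_id, info in collections_metadata.items():
        if info['title'] == title:
            return collection_id
    return None
```

Usage: `trending_collection_id = find_collection_id(collections_metadata, 'Trending This Week')`.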

In [277]:
## /establishments endpoint

params={
    'city_id': nyc_city_id
}

# Send request
establishments_resp = send_request(params, endpoint='establishments')
establishments_resp.keys()

dict_keys(['establishments'])

In [278]:
# Narrow to establishments key; view content
establishments = establishments_resp['establishments']
establishments

[{'establishment': {'id': 21, 'name': 'Quick Bites'}},
 {'establishment': {'id': 271, 'name': 'Sandwich Shop'}},
 {'establishment': {'id': 1, 'name': 'Café'}},
 {'establishment': {'id': 281, 'name': 'Fast Food'}},
 {'establishment': {'id': 7, 'name': 'Bar'}},
 {'establishment': {'id': 16, 'name': 'Casual Dining'}},
 {'establishment': {'id': 24, 'name': 'Deli'}},
 {'establishment': {'id': 31, 'name': 'Bakery'}},
 {'establishment': {'id': 18, 'name': 'Fine Dining'}},
 {'establishment': {'id': 275, 'name': 'Pizzeria'}},
 {'establishment': {'id': 101, 'name': 'Diner'}},
 {'establishment': {'id': 5, 'name': 'Lounge'}},
 {'establishment': {'id': 278, 'name': 'Wine Bar'}},
 {'establishment': {'id': 6, 'name': 'Pub'}},
 {'establishment': {'id': 286, 'name': 'Coffee Shop'}},
 {'establishment': {'id': 23, 'name': 'Dessert Parlour'}},
 {'establishment': {'id': 8, 'name': 'Club'}},
 {'establishment': {'id': 91, 'name': 'Bistro'}},
 {'establishment': {'id': 285, 'name': 'Fast Casual'}},
 {'establis

In [280]:
# Store results in default dict
establishment_metadata = defaultdict(list, {
    establishment['establishment']['id']: establishment['establishment']['name'] \
    for establishment in establishments
})

In [287]:
# Store list of establishment_ids for /search call further downstream
establishment_ids = list(establishment_metadata.keys())

In [288]:
## /cuisines endpoint
params={
    'city_id': nyc_city_id
}

# Send request
cuisines_resp = send_request(params, endpoint='cuisines')
cuisines_resp.keys()

dict_keys(['cuisines'])

In [293]:
# Narrowing results to `cuisines` key
cuisines = cuisines_resp['cuisines']
'''
100+ cuisines are returned. Because we are limited to 1,000 API calls per day, we likely cannot pass 
individual cuisines into the API with each of the other parameters.

However, what we could do is configure the /search DAG to execute the max # of API calls for ~6.5 days 
(with each task hitting the /search endpoint for a unique set of parameters), and then have another DAG 
that runs once a week to process all raw data.

* Filtering by cuisines is also similar to filtering by establishment type, though likely more restrictive 
in nature - establishments can offer multiple cuisines, though some cuisines are perhaps unique to a single
establishment
'''
print('Total cuisines: ', len(cuisines))
cuisines[0]

Total cuisines:  141


{'cuisine': {'cuisine_id': 1035, 'cuisine_name': 'Afghan'}}
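The scheduling idea above (spread the `/search` calls over several days, then process everything weekly) reduces to splitting the full list of parameter sets into quota-sized chunks. A minimal sketch, assuming the 1,000-calls-per-day limit stated above and one request per parameter set; the helper names are hypothetical. For scale: crossing the 34 establishment IDs printed further below with all 141 cuisines at 5 pages each gives 34 × 141 × 5 = 23,970 requests, or 24 days at 1,000 per day.

```python
import math


def plan_daily_batches(param_sets, daily_limit=1000):
    """Split a list of /search param dicts into per-day batches.

    Each batch holds at most daily_limit requests (the assumed daily quota).
    """
    return [
        param_sets[i:i + daily_limit]
        for i in range(0, len(param_sets), daily_limit)
    ]


def days_needed(n_requests, daily_limit=1000):
    """Number of days required to send n_requests under the quota."""
    return math.ceil(n_requests / daily_limit)
```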

In [294]:
# Store results in defaultdict
cuisines_metadata = defaultdict(list, {
    cuisine['cuisine']['cuisine_id']: cuisine['cuisine']['cuisine_name'] \
    for cuisine in cuisines
})

In [296]:
'''
We could filter to a shortened cuisine list for now, though for the /search API call, we are going to
filter by establishment_type
'''
cuisine_list = [1, 25, 156, 55, 95]

In [305]:
'''
/search endpoint

- Pass IDs collected upstream into params
- Append results from each set together
- Store results in S3
- Loop through start values (result offsets), breaking when a page returns 0 restaurants (none remaining)

* Not using collection ID - limits to 30 restaurants
'''
# Define params here; inspect before configuring loop
start_values = list(range(1, 100, 20))
count = 20

print('Entity ID: ', entity_id)
print('Entity type: ', entity_type)
print('Start values: ', start_values)
print('Count: ', count)
print('Establishment IDs: ', establishment_ids)

Entity ID:  280
Entity type:  city
Start values:  [1, 21, 41, 61, 81]
Count:  20
Establishment IDs:  [21, 271, 1, 281, 7, 16, 24, 31, 18, 275, 101, 5, 278, 6, 286, 23, 8, 91, 285, 283, 284, 20, 282, 295, 292, 309, 272, 41, 291, 81, 294, 161, 290, 293]
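The stop condition described above can be isolated from the HTTP call so it is easy to test. A minimal sketch, assuming each page is a parsed `/search` response and that an exhausted result set comes back with a missing or empty `restaurants` list; `fetch_all_pages` and `fetch_page` are hypothetical names.

```python
def fetch_all_pages(fetch_page, start_values):
    """Call fetch_page(start) for each offset until a page has no restaurants.

    fetch_page: hypothetical callable returning a parsed /search JSON dict;
    in the notebook it would wrap send_request(params, endpoint='search').
    """
    pages = []
    for start in start_values:
        page = fetch_page(start)
        # A missing or empty 'restaurants' list means nothing is left to fetch
        if not page.get('restaurants'):
            break
        pages.append(page)
    return pages
```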


In [379]:
'''
Initialize boto3 session, S3 resource; Zomato bucket
- We are saving the raw results in S3
- Metadata can also be stored in S3 under /metadata folder
'''
session = boto3.session.Session(profile_name='acloudguru')
s3 = session.resource('s3')
zomato_bucket = s3.Bucket('zomato-search-api-results')

In [384]:
'''
Main API - /search endpoint

- Here we pass in the establishment_ids (and other parameters) that we want to pull data for.
- Once up to the 100-result maximum is extracted for a set of params, the results are saved to a file and uploaded to S3
- The local file is removed after upload

* Error catching should be added before this is pushed into production
'''
# Date prefix for the S3 keys; format assumed from existing keys such as '09-04-2020_...'
today = datetime.now().strftime('%m-%d-%Y')
# cities_metadata is keyed by city ID, not entity ID
city_name = cities_metadata[nyc_city_id]['name']

for establishment_id in establishment_ids:
    print('Initiating requests for ', establishment_metadata[establishment_id], ', ', city_name, '.')
    json_responses = []
    for start_value in start_values:
        params = {
            'entity_id': entity_id,
            'entity_type': entity_type,
            'start': start_value,
            'count': count,
            'establishment_type': establishment_id
        }
        
        json_r = send_request(params, endpoint='search')
        # Stop paginating once a page has no restaurants left
        if not json_r.get('restaurants'):
            break
        json_responses.append(json_r)
    
    print('Data extracted for ', establishment_metadata[establishment_id], ', ', city_name, '.')
    # Build the filename: <date>_<city>_<establishment type>.json
    filename = today + '_' + city_name.split(',')[0] + \
        '_' + establishment_metadata[establishment_id] + '.json'
    
    # Save results to JSON file
    with open(filename, 'w') as outfile:
        json.dump(json_responses, outfile)
    
    # Upload the file's contents to S3 (put_object(Body=filename) would upload only the filename string)
    zomato_bucket.upload_file(Filename=filename, Key=filename)
    
    # Remove file from local storage
    os.remove(filename)

Initiating requests for  Quick Bites ,  New York City, NY .
Data extracted for  Quick Bites ,  New York City, NY .
Initiating requests for  Sandwich Shop ,  New York City, NY .
Data extracted for  Sandwich Shop ,  New York City, NY .
Initiating requests for  Café ,  New York City, NY .
Data extracted for  Café ,  New York City, NY .
Initiating requests for  Fast Food ,  New York City, NY .
Data extracted for  Fast Food ,  New York City, NY .
Initiating requests for  Bar ,  New York City, NY .
Data extracted for  Bar ,  New York City, NY .
Initiating requests for  Casual Dining ,  New York City, NY .
Data extracted for  Casual Dining ,  New York City, NY .
Initiating requests for  Deli ,  New York City, NY .
Data extracted for  Deli ,  New York City, NY .
Initiating requests for  Bakery ,  New York City, NY .
Data extracted for  Bakery ,  New York City, NY .
Initiating requests for  Fine Dining ,  New York City, NY .
Data extracted for  Fine Dining ,  New York City, NY .
Initiating requ
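Writing each file to disk only to upload and delete it can be skipped, because boto3's `Bucket.put_object` accepts bytes as `Body`. A sketch of building the key and body in memory; `make_s3_object` is a hypothetical helper, and the key pattern mirrors the object keys listed in the next cell.

```python
import json


def make_s3_object(json_responses, date_str, city_name, establishment_name):
    """Build the (key, body) pair for one establishment's raw results.

    The key follows the pattern seen in the bucket listing, e.g.
    '09-04-2020_New York City_Bakery.json'.
    """
    city = city_name.split(',')[0]  # 'New York City, NY' -> 'New York City'
    key = f'{date_str}_{city}_{establishment_name}.json'
    body = json.dumps(json_responses).encode('utf-8')
    return key, body
```

In the loop, the write/upload/remove steps would collapse to `key, body = make_s3_object(json_responses, today, cities_metadata[nyc_city_id]['name'], establishment_metadata[establishment_id])` followed by `zomato_bucket.put_object(Key=key, Body=body)`.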

In [388]:
# Confirm that all objects were uploaded to s3
for obj in zomato_bucket.objects.all():
    print(obj.key)

09-04-2020_New York City_Bakery.json
09-04-2020_New York City_Bar.json
09-04-2020_New York City_Beer Garden.json
09-04-2020_New York City_Beverage Shop.json
09-04-2020_New York City_Bistro.json
09-04-2020_New York City_Brewery.json
09-04-2020_New York City_Café.json
09-04-2020_New York City_Casual Dining.json
09-04-2020_New York City_Club.json
09-04-2020_New York City_Cocktail Bar.json
09-04-2020_New York City_Coffee Shop.json
09-04-2020_New York City_Deli.json
09-04-2020_New York City_Dessert Parlour.json
09-04-2020_New York City_Diner.json
09-04-2020_New York City_Fast Casual.json
09-04-2020_New York City_Fast Food.json
09-04-2020_New York City_Fine Dining.json
09-04-2020_New York City_Food Court.json
09-04-2020_New York City_Food Truck.json
09-04-2020_New York City_Izakaya.json
09-04-2020_New York City_Juice Bar.json
09-04-2020_New York City_Lounge.json
09-04-2020_New York City_Microbrewery.json
09-04-2020_New York City_Noodle Shop.json
09-04-2020_New York City_Pizzeria.json
09-04-2
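Printing every key works for a visual check, but with 34 establishment types it is easier to compare the listing against the keys the loop should have produced. A sketch with hypothetical helper names; `listed` would come from `[obj.key for obj in zomato_bucket.objects.all()]`, and the key pattern matches the listing above.

```python
def expected_keys(date_str, city_name, establishment_names):
    """Keys the /search loop should have uploaded for one run."""
    city = city_name.split(',')[0]  # drop the state suffix, as the loop does
    return [f'{date_str}_{city}_{name}.json' for name in establishment_names]


def missing_uploads(expected, listed):
    """Return expected keys that do not appear in the bucket listing, sorted."""
    return sorted(set(expected) - set(listed))
```

Usage: `missing_uploads(expected_keys(today, 'New York City, NY', establishment_metadata.values()), listed)` should return an empty list after a complete run.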