In [1]:
from dask import do
from distributed import LocalCluster, Executor
from configparser import ConfigParser
import requests
import numpy as np

In [2]:
def get_current(location_str, config, timeout=10):
    '''Get latest temperature data from openweather
    params:
        location_str: string with city,country_code
        config: ConfigParser object with openweather section and api_key key
        timeout: seconds to wait for the API before giving up (default 10)
    returns:
        tuple: (location_str, parsed json response)
    raises:
        requests.exceptions.Timeout: if the API does not answer within timeout
    '''
    weather_key = config.get('openweather', 'api_key')
    # requests waits forever unless a timeout is given; a single stalled
    # connection would otherwise block the whole loop (or a worker) indefinitely.
    # NOTE(review): the api key travels over plain http here - confirm whether
    # the https endpoint should be used instead.
    resp = requests.get('http://api.openweathermap.org/data/2.5/weather',
                        params={'q': location_str,
                                'appid': weather_key,
                                'units': 'metric'},
                        timeout=timeout)
    return location_str, resp.json()
   

In [3]:
def get_forecast(location_str, config, timeout=10):
    '''Get forecast temperature data from openweather
    params:
        location_str: string with city,country_code
        config: ConfigParser object with openweather section and api_key key
        timeout: seconds to wait for the API before giving up (default 10)
    returns:
        tuple: (location_str, parsed json response)
    raises:
        requests.exceptions.Timeout: if the API does not answer within timeout
    '''
    weather_key = config.get('openweather', 'api_key')
    # requests waits forever unless a timeout is given; a single stalled
    # connection would otherwise block the whole loop (or a worker) indefinitely.
    # NOTE(review): the api key travels over plain http here - confirm whether
    # the https endpoint should be used instead.
    resp = requests.get('http://api.openweathermap.org/data/2.5/forecast',
                        params={'q': location_str,
                                'appid': weather_key,
                                'units': 'metric'},
                        timeout=timeout)
    return location_str, resp.json()

In [4]:
def filter_temp(location_str, weather_json):
    '''Reduce an openweather payload to the city label plus temperature and humidity.
    params:
        location_str: string with city,country_code that was searched for
        weather_json: parsed json returned from get_forecast or get_current
    returns:
        dict: search_city, api_city and either current_temp / current_humidity
              (payload has a 'main' key) or forecast_temps / forecast_humidity lists
    raises:
        ValueError: when the payload carries a 'cod' value other than 200
    '''
    if 'cod' in weather_json:
        if int(weather_json['cod']) != 200:
            raise ValueError('Bad Data Returned from API: {} - {}'.format(
                    location_str, weather_json))

    # Try the top-level 'name' / 'sys' layout first, then fall back to the
    # nested 'city' entry.
    try:
        place, country = weather_json['name'], weather_json['sys']['country']
    except KeyError:
        city_info = weather_json['city']
        place, country = city_info['name'], city_info['country']

    resp = {}
    resp['search_city'] = location_str
    resp['api_city'] = '{},{}'.format(place, country)

    if 'main' not in weather_json:
        # No top-level 'main': treat the payload as a list of forecast entries.
        entries = weather_json['list']
        resp['forecast_temps'] = [entry['main']['temp'] for entry in entries]
        resp['forecast_humidity'] = [entry['main']['humidity'] for entry in entries]
        return resp

    readings = weather_json['main']
    resp['current_temp'] = readings['temp']
    resp['current_humidity'] = readings['humidity']
    return resp

In [5]:
def merge_data(latest, forecast):
    ''' Combine the current and forecast dictionaries and add forecast averages
    params:
        latest: filtered dictionary built from get_current data
        forecast: filtered dictionary built from get_forecast data
    returns:
        dict: copy of latest updated with forecast, plus mean_temp and mean_hum
              (forecast means rounded to 2 decimals)
    '''
    merged = latest.copy()
    merged.update(forecast)
    # Each output key is the rounded mean of the matching forecast series.
    for out_key, series_key in (('mean_temp', 'forecast_temps'),
                                ('mean_hum', 'forecast_humidity')):
        merged[out_key] = np.round(np.mean(forecast[series_key]), 2)
    return merged

In [6]:
def main(city):
    ''' Build the final dataset for one city: current weather plus forecast
    params:
        city: string (ex: 'Berlin,DE')
    returns:
        dict: current and forecast temps and humidities for given city
    '''
    config = get_config()
    # Each fetcher returns (location_str, json), which is exactly the argument
    # pair filter_temp expects, so the tuples are unpacked straight into it.
    latest = filter_temp(*get_current(city, config))
    forecast = filter_temp(*get_forecast(city, config))
    return merge_data(latest, forecast)

In [7]:
def get_config(path='../config/prod.cfg'):
    ''' Load the ini-style config file
    params:
        path: location of the config file (default '../config/prod.cfg')
    returns:
        ConfigParser: parser populated from the file
    raises:
        OSError (e.g. FileNotFoundError): if the file cannot be opened
    '''
    config = ConfigParser()
    # ConfigParser.read() silently skips files it cannot open, which would
    # only surface later as a confusing NoSectionError. Opening the file
    # explicitly makes a missing config fail right here.
    with open(path) as cfg_file:
        config.read_file(cfg_file)
    return config

In [8]:
# Search strings in 'City,CODE' form, passed one by one to main().
city_list = [
    'London,UK',
    'Berlin,DE',
    'NewYork,NY',
    'LosAngeles,CA',
    'Madrid,ES',
    'Bangkok,TH',
    'Baghdad,IQ',
    'Auckland,NZ',
    'Istanbul,TR',
    'MexicoCity,MX',
    'Primavera,CL',
    'KualaLumpur,MY',
    'Shanghai,CN',
    'Chicago,IL',
    'Rome,IT',
    'Nairobi,KE',
    'MachuPicchu,PE',
    'Cardiff,UK',
    # Last entry is not found by the API (404 in the run output below) and
    # serves as the error example.
    'Somewhere,WL',
]

In [9]:
%%time
# Sequential baseline: process every city one after another.
res = []
for city in city_list:
    try:
        final = main(city)
    except Exception as e:
        # Report the failing city and carry on with the remaining ones.
        print(city, e)
    else:
        res.append(final)

# Print the collected results twice, highest value first for each sort key.
for label, field in (('sorted by current temp: ', 'current_temp'),
                     ('sorted by upcoming forecast temp: ', 'mean_temp')):
    print(label, sorted(res, key=lambda row: row.get(field), reverse=True))

requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionpool - INFO - Starting new HTTP connection (1): api.openweathermap.org
requests.packages.urllib3.connectionp

Somewhere,WL Bad Data Returned from API: Somewhere,WL - {'cod': '404', 'message': 'Error: Not found city'}
sorted by current temp:  [{'forecast_humidity': [83, 72, 80, 84, 88, 99, 100, 99, 98, 99, 97, 99, 98, 95, 84, 97, 91, 94, 96, 98, 94, 92, 85, 77, 76, 80, 92, 92, 92, 90, 85, 78, 72, 78, 87, 89, 91], 'mean_hum': 89.219999999999999, 'current_humidity': 52, 'search_city': 'Bangkok,TH', 'current_temp': 34.15, 'forecast_temps': [32.16, 30.5, 28.26, 26.88, 25.88, 24.15, 24.82, 24.72, 24.47, 23.85, 24.17, 24.2, 23.78, 24.78, 28.84, 26.69, 27.3, 24.73, 23.78, 23.42, 23.86, 25.28, 29.83, 30.26, 29.73, 27.92, 25.62, 24.9, 23.96, 24.98, 31.04, 32.53, 32.07, 28.26, 25.58, 24.34, 24.04], 'api_city': 'Bangkok,TH', 'mean_temp': 26.530000000000001}, {'forecast_humidity': [29, 25, 24, 32, 38, 45, 53, 37, 31, 25, 23, 31, 35, 41, 46, 37, 32, 25, 24, 49, 43, 43, 64, 35, 32, 27, 25, 52, 56, 59, 59, 31, 31, 26, 28, 50, 58], 'mean_hum': 37.859999999999999, 'current_humidity': 26, 'search_city': 'Baghdad

In [10]:
# Create a LocalCluster (from `distributed`) with default arguments; the
# Executor created further down is bound to it.
c = LocalCluster()

In [11]:
# Start the cluster's diagnostics server; the log line below reports the
# address of the Bokeh UI.
# NOTE(review): show=True presumably opens the page in a browser - confirm.
c.start_diagnostics_server(show=True) 

/home/katharine/.virtualenv/data_pipelines/lib/python3.4/site-packages/distributed/bokeh/application.py - INFO -  Bokeh UI at:  http://127.0.0.1:8787/status/


In [12]:
# Executor bound to the local cluster `c`; work is handed to it with
# e.submit(...) in the next cell.
e = Executor(c)

In [13]:
%%time

# Submit one task per city; submit() returns immediately with a Future.
futures = [e.submit(main, i) for i in city_list]
print(futures)

# Gather every result exactly once. Filtering on `status != 'error'` right
# after submit is racy: a task that will fail is still 'pending' at that
# moment, so result() would be called on it and raise, aborting the cell.
# Catching the exception per future is independent of timing.
results = []
for city, future in zip(city_list, futures):
    try:
        results.append(future.result())
    except Exception as exc:  # not `as e`: that would unbind the Executor `e`
        print(city, exc)

print('sorted by current temp',
      sorted(results, key=lambda x: x['current_temp'], reverse=True))
print('sorted by forecast temp',
      sorted(results, key=lambda x: x['mean_temp'], reverse=True))

[<Future: status: pending, key: main-189a8f61630da68066ad8287f070b8ea>, <Future: status: pending, key: main-b67bf67cf33a0da835476242b530363b>, <Future: status: pending, key: main-e1f4d25735bd44f90a7a79e9c48424b3>, <Future: status: pending, key: main-a40de7f26ee7767a3a7471a1cc3e0eb6>, <Future: status: pending, key: main-fda51f794c8530d33a9bc10064d37ee3>, <Future: status: pending, key: main-170951ae38f1a917c19414a18f01558a>, <Future: status: pending, key: main-66ad48082a9197924da6d23f455a68bf>, <Future: status: pending, key: main-312676950045989c85956638ec088c8e>, <Future: status: pending, key: main-9d76328810ca2a9f1085aa06e17e2e87>, <Future: status: pending, key: main-56bc9ef3426adb18d5f39cee3ed91d19>, <Future: status: pending, key: main-6e04c55654b10296d888be3994e36b56>, <Future: status: pending, key: main-5aa7985f3a2297205f9258b1c1d6c8b3>, <Future: status: pending, key: main-5b32e8bb7be05d8debb526d5a802e870>, <Future: status: pending, key: main-8bc16dcbd4465e83ffce811eb368ef68>, <Futu

In [14]:
# Take the future of the last submitted city ('Somewhere,WL' in city_list),
# which the API does not find, as the failure example.
example_error = futures[-1]

In [15]:
# Inspect the future's status string; it reads 'error' for the failed task.
example_error.status

'error'

In [16]:
# Deliberate demonstration: calling result() on the failed future raises the
# ValueError that filter_temp raised inside the task (see the output below).
example_error.result()

ValueError: Bad Data Returned from API: Somewhere,WL - {'cod': '404', 'message': 'Error: Not found city'}