In [None]:
from dask import do
from distributed import LocalCluster, Executor
from configparser import ConfigParser
import requests
import numpy as np

You must have a folder named `config` in either the parent directory or the current directory (alternatively, modify the `get_config` function to point at your config file). You will also need to [acquire an API Key for the OpenWeatherMap API](http://openweathermap.org/appid). The `prod.cfg` file in that `config` folder should have a section like this:

```
[openweather]
api_key=425b9b9e2416cjfr47329434jk2lX4u32
```
with your assigned key from OpenWeatherMap.

In [None]:
def get_current(location_str, config, timeout=10):
    '''Get latest temperature data from openweather
    params:
        location_str: string with city,country_code
        config: ConfigParser object with openweather section and api_key key
        timeout: seconds to wait for the API before giving up (default 10)
    returns:
        tuple: (location_str, parsed json response)
    raises:
        requests.exceptions.RequestException: on connection failure or timeout
    '''
    weather_key = config.get('openweather', 'api_key')
    # HTTPS keeps the API key (sent as a query parameter) out of cleartext, and
    # an explicit timeout stops a stalled connection from hanging forever.
    resp = requests.get('https://api.openweathermap.org/data/2.5/weather',
                        params={'q': location_str,
                                'appid': weather_key,
                                'units': 'metric'},
                        timeout=timeout)
    # The HTTP status is not checked here; the parsed body is returned as-is
    # for the caller to inspect.
    return location_str, resp.json()
   

In [None]:
def get_forecast(location_str, config, timeout=10):
    '''Get forecast temperature data from openweather
    params:
        location_str: string with city,country_code
        config: ConfigParser object with openweather section and api_key key
        timeout: seconds to wait for the API before giving up (default 10)
    returns:
        tuple: (location_str, parsed json response)
    raises:
        requests.exceptions.RequestException: on connection failure or timeout
    '''
    weather_key = config.get('openweather', 'api_key')
    # HTTPS keeps the API key (sent as a query parameter) out of cleartext, and
    # an explicit timeout stops a stalled connection from hanging forever.
    resp = requests.get('https://api.openweathermap.org/data/2.5/forecast',
                        params={'q': location_str,
                                'appid': weather_key,
                                'units': 'metric'},
                        timeout=timeout)
    # The HTTP status is not checked here; the parsed body is returned as-is
    # for the caller to inspect.
    return location_str, resp.json()

In [None]:
def filter_temp(location_str, weather_json):
    '''Reduce a weather payload to city names plus temperature and humidity.
    params:
        location_str: string with city,country_code that was searched for
        weather_json: parsed json returned from get_forecast or get_current
    returns:
        dict: 'search_city' and 'api_city', plus either 'current_temp' and
              'current_humidity' (payload has a 'main' key) or the lists
              'forecast_temps' and 'forecast_humidity' (built from 'list')
    raises:
        ValueError: if the payload carries a 'cod' field that is not 200
    '''
    # A 'cod' value other than 200 marks a failed lookup; int() lets the
    # comparison work whether the field is a number or a numeric string.
    if 'cod' in weather_json:
        status = int(weather_json['cod'])
        if status != 200:
            raise ValueError('Bad Data Returned from API: {} - {}'.format(
                    location_str, weather_json))

    # Prefer the top-level 'name' with 'sys' -> 'country'; when either key is
    # missing, fall back to the nested 'city' object.
    try:
        city_name = weather_json['name']
        country = weather_json['sys']['country']
    except KeyError:
        city_name = weather_json['city']['name']
        country = weather_json['city']['country']

    filtered = {'search_city': location_str}
    filtered['api_city'] = '{},{}'.format(city_name, country)

    if 'main' not in weather_json:
        # Forecast shape: one reading per entry under 'list'.
        entries = weather_json['list']
        filtered['forecast_temps'] = [entry['main']['temp'] for entry in entries]
        filtered['forecast_humidity'] = [entry['main']['humidity'] for entry in entries]
        return filtered

    # Current-weather shape: a single reading under 'main'.
    readings = weather_json['main']
    filtered['current_temp'] = readings['temp']
    filtered['current_humidity'] = readings['humidity']
    return filtered

In [None]:
def merge_data(latest, forecast):
    ''' Combine the current and forecast dictionaries and add forecast averages
    params:
        latest: filtered dictionary built from the get_current response
        forecast: filtered dictionary built from the get_forecast response;
                  must contain 'forecast_temps' and 'forecast_humidity'
    returns:
        dict: copy of latest updated with forecast, plus 'mean_temp' and
              'mean_hum' (forecast means rounded to 2 decimals)
    '''
    # Work on a copy so the caller's `latest` dict is left untouched.
    merged = latest.copy()
    merged.update(forecast)
    averages = (('mean_temp', 'forecast_temps'),
                ('mean_hum', 'forecast_humidity'))
    for mean_key, series_key in averages:
        merged[mean_key] = np.round(np.mean(forecast[series_key]), 2)
    return merged

In [None]:
def main(city):
    ''' Build the combined current + forecast weather summary for one city
    params:
        city: string (ex: 'Berlin,DE')
    returns:
        dict: current and forecast temps and humidities for given city
    '''
    config = get_config()
    # Each fetcher returns a (location, json) pair, which is exactly the
    # positional argument list filter_temp expects.
    latest = filter_temp(*get_current(city, config))
    forecast = filter_temp(*get_forecast(city, config))
    return merge_data(latest, forecast)

In [1]:
def get_config(paths=('../config/prod.cfg', 'config/prod.cfg')):
    ''' Load the configuration used by the weather functions
    params:
        paths: iterable of candidate config file paths, read in order, so
               values from later files override earlier ones. Defaults to
               prod.cfg in the config folder of the parent directory and of
               the current directory.
    returns:
        ConfigParser: the parsed config. Paths that cannot be opened are
                      skipped without error, so the result is empty when
                      none of the candidates exist.
    '''
    config = ConfigParser()
    config.read(paths)
    return config

In [None]:
# Search strings passed to main(), one "City,code" entry per line.
# The final entry's future is the one the notebook later inspects as an example error.
city_list = [
    'London,UK',
    'Berlin,DE',
    'NewYork,NY',
    'LosAngeles,CA',
    'Madrid,ES',
    'Bangkok,TH',
    'Baghdad,IQ',
    'Auckland,NZ',
    'Istanbul,TR',
    'MexicoCity,MX',
    'Primavera,CL',
    'KualaLumpur,MY',
    'Shanghai,CN',
    'Chicago,IL',
    'Rome,IT',
    'Nairobi,KE',
    'MachuPicchu,PE',
    'Cardiff,UK',
    'Somewhere,WL',
]

In [None]:
%%time
res = []
for city in city_list:
    try:
        final = main(city)
        res.append(final)
    except Exception as e:
        print(city, e)

print('sorted by current temp: ', sorted(res, key=lambda x: x.get('current_temp'), reverse=True))
print('sorted by upcoming forecast temp: ', sorted(res, key=lambda x: x.get('mean_temp'), reverse=True))

Depending on your setup, `start_diagnostics_server` (which starts the web UI for analyzing your Dask scheduler and the work submitted via the Executor) may or may not work. If it doesn't work out of the box, you'll need to start the dask-scheduler a different way. The easiest is to run:

`/path/to/your/virtualenv/bin/dask-scheduler`

which will start the scheduler process in your terminal as well as the Bokeh server for the web UI. The output should have the links for both the web UI (usually [localhost:8787](http://127.0.0.1:8787)) as well as the local scheduler. 

In a new shell or screen session, run the worker nodes with however many workers you'd like (here I chose 8):

`/path/to/your/virtualenv/bin/dask-worker --nprocs 8 127.0.0.1:8786`

I recommend using [`screen`](https://www.gnu.org/software/screen/) so you can easily switch between shells and keep track of logs. Once it is installed, you can create a new named screen session with `screen -S scheduler`. Press Ctrl + a followed by d to detach and return to your main shell, and Ctrl + a followed by k to kill the screen session when you are done. To reattach to a running named session, use `screen -r scheduler`. Read through the docs for more info.

Then you can utilize the code directly below this cell instead of the `LocalCluster` code two cells below.

In [None]:
# Connect to a scheduler that was started separately.
# You may want to change this to the exact IP shown when you ran dask-scheduler.
exc = Executor('127.0.0.1:8786')

In [None]:
# Alternative to connecting to a separately started scheduler: create a local cluster.
lc = LocalCluster()
# Starts the web UI for the scheduler; as noted in the text above, this may not work on every setup.
lc.start_diagnostics_server() 
# Executor that submits work to the local cluster.
exc = Executor(lc)

In [None]:
%%time

futures = [e.submit(main, i) for i in city_list]
print(futures)
print('sorted by current temp', 
      sorted([f.result() for f in futures if f.status != 'error'], 
             key=lambda x: x['current_temp'], reverse=True))
print('sorted by forecast temp', 
      sorted([f.result() for f in futures if f.status != 'error'], 
             key=lambda x: x['mean_temp'], reverse=True))

In [None]:
# The last future corresponds to the last entry of city_list.
example_error = futures[-1]

In [None]:
# Display the future's status; the cell above filters on the value 'error'.
example_error.status

In [None]:
# NOTE(review): if this future failed, result() presumably re-raises the task's
# exception here; that traceback appears to be the point of this cell. Confirm.
example_error.result()