## Setup

In [2]:
#Data
import iris
import cartopy.io.shapereader as shpreader
import pandas as pd
import cftime

#Plotting
import iris.plot as iplt
import iris.quickplot as qplt
import matplotlib.pyplot as plt
%matplotlib inline

#System
import os
import sys
import glob

#Met Office utils
import shape_utils as shape

#Supress warnings
import warnings
warnings.filterwarnings('ignore')

### 1. Load Met Office Global Data

The files for each variable are contained in a separate folder.

In [3]:
#List all the filepaths and store in a dict with each variable as a key

folder = '~/datasets/open-data/metoffice_global_daily/'
print(os.path.expanduser(folder))
filepaths = {path: glob.glob(os.path.join(os.path.expanduser(folder), path, '*.nc')) for path in os.listdir(os.path.expanduser(folder))}
variables = list(filepaths.keys())

print(variables)
print(f'Number of files for each variable: {len(filepaths[variables[0]])}')

/root/datasets/open-data/metoffice_global_daily/
['precip_max', 'metoffice_orography_global.nc', 'precip_mean', 'sh_max', 'sh_mean', 'sh_min', 'sw_max', 'sw_mean', 't1o5m_max', 't1o5m_mean', 't1o5m_min', 'windgust_max', 'windgust_mean', 'windspeed_max', 'windgust_min', 'windspeed_min', 'windspeed_mean', 'cldbase_max', 'cldbase_mean', 'cldfrac_max', 'cldbase_min', 'cldfrac_mean', 'pmsl_max', 'cldfrac_min', 'pmsl_mean', 'pmsl_min']
Number of files for each variable: 209


In [4]:
# FILTERING STEP (!!)
# Notebook kernel runs out of memory if we try to process all the cubes at once (for all dates), so here we can subselect from the paths above if necessary - and generate a partial CSV when executing all the steps
# In the end we will merge all the partial CSV to produce a full dataset of daily country average for all metrics

FILTER_STRING_PRECIPITATION = "precip_"
FILTER_STRING_HUMIDITY = "sh_"
FILTER_STRING_SW = "sw_"
FILTER_STRING_TEMPERATURE = "t1o5m_"
FILTER_STRING_WINDSPEED = "windspeed_"

ALL_FILTER_STRINGS = [FILTER_STRING_PRECIPITATION, FILTER_STRING_HUMIDITY, FILTER_STRING_SW, FILTER_STRING_TEMPERATURE, FILTER_STRING_WINDSPEED]

#FILTER_STRINGS = [FILTER_STRING_PRECIPITATION]

FILTER_STRINGS = ALL_FILTER_STRINGS

def has_at_least_one(var, str_list):
    for filter_str in str_list:
        if filter_str in var:
            return True
    return False

filtered_variables = [var for var in variables if has_at_least_one(var, FILTER_STRINGS)]

print(filtered_variables)

['precip_max', 'precip_mean', 'sh_max', 'sh_mean', 'sh_min', 'sw_max', 'sw_mean', 't1o5m_max', 't1o5m_mean', 't1o5m_min', 'windspeed_max', 'windspeed_min', 'windspeed_mean']


In [5]:
# Load the cities lat lon data.

cities_path = os.path.expanduser('~/datasets/share/datapoet/cities_latlon_data/worldcities.csv')
cities_df = pd.read_csv(cities_path)
cities_df.head(10)

Unnamed: 0,city,city_ascii,lat,lng,country,iso2,iso3,admin_name,capital,population,id
0,Tokyo,Tokyo,35.685,139.7514,Japan,JP,JPN,Tōkyō,primary,35676000.0,1392685764
1,New York,New York,40.6943,-73.9249,United States,US,USA,New York,,19354922.0,1840034016
2,Mexico City,Mexico City,19.4424,-99.131,Mexico,MX,MEX,Ciudad de México,primary,19028000.0,1484247881
3,Mumbai,Mumbai,19.017,72.857,India,IN,IND,Mahārāshtra,admin,18978000.0,1356226629
4,São Paulo,Sao Paulo,-23.5587,-46.625,Brazil,BR,BRA,São Paulo,admin,18845000.0,1076532519
5,Delhi,Delhi,28.67,77.23,India,IN,IND,Delhi,admin,15926000.0,1356872604
6,Shanghai,Shanghai,31.2165,121.4365,China,CN,CHN,Shanghai,admin,14987000.0,1156073548
7,Kolkata,Kolkata,22.495,88.3247,India,IN,IND,West Bengal,admin,14787000.0,1356060520
8,Los Angeles,Los Angeles,34.1139,-118.4068,United States,US,USA,California,,12815475.0,1840020491
9,Dhaka,Dhaka,23.7231,90.4086,Bangladesh,BD,BGD,Dhaka,primary,12797394.0,1050529279


In [6]:
# Here we create a dictionary mapping each country to its cities, containing data on latitude, longitude, and population.

import collections

country_to_city_dict = collections.defaultdict(list)
City = collections.namedtuple("City", "city lat lng population")

for index, row in cities_df.iterrows():
    country_to_city_dict[row["iso3"]].append(City(row["city"], row["lat"], row["lng"], row["population"]))

In [7]:
%%time
#Run through all the variables and append the loaded cubes to a CubeList
cubes = iris.cube.CubeList([])

for var in filtered_variables:
    cubes.extend(iris.load(filepaths[var]))
    
print(cubes)

0: precipitation_flux / (kg m-2 s-1)   (time: 209; latitude: 1920; longitude: 2560)
1: precipitation_flux / (kg m-2 s-1)   (time: 209; latitude: 1920; longitude: 2560)
2: specific_humidity / (1)             (time: 209; latitude: 1920; longitude: 2560)
3: specific_humidity / (1)             (time: 209; latitude: 1920; longitude: 2560)
4: specific_humidity / (1)             (time: 209; latitude: 1920; longitude: 2560)
5: m01s01i202 / (1)                    (time: 209; latitude: 1920; longitude: 2560)
6: m01s01i202 / (1)                    (time: 209; latitude: 1920; longitude: 2560)
7: air_temperature / (K)               (time: 209; latitude: 1920; longitude: 2560)
8: air_temperature / (K)               (time: 209; latitude: 1920; longitude: 2560)
9: air_temperature / (K)               (time: 209; latitude: 1920; longitude: 2560)
10: wind_speed / (m s-1)                (time: 209; latitude: 1921; longitude: 2560)
11: wind_speed / (m s-1)                (time: 209; latitude: 1921; longitu

In [8]:
# Fetch only the types that we need

selected_cubes = iris.cube.CubeList([])
selected_cube_derived_stat_names = []

for cube in cubes:
    if cube.name() == "air_temperature":
        cube.convert_units('Celsius')
        selected_cubes.append(cube)
        selected_cube_derived_stat_names.append("Temperature_Weighted_Daily_Average_" + str(cube.cell_methods[0].method))
    elif cube.name() == "specific_humidity":
        selected_cubes.append(cube)
        selected_cube_derived_stat_names.append("Humidity_Weighted_Daily_Average_" + str(cube.cell_methods[0].method))
    elif cube.name() == "precipitation_flux":
        selected_cubes.append(cube)
        # Note that for precipitation, we index the cell_methods by 1, not 0, due to the initial one referring to a different averaging stage
        selected_cube_derived_stat_names.append("Precipitation_Weighted_Daily_Average_" + str(cube.cell_methods[1].method))
    elif cube.name() == "m01s01i202":
        selected_cubes.append(cube)
        selected_cube_derived_stat_names.append("SW_Weighted_Daily_Average_" + str(cube.cell_methods[0].method))
    elif cube.name() == "wind_speed":
        selected_cubes.append(cube)
        selected_cube_derived_stat_names.append("Wind_Speed_Weighted_Daily_Average_" + str(cube.cell_methods[0].method))
        
print(selected_cube_derived_stat_names)

['Precipitation_Weighted_Daily_Average_maximum', 'Precipitation_Weighted_Daily_Average_mean', 'Humidity_Weighted_Daily_Average_maximum', 'Humidity_Weighted_Daily_Average_mean', 'Humidity_Weighted_Daily_Average_minimum', 'SW_Weighted_Daily_Average_maximum', 'SW_Weighted_Daily_Average_mean', 'Temperature_Weighted_Daily_Average_maximum', 'Temperature_Weighted_Daily_Average_mean', 'Temperature_Weighted_Daily_Average_minimum', 'Wind_Speed_Weighted_Daily_Average_maximum', 'Wind_Speed_Weighted_Daily_Average_minimum', 'Wind_Speed_Weighted_Daily_Average_mean']


In [11]:
import datetime
import cftime

# NOTE: All cubes are aligned in terms of time, so we take the dates from one of them
# TODO: add additional logic to support for misaligned cubes, if in the future the assumptions do not hold
time_coord_temp = cubes[0].coord("time")
all_cube_dates = time_coord_temp.units.num2date(time_coord_temp.points)
print(len(all_cube_dates))
print(all_cube_dates[0])
print(all_cube_dates[-1])

print(int(time_coord_temp.points[0]))

209
2020-01-01 11:00:00
2020-07-27 11:00:00
438299


In [12]:
# We don't need to be processing the entirety of the time range each time, but are rather generating incremental updates - and here we specify the cutoff date, which is the end date of the previous update - we will cut the data along that vertical and only process subsequent dates in the cells that follow.

CUTOFF_TIME = cftime.DatetimeGregorian(year=2020, month=7, day=10, hour=12)

print(CUTOFF_TIME)
print(time_coord_temp.units.date2num(CUTOFF_TIME))

2020-07-10 12:00:00
442884.0


In [13]:
print(selected_cubes[0])

precipitation_flux / (kg m-2 s-1)   (time: 209; latitude: 1920; longitude: 2560)
     Dimension coordinates:
          time                           x              -                -
          latitude                       -              x                -
          longitude                      -              -                x
     Auxiliary coordinates:
          forecast_reference_time        x              -                -
     Scalar coordinates:
          forecast_period: 2.0 hours, bound=(-1.0, 5.0) hours
     Attributes:
          Conventions: CF-1.5
          STASH: m01s05i216
          source: Data from Met Office Unified Model
          um_version: 11.2
     Cell methods:
          mean: time (1 hour)
          maximum: time


In [14]:
print(len(selected_cubes))

13


In [15]:
# Subsetting to the time range

selected_cube_slices = []
all_cube_dates_sliced = None

for cube in selected_cubes:
    time_coord_subsetting = cube.coord("time")
    cut_index = 0
    cut_time = 0
    for i, t in enumerate(time_coord_subsetting.points):
        if t > time_coord_subsetting.units.date2num(CUTOFF_TIME):
            cut_index = i
            cut_time = t
            break
    print(cube.name())
    print(str(cube.cell_methods[0].method))
    print(cut_index)
    print(time_coord_subsetting.units.num2date(cut_time))
    if all_cube_dates_sliced is None:
        all_cube_dates_sliced = all_cube_dates[cut_index:]
    cube_slice = cube[cut_index:,:,:]
    selected_cube_slices.append(cube_slice)


precipitation_flux
mean
192
2020-07-11 11:00:00
precipitation_flux
mean
192
2020-07-11 11:00:00
specific_humidity
maximum
192
2020-07-11 11:30:00
specific_humidity
mean
192
2020-07-11 11:30:00
specific_humidity
minimum
192
2020-07-11 11:30:00
m01s01i202
maximum
192
2020-07-11 11:30:00
m01s01i202
mean
192
2020-07-11 11:30:00
air_temperature
maximum
192
2020-07-11 11:30:00
air_temperature
mean
192
2020-07-11 11:30:00
air_temperature
minimum
192
2020-07-11 11:30:00
wind_speed
maximum
192
2020-07-11 11:30:00
wind_speed
minimum
192
2020-07-11 11:30:00
wind_speed
mean
192
2020-07-11 11:30:00


In [16]:
# Optionally, we can look into the stats across the cubes, for the date range selected - to see if everything seems reasonable.

for cube, name in zip(selected_cube_slices, selected_cube_derived_stat_names):
    print("MIN for " + name)
    print(cube.data.min())
    print("MAX for " + name)
    print(cube.data.max())
    print("MEAN for " + name)
    print(cube.data.mean())

MIN for Precipitation_Weighted_Daily_Average_maximum
0.0
MAX for Precipitation_Weighted_Daily_Average_maximum
0.051280975
MEAN for Precipitation_Weighted_Daily_Average_maximum
0.00018588956
MIN for Precipitation_Weighted_Daily_Average_mean
0.0
MAX for Precipitation_Weighted_Daily_Average_mean
0.009557724
MEAN for Precipitation_Weighted_Daily_Average_mean
2.9954384e-05
MIN for Humidity_Weighted_Daily_Average_maximum
0.0
MAX for Humidity_Weighted_Daily_Average_maximum
0.061279297
MEAN for Humidity_Weighted_Daily_Average_maximum
0.0093509145
MIN for Humidity_Weighted_Daily_Average_mean
0.0
MAX for Humidity_Weighted_Daily_Average_mean
0.028645834
MEAN for Humidity_Weighted_Daily_Average_mean
0.008464838
MIN for Humidity_Weighted_Daily_Average_minimum
0.0
MAX for Humidity_Weighted_Daily_Average_minimum
0.02758789
MEAN for Humidity_Weighted_Daily_Average_minimum
0.0077401577
MIN for SW_Weighted_Daily_Average_maximum
0.0
MAX for SW_Weighted_Daily_Average_maximum
1079.0
MEAN for SW_Weighted_Da

In [17]:
# Plotting code, given a sampled cube / after a query, for data visualization

#import numpy as np
#import matplotlib as plt

#hist, bins = np.histogram(ex_cube.data, bins=20, range=(-20, 20), weights=None, density=True)
#hist = hist / np.sum(hist)
#plt.pyplot.bar(bins[:-1], hist,  width=(bins[1] - bins[0]))
#plt.pyplot.show()

In [18]:
# For each of the metrics, we compute a dictionary from a country code into all of the cubes corresponding to results of lat/long queries for cities from that country
# The ordering of the dicts in the list is the same as the ordering of the original cubes and their name list

LATLONG_DELTA = 0.2 # Degrees, defining the 'vicinity of a city' as lat/long +- LATLONG_DELTA

all_country_city_cubes_dicts = []

for cube, name in zip(selected_cube_slices, selected_cube_derived_stat_names):
    print("Processing " + name)
    country_city_cubes_dict = collections.defaultdict(list)
    for iso_code in country_to_city_dict.keys():
        for city in country_to_city_dict[iso_code]:
            city_latlon = ((city.lat - LATLONG_DELTA, city.lat + LATLONG_DELTA), (city.lng - LATLONG_DELTA, city.lng + LATLONG_DELTA))
            city_cube = cube.intersection(latitude=city_latlon[0], longitude=city_latlon[1])
            country_city_cubes_dict[iso_code].append(city_cube)
    all_country_city_cubes_dicts.append(country_city_cubes_dict)

Processing Precipitation_Weighted_Daily_Average_maximum
Processing Precipitation_Weighted_Daily_Average_mean
Processing Humidity_Weighted_Daily_Average_maximum
Processing Humidity_Weighted_Daily_Average_mean
Processing Humidity_Weighted_Daily_Average_minimum
Processing SW_Weighted_Daily_Average_maximum
Processing SW_Weighted_Daily_Average_mean
Processing Temperature_Weighted_Daily_Average_maximum
Processing Temperature_Weighted_Daily_Average_mean
Processing Temperature_Weighted_Daily_Average_minimum
Processing Wind_Speed_Weighted_Daily_Average_maximum
Processing Wind_Speed_Weighted_Daily_Average_minimum
Processing Wind_Speed_Weighted_Daily_Average_mean


In [19]:
# Here we compute the population-weighted averages for all metrics.

import numpy as np

all_country_averages_dicts = []

for cube, name, city_cubes_dict in zip(selected_cube_slices, selected_cube_derived_stat_names, all_country_city_cubes_dicts):
    print("Processing " + name)
    country_averages_dict = collections.defaultdict(list)
    
    # We compute the weights for the convex combination, from the city populations.
    for iso_code in country_to_city_dict.keys():
        weights = []
        for city in country_to_city_dict[iso_code]:
            if np.isnan(city.population):
                weights.append(0.)
            else:
                weights.append(float(city.population))
        # Normalization, so that they sum up to 1.
        weights = np.asarray(weights)
        weights /= np.sum(weights)

        # Computation of daily averages
        for date_index in range(len(all_cube_dates_sliced)):
            value_list = []
            for city_cube in city_cubes_dict[iso_code]:
                value_list.append(np.mean(city_cube.data[date_index, :, :]))
            masked_values = np.ma.MaskedArray(value_list, mask=np.isnan(value_list))
            country_weighted_day_mean = np.average(masked_values, weights=weights)
            country_averages_dict[iso_code].append(country_weighted_day_mean)
    all_country_averages_dicts.append(country_averages_dict)

Processing Precipitation_Weighted_Daily_Average_maximum
Processing Precipitation_Weighted_Daily_Average_mean
Processing Humidity_Weighted_Daily_Average_maximum
Processing Humidity_Weighted_Daily_Average_mean
Processing Humidity_Weighted_Daily_Average_minimum
Processing SW_Weighted_Daily_Average_maximum
Processing SW_Weighted_Daily_Average_mean
Processing Temperature_Weighted_Daily_Average_maximum
Processing Temperature_Weighted_Daily_Average_mean
Processing Temperature_Weighted_Daily_Average_minimum
Processing Wind_Speed_Weighted_Daily_Average_maximum
Processing Wind_Speed_Weighted_Daily_Average_minimum
Processing Wind_Speed_Weighted_Daily_Average_mean


In [27]:
# Sometimes we need to generate data in multiple runs, or incrementally generate updates, due to container memory restrictions.
# This could lead to multiple generated CSV-s with the split being either by time or by data type.
# Here we generate the update from the current sub-selection, which will then be merged with the rest of the data in the subsequent step.

FILE_SUFFIX = "_update_6"

iso_column_vals = []
date_column_vals = []

selected_cube_derived_stat_names

all_features_column_vals = [[] for _ in range(len(selected_cube_derived_stat_names))]

for iso_code in country_to_city_dict.keys():
    for date_index in range(len(all_cube_dates_sliced)):
        iso_column_vals.append(iso_code)
        date_column_vals.append(all_cube_dates_sliced[date_index].strftime("%Y-%m-%d"))
        for val_list, country_averages_dict in zip(all_features_column_vals, all_country_averages_dicts):
            val_list.append(country_averages_dict[iso_code][date_index])

data = {
        'ISO':  iso_column_vals,
        'Date':  date_column_vals}

for name, vals in zip(selected_cube_derived_stat_names, all_features_column_vals):
    data[name] = vals
    
column_names = ['ISO', 'Date']
column_names.extend(selected_cube_derived_stat_names)

export_df = pd.DataFrame (data, columns = column_names)

out_path = os.path.expanduser('~/datasets/share/datapoet/weather_summaries/countries_daily_weighted_averages' + FILE_SUFFIX + '.csv')
export_df.to_csv(out_path, index=False)

In [28]:
# Here we can potentially join the multiple individual CSV-s into a joint one - if we have been processing different quan

#output_folder = '~/datasets/share/datapoet/weather_summaries/'

#print(os.path.expanduser(output_folder))
#filepaths = glob.glob(os.path.join(os.path.expanduser(output_folder), 'countries_daily_weighted_averages*'))

#print(filepaths)

#all_dfs = []

#for path in filepaths:
#    df = pd.read_csv(path)
#    all_dfs.append(df)
    
#print(len(all_dfs))

#merge_keys = ['ISO', 'Date']

#merged_df = all_dfs[0]

#for i in range(1, len(all_dfs)):
#    print("Merged with " + str(i))
#    merged_df = pd.merge(merged_df, all_dfs[i], how='inner', on=merge_keys, left_on=None, right_on=None,
#                         left_index=False, right_index=False, sort=True,
#                         suffixes=('_x', '_y'), copy=True, indicator=False,
#                         validate=None)
    
#merged_out_path = os.path.expanduser('~/datasets/share/datapoet/weather_summaries/countries_daily_weighted_averages_merged.csv')
#merged_df.to_csv(merged_out_path, index=False)

In [29]:
# Or potentially apply an update

original_path = os.path.expanduser('~/datasets/share/datapoet/weather_summaries/countries_daily_weighted_averages_july_10.csv')
update_path = os.path.expanduser('~/datasets/share/datapoet/weather_summaries/countries_daily_weighted_averages_update_6.csv')

original_df = pd.read_csv(original_path)
update_df = pd.read_csv(update_path)

final_df = original_df.copy()
final_df = final_df.append(update_df, sort=True)

print(len(original_df))
print(len(update_df))
print(len(final_df))

final_df = final_df.sort_values(by=['ISO', 'Date'])

final_out_path = os.path.expanduser('~/datasets/share/datapoet/weather_summaries/countries_daily_weighted_averages_july_27.csv')
final_df.to_csv(final_out_path, index=False)

41924
3791
45715
