# Pre-Process data

A good way to speed up an interactive interface that has a known limited set of possible options and no dynamically-shifting input data is to simply pre-process the data to supply the required output. For this demo, we need to be able to show the following:

* mean air temperature...
  * for each US State,
  * for each year (1860 - 2100)
  * for each RCP scenario (A1B, E1)
* climatic period mean (1971-2000, 1981-2010, 1991-2020)
* anomaly from climatic mean period (calculated from `mean air temp[state][year] - climatic mean[state][period]`) (NOTE: not done! this is calcuated on-the-fly...)

In [None]:
import itertools
import os

import cartopy.io.shapereader as shpreader
import dask.bag as db
import iris
import numpy as np
import pandas as pd

import ascend.shape as shape

## Functions

In [None]:
def _get_2d_field_and_dims(cube):
    """Get a 2D horizontal field, and the horizontal coord dims, from an nD cube."""
    cube_x_coord, = cube.coords(axis='X', dim_coords=True)
    cube_y_coord, = cube.coords(axis='Y', dim_coords=True)
    cube_x_coord_name = cube_x_coord.name()
    cube_y_coord_name = cube_y_coord.name()
    cube_x_coord_dim, = cube.coord_dims(cube_x_coord)
    cube_y_coord_dim, = cube.coord_dims(cube_y_coord)
    field_2d = cube.slices([cube_y_coord_name, cube_x_coord_name]).next()
    coord_2d_dims = sorted([cube_y_coord_dim, cube_x_coord_dim])
    return field_2d, coord_2d_dims
    
def cut_cube_to_shape(cube, shape_record):
    """
    Subset the input cube to the boundary of the shapefile geometry.
    This is a two-step process:
      1. Cut the cube to the bounding box of the shapefile geoetry
      2. Mask the cut cube from (1.) to the boundary of the shapefile
         geometry.
    
    """
    # Set up the cube-to-shapefile cutter.
    cutter = shape.Shape(shape_record.geometry, shape_record.attributes,
                         cube.coord_system())

    # 1. extract subcube to the shapefile's bounding box.
    subcube = cutter.extract_subcube(cube)
    
    # 2. mask the subcube to the shapefile boundary.
    try:
        cube_2d, dims_2d = _get_2d_field_and_dims(subcube)
        mask_2d = cutter.cube_intersection_mask(cube_2d)
    except:
        result = None
    else:
        full_mask = iris.util.broadcast_to_shape(mask_2d, subcube.shape, dims_2d)
        new_data = np.ma.array(subcube.data, mask=np.logical_not(full_mask))
        result = subcube.copy(data=new_data)

    return result

In [None]:
def calculate_one_mean(state_code, rcp, year):
    """
    Calculate one mean value, for a specific combination of US state (defined by its state code),
    RCP scenario and year.
    
    """
    cube = a1b_cube if rcp.lower() == "a1b" else e1_cube
    cstr = iris.Constraint(time=lambda cell: cell.point.year == int(year))
    year_cube = cube.extract(cstr)
    rcd = states_dict[state_code]
    state_year_cube = cut_cube_to_shape(year_cube, rcd)
    if state_year_cube is not None:
        collapsed_cube = state_year_cube.collapsed(["latitude", "longitude"], iris.analysis.MEAN)
        result = float(collapsed_cube.data)
    else:
        result = None
    return result

In [None]:
def _get_state_cube(state_code, rcp):
    cube = a1b_cube if rcp.lower() == "a1b" else e1_cube
    rcd = states_dict[state_code]
    return cut_cube_to_shape(cube, rcd)

def calculate_year_means(state_code, rcp):
    years_cube = _get_state_cube(state_code, rcp)
    if years_cube is not None:
        years_means_cube = years_cube.collapsed(["latitude", "longitude"], iris.analysis.MEAN)
        result = list(years_means_cube.data)
    else:
        result = [None]
    return result

def calculate_climatic_period_mean(state_code, rcp, start_year, end_year):
    years_cube = _get_state_cube(state_code, rcp)
    if years_cube is not None:
        cp_cstr = iris.Constraint(time=lambda cell: start_year <= cell.point.year <= end_year)
        cp_cube = years_cube.extract(cp_cstr)
        cp_mean = cp_cube.collapsed([c.name() for c in cp_cube.coords(dim_coords=True)],
                                    iris.analysis.MEAN)
        result = float(cp_mean.data)
    else:
        result = None
    return result

## Static data

In [None]:
rcp_strs = ["a1b", "e1"]

In [None]:
sample_data_dir = "iris-sample-data/sample_data"
a1b_cube = iris.load_cube(os.path.join(sample_data_dir, "a1b_north_america.nc"))
e1_cube = iris.load_cube(os.path.join(sample_data_dir, "e1_north_america.nc"))

In [None]:
cps = ["1971-2000", "1981-2010", "1991-2020"]
years = [str(c.point.year) for c in a1b_cube.coord("time").cells()]
inds = ["rcp", "year", "mean_temp"] + cps
print(years[:10])
print(inds)

In [None]:
us_states_name = shpreader.natural_earth(
    category="cultural",
    name="admin_1_states_provinces")

us_states = shpreader.Reader(us_states_name)

In [None]:
states_dict = {}
for rcd in us_states.records():
    state_code = rcd.attributes['gn_a1_code']
    states_dict[state_code] = rcd

## Pre-process the data

In [None]:
def build_row(state_code, rcp):
    """
    Build one row in the dataframe:
      * all years and climatic periods,
      * for one US State and RCP scenario
    
    """
    climatic_periods = [[1971, 2000],
                        [1981, 2010],
                        [1991, 2020]]
    year_means = calculate_year_means(state_code, rcp)
    cps = [calculate_climatic_period_mean(state_code, rcp, *yrs) for yrs in climatic_periods]
    vals = [rcp] + year_means + cps
    return pd.Series(vals, name=state_code)

def build_row_v2(state_code, rcp):
    """
    Build one row in the dataframe:
      * one year,
      * all climatic periods,
      * for one US State and RCP scenario
    
    """
    climatic_periods = [[1971, 2000],
                        [1981, 2010],
                        [1991, 2020]]
    year_means = calculate_year_means(state_code, rcp)
    cps = [calculate_climatic_period_mean(state_code, rcp, *yrs) for yrs in climatic_periods]
    res = []
    for (year, year_mean) in zip(years, year_means):
        vals = [rcp, year, year_mean] + cps
        res.append(pd.Series(vals, name=state_code))
    return pd.concat([*res, pd.Series(inds, name="headings")], axis=1).set_index("headings").T

def build_row_helper(args):
    return build_row(*args)

def build_row_v2_helper(args):
    return build_row_v2(*args)

In [None]:
combs = db.from_sequence(itertools.product(states_dict, rcp_strs))

In [None]:
combs_dfs = combs.map(build_row_v2_helper)

In [None]:
import multiprocessing.popen_spawn_posix  # Timely fix for a dask ecosystem bug affecting MacOS.
from distributed import Client, LocalCluster

lc = LocalCluster(n_workers=16)
Client = Client(lc)

In [None]:
pre_preprocessed_data = pd.concat(combs_dfs)
pre_preprocessed_data

In [None]:
pre_preprocessed_data.to_csv("preprocessed_data_v2.csv")