# Monitoring System

## Outline

1. **Task Orchestration**
    1. Dependency resolution
    1. Job scheduling
    1. Container/server execution    
1. **Input/Output Pipeline**
    1. Data Warehouse (predictors)
    1. Dashboard (model output)
1. **Model Description**
    1. Indicator definition
    1. Training/Survey dataset
    1. Model structure
1. **Modeling Outlook**
    1. Training/Survey Dataset
    1. Predictor Dataset
    1. ANN Advances

In [None]:
import warnings
warnings.simplefilter(action='ignore')

In [None]:
from datetime import date
from itertools import product
from urllib.parse import urlparse
import json
import pickle

from IPython.display import SVG
from sklearn.metrics import roc_curve
from sklearn.decomposition import PCA
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import model_to_dot
import geopandas as gpd
import geoviews as gv
import geoviews.tile_sources as gts
import holoviews as hv
import hvplot.pandas
import hvplot.xarray
import luigi
import numpy as np
import pandas as pd
import panel as pn
import param
import xarray as xr

from kidomain.poverty.data import GetPredictors, GuessResponsePixels, GetTrainingData
from kidomain.poverty.model import Train, Predict, Output
from warehouse import esacci

## Task Orchestration

- The EWS data and models as installable Python packages
- All data processing and modeling steps are linked "Tasks"
- A cloud-based scheduler resolves task dependencies and executes task in Docker containers

In [None]:
import luigi
from kidomain.poverty.model import Output

task = Output.from_str_params({
    "fgt_a": 0,
    "admin_level": 1,
    "threshold": 0.7,
    "period": "2011-01-01-2017-12-31"
})

luigi.build([task], local_scheduler=True)

---

## Input/Output Pipeline

**Input**

- Warehouse of remote sensing data
- Easy to update by changing task parameter
- Standardized format as Cloud Optimized GeoTIFF (COG), but otherwise "raw" data
- Ready to catalog and query with Spatio-Temporal Asset Catalogs (STACs) and sat-api

In [None]:
task = esacci.MigrateToWarehouse()
task.input()

In [None]:
class Predictors(param.Parameterized):
    
    _task = dict(GetPredictors.get_params())
    period = param.CalendarDate(
        default=_task["period"]._default.date_a,
        bounds=(
            _task["period"]._default.date_a,
            _task["period"]._default.date_b,
            )
        )
    variable = param.ObjectSelector(
        default=next(iter(_task["predictors"]._default.keys())),
        objects=_task["predictors"]._default.keys(),
        )
    band = param.ObjectSelector(
        default=1,
        objects=[1],
    )
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.da = None
        self.read()
    
    @param.depends("period", "variable", watch=True)
    def read(self):
        
        task = GetPredictors(period=self.period)        
        with task.output().open() as f:
            da = f.read()
        da = da.sel(var=(self.variable,), drop=True)
        if self.variable == "landscan":
            da = np.log(da.where(da > 0))
            da.attrs["cmap"] = "viridis"
        elif self.variable == "svdnb":
            da = np.log(da.where(da > 0))
            da.attrs["cmap"] = "gray"
        self.da = da

        self.band = 1
        self.param["band"].objects = da["band"].data.tolist()
    
    def view(self):
        
        da = self.da.sel(band=self.band, drop=True)
        plt = da.hvplot(
            x="lon", y="lat", rasterize=True, geo=True, cmap=da.attrs.get("cmap"),
            xlabel="Longitude",
            ylabel="Latitude",
            )
        
        return plt

predictors = Predictors(variable="landscan")

In [None]:
pn.Row(
    pn.Param(predictors.param, widgets={"period": pn.widgets.DatePicker}),
    predictors.view,
)

---

**Output**

- Gridded and aggregated to administrative boundaries
- Self-describing JSON (incorporating GeoJSON)
- HoloViews ready

In [None]:
task = Predict.from_str_params({"period": "2016-12-01"})
target = task.output()["FGT_0"]
da = xr.open_rasterio(target.path).sel(band=1).drop("band")

da.hvplot(
    geo=True, rasterize=True, dynamic=False,
    xlabel="Longitude", ylabel="Latitude", clabel="FGT(a=0)",
    cmap="bkr", title=f"Gridded Output: 2016-12-01",
)

In [None]:
class Predict(param.Parameterized):
    
    admin_level = param.ObjectSelector(
        default=1,
        objects=[1, 2, 3],
        )
    location = param.ObjectSelector(allow_None=True)
    period = param.CalendarDate()
    fgt_a = param.Integer(
        default=1,
        bounds=(0, 1),
        label="FGT \"a\" Parameter",
        )
    
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.gdf = None
        self.hover_cols = None
        self.da = None
        self.read()

    @param.depends("admin_level", "fgt_a", watch=True)
    def read(self):
        
        task = Output.from_str_params({
            "admin_level": self.admin_level,
            "fgt_a": self.fgt_a,
            "period": "2011-01-01-2017-12-31",
            "threshold": 0.7,
            })
        with task.output().open() as f:
            dd = json.load(f)
            
        # store location as instance attribute
        location = dd["coords"].pop("location")
        # FIXME upstream: have a location "@id" relative to a "@context.@base"
        dd["@context"] = {
            "@base": "s3://" + urlparse(location["@id"]).hostname.split(".")[0]
        }
        location["@id"] = urlparse(location["@id"]).path
        gdf = gpd.read_file(dd["@context"]["@base"] + location["@id"])
        gdf = gdf.set_index("id")
        hover_cols = gdf.columns.str.endswith("_NAME")
        loc = gdf.loc[:, hover_cols].isnull()
        for _loc in loc:
            gdf.loc[loc[_loc], _loc] = ""
        self.gdf = gdf
        self.hover_cols = gdf.columns[hover_cols].tolist()
        
        self.location = None
        self.param["location"].objects = gdf.index.values.tolist()
        
        # store timeseries data as instance attribute, and update parameter
        period = dd["coords"].pop("period")
        period = [pd.Period(p).to_timestamp().date() for p in period["data"]]

        self.period = period[0]
        self.param["period"].bounds = (period[0], period[-1])

        da = xr.DataArray.from_dict(dd)
        da["location"] = gdf.index.values
        da["period"] = period
        self.da = da
        
    def view_map(self):

        self.gdf["FGT"] = self.da.sel(period=self.period, drop=True)
        plt = self.gdf.hvplot(
            c="FGT", hover_cols=self.hover_cols,
            geo=True, tiles="CartoLight",
            xlabel="Longitude", ylabel="Latitude", clabel="FGT",
        )

        return plt
    
    def view_ts(self):
        
        location = self.location
        if not location:
            location = self.da["location"][0]
        da = self.da.sel(location=location, drop=True)
        plt = da.hvplot(xlabel="Month", ylabel="FGT")
        
        return plt

predict = Predict()

In [None]:
pn.Column(
    pn.Row(
        pn.Param(predict.param, widgets={"period": pn.widgets.DatePicker}),
        predict.view_map,
        ),
    pn.Row(
        predict.view_ts,
        )
    )

## Model Description

**Indicator Definition**

- Foster-Greer-Thorbecke Index is a headcount value for $a=0$ and a "gap" value otherwise
- The survey response, in addition to a population weight $w_i$, gives a FGT sample:

$$
y_i = \max\left(0, \frac{z_{s, t} - e_i}{z_{s,t}}\right)^a
$$

- The point-in-time averages make up the training data:

$$
y_{s, t} = \sum_{i \in {s, t}} w_i y_i
$$

In [None]:
task = GetTrainingData()
target = task.output()
with target.open() as f:
    ds = f.read()

#ds = ds.sortby("lat", ascending=False)
ds = ds.drop_dims("var")
ds_flat = ds.reset_index("location").stack(sample=["location", "period"]).reset_index("sample").dropna("sample")
ds_flat.hvplot.hist(
    y="y0", bins=25,
    title=f"Training Data Distribution (n={ds_flat.sizes['sample']})",
    xlabel="FGT(a=0)",
    ylabel="Count",
)

In [None]:
class Response(param.Parameterized):
    
    _task = GuessResponsePixels()
    with _task.input()["grid"].open() as f:
        _grid = f.read()
    _grid.name = "grid"
    
    with GetTrainingData().output().open() as f:
        _target = f.read()
    _target = _target.drop_dims("var")
    _target["period"] = [
        pd.Period(p).to_timestamp().date() for p in _target["period"].data
    ]

    period = param.ObjectSelector(
        default=_target["period"][0].data.item(),
        objects=_target["period"].data.tolist(),
        )
    fgt_a = param.Integer(
        default=1,
        bounds=(0, 1),
        label="FGT \"a\" Parameter"
    )
        
    def view_map(self):
        
        ds = self._target.sel(period=self.period)
        if self.fgt_a == 0:
            da = ds["y0"]
        else:
            da = ds["ya"]
        da = da.unstack("location")
        da = xr.merge([da, self._grid])[da.name]
        plt = da.hvplot.image(
            x="lon", y="lat", rasterize=False, geo=True,
            cmap="bkr", tiles="CartoLight",
            xlabel="Longitude",
            ylabel="Latitude",
            title="Training Data",
            )
        
        return plt
    
    def view_ts(self):
        
        ds_ts = self._target.groupby("period").mean(...)
        ds_ts["year"] = (
            "period",
            [p.year for p in ds_ts["period"].data]
            )
        ds_ts["month"] = (
            "period",
            [p.month for p in ds_ts["period"].data]
            )
        ds_ts = ds_ts.drop_vars("period").set_coords(["year", "month"])
        y0 = ds_ts["y0"]
        y0["a"] = 0
        ya = ds_ts["ya"]
        ya["a"] = 1
        y = xr.concat([y0, ya], dim="period")
        
        plt = y.hvplot.scatter(
            x="month", by=["a", "year"],
            xlabel="Month", ylabel="FGT",
            title="Training Data Pixel Average",
            )
        
        return plt


response = Response()

In [None]:
pn.Column(
    pn.Row(
        response.param,
        response.view_map,
    ),
    pn.Row(
        response.view_ts,
    )
)

---

**Model Structure**

In [None]:
train = Train()
model = load_model(train.output()["model"].path)

SVG(model_to_dot(model, show_shapes=True, rankdir="LR").create(prog='dot', format='svg'))

## Model Outlook

**Goodness-of-Fit**

- View "ROC" for classification accuracy
- View residuals for classification and regression

In [None]:
# task = GetTrainingData()
# target = task.output()
with open("/output/intermediate_targets/poverty/data/GetTrainingDataX/f7f1db7cb2", "rb") as f:
    ds = pickle.load(f)

ds["location"] = np.arange(ds.sizes["location"])
ds = ds.drop_dims("var").stack(sample=["period", "location"]).dropna("sample", subset=["y0", "ya"])

with open("/output/y_hat.pickle", "rb") as f:
    clsn, rgrn = pickle.load(f)
    
fp, tp, thr = roc_curve(ds["y0"] == 0, clsn[:, 0])
roc = xr.Dataset(
    {"fp": ("index", fp), "tp": ("index", tp), "thr": ("index", thr)}
)
roc.isel(index=slice(1, None, 100)).to_dataframe().hvplot.scatter(
    x="fp", y="tp", c="thr", frame_width=300, aspect=1, xlim=[0, 1], ylim=[0, 1], cmap="dimgray_r",
    title="Accuracy of Binary Classification", xlabel="False Positive Rate", ylabel="True Positive Rate", clabel="Cut-off",
    ) * hv.Slope(1, 0).opts(color="black", line_width=1, line_dash="dashed")

In [None]:
cut = 0.7
clsn_adj = clsn.copy()
clsn_adj[:, 0] = np.where(clsn[:, 0] <= cut, 0, clsn[:, 0])
clsn_max = clsn_adj.argmax(axis=1)

ds["y0_hat"] = ("sample", np.where(clsn_max == 2, rgrn[:, 0], clsn_max))
ds["ya_hat"] = ("sample", np.where(clsn_max > 0, rgrn[:, 1], 0))

lfig = ds.hvplot.hexbin(
    x="y0", y="y0_hat", frame_width=300, aspect=1, logz=True, clabel="Count", cmap="dimgray_r",
    title=f"Training vs. Prediction (cut-off at {cut})",
    xlabel="True FGT(a=0)", ylabel="Predicted FGT(a=0)",
) * hv.Slope(1, 0).opts(line_width=1, line_dash="dashed", line_color="black")

rfig = ds.hvplot.hexbin(
    x="ya", y="ya_hat", frame_width=300, aspect=1, logz=True, clabel="Count", cmap="dimgray_r",
    title=f"Training vs. Prediction (cut-off at {cut})",
    xlabel="True FGT(a=1)", ylabel="Predicted FGT(a=1)",
) * hv.Slope(1, 0).opts(line_width=1, line_dash="dashed", line_color="black")

(lfig + rfig)

---

**Value of Predictors**

- "Feature importance" for ANN is not well-defined
- Correlations suggest low quality predictors

In [None]:
task = GetTrainingData()
target = task.output()
with target.open() as f:
    ds = f.read()

ds["location"] = np.arange(ds.sizes["location"])
ds = ds.stack(sample=["period", "location"]).dropna("sample", subset=["y0", "ya"])
ds["sample"] = np.arange(ds.sizes["sample"])

plt = ds.sel(source="fldas", band=5, sample=slice(None, None, 10)).to_dataframe()
lfig = plt.hvplot.hexbin(
    y="y0", x="x", frame_width=300, aspect=1, cmap="dimgray_r",
    title="FGT by FEWSNet-FLDAS::SoilMoi10_40cm_tavg", xlabel="z-score", ylabel="FGT(a=0)", logz=True)

plt = ds.sel(source="esacci", sample=slice(None, None, 10))
pca = PCA(1)
pca.fit(plt["x"].data.transpose())
plt["s"] = ("sample", pca.transform(plt["x"].data.transpose())[:, 0])
plt = plt.drop_dims("band")
rfig = plt.hvplot.hexbin(
    y="y0", x="s", frame_width=300, aspect=1, title="FGT by Landcover",
    ylabel="FGT(a=0)", xlabel="PC-1 Score", cmap="dimgray_r", clabel="Count", logz=True)

lfig + rfig

---

**Conclusion: Model Development Pathways**

- Training data:
    - review accuracy of FGT calculation from survey data
    - precise geolocation
    - pool with LSMS or additional PSNP data
- Predictors
    - canvase additional gridded input
    - higher resolution, Peter's drought indices
- Model: ANN to LSTM

### work-in-process

In [None]:
hv.opts.defaults(hv.opts.Polygons(nonselection_alpha=0.5))

In [None]:
class ExplorePovertyModel(param.Parameterized):
    
    month = param.CalendarDate(
        default=date(2016, 1, 1),
        bounds=(date(2016, 1, 1), date(2018, 12, 1))
    )

    @pn.depends('month', watch=True)
    def viz(self):
        with open("output/poverty_format.json", "rb") as f:
            ds = xr.Dataset.from_dict(json.load(f))

        data = ds.to_dataframe().reset_index()
        # Drop admin3 data 
        data = data.drop(['admin3', 'W_CODE'], axis=1)
        data = data.drop_duplicates()
        
        gdf = gpd.read_file("output/poverty_features.geojson")
        gdf = gdf.drop(['W_NAME', 'W_CODE', 'R_CODE'], axis=1)
        gdf= gdf.dissolve(by='Z_CODE', as_index=False)
        # Merge admin gdf and the poverty data
        data = gdf.merge(data, on='Z_CODE')
        data["Z_NAME"] = data["Z_NAME"].astype(str)
        map_df = data.loc[data['period'] == self.month.strftime("%Y-%m")].copy()
        cty_plot = gv.Polygons(map_df, vdims=['admin2', 'Z_NAME']).opts(
            tools=['hover', 'tap'], height=300, width=525,title="Average Difference from Poverty Line"
        )
        map_plot = gts.OSM * cty_plot
        selection_stream = hv.streams.Selection1D(source=cty_plot)
        
        def index_to_selection(index):
            if len(index) == 0:
                index = [0]
            # FIXME: plot multiple zones
            z_name = np.unique(cty_plot.iloc[index]['Z_NAME'])[0]
            plot_df = data.loc[(data['Z_NAME'] == z_name)].copy()
            # FIXME: Check the reason for duplicates
            plot_df = plot_df.drop_duplicates(subset=['period', 'Z_NAME'])
            return plot_df

        def timeseries_callback(index):
            plot_df = index_to_selection(index)
            ts = hv.Dataset(plot_df[['period', 'admin2']]).to(hv.Curve, 'period' ,'admin2').opts(
                opts.Curve(tools=['hover'], height=300, width=1000, show_grid=True, show_frame=False, xrotation=90, 
                           xlabel="Month", ylabel='Difference from Poverty Line', show_legend=True))
            return ts

        ts_plot = hv.DynamicMap(timeseries_callback, streams=[selection_stream])
        
        out = pn.Column(map_plot.opts(width=700, height=400), ts_plot.opts(width=800))
        return out
    
viewer = ExplorePovertyModel(name="Poverty Explore")

In [None]:
params = pn.Param(viewer.param,widgets={"month": pn.widgets.DatePicker}, width=200)
pn.Row(params, viewer.viz).servable()