In [None]:
import xarray as xr
import fsspec
import s3fs
import os
import matplotlib.pyplot as plt
import dask
import dask_ml
import rasterio
from dask.distributed import Client, LocalCluster, progress
import datetime
import tempfile
import boto3
import geoviews as gv
from geoviews import opts
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score
import joblib
from dask.distributed import Client, progress
import warnings

gv.extension('matplotlib')

gv.output(size=150)

I found the most reliable way to install packages is to run `conda install -c pyviz geoviews-core dask-ml`

In [None]:
env = dict(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR', 
           AWS_NO_SIGN_REQUEST='YES',
           GDAL_MAX_RAW_BLOCK_CACHE_SIZE='200000000',
           GDAL_SWATH_SIZE='200000000',
           VSI_CURL_CACHE_SIZE='200000000')
os.environ.update(env)

In [None]:
def convert_full_date_to_continous_day(year, month, day):
    """
    Helper function if you wish to use month, day vs julian day
    """
    return datetime.datetime(year, month, day).timetuple().tm_yday

def get_geo_uri(year, day):
    """
    returns list of geo uris
    """
    fs = s3fs.S3FileSystem(anon=True)
    files = []
    
    filepath = "s3://noaa-goes17/ABI-L2-FDCC/%s/%s/*/*.nc" % (str(year).zfill(4), str(day).zfill(3)) 
    files = fs.glob(filepath)
    
    if len(files) < 1:
        raise Exception("No files found")
    
    return files

def download_to_xarray(uri):
    """
    Downloads file and directly loads it into xarray in memory
    """
    s3 = boto3.client("s3")
    
    with tempfile.NamedTemporaryFile() as temp_file:
        s3.download_file(Bucket=uri[:11], Key=uri[12:], Filename=temp_file.name)
        datastore = xr.open_dataset(temp_file.name)
        
    return datastore

def download_to_disk(uri):
    s3 = boto3.client("s3")
    filename = uri[12:].replace("/", "-")
    if not os.path.exists(filename):
        s3.download_file(Bucket=uri[:11], Key=uri[12:], Filename=filename)
        
    return filename

There are many gaps in the NOAA GOES17 data and it's quite large, so we'll be using a subset of the data for the examples to show the general idea quickly. Dates can be changed if one wants the entirety of the data.

In [None]:
#TUBBS = {"year": 2017, "day1":220, "day2":243}
#CAMP = {"year": 2018, "day1":312, "day2":329}
#WOOLSEY = {"year": 2018, "day1":312, "day2":325}

CAMP = {"year": 2018, "day1":317, "day2":318}
WOOLSEY = {"year": 2018, "day1":317, "day2":319}

Downloading a single goes file from s3 takes 1.7s so we first download to disk the portions of time we're interested in 

In [None]:
camp_uris = []
woolsey_uris = []

In [None]:
for i in range(CAMP["day2"] - CAMP["day1"]):
    day = i + CAMP["day1"]
    camp_uris += get_geo_uri(CAMP["year"], day)

In [None]:
for i in range(WOOLSEY["day2"] - WOOLSEY["day1"]):
    day = i + WOOLSEY["day1"]
    woolsey_uris += get_geo_uri(WOOLSEY["year"], day)

In [None]:
local_filepaths = []

In [None]:
for key in camp_uris:
    local_filepaths.append(download_to_disk(key))
for key in woolsey_uris:
    local_filepaths.append(download_to_disk(key))

In [None]:
def load_local_file_into_xarray(year, day, hour, localfilepaths):
    """
    Returns an xarray of a single hour of data
    """
    files = []
    for element in localfilepaths:
        split_file = element.split("-")
        if split_file[3] == str(year) and split_file[4] == str(day) and split_file[5] == str(hour).zfill(2):
            files.append(element)
    
    if len(files) < 1:
        raise Exception("File with that date is not found")
        
    return xr.open_mfdataset(files,combine='nested',concat_dim='time')

In [None]:
def visualize_xarray(data, vdims):
    kdims = ['t', 'x', 'y']
    xr_dataset = gv.Dataset(data, kdims=kdims, vdims=vdims)
    image = xr_dataset.to(gv.Image, ['x', 'y'])
    return image

Now we have the data quickly and easily accessible on disk, and any section of it can be visualized.
To visualize each fire, you can simply stack the functions like so:

In [None]:
# To see part of the Camp Fire:
# Warnings are ignored due to cartopy deprecation warning
warnings.filterwarnings('ignore')

visualize_xarray(load_local_file_into_xarray(2018, 318, 8, local_filepaths), ['Mask'])

To see multiple hours or days of the fire, you can concatenate them and feed it into the visualizer:

In [None]:
#To see several consecutive hours:
warnings.filterwarnings('ignore')

fires = []
for i in range(5, 7):
    fires.append(load_local_file_into_xarray(2018, 318, i, local_filepaths))

visualize_xarray(xr.concat(fires, dim="time"), ['Mask'])

Next, we'll train an ML model on this data. I am going to first simply do binary classification on the entire image, whether or not the date is during the fire.

If I were to do binary classification for every pixel individiually, it would at core look very similar. Using a model without locally correlated information, you can take the 2 dimensional image and convert it into a single vector, aligning it with the similarly transformed binary pixels.

If the data does have locally correlated information, you want to use models that utilize that information, the canonical example being a convolutional neural net. In this case, you could use a U-Net with a binary loss function on each pixel, so that it directly creates the segmentation of each patch of images.

The data I'm going to gather here is a toy example: it's much smaller than what will get actual results. The purpose is just to show the outline of how this would be done, and would be expanded and systemtized for a product.

In [None]:
non_fire_day1 = 200
non_fire_day2 = 202
non_fire_uris = []

for i in range(non_fire_day1, non_fire_day2):
    non_fire_uris += (get_geo_uri(2019, i))
    
for key in non_fire_uris:
    local_filepaths.append(download_to_disk(key))

In [None]:
def load_subset_of_data(year, day1, day2, y, data, labels):
    for i in range(day1, day2):
        for j in range(24):
            try:
                data.append(load_local_file_into_xarray(year, i, j, local_filepaths))
                labels.append(y)
            except:
                continue

In [None]:
train_data = []
train_labels = []
test_data = []
test_labels = []

load_subset_of_data(2019, non_fire_day1, non_fire_day1+1, 0, train_data, train_labels)
load_subset_of_data(2019, non_fire_day1+1, non_fire_day1+2, 0, test_data, test_labels)
load_subset_of_data(2018, 317, 318, 1, train_data, train_labels)
load_subset_of_data(2018, 318, 319, 1, test_data, test_labels)

For a toy example, let's just use the mean fire temperature array in chunks of 6

In [None]:
trainX, trainY = [], []
testX, testY = [], []

for data, label in zip(train_data, train_labels):
    if (data.mean_fire_temperature.values.shape[0] % 6) == 0:
        for i in range(0, data.mean_fire_temperature.values.shape[0], 6):
            trainX.append(data.mean_fire_temperature.values[i:i+6])
            trainY.append(label)
    
for data, label in zip(test_data, test_labels):
    if (data.mean_fire_temperature.values.shape[0] % 6) == 0:
        for i in range(0, data.mean_fire_temperature.values.shape[0], 6):
            testX.append(data.mean_fire_temperature.values[i:i+6])
            testY.append(label)
        
trainX = np.vstack(trainX)
testX = np.vstack(testX)
trainX = np.nan_to_num(trainX)
testX = np.nan_to_num(testX)

In [None]:
clf = RandomForestClassifier(n_estimators=2).fit(trainX, trainY)

Now we have our classifier, first let's make sure it fits the train set correctly

In [None]:
train_predictions = clf.predict(trainX)

In [None]:
accuracy_score(trainY, train_predictions)

And now the test set

In [None]:
test_predictions = clf.predict(testX)

In [None]:
accuracy_score(testY, test_predictions)

Looks like we simply overfit the data, and didn't learn anything from it. This makes sense, as the amount of data is extremely small, and we didn't make sure that we had sufficient signal in the source.

Dask can be used to do some distribution of training, let's use it for hyperparameter grid search


In [None]:
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='7GB')
client

In [None]:
param_grid = {"n_estimators":[4,6], "min_samples_split": [2,3]}

grid_search = GridSearchCV(RandomForestClassifier(),
                           param_grid=param_grid,
                           return_train_score=False,
                           iid=True,
                           cv=3,
                           n_jobs=-1)

In [None]:
with joblib.parallel_backend('dask'):
    grid_search.fit(trainX, trainY)

And then we can observe the results of the search

In [None]:
pd.DataFrame(grid_search.cv_results_).head()

## Next Steps
To get classification for each individual pixel, we'd need to acquire an array of boolean labels which matches the dimensions of the input for each location. This would look fundamentally to the above, but just feeding in 2d dimensional imaging data rather than agglomerated fire measurements.


There are a number of ways to serve that imaging data. One could make a front end which hosts an open source element that displays the 2d classification matrix for each given date on a slider. There are a huge number of available libraries that will do this as a front end element that connects to a backend database hosting the matricies. This is not a very data heavy problem, as the users wouldn't request data extremely quickly. If they were frequently scrubbing, some caching will make it faster. 

If the primary use was not visual, but instead focused on serving a computational purpose, it would be very simple to provide the matrix as a REST response to a query with the date and area. To lower the amount of data that needs to get passed back and forth, it would make sense for the user to specify what type of data they wanted, as some of the data as demonstrated is quite small. Some common pitfalls here would be serving too much data without caching or specifying intelligently. For example, if the data changed only rarely, it would make sense to update the frontend with only deltas between pixels as that would lower data load. Depending on use, image compression could be used. I don't think this will end up being much of a problem, as modern browsers, computers and internet connections are plenty powerful enough to handle it. From the machine learning perspective, it's likely that one would want to process and generate the data, and populate the database automatically, rather than doing it on request, as the data isn't large enough that you'd run into problems storing it. 

### General thoughts

This project had me do machine learning and data processing in a way I typically wouldn't. Usually, I avoid jupyter notebooks, as I find it difficult to do high performance processing with the way they track state and threads. If I were to do this assignment without the notebook and wanted to look at visualization, it may make sense to use a flask app as that way you can take advantage of webapp library support. The flask app would make python calls to a local server where the s3 files would be hosted. One could also simply use geoviews or matplotlib from the terminal, or you could use a python notebook but just for the visualization, all other code would occur in python scripts elsewhere. However, the nice thing about having the notebook format is that it allows someone else to walk through your code, something that isn't as easy with a large codebase. 

For model training and data loading, I would have done more parallel processing, taking advantage of multiple threads on the machine to gather data and do all the necessary preprocessing. There are of course a huge number of things that should be done in machine learning training that I didn't do here. Analyzing the data to ensure a good distribution, augmentation, normalization, etc. These are all things that should be done to ensure high performance on the task. It's also important that you have multiple unit tests for the model, with a set of typically out of distribution edge cases that can frequently trip up your model. In the deployment process, it's important to have these automatically run before deployment to ensure you are maintaining the quality that's necessary. Visualization is also important here: observing the output of your models can lead you to find patterns you would be unable to see otherwise, so having a good visuzliation engine is important, especially when trying to figure out how to improve performance in the early stages of model development. 

I think it's important to view code like the above as being a key product for your machine learning system, and not just the way to get to models running. When building your initial system, a robust, powerful development environment can enable fast prototyping for every other product down the line. It's extremely important that the code itself is modular and generalizable, not just for building new products, but also for building new code on top of it. Flexibility is key; a common junior engineer mistake is to hardcode too much, causing problems in the future when new needs arise. This is why I dislike jupyter notebooks for anything besides visualization: the cell format pushes people towards a bespoke coding schema for the purposes of that particular notebook. A way to avoid that is to create common codebases the notebooks import. 

One thing that's commonly overlooked is model check ins. It is important that weights and hyperparameters of models are stored for comparison during development. This is vital for making sure improvements are being made, as well as for observing hyperparamter impact on the qualitative aspects of model output. Metrics like accuracy or AUC can frequently obscure how the product is actually being used, and so the situation can occur where accuracy continues to rise, but customer satisfaction remains static or lowers. It's important that model weights and hyperparameters are saved in that situation, vs just the output metrics, so that one can go back and perform subjective analysis on their predictions.

From the data perspective, doing your computation colacted with your data is absolutely key. This will enable much faster processing. This can be done with on-prem storage, or ensuring you're using cloud resources such that storage is colocated. This same notion also applies to disk vs memory. Frequently deep learning is handicapped by data going from disk->ram->vram constantly, rather than ensuring you're getting the most data you possibly can. Programs like dask can help with this, and it's important to chunk correctly. 

Usually, it's a good idea to develop with one environment, and then send off a model, hyperparameters and data properties to a server that is optimized just for training. A third environment exists for just inference, as speed is absolutely key if there's to be a great deal of data processed. I think it's important that these environments are split because the desires for each ofthem are so different. It's because of this line of thinking that makes me believe doing data loading, model development, visualization, and inference in one notebook is not going to be the optimal plan. 

There are a number of key steps I always stick to for model development, described by Andrej Karpathy [here](https://karpathy.github.io/2019/04/25/recipe/). These are mainly focused on the ML model development rather than the surrounding software infrastructure. However, if the infrastructure is built with these steps in mind, it can make performing them very easy and increase rate of prototyping.

I found the NOAA GOES data to be quite interesting and potentially very valuable. It had an extremely wide range of signals captured, and it would be fun to explore how autocorrelated they all are, as well as correlations to signals captured from other sources. In addition, one might be able to use ML models trained on the GOES data to build simulations and discern general principles, which could allow for rough predictions on rare events where labeled data isn't available. 