## Analyzing NDWI through Time with Landsat 8

In this section, we'll look at how the
Normalized Difference Water Index (NDWI) changes throughout 2016
for Colfax County, New Mexico.

In [None]:
import geopyspark as gps
from pyspark import SparkContext
import numpy as np
from datetime import datetime
from shapely.geometry import mapping, shape
import pyproj
from shapely.ops import transform
from functools import partial
import urllib.request, json
from geonotebook.wrappers import TMSRasterData, GeoJsonData
from PIL import Image
import pandas as pd
import matplotlib.pyplot as plt

### Setup: State data and Spark initialization

The next two cells fetch the shapes for our state and county and start up the Spark context.

In [None]:
# Grab data for New Mexico
state_name, county_name = "NM", "Colfax"
def get_state_shapes(state, county):
    project = partial(
        pyproj.transform,
        pyproj.Proj(init='epsg:4326'),
        pyproj.Proj(init='epsg:3857'))

    state_url = "https://raw.githubusercontent.com/johan/world.geo.json/master/countries/USA/{}.geo.json".format(state)
    county_url = "https://raw.githubusercontent.com/johan/world.geo.json/master/countries/USA/{}/{}.geo.json".format(state,county)
    read_json = lambda url: json.loads(urllib.request.urlopen(url).read().decode("utf-8"))
    state_ll = shape(read_json(state_url)['features'][0]['geometry'])
    state_wm = transform(project, state_ll)
    county_ll = shape(read_json(county_url)['features'][0]['geometry'])
    county_wm = transform(project, county_ll)
    return (state_ll, state_wm, county_ll, county_wm)

(state_ll, state_wm, county_ll, county_wm) = get_state_shapes(state_name, county_name) 
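
To make the reprojection step less opaque, here is a minimal sketch of the spherical Web Mercator formula that EPSG:3857 is based on. The helper name `lonlat_to_web_mercator` is ours for illustration only; the notebook itself relies on `pyproj` for this.

```python
import math

EARTH_RADIUS_M = 6378137.0  # sphere radius used by EPSG:3857, in metres


def lonlat_to_web_mercator(lon_deg, lat_deg):
    """Project a longitude/latitude pair (degrees) to Web Mercator metres."""
    x = EARTH_RADIUS_M * math.radians(lon_deg)
    y = EARTH_RADIUS_M * math.log(
        math.tan(math.pi / 4 + math.radians(lat_deg) / 2))
    return x, y


# The origin stays at (approximately) the origin, and 180 degrees of
# longitude maps to the eastern edge of the Web Mercator extent.
origin = lonlat_to_web_mercator(0.0, 0.0)
east_edge = lonlat_to_web_mercator(180.0, 0.0)
```

This only covers single points; `shapely.ops.transform` applies the same kind of function to every vertex of a geometry.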

In [None]:
# Set up our spark context
conf = gps.geopyspark_conf(appName="Exercise 1") \
          .setMaster("local[*]") \
          .set(key='spark.ui.enabled', value='true') \
          .set(key="spark.driver.memory", value="8G") \
          .set("spark.hadoop.yarn.timeline-service.enabled", False)
sc = SparkContext(conf=conf)

### Setup: Band names and color ramp

The ingested layers have the RGB, near infrared, and QA bands of Landsat 8 data.
This dict maps each band name to its band index, for more readable code.

We also define a color ramp for viewing NDWI data.

In [None]:
bands = { "Blue": 0,
          "Green": 1,
          "Red": 2,
          "NIR": 3,
          "QA": 4 }

ndwi_color_map = \
     gps.ColorMap.build(breaks= {-0.1 : 0xaacdffaa,                                
                                 0.0 : 0x70abffff,
                                 0.05 : 0x3086ffff,
                                 0.1 : 0x1269e2ff,
                                 0.15 : 0x094aa5ff,
                                 0.2 : 0x012c69ff,
                                 0.25: 0x012cbcff},
                        classification_strategy=gps.ClassificationStrategy.LESS_THAN_OR_EQUAL_TO)
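
As a rough illustration of what a "less than or equal to" classification means, the sketch below maps a value to the color of the smallest break that is greater than or equal to it. It is plain Python and not GeoPySpark's implementation; the `classify` helper is hypothetical, and returning `None` above the largest break is our assumption (the library has its own fallback behavior there).

```python
from bisect import bisect_left

# Same breaks as ndwi_color_map, as (upper bound, RGBA color) pairs.
BREAKS = [(-0.1, 0xaacdffaa), (0.0, 0x70abffff), (0.05, 0x3086ffff),
          (0.1, 0x1269e2ff), (0.15, 0x094aa5ff), (0.2, 0x012c69ff),
          (0.25, 0x012cbcff)]


def classify(value, breaks=BREAKS):
    """Return the color of the smallest break >= value, or None if none fits."""
    bounds = [bound for bound, _ in breaks]
    idx = bisect_left(bounds, value)  # first index with bounds[idx] >= value
    return breaks[idx][1] if idx < len(breaks) else None


color_for_dry_land = classify(-0.5)
color_for_shallow_water = classify(0.03)
```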

## Viewing mosaiced imagery

In the following part, we'll see mosaiced imagery on the map for our county.

First let's see where our county is on the map:

In [None]:
M.add_layer(GeoJsonData(mapping(county_ll)), name="county")
p = county_ll.centroid
M.set_center(p.x, p.y, 9)

This cell queries the Landsat layer for our county region during the summer months (June 1 to September 1, 2016).

In [None]:
layer = gps.query("s3://datahub-catalogs-us-east-1", 
                  "landsat-8-continental-us-2016", 
                  layer_zoom=13,
                  time_intervals=[datetime(2016, 6, 1, 0, 0, 0),
                                  datetime(2016, 9, 1, 0, 0, 0)],
                  query_geom=county_wm,
                  num_partitions=500).cache()

## Cloud masking based on QA band

This block of code maps over the tiles of the imagery layer, checks whether the Landsat QA band flags each pixel as cloudy, and if so sets that pixel to 0 (the no_data_value) in every remaining band. Because the QA band itself is dropped, this turns our 5-band imagery into 4-band RGB+NIR tiles.

The cloud masking is based on the bit flag of the QA band, which is structured like this:

![qa_band_values](landsat-qa-band.jpg)

In [None]:
def mask_clouds(tile):
    # Use the Landsat QA band to mask out cloud values
    qa = tile.cells[bands["QA"]]
    cloud = np.right_shift(qa, 14)
    result_bands = []
    for band in tile.cells[:-1]:
        band[cloud == 3] = 0
        result_bands.append(band)
    return gps.Tile.from_numpy_array(np.array(result_bands), no_data_value=0)

cloud_masked = layer.to_numpy_rdd().mapValues(mask_clouds)
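
To see the bit arithmetic in isolation, here is a small NumPy sketch on made-up QA values. It assumes, as `mask_clouds` implies, that the two highest bits of the 16-bit QA value encode cloud confidence and that a value of 3 means a confident cloud detection; the sample numbers are hypothetical.

```python
import numpy as np

# Hypothetical 16-bit QA values, one per pixel.
qa = np.array([0b1100000000000000,   # top bits 11 -> confidence 3
               0b1000000000000000,   # top bits 10 -> confidence 2
               0b0100000000000000,   # top bits 01 -> confidence 1
               0b0000000000000001],  # top bits 00 -> confidence 0
              dtype=np.uint16)

# Shifting right by 14 discards the low 14 bits and keeps the top two.
cloud_confidence = np.right_shift(qa, 14)
is_cloud = cloud_confidence == 3

# Zero out the cloudy pixels of a (made-up) reflectance band.
band = np.array([7000, 8000, 9000, 10000], dtype=np.uint16)
masked = np.where(is_cloud, 0, band)
```

Only the first pixel is masked here; lower confidence values are left untouched, which matches the `cloud == 3` test in the cell above.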

## Mosaicing layers

The code below mosaics the stack of imagery over time into a set of spatial tiles, where at most one pixel covers any location. It chooses the "youngest" pixel, meaning the more recent scene wins. The mosaicing avoids keeping no-data values; because we did cloud masking, the clouds in newer images should be filled in with non-cloudy pixels from older images.

In [None]:
def mosaic(tiles):
    # Mosaic by taking the youngest pixel.
    sorted_tiles = sorted(list(tiles), key=lambda x: x[0], reverse=True)
    result = sorted_tiles[0][1].cells.copy()
    no_data_value = sorted_tiles[0][1].no_data_value
    
    DARK_PIXEL_CUTOFF = 6000

    for _, tile_to_merge in sorted_tiles[1:]:        
        cells_to_merge = tile_to_merge.cells
        left_merge_condition = result[0] < DARK_PIXEL_CUTOFF
        right_merge_condition = cells_to_merge[0] >= DARK_PIXEL_CUTOFF
        
        # Merge where every RGB band of the result is unset or very dark
        # and at least one RGB band of the incoming pixel is relatively
        # bright.
        
        for band_idx in range(1, result.shape[0] - 1):
            left_merge_condition = left_merge_condition & \
                                   (result[band_idx] < DARK_PIXEL_CUTOFF)
            right_merge_condition = right_merge_condition | \
                                    (cells_to_merge[band_idx] >= DARK_PIXEL_CUTOFF)
            
        result_bands = []
        for band_idx in range(0, result.shape[0]):
            band = result[band_idx]
            np.copyto(band, 
                      cells_to_merge[band_idx], 
                      where=(left_merge_condition) & \
                            (right_merge_condition))
            result_bands.append(band)
        result = np.array(result_bands)  


    return gps.Tile.from_numpy_array(result, no_data_value=no_data_value)

mosaiced = cloud_masked.map(lambda tup: \
                                (gps.SpatialKey(tup[0].col, tup[0].row), 
                                (tup[0].instant, tup[1]))) \
                       .groupByKey() \
                       .mapValues(mosaic)

mosaiced_layer = \
    gps.TiledRasterLayer.from_numpy_rdd(layer_type=gps.LayerType.SPATIAL, 
                                        numpy_rdd=mosaiced, 
                                        metadata=layer.layer_metadata, 
                                        zoom_level=layer.zoom_level)
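
The core "youngest pixel wins" idea can be shown on toy data. The sketch below is a simplified, single-band version that only fills no-data cells; it deliberately leaves out the dark-pixel cutoff used in `mosaic`, and the function name and sample arrays are ours for illustration.

```python
import numpy as np


def youngest_pixel_mosaic(timed_tiles, no_data_value=0):
    """Merge (timestamp, 2D array) pairs, newest first, filling only gaps."""
    ordered = sorted(timed_tiles, key=lambda pair: pair[0], reverse=True)
    result = ordered[0][1].copy()
    for _, older in ordered[1:]:
        gaps = result == no_data_value   # cells still missing data
        result[gaps] = older[gaps]       # fill them from the older tile
    return result


# Toy single-band tiles; 0 marks a cloud-masked pixel.
newest = np.array([[0, 5], [0, 7]])
middle = np.array([[3, 9], [0, 9]])
oldest = np.array([[1, 1], [2, 1]])

merged = youngest_pixel_mosaic([(1, oldest), (3, newest), (2, middle)])
```

Each output cell comes from the most recent tile that has data there, so the top-left gap is filled from `middle` and the bottom-left gap from `oldest`.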


## Viewing color corrected Landsat

Here we use GeoPySpark's ability to call Python code from the JVM:
for the display parameter we pass in a function that takes a Python Tile and returns
a PIL image. The tile server then returns this image as a PNG.

`render_image` color corrects the image using some values that seem to work OK with most Landsat scenes.
More advanced color correction (as well as more advanced mosaicing and cloud masking) could be used in place of these methods.

In [None]:
def render_image(tile):
    cells = tile.cells
    # Color correct - use magic numbers
    magic_min, magic_max = 4000, 15176
    norm_range = magic_max - magic_min
    cells = cells.astype('int32')
    # Clamp cells
    cells[(cells != 0) & (cells < magic_min)] = magic_min
    cells[(cells != 0) & (cells > magic_max)] = magic_max
    colored = ((cells - magic_min) * 255) / norm_range
    (r, g, b) = (colored[2], colored[1], colored[0])
    alpha = np.full(r.shape, 255)
    alpha[(cells[0] == tile.no_data_value) & \
          (cells[1] == tile.no_data_value) & \
          (cells[2] == tile.no_data_value)] = 0
    rgba = np.dstack([r,g,b, alpha]).astype('uint8')

    return Image.fromarray(rgba, mode='RGBA')

mosaic_pyramid = mosaiced_layer \
                    .mask(county_wm) \
                    .repartition(100) \
                    .pyramid(resample_method=gps.ResampleMethod.BILINEAR)
tms_server = gps.TMS.build(mosaic_pyramid, display=render_image)
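
The color correction in `render_image` is essentially a linear stretch: clamp to a fixed range, then rescale to 0-255. Here is a minimal sketch of that step on its own, reusing the same two magic numbers. Unlike `render_image`, it uses integer division and does not special-case no-data cells; `stretch_to_byte` is a hypothetical helper.

```python
import numpy as np


def stretch_to_byte(cells, low=4000, high=15176):
    """Clamp values to [low, high] and rescale them linearly to 0-255."""
    clamped = np.clip(cells.astype('int32'), low, high)
    return ((clamped - low) * 255 // (high - low)).astype('uint8')


# Values below, inside, and above the stretch range.
sample = np.array([1000, 4000, 9588, 15176, 30000])
stretched = stretch_to_byte(sample)
```

Anything at or below `low` becomes 0 and anything at or above `high` becomes 255, which is why very dark and very bright pixels lose detail with this approach.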


In [None]:
p = county_ll.centroid
M.set_center(p.x, p.y, 9)

for l in M.layers:
    M.remove_layer(l)
M.add_layer(TMSRasterData(tms_server), name="mosaic")

## Computing NDWI over time

Here we compute the [Normalized Difference Water Index, or NDWI](https://en.wikipedia.org/wiki/Normalized_difference_water_index) of the imagery. We are using the version for detecting bodies of water, defined by:

![ndwi equation](files/ndwi.png)

In [None]:
g = layer.bands(bands["Green"]).convert_data_type(gps.CellType.FLOAT64).cache()
nir = layer.bands(bands["NIR"]).convert_data_type(gps.CellType.FLOAT64).cache()

ndwi = (g - nir) / (g + nir)
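
For intuition, the same per-pixel arithmetic can be tried on plain NumPy arrays. This is only a sketch with made-up reflectance values; returning `NaN` where both bands are zero is our choice here, not something the layer arithmetic above is documented to do.

```python
import numpy as np


def ndwi_from_bands(green, nir):
    """Compute (green - nir) / (green + nir); NaN where the sum is zero."""
    green = green.astype('float64')
    nir = nir.astype('float64')
    total = green + nir
    out = np.full(total.shape, np.nan)
    # Only divide where the denominator is non-zero.
    np.divide(green - nir, total, out=out, where=total != 0)
    return out


green = np.array([9000, 6000, 0])
nir = np.array([3000, 6000, 0])
values = ndwi_from_bands(green, nir)
```

Water reflects more green than near infrared light, so water pixels tend toward positive values, while vegetation and dry land tend toward negative ones.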

Now we can take the `mean_series` of the data to calculate the average NDWI
of our county for each time at which imagery is available.

In [None]:
mean_series = ndwi.mean_series(county_wm)
ndwi_over_time = { k: v for (k, v) in mean_series }
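
Conceptually, this kind of series is one masked mean per timestamp. The NumPy sketch below shows the idea with a boolean mask standing in for the county polygon. All names and values are hypothetical, and it ignores details such as partially covered pixels.

```python
from datetime import datetime

import numpy as np


def masked_mean_series(tiles_by_time, inside_mask):
    """Average the cells inside the mask for each timestamp, ignoring NaN."""
    return {time: float(np.nanmean(cells[inside_mask]))
            for time, cells in tiles_by_time.items()}


# Two made-up NDWI tiles; the mask marks cells inside the region of interest.
tiles = {
    datetime(2016, 6, 10): np.array([[0.2, 9.0], [0.4, np.nan]]),
    datetime(2016, 7, 12): np.array([[-0.1, 5.0], [-0.3, -0.2]]),
}
mask = np.array([[True, False], [True, True]])

series = masked_mean_series(tiles, mask)
```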

To visualize this data, we can convert it to a pandas dataframe, remove `nan` values,
and plot it over time.

In [None]:
df = pd.DataFrame.from_dict(ndwi_over_time,  orient='index')
df = df.dropna(axis=0)
df

In [None]:
df.plot()
plt.show()

We can compute the time that has the maximum NDWI with pandas:

In [None]:
# idxmax returns the index label (the timestamp) of the largest value
max_time = df[0].idxmax().to_pydatetime()
max_time
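
The same lookup can be written without pandas, which makes it clear what is being computed: drop the `nan` entries, then pick the key with the largest value. The sample series below is made up for illustration.

```python
import math
from datetime import datetime

# Hypothetical mean NDWI values keyed by acquisition time.
sample_series = {
    datetime(2016, 6, 10): -0.21,
    datetime(2016, 7, 12): float('nan'),
    datetime(2016, 8, 13): -0.15,
}

# NaN compares False against everything, so filter it out before max().
valid = {t: v for t, v in sample_series.items() if not math.isnan(v)}
peak_time = max(valid, key=valid.get)
```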

## Visualizing the scene with the maximum NDWI

Here we filter our spatiotemporal layer to a spatial layer, using the date we computed above. We then paint it on the map using the color ramp defined above.
Ideally, we'll see any bodies of water marked in clear blue.

In [None]:
spatial_layer = ndwi.to_spatial_layer(target_time=max_time)

In [None]:
pyramid = spatial_layer \
            .mask(county_wm) \
            .repartition(100) \
            .pyramid(resample_method=gps.ResampleMethod.BILINEAR)

In [None]:
tms_server = gps.TMS.build(pyramid, display=ndwi_color_map)

for l in M.layers:
    M.remove_layer(l)
M.add_layer(TMSRasterData(tms_server), name="ndwi")