# Image processing with satellite data

## Medium-scale computations on a virtual machine in GCP

This notebook performs calculations with a GeoTIFF dataset using XArray and Dask. We load and rescale Landsat 8 images and compute the normalized difference vegetation index (NDVI), which distinguishes green vegetation from areas of bare land or water.

We'll use 30 images of the Denver, USA area taken from May-September 2018.

![RGB image](https://landsat-pds.s3.amazonaws.com/c1/L8/033/033/LC08_L1TP_033033_20180706_20180717_01_T1/LC08_L1TP_033033_20180706_20180717_01_T1_thumb_small.jpg)

## Step 1: Import packages

In [1]:
import dask
import json
import os
import rasterio
import requests
import rioxarray

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

## Step 2: Define input data/images

We are using 30 images from the [Landsat dataset on GCP](https://cloud.google.com/storage/docs/public-datasets/landsat) and each band is available as a separate GeoTIFF file.

In [2]:
images = [
"LC08_L1TP_033033_20130318_20170310_01_T1", "LC08_L1TP_033033_20130419_20170310_01_T1", "LC08_L1TP_033033_20130505_20170310_01_T1",
"LC08_L1TP_033033_20130521_20170310_01_T1", "LC08_L1TP_033033_20130606_20170310_01_T1", "LC08_L1TP_033033_20130622_20170309_01_T1",
"LC08_L1TP_033033_20130708_20170309_01_T1", "LC08_L1TP_033033_20130724_20170309_01_T1", "LC08_L1TP_033033_20130809_20170309_01_T1",
"LC08_L1TP_033033_20130825_20170309_01_T1", "LC08_L1TP_033033_20130910_20170309_01_T1", "LC08_L1TP_033033_20130926_20170308_01_T1",
"LC08_L1TP_033033_20131012_20170308_01_T1", "LC08_L1TP_033033_20131028_20170308_01_T1", "LC08_L1TP_033033_20131113_20170307_01_T1",
"LC08_L1TP_033033_20131129_20170307_01_T1", "LC08_L1TP_033033_20131215_20170307_01_T1", "LC08_L1TP_033033_20131231_20170307_01_T1",
"LC08_L1TP_033033_20140116_20170308_01_T1", "LC08_L1TP_033033_20140201_20170307_01_T1", "LC08_L1TP_033033_20140217_20170307_01_T1",
"LC08_L1TP_033033_20140305_20170307_01_T1", "LC08_L1TP_033033_20140321_20170307_01_T1", "LC08_L1TP_033033_20140406_20170307_01_T1",
"LC08_L1TP_033033_20140422_20170306_01_T1", "LC08_L1TP_033033_20140508_20170307_01_T1", "LC08_L1TP_033033_20140524_20170307_01_T1",
"LC08_L1TP_033033_20140609_20170305_01_T1", "LC08_L1TP_033033_20140625_20170304_01_T1", "LC08_L1TP_033033_20140711_20170304_01_T1",
]

urls = []
for i in images:
    urls.append(["https://storage.googleapis.com/gcp-public-data-landsat/LC08/01/033/033/{}/{}_B5.TIF".format(i, i),  # nir
                 "https://storage.googleapis.com/gcp-public-data-landsat/LC08/01/033/033/{}/{}_B4.TIF".format(i, i),  # red
                 "https://storage.googleapis.com/gcp-public-data-landsat/LC08/01/033/033/{}/{}_MTL.txt".format(i, i)])  # mtl

## Step 3: Create XArray datasets

In [3]:
import xarray as xr

red = []
nir = []
for i in urls:
    red.append(rioxarray.open_rasterio(i[1], chunks={'band': 1, 'x': 1024, 'y': 1024}))
    nir.append(rioxarray.open_rasterio(i[0], chunks={'band': 1, 'x': 1024, 'y': 1024}))

## Step 4: Create a Dask cluster on the remote virtual machine

In [4]:
import dask
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=12)
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 12
Total threads: 36,Total memory: 118.04 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43443,Workers: 12
Dashboard: http://127.0.0.1:8787/status,Total threads: 36
Started: Just now,Total memory: 118.04 GiB

0,1
Comm: tcp://127.0.0.1:37957,Total threads: 3
Dashboard: http://127.0.0.1:40883/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:38605,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-2lsz4x_t,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-2lsz4x_t

0,1
Comm: tcp://127.0.0.1:41157,Total threads: 3
Dashboard: http://127.0.0.1:44567/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:35227,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-lrquq2as,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-lrquq2as

0,1
Comm: tcp://127.0.0.1:44107,Total threads: 3
Dashboard: http://127.0.0.1:32853/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:46593,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-sa6f0ssr,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-sa6f0ssr

0,1
Comm: tcp://127.0.0.1:40607,Total threads: 3
Dashboard: http://127.0.0.1:46495/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:33409,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-zk5fujrj,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-zk5fujrj

0,1
Comm: tcp://127.0.0.1:42135,Total threads: 3
Dashboard: http://127.0.0.1:32797/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:33817,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-j5lwlam6,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-j5lwlam6

0,1
Comm: tcp://127.0.0.1:33371,Total threads: 3
Dashboard: http://127.0.0.1:39471/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:39237,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-afrz1g5z,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-afrz1g5z

0,1
Comm: tcp://127.0.0.1:45849,Total threads: 3
Dashboard: http://127.0.0.1:36985/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:34505,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-i_ujddey,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-i_ujddey

0,1
Comm: tcp://127.0.0.1:35585,Total threads: 3
Dashboard: http://127.0.0.1:45205/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:38797,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-7ez43qtc,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-7ez43qtc

0,1
Comm: tcp://127.0.0.1:44201,Total threads: 3
Dashboard: http://127.0.0.1:34719/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:45717,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-lyns63t0,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-lyns63t0

0,1
Comm: tcp://127.0.0.1:45731,Total threads: 3
Dashboard: http://127.0.0.1:45521/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:45883,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-ncdgxdah,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-ncdgxdah

0,1
Comm: tcp://127.0.0.1:45529,Total threads: 3
Dashboard: http://127.0.0.1:45709/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:33715,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-_j63f_sl,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-_j63f_sl

0,1
Comm: tcp://127.0.0.1:40391,Total threads: 3
Dashboard: http://127.0.0.1:39863/status,Memory: 9.84 GiB
Nanny: tcp://127.0.0.1:35503,
Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-pk7tvx8p,Local directory: /home/jupyter/scaling-python-on-gcp/2-medium-scale/dask-worker-space/worker-pk7tvx8p


## Step 5: Rescale bands using Landsat metadata and Dask

The Landsat Level 1 images are delivered in a quantized format. This has to be [converted to top-of-atmosphere reflectance](https://landsat.usgs.gov/using-usgs-landsat-8-product) using the provided metadata. First we define convenience functions to load the rescaling factors and transform a dataset. The red band is band 4 and near infrared is band 5.

In [5]:
def load_scale_factors(filename, band_number):
    metadata = {}
    response = requests.get(filename)
    data = response.text.splitlines()
    for line in data:
        name, var = line.partition("=")[::2]
        metadata[name.strip()] = var
    
    M_p = float(metadata["REFLECTANCE_MULT_BAND_{}".format(band_number)])
    A_p = float(metadata["REFLECTANCE_ADD_BAND_{}".format(band_number)])
    
    return M_p, A_p

def calculate_reflectance(ds, band_number, metafile):
    M_p, A_p = load_scale_factors(metafile, band_number)
    toa = M_p * ds + A_p
    return toa

red_toa = []
nir_toa = []
for i, j, k in zip(red, nir, urls):
    red_toa.append(calculate_reflectance(i, band_number=4, metafile=k[2]))
    nir_toa.append(calculate_reflectance(j, band_number=5, metafile=k[2]))

Because the transformation is composed of arithmetic operations, execution is delayed and the operations are parallelized automatically.

In [6]:
print(red_toa[0].variable.data)

dask.array<add, shape=(1, 7451, 7501), dtype=float64, chunksize=(1, 1024, 1024), chunktype=numpy.ndarray>


The resulting image has floating point data with magnitudes appropriate to reflectance. This can be checked by computing the range of values in an image using Dask:

In [7]:
red_max, red_min, red_mean = dask.compute(
    red_toa[0].max(dim=['x', 'y']), 
    red_toa[0].min(dim=['x', 'y']),
    red_toa[0].mean(dim=['x', 'y'])
)
print(red_max.item())
print(red_min.item())
print(red_mean.item())

1.2107
-0.1
0.11357749183855971


## Step 6: Calculate normalized difference vegetation index (NDVI) using Dask

Now that we have the image as reflectance values, we are ready to compute the NDVI using Dask.

$$
\text{NDVI} = \frac{\text{NIR} - \text{Red}}{\text{NIR} + \text{Red}}
$$

This highlights areas of healthy vegetation with high NDVI values, which appear as green in the image below.

In [8]:
def compute_ndvi(nir_toa_single, red_toa_single, images):
    # Calculate NDVI
    ndvi = (nir_toa_single - red_toa_single) / (nir_toa_single + red_toa_single)
    ndvi2d = ndvi.squeeze()

    # Generate and save figure
    fig = plt.figure(figsize=[12,12])
    im = ndvi2d.plot.imshow(cmap='BrBG', vmin=-0.5, vmax=1)
    plt.axis('equal')
    fig.savefig("output/" + images + ".png")
    plt.close(fig)
    return 1

In [9]:
results = []
for i, j, k in zip(nir_toa, red_toa, images):
    y = dask.delayed(compute_ndvi)(i, j, k)
    results.append(y)

In [10]:
from dask.distributed import progress

res = client.compute(results)
progress(res)

VBox()

In [11]:
# Close Dask client
# client.close()

In [12]:
# Close and terminate Dask cluster
# cluster.close()