# Multispectral Processing Experiment

## Data acquisition

There is more than one way to deal with multispectral data. You can process each spectrum individually, arrive at a conclusion for each, and combine the conclusions in a post-processing step. Alternatively, you can create an algorithm that has access to every spectrum and processes them all together. This experiment compares the two approaches.
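
As a rough illustration of the two approaches (and not the models used in this experiment), the sketch below uses NumPy on a fake tile of random values. `score_band` and `score_stack` are hypothetical stand-ins for real classifiers. The first approach is sometimes called "late fusion" and the second "early fusion"; those names are an assumption of this sketch, not terminology from this chapter.

```python
import numpy as np

PIXELS = 50
BANDS = ['R', 'G', 'B', 'elevation', 'nightlights']

# Fake tile: one PIXELS x PIXELS array per band (random values, illustration only).
rng = np.random.RandomState(0)
tile = {band: rng.rand(PIXELS, PIXELS) for band in BANDS}


def score_band(band_image):
    """Hypothetical per-band model: returns a single score for one band."""
    return float(band_image.mean())


def score_stack(stacked):
    """Hypothetical joint model: sees every band at once."""
    return float(stacked.mean())


# Approach 1: process each band separately, then combine the conclusions.
per_band_scores = [score_band(tile[band]) for band in BANDS]
late_fusion_score = float(np.mean(per_band_scores))

# Approach 2: stack the bands into one (PIXELS, PIXELS, n_bands) array
# and hand the whole thing to a single model.
stacked = np.stack([tile[band] for band in BANDS], axis=-1)
early_fusion_score = score_stack(stacked)
```

With these trivial stand-in models both approaches produce the same number; a real model that sees all bands together can learn relationships between bands that the per-band approach cannot.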

This program downloads imagery from Google Earth Engine and saves it in a binary protobuf (TFRecord) file, laid out so that each spectrum can be read individually later. This way, we can use the same data for each experiment.

In [1]:
%matplotlib inline
%load_ext autoreload 
%autoreload 2

# Math Stuff
import scipy.misc, random, os
import numpy as np

# Google Earth Engine
import ee
from gee_library import *
ee.Initialize()

# debug stuff
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

# Threads
import time, Queue
from tqdm import trange
from threading import Thread

# Tensorflow
import tensorflow as tf

# The python protobuf libraries can be implemented in C++ or Python. They
# both work the same, although the Python implementation can be a bit slow. This line
# lets you know which implementation is being used (the Docker image
# should be using the 'cpp' implementation.)
from google.protobuf.internal import api_implementation
print "Protobuf implementation:", api_implementation._default_implementation_type


Protobuf implementation: python


## Global Variables

In [2]:

DATA_DIR="data/experiment"
TEST_PROTO_FILENAME = os.path.join(DATA_DIR,"multi_spectrum_test.tfrecords")
TEST_EXAMPLES_PER_CLASS=300

TRAINING_PROTO_FILENAME = os.path.join(DATA_DIR,"multi_spectrum_train.tfrecords")
TRAINING_EXAMPLES_PER_CLASS=1000

METERS = 600 # Each image patch will cover a 600x600 meter section of earth
PIXELS = 50 # each image patch will measure 50x50 pixels

# Create the data directory if necessary
if not os.path.exists(DATA_DIR):
    os.makedirs(DATA_DIR)

#
# Just like before, we will define geographic areas to select imagery from.
#

# Cities
brooklyn= ((-73.965471, 40.614974), (-73.920207, 40.693991))
longisland= ((-73.918610, 40.713007), (-73.840551, 40.775980))
queens= ((-73.821792, 40.749724), (-73.760813, 40.780303))
sf1= ((-122.453085, 37.719024), (-122.394265, 37.789567))
sanjose= ((-122.033408, 37.243222), (-121.832452, 37.414198))
sandiego= ((-117.144966, 32.743224), (-117.098079, 32.761772))
sandiego2= ((-117.098079, 32.690284), (-117.021168, 32.743224))
denver= ((-105.127158, 39.569603), (-104.890206, 39.825191))
neworleans= ((-90.229627, 29.967342), (-90.034561, 30.016651))
baltimore= ((-76.651899, 39.287861), (-76.609940, 39.311176))
# Farmland
kentucky= ((-84.479444, 38.110622), (-84.335569, 38.258371))
kansas= ((-97.533941, 38.105647), (-96.815051, 38.366043))
montana= ((-108.994821, 45.875502), (-108.770538, 46.105918))
california= ((-121.789047, 38.223409), (-121.575699, 38.473852))
virginia= ((-76.838359, 36.483186), (-76.609497, 36.684365))
# Mountains
cascades = ((-121.575448, 48.224966), (-120.395554, 48.955637))
sierranevadas = ((-120.479266, 38.206113), (-120.198767, 39.346931))
yellowstone = ((-110.042831, 43.716602), (-109.379713, 44.437358))
rockies = ((-106.790375, 38.610576), (-106.352140, 39.315902))
rockies2 = ((-107.872873, 37.627164), (-106.433726, 38.047526))
yosemite = ((-119.956063, 37.535445), (-119.282902, 38.176927))


training_cities = [brooklyn, longisland, queens, sf1, sanjose, sandiego, sandiego2]
test_cities = [baltimore, neworleans]

training_mountains = [cascades, sierranevadas, yellowstone, rockies]
test_mountains=[yosemite, rockies2]

training_farms=[montana, kansas, kentucky]
test_farms=[virginia, california]

## Multi-threaded download pipeline

Since our multispectral data will have more than three channels, traditional image formats are not the best choice for storing it. Previous chapters included multi-threaded imagery downloaders, but they saved their data to the file system as flat files, a thread-safe operation. This time we will save our data directly to the protobuf file. Since `tf.python_io.TFRecordWriter` is not explicitly documented as thread-safe, we will run it in its own thread. This means that the "downloader" threads have to communicate with the "writer" thread. Luckily, Python includes thread-safe queues that we can use for message passing: the "downloader" threads add their data to `img_queue`, and the "writer" thread consumes the data and writes it to disk.
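
The pattern of many producers feeding one consumer can be sketched with the standard library alone. This is a simplified stand-in, not the notebook's pipeline: `producer`, `writer`, `item_queue`, and `written` are hypothetical names, the "download" is a short sleep, and the "file" is a plain list. The import shim lets the sketch run on either Python 2 (`Queue`) or Python 3 (`queue`).

```python
import threading
import time

try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3

item_queue = queue.Queue()
written = []  # stands in for the file on disk; only the writer thread touches it


def producer(worker_id):
    """Stand-in for a downloader thread: produce one item and enqueue it."""
    time.sleep(0.01)  # pretend to download something
    item_queue.put({'worker': worker_id})


def writer(timeout_seconds):
    """Single consumer: drain the queue until it stays empty for timeout_seconds."""
    try:
        while True:
            item = item_queue.get(block=True, timeout=timeout_seconds)
            written.append(item)
    except queue.Empty:
        pass  # nothing arrived within the timeout, so assume the producers are done


threads = [threading.Thread(target=writer, args=(0.5,))]
threads += [threading.Thread(target=producer, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Using a timeout to decide when the writer stops is simple, but the timeout must be longer than the longest gap between items, or the writer will exit early.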

There are a few approaches to selecting and downloading different bands. We could have made the functions more general and "smart," automatically resizing the protobuf features to fit different numbers of bands. Instead, I chose to hard-code the bands and feature sizes, which is easier to read. While this takes more work to change, the explicit dimension definitions will throw errors on a mismatch instead of silently "fixing" the network and producing bugs that are difficult to find.
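
To see why explicit dimensions fail loudly, consider restoring a flattened band to its 2-D shape. The sketch below is an illustration only; `restore_band` is a hypothetical helper that does not appear in this notebook. NumPy's `reshape` raises a `ValueError` when the element count does not match the hard-coded size.

```python
import numpy as np

PIXELS = 50  # hard-coded tile size, matching the value used in this notebook


def restore_band(flat_values, pixels=PIXELS):
    """Reshape a flat list of pixel values back into a pixels x pixels array."""
    # reshape raises ValueError if the element count is not pixels * pixels.
    return np.array(flat_values).reshape((pixels, pixels))


# A correctly sized band reshapes without complaint.
good = restore_band(list(range(PIXELS * PIXELS)))

# A band saved at the wrong size (49x49 here) is rejected immediately.
try:
    restore_band(list(range(49 * 49)))
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```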

In [3]:
# Each element of img_queue will be a dictionary in the form {'band': numpy_array}
img_queue = Queue.Queue()


def get_RGB_tiles(resolution, tile_bounds):
    geCollection=ee.ImageCollection('USDA/NAIP/DOQQ')
    bands = ['R', 'G', 'B']
    img_dict = img_at_region(geCollection, resolution, bands, tile_bounds)
    return img_dict['R'], img_dict['G'], img_dict['B']

def get_elevation_tiles(resolution, tile_bounds):
    elevation_image = ee.Image('USGS/NED')
    geCollection = ee.ImageCollection(elevation_image)
    bands=['elevation']
    img_dict = img_at_region(geCollection, resolution, bands, tile_bounds)
    return img_dict['elevation']

def get_nightlight_tiles(resolution, tile_bounds):
    geCollection=ee.ImageCollection('NOAA/DMSP-OLS/NIGHTTIME_LIGHTS')
    bands=['stable_lights']
    img_dict = img_at_region(geCollection, resolution, bands, tile_bounds)
#     plt.imshow(np.array(img_dict['stable_lights']), cmap='gray', vmin=0, vmax=1); plt.show()
    
    return img_dict['stable_lights']





#
# Download worker
#
def imagery_getter(label, meters, pixels, coords):
    """
    Downloader thread. Downloads imagery, decodes into numpy data, and adds the matrix data to img_queue.
    
    label: Integer class label stored with the example (0 = farm, 1 = city, 2 = mountain)
    meters: Each image will depict an area measuring meters x meters
    pixels: Each image will measure pixels x pixels
    coords: Two gps points describing a rectangle in the form:
    ((longitude_min, latitude_min),(longitude_max, latitude_max))
    """
    try:
        # Get random location in box
        ((longmin, latmin),(longmax, latmax)) = coords

        # get random coords
        longitude = random.uniform(longmin, longmax)
        latitude = random.uniform(latmin, latmax)

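        # Calculate resolution in meters per pixel. Convert to float first so that
        # Python 2 integer division does not truncate the result.
        resolution = float(meters) / pixels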

        # Build the GPS box Geometry object
        tile_bounds = square_centered_at(
            point = (longitude, latitude),
            half_distance = meters / 2
        )

        # request imagery
        R, G, B = get_RGB_tiles(resolution, tile_bounds)
        elevation = get_elevation_tiles(resolution, tile_bounds)
        nightlights = get_nightlight_tiles(resolution, tile_bounds)

        # Add tiles to queue for writing
        imagery_item = {'label': label,
                        'R': scipy.misc.imresize(R, (pixels, pixels)),
                        'G': scipy.misc.imresize(G, (pixels, pixels)),
                        'B': scipy.misc.imresize(B, (pixels, pixels)),
                        'elevation': scipy.misc.imresize(elevation, (pixels, pixels)),
                        'nightlights': scipy.misc.imresize(nightlights, (pixels, pixels))}
        img_queue.put(imagery_item)
        
    # Error Handling
    except ServerError as e:
        print e, coords
    except Exception as e:
        print e, coords
    return
    

#
# Protobuf creator worker
#
def proto_writer_worker(PROTO_FILENAME):
    """
    Writer thread. This function consumes data from img_queue and writes it to disk in the
    binary TFRecord format.
    
    PROTO_FILENAME: Filename of the binary TFRecord file.
    """
    
    print "Started protobuf writer."

    # Open a protobuffer writer
    proto_writer = tf.python_io.TFRecordWriter(PROTO_FILENAME)


    # Consume things from the queue.
    try: 
        while True:
            
            # Consume one item from the queue. Raise exception if empty for 60 seconds
            example = img_queue.get(block=True, timeout=60)
            
            R = example['R'].flatten()
            G = example['G'].flatten()
            B = example['B'].flatten()
            elevation = example['elevation'].flatten()
            nightlights = example['nightlights'].flatten()
            label=example['label']

            proto_example = tf.train.Example(
                features=tf.train.Features( # a map of string to Feature proto objects
                    feature={
                        # A Feature contains one of either a int64_list,
                        # float_list, or bytes_list
                        'label':       tf.train.Feature(int64_list=tf.train.Int64List(value=[int(label)])),
                        'R':           tf.train.Feature(int64_list=tf.train.Int64List(value=R.astype("int64"))),
                        'G':           tf.train.Feature(int64_list=tf.train.Int64List(value=G.astype("int64"))),
                        'B':           tf.train.Feature(int64_list=tf.train.Int64List(value=B.astype("int64"))),
                        'elevation':   tf.train.Feature(int64_list=tf.train.Int64List(value=elevation.astype("int64"))),
                        'nightlights': tf.train.Feature(int64_list=tf.train.Int64List(value=nightlights.astype("int64"))),
                    }
                )
            )

            # use the proto object to serialize the example to a string
            serialized = proto_example.SerializeToString()
            # write the serialized object to disk
            proto_writer.write(serialized)
            
    except Queue.Empty:
        print "Queue empty. Protobuf worker stopped."
    finally:
        # Close the writer so that any buffered records are flushed to disk
        proto_writer.close()

In [4]:
#
# Download TRAINING data
#

# When spawning threads, the main program continues to run while the threads run in the background.
# If we ever want to wait for all threads to complete before continuing with the main program,
# we have to keep track of every thread we spawn and call .join() on each of them. This
# list is where we will keep track of them.
threads = []

#
# Start writer worker
#
t = Thread(target=proto_writer_worker, args=[TRAINING_PROTO_FILENAME])
t.start()
threads.append(t)


#
# Start downloader workers
#

# Farms
label = 0
coord_list = training_farms

for i in trange(TRAINING_EXAMPLES_PER_CLASS):
    t = Thread(target=imagery_getter,
               args=(label, METERS, PIXELS, random.choice(coord_list)))
    t.start()
    threads.append(t)
    time.sleep(4)
    
# Cities
label = 1
coord_list = training_cities

for i in trange(TRAINING_EXAMPLES_PER_CLASS):
    t = Thread(target=imagery_getter,
               args=(label, METERS, PIXELS, random.choice(coord_list)))
    t.start()
    threads.append(t)
    time.sleep(4)
               
# Mountains
label = 2
coord_list = training_mountains

for i in trange(TRAINING_EXAMPLES_PER_CLASS):
    t = Thread(target=imagery_getter,
               args=(label, METERS, PIXELS, random.choice(coord_list)))
    t.start()
    threads.append(t)
    time.sleep(4)

# wait for all threads to finish
for t in threads:
    t.join()

  0%|          | 0/1000 [00:00<?, ?it/s]

Started protobuf writer.


  1%|          | 10/1000 [00:40<1:06:09,  4.01s/it]

KeyboardInterrupt: 

Queue empty. Protobuf worker stopped.


In [None]:
#
# Download TEST data
#

threads = []

#
# Start writer worker
#
t = Thread(target=proto_writer_worker, args=[TEST_PROTO_FILENAME])
t.start()
threads.append(t)


#
# Start downloader workers
#

# Farms
label = 0
coord_list = test_farms

for i in trange(TEST_EXAMPLES_PER_CLASS):
    t = Thread(target=imagery_getter,
               args=(label, METERS, PIXELS, random.choice(coord_list)))
    t.start()
    threads.append(t)
    time.sleep(4)
    
# Cities
label = 1
coord_list = test_cities

for i in trange(TEST_EXAMPLES_PER_CLASS):
    t = Thread(target=imagery_getter,
               args=(label, METERS, PIXELS, random.choice(coord_list)))
    t.start()
    threads.append(t)
    time.sleep(4)
               
# Mountains
label = 2
coord_list = test_mountains

for i in trange(TEST_EXAMPLES_PER_CLASS):
    t = Thread(target=imagery_getter,
               args=(label, METERS, PIXELS, random.choice(coord_list)))
    t.start()
    threads.append(t)
    time.sleep(4)

# wait for all threads to finish
for t in threads:
    t.join()

In [None]:
#
# Just to get a warm fuzzy that the data was recorded correctly, let's look inside
# the protobuf file and plot some images.
#

i = 0
# Inspect the training file; swap in TEST_PROTO_FILENAME to check the test set
for serialized_example in tf.python_io.tf_record_iterator(TRAINING_PROTO_FILENAME):
    i = i + 1
    if i > 1:
        break
    example = tf.train.Example()
    example.ParseFromString(serialized_example)

    # traverse the Example format to get data
    R = example.features.feature['R'].int64_list.value
    G = example.features.feature['G'].int64_list.value
    B = example.features.feature['B'].int64_list.value
    elevation = example.features.feature['elevation'].int64_list.value
    nightlights = example.features.feature['nightlights'].int64_list.value
    label = example.features.feature['label'].int64_list.value[0]
    print label

    # Plot each band as a PIXELS x PIXELS image
    for band in (R, G, B, elevation, nightlights):
        plt.imshow(np.array(band).astype("float32").reshape((PIXELS, PIXELS)))
        plt.show()