# Seafloor Bacterial Floc Analysis

This notebook shows an example of doing an analysis of water column "floc" using Pangeo. The goal of this work is to understand changes in the concentration of floc, which is bacterial material that has been flushed from the hydrothermal system into the ocean. Changes in floc are a potential indicator of changes in the hydrothermal system, possibly resulting from a magmatic event or seismic swarm.

In this notebook we analyze a large number of OOI HD video camera frames to establish a proxy for the floc concentration, and then display the results using a two-dimensional multivariate histogram.

#### Open CamHD Database

In [None]:
import pandas as pd

In [None]:
dbcamhd = pd.read_json('dbcamhd.json', orient="records", lines=True)
dbcamhd.tail()

In [None]:
print("Total files: %i" % len(dbcamhd))
print("Total frames: %i" % dbcamhd.frame_count.sum())

In [None]:
blob_urls = list(dbcamhd.blob_url[(dbcamhd.deployment == 2) & (dbcamhd.frame_count > 5000) & (dbcamhd.frame_count < 30000)])
#blob_urls = list(dbcamhd.blob_url[(dbcamhd.frame_count > 5000) & (dbcamhd.frame_count < 30000)])
blob_urls.sort()
blob_urls[0]

In [None]:
len(blob_urls)

#### Define the frame numbers from each file to process

In [None]:
frame_numbers = [3841, 3933, 4052, 4171, 4263, 4382]

These frame numbers correspond to times when the camera system is looking over the "shoulder" of Mushroom vent.

#### Set up a delayed Dask array of images

In [None]:
import pycamhd as camhd
import numpy as np
from dask import delayed
import dask.array as dsa

In [None]:
delayed_frame_list = []
for blob_url in blob_urls:
    delayed_moov_atom = delayed(camhd.get_moov_atom)(blob_url)         
    for frame_number in frame_numbers:
        delayed_frame = delayed(camhd.get_frame)(blob_url, frame_number, 'gray16le', delayed_moov_atom)
        delayed_frame_list.append(dsa.from_delayed(delayed_frame, (1080, 1920), np.uint16))
delayed_frame_array = dsa.stack(delayed_frame_list)
delayed_frame_array

A dask array is in many ways like a numpy array, except in this case it holds a set of instructions for how to acquire each chunk of the array, which makes it easy to farm this array out to workers in the cloud using the [distributed](http://distributed.readthedocs.io/en/latest/#) scheduler.

#### Show one of the images

In [None]:
frame = delayed_frame_array[1700*6].compute()
frame.shape

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
import matplotlib.pyplot as plt
import matplotlib.patches as patches
plt.rc('figure', figsize=(7, 3.3))
fig, ax = plt.subplots()
im1 = ax.imshow(frame)
im1.set_cmap('gray')
plt.yticks(np.arange(0,1081,270))
plt.xticks(np.arange(0,1921,480));

#### Show the filter that will be used to filter images in the frequency domain
To deal with variations in lighting and high-frequency noise, we filter each subimage using a Butterworth bandpass filter.

In [None]:
def butterworth(d1, d2, n):
    x = np.arange(-1024/2+0.5,1024/2+1-0.5)
    xx, yy = np.meshgrid(x, x)
    d = np.sqrt(xx**2+yy**2)
    bff = (1 - (1./(1 + (d/d1)**(2*n))))*(1/(1 + (d/d2)**(2*n)))
    return bff

In [None]:
d1 = 20 # low cut wavenumber
d2 = 400 # high cut wavenumber
n = 4
bff = butterworth(d1, d2, n)
plt.rc('figure', figsize=(6, 6))
imgplot = plt.imshow(bff, cmap='gray')

#### Define the floc proxy function
The floc proxy is simply the number of pixels in each filtered subimage that have a value greater than 4000.

In [None]:
def frame_filter(frame, d1, d2, n):
    if frame.ndim == 3 and frame.shape[0] == 1:
        I = np.squeeze(frame[0, 0:1024, 0:1024])
    else:
        I = frame[0:1024, 0:1024]
    bff = butterworth(d1, d2, n)
    I_fft = np.fft.fft2(I)
    I_fft_shift = np.fft.fftshift(I_fft)
    I_fft_shift_filt = I_fft_shift*bff # filter with the Butterworth filter
    I_fft_filt = np.fft.ifftshift(I_fft_shift_filt)
    I_filt = np.fft.ifft2(I_fft_filt)
    return I_filt

In [None]:
def calc_floc_proxy(frame, d1, d2, n):
    I_filt = frame_filter(frame, d1, d2, n)
    return np.array([(np.absolute(I_filt)>4000).sum()])

#### Show example for one frame

In [None]:
I_filt = frame_filter(frame, d1, d2, n)

In [None]:
plt.rc('figure', figsize=(6, 6))
imgplot = plt.imshow(np.absolute(I_filt)>4000, cmap='gray')
plt.title('floc_proxy value = %i' % (np.absolute(I_filt)>4000).sum());

#### Assemble a new Dask array including our computation using map_blocks

In [None]:
floc_proxy = dsa.map_blocks(calc_floc_proxy, delayed_frame_array, d1, d2, n, dtype='i8', drop_axis=[1,2])
floc_proxy

#### Compute the floc_proxy (subset)

In [None]:
%%time
results = floc_proxy[0:10].compute()

#### Start a Dask cluster
Use the new Dask extension!

In [None]:
client

In [None]:
%%time
results = floc_proxy[0::10].compute()

#### Calculate all the results

In [None]:
print('Number of images: %i' % len(floc_proxy))
print('Size of dataset (GB): %i' % round(len(floc_proxy)*1080*1920*2/1024/1024/1024))

In [None]:
%%time
results = floc_proxy.compute()

#### Get a timestamp for each frame

In [None]:
import datetime, math
import matplotlib.dates as dates
frame_timestamp = []
for blob_url in blob_urls:
    timestamp = dbcamhd['timestamp'][dbcamhd.blob_url == blob_url].iloc[0]
    for frame_number in frame_numbers:
        timestamp = timestamp + frame_number/29.97
        dt = datetime.datetime.fromtimestamp(timestamp)
        frame_timestamp.append(dates.date2num(dt))

#### Plot a two-dimensional multivariate histogram of the results

In [None]:
plt.rc('font', size=11)
fig, ax = plt.subplots()
fig.set_size_inches(14, 6)
fig.frameon = False
hb1 = ax.hexbin(frame_timestamp[0::10], results, vmin=0.1, vmax=10, bins='log', linewidths=0.25,
  gridsize=(200, 4000), mincnt=1, cmap=plt.cm.BuPu)
fig.colorbar(hb1)
ax.set_ylim([0, 8000])
ax.set_xlim([frame_timestamp[0],frame_timestamp[-1]])
ax.yaxis.grid(True)
ax.xaxis.grid(True)
months = dates.MonthLocator(interval=6)  # every month
monthsFmt = dates.DateFormatter('%b %Y')
ax.xaxis.set_major_locator(months)
ax.xaxis.set_major_formatter(monthsFmt)
plt.ylabel('Floc Proxy Value');

Starting in mid-June a large "floc event" occurs where the floc proxy values increase on average by about a factor of ten. The cause of this floc event is being investigated.

### References

 - [Pangeo](http://pangeo-data.org/)
 - [PyCamHD](https://github.com/tjcrone/pycamhd)
 - [CamHD Raw Data Archive](https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301)
 - [AGU Abstract](https://agu.confex.com/agu/fm16/meetingapp.cgi/Paper/192670)
 - [AGU Poster](https://drive.google.com/open?id=0B-dWW4GM434obGpTM0FZME10Nkk)
 - [Dask](http://dask.pydata.org/en/latest/)