# Floc Analysis

This page shows an example of analyzing water-column floc using the CamHDHub and Dask. The goal of this work is to understand changes in the concentration of "floc", the bacterial material that has been flushed from the hydrothermal system into the ocean. Changes in floc are an indicator of changes in the hydrothermal system, often as a result of a magmatic event or seismic swarm.

This notebook uses [Dask](http://dask.pydata.org/en/latest/) to analyze a large number of frames to establish a proxy for the floc concentration, then plots this value using a two-dimensional multivariate histogram.

This version of the floc analysis uses the new regions [metadata](https://github.com/CamHD-Analysis/CamHD_motion_metadata) in CSV format generated by Aaron Marburg.

#### Set up the environment

In [None]:
%matplotlib inline
import pycamhd.pycamhd as camhd
import numpy as np
import matplotlib.pyplot as plt

In [None]:
from dask.distributed import Client, progress
from daskernetes import KubeCluster
# Create a daskernetes KubeCluster, asking for 20 workers.
cluster = KubeCluster(n_workers=20)
# Bare expression: as the last statement of the cell it displays the cluster object.
cluster

In [None]:
# Attach a dask.distributed Client to the cluster object so work can be submitted to it.
client = Client(cluster)
# Bare expression: as the last statement of the cell it displays the client object.
client

#### Get a list of CamHD files to process and metadata

In [None]:
import csv
import requests
csv_url = 'https://raw.githubusercontent.com/CamHD-Analysis/CamHD_motion_metadata/master/datapackage/regions.csv'

# Download the regions metadata and keep only the rows tagged with the d2_p2_z0 scene.
with requests.Session() as s:
    download = s.get(csv_url)
    # Fail loudly on an HTTP error instead of parsing an error page as CSV,
    # which would otherwise surface as a confusing KeyError on 'scene_tag'.
    download.raise_for_status()
    decoded_content = download.content.decode('utf-8')
    reader = csv.DictReader(decoded_content.splitlines())
    d2_p2_z0 = [r for r in reader if r['scene_tag'] == 'd2_p2_z0']

# Unique movie basenames that contain the scene, in sorted order.
filenames = sorted({d['mov_basename'] for d in d2_p2_z0})

#### Set up a function to return a frame number based on region metadata and a "relative" frame number
Here we build a list of every frame from a particular scene in a movie using the regions metadata. Assuming that n_frames will be requested, we break this list into groupings of n_frames and return the first frame from one of these groups, chosen by relative_frame_number. This method quasi-randomly samples the scene and allows for multiple instances of a scene per movie.

In [None]:
def real_frame_number(filename, relative_frame_number, n_frames, regions=None):
    """Map a relative sample index onto an absolute frame number of a movie.

    All frames covered by the matching region rows (``start_frame`` through
    ``end_frame``, inclusive) are concatenated in row order, and one frame is
    picked from that list according to ``relative_frame_number``.

    Parameters
    ----------
    filename : str
        Movie basename, compared against each row's ``mov_basename``.
    relative_frame_number : int
        Index of the requested sample, expected in ``0 .. n_frames - 1``.
    n_frames : int
        Total number of samples requested per movie.
    regions : iterable of dict, optional
        Region rows to search. Defaults to the module-level ``d2_p2_z0`` list.

    Returns
    -------
    int
        The selected absolute frame number.

    Raises
    ------
    IndexError
        If no region row matches ``filename``, or if the requested sample lies
        beyond the end of the frame list.
    """
    if regions is None:
        regions = d2_p2_z0

    frames = []
    for scene in regions:
        if scene['mov_basename'] == filename:
            # end_frame is inclusive, hence the +1; extend avoids re-copying the list
            frames.extend(range(int(scene['start_frame']), int(scene['end_frame']) + 1))
    if not frames:
        raise IndexError('no frames found for %r in the regions metadata' % (filename,))

    # A single sample cannot be spread over the scene (and would divide by zero below)
    if n_frames <= 1:
        return frames[0]

    position = int(round(float(relative_frame_number) / (n_frames - 1) * len(frames)))
    # Clamp at 0: for the first sample position - 1 is -1, which used to wrap
    # around and silently return the LAST frame instead of the first one.
    return frames[max(position - 1, 0)]

#### Get a list of frames to process

In [None]:
# Number of frames sampled from every movie
n_frames = 2

# One entry per movie: [movie URL, frame number 0, ..., frame number n_frames-1]
proc_list = []
for filename in filenames:
    # Characters 10-17 of the basename supply the year/month/day directories of the archive
    movie_url = ('https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301/%s/%s/%s/%s.mov'
                 % (filename[10:14], filename[14:16], filename[16:18], filename))
    sampled_frames = [real_frame_number(filename, k, n_frames) for k in range(n_frames)]
    proc_list.append([movie_url] + sampled_frames)

# Show the first entry as the cell output
proc_list[0]

In [None]:
# Number of entries queued for processing (proc_list holds one entry per movie).
len(proc_list)

#### Create the filter for filtering images in the frequency domain
To deal with variations in lighting and high-frequency noise, we filter each subimage using a Butterworth bandpass filter.

In [None]:
plt.rc('figure', figsize=(6, 6))

# Filter design parameters
d1 = 20   # low cut wavenumber
d2 = 400  # high cut wavenumber
n = 4     # exponent parameter; the terms below are raised to the power 2*n

# Distance of every cell of a 1024x1024 grid from the grid centre
x = np.arange(-1024/2+0.5, 1024/2+1-0.5)
xx, yy = np.meshgrid(x, x)
d = np.sqrt(xx**2+yy**2)

# Butterworth bandpass filter, written as the product of its two factors
bff = (
    (1 - (1./(1 + (d/d1)**(2*n))))  # factor that is ~0 for d well below d1
    * (1/(1 + (d/d2)**(2*n)))       # factor that is ~0 for d well above d2
)

# Display the filter as a grayscale image
imgplot = plt.imshow(bff, cmap='gray')

In [None]:
# Sanity check: fetch the sampled frames of the first proc_list entry only
# and print the shape of each returned frame.
for entry in proc_list[:1]:
    movie_url = entry[0]
    for position in range(1, n_frames + 1):
        frame = camhd.get_frame(movie_url, entry[position], 'gray16le')
        print(frame.shape)

#### Set up the Dask delayed functions
The floc proxy is simply the number of pixels in each filtered subimage that have a value greater than 4000.

In [None]:
# from dask.multiprocessing import get
from dask import delayed # , compute

#@delayed
#def delayed_real_frame_number(filename, relative_frame_number, n_frames):
#    return real_frame_number(filename, relative_frame_number, n_frames)

@delayed
def delayed_get_frame(filename, frame_number, pix_fmt):
    """Fetch one frame by forwarding all three arguments to ``camhd.get_frame``.

    The function is wrapped with ``dask.delayed``, so calling it builds a lazy
    task rather than fetching the frame immediately.

    The cells in this notebook pass the movie URL from ``proc_list`` as
    ``filename`` and ``'gray16le'`` as ``pix_fmt``.
    """
    return camhd.get_frame(filename, frame_number, pix_fmt)

@delayed
def delayed_get_floc_proxy(frame):
    """Return the floc proxy of a frame.

    The top-left 1024x1024 subimage is filtered in the frequency domain with
    the module-level Butterworth filter ``bff``; the proxy is the number of
    pixels whose filtered magnitude is greater than 4000.
    """
    subimage = frame[0:1024, 0:1024]
    # Forward transform, shifted to match the centred grid the filter was built on
    shifted_spectrum = np.fft.fftshift(np.fft.fft2(subimage))
    # filter with the Butterworth filter, then undo the shift and transform back
    filtered = np.fft.ifft2(np.fft.ifftshift(shifted_spectrum*bff))
    return (np.absolute(filtered) > 4000).sum()

#### Calculate the floc_proxy using Dask parallelization
We use Dask to handle load balancing among processors on the system.

In [None]:
%%time
delayed_floc_proxy = []
#n_frames = 2 # number of frames to use in for each video
#relative_frame_numbers = range(0,n_frames)
keys = []

results = []
for element in proc_list:
    filename = element[0]
    for i in range(n_frames):
        frame_number = element[i+1]
        frame = delayed_get_frame(filename, frame_number, 'gray16le')
        dfp = delayed_get_floc_proxy(frame)
        results.append(dfp)
        keys.append((filename, frame))
    
    #for relative_frame_number in relative_frame_numbers:
    #    frame_number = delayed_real_frame_number(filename, relative_frame_number, n_frames)
    #    frame = delayed_get_frame('/data/' + filename + '.mov', frame_number, 'gray16le')
    #    delayed_floc_proxy.append(delayed_get_floc_proxy(frame))
# floc_proxy = compute(*delayed_floc_proxy)

In [None]:
import dask
from dask.dot import dot_graph

# `results` is a list of Delayed objects, whereas dot_graph works on a raw task-graph
# mapping; dask.visualize accepts the Delayed objects themselves and renders their
# combined graph.
dask.visualize(*results)

In [None]:
import dask  # needed here: this cell runs before the later cell that imports dask

# dask.compute returns a tuple with one entry per positional argument, so the tasks
# are unpacked to get one computed value per task (passing the list itself yields a
# 1-tuple, which would zip against only the first key).
# The values go into a new name so that `results` keeps holding the Delayed tasks
# that the following cells slice and compute.
computed = dask.compute(*results[:200])
results_all = dict(zip(keys[:200], computed))

In [None]:
import dask
import dask.multiprocessing

#     dask.compute(results[:300])
    
    
# Compute tasks 200-299 one at a time with `dask.get` selected as the scheduler function,
# printing each key before its task runs -- presumably so that a failing task can be
# identified from the last key printed.
# NOTE(review): dask.set_options was superseded by dask.config.set in later dask
# releases -- confirm against the installed dask version.
with dask.set_options(get=dask.get):
    for key, res in zip(keys[200:300], results[200:300]):
        print(key)
        res.compute()  # the computed value is discarded

In [None]:
# Time the computation of the first 200 tasks. The list is passed as a single argument,
# so dask.compute returns a 1-tuple whose only item is the list of computed values.
%time results_persisted = dask.compute(results[:200])

In [None]:
# Display the values computed by the timed cell above.
results_persisted

In [None]:
# NOTE(review): `results_computed` is not assigned in any visible cell, so this raises
# NameError unless it is defined elsewhere -- was `results_persisted` or `results_all` intended?
results_computed

#### Get a timestamp for each frame

In [None]:
import datetime, math
import matplotlib.dates as dates

# Frame rate used to convert a frame number into seconds since the start of the movie.
# NOTE(review): 29.95 is the value this notebook has always used -- confirm it against
# the actual video frame rate.
frames_per_second = 29.95

# One matplotlib date number per sampled frame, in the same order in which the
# floc-proxy tasks are created (movie by movie, then sample by sample).
frame_timestamp = []
for filename in filenames:
    # The basename encodes the movie start: year [10:14], month [14:16], day [16:18], hour [19:21]
    movie_start = datetime.datetime(int(filename[10:14]), int(filename[14:16]),
                                    int(filename[16:18]), int(filename[19:21]))
    # range(n_frames) replaces the undefined name `relative_frame_numbers`
    for relative_frame_number in range(n_frames):
        frame_number = real_frame_number(filename, relative_frame_number, n_frames)
        # timedelta carries over into minutes/hours by itself, so neither a long movie
        # nor microsecond rounding can push a datetime field out of its valid range
        dt = movie_start + datetime.timedelta(seconds=frame_number/frames_per_second)
        frame_timestamp.append(dates.date2num(dt))

#### Plot a two-dimensional multivariate histogram of the results

In [None]:
# Plot the floc proxy values against time as a hexagonally binned 2-D histogram.
# NOTE(review): `floc_proxy` is not assigned in any visible cell (the only assignment
# is commented out), so it must be defined before this cell runs; it needs one value
# per entry of frame_timestamp.
plt.rc('font', size=11)
fig, ax = plt.subplots();
fig.set_size_inches(14, 6);
fig.frameon = False
# bins='log' selects a logarithmic scale for the bin counts; mincnt=1 leaves empty bins undrawn
hb1 = ax.hexbin(frame_timestamp, floc_proxy, vmin=0, vmax=2, bins='log', linewidths=0.25,
  gridsize=(240,80), mincnt=1, cmap=plt.cm.BuPu)
fig.colorbar(hb1)
ax.set_ylim([0, 8000])
# x range runs from the first to the last timestamp in the list
ax.set_xlim([frame_timestamp[0],frame_timestamp[-1]])
ax.yaxis.grid(True)
ax.xaxis.grid(True)
# One major tick per month, labelled like "Jun 2016"
months = dates.MonthLocator()  # every month
monthsFmt = dates.DateFormatter('%b %Y')
ax.xaxis.set_major_locator(months)
ax.xaxis.set_major_formatter(monthsFmt)
plt.ylabel('Floc Proxy Value');

# print
#fig_dpi = 300
#fig.savefig('floc_hexbin.png', bbox_inches='tight', transparent=True,
#  pad_inches=0, orientation='portrait', format='png', dpi=fig_dpi);

Starting in late June, a large "floc event" occurs in which the floc proxy values increase on average by nearly a factor of ten. The cause of this floc event is being investigated.

### References

PyCamHD: https://github.com/tjcrone/pycamhd<br>
CamHDHub: https://github.com/tjcrone/camhdhub<br>
Raw Data Archive: https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301/<br>
AGU Abstract: https://agu.confex.com/agu/fm16/meetingapp.cgi/Paper/192670<br>
AGU Poster: https://drive.google.com/open?id=0B-dWW4GM434obGpTM0FZME10Nkk<br>
Dask: http://dask.pydata.org/en/latest/