## Preprocess widefield calcium imaging data using Spark
This notebook demonstrates how to read binary raw data files stored on UZH Swift object storage into a Spark RDD, convert them into NumPy arrays and perform preprocessing to generate DFF arrays. Both the raw data and the DFF arrays are stored as output HDF5 files on the Swift object storage.

### Imports

In [None]:
# Import Python modules
from __future__ import print_function
import os, sys
import numpy as np
from matplotlib import pylab as plt
from scipy.io import savemat
import getpass
import h5py
import tempfile
import shutil
import time

%matplotlib inline

# the notebook backend: 'local' or 'openstack'
# (handed to initSpark when the SparkContext is created)
nbBackend = 'openstack'

# add folder 'utils' to the Python path
# this folder contains custom written code that is required for data import and analysis
utils_dir = os.path.join(os.getcwd(), 'utils')
# only append once, so that re-running this cell does not pile up duplicate entries in sys.path
if utils_dir not in sys.path:
    sys.path.append(utils_dir)

In [None]:
# Import custom-written modules
import SwiftStorageUtils
import WidefieldDataUtils as wf
import BehaviourAnalysisUtils
import CalciumAnalysisUtils as calciumTools
import parseDCIMGheader as parseDCIMGheader
from SwiftStorageUtils import uploadItems

### File paths and directories

In [None]:
# Prefix of the raw data files: every container item whose name starts like this is processed
filename_start = '20170214_'

# Name of the behaviour log file
behaviour_log = '2869L1_gordito01_b20170214.txt'

# Swift object storage location
swift_container = 'dayra' # name of the container in Swift (do not use _ etc. in container names!)
swift_provider = 'SparkTest' # in general, this should not change

# Swift base URI, assembled as swift://<container>.<provider>/
swift_basename = ''.join(["swift://", swift_container, ".", swift_provider, "/"])

In [None]:
# OpenStack credentials for accessing Swift storage
# These values are collected in file_params below; user name, tenant and password are
# also written into the Spark Hadoop configuration by startSparkContext.
os_username = 'hluetc'
os_tenant_name = 'helmchen.hifo.uzh'
os_auth_url = 'https://cloud.s3it.uzh.ch:5000/v2.0'
# Ask for the OpenStack password interactively, so that it is never stored in the notebook
os_password = getpass.getpass()

In [None]:
# Bundle the storage parameters and credentials in a single dictionary,
# so that they can be handed to the helper functions as one argument
file_params = {
    'filename_start': filename_start,
    'swift_container': swift_container,
    'swift_provider': swift_provider,
    'swift_basename': swift_basename,
    'os_username': os_username,
    'os_tenant_name': os_tenant_name,
    'os_auth_url': os_auth_url,
    'os_password': os_password,
}

### Experiment parameters

In [None]:
# list of stimuli and appropriate decisions
# Each entry is [stimulus name, expected decision]. The stimulus names are compared
# against the trial list entries when trials are grouped by stimulus type further below.
stim_decision = [
    ['Texture 1 P100', 'Go'],
    ['Texture 7 P1200', 'No Go']
    ]

# image dimensions for analysis (aspect ratio MUST be preserved)
# NOTE(review): convertDCAMtoMov passes this value straight to wf.resizeMovie --
# confirm that resizeMovie really treats None as "skip resizing".
dims_analysis = (256,256) # use None to skip resizing

# sampling rate and trial times
# NOTE(review): the times are presumably in seconds relative to texture-in (t_textIn = 0) -- confirm.
# Within this notebook only sample_rate and t_base are read (when the time axis is set up).
sample_rate = 20.0 # Hz
t_stim = -1.9 # stimulus cue (auditory)
t_textIn = 0 # texture in (i.e. stimulus onset)
t_textOut = 2 # texture starting to move out (stimulus offset)
t_response = 4.9 # response cue for licking (auditory)
t_base = -2 # baseline end (for F0 calculation)

### Analysis parameters

In [None]:
# SD of the Gaussian smoothing kernel for background estimation (in pixel);
# used as the default bg_smooth argument of preprocMovie
bg_smooth = 30

# Segmentation threshold; used as the default seg_cutoff argument of preprocMovie.
# larger value = bigger mask; smaller value = smaller mask (i.e. more pixels ignored)
# suggested = 0.0002
seg_cutoff = 0.0002

### Import behaviour log and analyse performance

In [None]:
# Download the behaviour log from Swift into a temporary directory and parse it
from SwiftStorageUtils import downloadItems
from BehaviourAnalysisUtils import parseBehaviourLog

# local scratch directory; it is removed again in the 'finally' clause below
temp_dir = tempfile.mkdtemp()
try:
    # download options
    down_opts = {
        'skip_identical': True,
        'out_directory': temp_dir,
    }
    downloadItems(swift_container, [behaviour_log], file_params, down_opts)

    # parse the downloaded log; the result is used as the trial list in the rest of the notebook
    trial_list = parseBehaviourLog(os.path.join(temp_dir, behaviour_log), print_table=False)
finally:
    # always delete the temp dir, even if the download or the parsing failed
    shutil.rmtree(temp_dir)

In [None]:
# Analyse behavioural performance for the parsed trial list
from BehaviourAnalysisUtils import analyzeBehaviourPerformance

# the six return values are unpacked in the order in which the function returns them
(go_trials, nogo_trials,
 corr_response, corr_reject,
 miss_response, false_alarm) = analyzeBehaviourPerformance(
    trial_list, stim_decision, print_summary=True)

### Save behaviour performance on Swift

In [None]:
# Write the performance summary to a temporary text file and push it to Swift.
# uploadItems is already imported in the module import cell at the top of the notebook.
temp_dir = tempfile.mkdtemp()
try:
    perf_file = os.path.join(temp_dir, 'BehaviourPerformance.txt')

    # name of the first stimulus listed for each decision type
    go_stim = [a[0] for a in stim_decision if a[1] == 'Go'][0]
    nogo_stim = [a[0] for a in stim_decision if a[1] == 'No Go'][0]

    with open(perf_file, 'w') as fid:
        fid.write('Go trials (%s): %1.0f\n' % (go_stim, go_trials))
        fid.write('No Go trials (%s): %1.0f\n' % (nogo_stim, nogo_trials))
        fid.write('Correct responses: %1.0f\n' % (corr_response))
        fid.write('Correct rejects: %1.0f\n' % (corr_reject))
        fid.write('Missed responses: %1.0f\n' % (miss_response))
        fid.write('False alarms: %1.0f\n' % (false_alarm))

    uploadItems(swift_container, '', temp_dir, [perf_file], file_params)
finally:
    # always delete the temp dir, even if writing or uploading failed
    shutil.rmtree(temp_dir)

### Save trial list as text file

In [None]:
# Write the trial list to a temporary tab-separated text file and push it to Swift.
# uploadItems is already imported in the module import cell at the top of the notebook.
temp_dir = tempfile.mkdtemp()
try:
    trial_list_file = os.path.join(temp_dir, 'TrialList.txt')

    # one line per trial, containing the first five fields of the trial entry
    with open(trial_list_file, 'w') as fid:
        for i_trial in trial_list:
            fid.write('%1.0f\t%1.0f\t%s\t%s\t%s\n' %
                      (i_trial[0], i_trial[1], str(i_trial[2]), i_trial[3], i_trial[4]))

    uploadItems(swift_container, '', temp_dir, [trial_list_file], file_params)
finally:
    # always delete the temp dir, even if writing or uploading failed
    shutil.rmtree(temp_dir)

### Start SparkContext

In [None]:
def startSparkContext(max_cores=16):
    """
    Create a SparkContext and prepare it for reading from Swift.

    Parameters
    ----------
    max_cores : int
        The max. number of cores Spark is allowed to use overall.

    Returns
    -------
    The SparkContext object returned by initSpark, with the OpenStack credentials
    set in its Hadoop configuration and all '.py' files from the 'utils' folder added.

    Note that the backend (nbBackend), the credentials (os_username, os_tenant_name,
    os_password), the Swift provider name (swift_provider) and utils_dir are taken
    from the notebook's global variables.
    """
    from setupSpark import initSpark

    executor_cores = 8 # the number of cores to be used on each worker
    executor_memory = '25G' # the amount of memory to be used on each worker

    # returns the SparkContext object 'sc' which tells Spark how to access the cluster
    sc = initSpark(nbBackend, executor_cores=executor_cores,
                   max_cores=max_cores, executor_memory=executor_memory)

    # provide OpenStack credentials to the Spark Hadoop configuration
    # the service name in the keys must match the provider part of the Swift URI,
    # so it is derived from swift_provider instead of being repeated as a literal
    hadoop_conf = sc._jsc.hadoopConfiguration()
    service_prefix = 'fs.swift.service.%s.' % swift_provider
    hadoop_conf.set(service_prefix + 'username', os_username)
    hadoop_conf.set(service_prefix + 'tenant', os_tenant_name)
    hadoop_conf.set(service_prefix + 'password', os_password)

    # add Python files in 'utils' folder to the SparkContext
    # this is required so that all files are available on all the cluster workers
    for filename in os.listdir(utils_dir):
        if filename.endswith('.py'):
            sc.addPyFile(os.path.join(utils_dir, filename))

    return sc

In [None]:
# Start the SparkContext, allowing Spark to use at most 8 cores overall
sc = startSparkContext(max_cores=8)
time.sleep(10) # wait till setup completes
# report the default parallelism of the new SparkContext
print("Parallelism: %1.0f" % (sc.defaultParallelism))

### Load files into RDD

In [None]:
# list of relevant binary files
from SwiftStorageUtils import listItems
container_items = listItems(file_params['swift_container'], file_params)
binary_files = [a for a in container_items if a.startswith(file_params['filename_start'])]

In [None]:
# Distribute the list of file names as an RDD (one element per file name)
binary_file_rdd = sc.parallelize(binary_files)

In [None]:
def readDCAMfromSwift(file_name, swift_container, file_params):
    """
    Download a binary file from Swift and return its content.

    The file is downloaded into a temporary local directory, read completely into
    memory and the temporary directory is removed again (also if an error occurs).

    Parameters
    ----------
    file_name : name of the item in the Swift container
    swift_container : name of the Swift container
    file_params : dictionary with storage parameters and credentials,
        passed on to downloadItems

    Returns
    -------
    The raw content of the file as read in binary mode.
    """
    # imported inside the function, as in the original code
    # (presumably so that the import is resolved where the function runs, e.g. on a Spark worker)
    from SwiftStorageUtils import downloadItems

    # local storage directory; removed in the 'finally' clause below
    temp_dir = tempfile.mkdtemp()
    try:
        # download options
        down_opts = {
            'skip_identical': True,
            'out_directory': temp_dir,
        }
        downloadItems(swift_container, [file_name], file_params, down_opts)

        path_to_local_file = os.path.join(temp_dir, file_name)
        with open(path_to_local_file, mode='rb') as fid:
            return fid.read()
    finally:
        # always delete the temp dir, so that failed downloads do not fill up the local disk
        shutil.rmtree(temp_dir)

In [None]:
def convertDCAMtoMov(byte_stream):
    """
    Convert raw DCAM byte-stream to movie.

    Note that parameters (e.g. dims_analysis) are provided as global variables in the notebook.
    Image dimensions are obtained by parsing the file header.

    Parameters
    ----------
    byte_stream : raw file content, including the 232 byte file header

    Returns
    -------
    The movie as returned by wf.resizeMovie (built from an array of shape
    xsize x ysize x nframes).

    Raises
    ------
    ValueError if the byte-stream holds fewer pixel values than the header announces.
    """
    # parse the header and get image dimensions
    hdr = parseDCIMGheader.main(byte_stream)
    dims = [hdr['xsize'], hdr['ysize'], hdr['nframes']]
    n_values = int(dims[0] * dims[1] * dims[2])

    # Read exactly the announced number of uint16 values after the file header.
    # np.frombuffer replaces the deprecated binary mode of np.fromstring; 'offset' skips
    # the 232 byte header without copying the byte-stream and 'count' drops data points
    # at the end. The copy is needed because np.frombuffer returns a read-only view,
    # while the pixel values are modified below.
    A = np.frombuffer(byte_stream, dtype=np.uint16, count=n_values, offset=232).copy()

    # re-arrange data into the correct shape
    mov = np.fliplr(A.reshape([dims[0], dims[1], dims[2]], order='F'))
    # hack to remove strange pixels with very high intensity
    mov[mov > 60000] = 0

    # resize to analysis dimensions
    mov = wf.resizeMovie(mov, resolution=dims_analysis, interp='bilinear')

    return mov

In [None]:
# test with without Spark
first_file = binary_file_rdd.first()
byte_stream = readDCAMfromSwift(first_file, file_params['swift_container'], file_params)
mov = convertDCAMtoMov(byte_stream)
plt.imshow(mov[:,:,0], cmap='gray', interpolation='none')

In [None]:
# Create byte-stream RDD
# This will download the binary file and read it from local disk
byte_stream_rdd = binary_file_rdd.map(lambda v: (v, readDCAMfromSwift(v, file_params['swift_container'], 
                                                                          file_params)))

In [None]:
# Convert the byte-stream RDD to a numpy array
mov_rdd = byte_stream_rdd.map(lambda (k,v): (k, convertDCAMtoMov(v)))

In [None]:
# Read binary file content directly from Swift using Spark's binaryFiles reader
# This part turned out to be relatively error-prone
# file_rdd = sc.binaryFiles(file_params['swift_basename'], minPartitions=100)
# file_rdd = file_rdd.filter(lambda (k,v): file_params['filename_start'] in k)

In [None]:
# get first movie (return key-value tuple)
mov1 = mov_rdd.first()

To check if the data has been imported correctly, display some frames as images. This will also produce an average image of the first movie (avg). This will be used below to create the reference image as mat file (refImg.mat).

In [None]:
# Show the first frame, the mean image and the max. projection of the first movie
path, file_id = os.path.split(mov1[0])
print('File: %s' % (file_id))
dat = mov1[1]
avg = np.nanmean(dat, axis=2) # the mean image is re-used below as reference image

# position of the text label in each panel
xy = (dat.shape[0]/1.05, dat.shape[1] - (dat.shape[1]/1.1))

f, axes = plt.subplots(1, 3, figsize=(15, 5))
panels = [
    (dat[:,:,0], 'Frame %1.0f' % 0),
    (avg, 'Mean'),
    (np.nanmax(dat, axis=2), 'Max'),
]
for panel_axis, (panel_img, panel_label) in zip(axes, panels):
    panel_axis.imshow(panel_img, cmap='gray', interpolation='none')
    panel_axis.annotate(panel_label, xy=xy, fontsize=14, color='yellow', horizontalalignment='right')

Get number of frames from the first movie. Setup the time axis. Specify frames for F0 calculation.

In [None]:
# number of frames and time axis
# NOTE(review): the offset of 3.0 presumably shifts the time axis so that 0 is texture-in -- confirm
timepoints = dat.shape[2]
t = np.arange(timepoints) / sample_rate - 3.0

# Frames for F0 calculation: boolean mask with one entry per frame
# Alternative (not active): F0 as time before baseline, i.e. f0_frames = t<t_base
f0_frames = np.zeros(t.shape, dtype=bool)
f0_frames[9:12] = True # F0 as certain specified frames

### Preprocess movie
The preprocessing pipeline currently consists of 3 steps: estimation and subtraction of background, segmentation of area of interest, normalization (dF/F calculation). As for conversion, we first define a function that is then applied to the Spark RDD. These transformations are only registered, not executed.

In [None]:
def preprocMovie(mov, bg_smooth=bg_smooth, seg_cutoff=seg_cutoff):
    """
    Run the preprocessing pipeline on one movie and return the result of the Dff calculation.

    Steps: background estimation from the first frame and subtraction,
    background segmentation, baseline normalization.

    mov        : movie array; the third axis is indexed as frames
    bg_smooth  : passed to wf.estimateBackground
    seg_cutoff : passed to wf.segmentBackground

    The frames used as baseline (f0_frames) are taken from the notebook's global variables.
    """

    # Step 1: background signal intensity, estimated from the first frame only
    print('Estimating background', end="")
    background_level = wf.estimateBackground(mov[:,:,0], bg_smooth)
    print(' - Done (%1.2f)' % background_level)

    # remove the background; values that become negative are clipped to 0
    mov = mov - background_level
    mov[mov<0] = 0

    # Step 2: segment out the background (set to np.nan)
    print('Segmenting background', end="")
    mov = wf.segmentBackground(mov, seg_cutoff, plot=False)
    print(' - Done')

    # Step 3: baseline normalization (Dff)
    print('Calculating Dff', end="")
    normalized = calciumTools.calculateDff(mov , f0_frames)
    print(' - Done')

    return normalized

In [None]:
# apply transformation to the RDD
dff_rdd = mov_rdd.map(lambda (k,v): (k, preprocMovie(v)))

### Save data as HDF5 files
Now we can save the data back to the Swift storage. This will finally kick-off the whole processing pipeline that has been defined so far.

In [None]:
# Set the names for the output folders
output_folder_mov = 'mov_out'
output_folder_dff = 'dff_out'
output_folder_mat = 'mat_out'

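Check whether the output folders already exist. If a folder exists, its contents are displayed and confirmation is requested before it is deleted. Note that the calls below pass `confirm=False`, which presumably skips the confirmation prompt.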

In [None]:
from SwiftStorageUtils import deleteExistingFolder
# handle the three output folders one after the other (movies, dFF, mat-files)
for existing_folder in (output_folder_mov, output_folder_dff, output_folder_mat):
    deleteExistingFolder(swift_container, existing_folder, file_params, confirm=False)

In [None]:
def getFileNameFromKey(key):
    """
    Return the file name from the RDD key, i.e. the part after the last
    path separator (everything before it, such as the Swift URL, is split off).
    """
    return os.path.basename(key)

In [None]:
# Save the image data as HDF5 on Swift storage. 
# This will run all the transformations that have been registered for mov_rdd.
from SwiftStorageUtils import saveAsH5
# Each element is a (file name, movie) tuple. The tuple is indexed explicitly, because tuple
# parameters in lambdas ('lambda (k,v): ...') are a syntax error in Python 3.
# foreach discards return values, so the lambda only needs to call saveAsH5.
mov_rdd.foreach(lambda kv: saveAsH5(kv[1], getFileNameFromKey(kv[0]), 'mov', output_folder_mov, file_params))

In [None]:
# Save the dFF data as HDF5 on Swift storage. 
# This will run all the transformations that have been registered for dff_rdd.
# Each element is a (file name, dFF array) tuple. The tuple is indexed explicitly, because tuple
# parameters in lambdas ('lambda (k,v): ...') are a syntax error in Python 3.
# foreach discards return values, so the lambda only needs to call saveAsH5.
dff_rdd.foreach(lambda kv: saveAsH5(kv[1], getFileNameFromKey(kv[0]), 'dff', output_folder_dff, file_params))

### Save mat-files for OCIA

In [None]:
# Save the image data as Matlab mat-file on Swift storage.
from SwiftStorageUtils import saveAsMat
# Each element is a (file name, dFF array) tuple. The tuple is indexed explicitly, because tuple
# parameters in lambdas ('lambda (k,v): ...') are a syntax error in Python 3.
# foreach discards return values, so the lambda only needs to call saveAsMat.
dff_rdd.foreach(lambda kv: saveAsMat(kv[1], getFileNameFromKey(kv[0]), 'tr', output_folder_mat, file_params,
                                     trial_list=trial_list))

In [None]:
# stop SparkContext to free cluster resources
# The RDD processing is finished at this point; the cells below only use local variables.
sc.stop()

### Create trials_ind.mat and ref_img.mat for processing with OCIA

In [None]:
# Make a temporary directory
# The trailing path separator is kept, because temp_dir is handed to uploadItems in this form.
temp_dir = tempfile.mkdtemp()
if not temp_dir.endswith(os.path.sep):
    temp_dir = temp_dir + os.path.sep

try:
    # --- Create trials_ind.mat ---
    # Group the trial IDs (first field of each trial) by stimulus type (fourth field).
    # Works for any number of stimulus types listed in stim_decision.
    stim_names = [stim_entry[0] for stim_entry in stim_decision]
    stim_type_ids = [[] for _ in stim_names]
    for i_trial in trial_list:
        if i_trial[3] not in stim_names:
            raise ValueError('Stim type %s unknown!' % (i_trial[3]))
        # index() returns the first match, i.e. the first listed stimulus wins for duplicate names
        stim_type_ids[stim_names.index(i_trial[3])].append(i_trial[0])

    var_dict = dict()
    for trial_stim, i_id in zip(stim_names, stim_type_ids):
        # dataset name: 'tr_' + the last word of the stimulus name without its first character
        # (e.g. 'Texture 1 P100' --> 'tr_100')
        dataset_name = "tr_%s" % (trial_stim[trial_stim.rfind(' ')+2:])
        var_dict[dataset_name] = i_id
    matfile = os.path.join(temp_dir, 'trials_ind.mat')

    # save mat-file to temp dir and push it to Swift
    savemat(matfile, var_dict)
    uploadItems(file_params['swift_container'], output_folder_mat, temp_dir, [matfile], file_params)

    # --- Process reference image (average of first movie) and save as mat ---
    matfile = os.path.join(temp_dir, 'refImg.mat')

    # scale between 0 and 0.1 (roughly)
    avg = (avg / np.nanmax(avg)) / 10
    # put in dict and save
    var_dict = dict()
    var_dict['refImg'] = avg
    savemat(matfile, var_dict, do_compression=True)
    uploadItems(file_params['swift_container'], output_folder_mat, temp_dir, [matfile], file_params)
finally:
    # always delete the temp dir, even if saving or uploading failed
    shutil.rmtree(temp_dir)