# read_raw

This notebook reads the raw FlowMow2 Sentry data into Pandas dataframes and saves them to HDF5 files. This notebook does not do any processing on the data aside from assigning timestamps.

#### Setup

In [None]:
import flowmow
import pandas as pd
from dask import delayed
from dask import compute

#### Start a Dask cluster

In [None]:
# Launch a Dask cluster on Kubernetes, asking for 10 workers.
from dask_kubernetes import KubeCluster
cluster = KubeCluster(n_workers=10)
# Bare expression as the last line so the notebook displays the cluster.
cluster

In [None]:
# Connect a distributed client to the cluster created above; presumably this
# is what makes the later compute() calls run on that cluster.
from dask.distributed import Client
client = Client(cluster)
# Bare expression as the last line so the notebook displays the client.
client

#### Import nav

In [None]:
# Load the nav manifest. The CSV has no header row, so the three columns are
# named explicitly (presumably one row per raw nav file).
files = pd.read_csv(
    '../data/info/ashes_nav.csv',
    header=None,
    names=['filename', 'blob_id', 'dive_number'],
)
files.head()

In [None]:
# One lazy flowmow.read_nav task per manifest row; only blob_id and
# dive_number are handed to the reader. Nothing runs until compute().
nav_delayed = [
    delayed(flowmow.read_nav)(row.blob_id, row.dive_number)
    for _, row in files.iterrows()
]

In [None]:
%%time
# Run all delayed nav readers and stack their results into one dataframe,
# then order the rows by the 'timestamp' column.
nav = pd.concat(compute(*nav_delayed))
nav = nav.sort_values('timestamp')

In [None]:
# Preview the first rows of the combined nav dataframe.
nav.head()

In [None]:
# Save to HDF5 under the key 'table'. The key is passed by keyword because
# pandas 2.1 deprecated positional arguments after the path and pandas 3.0
# makes them keyword-only.
# NOTE(review): no format= is given, so pandas presumably writes its default
# "fixed" format, where data_columns has no effect -- confirm whether
# format='table' was intended.
nav.to_hdf('../data/interim/nav.h5', key='table', append=False, data_columns=True)

#### Import Paros pressure sensor data

In [None]:
# Load the Paros manifest. The CSV has no header row, so the three columns
# are named explicitly (presumably one row per raw Paros file).
files = pd.read_csv(
    '../data/info/ashes_paros.csv',
    header=None,
    names=['filename', 'blob_id', 'dive_number'],
)
files.head()

In [None]:
# One lazy flowmow.read_paros task per manifest row; only blob_id and
# dive_number are handed to the reader. Nothing runs until compute().
paros_delayed = [
    delayed(flowmow.read_paros)(row.blob_id, row.dive_number)
    for _, row in files.iterrows()
]

In [None]:
%%time
# Run all delayed Paros readers and stack their results into one dataframe,
# then order the rows by the 'timestamp' column.
paros = pd.concat(compute(*paros_delayed))
paros = paros.sort_values('timestamp')

In [None]:
# Preview the first rows of the combined Paros dataframe.
paros.head()

In [None]:
# Save to HDF5 under the key 'table'. The key is passed by keyword because
# pandas 2.1 deprecated positional arguments after the path and pandas 3.0
# makes them keyword-only.
# NOTE(review): no format= is given, so pandas presumably writes its default
# "fixed" format, where data_columns has no effect -- confirm whether
# format='table' was intended.
paros.to_hdf('../data/interim/paros.h5', key='table', append=False, data_columns=True)

#### Import stinger GX3-25 microstrain IMU data

In [None]:
# Load the stinger IMU (ustrain_adv) manifest. The CSV has no header row, so
# the three columns are named explicitly (presumably one row per raw file).
files = pd.read_csv(
    '../data/info/ashes_ustrain_adv.csv',
    header=None,
    names=['filename', 'blob_id', 'dive_number'],
)
files.head()

In [None]:
# One lazy flowmow.read_ustrain task per manifest row; only blob_id and
# dive_number are handed to the reader. Nothing runs until compute().
ustrain_adv_delayed = [
    delayed(flowmow.read_ustrain)(row.blob_id, row.dive_number)
    for _, row in files.iterrows()
]

In [None]:
%%time
# Run all delayed stinger IMU readers and stack their results into one
# dataframe, then order the rows by the 'timestamp' column.
ustrain_adv = pd.concat(compute(*ustrain_adv_delayed))
ustrain_adv = ustrain_adv.sort_values('timestamp')

In [None]:
# Preview the first rows of the combined stinger IMU dataframe.
ustrain_adv.head()

In [None]:
# Save to HDF5 under the key 'table'. The key is passed by keyword because
# pandas 2.1 deprecated positional arguments after the path and pandas 3.0
# makes them keyword-only.
# NOTE(review): no format= is given, so pandas presumably writes its default
# "fixed" format, where data_columns has no effect -- confirm whether
# format='table' was intended.
ustrain_adv.to_hdf('../data/interim/ustrain_adv.h5', key='table', append=False, data_columns=True)

#### Import chassis GX3-25 microstrain IMU data

In [None]:
# Load the chassis IMU (ustrain_chassis) manifest. The CSV has no header row,
# so the three columns are named explicitly (presumably one row per raw file).
files = pd.read_csv(
    '../data/info/ashes_ustrain_chassis.csv',
    header=None,
    names=['filename', 'blob_id', 'dive_number'],
)
files.head()

In [None]:
# One lazy flowmow.read_ustrain task per manifest row; only blob_id and
# dive_number are handed to the reader. Nothing runs until compute().
ustrain_chassis_delayed = [
    delayed(flowmow.read_ustrain)(row.blob_id, row.dive_number)
    for _, row in files.iterrows()
]

In [None]:
%%time
# Run all delayed chassis IMU readers and stack their results into one
# dataframe, then order the rows by the 'timestamp' column.
ustrain_chassis = pd.concat(compute(*ustrain_chassis_delayed))
ustrain_chassis = ustrain_chassis.sort_values('timestamp')

In [None]:
# Preview the first rows of the combined chassis IMU dataframe.
ustrain_chassis.head()

In [None]:
# Save to HDF5 under the key 'table'. The key is passed by keyword because
# pandas 2.1 deprecated positional arguments after the path and pandas 3.0
# makes them keyword-only.
# NOTE(review): no format= is given, so pandas presumably writes its default
# "fixed" format, where data_columns has no effect -- confirm whether
# format='table' was intended.
ustrain_chassis.to_hdf('../data/interim/ustrain_chassis.h5', key='table', append=False, data_columns=True)

#### Import SBE3 data

In [None]:
# Load the SBE3 manifest. The CSV has no header row, so the three columns are
# named explicitly (presumably one row per raw SBE3 file).
files = pd.read_csv(
    '../data/info/ashes_sbe3.csv',
    header=None,
    names=['filename', 'blob_id', 'dive_number'],
)
files.head()

In [None]:
# One lazy flowmow.read_sbe3 task per manifest row; only blob_id and
# dive_number are handed to the reader. Nothing runs until compute().
sbe3_delayed = [
    delayed(flowmow.read_sbe3)(row.blob_id, row.dive_number)
    for _, row in files.iterrows()
]

In [None]:
%%time
# Run all delayed SBE3 readers and stack their results into one dataframe,
# then order the rows by the 'timestamp' column.
sbe3 = pd.concat(compute(*sbe3_delayed))
sbe3 = sbe3.sort_values('timestamp')

In [None]:
# Preview the first rows of the combined SBE3 dataframe.
sbe3.head()

In [None]:
# Save to HDF5 under the key 'table'. The key is passed by keyword because
# pandas 2.1 deprecated positional arguments after the path and pandas 3.0
# makes them keyword-only.
# NOTE(review): no format= is given, so pandas presumably writes its default
# "fixed" format, where data_columns has no effect -- confirm whether
# format='table' was intended.
sbe3.to_hdf('../data/interim/sbe3.h5', key='table', append=False, data_columns=True)