# Ocean Observatories Initiative (OOI) Data

<img src="CEV-OOI-Cabled-Array.jpg" alt="Drawing" style="width: 600px;"/>

## Obtaining OOI Data

  - Data Portal
  - M2M (machine-to-machine) API
  - [Raw Data Server](https://rawdata.oceanobservatories.org/files/RS03ASHS/PN03B/06-CAMHDA301/) (CamHD Video Data)
  - IRIS (seismic data)
  - ERDDAP
  - [Xarray and Zarr on AWS S3](https://io2data.s3.us-west-2.amazonaws.com/index.html#data/) (unofficial profiler data demo)
  - Others coming soon!

## Xarray and Zarr on AWS S3
  - Demonstration subset of OOI profiler data  
  - Stored as Zarr in a cloud-performant object store  
  - Hopefully most or all OOI data will be stored this way in the near future  
  - [OOI Data Team Inspector](https://ooi-visualocean.whoi.edu/regions/view/RS)  
  - [AWS Store Browser](https://io2data.s3.us-west-2.amazonaws.com/index.html#data/)
  
### Learning Objectives

  - Open Zarr groups from Amazon S3
  - Use Dask delayed functions to speed up data access
  - Resample data using Xarray
  - Plot using hvplot

### Loading Data

#### Start Dask Cluster
  - Settings > Auto-Start Dask
  - Use the new button!

#### Build function for Dask delayed

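In [12]:
from dask.distributed import Client

# connect to the running Dask scheduler; the address is specific to this
# session, so replace it with the one shown for your own cluster
client = Client("tcp://10.32.26.18:44371")
client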


Client: Scheduler: tcp://10.32.26.18:44371, Dashboard: /user/0000-0002-1086-2793/proxy/8787/status
Cluster: Workers: 1, Cores: 2, Memory: 11.50 GB


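In [11]:
# Scaling is done on the cluster object, not on the client:
# `Client` has no `scale` method, so client.scale(8) raises
# AttributeError: 'Client' object has no attribute 'scale'.
# With a handle to the cluster object, use instead:
#c = Client(cluster)
#cluster.scale(8)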

In [None]:
import s3fs
import xarray as xr

In [None]:
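# open one Zarr group from a public S3 bucket;
# return None if the group is missing or cannot be read
def open_zarr_group(zarr_group):
    s3 = s3fs.S3FileSystem(anon=True)  # anonymous access, no credentials needed
    try:
        return xr.open_zarr(store=s3fs.S3Map(zarr_group, s3=s3), consolidated=True)
    except Exception:
        return None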

#### Create list of Delayed objects

In [None]:
import datetime
from dask import delayed

In [None]:
# zarr group base and start date
zarr_group_base = 'io2data/data/RS01SBPS-SF01A-2A-CTDPFA102-streamed-ctdpf_sbe43_sample/'
dt_start = datetime.date(year = 2019, month = 1, day = 1)

# loop and create list of Dask delayed objects
dsets_raw = []
num_days = 180
for i in range(num_days):
    dt = dt_start + datetime.timedelta(days=i)
    dt_str = dt.strftime("%Y%m%d")
    zarr_group = zarr_group_base + dt_str
    dsets_raw.append(delayed(open_zarr_group)(zarr_group))
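The loop above names one Zarr group per day by appending a `YYYYMMDD` stamp to the base path. As a minimal sketch of just that naming step, the helper below builds the list of paths without touching S3. The function name `daily_group_paths` and the `bucket/prefix/` base are hypothetical and used only for illustration.

```python
import datetime


def daily_group_paths(base, start, num_days):
    """Return one path per day, formed as base + YYYYMMDD (hypothetical helper)."""
    return [
        base + (start + datetime.timedelta(days=i)).strftime("%Y%m%d")
        for i in range(num_days)
    ]


# Three consecutive days that cross a month boundary.
paths = daily_group_paths("bucket/prefix/", datetime.date(2019, 1, 30), 3)
print(paths)
```

Because `timedelta` arithmetic handles month and year boundaries, the same loop works for any start date and span.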

#### Run delayed functions on the Dask cluster

In [None]:
from dask import compute

In [None]:
%%time
dsets_raw = compute(*dsets_raw)
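The build-then-compute pattern can be tried without S3 access or a cluster. The sketch below assumes only that `dask` is installed. `load_day` is a hypothetical stand-in for `open_zarr_group` that returns `None` for some inputs to mimic days with no data.

```python
from dask import compute, delayed


def load_day(day):
    """Stand-in loader (hypothetical): returns None for 'missing' days."""
    if day % 3 == 0:  # pretend every third day has no data
        return None
    return {"day": day}


# Build the task list lazily; nothing runs yet.
tasks = [delayed(load_day)(day) for day in range(1, 7)]

# compute() runs all tasks and returns a tuple of results in task order.
results = compute(*tasks)

# Keep only the days that loaded successfully.
loaded = [r for r in results if r is not None]
print(len(loaded), "of", len(results), "days loaded")
```

With no distributed `Client` active, `compute` falls back to Dask's local scheduler. With a `Client` connected, the same call sends the tasks to the cluster workers.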

#### Remove "None" Objects, Concatenate, and Clean Up 

In [None]:
dsets = []
for dset_raw in dsets_raw:
    if dset_raw is not None:
        dsets.append(dset_raw)
len(dsets)

In [None]:
ds = xr.concat(dsets, dim='time')
ds

In [None]:
ds = ds.reset_coords('seawater_pressure')
ds

In [None]:
print(f'Dataset size: ~{round(ds.nbytes / 1024 / 1024)} MB')

### Resample Data

In [None]:
resampling_period = '4min'
variables = ['time', 'seawater_pressure', 'corrected_dissolved_oxygen']
ds_resampled = ds[variables].resample(time=resampling_period).mean().load()
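To see what `resample(...).mean()` does to the time axis, here is a small sketch on synthetic data, assuming `numpy`, `pandas`, and `xarray` are installed. The values are made up for illustration: eight one-minute samples are averaged into two four-minute bins.

```python
import numpy as np
import pandas as pd
import xarray as xr

# Eight synthetic samples, one per minute (illustrative values only).
times = pd.date_range("2019-01-01", periods=8, freq="1min")
ds_demo = xr.Dataset(
    {"seawater_pressure": ("time", np.arange(8.0))},
    coords={"time": times},
)

# Average every 4-minute window; each bin is labelled by its start time.
resampled = ds_demo.resample(time="4min").mean()
print(resampled["seawater_pressure"].values)
```

Each output value is the mean of the four samples in its window, so the resampled dataset has a quarter as many time steps as the input.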

### Plot Using Hvplot

In [None]:
import hvplot.pandas  # registers the .hvplot accessor on pandas DataFrames

In [None]:
df = ds_resampled.to_dataframe()
#df = ds[variables].to_dataframe()
df.reset_index(inplace=True)
df.head()

In [None]:
df.hvplot.scatter('time', 'seawater_pressure', c='corrected_dissolved_oxygen',
                  cmap='viridis', size=60, colorbar=True, flip_yaxis=True,
                  ylim=(0, 200))

## Additional Resources

*From: @friedrichknuth*  
[Real-time plotting with M2M](https://github.com/friedrichknuth/ooi_axial_realtime_profiler) (click binder link)  
[Scatter plotting DO using M2M](https://github.com/ooi-data-review/ooi_datateam_notebooks/blob/master/notebooks/data_access_and_plotting/basic/netcdf_data/depth_scatter.ipynb)  
https://github.com/ooi-data-review/2018-data-workshops  
https://github.com/friedrichknuth/ooi_data_analysis