In [None]:
from pathlib import Path
import itertools as it
import datetime as dt
from dateutil import parser as dtparser
import pandas as pd
import fsspec
import xarray as xr
import matplotlib.pyplot as plt
import hvplot.xarray
import csv
import echopype as ep
import os
import warnings
from echopype import open_raw
import dask
from dask.distributed import Client
from tqdm import tqdm
import requests
from datetime import datetime
from dateutil import parser as dtparser
from importlib.metadata import version
from urllib.parse import urljoin
from datetime import timedelta


# Silence DeprecationWarning messages so they do not clutter the cell outputs
warnings.simplefilter("ignore", category=DeprecationWarning)
# fsspec filesystem for HTTPS; used further down to glob for .raw files
fs = fsspec.filesystem('https')

In [None]:
# Show the installed echopype version
version('echopype')


Create a `data` folder in the current working directory to store files. Note that this folder is not pushed to GitHub because it is listed in `.gitignore`.
From here on, the code uses the `current_directory` variable for the working directory and the `data_folder` variable for the data files.


In [None]:
# Get the current working directory
current_directory = os.getcwd()

# Define the path to the "data" folder inside the working directory
data_folder = os.path.join(current_directory, 'data')

# Create the "data" folder. exist_ok=True makes this a no-op when the folder
# is already there, so the cell can be re-run safely and there is no gap
# between checking for the folder and creating it.
os.makedirs(data_folder, exist_ok=True)

In [None]:
# Base URL on the OOI raw data server; the per-day folder paths (YYYY/MM/DD)
# are joined onto it further down
ooi_raw_url = "https://rawdata.oceanobservatories.org/files/CE04OSPS/PC01B/ZPLSCB102_10.33.10.143/"

In [None]:
def in_range(raw_file: str, start: datetime, end: datetime, verbose: bool = True) -> bool:
    """Check whether the timestamp in a raw file's name lies within [start, end].

    Only file names containing 'OOI-' are considered, and they must follow
    the pattern ``OOI-DYYYYMMDD-THHMMSS.raw``.

    Parameters
    ----------
    raw_file : str
        Path or URL of the raw file; only the final path component is used.
    start, end : datetime
        Inclusive bounds of the accepted time range.
    verbose : bool, default True
        When True, print the file name and the parsed timestamp.

    Returns
    -------
    bool
        True when the timestamp parsed from the file name is within the
        range. False when the name does not contain 'OOI-', cannot be
        parsed with the expected pattern, or falls outside the range.
    """
    file_name = Path(raw_file).name
    if verbose:
        print('File Name:', file_name)

    if 'OOI-' not in file_name:
        return False

    try:
        file_datetime = datetime.strptime(file_name, "OOI-D%Y%m%d-T%H%M%S.raw")
    except ValueError:
        # The name contains 'OOI-' but does not follow the expected pattern;
        # skip this file instead of aborting the whole selection.
        return False

    if verbose:
        print('Parsed Datetime:', file_datetime)
    return start <= file_datetime <= end

In [None]:
# Time window of interest; used below to build the per-day folder URLs and as
# the bounds passed to in_range()
start_datetime = dt.datetime(2016, 8, 21, 0, 0)
end_datetime = dt.datetime(2016, 8, 22, 0, 0)

In [None]:
# Build one folder URL per calendar day touched by the time window.
# The loop steps over dates rather than datetimes: stepping a datetime by one
# day would skip the final day whenever start_datetime has a later time of
# day than end_datetime.
desired_day_urls = []
current_date = start_datetime.date()

while current_date <= end_datetime.date():
    day_url = urljoin(
        ooi_raw_url,
        f"{current_date.year}/{current_date.month:02d}/{current_date.day:02d}"
    )
    desired_day_urls.append(day_url)
    current_date += timedelta(days=1)


In [None]:
# Glob the .raw files of every day folder that answers with HTTP 200.
# The result is materialised as a list: a bare itertools.chain iterator is
# exhausted after one pass, so re-running the filtering cell would otherwise
# find no files. The timeout keeps an unresponsive server from hanging the cell.
all_raw_file_urls = list(it.chain.from_iterable(
    fs.glob(f"{day_url}/*.raw")
    for day_url in desired_day_urls
    if requests.get(day_url, timeout=30).status_code == 200
))

In [None]:
# Keep only the files whose timestamp lies inside the time window.
# Both bounds are widened by a buffer that is currently zero hours; raise the
# `hours` values to also pick up files just outside the window.
desired_raw_file_urls = [
    raw_file
    for raw_file in all_raw_file_urls
    if in_range(
        raw_file,
        start_datetime - dt.timedelta(hours=0),
        end_datetime + dt.timedelta(hours=0),
    )
]
print(f"There are {len(desired_raw_file_urls)} raw files within the specified datetime range.")

In [None]:
# Number of raw files selected for conversion
len(desired_raw_file_urls)

In [None]:

# Location of the CSV file used to store the raw file URLs
csv_file_path = os.path.join(data_folder, 'raw_files.csv')

# Create (or truncate) the file, leaving a single empty row in it
with open(csv_file_path, mode='w', newline='') as csvfile:
    csv.writer(csvfile).writerow([])

print(f"Empty CSV file created at: {csv_file_path}")

In [None]:
# Save the selected URLs to the CSV file, overwriting its previous content;
# the row index is written too and is dropped again when the file is read back
pd.DataFrame(desired_raw_file_urls).to_csv(csv_file_path)

In [None]:
# Load the URL table back from disk and discard the unnamed index column
raw_file_table = pd.read_csv(csv_file_path).drop(columns=['Unnamed: 0'])

# The URLs sit in the column labelled '0'; turn them into a plain list
desired_raw_file_urls = list(raw_file_table['0'])
desired_raw_file_urls

In [None]:
%%time

# Create a Dask distributed client (no scheduler address is passed)
client = Client()

def process_raw_file(raw_file_url, output_dpath):
    """Convert one EK60 raw file with echopype and save it as Zarr.

    Parameters
    ----------
    raw_file_url : str
        Location of the raw file to convert.
    output_dpath : str or path-like
        Destination passed to ``to_zarr`` as ``save_path``.

    Returns
    -------
    None
        Failures are reported with a printed message instead of being
        raised, so one bad file does not abort the remaining conversions.
    """
    try:
        print(f"Processing: {raw_file_url}")
        ed = ep.open_raw(raw_file=raw_file_url, sonar_model='ek60', use_swap=True)
        # Honour the output_dpath argument so the function does not depend on
        # a notebook-level variable
        ed.to_zarr(save_path=output_dpath, overwrite=True)
        print(f"Completed processing: {raw_file_url}")
    except Exception as e:
        print(f"Error processing {raw_file_url}: {e}")


# Create one Dask delayed object per raw file, each writing into data_folder.
# tqdm here tracks the creation of the delayed objects, not the conversions.
delayed_processing = [dask.delayed(process_raw_file)(raw_file_url, data_folder) for raw_file_url in tqdm(desired_raw_file_urls)]

# Trigger Dask computations for all files
dask.compute(*delayed_processing)

In [None]:
# Close the Dask client created for the conversion step
client.close()


In [None]:
# Wrap the data folder string in a Path so it can be globbed
data_folder_path = Path(data_folder)
print(data_folder_path)

# Open every converted .zarr store in the data folder, in sorted name order
ed_list = []
for converted_file in sorted(data_folder_path.glob("*.zarr")):
    print(converted_file)
    ed_list.append(ep.open_converted(converted_file))

In [None]:
# Merge all opened files in ed_list into one object with echopype
ed = ep.combine_echodata(ed_list)

In [None]:
# Compute Sv from the combined data; .compute() is called on the result so the
# values are evaluated in this cell
ds_Sv = ep.calibrate.compute_Sv(ed).compute()

In [None]:
# Display the computed Sv result
ds_Sv