## Data upload to PelicanFS
PelicanFS is a file system interface (fsspec) for the Pelican Platform. For more information about Pelican, see the Pelican Platform [main website](https://pelicanplatform.org/) or the PelicanFS [GitHub page](https://github.com/pelicanplatform/pelicanfs). For more information about fsspec, visit the [filesystem-spec](https://filesystem-spec.readthedocs.io/en/latest/index.html) page.

In [1]:
import fsspec
import pelicanfs
import aiohttp
import certifi
from pelicanfs.core import PelicanFileSystem
import ssl
# TLS context that trusts the CA bundle shipped with certifi.
ssl_ctx = ssl.create_default_context(cafile=certifi.where())

# aiohttp connector using that context, created on the loop returned by fsspec.
conn = aiohttp.TCPConnector(
    ssl=ssl_ctx,
    loop=fsspec.asyn.get_loop(),
)

# PelicanFS handle for the OSG federation; the connector is forwarded via client_kwargs.
pelfs = PelicanFileSystem(
    "pelican://osg-htc.org",
    client_kwargs={"connector": conn},
)

# Read a small test object and print what came back (print applies str() itself).
hello_world = pelfs.cat('/jkb-lab-public/downloaded-test.txt')
print(hello_world)

b'testing'


In [2]:
# Show which CA bundle file certifi resolves to (the same value passed as `cafile` above).
# NOTE(review): the saved output is a machine-specific absolute path; consider clearing it before sharing.
certifi.where()

'/Users/jessiekb/Documents/GitHub/pyologger/venv/lib/python3.12/site-packages/certifi/cacert.pem'

In [3]:
# Read a second object through the same PelicanFS handle and print its contents.
validation_object = '/ospool/uc-shared/public/OSG-Staff/validation/test.txt'
hello_world = pelfs.cat(validation_object)
print(hello_world)

b'Hello, World!\n'


In [4]:
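# TODO: authenticated read. `foo` (object name) and `token` (bearer token) are placeholders
# that are not defined in the cells shown here; define them before enabling this line.
#hello_world = pelfs.cat(f"/jkb-lab-public/{foo}?authz=Bearer%20{token}")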

# Placeholder for importing data into DiveDB

In [5]:
import dotenv
# Import necessary pyologger utilities
from pyologger.utils.folder_manager import *
from pyologger.utils.event_manager import *
from pyologger.plot_data.plotter import *
from pyologger.calibrate_data.zoc import *
from pyologger.io_operations.base_exporter import *

# `os` is imported explicitly: no earlier cell shown here imports it, so the original
# presumably relied on it leaking in through one of the wildcard imports.
import os

# Load important file paths and configurations
config, data_dir, color_mapping_path, montage_path = load_configuration()

# Select and load one specific deployment, identified by its dataset and deployment IDs
animal_id, dataset_id, deployment_id, dataset_folder, deployment_folder, data_pkl, config_manager = select_and_load_deployment(
    data_dir, dataset_id="oror-adult-orca_hr-sr-vid_sw_JKB-PP", deployment_id="2023-10-18_oror-001"
    )

# Path of `data.pkl` under the deployment's `outputs` folder
pkl_path = os.path.join(deployment_folder, 'outputs', 'data.pkl')

In [6]:
# Inspect the metadata stored under the 'ecg' key of the loaded deployment's sensor_info.
data_pkl.sensor_info['ecg']

{'channels': ['ecg'],
 'metadata': {'ecg': {'original_name': 'ecg',
   'unit': 'unknown',
   'sensor': 'ecg'}},
 'sensor_start_datetime': Timestamp('2023-10-18 11:46:36-0700', tz='America/Los_Angeles'),
 'sensor_end_datetime': Timestamp('2023-10-18 15:34:16.470000-0700', tz='America/Los_Angeles'),
 'max_value': np.int64(4093),
 'min_value': np.int64(1),
 'mean_value': np.float64(1343.7549332087892),
 'data_type': 'int64',
 'original_units': ['unknown'],
 'sampling_frequency': 100,
 'logger_id': 'UF-01',
 'logger_manufacturer': 'UFI',
 'processing_step': 'Raw data uploaded',
 'last_updated': datetime.datetime(2025, 3, 15, 0, 59, 13, 759934, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>),
 'details': 'Initial, raw sensor-specific data and metadata loaded.'}

In [7]:
import xarray as xr

# The NetCDF export lives in the deployment's `outputs` folder and is named after the deployment ID.
netcdf_filename = f'{deployment_id}_output.nc'
netcdf_path = os.path.join(deployment_folder, 'outputs', netcdf_filename)

# Open the file as an xarray dataset and render it.
data = xr.open_dataset(netcdf_path)
display(data)

## Example uploading netCDF file to DiveDB

In [8]:
from DiveDB.services.data_uploader import DataUploader
from DiveDB.services.duck_pond import DuckPond

# Local Delta Lake location, read from the loaded configuration.
LOCAL_DELTA_LAKE = config['paths']['delta_lake']['local']

# Open the lake without a Postgres connection and wrap it in an uploader.
duckpond = DuckPond(LOCAL_DELTA_LAKE, connect_to_postgres=False)
data_uploader = DataUploader(duckpond=duckpond)

# The recording ID joins the deployment ID, the animal ID and all logger IDs with underscores.
logger_ids = '_'.join(data_pkl.logger_info.keys())
recording_id = f"{deployment_id}_{animal_id}_{logger_ids}"

metadata = {
    "animal": animal_id,
    "deployment": deployment_id,
    "recording": recording_id,
}

No OpenStack credentials found. SwiftClient will not be initialized.


In [9]:
# Hand the NetCDF file and the animal/deployment/recording metadata to the DataUploader.
data_uploader.upload_netcdf(netcdf_path, metadata)

Creating file record for 2023-10-18_oror-001_output.nc and uploading to OpenStack...
Processing ['light_samples', 'magnetometer_samples', 'temperature_samples', 'accelerometer_samples', 'pressure_samples', 'gyroscope_samples', 'ecg_samples', 'depth_samples', 'prh_samples', 'corrected_acc_samples', 'corrected_mag_samples', 'corrected_gyr_samples', 'sr_smoothed_samples', 'stroke_rate_samples', 'odba_samples', 'hr_normalized_samples', 'heart_rate_samples'] datasets in the netCDF file.


Processing variables: 100%|██████████| 17/17 [00:06<00:00,  2.70it/s]

Upload complete.





In [10]:
# Count every row currently in `DataLake`.
# The result is a DataFrame, so it gets its own name instead of rebinding `conn`,
# which the first cell uses for the aiohttp connector handed to PelicanFS.
row_count_df = duckpond.conn.sql("SELECT count(*) FROM DataLake").df()

display(row_count_df)

Unnamed: 0,count_star()
0,224828020


In [11]:
# Fetch the derived depth signal for the loaded animal with `frequency=1`.
# As the cell's last expression, the result is displayed directly.
duckpond.get_delta_data(
    animal_ids=animal_id,
    labels=["derived_data_depth"],
    frequency=1,
)

Unnamed: 0,datetime,derived_data_depth
0,2023-10-18 21:20:12+00:00,0.0
1,2023-10-18 21:20:13+00:00,0.0
2,2023-10-18 21:20:14+00:00,0.0
3,2023-10-18 21:20:15+00:00,0.0
4,2023-10-18 21:20:16+00:00,0.0
...,...,...
505,2023-10-18 21:28:37+00:00,0.0
506,2023-10-18 21:28:38+00:00,0.0
507,2023-10-18 21:28:39+00:00,0.0
508,2023-10-18 21:28:40+00:00,0.0


In [12]:
# Mean value per label for the light and temperature labels in `DataLake`.
# Plain (non-f) string: the SQL has no placeholders, so nothing is interpolated
# and any braces added to the query later stay literal.
df = duckpond.conn.sql("""
    SELECT label, avg(value) as mean_data
    FROM (
        SELECT label, value.float as value
        FROM DataLake
        WHERE label = 'sensor_data_light'
        OR label = 'sensor_data_temperature'
    )
    GROUP BY label
""").df()

display(df)

Unnamed: 0,label,mean_data
0,sensor_data_light,0.0
1,sensor_data_temperature,34.439059


In [13]:
# Query the lake for one animal, restricted via `classes` to "sensor_data_accelerometer".
# NOTE(review): the saved output has only a `datetime` column, i.e. no signal came back;
# presumably "apfo-001a" is not among the data uploaded above. Confirm the intended animal ID.
filtered_data = duckpond.get_delta_data(
    animal_ids="apfo-001a",
    # Resample values to 10 Hz so that every signal shares the same time intervals
    frequency=10,
    # NOTE(review): the original note here described aggregating state events
    # (behaviors with start and end dates); confirm that is what `classes` selects
    classes="sensor_data_accelerometer",
)

display(filtered_data)

Unnamed: 0,datetime


In [14]:
# Mean value for a single label, with the label passed as a bound parameter ($1)
# rather than spliced into the SQL text.
label = "sensor_data_temperature"

# Plain (non-f) string: the query takes its only input through `$1`, so an f-string
# would wrongly suggest that values are interpolated into the SQL.
df = duckpond.conn.execute("""
    SELECT label, avg(value) as mean_data
    FROM (
        SELECT label, value.float as value
        FROM DataLake
        WHERE label = $1
    )
    GROUP BY label
""", [label]).df()

display(df)

Unnamed: 0,label,mean_data
0,sensor_data_temperature,34.439059


## Example Export to EDF

When it's easier to work with EDF files, we can export the data to an EDF file. This is useful for working with the data in other software packages.

Calling `export_to_edf(output_dir)` on a `DiveData` object creates one output EDF file for each recording in the `DiveData` relation, saved to `output_dir` with filename `<recording_id>.edf`. 

*Note: the export currently requires a lot of memory; this can be improved.*<br/>
*Note: most info fields in the EDF file are not yet supported.*

##### Example:

In [17]:
import os
import importlib
import DiveDB.services.duck_pond
import DiveDB.services.dive_data
# NOTE(review): the saved run of this cell failed with
# "ModuleNotFoundError: No module named 'DiveDB.services.dive_data'", raised by the
# import just above; confirm the installed DiveDB version provides that module.

# Re-execute the DiveDB service modules in place.
importlib.reload(DiveDB.services.duck_pond)
importlib.reload(DiveDB.services.dive_data)

# importlib.reload does not rebind names that were imported earlier with
# `from ... import`, so re-import DuckPond to instantiate the reloaded class
# rather than the one bound before the reload.
from DiveDB.services.duck_pond import DuckPond

duckpond = DuckPond(LOCAL_DELTA_LAKE, connect_to_postgres=False)

# Pull temperature and depth for one animal, capped at one million rows.
dive_data = duckpond.get_delta_data(
    animal_ids="apfo-001a",
    labels=["sensor_data_temperature", "derived_data_depth"],
    limit=1000000,
)

# Export to EDF under the given directory and display whatever the call returns.
output_edf_paths = dive_data.export_to_edf(".tmp/my_output_dir/")
display(output_edf_paths)

ModuleNotFoundError: No module named 'DiveDB.services.dive_data'

## Example importing NetCDF data

In [None]:
import xarray as xr
#import netcdf4

# Build the path of the TrackTDR NetCDF file inside the data directory.
tracktdr_filename = "2004001_TrackTDR_RawCurated.nc"
file_path = f"{data_dir}/{tracktdr_filename}"

# Open it with xarray and print a text summary of the dataset.
dataset = xr.open_dataset(file_path)
print(dataset)