# Testing ROS3 and fsspec with h5py on cloud-optimized HDF5 files

This notebook tests both I/O drivers (ROS3 and fsspec) on cloud-optimized HDF5 files from the ICESat-2 mission.

> Note: The ROS3 driver is only available in the Conda distribution of h5py

In [None]:
import fsspec
import pandas as pd
import matplotlib.pyplot as plt
import h5py

from dask.distributed import Client, LocalCluster
import dask.bag as db
from dask.diagnostics import ProgressBar

from h5logger import parse_fsspec_log, read_file


# Report the versions of the two I/O libraries exercised by this notebook.
for module in (h5py, fsspec):
    print(f'{module.__name__} v{module.__version__}')

In [None]:
# Benchmark inputs, keyed by product name.
#   "files":     flavor label -> S3 URL of one variant of the same ATL06 granule.
#   "variables": HDF5 dataset paths handed to the reader for every file.
test_dict = {
    "ATL06": {
        "files": {
            "original": "s3://its-live-data/test-space/cloud-experiments/h5cloud/atl06/ATL06_20200811143458_07210811_006_01.h5",
            "page-only-8mb": "s3://its-live-data/test-space/cloud-experiments/h5cloud/atl06/ATL06_20200811143458_07210811_006_01_page_8mb.h5",
            "rechunked-2mb": "s3://its-live-data/test-space/cloud-experiments/h5cloud/atl06/ATL06_20200811143458_07210811_006_01_rechunked-2mb-repacked.h5",
            "rechunked-4mb": "s3://its-live-data/test-space/cloud-experiments/h5cloud/atl06/ATL06_20200811143458_07210811_006_01_rechunked-4mb-repacked.h5",
            # Each flavor must map to its own object; reusing the 4 MB URL here
            # would make the "8mb" timings a duplicate of "rechunked-4mb".
            # NOTE(review): object name follows the sibling naming pattern —
            # confirm it exists in the bucket.
            "rechunked-8mb": "s3://its-live-data/test-space/cloud-experiments/h5cloud/atl06/ATL06_20200811143458_07210811_006_01_rechunked-8mb-repacked.h5",
        },
        "variables": ["/gt1l/land_ice_segments/h_li", "/gt1l/land_ice_segments/latitude", "/gt1l/land_ice_segments/longitude"]
    }
}

In [None]:
# Start a local Dask cluster (single-threaded workers) only once per kernel;
# re-running this cell reuses the existing client instead of spawning another.
if "dask_client" not in locals():
    cluster = LocalCluster(threads_per_worker=1)
    dask_client = Client(cluster)

# Jupyter only renders a bare expression when it is the cell's last top-level
# statement, so the client summary has to live outside the `if` block.
dask_client

## The importance of caching and over-reads with remote files

* **simple**: Caches entire files on disk.
* **blockcache**: Caches file data in chunks (blocks) on disk.
* **bytes**: Caches entire files in memory.
* **none**: Does not cache any request.

In [None]:
# Benchmark settings and result accumulators (reset on every run of this cell).
num_runs = 1      # repetitions of each driver / flavor combination
benchmarks = []   # benchmark records appended by the loop that follows
ranges = []       # I/O range logs appended by the loop that follows

# Baseline read settings. The real default is readahead caching with 5 MB
# blocks; caching is disabled here so the timings reflect uncached reads.
default_io_params = dict(
    fsspec_params=dict(
        skip_instance_cache=True,
        cache_type="none",
        # Alternatives left for experimentation:
        #   cache_type="first", or cache the entire file with "simple"
        #   block_size=4 * 1024 ** 2
    ),
    h5py_params=dict(),
)

# Tuned read settings; these values can be adjusted.
optimized_io_params = dict(
    fsspec_params=dict(
        cache_type="blockcache",  # could also be "first", or cache the entire file with "simple"
        block_size=8 * 1024 ** 2,
    ),
    h5py_params=dict(
        page_buf_size=16 * 1024 ** 2,
        rdcc_nbytes=4 * 1024 ** 2,
    ),
)

# Run every (read mode, driver, repetition, dataset) combination. Within one
# combination the files are read in parallel, one Dask bag partition per file.
for optimized_read in [False, True]:
    for driver in ["ros3", "fsspec"]:
        for run in range(num_runs):
            for dataset_name, dataset_item in test_dict.items():
                # One argument tuple per file flavor; the element order is kept
                # exactly as read_file receives it today.
                benchmark_list = [
                    (run, dataset_name, dataset_item["variables"], flavor, url,
                     optimized_read, driver, default_io_params, optimized_io_params)
                    for flavor, url in dataset_item["files"].items()
                ]
                bag = db.from_sequence(benchmark_list, npartitions=len(benchmark_list))
                # NOTE(review): dask.diagnostics.ProgressBar reports on local
                # schedulers only; with a distributed Client active it may stay
                # silent — dask.distributed.progress is the distributed equivalent.
                with ProgressBar():
                    results = bag.map(read_file).compute()

                for outcome in results:
                    benchmark = outcome["benchmark"]
                    # An empty benchmark has no "driver" entry to inspect, so
                    # skip the whole result rather than only the append.
                    if len(benchmark) == 0:
                        continue
                    benchmarks.append(benchmark)
                    # For now we can only log I/O with fsspec
                    if benchmark["driver"] == "fsspec":
                        ranges.append(outcome["ranges"])

# One row per collected benchmark record.
df = pd.DataFrame(benchmarks)

In [None]:
# Horizontal bar chart of the mean read time for each (driver, optimized-read)
# pair, with one bar per file format, saved to stats-default.png.
plt.style.use('ggplot')

x_max = max(df["time"])
pivot_df = df.pivot_table(index=['driver', 'optimized-read'], columns=['format'], values='time', aggfunc='mean')
# Slowest mean time measured on the unmodified file; drawn as a reference line.
baseline_original = pivot_df['original'].max()

# Create the figure explicitly and hand its axes to pandas, so no empty
# extra figure is left behind by a separate plt.figure() call.
fig, ax = plt.subplots(figsize=(20, 8))
pivot_df.plot(kind='barh', ax=ax, fontsize=14, width=0.5)

ax.set_xlim(0, x_max)

# Draw the baseline before building the legend so its label is listed there.
ax.axvline(x=baseline_original, color='red', linestyle='--', linewidth=2, label=f"Baseline: {baseline_original:.2f}")

fig.suptitle('Cloud-optimized HDF5 performance (less is better)', fontsize=18)
ax.set_xlabel('Mean Time (S)')
ax.set_ylabel('Access Pattern', fontsize=16)
ax.tick_params(axis='x', labelrotation=0)
ax.legend(title='Format', fontsize=14, loc='upper right', bbox_to_anchor=(1.15, 1.015))
ax.grid(False)

fig.tight_layout()
fig.savefig("stats-default.png", transparent=True, dpi=150)

plt.show()

In [None]:
# Write the benchmark table to a CSV file in the working directory.
df.to_csv(path_or_buf="h5py-benchmarks.csv")

In [None]:
# Display the first collected range record. Only fsspec runs add to `ranges`,
# so this raises IndexError when no fsspec result was collected.
ranges[0]

In [None]:
from matplotlib.lines import Line2D
import matplotlib.patches as patches
import numpy as np

# Read-pattern figure: one horizontal strip per collected range log, with every
# logged request drawn as a rectangle from its start to its end offset and
# coloured by request size.
if not ranges:
    raise ValueError("No range logs were collected; run the benchmark cell with the fsspec driver first.")

# Request-size classes. pd.cut bins are right-closed, so a request of exactly
# 1024 bytes lands in the first class.
bins = [0, 1 * 1024, 10 * 1024, 100 * 1024, np.inf]
colors = ['red', 'orange', 'purple', 'blue']
labels = ['< 1KB', '1KB - 10KB', '10KB - 100KB', '> 100KB']
small_request_limit = 10 * 1024  # same boundary as the "10KB" bin edge

# squeeze=False always returns a 2-D array of axes, so indexing also works
# when there is only a single range log to plot.
fig, axs = plt.subplots(ncols=1, nrows=len(ranges), figsize=(18, 18), sharex=True, squeeze=False)
axs = axs[:, 0]

for ax, range_stats in zip(axs, ranges):
    rdf = range_stats["ranges"]
    file_size = range_stats["file_size"]

    rdf['color'] = pd.cut(rdf['size'], bins=bins, labels=colors)
    rdf['label'] = pd.cut(rdf['size'], bins=bins, labels=labels)

    for start, end, color in zip(rdf['start'], rdf['end'], rdf['color']):
        rect = patches.Rectangle((start, 0), end - start, 1,
                                 linewidth=1, edgecolor=color, facecolor=color, alpha=0.3)
        ax.add_patch(rect)

    ax.set_xlim(0, 1.1e8)  # fixed x-range shared by all strips
    ax.set_ylim(0, 1)
    ax.set_yticks([])  # the y axis carries no information
    # Per-strip statistics: a single figure-level title could only describe
    # one of the plotted files.
    small_requests = int((rdf["size"] < small_request_limit).sum())
    ax.set_title(f'File Size: {round(file_size/1e6,2)} MB, Total Requests:{len(rdf)}, Requests <10kb: {small_requests}', fontsize=12)

# The last axis will retain the x-ticks
axs[-1].tick_params(axis='x', which='both', bottom=True, labelbottom=True)

# Custom legend handles: the rectangles themselves carry no labels.
legend_elements = [Line2D([0], [0], color=color, lw=2, label=label) for color, label in zip(colors, labels)]
fig.legend(handles=legend_elements, title="Request Size", loc='upper right')

fig.suptitle('ATL06 Read Pattern', fontsize=18)
fig.tight_layout()

plt.show()

In [None]:
# import holoviews as hv
# hv.extension("bokeh")

# xticks = [
#     (1024, '1KB'),
#     (1024*1024, '1MB'),
#     (10*1024*1024, '10MB'),
#     (100*1024*1024, '100MB'),
#     (1024*1024*1024, '1GB')
# ]

# rectangles = hv.Overlay()

# for index, row in rdf.iterrows():
#     # Create a rectangle for each row
#     rect = hv.Rectangles((row['start'], 0, row['end'], 1), label=row['label']).opts(
#         color=row['color'],
#         line_color=row['color'],
#         line_width=1,
#         alpha=0.7  # Optional: Set transparency for better visibility
#     )
#     rectangles *= rect  # Overlay the rectangle on top of the previous ones

# # Customize and display the plot
# rectangles.opts(
#     width=1200, height=300, xlim=(0, file_size), ylim=(0, 1),
#     xlabel='File Offset', ylabel='', xticks=xticks, show_legend=True, legend_position='top_right'
# )