# s3 range-get requests w/ AWS SDK

Try to manually execute some S3 range-get requests 
using lower-level Python code (e.g., Python AWS SDK — 
I think this is the boto library) and decompress and 
decode them into numpy arrays using the information in the 
Kerchunk reference files (byte order, dimensionality, etc.). 
This will help you understand the steps youll eventually need to 
do in C++, and help you make sure what youre doing is actually possible 
(e.g., there arent weird web issues that arent your fault) in a 
friendlier interactive environment.

In [1]:

import zlib
import json
import boto3
import boto
import fsspec
import fsspec.utils
import numpy as np
import xarray as xr
import zarr
import os
import ujson
import s3fs
import numcodecs

import kerchunk.combine
from kerchunk.zarr import single_zarr
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
from pathlib import Path

### Stream sample bytes from generic NetCDF

In [2]:
# url: 's3://era5-pds/2020/*/data/air_pressure_at_mean_sea_level.nc'[:2]
# replace with 01 etc prefixes 

s3 = boto3.client('s3')
s3_url = 's3://era5-pds/2020/01/data/air_pressure_at_mean_sea_level.nc'
byte_range = 'bytes=0-64'

s3_parts = s3_url.split("/")
bucket_name = s3_parts[2]
object_key = "/".join(s3_parts[3:])

response = s3.get_object(Bucket=bucket_name, Key=object_key, Range=byte_range)

# Read and print the content of the specified byte range
content = response['Body'].read()

print(content[:40])
binary_string = "{:08b}".format(int(content.hex(),16))
print(binary_string[:40])

b'\x89HDF\r\n\x1a\n\x00\x00\x00\x00\x00\x08\x08\x00\x04\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff'
1000100101001000010001000100011000001101


### Now directly apply metadata to extract a Zarr chunk

#### Zarr storage sec:
Multiple arrays can be stored in the same array store by 
associating each array with a different logical path.
A logical path is simply an ASCII string. The logical 
path is used to form a prefix for keys used by the array. 
For example, if an array is stored at logical path “foo/bar” 
then the array metadata will be stored under the key “foo/bar/.zarray”,
the user-defined attributes will be stored under the key “foo/bar/.zattrs”, 
and the chunks will be stored under keys like “foo/bar/0.0”, “foo/bar/0.1”, etc.

The compressed sequence of bytes for each chunk is stored 
under a key formed from the index of the chunk within 
the grid of chunks representing the array. To form a string key 
for a chunk, the indices are converted to strings and concatenated
with the period character (“.”) separating each index. For example, 
given an array with shape (10000, 10000) and chunk shape (1000, 1000) 
there will be 100 chunks laid out in a 10 by 10 grid. 
The chunk with indices (0, 0) provides data for rows 0-999 and columns 0-999 
and is stored under the key “0.0”; the chunk with indices (2, 4) provides 
data for rows 2000-2999 and columns 4000-4999 and is stored under the key “2.4”; etc.

In [3]:
def bytes_to_binary(data):
    binary_string = ' '.join(format(byte, '08b') for byte in data)
    return binary_string

In [4]:
# given string of bytes and the json --> recreate the reading/parsing

# load meta data into python dict
json_path = '/Users/katrinasharonin/Downloads/kerchunkC/jsons/01_air_pressure_at_mean_sea_level.json'
f = open(json_path)
meta = json.load(f)

print('metadata display')
print(meta.keys())
# print(meta['refs'].keys())
# print(len(meta['refs'].keys()))

from itertools import islice

def take(n, iterable):
    """Return the first n items of the iterable as a list."""
    return list(islice(iterable, n))

n_items = take(10, meta['refs'].items())
print('sample dict values')
print(n_items[0])
print(n_items[1])
print(n_items[2])
print(n_items[3])
print(n_items[4])
print(n_items[5])

print(meta['refs']['air_pressure_at_mean_sea_level/.zarray'])
print(meta['refs']['air_pressure_at_mean_sea_level/0.0.0'])
print(meta['refs']['air_pressure_at_mean_sea_level/0.0.1'])
print(meta['refs']['air_pressure_at_mean_sea_level/0.0.2'])

# compare to actual sub sections from zarr engine - first chunk
start_byte = meta['refs']['air_pressure_at_mean_sea_level/0.0.0'][1]
num_bytes = meta['refs']['air_pressure_at_mean_sea_level/0.0.0'][2]

byte_range = f'bytes={start_byte}-{start_byte+num_bytes}'
response = s3.get_object(Bucket=bucket_name, Key=object_key, Range=byte_range)
content = response['Body'].read()
print('\n')
print('fetched sample bytes from nc')
print(content[:50])
binary_representation = bytes_to_binary(content[:40])
print(binary_representation)

buf = zlib.decompress(content)

print('\n')
print('after decompressing:')
for value in buf[:10]:
    print(bin(value)[2:].zfill(8) + " ", end = '')

# use filter + fill value to restore proper buf
# filters":[{"elementsize":4,"id":"shuffle"}],
buf = numcodecs.shuffle.Shuffle(4).decode(buf)

print('\n')
print('after shuffle:')
for value in buf[:20]:
    print(bin(value)[2:].zfill(8) + " ", end = '')

chunk = np.frombuffer(buf, dtype='<f4')

print('\n')
print('chunk 1 using 0.0.0 indexing')
# print(buf)
print(chunk)

metadata display
dict_keys(['version', 'refs'])
sample dict values
('.zgroup', '{"zarr_format":2}')
('.zattrs', '{"institution":"ECMWF","source":"Reanalysis","title":"ERA5 forecasts"}')
('air_pressure_at_mean_sea_level/.zarray', '{"chunks":[24,100,100],"compressor":{"id":"zlib","level":4},"dtype":"<f4","fill_value":9.969209968386869e+36,"filters":[{"elementsize":4,"id":"shuffle"}],"order":"C","shape":[744,721,1440],"zarr_format":2}')
('air_pressure_at_mean_sea_level/.zattrs', '{"_ARRAY_DIMENSIONS":["time0","lat","lon"],"least_significant_digit":2,"long_name":"Mean sea level pressure","nameCDM":"Mean_sea_level_pressure_surface","nameECMWF":"Mean sea level pressure","product_type":"analysis","shortNameECMWF":"msl","standard_name":"air_pressure_at_mean_sea_level","units":"Pa"}')
('air_pressure_at_mean_sea_level/0.0.0', ['era5-pds/2020/01/data/air_pressure_at_mean_sea_level.nc', 19226, 256358])
('air_pressure_at_mean_sea_level/0.0.1', ['era5-pds/2020/01/data/air_pressure_at_mean_sea_level.

### Compare against actual xarray Zarr engine; contents should match!

In [5]:
# find the corresponding chunk in the xarray reading w/ zarr engine

# contrast against: xarray reading of zarr with engine
# chunk sizing: chunks":[24,100,100]

ds = xr.open_dataset("reference://", engine="zarr", backend_kwargs={
                    "consolidated": False,
                    "storage_options": {"fo": json_path, "remote_protocol": "s3","remote_options": {"anon": True}}
                    })

# print(ds)

first_chunk = ds.isel(time0=slice(0, 24), lat=slice(0, 100), lon=slice(0, 100))
print(first_chunk.to_array())
print(first_chunk.to_array().size)
print('should equal...')
print(len(chunk))

first_flatten = first_chunk['air_pressure_at_mean_sea_level'].values.flatten()
matching_mask = (first_flatten == chunk)

# Check if any True values exist in the mask
print(matching_mask[:10])
if matching_mask.any():
    print("flatten: the target array exists in the 'first_chunk' dataset.")
else:
    print("flatten: the target array does not exist in the 'first_chunk' dataset.")

for ex in chunk:
    if ex in first_flatten:
            print("match was found in flattened!")
            break
print("All search options exhausted, see above for results")

<xarray.DataArray (variable: 1, time0: 24, lat: 100, lon: 100)>
array([[[[ 99968.06,  99968.06,  99968.06, ...,  99968.06,  99968.06,
           99968.06],
         [ 99949.81,  99950.06,  99950.06, ...,  99965.81,  99965.81,
           99966.06],
         [ 99925.56,  99925.81,  99926.31, ...,  99958.56,  99958.81,
           99959.31],
         ...,
         [ 99567.56,  99568.81,  99571.81, ..., 100111.81, 100103.06,
          100097.56],
         [ 99708.31,  99709.56,  99710.56, ..., 100131.56, 100123.81,
          100118.31],
         [ 99839.56,  99840.81,  99841.56, ..., 100153.56, 100145.56,
          100140.06]],

        [[100021.31, 100021.31, 100021.31, ..., 100021.31, 100021.31,
          100021.31],
         [100008.06, 100008.06, 100008.31, ..., 100021.06, 100021.06,
          100021.31],
         [ 99986.31,  99986.56,  99986.81, ..., 100014.06, 100014.31,
          100014.56],
...
         [ 98630.56,  98661.31,  98692.06, ...,  98561.81,  98563.56,
           98567.5

In [11]:
print(chunk)

[99968.06 99968.06 99968.06 ... 98759.75 98760.75 98763.5 ]


In [10]:
print('test xarray sizing - find variation in sizing')
print(first_chunk.to_array()[0][0][0])
print(len(first_chunk.to_array()[0][0][0]))
print(len(first_chunk.to_array()[0][0]))
print(len(first_chunk.to_array()[0]))

print('observation; lon aka last is varying on the deepest level')
print('observation; time is iterating on the highest level')

test xarray sizing - find variation in sizing
<xarray.DataArray (lon: 100)>
array([99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.06, 99968.06,
       99968.06, 99968.06, 99968.06, 99968.06, 99968.

In [19]:
print(first_chunk.to_array()[0][0])

<xarray.DataArray (lat: 100, lon: 100)>
array([[ 99968.06,  99968.06,  99968.06, ...,  99968.06,  99968.06,
         99968.06],
       [ 99949.81,  99950.06,  99950.06, ...,  99965.81,  99965.81,
         99966.06],
       [ 99925.56,  99925.81,  99926.31, ...,  99958.56,  99958.81,
         99959.31],
       ...,
       [ 99567.56,  99568.81,  99571.81, ..., 100111.81, 100103.06,
        100097.56],
       [ 99708.31,  99709.56,  99710.56, ..., 100131.56, 100123.81,
        100118.31],
       [ 99839.56,  99840.81,  99841.56, ..., 100153.56, 100145.56,
        100140.06]], dtype=float32)
Coordinates:
  * lat       (lat) float32 90.0 89.75 89.5 89.25 89.0 ... 66.0 65.75 65.5 65.25
  * lon       (lon) float32 0.0 0.25 0.5 0.75 1.0 ... 24.0 24.25 24.5 24.75
    time0     datetime64[ns] 2020-01-01
    variable  <U30 'air_pressure_at_mean_sea_level'
Attributes:
    institution:  ECMWF
    source:       Reanalysis
    title:        ERA5 forecasts


In [6]:
# spare code - compression recreation for zlib

"""
// temporary compression function
std::vector<Bytef> compressData(Bytef* inputData, 
                                uLong inputSize, 
                                int compressionLevel = Z_BEST_COMPRESSION) {
    std::vector<Bytef> compressedData;

    z_stream stream;
    stream.zalloc = Z_NULL;
    stream.zfree = Z_NULL;
    stream.opaque = Z_NULL;

    int result = deflateInit(&stream, compressionLevel);
    if (result != Z_OK) {
        return compressedData;
    }

    stream.avail_in = static_cast<uInt>(inputSize);
    stream.next_in = const_cast<Bytef*>(inputData);
    
    uLong outputBufferSize = deflateBound(&stream, inputSize);
    compressedData.resize(outputBufferSize);
    stream.avail_out = static_cast<uInt>(outputBufferSize);
    stream.next_out = &compressedData[0];

    result = deflate(&stream, Z_FINISH);
    if (result != Z_STREAM_END) {
        deflateEnd(&stream);
        return compressedData;
    }

    deflateEnd(&stream);

    compressedData.resize(outputBufferSize - stream.avail_out);

    std::cout << std::endl;
    std::cout << "Compressed Data (Binary): ";
    for (size_t i = 0; i < 100 && i < compressedData.size(); ++i) {
        Bytef byte = compressedData[i];
        // std::cout << "\\x";
        for (int bit = 7; bit >= 0; --bit) {
            std::cout << ((byte >> bit) & 1);
        }
        std::cout << " ";
    }
    std::cout << std::dec << std::endl;

    return compressedData;
}

"""

'\n\n// temporary compression function\nstd::vector<Bytef> compressData(Bytef* inputData, \n                                uLong inputSize, \n                                int compressionLevel = Z_BEST_COMPRESSION) {\n    std::vector<Bytef> compressedData;\n\n    z_stream stream;\n    stream.zalloc = Z_NULL;\n    stream.zfree = Z_NULL;\n    stream.opaque = Z_NULL;\n\n    int result = deflateInit(&stream, compressionLevel);\n    if (result != Z_OK) {\n        return compressedData;\n    }\n\n    stream.avail_in = static_cast<uInt>(inputSize);\n    stream.next_in = const_cast<Bytef*>(inputData);\n    \n    uLong outputBufferSize = deflateBound(&stream, inputSize);\n    compressedData.resize(outputBufferSize);\n    stream.avail_out = static_cast<uInt>(outputBufferSize);\n    stream.next_out = &compressedData[0];\n\n    result = deflate(&stream, Z_FINISH);\n    if (result != Z_STREAM_END) {\n        deflateEnd(&stream);\n        return compressedData;\n    }\n\n    deflateEnd(&stream);\