目前阶段，era5数据在minio仍然以文件的形式存储。

**存在问题：**
1. 如果文件很大读取效率低。
2. 跨文件读取不方便；


**解决思路：**写块（chunk）

**实现目标：**将数据转化为[zarr](https://zarr.readthedocs.io/en/stable/)格式

**实现方法：**使用[kerchunk](https://fsspec.github.io/kerchunk/)

# 关于kerchunk

简单说，能够更高效地读取本地或s3（如minio）上的部分格式（如NetCDF/HDF5, GRIB2, TIFF, …）的数据（解决问题1），并且能够跨文件创建虚拟数据集（解决问题2）。

# 安装kerchunk

In [None]:
!pip install git+https://github.com/fsspec/kerchunk

# 生成JSON文件
将minio上的每个era5文件分块，并生成对应的json文件

In [None]:
import kerchunk.netCDF3
import fsspec
import s3fs

In [None]:
access_key='{minio的access_key}'
secret_key='{minio的secret_key}'
storage_options = {
    'client_kwargs': {'endpoint_url': 'http://minio.waterism.com:9000'}, 
    'key': access_key, 
    'secret': secret_key
}

读取minio上的era5文件

In [None]:
fs = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "http://minio.waterism.com:9000"}, key=access_key, secret=secret_key)
fs

In [None]:
flist = fs.glob('watermodel-pub/geodata/era5_land/*/*/*.nc')
flist

将每个era5对应的nc文件写块并生成对应json文件

In [None]:
from pathlib import Path
import os
import ujson

In [None]:
def gen_json(file_url):
    h5chunks = kerchunk.netCDF3.netcdf_recording_file('s3://'+file_url, storage_options=storage_options)

    outf = file_url[:-3] + '.json' #file name to save json to
    outf = outf.replace('watermodel-pub', 'test')
    with fs.open(outf, 'wb') as f:
        f.write(ujson.dumps(h5chunks.translate()).encode());

In [None]:
%%time
for file in flist:
    gen_json(file)

通过读取json文件获得era5数据，读取效率更高

In [None]:
import xarray as xr

In [None]:
%%time
ds = xr.open_dataset(
    "reference://", 
    engine="zarr", 
    backend_kwargs={
        "consolidated": False,
        "storage_options": {"fo": fs.open('s3://test/geodata/era5_land/2021/01/01.json'), 
                            "remote_protocol": "s3",
                            "remote_options": storage_options
                            }
                    }
    )
ds

# 创建多文件数据集
通过合并json文件生成包含多文件数据集的json文件

In [None]:
from kerchunk.combine import MultiZarrToZarr

获取多个单文件json

In [None]:
json_list = fs.glob("test/geodata/era5_land/*/*/*.json")
json_list = ['s3://'+str for str in json_list]
json_list

合并多个json形成多文件json

In [None]:
mzz = MultiZarrToZarr(
    json_list,
    target_options = storage_options,
    remote_protocol = 's3',
    remote_options = storage_options,
    concat_dims = ['time'],
    identical_dims = ['latitude', 'longitude'])

d = mzz.translate()

with fs.open('s3://test/geodata/era5_land/era5_land.json', 'wb') as f:
    f.write(ujson.dumps(d).encode())

跨文件读取数据

In [None]:
%%time
ds = xr.open_dataset("reference://", engine="zarr", backend_kwargs={
                    "consolidated": False,
                    "storage_options": {"fo": fs.open('s3://test/geodata/era5_land/era5_land.json'), "remote_protocol": "s3","remote_options": {'client_kwargs': {'endpoint_url': 'http://minio.waterism.com:9000'}, 'key': 'JKhbLNL0jNKqbjn4', 'secret': '0RDubDRBIrC2WOHAP4nHtYP28TXtVj8H'}}}
                    )
ds