In [1]:
import boto3
import os
from pathlib import Path
import pandas as pd
from tqdm.notebook import tqdm
from concurrent.futures import ThreadPoolExecutor

# S3 client used by every cell below; no credentials are passed here, so boto3 resolves them from its default session.
client = boto3.client(service_name="s3")

# S3 bucket holding the experiment outputs.
bucket = "ml-for-bem"

# S3 prefix of the experiment whose batch results this notebook collects.
# Previously processed experiments (uncomment one line below to switch):
# experiment_name = "single-climate-zone/v1/nyc"
# experiment_name = "full_climate_zone/v1/train"
# experiment_name = "full_climate_zone/v2/test"
# experiment_name = "full_climate_zone/v3/train"
# experiment_name = "full_climate_zone/v3/test"
# experiment_name = "full_climate_zone/v4/train"
# experiment_name = "full_climate_zone/v4/test"
# experiment_name = "full_climate_zone/v5/train"
# experiment_name = "full_climate_zone/batch-test/train"
# experiment_name = "full_climate_zone/v6/test"
# experiment_name = "single-climate-zone/v2/train"
# experiment_name = "full_climate_zone/v7/train"
# experiment_name = "full_climate_zone/v7/test"
# experiment_name = "single-climate-zone/toronto/v7/test"
# experiment_name = "full_climate_zone/v8/train"
# experiment_name = "full_climate_zone/test/with-hourly"
# Active experiment (assigned exactly once so there is no doubt which one is used):
experiment_name = "full_climate_zone_hourly/v1/test"

# Sub-folder under the experiment prefix to collect; also names the combined output file
# (e.g. "hourly.hdf"). Earlier runs appear to have used "monthly".
time_resolution = "hourly"

In [2]:
# List every object under "<experiment>/errors/". Each file name (without extension) is
# treated as the id of a failed run and is matched against the "id" index level later on.
paginator = client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=experiment_name + "/errors/")

error_ids = []
files = []
for page in pages:
    # A page carries no "Contents" key when the prefix matches no objects.
    for obj in page.get("Contents", []):
        key = obj["Key"]
        files.append(key)
        error_ids.append(key.split('/')[-1].split('.')[0])

# Report once, based on the final result, rather than once per empty page.
if not error_ids:
    print("No errors reported!")
print(len(error_ids))

No errors reported!
0


In [3]:
# List every object under "<experiment>/<time_resolution>/"; each is expected to be an
# HDF5 batch file that download_and_open can read.
pages = paginator.paginate(Bucket=bucket, Prefix=experiment_name + f"/{time_resolution}/")

timestep_files = []
for page in pages:
    # .get: a page has no "Contents" key when nothing matches the prefix.
    for obj in page.get("Contents", []):
        key = obj["Key"]
        # Skip zero-byte "folder" placeholder keys; they cannot be downloaded as files.
        if key.endswith("/"):
            continue
        timestep_files.append(key)
print(len(timestep_files))

1000


In [4]:
def download_and_open(file, local_root="data/hdf5/", key="batch_results"):
    """Download one S3 object, load a single table from it, then delete the local copy.

    Args:
        file: S3 object key inside ``bucket``; also used as the path below ``local_root``.
        local_root: Local directory prefix the object is downloaded into.
        key: HDFStore key of the table to load.

    Returns:
        The object stored under ``key`` in the downloaded HDF5 file (a DataFrame for
        the batch results concatenated below).
    """
    local_path = local_root + file
    # Keys can be nested arbitrarily deep; make sure the destination folder exists.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    client.download_file(bucket, file, local_path)
    try:
        with pd.HDFStore(local_path, mode="r") as store:
            df = store[key]
    finally:
        # Always remove the temporary copy, even if the file turns out to be unreadable.
        os.remove(local_path)
    return df

In [5]:
# Download and read every batch file in parallel with a pool of 8 worker threads.
os.makedirs("data/hdf5/" + experiment_name + f"/{time_resolution}", exist_ok=True)
with ThreadPoolExecutor(max_workers=8) as executor:
    # executor.map yields results in input order, so frames line up with timestep_files.
    frames = list(tqdm(executor.map(download_and_open, timestep_files), total=len(timestep_files)))

if not frames:
    raise ValueError(f"No {time_resolution} batch files found under s3://{bucket}/{experiment_name}/")
# Stack the per-batch tables into one results frame, then free the per-batch list.
dfs = pd.concat(frames, axis=0)
del frames
print(len(dfs))


  0%|          | 0/1000 [00:00<?, ?it/s]

10000


In [6]:
# Tag each row with whether its run id appears in the errors/ listing, then save locally.
# Guard: re-running this cell must not stack a second "error" index level.
if "error" in dfs.index.names:
    dfs = dfs.reset_index(level="error", drop=True)
ids = dfs.index.get_level_values("id")
# NOTE(review): error_ids are strings parsed from S3 file names; if the "id" level is
# numeric, isin() will never match - confirm the dtypes agree.
error_mask = ids.isin(error_ids)
dfs = dfs.set_index(error_mask, append=True)
dfs.index.names = dfs.index.names[:-1] + ["error"]
dfs.to_hdf("data/hdf5/" + experiment_name + f"/{time_resolution}.hdf", key="batch_results", mode="w")
# Last expression, so the notebook actually displays a preview of the index.
dfs.index.to_frame(index=False).head()

In [None]:
# Upload the combined "<time_resolution>.hdf" file to S3, mirroring its local path under the experiment prefix.
hdf_name = f"{time_resolution}.hdf"
client.upload_file(
    "data/hdf5/" + experiment_name + "/" + hdf_name,
    bucket,
    experiment_name + "/" + hdf_name,
)

In [None]:
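# Optional: concatenate with results from another experiment. NOTE: the paths below hardcode "monthly.hdf"; change them to match `time_resolution` before uncommenting.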
# dfs_7 = pd.read_hdf("data/lightning/full_climate_zone/v7/train/monthly.hdf", key="batch_results")
# df_all = pd.concat([dfs, dfs_7], axis=0)
# df_all.head()
# df_all.to_hdf("data/hdf5/" + experiment_name + "/monthly.hdf", key="batch_results", mode="w")
# client.upload_file("data/hdf5/" + experiment_name + "/monthly.hdf", bucket, experiment_name + "/monthly.hdf")
# len(df_all)