In [2]:
from utility.Loader.DataLoaderService import load_data
from model.UNet import UNetModel
from DownscalingPipeline import DownscalingPipeline
from IPython.display import display
import xarray as xr
import dask

from joblib import Parallel, delayed
from sklearn.model_selection import train_test_split


import tensorflow as tf
from keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard

.
## ERA5 and CERRA Data
I've loaded the data via cloud from https://storage.ecmwf.europeanweather.cloud/Code4Earth/. In total I used the data for the time period from 2019-01 - 2021-06 for both data sets (ERA5 and CERRA). While fetching the data from the cloud I've realized that ERA5 and CERRA data only exists for the time period till 2021-06.

<br>

### Which Parameters do I need?
From both data set types (ERA5 and CERRA):
- 2m temperature
- time (year, month, day, time)
- longitude
- latitude

In addition to the ERA5 and CERRA datasets containing the t2m-parameter, there are two datasets containting some additional parameters. The parameters are the following:
<br>

**ERA5**
<br>era5_topography_ds contains lsm and z as data variables
- **lsm**  => Land Surface Model. It refers to land surface variables that describe various properties and processes related to the land surface
- **z** => refers to geopotential height, it provides information about the vertical structure of the atmosphere and is often used to analyze weather patterns, identify atmospheric circulation features, and understand large-scale atmospheric dynamics
<br>

**CERRA**
<br>cerra_orography_ds contains lsm and orog as data variables
- **lsm** => refers to land surface variables that describe various properties and processes related to the land surface
- **orog** => topography or relief of the Earth's surface

These datasets are only available for one time period. But since the orography etc. doesn't change during such a short time period, I fetched it once from the cloud and the added it as a new dimension to the exisitng ERA5 and CERRA datasets.

<br>

### How did I split the data?
As discussed with Irene I'm using for the time period, only few years and the following splitting (ca. 70 - 15 - 15):

- **trainig:** 2015 - 2019 
- **testing:** 2020 
- **validation:** 2021

In [3]:
era5_list, cerra_list = load_data()

Cannot find the ecCodes library


.
## Data Preperation
Currently loaded 
- CERRA 01-2019 - 2021-06
- ERA5 01-2019 - 2021-06

### 01 Combine 
Combine list of cerra xarrays to single xarray (same for era5) and save it to disk.<br><br>


In [4]:
def store_to_disk(file_name, data, file_path="./data/"):
    """
    Store data to disk in a specified folder.

    Parameters:
    - file_name (str): The name of the file to be saved.
    - data: The data to be stored.
    - folder_path (str, optional): The path to the folder where the file will be saved.
      Default is "data".
    """
    file = f"{file_path}{file_name}.nc"

    #data.load().to_netcdf(file)
    #data.to_netcdf(f"{file_path}{file_name}.nc")

    # Assuming 'ds' is your xarray dataset
    #data.to_zarr(f"{file_path}{file_name}.nc", mode='w', compute=True, scheduler='threads', encoding={'variable_name': {'compressor': {'id': 'zstd', 'level': 3}}})

    data.to_zarr(file, mode='w', compute=False)

    # Use dask to parallelize the computation and writing
    dask.compute(data.compute())



def load_from_disk(file_name, file_path="./data/"):
    """
    Load climate data from disk.

    Parameters:
    - file_path (str): Path to the directory containing the file. Default is "data".
    - file_name (str): Name of the file.

    Returns:
    - xr.Dataset: Loaded climate data.
    """
    file_path = file_path + file_name + ".nc"
    return xr.open_dataset(file_path)


try to store the single xarray of cerra & era after cropping to the disk. So I will save some time. 

In [5]:
# Combine 
cerra_ds = xr.concat(cerra_list, dim='time')
era5_ds = xr.concat(era5_list, dim='time')

### 02 - Crop the Data set
**Problem** The input shape is not dividable by 32 => so i have crop it, so the model works proberly. 

#### Explaination
In many convolutional neural network architectures, especially those that involve multiple downsampling and upsampling operations, it is common to design the network such that the spatial dimensions are divisible by certain factors (e.g., 32). This ensures that the dimensions can be downsampled and upsampled without resulting in fractional spatial dimensions.

For example, if you're using strides of 2 in downsampling operations and upsampling by factors of 2, the spatial dimensions of your feature maps need to be divisible by 2.<br>

#### TODO: 
later cut it a little bit better (from each edge)


In [6]:
# ERA 5 
# Desired divisible factor (e.g., 32)
divisible_factor = 32

# Calculate the new size that is divisible by the factor for both longitude and latitude
new_longitude_size = (era5_ds.longitude.size // divisible_factor) * divisible_factor
new_latitude_size = (era5_ds.latitude.size // divisible_factor) * divisible_factor

# Crop the ERA5 dataset
era5 = era5_ds.sel(
    longitude=slice(era5_ds.longitude.values[0], era5_ds.longitude.values[new_longitude_size - 1]),
    latitude=slice(era5_ds.latitude.values[0], era5_ds.latitude.values[new_latitude_size - 1])
)

# Verify the new dimensions
print(era5_ds.dims)
print(era5.dims)


# CERRA 
# Calculate the new size that is divisible by the factor for both longitude and latitude
new_longitude_size = (cerra_ds.longitude.size // divisible_factor) * divisible_factor
new_latitude_size = (cerra_ds.latitude.size // divisible_factor) * divisible_factor

# Crop the ERA5 dataset
cerra = cerra_ds.sel(
    longitude=slice(cerra_ds.longitude.values[0], cerra_ds.longitude.values[new_longitude_size - 1]),
    latitude=slice(cerra_ds.latitude.values[0], cerra_ds.latitude.values[new_latitude_size - 1])
)

# Verify the new dimensions
print(cerra_ds.dims)
print(cerra.dims)


Frozen({'longitude': 281, 'latitude': 221, 'time': 7296})
Frozen({'longitude': 256, 'latitude': 192, 'time': 7296})
Frozen({'longitude': 801, 'latitude': 501, 'time': 7296})
Frozen({'longitude': 800, 'latitude': 480, 'time': 7296})


In [None]:
# save cropped data to disk
#store_to_disk("cerra_01_2019_06_2021", cerra)
#store_to_disk("era5_01_2019_06_2021", era5)

#jobs = [("cerra_01_2019_06_2021", cerra), ("era5_01_2019_06_2021", era5)]
#Parallel(n_jobs=-1)(delayed(store_to_disk)(job[0], job[1]) for job in jobs)

store_to_disk("era5_01_2019_06_2021", era5)

zarr is better suited to parallel writes. Even with arrays backed with distributed task clusters, to_netcdf brings the array to the local thread (in chunks, but still) to write to the netcdf in the main thread. Writing with zarr schedules the write, then the workers write to the storage in parallel.

In [6]:
file = "era5_01_2019_06_2021"
write_job = era5.to_netcdf(file, compute=False) #try zarr

print(f"Writing to {file}")
write_job.compute()

# time: 10.3s

Writing to era5_01_2019_06_2021


unfortunantly the data is completely gone D: 
When you use mode='w' with to_zarr, it means you are writing a new dataset, and if files with the same name already exist, they will be replaced.  

In [8]:
import dask.array as da
import dask.dataframe as dd

path = "./data/"

#write_job = cerra.to_netcdf(file, compute=False) 
# try zipping (to save storage on disk )

write_job = cerra.to_zarr(path, mode='w', compute=False)

print(f"Writing to {file}")
write_job.compute(progress_bar=True)



### 03 Align Spatial Dimension 
So both cerra & era5 cover almost same spatial space<br>
initially, no alignment needed => ERA5 covers a little bit more spatial space (but thats okay, as in covers orginally the whole globe, at a padding is desired so the boardes of cerra can be correctly predicted )


In [None]:
# SHOW MAX & MIN OF COORDINATES
print("ERA")
print("latitude: ")
print(cropped_era5_data.latitude.max().values)
print(cropped_era5_data.latitude.min().values)
print("longitude: ")
print(cropped_era5_data.longitude.max().values)
print(cropped_era5_data.longitude.min().values)
print("latitude: ")
print(era5.latitude.max().values)
print(era5.latitude.min().values)
print("longitude: ")
print(era5.longitude.max().values)
print(era5.longitude.min().values)

print("\nCERRA")
print("latitude: ")
print(cerra.latitude.max().values)
print(cerra.latitude.min().values)
print("longitude: ")
print(cerra.longitude.max().values)
print(cerra.longitude.min().values)



### 04 Split Data Set 
Split both sets into training, testing, validation 

**Disclaimer:** Splitting of cerra data takes very long. I try to speed it up, by paralizing the job

##### Parallelizing the job
To speed up the data splitting process, you can consider parallelizing the splitting operations using the joblib library. The joblib library allows you to parallelize the computation, which can be particularly beneficial when dealing with large datasets. <br><br> The split_data function is defined to split the time dimension for a given dataset (ERA5 or CERRA). The Parallel function from joblib is then used to parallelize the splitting process for both ERA5 and CERRA datasets. <br><br> 
In the context of the joblib library, the parameter -1 for the n_jobs argument means to use all available CPU cores. It is a shorthand for specifying the number of parallel jobs to run in parallel processes. 

In [None]:
# load from disk (no need to combine & crop it everytime)
cerra = load_from_disk("cerra_01_2019_06_2021")
era5 = load_from_disk("era5_01_2019_06_2021")

In [15]:


def split_(t, data):
    return data.sel(time=t)

def split(args):
    t, data = args
    return data.sel(time=t)

def split_data(common_time, data):
    train_time, test_time = train_test_split(common_time, test_size=0.2, shuffle=False)
    train_time, val_time = train_test_split(train_time, test_size=0.2, shuffle=False)

    # Use joblib to parallelize the following operations
    train_data, val_data, test_data = Parallel(n_jobs=-1)(delayed(split)((t, data)) for t in [train_time, val_time, test_time]) # type: ignore
    return train_data, val_data, test_data



In [17]:
# Parallelize the splitting for both ERA5 and CERRA
common_time = era5['time'].values

era5_train, era5_val, era5_test = split_data(common_time, era5)
#print(era5_train, era5_val, era5_test)

store_to_disk("era5_train_2019_2021", era5_train)
store_to_disk("era5_val_2019_2021", era5_val)
store_to_disk("era5_test_2019_2021", era5_test)

<xarray.Dataset>
Dimensions:    (longitude: 256, latitude: 192, time: 4668)
Coordinates:
  * longitude  (longitude) float32 -25.0 -24.75 -24.5 ... 38.25 38.5 38.75
  * latitude   (latitude) float32 75.0 74.75 74.5 74.25 ... 27.75 27.5 27.25
  * time       (time) datetime64[ns] 2019-01-01 ... 2020-08-06T09:00:00
Data variables:
    t2m        (time, latitude, longitude) float32 248.5 248.6 ... 304.7 304.8
    z          (time, latitude, longitude) float32 1.797e+04 ... 9.578e+03
    lsm        (time, latitude, longitude) float32 1.0 1.0 1.0 ... 1.0 1.0 1.0
Attributes:
    Conventions:  CF-1.6
    history:      2023-05-26 07:39:28 GMT by grib_to_netcdf-2.28.0: grib_to_n... <xarray.Dataset>
Dimensions:    (longitude: 256, latitude: 192, time: 1168)
Coordinates:
  * longitude  (longitude) float32 -25.0 -24.75 -24.5 ... 38.25 38.5 38.75
  * latitude   (latitude) float32 75.0 74.75 74.5 74.25 ... 27.75 27.5 27.25
  * time       (time) datetime64[ns] 2020-08-06T12:00:00 ... 2020-12-30T09:00:0

In [18]:

cerra_train, cerra_val, cerra_test = split_data(common_time, cerra)
print(cerra_train, cerra_val, cerra_test)

store_to_disk("cerra_train_2019_2021", cerra_train)
store_to_disk("cerra_val_2019_2021", cerra_val)
store_to_disk("cerra_test_2019_2021", cerra_test)

IOStream.flush timed out
IOStream.flush timed out


## Build Model
### 01 - Find Input Shape 

In [7]:

# Extracting dimensions from the dataset
time_steps = len(era5['time'])
latitude_points = len(era5['latitude'])
longitude_points = len(era5['longitude'])
num_variables = len(era5.data_vars)  # Assuming all data variables are used as input

# Reshaping into the input shape
input_shape = (time_steps, latitude_points, longitude_points, num_variables)
input_shape = (latitude_points, longitude_points, num_variables)  # like irenes model (she doesn't use time)


print(input_shape)

(192, 256, 3)


### 02 - Build Model

In [None]:
model_service = UNetModel()
model = model_service.create_model(input_shape)
model.summary()


## Train Model 
For Later: look into TimeSeriesSplit 

#### Why I stored my data to disk:
Storing your preprocessed and split dataset to disk is a common and efficient practice, especially when working with large datasets. This can save you time during debugging and testing phases, as you won't need to repeatedly preprocess and split the data, which can be time-consuming.

In [None]:
# load the data
# CERRA
cerra_train = load_from_disk("cerra_train_2019_2021")
cerra_val = load_from_disk("cerra_val_2019_2021")
cerra_test = load_from_disk("cerra_test_2019_2021")

# ERA5
era5_train = load_from_disk("era5_train_2019_2021")
era5_val = load_from_disk("era5_val_2019_2021")
era5_test = load_from_disk("era5_test_2019_2021")

In [None]:
# Define callbacks
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
model_checkpoint = ModelCheckpoint('best_model.h5', save_best_only=True)
tensorboard = TensorBoard(log_dir='logs')

# Train the model
history = model.fit(
    x=era5_train,  # Replace with your training data
    y=cerra_train,  # Replace with your training labels
    validation_data=(validation_data, validation_labels),  # Replace with your validation data
    epochs=num_epochs,
    callbacks=[early_stopping, model_checkpoint, tensorboard]
)


## Hyperparameter optimization

## Model Evaluation