First we need to import and verify all correct packages are present.

In [1]:
import os
import pandas as pd
import xarray as xr
import numpy as np
import tensorflow as tf
from DLWP.data import ERA5Reanalysis
from DLWP.model import Preprocessor
from DLWP.model import DLWPFunctional
from DLWP.model.preprocessing import prepare_data_array
from DLWP.model import ArrayDataGenerator
from DLWP.model import tf_data_generator
# from DLWP.custom import CubeSpherePadding2D, CubeSphereConv2D

Next we need to make sure that the data is present able to be loaded correctly.

In [2]:
def check_dataset_variables(file_path):
    try:
        with xr.open_dataset(file_path, engine='netcdf4') as ds:  # Specify the engine explicitly
            print("Variables available in the dataset:")
            print(ds.variables)
    except ValueError as e:
        print(f"Failed to open the dataset at {file_path} with error: {e}")

Now, we are finding the data and validating it matches what we are looking for.

In [3]:
# Configuration
data_directory = './TrainingData'
processed_file_path = os.path.join(data_directory, 'tutorial_z500_t2m.nc')
variables = ['z', 't2m']  # Adjusted for actual variable names
levels = [500, 0]  # Match variables to levels pair-wise
root_directory = './TrainingData'
predictor_file = os.path.join(root_directory, 'tutorial_z500_t2m.nc')
model_file = os.path.join(root_directory, 'dlwp-cs_tutorial')
log_directory = os.path.join(root_directory, 'logs', 'dlwp-cs_tutorial')

# Ensure the output directory exists
os.makedirs(data_directory, exist_ok=True)
    
# Initialize ERA5 Reanalysis with custom parameters
era = ERA5Reanalysis(root_directory=data_directory, file_id='tutorial')
era.set_variables(variables)
era.set_levels([l for l in levels if l != 0])  # Exclude single-level from levels for ERA5 setup

In [4]:
# Initialize the Preprocessor
pp = Preprocessor(era, predictor_file=processed_file_path)

# Process the data into a series format suitable for DLWP model
pp.data_to_series(
        batch_samples=10000,
        variables=variables,
        levels=levels,
        pairwise=True,
        scale_variables=True,
        overwrite=True,
        verbose=True
    )

# Drop 'varlev' coordinate after processing and save the processed data
processed_data = pp.data.drop_vars('varlev')  # Updated method call
processed_data.to_netcdf(processed_file_path + '_nocoord.nc')  # Save to new file
print(f"Data processed and saved to {processed_file_path}_nocoord.nc")

    # Optionally, print data for verification
print(processed_data)

    # Close resources
era.close()
pp.close()

Generated file path: ./TrainingData/tutorial_t2m.nc
Generated file path: ./TrainingData/tutorial_z.nc
Preprocessor.data_to_samples: opening and formatting raw data
Preprocessor.data_to_samples: creating output file ./TrainingData/tutorial_z500_t2m.nc
Preprocessor.data_to_samples: variable/level pair 1 of 2 (z/500)
Preprocessor.data_to_samples: calculating mean and std


  lat_dim = 'lat' if 'lat' in ds.dims.keys() else 'latitude'
  lon_dim = 'lon' if 'lon' in ds.dims.keys() else 'longitude'
  ds.dims[lat_dim], ds.dims[lon_dim])


Preprocessor.data_to_samples: writing batch 1 of 2
Preprocessor.data_to_samples: writing batch 2 of 2
Preprocessor.data_to_samples: variable/level pair 2 of 2 (t2m/0)
Preprocessor.data_to_samples: calculating mean and std
Preprocessor.data_to_samples: writing batch 1 of 2
Preprocessor.data_to_samples: writing batch 2 of 2
Data processed and saved to ./TrainingData/tutorial_z500_t2m.nc_nocoord.nc
<xarray.Dataset> Size: 2GB
Dimensions:     (lat: 91, lon: 180, sample: 14608, varlev: 2)
Coordinates:
  * lat         (lat) float32 364B 90.0 88.0 86.0 84.0 ... -86.0 -88.0 -90.0
  * lon         (lon) float32 720B 0.0 2.0 4.0 6.0 ... 352.0 354.0 356.0 358.0
  * sample      (sample) datetime64[ns] 117kB 2013-01-01 ... 2017-12-31T21:00:00
Dimensions without coordinates: varlev
Data variables:
    predictors  (sample, varlev, lat, lon) float32 2GB dask.array<chunksize=(1, 2, 91, 180), meta=np.ndarray>
    mean        (varlev) float32 8B dask.array<chunksize=(2,), meta=np.ndarray>
    std         (

Now we will process the data or something like that I think we are gonna send it into a model or attempt to build a model or some shit

In [14]:
cnn_model_name = 'unet2'
base_filter_number = 32
min_epochs = 0
max_epochs = 10
patience = 2
batch_size = 64
shuffle = True
io_selection = {'varlev': ['z/500', 't2m/0']}
add_solar = False
io_time_steps = 2
integration_steps = 2
data_interval = 2
loss_by_step = None


train_set = list(pd.date_range('2013-01-01', '2014-12-31 21:00', freq='3h'))
validation_set = list(pd.date_range('2015-01-01', '2016-12-31 21:00', freq='3h'))

dlwp = DLWPFunctional(is_convolutional=True, time_dim=io_time_steps)
data = xr.open_dataset(predictor_file)
train_data = data.sel(sample=train_set)
validation_data = data.sel(sample=validation_set)

print('Loading data to memory...')
train_array, input_ind, output_ind, sol = prepare_data_array(train_data, input_sel=io_selection,
                                                            output_sel=io_selection, add_insolation=add_solar)
generator = ArrayDataGenerator(
dlwp,
train_array,
rank=3,
input_slice=input_ind,
output_slice=output_ind,
input_time_steps=io_time_steps,
output_time_steps=io_time_steps,
sequence=integration_steps,
interval=data_interval,
# insolation_array=None,
batch_size=batch_size,
shuffle=shuffle,
channels_last=True,
drop_remainder=True
)

Loading data to memory...


In [15]:
import numpy as np
import tensorflow as tf
from DLWP.model.generators import ArrayDataGenerator
from DLWP.model.preprocessing import prepare_data_array

# Assuming these variables are defined somewhere in your script:
# dlwp, validation_data, io_selection, integration_steps, add_solar, 
# io_time_steps, data_interval, batch_size, channels_last

input_solar = (integration_steps > 1 and add_solar)

# Prepare the validation data
print('Loading validation data to memory...')
val_array, input_ind, output_ind, sol = prepare_data_array(
    validation_data,
    input_sel=io_selection,
    output_sel=io_selection,
    add_insolation=add_solar
)

# Instantiate the validation data generator
val_generator = ArrayDataGenerator(
    dlwp,
    val_array,
    rank=3,
    input_slice=input_ind,
    output_slice=output_ind,
    input_time_steps=io_time_steps,
    output_time_steps=io_time_steps,
    sequence=integration_steps,
    interval=data_interval,
    insolation_array=sol,
    batch_size=batch_size,
    shuffle=False,
    channels_last=True
)

# Define a simple data generator for demonstration
def my_data_generator():
    while True:
        inputs = np.random.random((10, 2, 2, 2)).astype(np.float32)  # Example input batch
        outputs = np.random.random((10, 2, 2, 2)).astype(np.float32)  # Example output batch
        yield (inputs, outputs)

# TensorFlow dataset from the custom generator
output_signature = (
    tf.TensorSpec(shape=(None, 2, 2, 2), dtype=tf.float32),
    tf.TensorSpec(shape=(None, 2, 2, 2), dtype=tf.float32)
)

dataset = tf.data.Dataset.from_generator(
    generator=my_data_generator,
    output_signature=output_signature
)

# Example usage of the dataset
for data in dataset.take(1):
    print(data)  # Outputs the generated data for verification

# Define a wrapper function for TensorFlow's Dataset API using the generator
def tf_data_generator(generator):
    def gen():
        for inputs, outputs in generator:
            yield (inputs, outputs)
    return tf.data.Dataset.from_generator(
        generator=gen,
        output_signature=(
            tf.TensorSpec(shape=(None, 2, 2, 2), dtype=tf.float32),
            tf.TensorSpec(shape=(None, 2, 2, 2), dtype=tf.float32)
        )
    )

# Instantiate the data generator
data_gen_instance = my_data_generator()

# Create a TensorFlow dataset for training data
tf_train_data = tf_data_generator(data_gen_instance)


Loading validation data to memory...
(<tf.Tensor: shape=(10, 2, 2, 2), dtype=float32, numpy=
array([[[[0.77889913, 0.48702195],
         [0.02883874, 0.8847574 ]],

        [[0.03438853, 0.28276706],
         [0.55697274, 0.01757421]]],


       [[[0.5032731 , 0.8122299 ],
         [0.74255896, 0.9718702 ]],

        [[0.24370481, 0.7055796 ],
         [0.45572832, 0.931594  ]]],


       [[[0.34183827, 0.69231576],
         [0.20078963, 0.36029312]],

        [[0.86367744, 0.96073234],
         [0.63765675, 0.97424525]]],


       [[[0.37238586, 0.23333843],
         [0.80616033, 0.29663953]],

        [[0.24276634, 0.3667157 ],
         [0.04228544, 0.786869  ]]],


       [[[0.2313752 , 0.07551286],
         [0.17696486, 0.3855166 ]],

        [[0.01171225, 0.7824096 ],
         [0.04857456, 0.571879  ]]],


       [[[0.05471338, 0.04035477],
         [0.700167  , 0.37426528]],

        [[0.95976245, 0.28749397],
         [0.53857994, 0.70854264]]],


       [[[0.48428163, 0.3078162

2024-04-24 01:47:54.744533: W tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [17]:
from tensorflow.keras.layers import Input, Conv2D, UpSampling2D, AveragePooling2D, ReLU, ZeroPadding2D ,Concatenate
from tensorflow.keras.models import Model

# Create TensorFlow datasets for training and validation


# Example dimensions for 'cs'
cs = (2, 2, 2)  # Example: 128x128 RGB images

# Define input
main_input = Input(shape=cs, name='main_input')

# Define standard convolutional layers
conv1 = Conv2D(2, 10, padding='same', activation='relu')(main_input)
pool1 = AveragePooling2D((2, 2))(conv1)
up1 = UpSampling2D((2, 2))(pool1)
relu = ReLU()(up1)

# Define output channels for a binary segmentation task
output_channels = 2
output = Conv2D(output_channels, 1, activation='sigmoid', name='output')(relu)

# Create model using the Functional API
model = Model(inputs=main_input, outputs=output)

model.summary()  # Display the structure of the model



In [18]:
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['accuracy'])


In [19]:
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, History, TensorBoard
from tensorflow.keras.layers import ZeroPadding2D
from DLWP.custom import EarlyStoppingMin, GeneratorEpochEnd

# Directory for saving logs
log_directory = './logs'

# Callbacks setup
checkpoint = ModelCheckpoint('path_to_save_model.keras', save_best_only=True, monitor='val_loss', mode='min')
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10)
history = History()
early = EarlyStoppingMin(monitor='val_loss' if validation_data is not None else 'loss', min_delta=0., min_epochs=min_epochs, max_epochs=max_epochs, patience=patience, restore_best_weights=True, verbose=1)
tensorboard = TensorBoard(log_dir=log_directory, update_freq='epoch')

validation_data = tf_val_data  # This should be defined somewhere in your script if used
min_epochs = 1
max_epochs = 5
patience = 5


In [21]:
# Fit the model with the training data
model.fit(tf_train_data, validation_data=tf_val_data, epochs=max_epochs, callbacks=[history, early, tensorboard, checkpoint, reduce_lr])

Epoch 1/5
33252456/Unknown [1m31820s[0m 957us/step - accuracy: 0.5301 - loss: 0.0849

TypeError: Expected `context` argument in EagerTensor constructor to have a `_handle` attribute but it did not. Was eager Context initialized?