## Testing staircase

In [None]:
from moisture_rnn import staircase, staircase_2
import numpy as np
from data_funcs import plot_data

In [None]:
# training data shape   [batches,batch_size, sequence_length, features] 
# also called           [samples,timesteps,features]
# input data size       [trainsteps,features] 

In [None]:
datapoints=10
timesteps=3
features=1
outputs=1
x=np.tile(range(datapoints), (features, 1)).T
y=np.tile(range(datapoints), (outputs, 1)).T
# print('x =',x)
print('x shape =',x.shape)
# print('y =',y)
print('y shape =',y.shape)

In [None]:
# the original staircase, offset by one, all in one batch, no hidden state passed
x_train, y_train = staircase(x,y,timesteps,datapoints,return_sequences=False, verbose = True)
print('x_train shape =',x_train.shape)
print('y_train shape =',y_train.shape)
# print('x_train =',x_train)
# print('y_train =',y_train)


In [None]:
# new staircase, hidden state passed between batches
x_train, y_train = staircase_2(x,y,timesteps,batch_size=3,return_sequences=False, verbose = True)
print('x_train shape =',x_train.shape)
print('y_train shape =',y_train.shape)
print('x_train =',x_train)
print('y_train =',y_train)

## Testing RNN training on time lag problem

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import LSTM, Dense, SimpleRNN
from data_funcs import plot_data

# Generate sample time series data (replace with your actual data)
hours = 500  #Ensure divisible by batch size and lookback
x = 10*(1+np.cos(np.linspace(0, hours, hours)*2*np.pi/24))  # daily
x = x+5*np.exp(np.sin(np.linspace(0, hours, hours)*2*np.pi/240))# 10 day cycle
x = x + 1.0*np.random.randn(*x.shape) # random
x = x/35
y = np.zeros((hours))
z = np.zeros((hours))
for i in range(1,hours):
    y[i] = y[i-1] + (x[i-1] - y[i-1])/10.
    z[i] = z[i-1] + (x[i-1] - z[i-1])/3.
y = (y + z)/2
x=np.reshape(x,[-1,1])
y=np.reshape(y,[-1,1])
print('x.shape',x.shape)
print('y.shape',y.shape)

In [None]:
plot_data({'E':x,'m':y},title="Generated equilibrium  ")

In [None]:
# Create training data with lookback, offset by one, all one batch
x_train, y_train = [], []
timesteps = 10
for i in range(len(x) - timesteps):
    x_train.append(x[i:i+timesteps])  # Create sequences of 5 timesteps
    y_train.append(y[i+timesteps])
x_train, y_train = np.array(x_train), np.array(y_train)

# Reshape input data for RNN
x_train = x_train.reshape(-1, timesteps, 1)  # Print x_train.shape to verify
print(x_train)
print('x_train.shape',x_train.shape)
print('y_train.shape',y_train.shape)

In [None]:
batch_size=32
x_train, y_train = staircase_2(x,y,timesteps,batch_size,return_sequences=False, verbose = True)

In [None]:
from keras.callbacks import Callback

class ResetStatesCallback(Callback):
    def on_epoch_end(self, epoch, logs=None):
        self.model.reset_states()
        
from tensorflow.keras.optimizers import Adam

# Instantiate an optimizer with a custom learning rate
optimizer = Adam(learning_rate=0.001)  

# Define the stateful RNN model
RNN=SimpleRNN
#RNN=LSTM
#RNN=GRU
epochs=50
activation='tanh'
activation='linear'
cells=1
training = 2

model = Sequential()
model.add(RNN(cells, stateful=True, batch_input_shape=(batch_size, timesteps, 1),activation=activation))  
model.add(Dense(1))  # Output layer for single-value prediction
model.compile(loss='mean_squared_error', optimizer=optimizer)

# Train the model (manual state resetting)'print('x_train.shape',x_train.shape)
print('x_train.shape',x_train.shape)
print('y_train.shape',y_train.shape)

if training == 1:
    for i in range(epochs):
        model.fit(x_train, y_train, epochs=1, batch_size=batch_size, verbose=1, shuffle=False)
        model.reset_states()
elif training == 2:
    model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, verbose=1, shuffle=False, 
              callbacks=[ResetStatesCallback()])
elif training == 3:
    sequences = x_train.shape[0]
    batches = sequences // batch_size
    print('x_train has',sequences,'sequences',batches,'batches')
    for i in range(epochs):
        for j in range(batches):
            print('iteration',i,'batch',j,'size',batch_size)
            batch_start = j*batch_size
            batch_next = batch_start + batch_size 
            model.fit(x_train[batch_start:batch_next], y_train[batch_start:batch_next], 
                      epochs=1, batch_size=batch_size, verbose=1, shuffle=False)
        model.reset_states()  # at the end of each iteration = epoch
else:
    raise ValueError('training must be 1 or 2 or 3')
# print('weights',model.get_weights())

In [None]:
# Define the stateless RNN model - to be used on data in a single sequence
model2 = Sequential()
model2.add(RNN(cells, stateful=False,input_shape=(None,1),activation=activation)) 
model2.add(Dense(1))  # Output layer for single-value prediction
model2.compile(loss='mean_squared_error', optimizer='adam')

# transfer weights, predict, plot
w=model.get_weights()
model2.set_weights(w)
z = model2.predict(x.reshape((-1,1))) # inout and output need to be shape (-1,1), ie columns
plot_data({'x':x,'y':y,'z':z},xlabel='',ylabel='',title='test')