### SHRED for ROMs

We first randomly select 3 sensor locations and set the trajectory length (lags) to 52, which is hyperparameter tuned.

In [None]:
import scipy.io as sio
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation

u_total = np.load('u_total.npy')
v_total = np.load('v_total.npy')
s_total = np.load('s_total.npy')

plasma_data = sio.loadmat('data14fields/ne')
utemp = plasma_data['Data']
X = utemp - np.mean(utemp, axis=0)
Xnorm= np.max(np.abs(X))
X = X/Xnorm


print(u_total.shape)
print(v_total.shape)
print(s_total.shape)
print(X.shape)
n2 = (X).shape[0]
print(n2)
m2 = s_total.shape[1]  # svd modes used
print(m2)

In [None]:
np.max(np.abs(X))

In [None]:
from processdata import load_data
from processdata import TimeSeriesDataset
import models
import torch
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

num_sensors = 3 
lags = 52

nx = 257
ny = 256

load_X = v_total.T

sensor_locations_ne = np.random.choice(n2, size=num_sensors, replace=False)
sensor_locations = [0, 1, 2]

load_X = np.hstack((X[sensor_locations_ne,:].T,load_X))
np.save('load_X.npy', load_X)

plt.imshow(load_X)
n = (load_X).shape[0]
m = (load_X).shape[1]


In [None]:
from matplotlib.colors import LinearSegmentedColormap

print(sensor_locations_ne)
mask = np.zeros(n2)
mask[sensor_locations_ne[0]]=1
mask[sensor_locations_ne[1]]=1
mask[sensor_locations_ne[2]]=1

mask2 = mask.reshape((nx,ny))


fig = plt.figure(figsize=(25, 20))
ax = fig.add_subplot(2, 1, 1)
plt.imshow(mask2, cmap='gray')

plt.savefig('measure.pdf')

#fig = plt.figure(figsize=(15, 10))
#x = fig.add_subplot(2, 1, 2)
#plt.plot(X[sensor_locations_ne,:].T)

#plt.savefig('sensing.pdf')


We now select indices to divide the data into training, validation, and test sets.

In [None]:
# RECONSTRUCTION MODE
train_indices = np.random.choice(n - lags, size=500, replace=False)
mask = np.ones(n - lags)
mask[train_indices] = 0
valid_test_indices = np.arange(0, n - lags)[np.where(mask!=0)[0]]
valid_indices = valid_test_indices[::2]
test_indices = valid_test_indices[1::2]

# FORECASTING MODE
# train_indices = np.arange(0, int(n*0.85))
# valid_indices = np.arange(int(n*0.85), int(n*0.85) + 20)
# test_indices = np.arange(int(n*0.85) + 20, n - lags)


sklearn's MinMaxScaler is used to preprocess the data for training and we generate input/output pairs for the training, validation, and test sets. 

In [None]:
sc = MinMaxScaler()
sc = sc.fit(load_X[train_indices])
transformed_X = sc.transform(load_X)

### Generate input sequences to a SHRED model
all_data_in = np.zeros((n - lags, lags, num_sensors))
for i in range(len(all_data_in)):
    all_data_in[i] = transformed_X[i:i+lags, sensor_locations]

### Generate training validation and test datasets both for reconstruction of states and forecasting sensors
device = 'cuda' if torch.cuda.is_available() else 'cpu'

train_data_in = torch.tensor(all_data_in[train_indices], dtype=torch.float32).to(device)
valid_data_in = torch.tensor(all_data_in[valid_indices], dtype=torch.float32).to(device)
test_data_in = torch.tensor(all_data_in[test_indices], dtype=torch.float32).to(device)

### -1 to have output be at the same time as final sensor measurements
train_data_out = torch.tensor(transformed_X[train_indices + lags - 1], dtype=torch.float32).to(device)
valid_data_out = torch.tensor(transformed_X[valid_indices + lags - 1], dtype=torch.float32).to(device)
test_data_out = torch.tensor(transformed_X[test_indices + lags - 1], dtype=torch.float32).to(device)

train_dataset = TimeSeriesDataset(train_data_in, train_data_out)
valid_dataset = TimeSeriesDataset(valid_data_in, valid_data_out)
test_dataset = TimeSeriesDataset(test_data_in, test_data_out)

We train the model using the training and validation datasets.

In [None]:
shred = models.SHRED(num_sensors, m, hidden_size=64, hidden_layers=2, l1=350, l2=400, dropout=0.1).to(device)
validation_errors = models.fit(shred, train_dataset, valid_dataset, batch_size=64, num_epochs=3000, lr=1e-3, verbose=True, patience=5)

Finally, we generate reconstructions from the test set and print mean square error compared to the ground truth.

In [None]:
test_recons = sc.inverse_transform(shred(test_dataset.X).detach().cpu().numpy())
test_ground_truth = sc.inverse_transform(test_dataset.Y.detach().cpu().numpy())
print(np.linalg.norm(test_recons - test_ground_truth) / np.linalg.norm(test_ground_truth))

In [None]:
fig = plt.figure(figsize=(15, 10))
ax = fig.add_subplot(2, 3, 1)
plt.plot(test_recons[10])
ax = fig.add_subplot(2, 3, 4)
plt.plot(test_ground_truth[10])
ax = fig.add_subplot(2, 3, 2)
plt.plot(test_recons[50])
ax = fig.add_subplot(2, 3, 5)
plt.plot(test_ground_truth[50])
ax = fig.add_subplot(2, 3, 3)
plt.plot(test_recons[150])
ax = fig.add_subplot(2, 3, 6)
plt.plot(test_ground_truth[150])




In [None]:
fig = plt.figure(figsize=(15, 10))
ax = fig.add_subplot(2, 1, 1)
plt.imshow(test_recons[:,3:])
ax = fig.add_subplot(2, 1, 2)
plt.imshow(test_ground_truth[:,3:])


In [None]:
print(test_ground_truth.shape)
print(test_recons.shape)

In [None]:

fig = plt.figure(figsize=(15, 10))
ax = fig.add_subplot(2, 1, 1)
x=test_recons[:,[40]]
plt.plot(x)
ax = fig.add_subplot(2, 1, 2)
x=test_ground_truth[:,[40]]
plt.plot(x)


In [None]:
fig = plt.figure(figsize=(15, 10))
ax = fig.add_subplot(2, 1, 1)
x=test_recons[:,[140]]
plt.plot(x)
ax = fig.add_subplot(2, 1, 2)
x=test_ground_truth[:,[140]]
plt.plot(x)


In [None]:
fig = plt.figure(figsize=(15, 20))
mpoint = 20000

for jj in range(14):
    ax = fig.add_subplot(7, 2, jj+1)
    upca = u_total[:, jj*m2:(jj+1)*m2]
    spca = s_total[jj, :]
    vpca1 = test_ground_truth[:, jj*m2+3:(jj+1)*m2+3]
    vpca2 = test_recons[:,jj*m2+3:(jj+1)*m2+3]
    
    u1svd = upca @ np.diag(spca) @ vpca1.T
    u2svd = upca @ np.diag(spca) @ vpca2.T
    
    plt.plot(u1svd[mpoint,100:400], color='gray')
    plt.plot(u2svd[mpoint,100:400])
    ax.set_xticks([])
    ax.set_xticklabels([])
    ax.axis('off')

   # ax.set_title(f"Plot {jj+1}")

# Adjust layout for better spacing
plt.tight_layout()

# Show the plot
plt.savefig('timeseries.pdf')
plt.show()

In [None]:
fig = plt.figure(figsize=(20, 20))

loop1=[0,1,2,3,4,5,6]
loop2=[7,8,9,10,11,12,13]

for jj in loop1:
    ax = fig.add_subplot(1, 7, jj+1)

    upca = u_total[:, jj*m2:(jj+1)*m2]
    spca = s_total[jj, :]
    vpca1 = test_ground_truth[:, jj*m2+3:(jj+1)*m2+3]

    u1svd = upca @ np.diag(spca) @ vpca1.T
    snap_true = u1svd[0:nx*ny, j].reshape((nx, ny)).T
    ax.imshow(snap_true,cmap='RdBu_r', interpolation='bilinear')
    ax.axis('off')
    plt.tight_layout()

plt.savefig('comp1.pdf')
plt.show()    

fig = plt.figure(figsize=(20, 20))
for jj in loop1:
    ax = fig.add_subplot(1, 7, jj+1)
    upca = u_total[:, jj*m2:(jj+1)*m2]
    spca = s_total[jj, :]
    #vpca1 = test_ground_truth[:, jj+3:jj+m2+3]
    vpca2 = test_recons[:,jj*m2+3:(jj+1)*m2+3]

    #u1svd = upca @ np.diag(spca) @ vpca1.T
    u2svd = upca @ np.diag(spca) @ vpca2.T
    
    #snap_true = u1svd[0:nx*ny, j].reshape((nx, ny))
    snap_test = u2svd[0:nx*ny,j].reshape((nx,ny)).T
    ax.imshow(snap_test,cmap='RdBu_r', interpolation='bilinear')

    ax.axis('off')
    plt.tight_layout()

plt.savefig('comp2.pdf')
plt.show()    




fig = plt.figure(figsize=(20, 20))
for jj in loop2:
    ax = fig.add_subplot(1, 7, jj-6)

    upca = u_total[:, jj*m2:(jj+1)*m2]
    spca = s_total[jj, :]
    vpca1 = test_ground_truth[:, jj*m2+3:(jj+1)*m2+3]

    u1svd = upca @ np.diag(spca) @ vpca1.T
    snap_true = u1svd[0:nx*ny, j].reshape((nx, ny)).T
    ax.imshow(snap_true,cmap='RdBu_r', interpolation='bilinear')
    ax.axis('off')
    plt.tight_layout()

plt.savefig('comp3.pdf')
plt.show()    

fig = plt.figure(figsize=(20, 20))
for jj in loop2:
    ax = fig.add_subplot(1, 7, jj-6)
    upca = u_total[:, jj*m2:(jj+1)*m2]
    spca = s_total[jj, :]
    #vpca1 = test_ground_truth[:, jj+3:jj+m2+3]
    vpca2 = test_recons[:, jj*m2+3:(jj+1)*m2+3]

    #u1svd = upca @ np.diag(spca) @ vpca1.T
    u2svd = upca @ np.diag(spca) @ vpca2.T
    
    #snap_true = u1svd[0:nx*ny, j].reshape((nx, ny))
    snap_test = u2svd[0:nx*ny,j].reshape((nx,ny)).T
    ax.imshow(snap_test,cmap='RdBu_r', interpolation='bilinear')

    ax.axis('off')
    plt.tight_layout()

plt.savefig('comp4.pdf')
plt.show()    
    
    
    