In [1]:
import os
import sys
import matplotlib.pyplot as plt
import numpy as np
import torch

sys.path.append(os.path.join(os.getenv("HOME"), "RNN_Manifold/"))
from manifold_encoder_decoder import s1_direct_product_generator, geometry_util, s1_direct_product_dimension_detecting_decoder

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

First, generate some synthetic ring data

In [3]:
encoder, decoder = s1_direct_product_generator.train(1, 12, device, n_training_iterations=3000)
angles = np.arange(start=0, stop=2 * np.pi, step=0.01)
with torch.no_grad():
    points = geometry_util.torch_angles_to_ring(torch.tensor(angles, dtype=torch.get_default_dtype()).to(device))
    points = torch.unsqueeze(points, -2)
    ring_embedded_points = encoder(points)
ring_embedded_points = ring_embedded_points.cpu().numpy()




iteration: 0, decoding loss: 1.0021473169326782, distance cost: 0.007981647737324238
iteration: 1, decoding loss: 0.6958116888999939, distance cost: 0.016865679994225502
iteration: 2, decoding loss: 0.4463106393814087, distance cost: 0.02754906937479973
iteration: 3, decoding loss: 0.36479389667510986, distance cost: 0.054413050413131714
iteration: 4, decoding loss: 0.31404340267181396, distance cost: 0.0953064039349556
iteration: 15, decoding loss: 0.31422102451324463, distance cost: 0.04604378715157509
iteration: 16, decoding loss: 0.24714291095733643, distance cost: 0.039222944527864456
iteration: 17, decoding loss: 0.20915479958057404, distance cost: 0.02600986883044243
iteration: 18, decoding loss: 0.19301258027553558, distance cost: 0.021532151848077774
iteration: 32, decoding loss: 0.15533214807510376, distance cost: 0.036485131829977036
iteration: 33, decoding loss: 0.15926557779312134, distance cost: 0.030629189684987068
iteration: 59, decoding loss: 0.16132749617099762, dista

KeyboardInterrupt: 

In [None]:
ring_embedded_points = ring_embedded_points/np.mean(np.abs(ring_embedded_points))

Now we will detect the dimension of and decode the ring data we just generated. The below code looks for the best manifold of dimension max_n_dimensions or less and fits it.

In [None]:
import importlib
max_n_dimensions = 4
importlib.reload(s1_direct_product_dimension_detecting_decoder)
encoder, decoder, geometry_profile = s1_direct_product_dimension_detecting_decoder.train(ring_embedded_points, max_n_dimensions, device, decoder_weight=10, scrambling_weight=1, order_red_weight=1, n_training_iterations=15000)


The last return of this function, geometry_profile, contains the optimized value of scrambling_weights. If training worked, this should be close to 1 everywhere except for one index, which should be close to zero. This means the training process identified the fact that the underlying manifold had only one dimension. The index of the near-zero element, the "primary dimension" is the dimension we should use to address the 1D manifold.

In [None]:
geometry_profile = geometry_profile.cpu().detach().numpy()

In [None]:
used_dimension = np.argmin(geometry_profile)
print(used_dimension)

If training worked, the encoder should not be sensitive to inputs except for the primary dimension. We can test this.

In [None]:
test_batch_size = 100
all_test_phases = np.random.uniform(-np.pi, np.pi, (test_batch_size, max_n_dimensions))
all_test_phases[:, used_dimension] = np.ones(test_batch_size) * np.random.uniform(-np.pi, np.pi)

In [None]:
with torch.no_grad():
    test_embeddings = encoder(torch.tensor(geometry_util.angles_to_ring(all_test_phases), dtype=torch.get_default_dtype()).to(device)).cpu().numpy()


In [None]:
mean_result = np.mean(test_embeddings, axis=0)
embedding_angle_shifts = np.arccos(np.einsum("j, ij -> i", mean_result, test_embeddings)/(np.sqrt(mean_result.dot(mean_result) * np.einsum("ij, ij -> i", test_embeddings, test_embeddings))))

In [None]:
fig, ax = plt.subplots()
ax.hist(embedding_angle_shifts)
ax.set_xlabel("Angular Dispalcement of Embedded Vector (rad)")
ax.set_ylabel("Counts")

We can see that the embedding vector is only displaced slightly when completely random phases are supplied to the non-primary dimensions.

We can also check that the returned decoder produces the right phases compared to ground truth values,

In [None]:
with torch.no_grad():
    decoded_points, decoded_angles = decoder(torch.tensor(ring_embedded_points, dtype=torch.get_default_dtype()).to(device))

predicted_phases = torch.squeeze(decoded_angles).cpu().numpy()[:, used_dimension]


In [None]:
def reference_phases(phases):
    phases_refd = phases - phases[0]
    phases_refd = np.arctan2(np.sin(phases_refd), np.cos(phases_refd))
    return phases_refd * np.sign(phases_refd[int(len(phases)/4)])


In [None]:
def compare_to_ground_truth(predicted_phases, ground_truth_phases, plot_ax):
    refd_test_phases = reference_phases(predicted_phases)
    refd_true_phases = reference_phases(ground_truth_phases)
    line = np.arange(start=-np.pi, stop=np.pi, step=0.01)
    plot_ax.scatter(refd_true_phases, refd_test_phases)
    plot_ax.plot(line, line, color="black", linestyle="--", label="y=x")
    plot_ax.set_xlabel("True Phase")
    plot_ax.set_ylabel("Found Phase")
    return refd_test_phases, refd_true_phases


In [None]:
%matplotlib inline
fig, ax = plt.subplots()
refd_predicted_phases, refd_true_phases = compare_to_ground_truth(predicted_phases, angles, ax)
