In [1]:
# Root folder of the project. It is appended to `sys.path` so that `my_code` can be
# imported, and it prefixes every checkpoint/data path used in this notebook.
initial_path = 'peptide-QML'
# Alternative root (presumably for running the notebook from a sub-folder of the project):
# initial_path = '..'

In [2]:
import sys, torch, copy
import torch.nn as nn
import torch.nn.functional as F
sys.path.append(initial_path)

%load_ext autoreload
%autoreload 2
from my_code import helper_classes as c
from my_code import quantum_nodes as q
from my_code.vae import VAE as VAE

In [3]:
class ScorePredictor(c.Module):
    """Predict a scalar score from a latent vector.

    The input goes through a quantum layer built with ``q.circuit`` and the
    result is rescaled by a one-in/one-out linear layer.
    """

    def __init__(self, latent_dim:int):
        """Create the quantum layer and the linear head.

        Args:
            latent_dim: number of features of the vectors fed to the model.
        """
        super().__init__()

        # Smallest number of qubits n such that 2**n >= latent_dim.
        n_qubits = int(q.np.ceil(q.np.log2(latent_dim)))

        # Description of the circuit; calling the returned object builds the layer.
        circuit_factory = q.circuit(
            n_qubits = n_qubits,
            device = "default.qubit.torch",
            device_options = {'shots': None},
            embedding = q.parts.AmplitudeEmbedding,
            block_ansatz = q.parts.Ansatz_11,
            final_ansatz = q.parts.Ansatz_11,
            measurement = q.parts.Measurement('Z', 1),
            block_n_layers = 10,
        )
        # NOTE(review): the attribute name is misspelled ("descprition"); it is kept
        # as-is because code outside this notebook may read it.
        self.quantum_descprition = str(circuit_factory)

        # Sub-modules, registered in the same order as they are applied in forward().
        self.quantum_layer = circuit_factory()
        self.post_quantum = nn.Linear(1, 1)

    def forward(self, x):
        """Run ``x`` through the quantum layer and the linear head.

        Returns:
            The head output with its last (size-1) axis squeezed away.
        """
        measured = self.quantum_layer(x)
        return self.post_quantum(measured).squeeze(-1)

    @staticmethod
    def loss_function(SP_out, batch, reduction:str='mean'):
        """Mean-squared error between the predictions and the batch targets.

        Args:
            SP_out: output of ``forward`` for the batch inputs.
            batch: pair ``(inputs, targets)``; only the targets are used.
            reduction: forwarded to ``F.mse_loss``.
        """
        _, target = batch
        return F.mse_loss(SP_out, target.float(), reduction=reduction)

    def save(self, path):
        """Write a CPU copy of the model to ``path`` with ``torch.save``.

        The copy has ``quantum_layer`` replaced by ``None`` before it is written.
        NOTE(review): whatever the quantum layer holds is therefore not part of the
        saved object - confirm how it is restored on load.
        """
        cpu_clone = c.copy.deepcopy(self).to('cpu')
        cpu_clone.quantum_layer = None
        torch.save(cpu_clone, path)

In [4]:
# Device every model and tensor in this notebook is moved to.
device = "cpu"

# File name of the saved VAE; the same name is reused for the score-predictor checkpoint.
name = 'vae_TEST.pickle'

# Load the trained VAE and build a score predictor matching its latent size.
vae_model = VAE.load(f'{initial_path}/saved/Pickle/VAE-{name}').to(device)
score_predictor = ScorePredictor(
    latent_dim=vae_model.hyparams['latent_dim'],
).to(device)

# Dataset of sequences and scores.
data = c.Data.load(
    initial_path=initial_path,
    file_name='PET_SCORES_12',
).to(device)


In [5]:
# Encode the inputs of the dataset once with the VAE encoder; the score predictor is
# trained on these vectors. `VAE.reparameterize` is applied to the encoder output
# (presumably sampling a noisy latent, hence the `_noise` suffix - confirm in my_code.vae).
#
# The encoding runs under `torch.no_grad()` so the stored tensors are plain data.
# Without it every mini-batch would still be attached to the encoder's autograd graph:
# the first `backward()` frees that graph and the next one raises
# "Trying to backward through the graph a second time" (and gradients would also flow
# into the VAE, which is not being trained here).
data_encoded_noise = copy.deepcopy(data)
with torch.no_grad():
    data_encoded_noise.x_train = VAE.reparameterize(vae_model.encoder(data.x_train))
    data_encoded_noise.x_test = VAE.reparameterize(vae_model.encoder(data.x_test))

# Variant that keeps only the first encoder output, left disabled:
# data_encoded = copy.deepcopy(data)
# data_encoded.x_train = vae_model.encoder(data.x_train)[0]
# data_encoded.x_test = vae_model.encoder(data.x_test)[0]

: 

In [None]:
# Train the score predictor on the latent-encoded dataset.
#
# The optimizer has to wrap `score_predictor`, not `vae_model`: the training data are
# latent vectors (not sequences), and the checkpoint is written under the "SP-" prefix.
optimizer = c.Optimizer(score_predictor, torch.optim.Adam, {'lr': 3e-4})

# training
optimizer.optimize_parameters(
    data=data_encoded_noise,
    n_epochs=5,
    batch_size=64,
    validation=True,
    save=True,
    save_path=initial_path+'/saved/Pickle/SP-'+name,
    test_ptc=0.1,
    loss_fn_options={'reduction': 'sum'},
    early_stopping_options={'patience': 5, 'min_delta': 0.001},
)

In [None]:
# Find vectors in latent space that give the lowest predicted score: start from a
# random latent vector and run gradient descent on the vector itself, using the score
# predictor's output as the quantity to minimise.
n_sequences = 10
n_steps = 100                    # maximum number of gradient steps per latent vector
patience, min_delta = 10, 0.01   # early stopping: look-back window and minimum improvement

latent_sequences, scores_predicted, sequences, scores = [], [], [], []
for _ in range(n_sequences):
    vector = torch.randn(1, vae_model.hyparams['latent_dim'])
    vector.requires_grad = True
    optimizer = torch.optim.Adam([vector], lr=0.01)

    score_history = []  # predicted score at every step of this search
    for step in range(n_steps):

        optimizer.zero_grad()
        score_predicted = score_predictor(vector)
        score_predicted.backward()
        optimizer.step()

        current_score = score_predicted.item()
        score_history.append(current_score)
        print(f'Epoch {step+1}/{n_steps}, score={current_score:.6f}                                                       ', end='\r')

        # Early stopping: stop once the current score is no longer at least `min_delta`
        # below the average of the `patience` scores that preceded it.
        if len(score_history) > patience:
            previous_scores = score_history[-(patience + 1):-1]
            score_difference = sum(previous_scores) / len(previous_scores) - current_score
            if score_difference < min_delta:
                print('Early stopping', end='\r')
                break

    # Store a detached copy so the saved latents do not keep the autograd graph alive.
    latent_sequences.append(vector.detach().clone())
    scores_predicted.append(score_predicted.item())

    # Decode the optimised vector and score its re-encoding; no gradients are needed here.
    with torch.no_grad():
        sequences.append(vae_model.decoder(vector).tolist())
        # NOTE(review): this feeds the raw decoder output (wrapped in an extra list) back
        # into the encoder and concatenates all encoder outputs, whereas the other cells
        # use `process_output` on the decoder output and only the first encoder output.
        # Confirm the resulting width matches what the score predictor was trained on.
        encoded_new_seq = torch.cat(list(vae_model.encoder(torch.tensor([sequences[-1]]))), dim=-1)
        scores.append(score_predictor(encoded_new_seq).item())

    # `scores` already holds plain floats, so no `.item()` here.
    print(f'sequence generated, score pred={scores_predicted[-1]}, real score={scores[-1]}\n')

In [None]:
# Decode `vector` - the variable left over from the search loop in the previous cell,
# i.e. the last latent vector it optimised - and print the raw decoder output.
new_sequence_decoded = vae_model.decoder(vector)
print(new_sequence_decoded)
# Convert the decoder output with the VAE's `process_output` and print the result.
new_sequence = vae_model.process_output(new_sequence_decoded)
print(new_sequence)
# Score the first element returned by encoding the new sequence.
# NOTE(review): this calls `vae_model.encode`, while every other cell calls
# `vae_model.encoder` - confirm that both exist and which one is intended.
score_new_sequence = score_predictor(vae_model.encode(new_sequence)[0])
print(score_new_sequence)

In [None]:
class VAE_w_ScorePredictor(c.Module):
    """Wrap a VAE and a score predictor so both are run and optimised together.

    ``forward`` returns the VAE output together with the predicted score, and the
    loss is the sum of the VAE loss and the score-prediction loss.
    """

    def __init__(self, vae, score_predictor):
        """Store the two sub-models.

        Args:
            vae: model exposing ``__call__``, ``reparameterize`` and ``decoder``.
            score_predictor: model mapping a latent vector to a score.
        """
        super(VAE_w_ScorePredictor, self).__init__()
        self.vae = vae
        self.score_predictor = score_predictor

    def forward(self, x):
        """Return ``(vae_out, score)`` for the input batch ``x``."""
        vae_out = self.vae(x)
        # NOTE(review): `reparameterize` receives the full VAE output here, while the
        # encoding cell passes it the encoder output - confirm both are accepted.
        score = self.score_predictor(self.vae.reparameterize(vae_out))
        return vae_out, score

    @staticmethod
    def loss_function(model_out, batch, **loss_fn_options):
        """Sum of the VAE loss and the score-predictor loss.

        Args:
            model_out: pair ``(vae_out, score)`` as returned by ``forward``.
            batch: pair ``(inputs, targets)``.
            **loss_fn_options: forwarded unchanged to both loss functions.
        """
        vae_out, score = model_out
        vae_loss = VAE.loss_function(vae_out, batch, **loss_fn_options)
        score_loss = ScorePredictor.loss_function(score, batch, **loss_fn_options)
        return vae_loss + score_loss

    # Names of the values returned by `validation`, in order.
    validation_return = ['vae_loss', 'vae_acc', 'score_loss']
    def validation(self, batch, loss_fn_options:dict=None):
        """Compute validation metrics for one batch without tracking gradients.

        Args:
            batch: pair ``(inputs, targets)``.
            loss_fn_options: keyword arguments forwarded to both loss functions;
                ``None`` (the default) means no extra options.

        Returns:
            ``(vae_loss, vae_acc, score_loss)`` as Python floats.
        """
        # Avoid a shared mutable default argument.
        options = loss_fn_options or {}
        x, _ = batch
        with torch.no_grad():
            vae_out = self.vae(x)
            vae_loss = VAE.loss_function(vae_out, batch, **options)

            # The decoder belongs to the wrapped VAE; this class defines no `decoder`
            # attribute of its own, so `self.decoder` would raise AttributeError.
            x_pred = VAE.process_output(self.vae.decoder(vae_out[1]))
            # Fraction of positions where the reconstruction equals the input.
            vae_acc = (x_pred == x).float().mean()

            score = self.score_predictor(self.vae.reparameterize(vae_out))
            score_loss = ScorePredictor.loss_function(score, batch, **options)
        return vae_loss.item(), vae_acc.item(), score_loss.item()