In [116]:
import random
import numpy as np
import torch

def set_seed(seed: int):
    """
    Set seeds for reproducibility.

    Parameters:
    seed (int): Seed value.
    """
    # Pythonの組み込み乱数生成器のシードを固定
    random.seed(seed)

    # NumPyの乱数シードを固定
    np.random.seed(seed)

    # PyTorchの乱数シードを固定
    torch.manual_seed(seed)

    # GPUが利用可能な場合の追加設定
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)  # 複数GPU用

    # PyTorchの再現性を確保する追加設定
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# 再現性のためのシードを固定
set_seed(42)

In [117]:
import torch
from torch.utils.data import DataLoader, TensorDataset, random_split
import numpy as np

# np.random.seed(42)
# torch.manual_seed(42)

# Parameters for synthetic data
num_samples = 1000
train_ratio = 0.7
val_ratio = 0.2
batch_size = 32
time_series_dim = 100  # Dimensionality of time series data
nlp_vector_dim = 768  # Dimensionality of NLP vector data
z_dim = 64  # Latent variable dimension

# Generate synthetic time series data (e.g., sensor data)
time_series_data = np.random.rand(num_samples, time_series_dim).astype(np.float32)

# Generate synthetic NLP vector data (e.g., embeddings from a language model)
nlp_vector_data = np.random.rand(num_samples, nlp_vector_dim).astype(np.float32)

# Convert data to tensors
time_series_tensor = torch.tensor(time_series_data)
nlp_vector_tensor = torch.tensor(nlp_vector_data)

# Create TensorDataset and DataLoader
dataset = TensorDataset(time_series_tensor, nlp_vector_tensor)

# Split dataset into train, validation, and test sets
train_size = int(train_ratio * num_samples)
val_size = int(val_ratio * num_samples)
test_size = num_samples - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

# Create DataLoaders
generator = torch.Generator()
generator.manual_seed(42)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, generator=generator)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


# Display sample data
sample_time_series, sample_nlp_vector = next(iter(train_loader))
print("Sample Time Series Shape:", sample_time_series.shape)
print("Sample NLP Vector Shape:", sample_nlp_vector.shape)


Sample Time Series Shape: torch.Size([32, 100])
Sample NLP Vector Shape: torch.Size([32, 768])


In [4]:
!pip install pixyz

Collecting pixyz
  Downloading pixyz-0.3.3-py3-none-any.whl.metadata (14 kB)
Requested pixyz from https://files.pythonhosted.org/packages/8f/c3/35083628485cd09c2be0216f8c434b40a877561d44dde2f2b1fe934412d3/pixyz-0.3.3-py3-none-any.whl has invalid metadata: Expected matching RIGHT_PARENTHESIS for LEFT_PARENTHESIS, after version specifier
    flake8 (==3.9.2pytest-cov) ; extra == 'dev'
           ~~~~~~~~^
Please use pip<24.1 if you need to use this version.[0m[33m
[0m  Downloading pixyz-0.3.2-py3-none-any.whl.metadata (13 kB)
  Using cached pixyz-0.3.3-py3-none-any.whl.metadata (14 kB)
Requested pixyz from https://files.pythonhosted.org/packages/8f/c3/35083628485cd09c2be0216f8c434b40a877561d44dde2f2b1fe934412d3/pixyz-0.3.3-py3-none-any.whl has invalid metadata: Expected matching RIGHT_PARENTHESIS for LEFT_PARENTHESIS, after version specifier
    flake8 (==3.9.2pytest-cov) ; extra == 'dev'
           ~~~~~~~~^
Please use pip<24.1 if you need to use this version.[0m[33m
Collecting jed

In [118]:
x_dim_ts = 100  # 時系列データの次元数
y_dim_nlp = 768  # 言語ベクトルの次元数 (例: BERT埋め込み)
z_dim = 64  # 潜在変数の次元数

In [119]:
from pixyz.distributions import Normal, Bernoulli, Categorical
from pixyz.losses import KullbackLeibler
from pixyz.models import VAE
from pixyz.utils import print_latex
# from __future__ import print_function
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
from torchvision import datasets, transforms
# from tensorboardX import SummaryWriter

from tqdm import tqdm

class Inference(Normal):
    def __init__(self):
        super(Inference, self).__init__(var=["z"], cond_var=["x_ts", "y_nlp"], name="q")

        self.fc1 = nn.Linear(x_dim_ts + y_dim_nlp, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc31 = nn.Linear(512, z_dim)
        self.fc32 = nn.Linear(512, z_dim)

    def forward(self, x_ts, y_nlp):
        h = F.relu(self.fc1(torch.cat([x_ts, y_nlp], 1)))
        h = F.relu(self.fc2(h))
        return {"loc": self.fc31(h), "scale": F.softplus(self.fc32(h))}

class InferenceX(Normal):
    def __init__(self):
        super(InferenceX, self).__init__(var=["z"], cond_var=["x_ts"], name="q")

        self.fc1 = nn.Linear(x_dim_ts, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc31 = nn.Linear(512, z_dim)
        self.fc32 = nn.Linear(512, z_dim)

    def forward(self, x_ts):
        h = F.relu(self.fc1(x_ts))
        h = F.relu(self.fc2(h))
        return {"loc": self.fc31(h), "scale": F.softplus(self.fc32(h))}

class InferenceY(Normal):
    def __init__(self):
        super(InferenceY, self).__init__(var=["z"], cond_var=["y_nlp"], name="q")

        self.fc1 = nn.Linear(y_dim_nlp, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc31 = nn.Linear(512, z_dim)
        self.fc32 = nn.Linear(512, z_dim)

    def forward(self, y_nlp):
        h = F.relu(self.fc1(y_nlp))
        h = F.relu(self.fc2(h))
        return {"loc": self.fc31(h), "scale": F.softplus(self.fc32(h))}

class GeneratorX(Bernoulli):
    def __init__(self):
        super(GeneratorX, self).__init__(var=["x_ts"], cond_var=["z"], name="p")

        self.fc1 = nn.Linear(z_dim, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, x_dim_ts)

    def forward(self, z):
        h = F.relu(self.fc1(z))
        h = F.relu(self.fc2(h))
        return {"probs": torch.sigmoid(self.fc3(h))}

class GeneratorY(Normal):
    def __init__(self):
        super(GeneratorY, self).__init__(var=["y_nlp"], cond_var=["z"], name="p")

        self.fc1 = nn.Linear(z_dim, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, y_dim_nlp)

    def forward(self, z):
        h = F.relu(self.fc1(z))
        h = F.relu(self.fc2(h))
        return {"loc": self.fc3(h), "scale": torch.ones_like(self.fc3(h))}  # 分散は固定値

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# prior model p(z)
prior = Normal(loc=torch.tensor(0.), scale=torch.tensor(1.),
               var=["z"], features_shape=[z_dim], name="p_{prior}").to(device)

p_x = GeneratorX().to(device)
p_y = GeneratorY().to(device)

q = Inference().to(device)
q_x = InferenceX().to(device)
q_y = InferenceY().to(device)

p = p_x * p_y

kl = KullbackLeibler(q, prior)
kl_x = KullbackLeibler(q, q_x)
kl_y = KullbackLeibler(q, q_y)

regularizer = kl + kl_x + kl_y

print(p)
print_latex(p)

print(regularizer)
print_latex(regularizer)

Distribution:
  p(x_{ts},y_{nlp}|z) = p(y_{nlp}|z)p(x_{ts}|z)
Network architecture:
  p(x_{ts}|z):
  GeneratorX(
    name=p, distribution_name=Bernoulli,
    var=['x_ts'], cond_var=['z'], input_var=['z'], features_shape=torch.Size([])
    (fc1): Linear(in_features=64, out_features=512, bias=True)
    (fc2): Linear(in_features=512, out_features=512, bias=True)
    (fc3): Linear(in_features=512, out_features=100, bias=True)
  )
  p(y_{nlp}|z):
  GeneratorY(
    name=p, distribution_name=Normal,
    var=['y_nlp'], cond_var=['z'], input_var=['z'], features_shape=torch.Size([])
    (fc1): Linear(in_features=64, out_features=512, bias=True)
    (fc2): Linear(in_features=512, out_features=512, bias=True)
    (fc3): Linear(in_features=512, out_features=768, bias=True)
  )
D_{KL} \left[q(z|x_{ts},y_{nlp})||p_{prior}(z) \right] + D_{KL} \left[q(z|x_{ts},y_{nlp})||q(z|x_{ts}) \right] + D_{KL} \left[q(z|x_{ts},y_{nlp})||q(z|y_{nlp}) \right]


<IPython.core.display.Math object>

In [120]:
model = VAE(q, p, other_distributions=[q_x, q_y],
            regularizer=regularizer, optimizer=optim.Adam, optimizer_params={"lr":1e-3})
print(model)
print_latex(model)

Distributions (for training):
  q(z|x_{ts},y_{nlp}), p(x_{ts},y_{nlp}|z), q(z|x_{ts}), q(z|y_{nlp})
Loss function:
  mean \left(D_{KL} \left[q(z|x_{ts},y_{nlp})||p_{prior}(z) \right] + D_{KL} \left[q(z|x_{ts},y_{nlp})||q(z|x_{ts}) \right] + D_{KL} \left[q(z|x_{ts},y_{nlp})||q(z|y_{nlp}) \right] - \mathbb{E}_{q(z|x_{ts},y_{nlp})} \left[\log p(x_{ts},y_{nlp}|z) \right] \right)
Optimizer:
  Adam (
  Parameter Group 0
      amsgrad: False
      betas: (0.9, 0.999)
      capturable: False
      differentiable: False
      eps: 1e-08
      foreach: None
      fused: None
      lr: 0.001
      maximize: False
      weight_decay: 0
  )


<IPython.core.display.Math object>

In [123]:
# Training loop
def train_model(model, train_loader, val_loader, num_epochs=20):
    best_val_loss = float("inf")
    best_model_state = None

    for epoch in range(num_epochs):
        # model.train()
        train_loss = 0

        for x_ts, y_nlp in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
            x_ts, y_nlp = x_ts.to(device), y_nlp.to(device)
            loss = model.train({"x_ts": x_ts, "y_nlp": y_nlp})
            train_loss += loss

        train_loss /= len(train_loader)

        # model.eval()
        val_loss = 0

        latent_vars = []
        with torch.no_grad():
            for x_ts, y_nlp in val_loader:
                x_ts, y_nlp = x_ts.to(device), y_nlp.to(device)
                loss = model.test({"x_ts": x_ts, "y_nlp": y_nlp})
                val_loss += loss

                posterior_params = q(x_ts=x_ts, y_nlp=y_nlp)
                loc = posterior_params["loc"]
                scale = posterior_params["scale"]
                set_seed(42) # 再現性の担保のため、ここに記載必要あり
                z = loc + scale * torch.randn_like(scale)
                latent_vars.append(z.cpu().numpy())

        val_loss /= len(val_loader)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.save('jmvae')

    print("Training complete. Best validation loss: {:.4f}".format(best_val_loss))
    return np.concatenate(latent_vars, axis=0)
    # return best_model_state

# Extract latent variables
def extract_latent_variables(model, data_loader):
    # model.eval()
    latent_vars = []

    with torch.no_grad():
        for x_ts, y_nlp in data_loader:
            x_ts, y_nlp = x_ts.to(device), y_nlp.to(device)
            posterior_params = q(x_ts=x_ts, y_nlp=y_nlp)
            loc = posterior_params["loc"]
            scale = posterior_params["scale"]
            set_seed(42) # 再現性の担保のため、ここに記載必要あり
            z = loc + scale * torch.randn_like(scale)
            latent_vars.append(z.cpu().numpy())

    return np.concatenate(latent_vars, axis=0)

# Train the model
train_model(model, train_loader, test_loader, num_epochs=1)
model.load('jmvae')

# Extract latent variables from the test dataset
z_test = extract_latent_variables(model, test_loader)
print("Extracted latent variables shape:", z_test.shape)

Epoch 1/1: 100%|██████████| 22/22 [00:00<00:00, 26.12it/s]


Epoch 1/1, Train Loss: 808.8531, Val Loss: 808.1407
Training complete. Best validation loss: 808.1407
Extracted latent variables shape: (100, 64)


In [51]:
z_test

array([[-0.36885163, -1.0428203 ,  1.0490055 , ...,  1.3436681 ,
         0.15581864,  1.6411437 ],
       [-0.7159389 ,  0.48006743,  0.48653144, ..., -0.5092131 ,
         0.69968843, -0.23747133],
       [-0.93877834,  1.3061308 ,  0.86554307, ..., -0.20429112,
        -1.0778545 ,  0.01420994],
       ...,
       [ 1.5985991 ,  0.47357184,  1.981343  , ...,  1.5515734 ,
        -1.649648  , -1.3414232 ],
       [-1.5919024 ,  1.5516188 ,  1.5557631 , ...,  0.7525108 ,
        -0.04126738,  0.8054824 ],
       [-0.9444447 ,  0.47623128,  0.5947096 , ..., -0.28634986,
         0.2976985 , -0.42944926]], dtype=float32)

In [33]:
z_test

array([[ 0.80054146,  0.34293288, -1.9371177 , ..., -0.67033124,
         1.7463284 ,  0.9668151 ],
       [-0.9628307 ,  1.9619235 ,  0.72652674, ...,  1.0903841 ,
        -0.20802382,  0.53745055],
       [-0.5212683 ,  2.9598465 , -0.3258235 , ...,  0.22248146,
         1.1281545 , -1.2441776 ],
       ...,
       [ 1.0885067 ,  0.52142954, -0.8099768 , ...,  0.10907855,
         1.5081668 ,  0.5507123 ],
       [ 1.3673116 , -0.15788765,  1.1597885 , ..., -0.16008048,
        -0.16548574,  0.07460947],
       [ 0.5198291 ,  0.47770068, -1.1652261 , ...,  1.8560479 ,
         1.453478  , -0.00462941]], dtype=float32)

In [115]:
import torch
import numpy as np

def set_seed(seed: int):
    torch.manual_seed(seed)
    np.random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# シードを固定
set_seed(42)

# loc と scale の設定
loc = torch.tensor([0.5, 1.0, 1.5])
scale = torch.tensor([0.1, 0.2, 0.3])

# サンプリング
set_seed(42)
z = loc + scale * torch.randn_like(scale)
print("Sampled z:", z)

# 再実行しても同じ結果になる
set_seed(42)
z_repeated = loc + scale * torch.randn_like(scale)
print("Sampled z (repeated):", z_repeated)

Sampled z: tensor([0.5337, 1.0258, 1.5703])
Sampled z (repeated): tensor([0.5337, 1.0258, 1.5703])
