# モジュール

In [1]:
import os
import sys
import warnings


import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import pandas as pd
# TF/Keras
import tensorflow as tf
from tensorflow import keras
from keras import backend as K
from keras.layers import Input, Dense
from tensorflow.keras.callbacks import TensorBoard
from sklearn.manifold import TSNE

In [2]:
import japanize_matplotlib

# フォントの設定
plt.rcParams["font.family"] = "sans-serif"
plt.rcParams["font.sans-serif"] = [
    "Hiragino Gothic Pro",
    "Yu Gothic",
    "Meirio",
    "Takao",
    "IPAexGothic",
    "IPAPGothic",
    "VL PGothic",
    "Noto Sans CJK JP",
]
plt.rcParams["mathtext.fontset"] = "stix"
plt.rcParams["font.size"] = 17
plt.rcParams["xtick.labelsize"] = 12
plt.rcParams["ytick.labelsize"] = 12
# 軸の設定
plt.rcParams["xtick.direction"] = "in"
plt.rcParams["ytick.direction"] = "in"
plt.rcParams["axes.linewidth"] = 2.0
plt.rcParams['axes.grid'] = True
plt.rcParams['grid.linestyle'] = '--'
plt.rcParams["axes.axisbelow"] = True
plt.rcParams["grid.color"] = "black"

# 凡例の設定
plt.rcParams["legend.frameon"] = True
plt.rcParams["legend.framealpha"] = 0.0
plt.rcParams["legend.handlelength"] = 1.0
plt.rcParams["legend.labelspacing"] = 0.4
plt.rcParams["legend.handletextpad"] = 0.8
plt.rcParams["legend.markerscale"] = 1.0

# Data



In [3]:

def make_timeseries_dataset(data, target, timesteps):
    #タイムステップ分ずらす
    N = len(target[timesteps:])
    X = np.zeros((N, timesteps, 1), np.float32)
    Y = np.zeros((N, 1), np.float32)
    for i in range(N):
        X[i] = data[i:i+timesteps][:,np.newaxis]
        Y[i] = target[i+timesteps]
    return X, Y

def make_dataset_for_stateful(data_list, target_list, timesteps):
    N = len(data_list)
    X = []
    Y = []
    for i in range(N):
        x, y = make_timeseries_dataset(data_list[i], target_list[i], timesteps)
        X.append(x)
        Y.append(y)
    X = np.concatenate(X, axis=(1)).reshape(-1, timesteps, 1)
    Y = np.concatenate(Y, axis=(1)).reshape(-1, 1)

    return X, Y

# Model

In [4]:
def build_encoder(input_shape, h_dim, z_dim, name="encoder"):
    #入力層
    inputs = keras.layers.Input(shape=input_shape)
    #隠れ層，h_dim個のユニット，ReLU関数
    #入力データを非線形変換
    x = keras.layers.Dense(h_dim, activation="relu")(inputs)
    #別の隠れ層，ネットワークの表現能力を向上
    x = keras.layers.Dense(h_dim, activation="relu")(x)
    z_mean = keras.layers.Dense(z_dim, name="z_mean")(x)
    z_log_var = keras.layers.Dense(z_dim, name="z_log_var")(x)
    #潜在空間の平均と対数分散を出力
    outputs = [z_mean, z_log_var]
    
    
    return keras.Model(inputs=inputs, outputs=outputs, name=name)

#事前モデル分布
def build_prior(input_shape, h_dim, z_dim, name="prior"):
    inputs = keras.layers.Input(shape=input_shape)
    x = keras.layers.Dense(h_dim, activation="relu")(inputs) 
    prior_mean = keras.layers.Dense(z_dim, name="prior_mean")(x)
    prior_log_var = keras.layers.Dense(z_dim, name="prior_log_var")(x)
    #潜在空間の事前分布の平均を出力
    outputs = [prior_mean, prior_log_var]
    
    return keras.Model(inputs=inputs, outputs=outputs, name=name)

def build_decoder(input_shape, x_dim, name="decoder"):
    inputs = keras.layers.Input(shape=input_shape)
    x = keras.layers.Dense(x_dim, activation="relu")(inputs)
    x = keras.layers.Dense(x_dim, activation="relu")(x)
    x = keras.layers.Dense(x_dim)(x)
    outputs = x
    
    return keras.Model(inputs=inputs, outputs=outputs, name=name)

class Sampling(keras.layers.Layer):
    def __init__(self, name="sampling"):
        super().__init__(name=name)
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.random.normal(shape=(batch, dim))
        #潜在変数 = 期待値(u) + 標準偏差(σ) * 乱数
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

In [5]:
class VRNNCell(keras.layers.Layer):
    def __init__(self, h_dim, z_dim):
        super().__init__()
        self.h_dim = h_dim
        self.z_dim = z_dim
        self.state_size = h_dim

    def build(self, input_shape):
        batch_size, x_dim = input_shape

        #phi 特徴抽出
        self.phi_x_layer = keras.Sequential([
            keras.layers.Input((x_dim)),
            keras.layers.Dense(self.h_dim, activation="relu"),
            keras.layers.Dense(self.h_dim, activation="relu")
        ])
        self.phi_z_layer = keras.layers.Dense(self.z_dim, activation="relu")
        self.encoder = build_encoder(input_shape=(self.h_dim+self.h_dim), h_dim=self.h_dim, z_dim=self.z_dim)
        self.prior_layer = build_prior(input_shape=(self.h_dim), h_dim=self.h_dim, z_dim=self.z_dim)
        self.decoder = build_decoder(input_shape=(self.z_dim), x_dim=x_dim)

        self.rnn_cell = keras.layers.GRUCell(self.h_dim)

        self.sampling_layer = Sampling()

    def call(self, inputs, states):
        h = states[0]   # B x h_dim

        phi_x = self.phi_x_layer(inputs)    # B x h_dim
        x = keras.layers.Concatenate(axis=(1))([phi_x, h])  # B x h_dim+h_dim

        # encoder
        z_mean, z_log_var = self.encoder(x)     # B x z_dim, B x z_dim

        # prior
        prior_mean, prior_log_var = self.prior_layer(h)     # B x z_dim, B x z_dim

        # reparametrization trick
        z = self.sampling_layer([z_mean, z_log_var])    # B x z_dim

        phi_z = self.phi_z_layer(z)     # B x z_dim

        #decoder
        y = self.decoder(phi_z)   # B x z_dim

        #recurrence
        phi = keras.layers.Concatenate(axis=(1))([phi_x, phi_z])
        _, new_h = self.rnn_cell(phi, [h])    # B x h_dim

        outputs = [y, z_mean, z_log_var, prior_mean, prior_log_var, z, phi_z, h]
        new_states = [new_h]    # 1 x B x h_dim
        return outputs, new_states

In [6]:
class VRNN(keras.Model):
    def __init__(self, input_shape, h_dim=100, z_dim=32, return_sequences=False, name="vrnn", *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.cell = VRNNCell(h_dim=h_dim, z_dim=z_dim)
        self.axis = (1, 2) if return_sequences else (1)

        # build model
        inputs = keras.layers.Input(input_shape)
        x = keras.layers.RNN(self.cell, return_sequences=return_sequences)(inputs)
        outputs = x
        self.rnn = keras.Model(inputs=inputs, outputs=outputs)

    def call(self, inputs, training=False):
        return self.rnn(inputs)
    
    def compile(self, optimizer, loss, *args, **kwargs):
        super().compile(*args, **kwargs)

        self.optimizer = optimizer
        self.loss = loss

        # Tracker
        self.total_cost_tracker = keras.metrics.Mean(name="total_cost")
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.kl_tracker = keras.metrics.Mean(name="kl_divergence")

    @property
    def metrics(self):
        return [
            self.total_cost_tracker,
            self.loss_tracker,
            self.kl_tracker
        ]

    def train_step(self, data):
        x, y_true = data
        with tf.GradientTape() as tape:
            outputs = self.rnn(x)
            y_pred, z_mean, z_log_var, prior_mean, prior_log_var, z, phi_z, h = outputs

            # loss
            loss = tf.reduce_mean(self.loss(y_true, y_pred))

            # KL-divergence
            term1 = prior_log_var - z_log_var
            term2 = (tf.exp(z_log_var) + tf.square(z_mean - prior_mean)) / (tf.exp(prior_log_var) + 1e-12)
            term3 = -1
            kl = tf.reduce_mean(tf.reduce_sum(0.5 * (term1 + term2 + term3), axis=self.axis))

            # term1 = (prior_log_var - z_log_var)
            # term2 = (tf.exp(z_log_var) + tf.square(z_mean - prior_mean)) / (2 * tf.exp(prior_log_var) + 1e-12)
            # term3 = -0.5
            # kl = tf.reduce_mean(tf.reduce_sum((term1 + term2 + term3), axis=self.axis))

            total_cost = loss + kl

        grads = tape.gradient(total_cost, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        # Update
        self.total_cost_tracker.update_state(total_cost)
        self.loss_tracker.update_state(loss)
        self.kl_tracker.update_state(kl)
        
        return {m.name: m.result() for m in self.metrics}

#  データの取得

In [7]:

Q_876 = pd.read_csv('paQuasAr3_dia10-8_-7_-6.csv', encoding='utf-8')
G_876 = pd.read_csv('GCaMP_dia10-8_-7_-6.csv', encoding='utf-8')

# NumPy 配列に変換
Q_876_numpy = Q_876.to_numpy()
G_876_numpy = G_876.to_numpy()

# 配列の転置（N x Tにする）(27, 1199)
x_Q876 = Q_876.to_numpy().transpose((1, 0))
x_G876 = G_876.to_numpy().transpose((1, 0))


x_Q876x200 = (x_Q876-1) * 2000
x_G876x200 = (x_G876-1) * 400

In [None]:
t = np.linspace(0, 1, 1199)
for i, x_ in enumerate(x_G876x200):
    plt.plot(t, x_, linestyle="-")
    plt.title("ID:{}".format(i+1))
    plt.legend()
    plt.grid()

    plt.xticks(fontsize=14)
    plt.yticks(fontsize=14)

    plt.xlabel('Frame', fontsize=18)
    plt.ylabel('Responce', fontsize=18)
    #plt.savefig(f"")
    plt.show()

# データ確認，設定

In [None]:
cmap = plt.get_cmap("tab20")

t = np.linspace(0, 1199, 1199)
# Check
plt.figure()
for i, x_ in enumerate(x_G876x200[:]):
    plt.plot(t, x_, linestyle="-", color=cmap(i),label = "G")
    #plt.plot(t, y_, linestyle="--", color=cmap(i), label = "Q")
    plt.savefig(f"")


plt.grid()
plt.show()

In [None]:
cmap = plt.get_cmap("tab20")
f = 300
total_frames = 1199  # 実際の総フレーム数

for start in range(0, total_frames, f):
    end = min(start + f, total_frames)  # 範囲を総フレーム数で制限
    t = np.linspace(0, 1, end - start)  # 現在の区間に対応する時間軸を生成
    plt.figure()
    for i, x_ in enumerate(x_G876x200[:5, start:end]):  # 5つのサンプルを描画
        plt.plot(t, x_, linestyle="-", color=cmap(i), label=f"G{i+1}")
    plt.legend()
    plt.grid()
    plt.ylim(-20, 400)  # y軸の範囲を指定
    plt.title(f"Frames {start}:{end}")
    plt.savefig(f"")
    plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

cmap = plt.get_cmap("tab20")
f = 50
total_frames = 1199  # 実際の総フレーム数
num_intervals = (total_frames + f - 1) // f  # 必要なサブプロットの数を計算

# グラフ全体の大きさを設定
plt.figure(figsize=(15, 5 * num_intervals))

for idx, start in enumerate(range(0, total_frames, f)):
    end = min(start + f, total_frames)  # 範囲を制限
    t = np.linspace(0, 1, end - start)  # 現在の区間の時間軸を生成
    
    # サブプロットを作成
    plt.subplot(num_intervals, 1, idx + 1)  # 行ごとに配置
    for i, x_ in enumerate(x_G876x200[:20, start:end]):  # 5つのサンプルを描画
        plt.plot(t, x_, linestyle="-", color=cmap(i), label=f"G{i+1}")
    
    plt.grid()
    plt.ylim(-20, 700)  # y軸の範囲を指定
    plt.title(f"Frames {start}:{end}")
    plt.xlabel("時間 (正規化)")  # x軸ラベル
    plt.ylabel("応答値")  # y軸ラベル

# 全体の間隔を調整
plt.tight_layout()
#plt.savefig(f"")
plt.show()


In [8]:
timesteps = 100

train_Gx = x_G876x200




x_ = train_Gx[:10, :]
y_ = train_Gx[:10, :]

train_x, train_y = make_dataset_for_stateful(x_, y_, timesteps)

# モデルの取得

In [22]:
#ハイパラ
x_dim = 1
h_dim = 32
z_dim = 4
learning_rate = 0.01
epochs=30

model = VRNN(input_shape=(timesteps, x_dim), h_dim=h_dim, z_dim=z_dim)

# 学習


In [23]:
loss = keras.losses.MeanSquaredError()
optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

model.compile(optimizer=optimizer, loss=loss)

#%load_ext tensorboard
# TensorBoardコールバックの作成
#tb_cb1 = tf.keras.callbacks.TensorBoard(
#histogram_freq=1,
#write_images=True
#)
#1, 5, 9, 11, 15, 16, 18, 20, 22, 25, 26


In [None]:
hist = model.fit(
    train_x, train_y,
    batch_size=10,
    epochs=epochs,
    #callbacks=[tb_cb1]
)

# 推論

In [27]:
id = 27
pre_id = id - 1

# old
# x_2450_ = x_2450[pre_id].reshape(1, -1)

# test_x, _ = make_dataset_for_stateful(x_2450_, x_2450_, timesteps)

x_876_ = x_G876x200[pre_id].reshape(1, -1)


test_x, _ = make_dataset_for_stateful(x_876_, x_876_, timesteps)


outputs = model(test_x)

pred_y = outputs[0]




# 潜在変数と隠れ状態の可視化

In [28]:
z = outputs[5]
phi_z = outputs[6]
h = outputs[7]

# tsne

In [None]:
prexity = 35
iter = 1000
learning = 200
pre = "35"
ite = "1000"
learn = "200"
tsne = TSNE(n_components=2, perplexity=prexity, learning_rate=learning, n_iter = iter)
#tsne = TSNE(n_components=2)
time = np.arange(0, 1189, 1)
time1 = np.arange(0, 2440, 1)
time2 = np.arange(0, 2440, 1)
z_ = z[50:500]
proj = tsne.fit_transform(z)
tg = "z"

plt.title("No.{:>2d}, tsne, z_dim = {:>2d}".format(id, z_dim))
scatter = plt.scatter(proj[:, 0], proj[:, 1], c=pred_y, cmap="rainbow", alpha=0.9, vmin=min(pred_y), vmax=max(pred_y))
# scatter = plt.scatter(proj[:, 0], proj[:, 1], c=time, cmap="rainbow", alpha=0.9, vmin=min(time), vmax=max(time))


plt.xlabel("潜在変数　1次元目")
plt.ylabel("潜在変数　2次元目")
colorbar = plt.colorbar(scatter)

plt.xlim(-80, 100)  # x軸の範囲設定
# plt.xticks(np.arange(-20, 20, 5))
# plt.yticks(np.arange(-20, 20, 5))
plt.ylim(-80, 80)  # y軸の範囲設定
plt.grid()
