# 自己回帰モデルを用いた画像生成の例

## githubリポジトリから10秒の動画をダウンロードして使用してください

## 環境構築

In [None]:
#@title
!pip install japanize_matplotlib

from google.colab import files, drive

import numpy as np
import cv2

import matplotlib.pyplot as plt
import seaborn as sns
import os

from moviepy.editor import VideoFileClip
from tqdm import tqdm

from matplotlib.animation import FuncAnimation
from IPython.display import HTML
from IPython.display import display

import japanize_matplotlib

# Mount Google Drive under /content/drive (requires the Colab runtime).
drive.mount("/content/drive")

## mp4ファイルの動画処理

### 動画のアップロード
- 開発時はmp4の1020x1980で作成

In [None]:
#@title
# Ask the user for a video through the Colab upload dialog.
uploaded_files = files.upload()
if not uploaded_files:
    # A cancelled dialog yields an empty mapping; fail with a clear message
    # instead of an IndexError.
    raise ValueError("動画ファイルがアップロードされていません．")
mp4_file_name = next(iter(uploaded_files))
# Compare the real extension case-insensitively so "clip.MP4" is accepted and
# names that merely end in the letters "mp4" are rejected.
if os.path.splitext(mp4_file_name)[1].lower() != ".mp4":
    raise ValueError("入力される動画の形式は，mp4を想定しています．")
# NOTE(review): assumes Colab saved the upload into /content (the default
# working directory) - confirm if the working directory is changed.
mp4_file_path = os.path.join("/content", mp4_file_name)

### 動画ファイルを画像に分割

In [None]:
#@title
def extract_frames(video_path, output_folder, frames_per_second):
    """Save frames sampled from a video as JPEG files.

    Frames are written to ``output_folder`` as ``frame_<index>.jpg`` where
    ``<index>`` is the zero-padded position of the frame in the source video.
    Only every ``interval``-th frame is kept, with
    ``interval = max(round(clip.fps / frames_per_second), 1)``; requesting
    more frames per second than the clip provides therefore keeps every frame.

    Args:
        video_path: Path of the video opened with ``VideoFileClip``.
        output_folder: Directory receiving the JPEG files; created if missing.
        frames_per_second: Desired number of saved frames per second.

    Raises:
        ValueError: If ``frames_per_second`` is not positive.
    """
    if frames_per_second <= 0:
        # Previously 0 surfaced as a ZeroDivisionError deep inside the function.
        raise ValueError("frames_per_second は正の値を指定してください．")

    clip = VideoFileClip(video_path)
    try:
        # Length and native frame rate of the video.
        duration = clip.duration
        fps = clip.fps

        # Keep one frame out of every ``interval`` source frames.
        interval = max(round(fps / frames_per_second), 1)

        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(output_folder, exist_ok=True)

        for i, _frame in tqdm(enumerate(clip.iter_frames())):
            if i % interval == 0:
                output_path = os.path.join(output_folder, f"frame_{i:03d}.jpg")
                # Save the frame located at the timestamp of source frame i.
                clip.save_frame(output_path, t=i / fps)

            # Stop once the next frame would lie beyond the end of the clip.
            if (i + 1) / fps > duration:
                break
    finally:
        # Release the underlying reader even when extraction fails.
        clip.close()

# Example run: split the uploaded video into JPEG frames.
# Destination directory for the extracted frames.
output_folder = os.path.join("/content", "image_from_mp4")
# Source video; os.path.join keeps mp4_file_path as-is when it is absolute.
video_path = os.path.join("/content", mp4_file_path)
# Requested number of frames to save per second of video.
frames_per_second = 100

extract_frames(
    video_path=video_path,
    output_folder=output_folder,
    frames_per_second=frames_per_second,
)


### 白黒画像に変換と縮尺変換

In [None]:
#@title
# Shrink every extracted frame and store it as a grayscale image under the
# same file name in a separate folder.
image_folder_path = output_folder
image_name_list = os.listdir(image_folder_path)
gray_image_folder_path = "/content/gray_image_folder"
os.makedirs(gray_image_folder_path, exist_ok=True)

scale_percent = 10  # percent of original size (loop-invariant)
for image_name in tqdm(image_name_list):
    image_path = os.path.join(image_folder_path, image_name)
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None for files it cannot decode; skip them
        # instead of crashing on ``image.shape``.
        print(f"skipped unreadable file: {image_path}")
        continue
    # Clamp to 1 pixel so very small inputs never request a zero-sized resize.
    width = max(1, int(image.shape[1] * scale_percent / 100))
    height = max(1, int(image.shape[0] * scale_percent / 100))
    dim = (width, height)

    # resize image
    image = cv2.resize(image, dim, interpolation=cv2.INTER_AREA)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    cv2.imwrite(os.path.join(gray_image_folder_path, image_name), image)

## 学習データの作成

### 読み込み

#### 外観の確認

In [None]:
#@title
# Collect the grayscale frames in chronological order and preview the first six.
# os.listdir returns names in arbitrary order, but the frames form a time
# series, so sort them. Sorting by (length, name) keeps frame_999 before
# frame_1000 even though the index is only zero-padded to three digits.
image_names = sorted(
    os.listdir(gray_image_folder_path),
    key=lambda name: (len(name), name),
)
image_path_list = [os.path.join(gray_image_folder_path, image_name) for image_name in image_names]
fig, axs = plt.subplots(2, 3, figsize=(12, 6))
axs = axs.flatten()
for panel in axs:
    # Hide the axes of every panel, including those left empty when fewer
    # than six frames exist.
    panel.axis("off")
for panel, image_name, image_path in zip(axs, image_names, image_path_list):
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    panel.imshow(image)
    # Title each panel with its own file name (previously every panel showed
    # the name of the first file).
    panel.set_title(f"File name: {image_name}")
plt.suptitle("利用するデータの概観", fontsize=18)
plt.tight_layout()
plt.show()

データを変数として記録するフェーズ

In [None]:
#@title
# Load every frame listed in ``image_path_list`` into a single uint8 array.
# The array returned by cv2.imread is treated as (height, width, channel) and
# only channel 0 is kept.
# To inspect the individual channels, plot ``frame[:, :, c]`` for each channel
# ``c`` (e.g. with sns.heatmap) before the channel is selected.
data = np.array(
    [cv2.imread(frame_path)[:, :, 0] for frame_path in image_path_list],
    dtype=np.uint8,
)

In [None]:
#@title
# Flatten each frame into one row vector, remembering the per-frame shape so
# that rows can be turned back into images later.
image_shape = data.shape[1:]
data_1d = data.reshape(data.shape[0], -1)
print(data.shape, "->", data_1d.shape)

# Chronological split: the first 80% of the frames train, the rest test.
train_size = (8 * data_1d.shape[0]) // 10
train = data_1d[:train_size]
test = data_1d[train_size:]

print("train size:", train.shape[0])
print("test size:", test.shape[0])

### 学習データの圧縮

In [None]:
from sklearn.decomposition import PCA

# Number of dimensions (principal components) to compress each frame to.
n_components = 50

In [None]:
#@title
# Exploratory PCA: fit many components to see how the explained variance
# decays and how much of it the chosen ``n_components`` retains.
# PCA accepts at most min(n_samples, n_features) components, so cap the
# exploratory size instead of hard-coding 200.
max_components = min(200, *train.shape)
decomposition = PCA(n_components=max_components)
train_decomposed = decomposition.fit_transform(train)
fig = plt.figure(figsize=(12, 5))
# Create the axes BEFORE configuring it; the log scale was previously applied
# to an ``ax`` that did not exist yet (or belonged to an earlier figure).
ax = fig.add_subplot()
ax.set_yscale("log", base=np.e)
sns.lineplot(data=decomposition.explained_variance_ratio_, ax=ax)
ax.set_title("各コンポーネントの分散の割合")
ax.set_ylabel("分散説明割合")
ax.set_xlabel("コンポーネント数（削減後の次元数）")
# Place the annotation slightly left of the vertical marker (axes coordinates).
ax.text(n_components/max_components-.065, 0.3, f"被説明分散割合\nndim: {n_components} | {decomposition.explained_variance_ratio_[:n_components].sum():.5f}", transform=ax.transAxes, fontsize=12, ha="center")
ax.axvline(n_components, color="r")
plt.tight_layout()
plt.show()


# Final PCA used by the rest of the notebook: fitted on the training frames
# only, then applied unchanged to the test frames.
decomposition = PCA(n_components=n_components)
train_decomposed = decomposition.fit_transform(train)
test_decomposed = decomposition.transform(test)

# 学習データとテストデータそれぞれについて，特徴量（直近3フレーム）と教師データ（次のフレーム）を作成

In [None]:
#@title
# Number of consecutive past frames used to predict the next frame.
n_lags = 3


def _make_lagged_dataset(sequence, n_lags):
    """Build next-step prediction pairs from a sequence of PCA scores.

    Row ``i`` of the returned features is ``sequence[i:i + n_lags]`` flattened
    into one vector and row ``i`` of the returned targets is
    ``sequence[i + n_lags]``. The values keep the dtype of ``sequence``; PCA
    scores are signed floats and must not be cast to an unsigned integer type.

    Raises:
        ValueError: If ``sequence`` has ``n_lags`` rows or fewer.
    """
    sequence = np.asarray(sequence)
    n_samples = len(sequence) - n_lags
    if n_samples <= 0:
        raise ValueError("sequence is too short for the requested number of lags")
    features = np.stack(
        [sequence[start:start + n_lags] for start in range(n_samples)]
    ).reshape(n_samples, -1)
    targets = sequence[n_lags:].reshape(n_samples, -1).copy()
    return features, targets


# Training data
train_features, train_t = _make_lagged_dataset(train_decomposed, n_lags)
# Ground-truth frames decompressed back to image space. inverse_transform adds
# the mean the PCA was fitted with, which is the correct offset for both splits.
train_t_image = decomposition.inverse_transform(train_t)\
                .reshape(-1, image_shape[0], image_shape[1])

# Test data
test_features, test_t = _make_lagged_dataset(test_decomposed, n_lags)
test_t_image = decomposition.inverse_transform(test_t)\
                .reshape(-1, image_shape[0], image_shape[1])

### どれくらいの精度で圧縮されているのか確認

In [None]:
#@title
# Decompress a few targets back to image space to judge the PCA quality by eye.
# Each entry: (compressed targets, title prefix, subplot grid, figure size,
# label inserted into the figure title).
preview_specs = (
    (train_t, "train_t", (2, 5), (20, 5.5), "教師データ"),
    (test_t, "test_t", (1, 5), (20, 3), "未知データ"),
)
for compressed, prefix, (n_rows, n_cols), figsize, kind in preview_specs:
    # Invert the whole batch at once; inverse_transform adds the mean learned
    # from the training frames, so it is no longer recomputed for every panel.
    restored = decomposition.inverse_transform(compressed[:n_rows * n_cols])
    fig = plt.figure(figsize=figsize)
    for i, flat_image in enumerate(restored):
        ax = fig.add_subplot(n_rows, n_cols, i+1)
        sns.heatmap(flat_image.reshape(image_shape), cmap="gray", ax=ax, cbar=False)
        ax.set_title("{}[{}]".format(prefix, i))
        ax.axis("off")
    fig.suptitle(f"{decomposition.n_components}次元に圧縮された{kind}を展開"
                    ,fontsize=14)
    fig.tight_layout()
    # plt.show() matches the other cells; fig.show() only emits a warning on
    # the non-GUI inline backend.
    plt.show()

In [None]:
# Show the shape of the training feature matrix as the cell output.
train_features.shape

# モデルの構築

In [None]:
# When True, features are passed through the min-max scaler before the
# regressor is fitted and before every prediction.
scaled = True

学習フェーズ

In [None]:
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import MinMaxScaler

# Fit the min-max scaler ONCE on the training features and expose only its
# ``transform``. Binding ``fit_transform`` would refit on every call, so test
# data would be scaled with its own statistics and a single sample would
# always be scaled to all zeros.
feature_scaler = MinMaxScaler().fit(train_features)
scaler = feature_scaler.transform

# Linear model with combined L1/L2 regularisation; one output per PCA component.
regr = ElasticNet(random_state=0)
regr.fit(scaler(train_features) if scaled else train_features, train_t)

推論フェーズ

In [None]:
#@title
# Predict the next-frame PCA scores for both splits.
if scaled:
    # Scale both splits with statistics learned from the training features
    # only; this reproduces the scaling the regressor was fitted with and
    # avoids rescaling the test set by its own min/max.
    inference_scaler = MinMaxScaler().fit(train_features)
    train_predict = regr.predict(inference_scaler.transform(train_features))
    test_predict = regr.predict(inference_scaler.transform(test_features))
else:
    train_predict = regr.predict(train_features)
    test_predict = regr.predict(test_features)

print("Train_predict.shape:", train_predict.shape)
print("Train_t.shape:", train_t.shape)
print("Test_predict.shape:", test_predict.shape)
print("Test_t.shape:", test_t.shape)

# Decompress the predictions back to images. inverse_transform adds the mean
# the PCA was fitted with, which is the right offset for the test split too.
train_predicted_image = decomposition.inverse_transform(train_predict)\
    .reshape(-1, image_shape[0], image_shape[1])

test_predicted_image = decomposition.inverse_transform(test_predict)\
    .reshape(-1, image_shape[0], image_shape[1])

In [None]:
# Report the shapes of the ground-truth and predicted image stacks.
for label, images in (
    ("Train true image:", train_t_image),
    ("Train predicted image:", train_predicted_image),
    ("Test true image:", test_t_image),
    ("Test predicted image:", test_predicted_image),
):
    print(label, images.shape)

## 推論結果の可視化（アニメーション）

In [None]:
#@title
def makeAnimation(frames, fps=30.0):
    """Display a sequence of images as an inline JavaScript animation.

    Args:
        frames: Non-empty sequence (list or array) of image arrays; the first
            frame's ``shape[0]``/``shape[1]`` determine the figure size.
        fps: Playback rate in frames per second (default 30, as before).

    Raises:
        ValueError: If ``frames`` is empty or ``fps`` is not positive.
    """
    # len() works for lists and arrays alike; ``not frames`` would be
    # ambiguous for a NumPy array.
    if len(frames) == 0:
        raise ValueError("frames には1枚以上の画像が必要です．")
    if fps <= 0:
        raise ValueError("fps は正の値を指定してください．")

    first = frames[0]
    fig = plt.figure(figsize=(first.shape[1]/25, first.shape[0]/25), dpi=72)
    patch = plt.imshow(first, cmap="gray")
    plt.axis('off')
    plt.tight_layout()

    def animate(i):
        # Swap the displayed image for frame i.
        patch.set_data(frames[i])
        return (patch,)

    anim = FuncAnimation(fig, animate, frames=len(frames), interval=1000/fps)
    try:
        display(HTML(anim.to_jshtml()))
    finally:
        # Close the figure so the notebook does not also render a static copy
        # and open figures do not pile up across calls.
        plt.close(fig)


### 教師データ

In [None]:
# Animate the training target frames.
makeAnimation(train_t_image)

### 教師データに対する予測結果

In [None]:
# Animate the frames predicted for the training split.
makeAnimation(train_predicted_image)

### 未知データ

In [None]:
# Animate the test target frames.
makeAnimation(test_t_image)

### 未知データへの予測

In [None]:
# Animate the frames predicted for the test split.
makeAnimation(test_predicted_image)

予測したデータから無限に生成

In [None]:
#@title
# Autoregressive rollout: starting from the last test window, repeatedly
# predict the next frame and feed the prediction back in as the newest input.

# Initial window
feature = test_features[-1]

if scaled:
    # Scaler fitted once on the training features, i.e. the scaling the
    # regressor was trained with. Refitting on the single current sample
    # (as ``scaler([feature])`` did) maps every input to all zeros.
    rollout_scaler = MinMaxScaler().fit(train_features)

predicted_images = []
# Number of frames to generate
n_predict_frames = 100
for _ in range(n_predict_frames):
    model_input = feature.reshape(1, -1)
    if scaled:
        model_input = rollout_scaler.transform(model_input)
    predicted = regr.predict(model_input)
    # Decompress the predicted PCA scores into an image.
    predicted_images.append(
        decomposition.inverse_transform(predicted)[0].reshape(image_shape[0], image_shape[1])
    )
    # Slide the window: drop the oldest frame, append the new prediction.
    feature = np.vstack((feature.reshape(-1, n_components)[1:], predicted)).flatten()

In [None]:
# Animate the frames generated by the autoregressive rollout.
makeAnimation(predicted_images)