In [36]:
import os

# ユーザーのディレクトリを作成する関数
def create_user_directory(data):

    user_id = data["user_id"]

    base_path = "../user"
    user_path = os.path.join(base_path, user_id)
    
    # ユーザーディレクトリを作成（すでに存在する場合は何もしない）
    try:
        os.makedirs(user_path, exist_ok=True)
        os.makedirs(os.path.join(user_path, "CartPole"), exist_ok=True)
        os.makedirs(os.path.join(user_path, "FlappyBird"), exist_ok=True)
        return {"message": "successfully"}
    except Exception as e:
        return {"message": str(e)}

# 使用例
data = {
    "user_id": "example_user_id"
}

response = create_user_directory(data)
print(response)

{'message': 'successfully'}


In [37]:
import os

# プロジェクトディレクトリを作成する関数
def create_project_directory(data):

    user_id = data["user_id"]
    project_name = data["project_name"]

    base_path = "../user"
    user_path = os.path.join(base_path, user_id)
    project_path = os.path.join(user_path, project_name)
    
    try:
        os.makedirs(project_path, exist_ok=True)
        return {"message": "successfully"}
    except Exception as e:
        return {"message": str(e)}

# 使用例
data = {
    "user_id": "example_user_id",
    "project_name": "example_project"
}
response = create_project_directory(data)
print(response)


{'message': 'successfully'}


In [38]:
import os

def create_model_directory_from_dict(data):

    user_id = data["user_id"]
    project_name = data["project_name"]
    model_id = data["model_id"]

    base_path = "../user"
    project_path = os.path.join(base_path, user_id, project_name)
    model_path = os.path.join(project_path, model_id)
    
    try:
        os.makedirs(model_path, exist_ok=True)
        return {"message": "successfully"}
    except Exception as e:
        return {"message": str(e)}

# 使用例
data = {
    "user_id": "example_user_id",
    "project_name": "example_project",
    "model_id": "example_model"
}
response = create_model_directory_from_dict(data)
print(response)


{'message': 'successfully'}


In [2]:
def make_conv_layer(layer):
    return f'            nn.Conv2d({layer["in_channels"]}, {layer["out_channel"]}, {layer["kernel_size"]}, stride={layer["stride"]}, padding={layer["padding"]}),\n'

def make_pool_layer(pool_type, kernel_size, stride, padding):
    return f'            nn.{pool_type}({kernel_size}, stride={stride}, padding={padding}),\n'

def make_dropout_layer(dropout_p):
    return f'            nn.Dropout(p={dropout_p}),\n'

def make_batchnorm_layer(num_features):
    return f'            nn.BatchNorm2d({num_features}),\n'

def make_linear(input_size, output_size):
    return f'            nn.Linear({input_size}, {output_size}),\n'

def make_activ(activ_name):
    return f'            nn.{activ_name}(),\n'

def make_flatten_bif(flutten_bif):
    return f'            nn.{flutten_bif}(1),\n'

def make_python_code(data):
    py_1 = '''
import torch.nn as nn
'''
    py_2 = '''
class Simple_NN(nn.Module):
    def __init__(self):
        super(Simple_NN, self).__init__()
'''
    py_3 = '''
        self.seq = nn.Sequential(
'''
    py_4 = '''
            )
'''
    py_5 = '''
    def forward(self, x):
        return self.seq(x)
'''
    structure = data.get('structure')
    input_layer = structure.get('InputLayer')
    conv_layers = structure.get('ConvLayer')
    middle_layers = structure.get('MiddleLayer')
    output_size = structure.get('OutputLayer')
    flutten_way = structure.get('Flatten_way')
    save_dir = f'../user/{data.get("user_id")}/{data.get("Project_name")}/{data.get("model_id")}'

    in_channels = input_layer[2]
    for layer in conv_layers:
        layer_type = layer.get('layer_type')
        if layer_type == 'conv2d':
            py_3 += make_conv_layer(layer)
            py_3 += make_activ(layer.get('activ_func'))
            in_channels = layer.get('out_channel')
        elif layer_type == 'pool':
            py_3 += make_pool_layer(layer.get('pool_type'), layer.get('kernel_size'), layer.get('stride'), layer.get('padding'))
        elif layer_type == 'dropout':
            py_3 += make_dropout_layer(layer.get('dropout_p'))
        elif layer_type == 'batchnorm':
            py_3 += make_batchnorm_layer(in_channels)

    if flutten_way == 'GAP':
        py_3 += '            nn.AdaptiveAvgPool2d(1),\n'
    elif flutten_way == 'GMP':
        py_3 += '            nn.AdaptiveMaxPool2d(1),\n'

    py_3 += '            nn.Flatten(),\n'

    input_size = in_channels

    for layer in middle_layers:
        py_3 += make_linear(input_size, layer.get('input_size'))
        py_3 += make_activ(layer.get('activ_func'))
        input_size = layer.get('input_size')

    py_3 += make_linear(input_size, output_size)

    python_code = py_1 + py_2 + py_3 + py_4 + py_5
    file_name = 'model_config.py'

    try:
        with open(f"{save_dir}/{file_name}", 'w') as file:
            file.write(python_code)
        return {"message": "successfully"}
    except Exception as e:
        return {"message": str(e)}

# 使用例
data = {
    "model_type": "Conv",
    "user_id": "example_user_id",
    "Project_name": "example_project",
    "model_id": "example_model",
    "structure": {
        "InputLayer": [28, 28, 1],
        "Pretreatment": "none",
        "ConvLayer": [
            {
                "layer_type": "conv2d",
                "in_channels": 1,
                "out_channel": 64,
                "kernel_size": 3,
                "stride": 1,
                "padding": 0,
                "activ_func": "ReLU"
            },
            {
                "layer_type": "padding",
                "kernel_size": 3,
                "stride": 1,
                "padding": 0
            },
            {
                "layer_type": "dropout",
                "dropout_p": 0.2
            },
            {
                "layer_type": "batchnorm"
            }
        ],
        "Flatten_way": "GMP",
        "MiddleLayer": [
            {
                "input_size": 100,
                "activ_func": "Tanh"
            },
            {
                "input_size": 100,
                "activ_func": "Sigmoid"
            }
        ],
        "OutputLayer": 10
    }
}
response = make_python_code(data)
print(response)


{'message': 'successfully'}


In [5]:
import os
import shutil

def delete_model_directories(data):
    user_id = data["user_id"]
    project_name = data["Project_name"]
    model_id_list = data["model_id_list"]

    base_path = "../user"
    project_path = os.path.join(base_path, user_id, project_name)
    
    all_deleted = True
    
    try:
        for model_id in model_id_list:
            model_path = os.path.join(project_path, model_id)
            if os.path.exists(model_path):
                shutil.rmtree(model_path)
            else:
                all_deleted = False
        if all_deleted:
            return {"message": "successfully"}
        else:
            return {"message": "Failed"}
    except Exception as e:
        return {"message": str(e)}

# 使用例
data = {
    "user_id": "example_user_id",
    "Project_name": "example_project_name",
    "model_id_list": ["example_model_id_1", "example_model_id_2"]
}

response = delete_model_directories(data)
print(response)


{'message': 'Failed to delete some directories'}


In [14]:
import os
import shutil
import zipfile
from datetime import datetime

# 指定したモデルディレクトリをダウンロードする関数
def download_model_directories(data):
    user_id = data["user_id"]
    project_name = data["Project_name"]
    model_id_list = data["model_id_list"]

    base_path = "../user"
    user_path = os.path.join(base_path, user_id)
    project_path = os.path.join(user_path, project_name)
    
    zip_file_paths = []
    
    try:
        # 各モデルのディレクトリを処理
        for model_id in model_id_list:
            model_path = os.path.join(project_path, model_id)
            
            # __pycache__ を削除
            pycache_path = os.path.join(model_path, '__pycache__')
            if os.path.exists(pycache_path):
                shutil.rmtree(pycache_path)
            
            # モデルのディレクトリをZIPファイルに圧縮
            zip_file_path = os.path.join(project_path, f"{model_id}.zip")
            shutil.make_archive(zip_file_path.replace('.zip', ''), 'zip', model_path)
            zip_file_paths.append(zip_file_path)
        
        # ユーザーのダウンロードフォルダのパスを取得
        downloads_folder = os.path.join(os.path.expanduser("~"), "Downloads")
        
        # 日付と時間を取得し＆ファイル名を生成
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        final_zip_file_name = f"model_{current_time}.zip"
        final_zip_file = os.path.join(downloads_folder, final_zip_file_name)
        
        # すべてのZIPファイルを1つのZIPファイルに集約
        with zipfile.ZipFile(final_zip_file, 'w') as zipf:
            for file in zip_file_paths:
                zipf.write(file, os.path.basename(file))
                os.remove(file)

        return final_zip_file
    except Exception as e:
        return str(e)

# 使用例
data = {
    "user_id": "example_user_id",
    "Project_name": "example_project",
    "model_id_list": ["example_model_id_1", "example_model_id_2"]
}

result = download_model_directories(data)
print(result)


C:\Users\iriku\Downloads\model_20240719_213522.zip


In [42]:
import numpy as np
from torchvision import transforms
import torch
from torch.utils.data import DataLoader, Dataset
from PIL import Image
import cv2
import base64
import os
import sys
sys.path.append('..')
from train.train_image_classification import GCN


In [37]:
# ZCA白色化の実装
class ZCAWhitening():
    def __init__(self, epsilon=1e-4, device="cuda"):  # 計算が重いのでGPUを用いる
        self.epsilon = epsilon
        self.device = device
        self.mean = None
        self.ZCA_matrix = None

    def fit(self, images):  # 変換行列と平均をデータから計算
        """
        Argument
        --------
        images : torchvision.datasets.cifar.CIFAR10
            入力画像（訓練データ全体）．(N, C, H, W)
        """
        x = images[0][0].reshape(1, -1)  # 画像（1枚）を1次元化
        self.mean = torch.zeros([1, x.size()[1]]).to(self.device)  # 平均値を格納するテンソル．xと同じ形状
        con_matrix = torch.zeros([x.size()[1], x.size()[1]]).to(self.device)
        for i in range(len(images)):  # 各データについての平均を取る
            x = images[i][0].reshape(1, -1).to(self.device)
            self.mean += x / len(images)
            con_matrix += torch.mm(x.t(), x) / len(images)
            if i % 10000 == 0:
                print("{0}/{1}".format(i, len(images)))
        con_matrix -= torch.mm(self.mean.t(), self.mean)
        # E: 固有値 V: 固有ベクトルを並べたもの
        E, V = torch.linalg.eigh(con_matrix)  # 固有値分解
        self.ZCA_matrix = torch.mm(torch.mm(V, torch.diag((E.squeeze()+self.epsilon)**(-0.5))), V.t())  # A(\Lambda + \epsilon I)^{1/2}A^T
        print("completed!")

    def __call__(self, x):
        size = x.size()
        x = x.reshape(1, -1).to(self.device)
        print(x.shape)
        print(self.mean.shape)
        x -= self.mean  # x - \bar{x}
        x = torch.mm(x, self.ZCA_matrix.t())
        x = x.reshape(tuple(size))
        x = x.to("cpu")
        return x
    
    @classmethod
    def load(cls, filepath):
        state = torch.load(filepath)
        instance = cls(epsilon=state['epsilon'], device=state['device'])
        instance.mean = state['mean']
        instance.ZCA_matrix = state['ZCA_matrix']
        return instance

In [38]:
# カスタムデータセット
class CustomDataset(Dataset):
    def __init__(self, x_train, t_train, transform=None):
        data = x_train.astype('float32')
        # self.x_train = data
        # data = np.transpose(x_train, (0, 2, 3, 1)).astype('float32')
        self.x_train = []
        if x_train.shape[3] == 3:
            for i in range(data.shape[0]):
                self.x_train.append(Image.fromarray(np.uint8(data[i])))
        else:
            for i in range(x_train.shape[0]):
                # グレースケール画像を2D配列として扱う
                img = x_train[i].squeeze()  # (28, 28, 1) -> (28, 28)
                # 0-255の範囲にスケーリング（必要な場合）
                # img = (img * 255).astype(np.uint8)
                self.x_train.append(Image.fromarray(img, mode='L'))
        self.t_train = t_train
        if transform is None:
            self.transform = transforms.ToTensor()
        else:
            self.transform = transform

    def __len__(self):
        return len(self.x_train)
    
    def __getitem__(self, idx):
        x_train = self.transform(self.x_train[idx])
        t_train = torch.tensor(self.t_train[idx], dtype=torch.long)

        return x_train, t_train

In [39]:
# 標準化後の画像を[0, 1]に正規化する
def deprocess(x):
    """
    Argument
    --------
    x : np.ndarray
        入力画像．(H, W, C)

    Return
    ------
    _x : np.ndarray
        [0, 1]で正規化した画像．(H, W, C)
    """
    _min = np.min(x)
    _max = np.max(x)
    _x = (x - _min)/(_max - _min)
    return _x


In [54]:
# カスタムデータセット
class PreDataset(Dataset):
    def __init__(self, x_train, transform=None):
        data = x_train.astype('float32')
        # self.x_train = data
        # data = np.transpose(x_train, (0, 2, 3, 1)).astype('float32')
        self.x_train = []
        if x_train.shape[3] == 3:
            for i in range(data.shape[0]):
                self.x_train.append(Image.fromarray(np.uint8(data[i])))
        else:
            for i in range(x_train.shape[0]):
                # グレースケール画像を2D配列として扱う
                img = x_train[i].squeeze()  # (28, 28, 1) -> (28, 28)
                # 0-255の範囲にスケーリング（必要な場合）
                # img = (img * 255).astype(np.uint8)
                self.x_train.append(Image.fromarray(img, mode='L'))
        if transform is None:
            self.transform = transforms.ToTensor()
        else:
            self.transform = transform
        self.normal_transform = transforms.Compose([
            transforms.ToTensor()
        ])

    def __len__(self):
        return len(self.x_train)
    
    def __getitem__(self, idx):
        normal_x = self.normal_transform(self.x_train[idx])
        pre_x = self.transform(self.x_train[idx])

        return normal_x, pre_x

In [58]:
# 前処理後の画像の取得
def get_images(config):
    project_name = config['project_name']
    dataset_dir = os.path.abspath(os.path.join(os.getcwd(), "../dataset", project_name))
    x_train = np.load(os.path.join(dataset_dir, "x_train.npy"))
    image_shape = int(config['input_info']['change_shape'])
    preprocessing = config['input_info']['preprocessing']
    transform_list = [
        transforms.Resize((image_shape, image_shape)),
        transforms.ToTensor()
    ]
    if preprocessing == 'GCN':
        gcn = GCN()
        transform_list.append(gcn)
    elif preprocessing == 'ZCA':
        zca = ZCAWhitening.load(os.path.join(dataset_dir, f"{project_name}_zca.pth"))
        transform_list.append(zca)
    transform = transforms.Compose(transform_list)
    train_dataset = PreDataset(x_train, transform)
    train_loader = DataLoader(train_dataset, batch_size=int(1), shuffle=True)
    i = 0
    images = []
    pre_images = []
    for x, pre in train_loader:
        print(x.shape)
        numpy_x = x.numpy()
        numpy_x = np.transpose(numpy_x[0], (1, 2, 0))
        numpy_pre = pre.numpy()
        numpy_pre = np.transpose(numpy_pre[0], (1, 2, 0))
        cv2.imshow('normal', numpy_x)
        cv2.imshow('preprocessing', deprocess(numpy_pre))
        cv2.waitKey(0)
        cv2.destroyAllWindows()
        print(numpy_x.shape)
        _, img_png = cv2.imencode('.png', numpy_x)
        img_base64 = base64.b64encode(img_png).decode()
        images.append(img_base64)
        _, pre_img_png = cv2.imencode('.png', deprocess(numpy_pre))
        pre_img_base64 = base64.b64encode(pre_img_png).decode()
        pre_images.append(pre_img_base64)
        i += 1
        if i == 1:
            break
    print(images)
    print(pre_images)


config = {
    'project_name': 'CIFAR10',
    'input_info': {
        'change_shape': '32',
        'preprocessing': 'ZCA'
    }
}
get_images(config=config)

  state = torch.load(filepath)


torch.Size([1, 3072])
torch.Size([1, 3072])
torch.Size([1, 3, 32, 32])
(32, 32, 3)
['iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAIAAAD8GO2jAAABOUlEQVRIDZ3BAbaiMADAwOT+h85WfKzwpVKdceAbcUFO4sWBNTElJ/HiwJ1YIg9x4sBcfBYPMufAm1gXyJwDB3FWzAjIHQd2cVbckp38Vzy5YRdnxToBGYoDBzbxpviOCMWBKDPFt5TiQJSZ4ltKcSDKpeIHSnEgyqUCVKDiV6JcKkAFKn4lyqUCVKDiV6JshDgoQGVX8T0H3sSmVHYVT8pQLHBgokJENhWgArErPvGBiYBS2QVyEgexC0QGByYCSmUTUCpX4ppDPMhfAaWyC+ST+EuUnZwElMoukBtxIogM8ldAISKbQO7FiyAiFyo2Kt+LB1G5FlCAyi6QVYEDEwEFqGwqQGVN4MBExUZlUzGoLAkcuBJQPIlYsVNZ48CVijmVNQ68CSg+UFniwJuKOyoLHHhTsUDlzj+Gq5VRPF4eZgAAAABJRU5ErkJggg==']
['iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAIAAAD8GO2jAAABuklEQVRIDaXBAVYbVwBFMd39L/r1jzGhTjw0nEodnkae5ilfRpgXuTU6fBk5Rr4x8t9G5SH3Rl6MXEbem0t5KDfmkh+bS0kY+X/mki+jJB9GzCVPI0/bqpHvzCWXDp9G/rB5q/yFDt/YfCi/zDEL5a2Ro8M7I2zKsXmRy5R3Zgkd7m2rsK3alMvmYYX8boTpcGNTtlWYp4xG2FbIG9s63Bp52FZ5MSM08l6Hd4at8rAtFEaetlVGfjOXDje2VdhoaYTNw6h8q8M7w1ZhW4VtuaxyGc1TXszR4cvIp22VT6MNKxabyp1hHW5sq/wyMxUjRsMcIZeRy+jixrZCHrYV8i8jbORp5MjR4d62yqeRb438W4

In [None]:
def dispend_image(self):
    #
    # バイナリコードにする
    # ------------------------------
    # description :
    #   画像データをpng -> base64に変換して返す
    # argmetns :
    #   * __init__にて定義
    # return :
    #   画像データ（base64）(list)
    images = []
    for img in self.x:
        if self.config['color'] == 'gray': # グレースケール(チャンネル１)
            img = img.reshape(self.config['shape'])
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR) # グレースケールを3chに変換
            _, img_png = cv2.imencode('.png', img)
            img_base64 = base64.b64encode(img_png).decode()
            images.append(img_base64)
        if self.config['color'] == 'rgb': # カラー(チャンネル３) * 検証まだ
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            _, img_png = cv2.imencode('.png', img)
            img_base64 = base64.b64encode(img_png).decode()
            images.append(img_base64)
    return images