<a href="https://colab.research.google.com/github/sherrylittlefish/DLGAN/blob/main/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**資料A**

In [4]:
import zipfile
import os

# 設定 zip 檔案路徑（根據你的實際位置修改）
zip_path = '/content/drive/MyDrive/高雄大學/pic.zip'
unzip_dir = '/content/unzipped'

# 解壓 pic.zip 到 /content/unzipped
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
    zip_ref.extractall(unzip_dir)


In [6]:
# 解壓 filter1.zip
filtered1_zip_path = os.path.join(unzip_dir, 'filtered1.zip')
filtered1_dir = os.path.join(unzip_dir, 'filtered1')
with zipfile.ZipFile(filtered1_zip_path, 'r') as zip_ref:
    zip_ref.extractall(filtered1_dir)

In [7]:
import shutil
import random

# 指定資料夾與輸出位置
folders = ['0051', '0052', '0053']
trainA_dir = '/content/trainA'
testA_dir = '/content/testA'

os.makedirs(trainA_dir, exist_ok=True)
os.makedirs(testA_dir, exist_ok=True)

# 收集所有圖片檔案路徑
all_images = []
for folder in folders:
    folder_path = os.path.join(filtered1_dir, folder)
    if os.path.isdir(folder_path):
        for fname in os.listdir(folder_path):
            fpath = os.path.join(folder_path, fname)
            if os.path.isfile(fpath):
                all_images.append(fpath)

# 打亂順序
random.shuffle(all_images)

# 分割資料集（約 1:10 比例）
split_index = max(1, int(len(all_images) * 0.1))  # 至少1張到testA
test_images = all_images[:split_index]
train_images = all_images[split_index:]

# 複製圖片
def copy_images(image_list, target_dir):
    for src_path in image_list:
        fname = os.path.basename(src_path)
        dst_path = os.path.join(target_dir, fname)
        # 若重名可加上 prefix 防止覆蓋（這裡為保簡單略過）
        shutil.copy(src_path, dst_path)

copy_images(train_images, trainA_dir)
copy_images(test_images, testA_dir)

print(f"✔ 總共 {len(all_images)} 張圖片，已分成：")
print(f"   ➤ trainA：{len(train_images)} 張")
print(f"   ➤ testA ：{len(test_images)} 張")


✔ 總共 1359 張圖片，已分成：
   ➤ trainA：1224 張
   ➤ testA ：135 張


**資料B**

In [9]:
zip_path2 = '/content/drive/MyDrive/高雄大學/person_images_filtered.zip'
# 解壓 person_images_filtered.zip 到 /content/unzipped
with zipfile.ZipFile(zip_path2, 'r') as zip_ref:
    zip_ref.extractall(unzip_dir)

In [10]:
# 原始圖片資料夾
source_dir = '/content/unzipped/person_images_filtered'

# 輸出目錄
trainB_dir = '/content/trainB'
testB_dir = '/content/testB'

# 建立資料夾
os.makedirs(trainB_dir, exist_ok=True)
os.makedirs(testB_dir, exist_ok=True)

# 取得所有圖片清單
all_images = [f for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))]

# 打亂順序
random.shuffle(all_images)

# 設定比例（約10%進 testB，其餘進 trainB）
num_test = max(1, int(len(all_images) * 0.1))  # 至少 1 張
test_images = all_images[:num_test]
train_images = all_images[num_test:]

# 複製檔案
def copy_images(image_list, target_dir):
    for fname in image_list:
        src = os.path.join(source_dir, fname)
        dst = os.path.join(target_dir, fname)
        shutil.copy(src, dst)

copy_images(train_images, trainB_dir)
copy_images(test_images, testB_dir)

print(f"✔ 已完成分割：")
print(f"   ➤ trainB：{len(train_images)} 張")
print(f"   ➤ testB ：{len(test_images)} 張")


✔ 已完成分割：
   ➤ trainB：6345 張
   ➤ testB ：705 張


In [12]:
import tensorflow as tf

# 設定你的資料夾路徑
data_dir = '/content/'

# 自訂 function 來載入圖片（以符合 cycleGAN 格式）
def load_image_dataset(path, image_size=(256, 256), batch_size=1):
    dataset = tf.keras.utils.image_dataset_from_directory(
        path,
        labels=None,
        image_size=image_size,
        batch_size=batch_size,
        shuffle=True
    )
    # 正規化到 [-1, 1]，因為 CycleGAN 使用這種 scale
    normalization_layer = tf.keras.layers.Rescaling(scale=1./127.5, offset=-1)
    return dataset.map(lambda x: normalization_layer(x))

# 建立資料集
trainA = load_image_dataset(os.path.join(data_dir, 'trainA'))
trainB = load_image_dataset(os.path.join(data_dir, 'trainB'))
testA  = load_image_dataset(os.path.join(data_dir, 'testA'))
testB  = load_image_dataset(os.path.join(data_dir, 'testB'))


Found 1224 files.
Found 6345 files.
Found 135 files.
Found 705 files.


In [13]:
for example in trainA.take(1):
    print(example.shape)  # 應該是 (1, 256, 256, 3)


(1, 256, 256, 3)


In [14]:
train_pic = trainA
train_person_images = trainB
test_pic = testA
test_person_images = testB

In [23]:
!pip install git+https://github.com/tensorflow/examples.git

Collecting git+https://github.com/tensorflow/examples.git
  Cloning https://github.com/tensorflow/examples.git to /tmp/pip-req-build-zrmggrve
  Running command git clone --filter=blob:none --quiet https://github.com/tensorflow/examples.git /tmp/pip-req-build-zrmggrve
  Resolved https://github.com/tensorflow/examples.git to commit 3c7435733a9162481df350e57c53e58d2a705e4d
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: tensorflow-examples
  Building wheel for tensorflow-examples (setup.py) ... [?25l[?25hdone
  Created wheel for tensorflow-examples: filename=tensorflow_examples-0.1746868094.345130988853936603129253091149118257392589495885-py3-none-any.whl size=301646 sha256=7b833e7e83019ec34a62fc47909ff88d247b14bc946dc5e593439c8232daf0e9
  Stored in directory: /tmp/pip-ephem-wheel-cache-j9s03xdv/wheels/91/9b/e8/6ae2ecc930bd726c578e35b313e987a687bc5ce03c3a42c2d5
Successfully built tensorflow-examples
Installing collected packages: tensorflow-e

In [24]:
import tensorflow_datasets as tfds
from tensorflow_examples.models.pix2pix import pix2pix

import os
import time
import matplotlib.pyplot as plt
from IPython.display import clear_output

AUTOTUNE = tf.data.AUTOTUNE

In [26]:
BUFFER_SIZE = 1000
BATCH_SIZE = 1
IMG_WIDTH = 256
IMG_HEIGHT = 256

In [27]:
def random_crop(image):
  cropped_image = tf.image.random_crop(
      image, size=[IMG_HEIGHT, IMG_WIDTH, 3])

  return cropped_image

In [28]:
# normalizing the images to [-1, 1]
def normalize(image):
  image = tf.cast(image, tf.float32)
  image = (image / 127.5) - 1
  return image

In [29]:
def random_jitter(image):
  # resizing to 286 x 286 x 3
  image = tf.image.resize(image, [286, 286],
                          method=tf.image.ResizeMethod.NEAREST_NEIGHBOR)

  # randomly cropping to 256 x 256 x 3
  image = random_crop(image)

  # random mirroring
  image = tf.image.random_flip_left_right(image)

  return image

In [30]:
def preprocess_image_train(image, label):
  image = random_jitter(image)
  image = normalize(image)
  return image

In [31]:
def preprocess_image_test(image, label):
  image = normalize(image)
  return image

In [32]:
train_pic = train_pic.cache().map(
    preprocess_image_train, num_parallel_calls=AUTOTUNE).shuffle(
    BUFFER_SIZE).batch(BATCH_SIZE)

train_person_images = train_person_images.cache().map(
    preprocess_image_train, num_parallel_calls=AUTOTUNE).shuffle(
    BUFFER_SIZE).batch(BATCH_SIZE)

test_pic = test_pic.map(
    preprocess_image_test, num_parallel_calls=AUTOTUNE).cache().shuffle(
    BUFFER_SIZE).batch(BATCH_SIZE)

test_person_images = test_person_images.map(
    preprocess_image_test, num_parallel_calls=AUTOTUNE).cache().shuffle(
    BUFFER_SIZE).batch(BATCH_SIZE)

TypeError: in user code:


    TypeError: outer_factory.<locals>.inner_factory.<locals>.tf__preprocess_image_train() missing 1 required positional argument: 'label'


In [34]:
import tensorflow as tf
import os
import numpy as np
import matplotlib.pyplot as plt

AUTOTUNE = tf.data.AUTOTUNE

# 資料載入
def load_image(file_path, size=(256, 256)):
    image = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, size)
    image = (tf.cast(image, tf.float32) / 127.5) - 1  # 正規化 [-1, 1]
    return image

def make_dataset(folder, batch_size=1, image_size=(256, 256), shuffle=True):
    file_pattern = os.path.join(folder, '*')
    dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)
    dataset = dataset.map(lambda x: load_image(x, image_size), num_parallel_calls=AUTOTUNE)
    dataset = dataset.batch(batch_size).prefetch(AUTOTUNE)
    return dataset

def load_custom_dataset(root_path, batch_size=1, image_size=(256, 256)):
    trainA = make_dataset(os.path.join(root_path, 'trainA'), batch_size, image_size, shuffle=True)
    trainB = make_dataset(os.path.join(root_path, 'trainB'), batch_size, image_size, shuffle=True)
    testA  = make_dataset(os.path.join(root_path, 'testA'),  batch_size, image_size, shuffle=False)
    testB  = make_dataset(os.path.join(root_path, 'testB'),  batch_size, image_size, shuffle=False)
    return trainA, trainB, testA, testB

# 生成器與判別器
OUTPUT_CHANNELS = 3

def downsample(filters, size, apply_batchnorm=True):
    initializer = tf.random_normal_initializer(0., 0.02)
    result = tf.keras.Sequential()
    result.add(tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                                      kernel_initializer=initializer, use_bias=False))
    if apply_batchnorm:
        result.add(tf.keras.layers.BatchNormalization())
    result.add(tf.keras.layers.LeakyReLU())
    return result

def upsample(filters, size, apply_dropout=False):
    initializer = tf.random_normal_initializer(0., 0.02)
    result = tf.keras.Sequential()
    result.add(tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                               padding='same',
                                               kernel_initializer=initializer,
                                               use_bias=False))
    result.add(tf.keras.layers.BatchNormalization())
    if apply_dropout:
        result.add(tf.keras.layers.Dropout(0.5))
    result.add(tf.keras.layers.ReLU())
    return result

def Generator():
    inputs = tf.keras.layers.Input(shape=[256, 256, 3])
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),
        downsample(128, 4),
        downsample(256, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
        downsample(512, 4),
    ]
    up_stack = [
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4, apply_dropout=True),
        upsample(512, 4),
        upsample(256, 4),
        upsample(128, 4),
        upsample(64, 4),
    ]
    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                           strides=2,
                                           padding='same',
                                           kernel_initializer=initializer,
                                           activation='tanh')
    x = inputs
    skips = []
    for down in down_stack:
        x = down(x)
        skips.append(x)
    skips = reversed(skips[:-1])
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])
    x = last(x)
    return tf.keras.Model(inputs=inputs, outputs=x)

def Discriminator():
    initializer = tf.random_normal_initializer(0., 0.02)
    inp = tf.keras.layers.Input(shape=[256, 256, 3], name='input_image')
    tar = tf.keras.layers.Input(shape=[256, 256, 3], name='target_image')
    x = tf.keras.layers.concatenate([inp, tar])  # (bs, 256, 256, channels*2)
    down1 = downsample(64, 4, False)(x)
    down2 = downsample(128, 4)(down1)
    down3 = downsample(256, 4)(down2)
    zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)
    conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                  kernel_initializer=initializer,
                                  use_bias=False)(zero_pad1)
    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)
    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)
    zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)
    last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                  kernel_initializer=initializer)(zero_pad2)
    return tf.keras.Model(inputs=[inp, tar], outputs=last)

# 損失與訓練
loss_obj = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real, generated):
    real_loss = loss_obj(tf.ones_like(real), real)
    generated_loss = loss_obj(tf.zeros_like(generated), generated)
    total_disc_loss = real_loss + generated_loss
    return total_disc_loss * 0.5

def generator_loss(generated):
    return loss_obj(tf.ones_like(generated), generated)

LAMBDA = 10

def calc_cycle_loss(real_image, cycled_image):
    loss = tf.reduce_mean(tf.abs(real_image - cycled_image))
    return LAMBDA * loss

def identity_loss(real_image, same_image):
    loss = tf.reduce_mean(tf.abs(real_image - same_image))
    return LAMBDA * 0.5 * loss

generator_g = Generator()
generator_f = Generator()
discriminator_x = Discriminator()
discriminator_y = Discriminator()

generator_g_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
generator_f_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_x_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)
discriminator_y_optimizer = tf.keras.optimizers.Adam(2e-4, beta_1=0.5)

@tf.function
def train_step(real_x, real_y):
    with tf.GradientTape(persistent=True) as tape:
        fake_y = generator_g(real_x, training=True)
        cycled_x = generator_f(fake_y, training=True)

        fake_x = generator_f(real_y, training=True)
        cycled_y = generator_g(fake_x, training=True)

        same_x = generator_f(real_x, training=True)
        same_y = generator_g(real_y, training=True)

        disc_real_x = discriminator_x(real_x, training=True)
        disc_real_y = discriminator_y(real_y, training=True)

        disc_fake_x = discriminator_x(fake_x, training=True)
        disc_fake_y = discriminator_y(fake_y, training=True)

        gen_g_loss = generator_loss(disc_fake_y)
        gen_f_loss = generator_loss(disc_fake_x)

        total_cycle_loss = calc_cycle_loss(real_x, cycled_x) + calc_cycle_loss(real_y, cycled_y)
        total_gen_g_loss = gen_g_loss + total_cycle_loss + identity_loss(real_y, same_y)
        total_gen_f_loss = gen_f_loss + total_cycle_loss + identity_loss(real_x, same_x)

        disc_x_loss = discriminator_loss(disc_real_x, disc_fake_x)
        disc_y_loss = discriminator_loss(disc_real_y, disc_fake_y)

    generator_g_optimizer.apply_gradients(zip(tape.gradient(total_gen_g_loss, generator_g.trainable_variables),
                                              generator_g.trainable_variables))

    generator_f_optimizer.apply_gradients(zip(tape.gradient(total_gen_f_loss, generator_f.trainable_variables),
                                              generator_f.trainable_variables))

    discriminator_x_optimizer.apply_gradients(zip(tape.gradient(disc_x_loss, discriminator_x.trainable_variables),
                                                  discriminator_x.trainable_variables))

    discriminator_y_optimizer.apply_gradients(zip(tape.gradient(disc_y_loss, discriminator_y.trainable_variables),
                                                  discriminator_y.trainable_variables))

# 載入你的資料
dataset_root = '/content/'  # ← 改成你的資料夾
trainA, trainB, testA, testB = load_custom_dataset(dataset_root)

# 開始訓練
EPOCHS = 5  # 可以調整

for epoch in range(EPOCHS):
    print(f"Epoch {epoch+1}/{EPOCHS}")
    for real_x, real_y in tf.data.Dataset.zip((trainA, trainB)):
        train_step(real_x, real_y)


Epoch 1/5


ValueError: in user code:

    File "<ipython-input-33-8a3f10709180>", line 158, in train_step  *
        disc_real_x = discriminator_x(real_x, training=True)
    File "/usr/local/lib/python3.11/dist-packages/keras/src/utils/traceback_utils.py", line 122, in error_handler  **
        raise e.with_traceback(filtered_tb) from None
    File "/usr/local/lib/python3.11/dist-packages/keras/src/layers/input_spec.py", line 160, in assert_input_compatibility
        raise ValueError(

    ValueError: Layer "functional_75" expects 2 input(s), but it received 1 input tensors. Inputs received: [<tf.Tensor 'real_x:0' shape=(1, 256, 256, 3) dtype=float32>]


In [39]:
# import tensorflow as tf
print(tf.__version__)


2.18.0


In [41]:
import sys
print(sys.version)


3.11.12 (main, Apr  9 2025, 08:55:54) [GCC 11.4.0]
