In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, ReLU, BatchNormalization
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import TensorBoard
import datetime
import os
import matplotlib.pyplot as plt
# tensorboard --logdir logs/fit

# 定义 TensorBoard 日志目录
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True)

# 扩散模型生成网络
def build_simple_diffusion_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Conv2D(32, (3, 3), padding='same', activation='relu')(inputs)
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    outputs = Conv2D(3, (3, 3), padding='same', activation='sigmoid')(x)
    return Model(inputs, outputs, name="simple_diffusion_model")

input_shape = (64, 64, 3)
diffusion_model = build_simple_diffusion_model(input_shape)
diffusion_model.summary()

# 定义损失函数和优化器
diffusion_model.compile(optimizer='adam', loss='mse')

# 打印模型输入输出形状
print("模型输入形状:", diffusion_model.input_shape)
print("模型输出形状:", diffusion_model.output_shape)

# 使用ImageDataGenerator加载数据
datagen = ImageDataGenerator(
    rescale=1.0/255.0,
    validation_split=0.2  # 设置验证集比例
)

# 生成训练数据
train_generator = datagen.flow_from_directory(
    './DATA/train',  # 数据集路径
    target_size=(64, 64),  # 调整图片大小
    class_mode='input',  # 更改为 'input'
    subset='training'  # 设置为训练数据
)

# 生成验证数据
validation_generator = datagen.flow_from_directory(
    './DATA/train',  # 数据集路径
    target_size=(64, 64),  # 调整图片大小
    class_mode='input',  # 更改为 'input'
    subset='validation'  # 设置为验证数据
)

# 检查生成器是否正确生成数据
print("检查训练数据生成器...")
for i in range(3):  # 检查前三个批次
    batch = next(train_generator)
    if batch is None:
        print(f"批次 {i} 是 None")
    else:
        print(f"批次 {i} 形状: {batch[0].shape}, {batch[1].shape}")

print("检查验证数据生成器...")
for i in range(3):  # 检查前三个批次
    batch = next(validation_generator)
    if batch is None:
        print(f"批次 {i} 是 None")
    else:
        print(f"批次 {i} 形状: {batch[0].shape}, {batch[1].shape}")

print(f"训练数据的batch size: {train_generator.batch_size}")
print(f"验证数据的batch size: {validation_generator.batch_size}")

# 自定义回调函数以记录每一层的输出
class LayerOutputLogger(tf.keras.callbacks.Callback):
    def __init__(self, log_dir, generator):
        super(LayerOutputLogger, self).__init__()
        self.log_dir = log_dir
        self.generator = generator
        self.file_writers = {}

    def set_model(self, model):
        super(LayerOutputLogger, self).set_model(model)
        for layer in self.model.layers:
            if 'conv' in layer.name or 'relu' in layer.name or 'batch_normalization' in layer.name:
                self.file_writers[layer.name] = tf.summary.create_file_writer(os.path.join(self.log_dir, layer.name))

    def on_epoch_end(self, epoch, logs=None):
        try:
            x_batch, _ = next(self.generator)
            sample = x_batch[0:1]  # 选择一个样本
            for layer_name, file_writer in self.file_writers.items():
                layer_model = Model(inputs=self.model.input, outputs=self.model.get_layer(layer_name).output)
                feature_maps = layer_model.predict(sample)
                with file_writer.as_default():
                    for i in range(min(feature_maps.shape[-1], 3)):  # 只记录前三个特征图
                        tf.summary.image(f"{layer_name}_filter_{i}", feature_maps[..., i:i+1], step=epoch)
        except Exception as e:
            print(f"Error in LayerOutputLogger on_epoch_end: {e}")

# 定义训练参数
epochs = 10

# 训练模型
history = diffusion_model.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=[tensorboard_callback, LayerOutputLogger(log_dir, train_generator)]
)




# 生成新样本
noise = np.random.normal(0, 1, (10, 64, 64, 3))
generated_images = diffusion_model.predict(noise)

# 显示生成的图片
plt.figure(figsize=(20, 2))
for i in range(10):
    plt.subplot(1, 10, i+1)
    plt.imshow(generated_images[i])
    plt.axis('off')
plt.tight_layout()
plt.show()

# 打印训练历史
train_losses = history.history['loss']
val_losses = history.history['val_loss']

plt.figure(figsize=(12, 4))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, ReLU, BatchNormalization, LeakyReLU
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import TensorBoard
import datetime
import os
import matplotlib.pyplot as plt

# 定义 TensorBoard 日志目录
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True)

# 定义更复杂的扩散模型生成网络
def build_complex_diffusion_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Conv2D(64, (3, 3), padding='same')(inputs)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(128, (3, 3), strides=2, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(256, (3, 3), strides=2, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)

    x = Conv2D(512, (3, 3), strides=2, padding='same')(x)
    x = LeakyReLU(alpha=0.2)(x)
    x = BatchNormalization()(x)

    x = Conv2DTranspose(256, (3, 3), strides=2, padding='same')(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    x = Conv2DTranspose(128, (3, 3), strides=2, padding='same')(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    x = Conv2DTranspose(64, (3, 3), strides=2, padding='same')(x)
    x = ReLU()(x)
    x = BatchNormalization()(x)

    outputs = Conv2D(3, (3, 3), padding='same', activation='sigmoid')(x)
    return Model(inputs, outputs, name="complex_diffusion_model")

input_shape = (64, 64, 3)
diffusion_model = build_complex_diffusion_model(input_shape)
diffusion_model.summary()

# 定义损失函数和优化器
diffusion_model.compile(optimizer='adam', loss='mse')

# 打印模型输入输出形状
print("模型输入形状:", diffusion_model.input_shape)
print("模型输出形状:", diffusion_model.output_shape)

# 使用ImageDataGenerator加载数据
datagen = ImageDataGenerator(
    rescale=1.0/255.0,
    validation_split=0.2  # 设置验证集比例
)

# 生成训练数据
train_generator = datagen.flow_from_directory(
    './DATA/train',  # 数据集路径
    target_size=(64, 64),  # 调整图片大小
    class_mode='input',  # 更改为 'input'
    subset='training',  # 设置为训练数据
    classes=['cats'] 
)

# 生成验证数据
validation_generator = datagen.flow_from_directory(
    './DATA/train',  # 数据集路径
    target_size=(64, 64),  # 调整图片大小
    class_mode='input',  # 更改为 'input'
    subset='validation' , # 设置为验证数据
    classes=['cats'] 
)

# 检查生成器是否正确生成数据
print("检查训练数据生成器...")
for i in range(3):  # 检查前三个批次
    batch = next(train_generator)
    if batch is None:
        print(f"批次 {i} 是 None")
    else:
        print(f"批次 {i} 形状: {batch[0].shape}, {batch[1].shape}")

print("检查验证数据生成器...")
for i in range(3):  # 检查前三个批次
    batch = next(validation_generator)
    if batch is None:
        print(f"批次 {i} 是 None")
    else:
        print(f"批次 {i} 形状: {batch[0].shape}, {batch[1].shape}")

print(f"训练数据的batch size: {train_generator.batch_size}")
print(f"验证数据的batch size: {validation_generator.batch_size}")

# 自定义回调函数以记录每一层的输出
class LayerOutputLogger(tf.keras.callbacks.Callback):
    def __init__(self, log_dir, generator):
        super(LayerOutputLogger, self).__init__()
        self.log_dir = log_dir
        self.generator = generator
        self.file_writers = {}

    def set_model(self, model):
        super(LayerOutputLogger, self).set_model(model)
        for layer in self.model.layers:
            if 'conv' in layer.name or 'relu' in layer.name or 'batch_normalization' in layer.name:
                self.file_writers[layer.name] = tf.summary.create_file_writer(os.path.join(self.log_dir, layer.name))

    def on_epoch_end(self, epoch, logs=None):
        try:
            x_batch, _ = next(self.generator)
            sample = x_batch[0:1]  # 选择一个样本
            for layer_name, file_writer in self.file_writers.items():
                layer_model = Model(inputs=self.model.input, outputs=self.model.get_layer(layer_name).output)
                feature_maps = layer_model.predict(sample)
                with file_writer.as_default():
                    for i in range(min(feature_maps.shape[-1], 3)):  # 只记录前三个特征图
                        tf.summary.image(f"{layer_name}_filter_{i}", feature_maps[..., i:i+1], step=epoch)
        except Exception as e:
            print(f"Error in LayerOutputLogger on_epoch_end: {e}")

# 定义训练参数
epochs = 50

# 训练模型
history = diffusion_model.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=[tensorboard_callback, LayerOutputLogger(log_dir, train_generator)]
)

# 启动TensorBoard（在命令行中运行）
# !tensorboard --logdir logs/fit

# 生成新样本
noise = np.random.normal(0, 1, (10, 64, 64, 3))
generated_images = diffusion_model.predict(noise)

# 显示生成的图片
plt.figure(figsize=(20, 2))
for i in range(10):
    plt.subplot(1, 10, i+1)
    plt.imshow(generated_images[i])
    plt.axis('off')
plt.tight_layout()
plt.show()

# 打印训练历史
train_losses = history.history['loss']
val_losses = history.history['val_loss']

plt.figure(figsize=(12, 4))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# 生成新样本
noise = np.random.normal(0, 2, (10, 64, 64, 3))
generated_images = diffusion_model.predict(noise)

# 显示生成的图片
plt.figure(figsize=(20, 2))
for i in range(10):
    plt.subplot(1, 10, i+1)
    plt.imshow(generated_images[i])
    plt.axis('off')
plt.tight_layout()
plt.show()


In [None]:
model_save_path = "diffusion_model.keras"
diffusion_model.save(model_save_path)

In [None]:
from tensorflow.keras.models import load_model

# 加载模型
model_load_path = "./DATA/model/diffusion_model.keras"
loaded_model = load_model(model_load_path)
print(f"Model loaded from {model_load_path}")

# 确认模型架构
loaded_model.summary()

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Conv2D, Conv2DTranspose, ReLU, BatchNormalization, LeakyReLU, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.callbacks import TensorBoard
import datetime
import os
import matplotlib.pyplot as plt

# 定义 TensorBoard 日志目录
log_dir = "logs/fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1, write_graph=True, write_images=True)

# 定义更复杂的扩散模型生成网络
def build_complex_diffusion_model(input_shape):
    inputs = Input(shape=input_shape, name='input_layer')
    x = Conv2D(64, (3, 3), padding='same', name='conv2d_1')(inputs)
    x = LeakyReLU(alpha=0.2, name='leaky_relu_1')(x)
    x = BatchNormalization(name='batch_norm_1')(x)

    x = Conv2D(128, (3, 3), strides=2, padding='same', name='conv2d_2')(x)
    x = LeakyReLU(alpha=0.2, name='leaky_relu_2')(x)
    x = BatchNormalization(name='batch_norm_2')(x)

    x = Conv2D(256, (3, 3), strides=2, padding='same', name='conv2d_3')(x)
    x = LeakyReLU(alpha=0.2, name='leaky_relu_3')(x)
    x = BatchNormalization(name='batch_norm_3')(x)
    x = Dropout(0.4, name='dropout_1')(x)

    x = Conv2D(512, (3, 3), strides=2, padding='same', name='conv2d_4')(x)
    x = LeakyReLU(alpha=0.2, name='leaky_relu_4')(x)
    x = BatchNormalization(name='batch_norm_4')(x)
    x = Dropout(0.4, name='dropout_2')(x)

    x = Conv2D(512, (3, 3), strides=2, padding='same', name='conv2d_5')(x)
    x = LeakyReLU(alpha=0.2, name='leaky_relu_5')(x)
    x = BatchNormalization(name='batch_norm_5')(x)
    x = Dropout(0.4, name='dropout_3')(x)

    x = Conv2DTranspose(512, (3, 3), strides=2, padding='same', name='conv2d_transpose_1')(x)
    x = ReLU(name='relu_1')(x)
    x = BatchNormalization(name='batch_norm_6')(x)
    x = Dropout(0.4, name='dropout_4')(x)

    x = Conv2DTranspose(256, (3, 3), strides=2, padding='same', name='conv2d_transpose_2')(x)
    x = ReLU(name='relu_2')(x)
    x = BatchNormalization(name='batch_norm_7')(x)
    x = Dropout(0.4, name='dropout_5')(x)

    x = Conv2DTranspose(128, (3, 3), strides=2, padding='same', name='conv2d_transpose_3')(x)
    x = ReLU(name='relu_3')(x)
    x = BatchNormalization(name='batch_norm_8')(x)

    x = Conv2DTranspose(64, (3, 3), strides=2, padding='same', name='conv2d_transpose_4')(x)
    x = ReLU(name='relu_4')(x)
    x = BatchNormalization(name='batch_norm_9')(x)

    outputs = Conv2D(3, (3, 3), padding='same', activation='sigmoid', name='output_layer')(x)
    return Model(inputs, outputs, name="complex_diffusion_model")

input_shape = (64, 64, 3)
diffusion_model = build_complex_diffusion_model(input_shape)
diffusion_model.summary()

# 定义损失函数和优化器
diffusion_model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0002), loss='mse')

# 打印模型输入输出形状
print("模型输入形状:", diffusion_model.input_shape)
print("模型输出形状:", diffusion_model.output_shape)

# 使用ImageDataGenerator加载数据
datagen = ImageDataGenerator(
    rescale=1.0/255.0,
    validation_split=0.2  # 设置验证集比例
)

# 生成训练数据，只从猫的目录中加载数据
train_generator = datagen.flow_from_directory(
    './DATA/train',  # 数据集路径
    target_size=(64, 64),  # 调整图片大小
    class_mode='input',  # 更改为 'input'
    subset='training',  # 设置为训练数据
    classes=['cats']  # 只加载cats文件夹
)

# 生成验证数据，只从猫的目录中加载数据
validation_generator = datagen.flow_from_directory(
    './DATA/train',  # 数据集路径
    target_size=(64, 64),  # 调整图片大小
    class_mode='input',  # 更改为 'input'
    subset='validation',  # 设置为验证数据
    classes=['cats']  # 只加载cats文件夹
)

# 检查生成器是否正确生成数据
print("检查训练数据生成器...")
for i in range(3):  # 检查前三个批次
    batch = next(train_generator)
    if batch is None:
        print(f"批次 {i} 是 None")
    else:
        print(f"批次 {i} 形状: {batch[0].shape}, {batch[1].shape}")

print("检查验证数据生成器...")
for i in range(3):  # 检查前三个批次
    batch = next(validation_generator)
    if batch is None:
        print(f"批次 {i} 是 None")
    else:
        print(f"批次 {i} 形状: {batch[0].shape}, {batch[1].shape}")

print(f"训练数据的batch size: {train_generator.batch_size}")
print(f"验证数据的batch size: {validation_generator.batch_size}")

# 自定义回调函数以记录每一层的输出
class LayerOutputLogger(tf.keras.callbacks.Callback):
    def __init__(self, log_dir, generator):
        super(LayerOutputLogger, self).__init__()
        self.log_dir = log_dir
        self.generator = generator
        self.file_writers = {}

    def set_model(self, model):
        super(LayerOutputLogger, self).set_model(model)
        for layer in self.model.layers:
            if 'conv' in layer.name or 'relu' in layer.name or 'batch_normalization' in layer.name:
                self.file_writers[layer.name] = tf.summary.create_file_writer(os.path.join(self.log_dir, layer.name))

    def on_epoch_end(self, epoch, logs=None):
        try:
            x_batch, _ = next(self.generator)
            sample = x_batch[0:1]  # 选择一个样本
            for layer_name, file_writer in self.file_writers.items():
                layer_model = Model(inputs=self.model.input, outputs=self.model.get_layer(layer_name).output)
                feature_maps = layer_model.predict(sample)
                with file_writer.as_default():
                    for i in range(min(feature_maps.shape[-1], 3)):  # 只记录前三个特征图
                        tf.summary.image(f"{layer_name}_filter_{i}", feature_maps[..., i:i+1], step=epoch)
        except Exception as e:
            print(f"Error in LayerOutputLogger on_epoch_end: {e}")

# 定义训练参数
epochs = 100

# 训练模型
history = diffusion_model.fit(
    train_generator,
    steps_per_epoch=train_generator.samples // train_generator.batch_size,
    epochs=epochs,
    validation_data=validation_generator,
    validation_steps=validation_generator.samples // validation_generator.batch_size,
    callbacks=[tensorboard_callback, LayerOutputLogger(log_dir, train_generator)]
)

# 启动TensorBoard（在命令行中运行）
# !tensorboard --logdir logs/fit

# 生成新样本
noise = np.random.normal(0, 1, (10, 64, 64, 3))
generated_images = diffusion_model.predict(noise)

# 显示生成的图像
plt.figure(figsize=(20, 2))
for i in range(10):
    plt.subplot(1, 10, i+1)
    plt.imshow(generated_images[i])
    plt.axis('off')
plt.tight_layout()
plt.show()

# 打印训练历史
train_losses = history.history['loss']
val_losses = history.history['val_loss']

plt.figure(figsize=(12, 4))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()