# Stable Diffusion for Text-to-Image Generation
### Stable Diffusion is a generative model that creates images from text descriptions using a diffusion process: it starts from random noise and iteratively refines it into a coherent image, guided by the text prompt. It works best when the prompt maps clearly onto visual features the model has learned. This flexibility supports rapid prototyping and creative applications, such as producing custom visuals for financial dashboards. However, it is computationally intensive and sensitive to prompt quality, so results can be poor without careful prompting or fine-tuning.

### Use Stable Diffusion as a baseline model for text-to-image generation, refining it based on the use case.

| Aspect | Details |
| :- | :- |
| Use For           | - Text-to-image generation (e.g., creating financial charts from prompts)<br>- Can be extended to image inpainting or style transfer |
| Key Assumptions   | - Strong alignment between text prompts and image features<br>- Sufficient data for learning text-image relationships<br>- Availability of computational resources for training |
| Advantages        | - Generates high-quality, diverse images<br>- Flexible for creative applications<br>- Pre-trained models available for quick deployment |
| Disadvantages     | - Computationally intensive during training and inference<br>- Prone to generating irrelevant or low-quality images without fine-tuning<br>- Sensitive to prompt quality and specificity |
| Avoid When        | - Very small datasets with limited samples<br>- Low computational resources<br>- Applications requiring real-time generation |
| Real-World Use Case | - Financial dashboard generation (e.g., creating budget charts from text prompts)<br>- Marketing content creation (e.g., generating visuals for financial reports) |

## Problem Description
### Generate small images of fashion items from text prompts (e.g., "T-shirt/top" or "Ankle boot") using a simplified, Stable Diffusion-inspired model trained on the Fashion MNIST dataset from Kaggle. This is a text-to-image synthesis task in which the model learns to denoise images conditioned on their text labels, with a small network and a short noise schedule chosen for memory efficiency. Because the vocabulary is built from the ten class labels, prompts are limited to those label words.

Use the Fashion MNIST dataset from Kaggle: https://www.kaggle.com/datasets/zalando-research/fashionmnist

In [1]:
import pandas as pd
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
import os


In [2]:
project_dir = './19_stablediffusion'
data_dir = os.path.join(project_dir, 'data')
model_dir = os.path.join(project_dir, 'model')

In [3]:
def setup_gpu():
    """Print TensorFlow/GPU info and cap this process at a fraction of GPU memory."""
    print("TensorFlow version:", tf.__version__)
    print("GPUs:", tf.config.list_physical_devices('GPU'))
    print("GPUs: Allocate GPU Memory and create a new session")

    # fraction of GPU memory this process may allocate
    gpu_memory_fraction = 0.5

    # the limit is applied through the TF1 compatibility API: creating a session
    # with these options initializes the GPU with the requested memory cap
    gpu_options = tf.compat.v1.GPUOptions(per_process_gpu_memory_fraction=gpu_memory_fraction)
    session = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(gpu_options=gpu_options))

In [4]:
setup_gpu()

TensorFlow version: 2.19.0
GPUs: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
GPUs: Allocate GPU Memory and create a new session


I0000 00:00:1751467487.237764 2721161 gpu_device.cc:2019] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 5119 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 3080, pci bus id: 0000:01:00.0, compute capability: 8.6


In [5]:
# check TensorFlow version and handle AUTOTUNE compatibility
print("TensorFlow version:", tf.__version__)
if hasattr(tf.data, 'AUTOTUNE'):
    AUTOTUNE = tf.data.AUTOTUNE
else:
    AUTOTUNE = 2  # Fallback to a fixed prefetch buffer size (e.g., 2)
    print("Warning: tf.data.AUTOTUNE not available, using fallback value 2.")

TensorFlow version: 2.19.0


In [6]:
# load Fashion MNIST dataset from Kaggle
def load_fashionmnist():
    # assuming dataset is downloaded to data_dir (update path as needed)
    train_data = pd.read_csv(os.path.join(data_dir, 'fashion-mnist_train.csv'))
    test_data = pd.read_csv(os.path.join(data_dir, 'fashion-mnist_test.csv'))
    
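    # extract features (scaled to [0, 1], stored as float32 to save memory) and one-hot labels
    X_train = train_data.drop('label', axis=1).values.astype('float32') / 255.0
    y_train = to_categorical(train_data['label'].values, num_classes=10)
    X_test = test_data.drop('label', axis=1).values.astype('float32') / 255.0
    y_test = to_categorical(test_data['label'].values, num_classes=10)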
    
    # reshape to 28x28x1 images
    X_train = X_train.reshape(-1, 28, 28, 1)
    X_test = X_test.reshape(-1, 28, 28, 1)
    
    return (X_train, y_train), (X_test, y_test)
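
The scaling and reshaping in the loader can be checked on synthetic data before downloading anything. The sketch below assumes the Kaggle CSV layout of one `label` column followed by 784 pixel columns; `rows_to_images` is a hypothetical helper, and random values stand in for the real files.

```python
import numpy as np

def rows_to_images(rows):
    """Split (N, 785) rows of [label, 784 pixels] into images and one-hot labels."""
    labels = rows[:, 0].astype(int)
    pixels = rows[:, 1:].astype("float32") / 255.0   # scale to [0, 1]
    images = pixels.reshape(-1, 28, 28, 1)            # add a channel axis
    # equivalent to to_categorical(labels, num_classes=10)
    one_hot = np.eye(10, dtype="float32")[labels]
    return images, one_hot

rng = np.random.default_rng(0)
fake_rows = np.column_stack([
    rng.integers(0, 10, size=4),            # labels 0..9
    rng.integers(0, 256, size=(4, 784)),    # pixel intensities 0..255
])
images, one_hot = rows_to_images(fake_rows)
print(images.shape, one_hot.shape)
```

Each row becomes one 28x28 single-channel image, and each label becomes a length-10 one-hot vector.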

In [7]:
# preprocess data with text labels
def preprocess_data(X, y):
    # map labels to text prompts 
    label_to_text = {
        0: "T-shirt/top", 1: "Trouser", 2: "Pullover", 3: "Dress", 4: "Coat",
        5: "Sandal", 6: "Shirt", 7: "Sneaker", 8: "Bag", 9: "Ankle boot"
    }
    text_labels = np.array([label_to_text[np.argmax(yi)] for yi in y])
    return X, text_labels

In [8]:
# create vocabulary from text labels
def create_vocabulary(text_labels):
    # sort the words so token ids are reproducible across runs
    words = sorted({word for text in text_labels for word in text.split()})
    vocab = {word: i + 1 for i, word in enumerate(words)}
    vocab['<PAD>'] = 0
    return vocab

# simple whitespace tokenizer: unknown words map to the <PAD> id (0)
def tokenize_text(texts, vocab, max_length=5):
    max_length = int(max_length)
    tokenized = []
    for text in texts:
        # str() also handles NumPy string scalars
        tokens = [vocab.get(word, 0) for word in str(text).split()[:max_length]]
        tokens += [0] * (max_length - len(tokens))  # pad to a fixed length
        tokenized.append(tokens)
    return tf.convert_to_tensor(tokenized, dtype=tf.int32)
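
Because tokens are looked up in a vocabulary built from the class labels, any word outside those labels falls back to the padding id 0. The following plain-Python sketch illustrates this with three of the labels; `build_vocab` and `encode` are hypothetical stand-ins for the notebook functions, written without TensorFlow so the behaviour is easy to inspect.

```python
def build_vocab(labels):
    # sorted for reproducible ids; 0 is reserved for padding
    words = sorted({w for text in labels for w in text.split()})
    vocab = {w: i + 1 for i, w in enumerate(words)}
    vocab["<PAD>"] = 0
    return vocab

def encode(text, vocab, max_length=5):
    ids = [vocab.get(w, 0) for w in text.split()[:max_length]]
    return ids + [0] * (max_length - len(ids))

labels = ["T-shirt/top", "Trouser", "Ankle boot"]
vocab = build_vocab(labels)
known = encode("Ankle boot", vocab)
unknown = encode("black T-shirt", vocab)
print(known)    # [1, 4, 0, 0, 0]
print(unknown)  # [0, 0, 0, 0, 0]
```

Neither "black" nor "T-shirt" is in the vocabulary (the label token is "T-shirt/top"), so that prompt encodes to padding only and carries no conditioning signal.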

In [9]:
# define a basic U-Net-style encoder-decoder for diffusion (no skip connections)
class SimpleUNet(tf.keras.Model):
    def __init__(self):
        super(SimpleUNet, self).__init__()
        self.conv1 = tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu')
        self.pool = tf.keras.layers.MaxPooling2D(pool_size=(2, 2))  # Reduce to 14x14
        self.conv2 = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')
        self.upconv = tf.keras.layers.Conv2DTranspose(16, 3, strides=2, padding='same', output_padding=1, activation='relu')  # Upsample to 28x28
        self.conv3 = tf.keras.layers.Conv2D(1, 3, padding='same', activation='sigmoid')
        self.text_pool = tf.keras.layers.GlobalAveragePooling1D()
        self.text_dense = tf.keras.layers.Dense(14 * 14, activation='relu')  # Match reduced size
        self.text_up = tf.keras.layers.UpSampling2D(size=(2, 2))  # Upsample text to 28x28

    def call(self, inputs):
        noise, text_emb = inputs
        # Process text embedding to reduce sequence dimension
        text = self.text_pool(text_emb)  # Reduce (32, 5, 128) to (32, 128)
        text = self.text_dense(text)  # Map to (32, 196)
        text = tf.reshape(text, (-1, 14, 14, 1))  # Reshape to 14x14
        text = self.text_up(text)  # Upsample to 28x28
        
        # U-Net: Refine noise with text conditioning
        x = self.conv1(noise)  # (32, 28, 28, 16)
        x = self.pool(x)  # (32, 14, 14, 16)
        x = self.conv2(x)  # (32, 14, 14, 32)
        x = self.upconv(x)  # (32, 28, 28, 16)
        x = tf.concat([x, text], axis=-1)  # (32, 28, 28, 17): append the text map as an extra channel
        x = self.conv3(x)  # Reduce to 1 channel
        return x
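
The text conditioning path is mostly shape manipulation, so it can be traced with NumPy alone. This is a sketch under assumptions: random arrays stand in for the `Dense(196)` output and the decoder features, and `repeat` mimics the nearest-neighbour behaviour that `UpSampling2D` uses by default.

```python
import numpy as np

batch = 2
text_vec = np.random.rand(batch, 14 * 14).astype("float32")   # stand-in for the Dense(196) output
text_map = text_vec.reshape(-1, 14, 14, 1)                     # one 14x14 map per sample
# nearest-neighbour 2x upsampling along height and width
text_map = text_map.repeat(2, axis=1).repeat(2, axis=2)
features = np.zeros((batch, 28, 28, 16), dtype="float32")      # stand-in for the decoder features
conditioned = np.concatenate([features, text_map], axis=-1)    # text becomes a 17th channel
print(text_map.shape, conditioned.shape)
```

The prompt therefore reaches the final convolution as a single extra 28x28 channel alongside the 16 image feature channels.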


In [10]:
# set up diffusion parameters
timesteps = 20  # reduced for efficiency
beta = np.linspace(0.0001, 0.02, timesteps)
alpha = 1.0 - beta
alpha_cumprod = np.cumprod(alpha, dtype=np.float32)
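
It helps to look at how much of the original image survives at each step of this schedule. The sketch below recomputes the same schedule with NumPy and prints the weights applied to the clean image and to the noise at a few timesteps; the variable names `signal` and `noise_weight` are illustrative only.

```python
import numpy as np

timesteps = 20
beta = np.linspace(0.0001, 0.02, timesteps)
alpha_cumprod = np.cumprod(1.0 - beta)

signal = np.sqrt(alpha_cumprod)              # weight on the clean image
noise_weight = np.sqrt(1.0 - alpha_cumprod)  # weight on the Gaussian noise
for t in (0, timesteps // 2, timesteps - 1):
    print(f"t={t:2d}  signal={signal[t]:.3f}  noise={noise_weight[t]:.3f}")
```

With only 20 steps, the cumulative product at the last step is still roughly 0.82, so even the noisiest training inputs retain most of the image. Sampling that starts from pure Gaussian noise may therefore see inputs noisier than anything encountered in training; using more steps or larger `beta` values is one way to narrow that gap.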

In [11]:
# add noise to images
def add_noise(images, timestep):
    noise = tf.random.normal(shape=tf.shape(images), dtype=tf.float32)
    sqrt_alpha_cumprod = tf.sqrt(alpha_cumprod[timestep])
    sqrt_one_minus_alpha_cumprod = tf.sqrt(1 - alpha_cumprod[timestep])
    noisy_images = sqrt_alpha_cumprod * images + sqrt_one_minus_alpha_cumprod * noise
    return noisy_images, noise
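
In equation form, `add_noise` samples from the closed-form forward process of a diffusion model, where $\bar{\alpha}_t$ corresponds to `alpha_cumprod[t]` and $x_0$ is the clean image (indices start at 0 to match the code):

```latex
x_t = \sqrt{\bar{\alpha}_t}\, x_0 + \sqrt{1 - \bar{\alpha}_t}\, \epsilon,
\qquad \epsilon \sim \mathcal{N}(0, I),
\qquad \bar{\alpha}_t = \prod_{s=0}^{t} (1 - \beta_s)
```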

In [12]:
# initialize and compile the model
model = SimpleUNet()
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')



In [13]:
# load and preprocess data
(X_train, y_train), (X_test, y_test) = load_fashionmnist()
X_train, text_train = preprocess_data(X_train, y_train)
X_test, text_test = preprocess_data(X_test, y_test)

# create vocabulary and initialize text encoder
vocab = create_vocabulary(text_train)
text_encoder = tf.keras.layers.Embedding(input_dim=len(vocab), output_dim=128)

# convert text to embeddings for training
text_train_emb = text_encoder(tokenize_text(text_train, vocab))
text_test_emb = text_encoder(tokenize_text(text_test, vocab))

In [14]:
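# training loop with diffusion steps
num_epochs = 10
batch_size = 32
for epoch in range(num_epochs):
    print(f"Epoch {epoch + 1}/{num_epochs}")
    for i in range(0, len(X_train), batch_size):  # manual batching
        batch_images = X_train[i:i+batch_size]
        batch_text = text_train_emb[i:i+batch_size]
        # draw one random timestep per batch instead of looping over all of them,
        # and use train_on_batch to avoid the per-call overhead of model.fit
        t = np.random.randint(timesteps)
        # the network is trained to reconstruct the clean image, so the sampled noise is unused
        noisy_images, _ = add_noise(batch_images, t)
        model.train_on_batch([noisy_images, batch_text], batch_images)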

Epoch 1/10




Epoch 2/10
Epoch 3/10
Epoch 4/10


KeyboardInterrupt: 
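
The interrupted run reflects how many updates a diffusion training loop can request. As a back-of-the-envelope count, assuming the 60,000 training images of Fashion MNIST, the sketch below compares visiting every timestep for every batch with drawing a single random timestep per batch.

```python
import math

num_images, batch_size, timesteps, epochs = 60_000, 32, 20, 10
batches_per_epoch = math.ceil(num_images / batch_size)

# every batch repeated at every timestep
all_timesteps = batches_per_epoch * timesteps * epochs
# one randomly drawn timestep per batch
one_timestep = batches_per_epoch * epochs
print(all_timesteps, one_timestep)  # 375000 18750
```

Sampling one timestep per batch cuts the number of updates by a factor of `timesteps`, while still covering all noise levels over the course of training.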

In [None]:
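# function to generate an image from a text prompt
def generate_image(model, text_emb, timesteps):
    # The model is trained to predict the clean image from a noisy input, so each
    # step uses that estimate in a deterministic DDIM-style update.
    x = tf.random.normal(shape=(1, 28, 28, 1), dtype=tf.float32)
    for t in reversed(range(timesteps)):
        x0_pred = model([x, text_emb], training=False)
        if t == 0:
            return x0_pred
        a_t, a_prev = float(alpha_cumprod[t]), float(alpha_cumprod[t - 1])
        # noise implied by the current sample and the predicted clean image
        eps = (x - a_t ** 0.5 * x0_pred) / (1.0 - a_t) ** 0.5
        # re-noise the estimate to the previous, less noisy timestep
        x = a_prev ** 0.5 * x0_pred + (1.0 - a_prev) ** 0.5 * eps
    return x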
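
The training loop fits the network to the clean image (`batch_images`) rather than to the added noise, so one consistent way to sample is a deterministic DDIM-style update driven by a clean-image estimate. The NumPy sketch below checks that update in isolation: `ddim_sample` and the `oracle` predictor are hypothetical, and the oracle simply returns the true image, so a correct update should recover it exactly.

```python
import numpy as np

timesteps = 20
beta = np.linspace(0.0001, 0.02, timesteps)
alpha_cumprod = np.cumprod(1.0 - beta)

def ddim_sample(predict_x0, x, alpha_cumprod):
    """Deterministic DDIM-style sampling driven by a clean-image predictor."""
    for t in reversed(range(len(alpha_cumprod))):
        x0_hat = predict_x0(x, t)
        if t == 0:
            return x0_hat
        # noise implied by the current sample and the clean-image estimate
        eps_hat = (x - np.sqrt(alpha_cumprod[t]) * x0_hat) / np.sqrt(1.0 - alpha_cumprod[t])
        # move to the previous, less noisy timestep
        x = np.sqrt(alpha_cumprod[t - 1]) * x0_hat + np.sqrt(1.0 - alpha_cumprod[t - 1]) * eps_hat
    return x

rng = np.random.default_rng(0)
x0 = rng.random((1, 28, 28, 1))
eps = rng.standard_normal(x0.shape)
x_T = np.sqrt(alpha_cumprod[-1]) * x0 + np.sqrt(1.0 - alpha_cumprod[-1]) * eps
oracle = lambda x, t: x0   # hypothetical perfect predictor, used only to check the update
result = ddim_sample(oracle, x_T, alpha_cumprod)
print(np.allclose(result, x0))  # True
```

With a trained network in place of the oracle, the quality of the result depends entirely on how well the network estimates the clean image at each noise level.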

In [None]:
# test with a sample prompt
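# prompt words must come from the label vocabulary; unknown words (e.g. "black") map to padding
sample_prompt = tokenize_text(["T-shirt/top"], vocab)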
sample_emb = text_encoder(sample_prompt)
generated_image = generate_image(model, sample_emb, timesteps)

# display the result
plt.imshow(generated_image[0, :, :, 0], cmap='gray')
plt.title("Generated Fashion Item")
plt.axis('off')
plt.show()


In [None]:
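# save the model weights for later use
# (weights-only saving is used here because SimpleUNet is a subclassed model;
# the separate text_encoder embedding is not part of this file)
os.makedirs(model_dir, exist_ok=True)
model_filename = os.path.join(model_dir, 'sd_model.weights.h5')
model.save_weights(model_filename)
print("Model saved successfully.")

# rebuild the architecture, load the saved weights, and test
loaded_model = SimpleUNet()
loaded_model([tf.zeros((1, 28, 28, 1)), sample_emb])  # one forward pass creates the variables
loaded_model.load_weights(model_filename)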
test_image = generate_image(loaded_model, sample_emb, timesteps)
plt.imshow(test_image[0, :, :, 0], cmap='gray')
plt.title("Test from Loaded Model")
plt.axis('off')
plt.show()