## Import packages

In [1]:
# Mount Google Drive here too, so this notebook can run on its own when the training part of the Colab pipeline is skipped.

from google.colab import drive
import os
import numpy as np
import tensorflow as tf
import matplotlib as mpl

import IPython.display as display
import PIL.Image

# Mount Google Drive at /content/drive; the model and image paths used in
# later cells all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from diffusers import StableDiffusionPipeline
import torch
from PIL import Image

# Load the converted, fine-tuned text-to-image pipeline from Drive in full
# precision and keep it on the CPU.
model_path = "/content/drive/MyDrive/Festify_final/converted_model"
pipe = StableDiffusionPipeline.from_pretrained(
    model_path,
    torch_dtype=torch.float32,
).to("cpu")



The cache for model files in Transformers v4.22.0 has been updated. Migrating your old cache. This is a one-time only operation. You can interrupt this and resume the migration later on by calling `transformers.utils.move_cache()`.


0it [00:00, ?it/s]

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

In [3]:
def pathToImg(path, size=(300, 300)):
  '''
  Loads an image file as an RGB float32 array for the helpers in this notebook.

  Args:
    path: path of the image file to open.
    size: (width, height) the image is resized to; defaults to (300, 300).

  Returns:
    A float32 NumPy array of shape (height, width, 3) with RGB values in [0, 255].
  '''
  # The context manager closes the underlying file once the pixels are copied.
  with Image.open(path) as source:
    rgb_image = source.convert("RGB").resize(size)

  # Deliberately no model-specific normalisation here. The previous VGG16
  # preprocessing (BGR channel swap + ImageNet mean subtraction) did not match
  # any consumer: `run_deep_dream_with_octaves` and `maximize_activation`
  # apply `inception_v3.preprocess_input` themselves and expect [0, 255] RGB.
  return np.asarray(rgb_image, dtype=np.float32)

def randImg(shape=(300, 300, 3)):
  '''
  Returns uniform random noise usable as a starting image.

  Args:
    shape: shape of the generated array; defaults to (300, 300, 3).

  Returns:
    A uint8 NumPy array with values drawn uniformly from 0..255 inclusive.
  '''
  # `high` is exclusive in np.random.randint, so 256 is required for the
  # brightest value (255) to be reachable.
  return np.random.randint(low=0, high=256, size=shape, dtype=np.uint8)

In [4]:
# ImageNet-pretrained InceptionV3 without its classification head.
base_model = tf.keras.applications.InceptionV3(weights='imagenet', include_top=False)

# Names of the layers whose activations the dream maximizes, and their outputs.
names = ['mixed3', 'mixed5']
layers = [base_model.get_layer(layer_name).output for layer_name in names]

# Feature extractor: input image -> activations of the layers above.
dream_model = tf.keras.Model(
    inputs=base_model.input,
    outputs=layers,
)

def deprocess(img):
  '''Rescales an image from the [-1, 1] range to [0, 255] and casts it to uint8.'''
  rescaled = 255*(img + 1.0)/2.0
  return tf.cast(rescaled, dtype=tf.uint8)

class DeepDream(tf.Module):
  '''
  Gradient-ascent loop that changes an image to increase `calc_loss` on `model`.
  '''
  def __init__(self, model):
    # Model whose outputs are fed to `calc_loss`.
    self.model = model

  @tf.function(
      input_signature=(
        tf.TensorSpec(shape=[None,None,3], dtype=tf.float32),
        tf.TensorSpec(shape=[], dtype=tf.int32),
        tf.TensorSpec(shape=[], dtype=tf.float32),)
  )
  def __call__(self, img, steps, step_size):
      '''
      Runs `steps` gradient-ascent updates on `img`.

      Args:
        img: float32 tensor of shape [height, width, 3] (per the input signature).
        steps: scalar int32 tensor, number of updates to run.
        step_size: scalar float32 tensor multiplied with the normalized gradient.

      Returns:
        (loss, img): the loss from the last update (0.0 if no update ran) and
        the updated image, clipped to [-1, 1].
      '''
      # Python-side print; under tf.function this is expected to fire only
      # while the function is being traced, not on every call.
      print("Tracing")
      loss = tf.constant(0.0)
      for n in tf.range(steps):
        with tf.GradientTape() as tape:
          # This needs gradients relative to `img`
          # `GradientTape` only watches `tf.Variable`s by default
          tape.watch(img)
          # `calc_loss` is defined further down in this cell; it is looked up
          # when this method runs, not when the class is defined.
          loss = calc_loss(img, self.model)

        # Calculate the gradient of the loss with respect to the pixels of the input image.
        gradients = tape.gradient(loss, img)

        # Normalize the gradients.
        # The 1e-8 keeps the division finite when the gradient has zero spread.
        gradients /= tf.math.reduce_std(gradients) + 1e-8

        # In gradient ascent, the "loss" is maximized so that the input image increasingly "excites" the layers.
        # You can update the image by directly adding the gradients (because they're the same shape!)
        img = img + gradients*step_size
        img = tf.clip_by_value(img, -1, 1)

      return loss, img

# Shared instance used by `run_deep_dream_simple`.
deepdream = DeepDream(dream_model)

def calc_loss(img, model):
  '''
  Sums the mean activation of every output `model` produces for `img`.
  '''
  # The model works on batches, so give the single image a leading axis of size 1.
  batched = tf.expand_dims(img, axis=0)
  activations = model(batched)
  if len(activations) == 1:
    activations = [activations]

  # One mean per output, then add them all together.
  per_output_means = [tf.math.reduce_mean(activation) for activation in activations]
  return tf.reduce_sum(per_output_means)

def run_deep_dream_simple(img, steps=100, step_size=0.01):
  '''
  Runs the single-scale deep dream on `img`.

  Args:
    img: image with values in [0, 255] (the range
      `inception_v3.preprocess_input` converts from).
    steps: total number of gradient-ascent steps; run in chunks of at most 100.
    step_size: factor applied to the normalized gradient at every step.

  Returns:
    The dreamed image as a uint8 tensor (see `deprocess`).
  '''
  # Convert from [0, 255] to the range expected by the model.
  img = tf.keras.applications.inception_v3.preprocess_input(img)
  # `deepdream` is traced with a float32 input signature, so make sure that a
  # float64 input does not get rejected by it.
  img = tf.cast(tf.convert_to_tensor(img), tf.float32)
  step_size = tf.convert_to_tensor(step_size, dtype=tf.float32)

  # Keep the bookkeeping in plain Python ints; only the value handed to
  # `deepdream` needs to be a tensor.
  steps_remaining = int(steps)
  while steps_remaining > 0:
    run_steps = min(steps_remaining, 100)
    steps_remaining -= run_steps
    _, img = deepdream(img, tf.constant(run_steps), step_size)

  return deprocess(img)

def random_roll(img, maxroll):
  '''
  Rolls `img` along its first two axes by a random offset.

  Returns the offset (two int32 values drawn from [-maxroll, maxroll)) together
  with the rolled image, so the caller can undo the shift later.
  '''
  offsets = tf.random.uniform(shape=[2], minval=-maxroll, maxval=maxroll, dtype=tf.int32)
  rolled = tf.roll(img, shift=offsets, axis=[0,1])
  return offsets, rolled

class TiledGradients(tf.Module):
  '''
  Computes the gradient of `calc_loss` with respect to an image tile by tile.
  '''
  def __init__(self, model):
    # Model whose outputs are fed to `calc_loss`.
    self.model = model

  @tf.function(
      input_signature=(
        tf.TensorSpec(shape=[None,None,3], dtype=tf.float32),
        tf.TensorSpec(shape=[2], dtype=tf.int32),
        tf.TensorSpec(shape=[], dtype=tf.int32),)
  )
  def __call__(self, img, img_size, tile_size=512):
    '''
    Returns the normalized, tile-accumulated gradient for `img`.

    Args:
      img: float32 tensor of shape [height, width, 3] (per the input signature).
      img_size: int32 tensor of two values; index 0 bounds the `ys` range and
        index 1 bounds the `xs` range.
      tile_size: edge length of a tile; also used as the maximum random roll.

    Returns:
      Gradient tensor shaped like `img`, divided by its standard deviation.
    '''
    shift, img_rolled = random_roll(img, tile_size)

    # Initialize the image gradients to zero.
    gradients = tf.zeros_like(img_rolled)

    # Skip the last tile, unless there's only one tile.
    xs = tf.range(0, img_size[1], tile_size)[:-1]
    if not tf.cast(len(xs), bool):
      xs = tf.constant([0])
    ys = tf.range(0, img_size[0], tile_size)[:-1]
    if not tf.cast(len(ys), bool):
      ys = tf.constant([0])

    for x in xs:
      for y in ys:
        # Calculate the gradients for this tile.
        with tf.GradientTape() as tape:
          # This needs gradients relative to `img_rolled`.
          # `GradientTape` only watches `tf.Variable`s by default.
          tape.watch(img_rolled)

          # Extract a tile out of the image.
          img_tile = img_rolled[y:y+tile_size, x:x+tile_size]
          loss = calc_loss(img_tile, self.model)

        # Update the image gradients for this tile.
        # The gradient is taken w.r.t. the whole rolled image, so each tile
        # contributes a full-size tensor that is summed into `gradients`.
        gradients = gradients + tape.gradient(loss, img_rolled)

    # Undo the random shift applied to the image and its gradients.
    gradients = tf.roll(gradients, shift=-shift, axis=[0,1])

    # Normalize the gradients.
    # The 1e-8 keeps the division finite when the gradient has zero spread.
    gradients /= tf.math.reduce_std(gradients) + 1e-8

    return gradients

# Shared instance used by `run_deep_dream_with_octaves`.
get_tiled_gradients = TiledGradients(dream_model)

def run_deep_dream_with_octaves(img, steps_per_octave=100, step_size=0.01,
                                octaves=range(-2,3), octave_scale=1.3):
  '''
  Runs the tiled deep dream on `img` at several scales ("octaves").

  For every entry of `octaves` the image is resized to the original height and
  width multiplied by `octave_scale**octave`, then updated `steps_per_octave`
  times with the gradients from `get_tiled_gradients`.

  Returns:
    The dreamed image as a uint8 tensor at the size of the last octave.
  '''
  base_shape = tf.shape(img)
  img = tf.keras.applications.inception_v3.preprocess_input(
      tf.keras.utils.img_to_array(img))

  initial_shape = img.shape[:-1]
  img = tf.image.resize(img, initial_shape)

  # Original height/width as floats; the same for every octave.
  base_hw = tf.cast(tf.convert_to_tensor(base_shape[:-1]), tf.float32)
  for octave in octaves:
    # Target size of this octave.
    new_size = tf.cast(base_hw*(octave_scale**octave), tf.int32)
    img = tf.image.resize(img, new_size)

    for _ in range(steps_per_octave):
      gradients = get_tiled_gradients(img, new_size)
      img = tf.clip_by_value(img + gradients*step_size, -1, 1)

  return deprocess(img)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5
[1m87910968/87910968[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m5s[0m 0us/step


### Stable diffusion

In [27]:
# convert trained model to stable diffusion pipeline

from diffusers import StableDiffusionPipeline, UNet2DConditionModel, AutoencoderKL
import torch

checkpoint_dir = "/content/drive/MyDrive/Festify_final/checkpoint-1000"
output_dir = "/content/drive/MyDrive/Festify_final/converted_model"

# Fine-tuned UNet weights from the training checkpoint.
unet = UNet2DConditionModel.from_pretrained(checkpoint_dir, subfolder="unet")

# Build the base pipeline exactly once and swap in the fine-tuned UNet.
# Previously the complete base pipeline was instantiated a first time only to
# pull out its text encoder, and a second time to assemble the result.
# No torch_dtype is requested: every cell that loads `output_dir` asks for
# float32, so the saved pipeline is kept in one consistent (default) precision.
pipeline = StableDiffusionPipeline.from_pretrained(
    "CompVis/stable-diffusion-v1-4",
    unet=unet,
)

# Keep the module-level names the previous version of this cell defined; both
# components are the stock ones of the base model.
vae = pipeline.vae
text_encoder = pipeline.text_encoder

# Save the full pipeline
pipeline.save_pretrained(output_dir)
print("Model converted and saved successfully!")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


vae/config.json:   0%|          | 0.00/551 [00:00<?, ?B/s]

diffusion_pytorch_model.safetensors:   0%|          | 0.00/335M [00:00<?, ?B/s]

model_index.json:   0%|          | 0.00/541 [00:00<?, ?B/s]

Fetching 16 files:   0%|          | 0/16 [00:00<?, ?it/s]

model.safetensors:   0%|          | 0.00/1.22G [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/492M [00:00<?, ?B/s]

(…)ature_extractor/preprocessor_config.json:   0%|          | 0.00/342 [00:00<?, ?B/s]

(…)kpoints/scheduler_config-checkpoint.json:   0%|          | 0.00/209 [00:00<?, ?B/s]

safety_checker/config.json:   0%|          | 0.00/4.56k [00:00<?, ?B/s]

text_encoder/config.json:   0%|          | 0.00/592 [00:00<?, ?B/s]

scheduler/scheduler_config.json:   0%|          | 0.00/313 [00:00<?, ?B/s]

tokenizer/merges.txt:   0%|          | 0.00/525k [00:00<?, ?B/s]

diffusion_pytorch_model.safetensors:   0%|          | 0.00/3.44G [00:00<?, ?B/s]

tokenizer/special_tokens_map.json:   0%|          | 0.00/472 [00:00<?, ?B/s]

tokenizer/tokenizer_config.json:   0%|          | 0.00/806 [00:00<?, ?B/s]

unet/config.json:   0%|          | 0.00/743 [00:00<?, ?B/s]

tokenizer/vocab.json:   0%|          | 0.00/1.06M [00:00<?, ?B/s]

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

Model converted and saved successfully!


In [26]:
from diffusers import StableDiffusionPipeline
from diffusers import StableDiffusionImg2ImgPipeline
import torch
from PIL import Image

# Load the trained model as an image-to-image pipeline, in full precision, on the CPU.
model_path = "/content/drive/MyDrive/Festify_final/converted_model"
pipe = StableDiffusionImg2ImgPipeline.from_pretrained(
    model_path,
    torch_dtype=torch.float32,
).to("cpu")

def stablediff(input_img, output_str="output_image", prompt="Festus at the beach"):
  '''
  img-2-img stable diffusion on inputted image.

  Args:
    input_img: starting image. A PIL image is passed through unchanged. A NumPy
      array is passed through when it is a float array already in [0, 1];
      any other NumPy array is treated as RGB pixel values, clipped to
      [0, 255] and converted to a PIL image first.
    output_str: file name (without extension) the result is saved under.
    prompt: text prompt guiding the generation.

  Returns:
    The first generated image; it is also written to "<output_str>.png".
  '''
  if isinstance(input_img, np.ndarray):
    in_unit_range = (
        np.issubdtype(input_img.dtype, np.floating)
        and input_img.size > 0
        and input_img.min() >= 0.0
        and input_img.max() <= 1.0
    )
    if not in_unit_range:
      # The pipeline expects array input in [0, 1]; pixel-valued arrays (such
      # as the ones `pathToImg` returns) would otherwise be fed in at the
      # wrong scale, so hand over an unambiguous PIL image instead.
      input_img = Image.fromarray(np.clip(input_img, 0, 255).astype(np.uint8))

  # Generate img2img output
  output = pipe(prompt=prompt, image=input_img, strength=0.6)

  # Save and return the first generated image
  result = output.images[0]
  result.save(f"{output_str}.png")
  return result

# The first cell binds `display` to the IPython.display *module*; import the
# function explicitly so this cell also works on a fresh kernel.
from IPython.display import display

# At module level `{filter}` formatted Python's builtin `filter` into the file
# name ("output_image_<class 'filter'>"); use a plain name that matches the
# input image instead.
img = stablediff(pathToImg("optimized_img_11.png"), output_str="output_image_11", prompt="festus")
display(img)

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]



  0%|          | 0/30 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
from diffusers import StableDiffusionPipeline
from diffusers import StableDiffusionImg2ImgPipeline
import torch
from PIL import Image

# Load the trained model as a text-to-image pipeline, in full precision, on the CPU.
# This rebinds the global `pipe`.
model_path = "/content/drive/MyDrive/Festify_final/converted_model"
pipe = StableDiffusionPipeline.from_pretrained(
    model_path,
    torch_dtype=torch.float32,
).to("cpu")

def stablediff(output_str="output_image", prompt="Festus at the beach"):
  '''
  Text-to-image stable diffusion: generates an image from `prompt` alone.

  Calls the global `pipe` with only the prompt, saves the first generated
  image as "<output_str>.png" and returns it.

  NOTE(review): this reuses the name `stablediff` of the image-to-image helper
  defined in an earlier cell but takes no input image. Running this cell
  replaces that helper, so calls that pass an image as first argument (as
  `visualize` does) no longer match this signature -- confirm this is intended.
  '''
  # Generate the image from the text prompt
  output = pipe(prompt=prompt)

  # Save and return the first generated image
  output.images[0].save(f"{output_str}.png")
  return output.images[0]

# The first cell binds `display` to the IPython.display *module*; import the
# function explicitly so this cell also works on a fresh kernel.
from IPython.display import display

# At module level `{filter}` formatted Python's builtin `filter` into the file
# name ("output_image_<class 'filter'>"); name the file after the prompt instead.
img = stablediff(output_str="output_image_festus_playing_hockey", prompt="festus playing hockey")
display(img)

Loading pipeline components...:   0%|          | 0/7 [00:00<?, ?it/s]

  0%|          | 0/50 [00:00<?, ?it/s]

### Feature visualization

In [6]:
## From LessWrong https://www.lesswrong.com/posts/raRSW3e9iYMwkqjBX/cnn-feature-visualization-in-50-lines-of-code

import tensorflow as tf
# base_model = tf.keras.applications.VGG19(include_top=False, weights='imagenet')
# Network whose layers `maximize_activation` visualizes: ImageNet-pretrained
# InceptionV3 without the classification head (same constructor call as the
# `base_model` created in the deep-dream cell).
base_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet')
def maximize_activation(starting_img, target_layer="mixed0", target_index=0,\
                        steps=10, step_size=0.1):
    '''
    Optimizes an image so that one channel of a layer of `base_model` fires strongly.

    Args:
        starting_img: image with values in [0, 255] to start the optimization from.
        target_layer: name of the `base_model` layer to analyze.
        target_index: index along the last axis of that layer's output to maximize.
        steps: number of gradient-ascent steps.
        step_size: factor applied to the normalized gradient at every step.

    Returns:
        The optimized image as a uint8 tensor with values in [0, 255].
    '''
    # Take the network and cut it off at the layer we want to analyze,
    # i.e. we only need the part from the input to the target_layer.
    target = [base_model.get_layer(target_layer).output]
    part_model = tf.keras.Model(inputs=base_model.input, outputs=target)

    # The next part is the function to maximize the target layer/node by
    # adjusting the input: the usual gradient-descent loop, except that it
    # ascends the gradient instead of descending it.
    def gradient_ascent(img, steps, step_size):
        loss = tf.constant(0.0)
        for _ in tf.range(steps):
            # As in normal NN training, you want to record the computation
            # of the forward-pass (the part_model call below) to compute the
            # gradient afterwards. This is what tf.GradientTape does.
            with tf.GradientTape() as tape:
                tape.watch(img)
                # Forward-pass (compute the activation given our image)
                activation = part_model(tf.expand_dims(img, axis=0))
                # The activation will be of shape (1,N,N,L) where N is related to
                # the resolution of the input image (assuming our target layer is
                # a convolutional filter), and L is the size of the layer. E.g. for a
                # 256x256 image in "block4_conv1" of VGG19, this will be
                # (1,32,32,512) -- we select one of the 512 nodes (index) and
                # average over the rest (you can average selectively to affect
                # only part of the image but there's not really a point):
                loss = tf.math.reduce_mean(activation[:,:,:,target_index])

            # Get the gradient, i.e. derivative of "loss" with respect to input
            # and normalize. The small epsilon keeps the division finite when
            # the gradient is constant (e.g. all zeros), which would otherwise
            # turn the whole image into NaNs; DeepDream uses the same guard.
            gradients = tape.gradient(loss, img)
            gradients /= tf.math.reduce_std(gradients) + 1e-8

            # In the final step move the image in the direction of the gradient to
            # increase the "loss" (our targeted activation). Note that the sign here
            # is opposite to the typical gradient descent (our "loss" is the target
            # activation which we maximize, not something we minimize).
            img = img + gradients*step_size
            img = tf.clip_by_value(img, -1, 1)
        return loss, img

    # Preprocessing of the image (converts from [0..255] to [-1..1])
    img = tf.keras.applications.inception_v3.preprocess_input(starting_img)
    img = tf.convert_to_tensor(img)
    # Run the gradient ascent loop
    _, img = gradient_ascent(img, tf.constant(steps), tf.constant(step_size))
    # Convert back to [0..255] and return the new image
    img = tf.cast(255*(img + 1.0)/2.0, tf.uint8)
    return img



### Channel Visualization

In [18]:
# main function - runs stable diffusion and feature visualization combination on inputted filter

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from diffusers import StableDiffusionPipeline
from diffusers import StableDiffusionImg2ImgPipeline
import torch
from PIL import Image
import tensorflow as tf
from IPython.display import display

# Folder that `visualize` builds its output file paths in; created up front
# (no error if it already exists).
output_folder = '/content/OptimizedImages'
os.makedirs(output_folder, exist_ok=True)

## loop through different filters
def visualize(filter):
  '''
  Runs the stable-diffusion and deep-dream steps for one filter index.

  Displays the img2img result for "optimized_img_11.png", then deep-dreams the
  Festus photo, saves the dream as "filter_<filter>.png" inside
  `output_folder` and displays it.

  Args:
    filter: filter index; only used to label the output files. (The name
      shadows Python's builtin `filter` inside this function.)
  '''
  # visualize filter (currently disabled; a pre-computed image is used below)
  # optimized_img = maximize_activation(randImg(), target_layer="mixed4", target_index=filter, steps=50, step_size=0.2)
  # image = Image.fromarray(optimized_img.numpy())
  # display(image)

  # generate image from filter visualization
  newImg = stablediff(pathToImg("optimized_img_11.png"), output_str=f"output_image_{filter}", prompt="festus")
  display(newImg)

  img = pathToImg("/content/drive/MyDrive/Festus/festus.jpg")
  base_shape = tf.shape(img)[:-1]
  img = run_deep_dream_with_octaves(img=img, step_size=0.01)

  # The dream comes back at the size of the last octave; bring it back to the
  # size of the input and convert it to uint8 pixels.
  toVizImg = tf.image.resize(img, base_shape)
  toVizImg = tf.image.convert_image_dtype(toVizImg/255.0, dtype=tf.uint8)

  # Previously the path was built but nothing was written, and `display` was
  # handed the raw tensor. Turn the tensor into a PIL image so it can be both
  # saved and rendered as a picture.
  file_name = f'filter_{filter}.png'
  file_path = os.path.join(output_folder, file_name)
  dream_image = Image.fromarray(toVizImg.numpy())
  dream_image.save(file_path)
  display(dream_image)

In [19]:
# Run the combined pipeline for filter index 6.
visualize(6)

  0%|          | 0/30 [00:00<?, ?it/s]

TypeError: a bytes-like object is required, not 'Image'