In [5]:
from ultralytics import YOLO
import cv2
import os
import numpy as np
from ultralytics.utils.plotting import Annotator, colors

# Load the YOLO segmentation model
model = YOLO('yolo11x-seg.pt')
names = model.model.names  # Correcting to access the 'names' attribute

# Define video path and open video file
video_path = os.path.join(os.getcwd(), 'OurPlanet.mp4')
cap = cv2.VideoCapture(video_path)

# Retrieve video properties
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))

# Define output video writer
out = cv2.VideoWriter("instance-segmentation.avi", cv2.VideoWriter_fourcc(*"MJPG"), fps, (w, h))

# Process each frame in the video
while True:
    ret, im0 = cap.read()
    if not ret:
        break

    # Run model prediction on the current frame
    results = model.predict(im0)
    annotator = Annotator(im0, line_width=2)

    # Check if segmentation masks are available
    if results[0].masks is not None:
        clss = results[0].boxes.cls.cpu().tolist()
        masks = results[0].masks.xy
        for mask, cls in zip(masks, clss):
            # Use 'label' instead of 'det_label'
            annotator.seg_bbox(mask=mask, mask_color=colors(int(cls), True), label=names[int(cls)])

    # Write annotated frame to output video and display it
    out.write(im0)
    cv2.imshow("instance-segmentation", im0)

    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

# Release resources
out.release()
cap.release()
cv2.destroyAllWindows()



0: 384x640 (no detections), 89.1ms
Speed: 3.0ms preprocess, 89.1ms inference, 23.9ms postprocess per image at shape (1, 3, 384, 640)


error: OpenCV(4.10.0) D:\a\opencv-python\opencv-python\opencv\modules\highgui\src\window.cpp:1301: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvShowImage'


# Transformer Autoencoder

In [13]:
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.transforms.functional import to_pil_image
import numpy as np
from PIL import Image, ImageTk
import tkinter as tk
from tkinter import Label, Frame

# Define the Transformer-based Autoencoder model
class TransformerAutoencoder(nn.Module):
    def __init__(self, img_size=224, patch_size=16, emb_dim=512, num_heads=8, num_layers=6):
        super(TransformerAutoencoder, self).__init__()
        assert img_size % patch_size == 0, 'Image size must be divisible by patch size'
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.emb_dim = emb_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images

        # Linear projection of flattened patches
        self.patch_embedding = nn.Linear(self.patch_dim, emb_dim)
        # Positional embedding
        self.positional_embedding = nn.Parameter(torch.randn(1, self.num_patches, emb_dim))

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output projection to reconstruct patches
        self.output_projection = nn.Linear(emb_dim, self.patch_dim)

        # Target mask for causal decoding
        self.tgt_mask = nn.Transformer.generate_square_subsequent_mask(self.num_patches).to(
            torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        )

    def forward(self, x):
        batch_size = x.size(0)
        # Divide image into patches
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        # x shape: [batch_size, channels, num_patches_h, num_patches_w, patch_size, patch_size]
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous()
        # x shape: [batch_size, num_patches_h, num_patches_w, channels, patch_size, patch_size]
        x = x.view(batch_size, -1, self.patch_dim)
        # x shape: [batch_size, num_patches, patch_dim]

        # Patch embeddings
        x = self.patch_embedding(x)
        # x shape: [batch_size, num_patches, emb_dim]

        # Add positional encoding
        x = x + self.positional_embedding  # [1, num_patches, emb_dim] broadcasts over batch_size

        # Encode patches
        memory = self.encoder(x)
        # memory shape: [batch_size, num_patches, emb_dim]

        # Prepare target sequence for decoder (shifted right)
        tgt = torch.zeros_like(x)
        tgt[:, 1:, :] = x[:, :-1, :]  # Shift input embeddings to the right
        tgt = tgt + self.positional_embedding

        # Decode patches
        output = self.decoder(tgt, memory, tgt_mask=self.tgt_mask)
        # output shape: [batch_size, num_patches, emb_dim]

        # Project back to patch dimension
        reconstructed_patches = self.output_projection(output)
        # reconstructed_patches shape: [batch_size, num_patches, patch_dim]

        # Reconstruct image from patches
        reconstructed = reconstructed_patches.view(
            batch_size,
            self.img_size // self.patch_size,
            self.img_size // self.patch_size,
            3,
            self.patch_size,
            self.patch_size
        )
        reconstructed = reconstructed.permute(0, 3, 1, 4, 2, 5).contiguous()
        reconstructed = reconstructed.view(batch_size, 3, self.img_size, self.img_size)
        reconstructed = torch.sigmoid(reconstructed)  # Apply sigmoid to get pixel values between 0 and 1

        return memory, reconstructed

# Initialize the model, optimizer, and loss function
img_size = 224  # Input image resolution
autoencoder = TransformerAutoencoder(
    img_size=img_size,
    patch_size=16,
    emb_dim=512,
    num_heads=8,
    num_layers=6
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

optimizer = optim.Adam(autoencoder.parameters(), lr=0.0001)
criterion = nn.MSELoss()  # Reconstruction loss

# Video capture
cap = cv2.VideoCapture(0)

# Transform to preprocess the frame
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
])

# Initialize Tkinter window
window = tk.Tk()
window.title("Real-time Transformer Autoencoder Visualization")

# Frame for organizing the input video, latent space, and reconstructed video
main_frame = Frame(window)
main_frame.pack()

# Labels for each of the three outputs
video_label = Label(main_frame, text="Input Video", font=("Arial", 12, "bold"))
video_label.grid(row=0, column=0)

latent_label = Label(main_frame, text="Latent Space", font=("Arial", 12, "bold"))
latent_label.grid(row=0, column=1)

output_label = Label(main_frame, text="Output Video", font=("Arial", 12, "bold"))
output_label.grid(row=0, column=2)

# Label to display the latent space size
latent_size_label = Label(window, text="", font=("Arial", 12))
latent_size_label.pack()

def update_frame():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture video frame")
        window.after(10, update_frame)
        return

    # Convert frame to PIL image
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(rgb_frame)
    img_resized = img_pil.resize((img_size, img_size))

    # Display original frame
    img_tk = ImageTk.PhotoImage(image=img_resized)
    video_label.imgtk = img_tk
    video_label.configure(image=img_tk)

    # Preprocess frame for model input
    input_image = transform(img_resized).unsqueeze(0).to(device)  # Shape: [1, 3, H, W]

    # Perform a training step
    autoencoder.train()
    optimizer.zero_grad()

    latent_space, reconstructed = autoencoder(input_image)  # Forward pass
    loss = criterion(reconstructed, input_image)            # Compute loss
    loss.backward()                                         # Backpropagation

    # Gradient clipping for stability
    torch.nn.utils.clip_grad_norm_(autoencoder.parameters(), max_norm=1.0)

    optimizer.step()                                        # Update weights

    # Display the latent space (mean over embedding dimension)
    latent_space_np = latent_space.detach().mean(dim=2).cpu().numpy()  # Shape: [batch_size, num_patches]
    # Reshape to image
    num_patches_side = img_size // autoencoder.patch_size
    latent_image = latent_space_np[0].reshape(num_patches_side, num_patches_side)
    # Upscale to match image size
    latent_image_resized = cv2.resize(latent_image, (img_size, img_size), interpolation=cv2.INTER_CUBIC)
    # Normalize and convert to uint8
    latent_image_resized = (255 * (latent_image_resized - latent_image_resized.min()) /
                            (latent_image_resized.max() - latent_image_resized.min() + 1e-5)).astype(np.uint8)
    latent_img = Image.fromarray(latent_image_resized)
    latent_img_tk = ImageTk.PhotoImage(image=latent_img)
    latent_label.imgtk = latent_img_tk
    latent_label.configure(image=latent_img_tk)

    # Display the reconstructed output at input resolution
    output_np = reconstructed[0].detach().cpu().numpy().transpose(1, 2, 0)  # Convert to HxWxC
    output_np = (255 * output_np).astype(np.uint8)
    output_img = Image.fromarray(output_np)
    output_img_tk = ImageTk.PhotoImage(image=output_img)
    output_label.imgtk = output_img_tk
    output_label.configure(image=output_img_tk)

    # Update the latent space size label
    latent_size = latent_space.size()
    latent_size_text = f"Latent Space Size: {list(latent_size)}"
    latent_size_label.config(text=latent_size_text)

    # Schedule the next frame update
    window.after(1, update_frame)

# Start video feed update loop
update_frame()

# Run Tkinter main loop
window.mainloop()

# Release the video capture on exit
cap.release()


Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to capture video frame
Failed to 

In [14]:
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
from PIL import Image, ImageTk
import tkinter as tk
from tkinter import Label, Frame

# Define the Transformer-based Autoencoder model
class TransformerAutoencoder(nn.Module):
    def __init__(self, img_size=224, patch_size=16, emb_dim=512, num_heads=8, num_layers=6):
        super(TransformerAutoencoder, self).__init__()
        assert img_size % patch_size == 0, 'Image size must be divisible by patch size'
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.emb_dim = emb_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images

        # Linear projection of flattened patches
        self.patch_embedding = nn.Linear(self.patch_dim, emb_dim)
        # Positional embedding
        self.positional_embedding = nn.Parameter(torch.randn(1, self.num_patches, emb_dim))

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output projection to reconstruct patches
        self.output_projection = nn.Linear(emb_dim, self.patch_dim)

        # Target mask for causal decoding
        self.tgt_mask = nn.Transformer.generate_square_subsequent_mask(self.num_patches).to(
            torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        )

    def _get_patches(self, x):
        batch_size = x.size(0)
        # Divide image into patches
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous()
        x = x.view(batch_size, -1, self.patch_dim)
        return x

    def _reconstruct_from_patches(self, patches, batch_size):
        # Reconstruct image from patches
        reconstructed = patches.view(
            batch_size,
            self.img_size // self.patch_size,
            self.img_size // self.patch_size,
            3,
            self.patch_size,
            self.patch_size
        )
        reconstructed = reconstructed.permute(0, 3, 1, 4, 2, 5).contiguous()
        reconstructed = reconstructed.view(batch_size, 3, self.img_size, self.img_size)
        return reconstructed

    def encode(self, x):
        x_patches = self._get_patches(x)
        x_embedded = self.patch_embedding(x_patches) + self.positional_embedding
        memory = self.encoder(x_embedded)
        return x_embedded, memory

    def decode(self, x_embedded, memory):
        # Prepare target sequence for decoder (shifted right)
        tgt = torch.zeros_like(x_embedded)
        tgt[:, 1:, :] = x_embedded[:, :-1, :]
        tgt = tgt + self.positional_embedding

        # Decode patches
        output = self.decoder(tgt, memory, tgt_mask=self.tgt_mask)
        reconstructed_patches = self.output_projection(output)
        return reconstructed_patches

    def forward(self, x):
        batch_size = x.size(0)
        x_embedded, memory = self.encode(x)
        reconstructed_patches = self.decode(x_embedded, memory)
        reconstructed = self._reconstruct_from_patches(reconstructed_patches, batch_size)
        reconstructed = torch.sigmoid(reconstructed)  # Apply sigmoid to get pixel values between 0 and 1
        return memory, reconstructed

# Initialize the model, optimizer, and loss function
img_size = 224  # Input image resolution
autoencoder = TransformerAutoencoder(
    img_size=img_size,
    patch_size=16,
    emb_dim=512,
    num_heads=8,
    num_layers=6
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
autoencoder.to(device)

optimizer = optim.Adam(autoencoder.parameters(), lr=0.0001)
criterion = nn.MSELoss()  # Reconstruction loss

# Video capture
cap = cv2.VideoCapture(0)

# Transform to preprocess the frame
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
])

# Initialize Tkinter window
window = tk.Tk()
window.title("Real-time Transformer Autoencoder Visualization")

# Frame for organizing the input video, latent space, and reconstructed video
main_frame = Frame(window)
main_frame.pack()

# Labels for each of the three outputs
video_label = Label(main_frame)
video_label.grid(row=1, column=0)

latent_label = Label(main_frame)
latent_label.grid(row=1, column=1)

output_label = Label(main_frame)
output_label.grid(row=1, column=2)

# Titles for each section
Label(main_frame, text="Input Video", font=("Arial", 12, "bold")).grid(row=0, column=0)
Label(main_frame, text="Latent Space", font=("Arial", 12, "bold")).grid(row=0, column=1)
Label(main_frame, text="Output Video", font=("Arial", 12, "bold")).grid(row=0, column=2)

# Label to display the latent space size
latent_size_label = Label(window, text="", font=("Arial", 12))
latent_size_label.pack()

def update_frame():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture video frame")
        window.after(10, update_frame)
        return

    # Convert frame to PIL image
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(rgb_frame)
    img_resized = img_pil.resize((img_size, img_size))

    # Display original frame
    img_tk = ImageTk.PhotoImage(image=img_resized)
    video_label.imgtk = img_tk
    video_label.configure(image=img_tk)

    # Preprocess frame for model input
    input_image = transform(img_resized).unsqueeze(0).to(device)  # Shape: [1, 3, H, W]

    # Perform a training step
    autoencoder.train()
    optimizer.zero_grad()

    latent_space, reconstructed = autoencoder(input_image)  # Forward pass
    loss = criterion(reconstructed, input_image)            # Compute loss
    loss.backward()                                         # Backpropagation

    # Gradient clipping for stability
    torch.nn.utils.clip_grad_norm_(autoencoder.parameters(), max_norm=1.0)

    optimizer.step()                                        # Update weights

    # Display the latent space (mean over embedding dimension)
    latent_space_np = latent_space.detach().mean(dim=2).cpu().numpy()  # Shape: [batch_size, num_patches]
    # Reshape to image
    num_patches_side = img_size // autoencoder.patch_size
    latent_image = latent_space_np[0].reshape(num_patches_side, num_patches_side)
    # Upscale to match image size
    latent_image_resized = cv2.resize(latent_image, (img_size, img_size), interpolation=cv2.INTER_CUBIC)
    # Normalize and convert to uint8
    latent_image_resized = (255 * (latent_image_resized - latent_image_resized.min()) /
                            (latent_image_resized.max() - latent_image_resized.min() + 1e-5)).astype(np.uint8)
    latent_img = Image.fromarray(latent_image_resized)
    latent_img_tk = ImageTk.PhotoImage(image=latent_img)
    latent_label.imgtk = latent_img_tk
    latent_label.configure(image=latent_img_tk)

    # Display the reconstructed output at input resolution
    output_np = reconstructed[0].detach().cpu().numpy().transpose(1, 2, 0)  # Convert to HxWxC
    output_np = (255 * output_np).astype(np.uint8)
    output_img = Image.fromarray(output_np)
    output_img_tk = ImageTk.PhotoImage(image=output_img)
    output_label.imgtk = output_img_tk
    output_label.configure(image=output_img_tk)

    # Update the latent space size label
    latent_size = latent_space.size()
    latent_size_text = f"Latent Space Size: {list(latent_size)}"
    latent_size_label.config(text=latent_size_text)

    # Schedule the next frame update
    window.after(1, update_frame)

# Start video feed update loop
update_frame()

# Run Tkinter main loop
window.mainloop()

# Release the video capture on exit
cap.release()


In [2]:
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
from PIL import Image, ImageTk
import tkinter as tk
from tkinter import Label, Frame
import time
import sys

# Hashing imports
import binascii  # For converting hashes to hex
from tkinter import filedialog

# Define the Teacher class with the encoder
class Teacher(nn.Module):
    def __init__(self, img_size=224, patch_size=16, emb_dim=512, num_heads=8, num_layers=6):
        super(Teacher, self).__init__()
        assert img_size % patch_size == 0, 'Image size must be divisible by patch size'
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.emb_dim = emb_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images

        # Linear projection of flattened patches
        self.patch_embedding = nn.Linear(self.patch_dim, emb_dim)
        # Positional embedding
        self.positional_embedding = nn.Parameter(torch.randn(1, self.num_patches, emb_dim))

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    def _get_patches(self, x):
        batch_size = x.size(0)
        # Divide image into patches
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous()
        x = x.view(batch_size, -1, self.patch_dim)
        return x

    def encode(self, x):
        x_patches = self._get_patches(x)
        x_embedded = self.patch_embedding(x_patches) + self.positional_embedding
        memory = self.encoder(x_embedded)
        return x_embedded, memory

# Define the Apprentice class with the decoder
class Apprentice(nn.Module):
    def __init__(self, teacher, img_size=224, patch_size=16, emb_dim=512, num_heads=8, num_layers=6):
        super(Apprentice, self).__init__()
        self.teacher = teacher
        self.img_size = img_size
        self.patch_size = patch_size
        self.emb_dim = emb_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images
        self.num_patches = (img_size // patch_size) ** 2

        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output projection to reconstruct patches
        self.output_projection = nn.Linear(emb_dim, self.patch_dim)

        # Target mask for causal decoding
        self.tgt_mask = nn.Transformer.generate_square_subsequent_mask(self.num_patches).to(
            torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        )

    def _reconstruct_from_patches(self, patches, batch_size):
        # Reconstruct image from patches
        reconstructed = patches.view(
            batch_size,
            self.img_size // self.patch_size,
            self.img_size // self.patch_size,
            3,
            self.patch_size,
            self.patch_size
        )
        reconstructed = reconstructed.permute(0, 3, 1, 4, 2, 5).contiguous()
        reconstructed = reconstructed.view(batch_size, 3, self.img_size, self.img_size)
        return reconstructed

    def decode(self, x_embedded, memory):
        # Prepare target sequence for decoder (shifted right)
        tgt = torch.zeros_like(x_embedded)
        tgt[:, 1:, :] = x_embedded[:, :-1, :]
        tgt = tgt + self.teacher.positional_embedding

        # Decode patches
        output = self.decoder(tgt, memory, tgt_mask=self.tgt_mask)
        reconstructed_patches = self.output_projection(output)
        return reconstructed_patches

    def forward(self, x_embedded, memory):
        batch_size = x_embedded.size(0)
        reconstructed_patches = self.decode(x_embedded, memory)
        reconstructed = self._reconstruct_from_patches(reconstructed_patches, batch_size)
        reconstructed = torch.sigmoid(reconstructed)  # Apply sigmoid to get pixel values between 0 and 1
        return reconstructed

# Hashing functions
def calculate_ahash(frame, hash_size=8):
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized_frame = cv2.resize(gray_frame, (hash_size, hash_size))
    avg_pixel_value = np.mean(resized_frame)
    ahash = ''.join(['1' if pixel > avg_pixel_value else '0' for pixel in resized_frame.flatten()])
    return ahash

def hamming_distance(hash1, hash2):
    return sum(el1 != el2 for el1, el2 in zip(hash1, hash2))

def get_min_hamming_distance(frame_hash, knowledge_base_hashes):
    if not knowledge_base_hashes:
        return 64  # Max possible distance for 8x8 hash
    min_distance = 64
    for stored_hash in knowledge_base_hashes:
        distance = hamming_distance(frame_hash, stored_hash)
        if distance < min_distance:
            min_distance = distance
    return min_distance

def get_knowledge_base_size(knowledge_base_hashes):
    total_size = sys.getsizeof(knowledge_base_hashes)
    for h in knowledge_base_hashes:
        total_size += sys.getsizeof(h)
    return total_size

# Initialize the model components
img_size = 224  # Input image resolution
teacher = Teacher(
    img_size=img_size,
    patch_size=16,
    emb_dim=512,
    num_heads=8,
    num_layers=6
)
apprentice = Apprentice(
    teacher=teacher,
    img_size=img_size,
    patch_size=16,
    emb_dim=512,
    num_heads=8,
    num_layers=6
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
teacher.to(device)
apprentice.to(device)

optimizer = optim.Adam(list(teacher.parameters()) + list(apprentice.parameters()), lr=0.0001)
criterion = nn.MSELoss()  # Reconstruction loss

# Video capture
cap = cv2.VideoCapture(0)

# Transform to preprocess the frame
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
])

# Knowledge base to store hashes of previously seen frames
knowledge_base_hashes = set()

# Initialize Tkinter window
window = tk.Tk()
window.title("Real-time Transformer Autoencoder Visualization with Hashing")

# Frame for organizing the input video, latent space, and reconstructed video
main_frame = Frame(window)
main_frame.pack()

# Labels for each of the three outputs
video_label = Label(main_frame)
video_label.grid(row=1, column=0)

latent_label = Label(main_frame)
latent_label.grid(row=1, column=1)

output_label = Label(main_frame)
output_label.grid(row=1, column=2)

# Titles for each section
Label(main_frame, text="Input Video", font=("Arial", 12, "bold")).grid(row=0, column=0)
Label(main_frame, text="Latent Space", font=("Arial", 12, "bold")).grid(row=0, column=1)
Label(main_frame, text="Output Video", font=("Arial", 12, "bold")).grid(row=0, column=2)

# Label to display the latent space size
latent_size_label = Label(window, text="", font=("Arial", 12))
latent_size_label.pack()

# Hashing variables
similarity_threshold = tk.DoubleVar(value=50.0)
knowledge_base_size_var = tk.StringVar(value="Knowledge Base Size: 0 bytes")

# Frame for controls
controls_frame = tk.Frame(window)
controls_frame.pack(pady=5)

# Slider for similarity threshold
threshold_label = tk.Label(controls_frame, text="Similarity Threshold (%)", font=("Helvetica", 12))
threshold_label.pack(side=tk.LEFT, padx=5)
threshold_slider = tk.Scale(controls_frame, from_=0, to=100, orient=tk.HORIZONTAL,
                            variable=similarity_threshold, length=200)
threshold_slider.pack(side=tk.LEFT, padx=5)

# Force training checkbox
force_training_var = tk.BooleanVar(value=False)
force_training_checkbox = tk.Checkbutton(controls_frame, text="Force Training", variable=force_training_var)
force_training_checkbox.pack(side=tk.LEFT, padx=5)

# Knowledge base size label
knowledge_base_size_label = tk.Label(window, textvariable=knowledge_base_size_var, font=("Helvetica", 12))
knowledge_base_size_label.pack()

def update_frame():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture video frame")
        window.after(10, update_frame)
        return

    # Convert frame to PIL image
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(rgb_frame)
    img_resized = img_pil.resize((img_size, img_size))

    # Display original frame
    img_tk = ImageTk.PhotoImage(image=img_resized)
    video_label.imgtk = img_tk
    video_label.configure(image=img_tk)

    # Preprocess frame for model input
    input_image = transform(img_resized).unsqueeze(0).to(device)  # Shape: [1, 3, H, W]

    # Hashing logic to determine whether to train or infer
    frame_array = np.array(img_resized)
    frame_hash = calculate_ahash(frame_array)

    min_distance = get_min_hamming_distance(frame_hash, knowledge_base_hashes)
    similarity_percentage = (1 - (min_distance / 64)) * 100

    # Get the current threshold from the slider
    threshold = similarity_threshold.get()

    # Check if force training is enabled
    if force_training_var.get():
        # Force training and include the hash
        knowledge_base_hashes.add(frame_hash)

        # Perform a training step
        teacher.train()
        apprentice.train()
        optimizer.zero_grad()

        x_embedded, memory = teacher.encode(input_image)
        reconstructed = apprentice(x_embedded, memory)  # Forward pass
        loss = criterion(reconstructed, input_image)    # Compute loss
        loss.backward()                                 # Backpropagation

        # Gradient clipping for stability
        torch.nn.utils.clip_grad_norm_(list(teacher.parameters()) + list(apprentice.parameters()), max_norm=1.0)

        optimizer.step()                                # Update weights
    else:
        # Decide whether to train or just infer based on similarity
        if similarity_percentage < threshold:
            # Add new frame hash to knowledge base
            knowledge_base_hashes.add(frame_hash)

            # Perform a training step
            teacher.train()
            apprentice.train()
            optimizer.zero_grad()

            x_embedded, memory = teacher.encode(input_image)
            reconstructed = apprentice(x_embedded, memory)  # Forward pass
            loss = criterion(reconstructed, input_image)    # Compute loss
            loss.backward()                                 # Backpropagation

            # Gradient clipping for stability
            torch.nn.utils.clip_grad_norm_(list(teacher.parameters()) + list(apprentice.parameters()), max_norm=1.0)

            optimizer.step()                                # Update weights
        else:
            # Just perform inference
            teacher.eval()
            apprentice.eval()
            with torch.no_grad():
                x_embedded, memory = teacher.encode(input_image)
                reconstructed = apprentice(x_embedded, memory)

    # Display the latent space (mean over embedding dimension)
    latent_space = memory
    latent_space_np = latent_space.detach().mean(dim=2).cpu().numpy()  # Shape: [batch_size, num_patches]
    # Reshape to image
    num_patches_side = img_size // teacher.patch_size
    latent_image = latent_space_np[0].reshape(num_patches_side, num_patches_side)
    # Upscale to match image size
    latent_image_resized = cv2.resize(latent_image, (img_size, img_size), interpolation=cv2.INTER_CUBIC)
    # Normalize and convert to uint8
    latent_image_resized = (255 * (latent_image_resized - latent_image_resized.min()) /
                            (latent_image_resized.max() - latent_image_resized.min() + 1e-5)).astype(np.uint8)
    latent_img = Image.fromarray(latent_image_resized)
    latent_img_tk = ImageTk.PhotoImage(image=latent_img)
    latent_label.imgtk = latent_img_tk
    latent_label.configure(image=latent_img_tk)

    # Display the reconstructed output at input resolution
    output_np = reconstructed[0].detach().cpu().numpy().transpose(1, 2, 0)  # Convert to HxWxC
    output_np = (255 * output_np).astype(np.uint8)
    output_img = Image.fromarray(output_np)
    output_img_tk = ImageTk.PhotoImage(image=output_img)
    output_label.imgtk = output_img_tk
    output_label.configure(image=output_img_tk)

    # Update the latent space size label
    latent_size = latent_space.size()
    latent_size_text = f"Latent Space Size: {list(latent_size)}"
    latent_size_label.config(text=latent_size_text)

    # Update knowledge base size
    kb_size_bytes = get_knowledge_base_size(knowledge_base_hashes)
    if kb_size_bytes < 1024:
        knowledge_base_size_var.set(f"Knowledge Base Size: {kb_size_bytes} bytes")
    else:
        kb_size_kb = kb_size_bytes / 1024
        knowledge_base_size_var.set(f"Knowledge Base Size: {kb_size_kb:.2f} KB")

    # Schedule the next frame update
    window.after(1, update_frame)

# Start video feed update loop
update_frame()

# Run Tkinter main loop
window.mainloop()

# Release the video capture on exit
cap.release()


In [4]:
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
from PIL import Image, ImageTk
import tkinter as tk
from tkinter import Label, Frame
import time
import sys

# Hashing imports
import binascii  # For converting hashes to hex
from tkinter import filedialog

# Define the Teacher class with the encoder
class Teacher(nn.Module):
    def __init__(self, img_size=224, patch_size=8, emb_dim=512, num_heads=8, num_layers=6):
        super(Teacher, self).__init__()
        assert img_size % patch_size == 0, 'Image size must be divisible by patch size'
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.emb_dim = emb_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images

        # Linear projection of flattened patches
        self.patch_embedding = nn.Linear(self.patch_dim, emb_dim)
        # Positional embedding
        self.positional_embedding = nn.Parameter(torch.randn(1, self.num_patches, emb_dim))

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

    def _get_patches(self, x):
        batch_size = x.size(0)
        # Divide image into patches
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous()
        x = x.view(batch_size, -1, self.patch_dim)
        return x

    def encode(self, x):
        x_patches = self._get_patches(x)
        x_embedded = self.patch_embedding(x_patches) + self.positional_embedding
        memory = self.encoder(x_embedded)
        return x_embedded, memory

# Define the Apprentice class with the decoder
class Apprentice(nn.Module):
    def __init__(self, teacher, img_size=224, patch_size=8, emb_dim=512, num_heads=8, num_layers=6):
        super(Apprentice, self).__init__()
        self.teacher = teacher
        self.img_size = img_size
        self.patch_size = patch_size
        self.emb_dim = emb_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images
        self.num_patches = (img_size // patch_size) ** 2

        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output projection to reconstruct patches
        self.output_projection = nn.Linear(emb_dim, self.patch_dim)

        # Target mask for causal decoding
        self.tgt_mask = nn.Transformer.generate_square_subsequent_mask(self.num_patches).to(
            torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        )

    def _reconstruct_from_patches(self, patches, batch_size):
        # Reconstruct image from patches
        reconstructed = patches.view(
            batch_size,
            self.img_size // self.patch_size,
            self.img_size // self.patch_size,
            3,
            self.patch_size,
            self.patch_size
        )
        reconstructed = reconstructed.permute(0, 3, 1, 4, 2, 5).contiguous()
        reconstructed = reconstructed.view(batch_size, 3, self.img_size, self.img_size)
        return reconstructed

    def decode(self, x_embedded, memory):
        # Prepare target sequence for decoder (shifted right)
        tgt = torch.zeros_like(x_embedded)
        tgt[:, 1:, :] = x_embedded[:, :-1, :]
        tgt = tgt + self.teacher.positional_embedding

        # Decode patches
        output = self.decoder(tgt, memory, tgt_mask=self.tgt_mask)
        reconstructed_patches = self.output_projection(output)
        return reconstructed_patches

    def forward(self, x_embedded, memory):
        batch_size = x_embedded.size(0)
        reconstructed_patches = self.decode(x_embedded, memory)
        reconstructed = self._reconstruct_from_patches(reconstructed_patches, batch_size)
        reconstructed = torch.sigmoid(reconstructed)  # Apply sigmoid to get pixel values between 0 and 1
        return reconstructed

# Hashing functions
def calculate_ahash(frame, hash_size=8):
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized_frame = cv2.resize(gray_frame, (hash_size, hash_size))
    avg_pixel_value = np.mean(resized_frame)
    ahash = ''.join(['1' if pixel > avg_pixel_value else '0' for pixel in resized_frame.flatten()])
    return ahash

def hamming_distance(hash1, hash2):
    return sum(el1 != el2 for el1, el2 in zip(hash1, hash2))

def get_min_hamming_distance(frame_hash, knowledge_base_hashes):
    if not knowledge_base_hashes:
        return 64  # Max possible distance for 8x8 hash
    min_distance = 64
    for stored_hash in knowledge_base_hashes:
        distance = hamming_distance(frame_hash, stored_hash)
        if distance < min_distance:
            min_distance = distance
    return min_distance

def get_knowledge_base_size(knowledge_base_hashes):
    total_size = sys.getsizeof(knowledge_base_hashes)
    for h in knowledge_base_hashes:
        total_size += sys.getsizeof(h)
    return total_size

# Initialize the model components
img_size = 224  # Input image resolution
patch_size = 8  # Reduced from 16 to 8 for more tiles
teacher = Teacher(
    img_size=img_size,
    patch_size=patch_size,
    emb_dim=512,
    num_heads=8,
    num_layers=6
)
apprentice = Apprentice(
    teacher=teacher,
    img_size=img_size,
    patch_size=patch_size,
    emb_dim=512,
    num_heads=8,
    num_layers=6
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
teacher.to(device)
apprentice.to(device)

optimizer = optim.Adam(list(teacher.parameters()) + list(apprentice.parameters()), lr=0.0001)
criterion = nn.MSELoss()  # Reconstruction loss

# Video capture
cap = cv2.VideoCapture(0)

# Transform to preprocess the frame
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
])

# Knowledge base to store hashes of previously seen frames
knowledge_base_hashes = set()

# Initialize Tkinter window
window = tk.Tk()
window.title("Real-time Transformer Autoencoder Visualization with Hashing")

# Frame for organizing the input video, latent space, and reconstructed video
main_frame = Frame(window)
main_frame.pack()

# Labels for each of the three outputs
video_label = Label(main_frame)
video_label.grid(row=1, column=0)

latent_label = Label(main_frame)
latent_label.grid(row=1, column=1)

output_label = Label(main_frame)
output_label.grid(row=1, column=2)

# Titles for each section
Label(main_frame, text="Input Video", font=("Arial", 12, "bold")).grid(row=0, column=0)
Label(main_frame, text="Latent Space", font=("Arial", 12, "bold")).grid(row=0, column=1)
Label(main_frame, text="Output Video", font=("Arial", 12, "bold")).grid(row=0, column=2)

# Label to display the latent space size
latent_size_label = Label(window, text="", font=("Arial", 12))
latent_size_label.pack()

# Hashing variables
similarity_threshold = tk.DoubleVar(value=50.0)
knowledge_base_size_var = tk.StringVar(value="Knowledge Base Size: 0 bytes")

# Frame for controls
controls_frame = tk.Frame(window)
controls_frame.pack(pady=5)

# Mode selection (Train or Inference)
mode_var = tk.StringVar(value="Inference")
train_radio = tk.Radiobutton(controls_frame, text="Train", variable=mode_var, value="Train")
inference_radio = tk.Radiobutton(controls_frame, text="Inference", variable=mode_var, value="Inference")
train_radio.pack(side=tk.LEFT, padx=5)
inference_radio.pack(side=tk.LEFT, padx=5)

# Slider for similarity threshold
threshold_label = tk.Label(controls_frame, text="Similarity Threshold (%)", font=("Helvetica", 12))
threshold_label.pack(side=tk.LEFT, padx=5)
threshold_slider = tk.Scale(controls_frame, from_=0, to=100, orient=tk.HORIZONTAL,
                            variable=similarity_threshold, length=200)
threshold_slider.pack(side=tk.LEFT, padx=5)

# Knowledge base size label
knowledge_base_size_label = tk.Label(window, textvariable=knowledge_base_size_var, font=("Helvetica", 12))
knowledge_base_size_label.pack()

# Labels to display the sizes
size_info_label = tk.Label(window, text="", font=("Arial", 12))
size_info_label.pack()

def update_frame():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture video frame")
        window.after(10, update_frame)
        return

    # Convert frame to PIL image
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(rgb_frame)
    img_resized = img_pil.resize((img_size, img_size))

    # Display original frame
    img_tk = ImageTk.PhotoImage(image=img_resized)
    video_label.imgtk = img_tk
    video_label.configure(image=img_tk)

    # Preprocess frame for model input
    input_image = transform(img_resized).unsqueeze(0).to(device)  # Shape: [1, 3, H, W]

    # Hashing logic to determine whether to train or infer
    frame_array = np.array(img_resized)
    frame_hash = calculate_ahash(frame_array)

    min_distance = get_min_hamming_distance(frame_hash, knowledge_base_hashes)
    similarity_percentage = (1 - (min_distance / 64)) * 100

    # Get the current threshold from the slider
    threshold = similarity_threshold.get()
    mode = mode_var.get()

    if mode == "Train":
        # Force training and include the hash
        knowledge_base_hashes.add(frame_hash)

        # Perform a training step
        teacher.train()
        apprentice.train()
        optimizer.zero_grad()

        x_embedded, memory = teacher.encode(input_image)
        reconstructed = apprentice(x_embedded, memory)  # Forward pass
        loss = criterion(reconstructed, input_image)    # Compute loss
        loss.backward()                                 # Backpropagation

        # Gradient clipping for stability
        torch.nn.utils.clip_grad_norm_(list(teacher.parameters()) + list(apprentice.parameters()), max_norm=1.0)

        optimizer.step()                                # Update weights
    elif mode == "Inference":
        # Decide whether to train or just infer based on similarity
        if similarity_percentage < threshold:
            # Add new frame hash to knowledge base
            knowledge_base_hashes.add(frame_hash)

            # Perform a training step
            teacher.train()
            apprentice.train()
            optimizer.zero_grad()

            x_embedded, memory = teacher.encode(input_image)
            reconstructed = apprentice(x_embedded, memory)  # Forward pass
            loss = criterion(reconstructed, input_image)    # Compute loss
            loss.backward()                                 # Backpropagation

            # Gradient clipping for stability
            torch.nn.utils.clip_grad_norm_(list(teacher.parameters()) + list(apprentice.parameters()), max_norm=1.0)

            optimizer.step()                                # Update weights
        else:
            # Just perform inference
            teacher.eval()
            apprentice.eval()
            with torch.no_grad():
                x_embedded, memory = teacher.encode(input_image)
                reconstructed = apprentice(x_embedded, memory)

    # Display the latent space (mean over embedding dimension)
    latent_space = memory
    latent_space_np = latent_space.detach().mean(dim=2).cpu().numpy()  # Shape: [batch_size, num_patches]
    # Reshape to image
    num_patches_side = img_size // teacher.patch_size
    latent_image = latent_space_np[0].reshape(num_patches_side, num_patches_side)
    # Upscale to match image size
    latent_image_resized = cv2.resize(latent_image, (img_size, img_size), interpolation=cv2.INTER_CUBIC)
    # Normalize and convert to uint8
    latent_image_resized = (255 * (latent_image_resized - latent_image_resized.min()) /
                            (latent_image_resized.max() - latent_image_resized.min() + 1e-5)).astype(np.uint8)
    latent_img = Image.fromarray(latent_image_resized)
    latent_img_tk = ImageTk.PhotoImage(image=latent_img)
    latent_label.imgtk = latent_img_tk
    latent_label.configure(image=latent_img_tk)

    # Display the reconstructed output at input resolution
    output_np = reconstructed[0].detach().cpu().numpy().transpose(1, 2, 0)  # Convert to HxWxC
    output_np = (255 * output_np).astype(np.uint8)
    output_img = Image.fromarray(output_np)
    output_img_tk = ImageTk.PhotoImage(image=output_img)
    output_label.imgtk = output_img_tk
    output_label.configure(image=output_img_tk)

    # Update the latent space size label
    latent_size = latent_space.size()
    latent_size_text = f"Latent Space Size: {list(latent_size)}"
    latent_size_label.config(text=latent_size_text)

    # Update knowledge base size
    kb_size_bytes = get_knowledge_base_size(knowledge_base_hashes)
    if kb_size_bytes < 1024:
        knowledge_base_size_var.set(f"Knowledge Base Size: {kb_size_bytes} bytes")
    else:
        kb_size_kb = kb_size_bytes / 1024
        knowledge_base_size_var.set(f"Knowledge Base Size: {kb_size_kb:.2f} KB")

    # Calculate sizes
    original_size_kb = frame_array.nbytes / 1024
    latent_size_kb = latent_space.element_size() * latent_space.nelement() / 1024
    reconstructed_size_kb = output_np.nbytes / 1024

    # Update the size info label
    size_info_text = f"Original Image Size: {original_size_kb:.2f} KB | Latent Space Size: {latent_size_kb:.2f} KB | Reconstructed Image Size: {reconstructed_size_kb:.2f} KB"
    size_info_label.config(text=size_info_text)

    # Schedule the next frame update
    window.after(1, update_frame)

# Start video feed update loop
update_frame()

# Run Tkinter main loop
window.mainloop()

# Release the video capture on exit
cap.release()


In [7]:
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import numpy as np
from PIL import Image, ImageTk, ImageOps
import tkinter as tk
from tkinter import Label, Frame, OptionMenu
import time
import sys

# Hashing imports
import binascii  # For converting hashes to hex
from tkinter import filedialog

# Import for pHash
import scipy.fftpack

# Define the Teacher class with the encoder and latent space projection
class Teacher(nn.Module):
    def __init__(self, img_size=224, patch_size=16, emb_dim=256, num_heads=8, num_layers=6, latent_dim=64):
        super(Teacher, self).__init__()
        assert img_size % patch_size == 0, 'Image size must be divisible by patch size'
        self.img_size = img_size
        self.patch_size = patch_size
        self.num_patches = (img_size // patch_size) ** 2
        self.emb_dim = emb_dim
        self.latent_dim = latent_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images

        # Linear projection of flattened patches
        self.patch_embedding = nn.Linear(self.patch_dim, emb_dim)
        # Positional embedding
        self.positional_embedding = nn.Parameter(torch.randn(1, self.num_patches, emb_dim))

        # Transformer Encoder
        encoder_layer = nn.TransformerEncoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        # Projection to reduce the latent space dimensionality
        self.latent_projection = nn.Linear(self.num_patches * emb_dim, latent_dim)

    def _get_patches(self, x):
        batch_size = x.size(0)
        # Divide image into patches
        x = x.unfold(2, self.patch_size, self.patch_size).unfold(3, self.patch_size, self.patch_size)
        x = x.permute(0, 2, 3, 1, 4, 5).contiguous()
        x = x.view(batch_size, -1, self.patch_dim)
        return x

    def encode(self, x):
        x_patches = self._get_patches(x)
        x_embedded = self.patch_embedding(x_patches) + self.positional_embedding
        memory = self.encoder(x_embedded)  # Shape: [batch_size, num_patches, emb_dim]

        # Flatten memory and project to latent space
        memory_flat = memory.view(x.size(0), -1)  # Shape: [batch_size, num_patches * emb_dim]
        latent = self.latent_projection(memory_flat)  # Shape: [batch_size, latent_dim]
        return x_embedded, memory, latent

# Define the Apprentice class with the decoder
class Apprentice(nn.Module):
    def __init__(self, teacher, img_size=224, patch_size=16, emb_dim=256, num_heads=8, num_layers=6, latent_dim=64):
        super(Apprentice, self).__init__()
        self.teacher = teacher
        self.img_size = img_size
        self.patch_size = patch_size
        self.emb_dim = emb_dim
        self.latent_dim = latent_dim
        self.patch_dim = 3 * patch_size * patch_size  # Assuming RGB images
        self.num_patches = (img_size // patch_size) ** 2

        # Projection to expand latent space back to memory size
        self.latent_expand = nn.Linear(latent_dim, self.num_patches * emb_dim)

        # Transformer Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=emb_dim, nhead=num_heads, batch_first=True)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output projection to reconstruct patches
        self.output_projection = nn.Linear(emb_dim, self.patch_dim)

        # Target mask for causal decoding
        self.tgt_mask = nn.Transformer.generate_square_subsequent_mask(self.num_patches).to(
            torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        )

    def _reconstruct_from_patches(self, patches, batch_size):
        # Reconstruct image from patches
        reconstructed = patches.view(
            batch_size,
            self.img_size // self.patch_size,
            self.img_size // self.patch_size,
            3,
            self.patch_size,
            self.patch_size
        )
        reconstructed = reconstructed.permute(0, 3, 1, 4, 2, 5).contiguous()
        reconstructed = reconstructed.view(batch_size, 3, self.img_size, self.img_size)
        return reconstructed

    def decode(self, x_embedded, latent):
        # Expand latent space back to memory size
        memory_expanded = self.latent_expand(latent)  # Shape: [batch_size, num_patches * emb_dim]
        memory = memory_expanded.view(-1, self.num_patches, self.emb_dim)  # Shape: [batch_size, num_patches, emb_dim]

        # Prepare target sequence for decoder (shifted right)
        tgt = torch.zeros_like(x_embedded)
        tgt[:, 1:, :] = x_embedded[:, :-1, :]
        tgt = tgt + self.teacher.positional_embedding

        # Decode patches
        output = self.decoder(tgt, memory, tgt_mask=self.tgt_mask)
        reconstructed_patches = self.output_projection(output)
        return reconstructed_patches

    def forward(self, x_embedded, latent):
        batch_size = x_embedded.size(0)
        reconstructed_patches = self.decode(x_embedded, latent)
        reconstructed = self._reconstruct_from_patches(reconstructed_patches, batch_size)
        reconstructed = torch.sigmoid(reconstructed)  # Apply sigmoid to get pixel values between 0 and 1
        return reconstructed

# Hashing functions
def calculate_ahash(frame, hash_size=8):
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized_frame = cv2.resize(gray_frame, (hash_size, hash_size))
    avg_pixel_value = np.mean(resized_frame)
    ahash = ''.join(['1' if pixel > avg_pixel_value else '0' for pixel in resized_frame.flatten()])
    return ahash

def calculate_dhash(frame, hash_size=8):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(gray, (hash_size + 1, hash_size))
    diff = resized[:, 1:] > resized[:, :-1]
    dhash = ''.join(['1' if val else '0' for val in diff.flatten()])
    return dhash

def calculate_phash(frame, hash_size=8):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(gray, (hash_size, hash_size))
    dct = scipy.fftpack.dct(scipy.fftpack.dct(resized.T, norm='ortho').T, norm='ortho')
    dct_low_freq = dct[:8, :8]
    med = np.median(dct_low_freq)
    phash = ''.join(['1' if val > med else '0' for val in dct_low_freq.flatten()])
    return phash

def calculate_whash(frame, hash_size=8):
    # This is a simplified version using Haar wavelets
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    resized = cv2.resize(gray, (hash_size, hash_size))
    coeffs = pywt.wavedec2(resized, 'haar', level=1)
    LL = coeffs[0]
    med = np.median(LL)
    whash = ''.join(['1' if val > med else '0' for val in LL.flatten()])
    return whash

def hamming_distance(hash1, hash2):
    return sum(c1 != c2 for c1, c2 in zip(hash1, hash2))

def get_min_hamming_distance(frame_hash, knowledge_base_hashes):
    if not knowledge_base_hashes:
        return len(frame_hash)  # Max possible distance
    min_distance = len(frame_hash)
    for stored_hash in knowledge_base_hashes:
        distance = hamming_distance(frame_hash, stored_hash)
        if distance < min_distance:
            min_distance = distance
    return min_distance

def get_knowledge_base_size(knowledge_base_hashes):
    total_size = sys.getsizeof(knowledge_base_hashes)
    for h in knowledge_base_hashes:
        total_size += sys.getsizeof(h)
    return total_size

# Initialize the model components
img_size = 224  # Input image resolution
patch_size = 16
emb_dim = 256
latent_dim = 64  # Smaller latent space
teacher = Teacher(
    img_size=img_size,
    patch_size=patch_size,
    emb_dim=emb_dim,
    num_heads=8,
    num_layers=6,
    latent_dim=latent_dim
)
apprentice = Apprentice(
    teacher=teacher,
    img_size=img_size,
    patch_size=patch_size,
    emb_dim=emb_dim,
    num_heads=8,
    num_layers=6,
    latent_dim=latent_dim
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
teacher.to(device)
apprentice.to(device)

optimizer = optim.Adam(list(teacher.parameters()) + list(apprentice.parameters()), lr=0.0001)
criterion = nn.MSELoss()  # Reconstruction loss

# Video capture
cap = cv2.VideoCapture(0)

# Transform to preprocess the frame
transform = transforms.Compose([
    transforms.Resize((img_size, img_size)),
    transforms.ToTensor(),
])

# Knowledge base to store hashes of previously seen frames
knowledge_base_hashes = set()

# Initialize Tkinter window
window = tk.Tk()
window.title("Real-time Transformer Autoencoder Visualization with Hashing")

# Frame for organizing the input video, latent space, and reconstructed video
main_frame = Frame(window)
main_frame.pack()

# Labels for each of the three outputs
video_label = Label(main_frame)
video_label.grid(row=1, column=0)

latent_label = Label(main_frame)
latent_label.grid(row=1, column=1)

output_label = Label(main_frame)
output_label.grid(row=1, column=2)

# Titles for each section
Label(main_frame, text="Input Video", font=("Arial", 12, "bold")).grid(row=0, column=0)
Label(main_frame, text="Latent Space", font=("Arial", 12, "bold")).grid(row=0, column=1)
Label(main_frame, text="Output Video", font=("Arial", 12, "bold")).grid(row=0, column=2)

# Label to display the latent space size
latent_size_label = Label(window, text="", font=("Arial", 12))
latent_size_label.pack()

# Hashing variables
similarity_threshold = tk.DoubleVar(value=50.0)
knowledge_base_size_var = tk.StringVar(value="Knowledge Base Size: 0 bytes")

# Frame for controls
controls_frame = tk.Frame(window)
controls_frame.pack(pady=5)

# Mode selection (Train or Inference)
mode_var = tk.StringVar(value="Inference")
train_radio = tk.Radiobutton(controls_frame, text="Train", variable=mode_var, value="Train")
inference_radio = tk.Radiobutton(controls_frame, text="Inference", variable=mode_var, value="Inference")
train_radio.pack(side=tk.LEFT, padx=5)
inference_radio.pack(side=tk.LEFT, padx=5)

# Slider for similarity threshold
threshold_label = tk.Label(controls_frame, text="Similarity Threshold (%)", font=("Helvetica", 12))
threshold_label.pack(side=tk.LEFT, padx=5)
threshold_slider = tk.Scale(controls_frame, from_=0, to=100, orient=tk.HORIZONTAL,
                            variable=similarity_threshold, length=200)
threshold_slider.pack(side=tk.LEFT, padx=5)

# Hashing method selection
hash_methods = ["aHash", "dHash", "pHash"]
selected_hash_method = tk.StringVar(value="aHash")
hash_method_menu = tk.OptionMenu(controls_frame, selected_hash_method, *hash_methods)
hash_method_menu.config(width=10)
hash_method_label = tk.Label(controls_frame, text="Hash Method:", font=("Helvetica", 12))
hash_method_label.pack(side=tk.LEFT, padx=5)
hash_method_menu.pack(side=tk.LEFT, padx=5)

# Knowledge base size label
knowledge_base_size_label = tk.Label(window, textvariable=knowledge_base_size_var, font=("Helvetica", 12))
knowledge_base_size_label.pack()

# Labels to display the sizes
size_info_label = tk.Label(window, text="", font=("Arial", 12))
size_info_label.pack()

def update_frame():
    # Capture frame-by-frame
    ret, frame = cap.read()
    if not ret:
        print("Failed to capture video frame")
        window.after(10, update_frame)
        return

    # Convert frame to PIL image
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    img_pil = Image.fromarray(rgb_frame)
    img_resized = img_pil.resize((img_size, img_size))

    # Display original frame
    img_tk = ImageTk.PhotoImage(image=img_resized)
    video_label.imgtk = img_tk
    video_label.configure(image=img_tk)

    # Preprocess frame for model input
    input_image = transform(img_resized).unsqueeze(0).to(device)  # Shape: [1, 3, H, W]

    # Hashing logic to determine whether to train or infer
    frame_array = np.array(img_resized)
    hash_method = selected_hash_method.get()

    if hash_method == "aHash":
        frame_hash = calculate_ahash(frame_array)
        max_distance = 64
    elif hash_method == "dHash":
        frame_hash = calculate_dhash(frame_array)
        max_distance = 64
    elif hash_method == "pHash":
        frame_hash = calculate_phash(frame_array)
        max_distance = 64
    else:
        frame_hash = calculate_ahash(frame_array)
        max_distance = 64

    min_distance = get_min_hamming_distance(frame_hash, knowledge_base_hashes)
    similarity_percentage = (1 - (min_distance / max_distance)) * 100

    # Get the current threshold from the slider
    threshold = similarity_threshold.get()
    mode = mode_var.get()

    if mode == "Train":
        # Force training and include the hash
        knowledge_base_hashes.add(frame_hash)

        # Perform a training step
        teacher.train()
        apprentice.train()
        optimizer.zero_grad()

        x_embedded, memory, latent = teacher.encode(input_image)
        reconstructed = apprentice(x_embedded, latent)  # Forward pass
        loss = criterion(reconstructed, input_image)    # Compute loss
        loss.backward()                                 # Backpropagation

        # Gradient clipping for stability
        torch.nn.utils.clip_grad_norm_(list(teacher.parameters()) + list(apprentice.parameters()), max_norm=1.0)

        optimizer.step()                                # Update weights
    elif mode == "Inference":
        # Decide whether to train or just infer based on similarity
        if similarity_percentage < threshold:
            # Add new frame hash to knowledge base
            knowledge_base_hashes.add(frame_hash)

            # Perform a training step
            teacher.train()
            apprentice.train()
            optimizer.zero_grad()

            x_embedded, memory, latent = teacher.encode(input_image)
            reconstructed = apprentice(x_embedded, latent)  # Forward pass
            loss = criterion(reconstructed, input_image)    # Compute loss
            loss.backward()                                 # Backpropagation

            # Gradient clipping for stability
            torch.nn.utils.clip_grad_norm_(list(teacher.parameters()) + list(apprentice.parameters()), max_norm=1.0)

            optimizer.step()                                # Update weights
        else:
            # Just perform inference
            teacher.eval()
            apprentice.eval()
            with torch.no_grad():
                x_embedded, memory, latent = teacher.encode(input_image)
                reconstructed = apprentice(x_embedded, latent)

    # Display the latent space (use the compressed latent vector)
    latent_space = latent  # Shape: [batch_size, latent_dim]
    latent_space_np = latent_space.detach().cpu().numpy()
    # Reshape to square for visualization
    latent_dim_sqrt = int(np.ceil(np.sqrt(latent_dim)))
    latent_image = np.zeros((latent_dim_sqrt, latent_dim_sqrt))
    latent_flat = latent_space_np[0]
    latent_image.flat[:latent_flat.size] = latent_flat
    # Upscale to match image size
    latent_image_resized = cv2.resize(latent_image, (img_size, img_size), interpolation=cv2.INTER_CUBIC)
    # Normalize and convert to uint8
    latent_image_resized = (255 * (latent_image_resized - latent_image_resized.min()) /
                            (latent_image_resized.max() - latent_image_resized.min() + 1e-5)).astype(np.uint8)
    latent_img = Image.fromarray(latent_image_resized)
    latent_img_tk = ImageTk.PhotoImage(image=latent_img)
    latent_label.imgtk = latent_img_tk
    latent_label.configure(image=latent_img_tk)

    # Display the reconstructed output at input resolution
    output_np = reconstructed[0].detach().cpu().numpy().transpose(1, 2, 0)  # Convert to HxWxC
    output_np = (255 * output_np).astype(np.uint8)
    output_img = Image.fromarray(output_np)
    output_img_tk = ImageTk.PhotoImage(image=output_img)
    output_label.imgtk = output_img_tk
    output_label.configure(image=output_img_tk)

    # Update the latent space size label
    latent_size = latent_space.size()
    latent_size_text = f"Latent Space Size: {list(latent_space.size())}"
    latent_size_label.config(text=latent_size_text)

    # Update knowledge base size
    kb_size_bytes = get_knowledge_base_size(knowledge_base_hashes)
    if kb_size_bytes < 1024:
        knowledge_base_size_var.set(f"Knowledge Base Size: {kb_size_bytes} bytes")
    else:
        kb_size_kb = kb_size_bytes / 1024
        knowledge_base_size_var.set(f"Knowledge Base Size: {kb_size_kb:.2f} KB")

    # Calculate sizes
    original_size_kb = frame_array.nbytes / 1024
    latent_size_kb = latent_space.element_size() * latent_space.nelement() / 1024
    reconstructed_size_kb = output_np.nbytes / 1024

    # Update the size info label
    size_info_text = f"Original Image Size: {original_size_kb:.2f} KB | Latent Space Size: {latent_size_kb:.2f} KB | Reconstructed Image Size: {reconstructed_size_kb:.2f} KB"
    size_info_label.config(text=size_info_text)

    # Schedule the next frame update
    window.after(1, update_frame)

# Start video feed update loop
update_frame()

# Run Tkinter main loop
window.mainloop()

# Release the video capture on exit
cap.release()


  return torch._dynamo.disable(fn, recursive)(*args, **kwargs)


In [8]:
# Image and latent space dimensions
input_width, input_height, channels = 224, 224, 3
latent_dim = 64  # Size of the latent space vector

# Calculate input and reconstructed image size in bytes
image_size_bytes = input_width * input_height * channels * 4  # 32-bit floats
image_size_kb = image_size_bytes / 1024  # Convert to KB

# Calculate latent space size in bytes
latent_size_bytes = latent_dim * 4  # 32-bit floats
latent_size_kb = latent_size_bytes / 1024  # Convert to KB

# Calculate compression factor
compression_factor = image_size_bytes / latent_size_bytes

# Display the results
print(f"Input/Reconstructed Image Size: {image_size_kb:.2f} KB")
print(f"Latent Space Size: {latent_size_kb:.2f} KB")
print(f"Compression Factor (Input/Reconstructed Size to Latent Space Size): {compression_factor:.2f}x smaller")


Input/Reconstructed Image Size: 588.00 KB
Latent Space Size: 0.25 KB
Compression Factor (Input/Reconstructed Size to Latent Space Size): 2352.00x smaller
