<a href="https://www.kaggle.com/code/nigamshitij/lego-instructions-using-transformers?scriptVersionId=194559915" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [3]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and print the full path of every file in it.
input_root = '/kaggle/input'
for dirname, _, filenames in os.walk(input_root):
    for filename in filenames:
        file_path = os.path.join(dirname, filename)
        print(file_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [4]:
import torch
import torch.nn as nn
import torchvision.models as models
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import numpy as np

In [5]:
class LEGOInstructionDataset(Dataset):
    """Dataset pairing a sequence of instruction images with a prompt.

    Args:
        instruction_paths: Indexable collection; element ``idx`` is an
            iterable of image file paths belonging to sample ``idx``.
        prompts: Indexable collection; element ``idx`` is returned unchanged
            as the ``'prompt'`` of sample ``idx``.
        transform: Optional callable applied to every loaded RGB image.
            ``torch.stack`` is called on the results, so in practice the
            transform has to return equally shaped tensors (for example a
            resize followed by a to-tensor conversion).
    """

    def __init__(self, instruction_paths, prompts, transform=None):
        self.instruction_paths = instruction_paths
        self.prompts = prompts
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per entry of ``instruction_paths``)."""
        return len(self.instruction_paths)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            dict with ``'instructions'`` (the stacked per-image results) and
            ``'prompt'`` (``self.prompts[idx]``).
        """
        instruction_images = []
        for img_path in self.instruction_paths[idx]:
            # Image.open is lazy and keeps the file handle open. convert()
            # loads the pixel data into a new image, so the handle can be
            # closed right away instead of leaking until garbage collection.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('RGB')
            if self.transform:
                image = self.transform(image)
            instruction_images.append(image)

        return {
            'instructions': torch.stack(instruction_images),
            'prompt': self.prompts[idx]
        }

In [6]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence-first tensor.

    The table ``pe`` has shape ``(max_len, 1, d_model)``: even feature columns
    hold ``sin(position * frequency)`` and odd columns hold the matching
    ``cos``. It is registered as a buffer, so it follows the module across
    devices but is not a trainable parameter.

    Args:
        d_model: Size of the feature dimension (even or odd).
        max_len: Number of positions precomputed in the table.
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()
        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-np.log(10000.0) / d_model))
        angles = position * div_term
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so the surplus frequency is dropped for the cosine half. For even
        # d_model the slice is a no-op.
        pe[:, 0, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``x`` plus the encodings of positions ``0 .. x.size(0) - 1``.

        Positions are taken along dim 0 of ``x``; the table's singleton dim 1
        broadcasts over dim 1 of ``x``, and the last dim must equal
        ``d_model``.
        """
        return x + self.pe[:x.size(0)]

In [7]:
class LEGOInstructionGenerator(nn.Module):
    """Encoder-decoder transformer over image sequences and prompt tokens.

    Each source image is turned into a ``d_model`` vector by a ResNet-18 whose
    final layer is replaced; prompt token ids are embedded and given
    positional encodings; an ``nn.Transformer`` combines the two and a linear
    layer projects every decoder position to ``num_instructions`` scores.

    Args:
        num_instructions: Size of the output projection.
        d_model: Feature width shared by both encoders and the transformer.
        nhead: Number of attention heads.
        num_encoder_layers: Number of transformer encoder layers.
        num_decoder_layers: Number of transformer decoder layers.
    """

    def __init__(self, num_instructions, d_model, nhead, num_encoder_layers, num_decoder_layers):
        super().__init__()

        # Image encoder (using ResNet as feature extractor).
        # NOTE(review): recent torchvision releases deprecate `pretrained=`
        # in favour of `weights=`; kept as-is for older installations.
        self.image_encoder = models.resnet18(pretrained=True)
        self.image_encoder.fc = nn.Linear(self.image_encoder.fc.in_features, d_model)

        # Prompt encoder
        self.prompt_encoder = nn.Embedding(10000, d_model)  # Assuming vocab size of 10000
        self.positional_encoding = PositionalEncoding(d_model)

        # Transformer. forward() builds (batch, seq, feature) tensors, so the
        # transformer must be batch-first. The default sequence-first layout
        # would read the batch axis as time and reject inputs whose source
        # and target lengths differ. Parameter names and shapes do not change.
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            batch_first=True,
        )

        # Output layer
        self.fc_out = nn.Linear(d_model, num_instructions)

    def forward(self, src_images, tgt_prompt):
        """Score every target position.

        Args:
            src_images: Tensor of shape ``(batch, seq_len, c, h, w)``.
            tgt_prompt: Integer token ids of shape ``(batch, tgt_len)``, each
                below the embedding size of 10000.

        Returns:
            Tensor of shape ``(batch, tgt_len, num_instructions)``.
        """
        # Encode images. reshape (not view) because callers may pass a
        # non-contiguous slice such as images[:, :-1].
        batch_size, seq_len, c, h, w = src_images.shape
        flat_images = src_images.reshape(batch_size * seq_len, c, h, w)
        src_features = self.image_encoder(flat_images)
        src_features = src_features.view(batch_size, seq_len, -1)

        # Encode prompt. PositionalEncoding indexes positions along dim 0, so
        # move the sequence axis there for the call and back afterwards.
        tgt_embedded = self.prompt_encoder(tgt_prompt)
        tgt_embedded = self.positional_encoding(tgt_embedded.transpose(0, 1)).transpose(0, 1)

        # Transform.
        # NOTE(review): no causal target mask and no positional encoding on
        # the image features are applied here - confirm whether that is
        # intended for autoregressive use.
        output = self.transformer(src_features, tgt_embedded)

        # Generate instruction probabilities (unnormalised scores).
        return self.fc_out(output)

In [8]:
def train_model(model, train_loader, num_epochs, device):
    """Train ``model`` with Adam and cross-entropy, printing one loss per epoch.

    Every batch must be a mapping with tensors under ``'instructions'`` and
    ``'prompt'``. The model is called with ``instructions[:, :-1]`` and the
    prompts, and its output is scored against ``instructions[:, 1:]``.

    NOTE(review): this next-step target only works when ``'instructions'``
    holds integer class indices (e.g. shape ``(batch, seq)``). Image tensors
    cannot serve as ``CrossEntropyLoss`` class targets - confirm the batch
    schema against the dataset in use.

    Args:
        model: Module called as ``model(instructions[:, :-1], prompts)``.
        train_loader: Iterable of batches; re-iterated once per epoch.
        num_epochs: Number of passes over ``train_loader``.
        device: Device the model and each batch are moved to.

    The printed loss is the mean over the epoch's batches, or ``nan`` when the
    loader produced no batches.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters())

    model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0
        for batch in train_loader:
            instructions = batch['instructions'].to(device)
            prompts = batch['prompt'].to(device)

            optimizer.zero_grad()
            outputs = model(instructions[:, :-1], prompts)
            # reshape instead of view: instructions[:, 1:] is a non-contiguous
            # slice whenever the batch holds more than one row, and view()
            # raises on it.
            loss = criterion(
                outputs.reshape(-1, outputs.size(-1)),
                instructions[:, 1:].reshape(-1),
            )
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1

        # An empty loader used to fail with NameError on `loss`; report nan.
        epoch_loss = running_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")

In [9]:
def generate_instructions(model, prompt, max_length, device, src_images=None):
    """Greedily decode a token sequence from ``model``.

    Decoding starts from the start token (embedding size minus one). At each
    step the sequence so far is fed to the model and the arg-max of the last
    position is appended. Decoding stops after ``max_length`` steps or once
    the end token (embedding size minus two) is produced.

    Args:
        model: Module exposing ``prompt_encoder`` (an embedding) and callable
            as ``model(src_images, tgt_tokens)``. The result is indexed as
            ``output[0, -1]`` - assumes a batch-first output, TODO confirm.
        prompt: Kept for interface compatibility; the decoding loop does not
            consume it. Previously it was only copied into an unused tensor,
            and that copy could itself fail for multi-element tensor prompts.
            NOTE(review): decide how the prompt should condition generation.
        max_length: Maximum number of tokens to generate after the start token.
        device: Device on which decoding tensors are created.
        src_images: Optional source passed through as the model's first
            argument (moved to ``device`` when it is a tensor). Defaults to
            ``None``, which is what was always passed before; a model that
            reads ``src_images.shape`` needs a real tensor here.

    Returns:
        list[int]: The start token followed by the generated token ids.

    NOTE(review): the end token is derived from the embedding size, while
    predicted ids range over the model's output size; if the output size is
    smaller, the early stop can never trigger.
    """
    model.eval()

    # Special token ids are hoisted out of the loop; they never change.
    vocab_size = model.prompt_encoder.weight.shape[0]
    start_token = vocab_size - 1
    end_token = vocab_size - 2

    if torch.is_tensor(src_images):
        src_images = src_images.to(device)

    output_sequence = [start_token]
    with torch.no_grad():
        for _ in range(max_length):
            tgt_tensor = torch.tensor([output_sequence]).to(device)
            output = model(src_images, tgt_tensor)
            next_item = output[0, -1].argmax().item()
            output_sequence.append(next_item)

            if next_item == end_token:
                break

    return output_sequence

In [10]:
# Usage example: hyperparameters for the model built below.
num_instructions = 100  # Number of possible instruction types
d_model, nhead = 512, 8  # feature width and number of attention heads
num_encoder_layers = num_decoder_layers = 6  # same depth for encoder and decoder

In [None]:
# Build the generator, naming each hyperparameter explicitly so the call does
# not depend on positional argument order.
model = LEGOInstructionGenerator(
    num_instructions=num_instructions,
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_encoder_layers,
    num_decoder_layers=num_decoder_layers,
)

In [None]:
# Assume we have prepared our dataset
# train_loader = DataLoader(lego_dataset, batch_size=32, shuffle=True)

# Train the model
# train_model(model, train_loader, num_epochs=10, device='cuda' if torch.cuda.is_available() else 'cpu')

# Generate new instructions
# prompt = torch.tensor([1, 2, 3, 4, 5])  # Example prompt
# generated_instructions = generate_instructions(model, prompt, max_length=50, device='cuda' if torch.cuda.is_available() else 'cpu')
# print(generated_instructions)