# Import libs and utils

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load
import json
from pathlib import Path

DATA_DIR = '/kaggle/input/arc-prize-2025/'
data = {}


def _read_json(file_name):
    """Return the parsed contents of one JSON file stored under DATA_DIR."""
    with open(Path(DATA_DIR) / file_name) as f:
        return json.load(f)


# Challenges and solutions are loaded for the training and evaluation splits;
# only the challenges file is loaded for the test split.
train_challenges = _read_json('arc-agi_training_challenges.json')
train_solutions = _read_json('arc-agi_training_solutions.json')

eval_challenges = _read_json('arc-agi_evaluation_challenges.json')
eval_solutions = _read_json('arc-agi_evaluation_solutions.json')

test_challenges = _read_json('arc-agi_test_challenges.json')

# Report how many top-level entries each file contains.
print(f"Training tasks: {len(train_challenges)}")
print(f"Training solutions: {len(train_solutions)}")
print(f"Evaluation tasks: {len(eval_challenges)}")
print(f"Evaluation solutions: {len(eval_solutions)}")
print(f"Test tasks: {len(test_challenges)}")
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Utils
import random
import matplotlib.pyplot as plt
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from torchsummary import summary

def start_train(train_func, *args, **kwargs):
    """Run ``train_func`` once per epoch for 50 epochs.

    Args:
        train_func: callable invoked once per epoch.
        *args: positional arguments forwarded unchanged to ``train_func``.
        **kwargs: keyword arguments forwarded unchanged to ``train_func``.

    The 1-based epoch number is printed before each call. The return value
    of ``train_func`` is ignored.
    """
    for epoch in range(50):
        print("epoch", epoch+1)
        train_func(*args, **kwargs)
# load data
import pandas as pd
# Print the first training task followed by its entry in the solutions file.
for i in range(1):
    key = list(train_challenges)[i]
    separator = "----train----"
    print(train_challenges[key])
    print(separator)
    print(train_solutions[key])
    print(separator)

In [None]:
# 
class ARCData(Dataset):
    """Dataset over a table of ARC examples, one example per row.

    Args:
        dataframe: table indexable by column name (e.g. a pandas DataFrame)
            providing the columns ``"task_name"``, ``"task_input"``,
            ``"task_output"`` and ``"labels"``. Each column must support
            positional access through ``.iloc``.
    """

    def __init__(self, dataframe):
        self.task_name = dataframe["task_name"]
        self.input_values = dataframe["task_input"]
        self.task_output = dataframe["task_output"]
        self.labels = dataframe["labels"]

    def __len__(self):
        """Return the number of rows (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return row ``idx`` as ``(task_name, task_input, task_output, label)``.

        ``idx`` is a positional index (``.iloc``), independent of the
        table's own index labels.
        """
        return (
            self.task_name.iloc[idx],
            self.input_values.iloc[idx],
            self.task_output.iloc[idx],
            self.labels.iloc[idx],
        )

In [None]:
# custom ConvBlock
class ParallelConvBlock(nn.Module):
    """Summarise one image with three parallel convolutional branches.

    Each branch applies two convolutions, batch normalisation and global
    average pooling. The branches differ in the first kernel shape:
    ``cnn_block_s`` uses 3x3, ``cnn_block_w`` uses 1x3 and ``cnn_block_h``
    uses 3x1. The pooled branch outputs (256 + 128 + 128 values) are
    concatenated and normalised.

    Args:
        in_channels: number of channels of the input image.
        out_channels: stored on the module but not used to size any layer;
            the branch widths are fixed, so the output always has 512
            features.
    """

    def __init__(self, in_channels, out_channels):
        super(ParallelConvBlock, self).__init__()
        self.out_channels = out_channels

        self.cnn_block_s = nn.Sequential(
            nn.Conv2d(in_channels, 128, kernel_size=(3, 3), padding=1),
            nn.Conv2d(128, 256, kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(256),
            nn.AdaptiveAvgPool2d((1))
        )

        self.cnn_block_w = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=(1, 3), padding=1),
            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(128),
            nn.AdaptiveAvgPool2d((1))
        )

        self.cnn_block_h = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=(3, 1), padding=1),
            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=1),
            nn.BatchNorm2d(128),
            nn.AdaptiveAvgPool2d((1))
        )

        # forward() always builds a batch of exactly one sample, which
        # BatchNorm1d cannot process in training mode, so the concatenated
        # features are normalised per sample instead.
        self.bn = nn.LayerNorm(256 + 128 + 128)

    def forward(self, x):
        """Return a ``(1, 512)`` feature tensor for one ``(C, H, W)`` image."""
        x = x.unsqueeze(0)  # add the batch dimension the conv layers expect
        # Global pooling leaves (1, C, 1, 1); flatten to (1, C).
        x_s = self.cnn_block_s(x).flatten(1)
        x_h = self.cnn_block_h(x).flatten(1)
        x_v = self.cnn_block_w(x).flatten(1)

        out = torch.cat([x_s, x_h, x_v], dim=1)
        out = self.bn(out)
        return out

# Discriminator

## data

In [None]:
## prepare train data
inputs = []
labels = []
discriminator_train_data = []
discriminator_test_data = []
# Build one "bag" per task: every grid found in the task's "train" pairs
# becomes a (grid tensor, label, task name) triple.
for task_name, task_data in train_challenges.items():
    task_bag = []
    for temp in task_data["train"]:
        for k, v in temp.items():
            is_output = int(k == "output")
            i_input = torch.tensor(v)
            # Two-element float label: [1, 0] for an "output" grid, [0, 1] otherwise.
            i_label = torch.tensor([is_output, 1 - is_output], dtype=torch.float32)
            task_bag.append((i_input, i_label, task_name))
    discriminator_train_data.append(task_bag)
print(len(discriminator_train_data))
discriminator_train_data[0][0][0]



## model

In [None]:
# create a pretrain model for discriminator
# do 2 things: learn how to pick up pattern from inputs and outputs
# 
class SVMLoss(nn.Module):
    """Hinge loss with an optional L2 penalty on a linear layer's weight.

    Args:
        model: optional module exposing a ``weight`` tensor (e.g. an
            ``nn.Linear``). When given, ``forward`` adds
            ``0.5 * weight_decay * sum(weight ** 2)`` to the hinge loss.
            When omitted, no regularisation term is added.
    """

    def __init__(self, model=None):
        super(SVMLoss, self).__init__()
        self.model = model

    def forward(self, outputs, labels, weight_decay):
        """Return ``mean(max(0, 1 - labels * outputs))`` plus the L2 term.

        Args:
            outputs: raw scores.
            labels: targets multiplied element-wise with ``outputs``; the
                hinge formula expects values of +1 / -1.
            weight_decay: coefficient of the L2 penalty (unused when the
                loss was built without a model).
        """
        # Hinge Loss: max(0, 1 - y * wx)
        hinge_loss = torch.mean(torch.clamp(1 - labels * outputs, min=0))
        if self.model is None:
            return hinge_loss
        # L2 Regularization over the wrapped layer's weight
        l2_reg = 0.5 * weight_decay * torch.sum(self.model.weight**2)
        return hinge_loss + l2_reg

class DynamicLinearGenerator(nn.Module):
    """Generate the weight and bias of a linear layer from a conditioning input.

    Args:
        input_dim: size of each conditioning vector.
        target_in_dim: ``in_features`` of the generated linear layer.
        target_out_dim: ``out_features`` of the generated linear layer.
    """

    def __init__(self, input_dim, target_in_dim, target_out_dim):
        super().__init__()
        self.input_dim = input_dim
        self.target_in_dim = target_in_dim
        self.target_out_dim = target_out_dim

        # Emits weight (in * out values) followed by bias (out values).
        self.ffn_block = nn.Sequential(
            nn.Linear(input_dim, 256, bias=True),
            nn.ReLU(),
            nn.Linear(256, target_in_dim * target_out_dim + target_out_dim, bias=True),
        )

    def forward(self, x):
        """Return a ``DynamicLinear`` parameterised from ``x``.

        Args:
            x: tensor of shape ``(batch, input_dim)``.

        Returns:
            A ``DynamicLinear`` whose weight has shape
            ``(batch, target_out_dim, target_in_dim)`` and whose bias has
            shape ``(batch, target_out_dim)``, i.e. one layer per row of
            ``x``.
        """
        # Generate flat parameters
        params = self.ffn_block(x)

        # Split into weight and bias
        n_weight = self.target_in_dim * self.target_out_dim
        weight = params[:, :n_weight]
        bias = params[:, n_weight:]

        # Reshape weight into (out_dim, in_dim) per sample
        weight = weight.reshape(-1, self.target_out_dim, self.target_in_dim)

        return DynamicLinear(
            weight,
            bias,
            in_features=self.target_in_dim,
            out_features=self.target_out_dim,
        )


class DynamicLinear(nn.Linear):
    """Linear layer whose weight and bias are supplied by the caller.

    The tensors are stored as given rather than copied into new
    parameters, so any autograd history they carry is kept.

    Args:
        weights: ``(out_features, in_features)`` for one shared layer, or
            ``(batch, out_features, in_features)`` for one layer per sample.
        bias: ``(out_features,)`` or ``(batch, out_features)`` respectively.
        *arg, **kwargs: forwarded to ``nn.Linear.__init__``
            (``in_features`` and ``out_features`` are required there).
    """

    def __init__(self, weights, bias, *arg, **kwargs):
        super().__init__(*arg, **kwargs)
        # nn.Linear registered its own Parameters under these names, and
        # nn.Module refuses to overwrite a registered Parameter with a plain
        # tensor, so remove them before storing the supplied tensors.
        del self.weight
        del self.bias
        self.weight = weights
        self.bias = bias

    def forward(self, input):
        """Apply the layer to ``input``.

        With a 3-D weight, ``input`` must be ``(batch, in_features)`` and
        row ``b`` is transformed by weight ``b``. Otherwise this is the
        standard ``nn.Linear`` forward pass.
        """
        if self.weight.dim() == 3:
            # (B, out, in) @ (B, in, 1) -> (B, out, 1) -> (B, out)
            projected = torch.bmm(self.weight, input.unsqueeze(-1)).squeeze(-1)
            return projected + self.bias
        return super().forward(input)



class Discriminator(nn.Module):
    """
        Discriminate if an image is a task input or a task output
        Must be fast train for each task
        Input: a 2-D grid of colour indices in ``range(NUM_COLORS)``

        The grid is split into one binary plane per colour and passed
        through three convolutional branches whose first kernels are 3x3
        (``cnn_block_s``), 1x3 (``cnn_block_w``) and 3x1 (``cnn_block_h``).
        Each pair of branch outputs is concatenated and projected by the
        shared ``linear`` layer; the three projections are concatenated and
        mapped to two class scores by ``fc``.

        Args:
            img_channels: number of input planes of the first convolutions;
                must equal ``NUM_COLORS`` for ``forward`` to work.
            learning_block_dim: width of each pairwise projection.
    """

    # Number of colour planes built in forward() and of cyclic shifts in fit().
    NUM_COLORS = 10

    def __init__(self, img_channels, learning_block_dim=128):
        super(Discriminator, self).__init__()
        self.learning_block = learning_block_dim
        self.out_channels = 3*32

        self.cnn_block_s = self._conv_branch(img_channels, (3, 3))
        self.cnn_block_w = self._conv_branch(img_channels, (1, 3))
        self.cnn_block_h = self._conv_branch(img_channels, (3, 1))

        # Shared projection applied to each concatenated pair of branch outputs.
        self.linear = nn.Linear(2 * self.out_channels, learning_block_dim)
        # Classification head over the three concatenated projections.
        self.fc = nn.Linear(3 * learning_block_dim, 2)
        self.softmax = nn.Softmax(dim=-1)

    def _conv_branch(self, img_channels, first_kernel):
        """Build one conv -> conv -> batch-norm -> global-average-pool branch."""
        return nn.Sequential(
            nn.Conv2d(img_channels, 128, kernel_size=first_kernel, padding=2),
            nn.Conv2d(128, self.out_channels, kernel_size=(3, 3), padding=2),
            nn.BatchNorm2d(self.out_channels),
            nn.AdaptiveAvgPool2d((1))
        )

    def forward(self, train_task_input, train_task_output=None, target=None, learning_effort=10):
        """Return two class probabilities for one grid.

        Args:
            train_task_input: 2-D tensor of colour indices to classify.
            train_task_output: reserved for per-task adaptation; must be
                ``None``.
            target: reserved for per-task adaptation; must be ``None``.
            learning_effort: reserved for per-task adaptation; currently
                unused.

        Returns:
            Tensor of shape ``(2,)`` that sums to 1.

        Raises:
            NotImplementedError: if ``train_task_output`` or ``target`` is
                supplied, because per-task adaptation is not implemented.
        """
        if train_task_output is not None or target is not None:
            raise NotImplementedError(
                "per-task adaptation from an example pair is not implemented; "
                "call the discriminator with a single grid"
            )

        # Decompose the grid into one binary plane per colour: (1, NUM_COLORS, H, W).
        x = torch.stack(
            [train_task_input == i for i in range(self.NUM_COLORS)]
        ).float().unsqueeze(0)

        # Global pooling leaves (1, out_channels, 1, 1); flatten to (out_channels,).
        x_s = self.cnn_block_s(x).flatten()
        x_h = self.cnn_block_h(x).flatten()
        x_v = self.cnn_block_w(x).flatten()

        x_examiner_sh = self.linear(torch.cat([x_s, x_h], dim=0))
        x_examiner_sv = self.linear(torch.cat([x_s, x_v], dim=0))
        x_examiner_hv = self.linear(torch.cat([x_h, x_v], dim=0))

        scores = self.fc(torch.cat([x_examiner_sh, x_examiner_sv, x_examiner_hv], dim=0))
        return self.softmax(scores)

    def fit(self, model, x, y, epochs=10):
        """Train ``model`` on one sample using cyclic shifts of ``x``.

        For every epoch, ``x`` is rolled along dimension 0 by each shift in
        ``range(NUM_COLORS)`` and ``model`` takes one SGD step per shift on
        the hinge-embedding loss between ``model(rolled_x)`` and ``y``.

        Args:
            model: module to optimise in place.
            x: input tensor; rolled along its first dimension.
            y: target tensor compared against every model output; the
                hinge-embedding loss expects values of 1 or -1.
            epochs: number of passes over all shifts.
        """
        criterion = torch.nn.HingeEmbeddingLoss(margin=1.0, reduction='mean')
        optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
        for _ in range(epochs):
            # train using permutation (assume permutation will preserve correlation)
            for shift in range(self.NUM_COLORS):
                train_x = torch.roll(x, shift, 0)
                optimizer.zero_grad()
                outputs = model(train_x)
                loss = criterion(outputs, y)
                loss.backward()
                optimizer.step()
            
        
discriminator = Discriminator(10)
checkpoint_path = f'/kaggle/working/{type(discriminator).__name__}.pth'
try:
    discriminator.load_state_dict(torch.load(checkpoint_path, weights_only=True))
except Exception as e:
    # A missing checkpoint file and any other load failure are both just
    # displayed; the freshly initialised model is used in that case.
    display(e)

# Smoke test: run the first grid of the first task through the model in eval mode.
discriminator.eval()
outputs = discriminator(discriminator_train_data[0][0][0])
print(outputs)

In [None]:
import torch
# Scratch check of torch.roll: shift a (10, 2, 2) tensor of 1..40 by one
# position along the first dimension.
x = torch.arange(1, 41).reshape(10, 2, 2)
torch.roll(x, shifts=1, dims=0)

## train

In [None]:
import torch.optim as optim
# discriminate 2 things: which image is the input and which is the output
# Loss and optimiser shared by train_discriminator().
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=discriminator.parameters(), lr=1e-4)
def train_discriminator(epoch=1, batch_size = 500, sub_epochs=50):
    """Train the module-level ``discriminator`` on ``discriminator_train_data``.

    Tasks are visited in order; each task's samples are replayed
    ``sub_epochs`` times before moving on to the next task. One optimiser
    step is taken per sample. The model's state dict is saved to
    ``/kaggle/working/`` after every epoch.

    Args:
        epoch: number of passes over all tasks.
        batch_size: number of optimiser steps between progress reports
            (this is a logging interval, not a mini-batch size).
        sub_epochs: number of consecutive passes over each task's samples.
    """
    # An earlier cell switches the model to eval mode; batch-norm layers must
    # be back in training mode to use and update batch statistics.
    discriminator.train()
    for epoch_idx in range(epoch):
        running_loss = 0.0
        print("epoch", epoch_idx+1)
        i = 0
        for task in discriminator_train_data:
            for _ in range(sub_epochs):
                for i_input, i_label, _task_name in task:
                    # zero the parameter gradients
                    optimizer.zero_grad()
                    # get output
                    outputs = discriminator(i_input)
                    # calculate loss
                    loss = criterion(outputs, i_label)
                    loss.backward()

                    optimizer.step()

                    # print statistics
                    running_loss += loss.item()
                    i += 1

                    # Report after every `batch_size` steps so the average
                    # covers exactly `batch_size` losses.
                    if i % batch_size == 0:
                        print(outputs, i_label)
                        print(f'[{epoch_idx + 1}, {i:5d}] loss: {running_loss / batch_size:.3f}')
                        running_loss = 0.0
        torch.save(discriminator.state_dict(), f'/kaggle/working/{discriminator.__class__.__name__}.pth')

# Trial run with a single epoch.
train_discriminator(epoch=1)




In [None]:
# real training
train_discriminator(epoch=50)