In [None]:
import json
import math
import os
import random
import shutil
import sys
from collections import Counter, OrderedDict
from pathlib import Path
from typing import ClassVar, Iterator, Sequence

import numpy as np
import pandas as pd
import scipy.stats
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score
from torch import Tensor
from torch.utils.data import DataLoader, Dataset, Sampler, Subset
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score
from sklearn.model_selection import StratifiedKFold
import torch.optim as optim
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, precision_score, recall_score, matthews_corrcoef

# Flipon data preparation

In [None]:
# Read every benchmark table in one pass; the names on the left line up
# one-to-one, in order, with the CSV paths listed below.
(shin, endo, hdna, kou, g4chip, g4cut, g4seq) = map(
    pd.read_csv,
    (
        "/content/Shin500_kan_dataset.csv",
        "/content/Endoquad_kan_dataset.csv",
        "/content/HDNA500_kan_dataset.csv",
        "/content/Kou500_kan_dataset.csv",
        "/content/kan_g4_chip.csv",
        "/content/kan_g4_cut.csv",
        "/content/kan_g4_seq.csv",
    ),
)

In [None]:
# Select the HDNA table as the working dataset for the cells below.
df = hdna

In [None]:
# Partition the working table using its precomputed 'split' column.
df_train = df.loc[df['split'].eq('train')]
df_test = df.loc[df['split'].eq('test')]

In [None]:
# Peek at the length of the first training sequence. `.iloc[0]` is positional;
# the previous `[0]` was a label lookup and raises KeyError whenever the row
# labelled 0 is not part of the train split.
len(df_train["sequence"].iloc[0])

512

In [None]:
# Mini-batch size passed to both the train and the test DataLoader.
batch_size = 32

In [None]:
# One-hot code per nucleotide: the position of the base in "ATCG" is the
# index of the single 1 in its 4-element vector.
encoding_map = {
    base: [1 if slot == position else 0 for slot in range(4)]
    for position, base in enumerate("ATCG")
}

In [None]:
def encode_sequence(sequence):
    """One-hot encode a nucleotide string into a ``(channels, length)`` float tensor.

    Characters that are not keys of ``encoding_map`` are skipped, so the
    returned length can be shorter than ``len(sequence)``.

    Args:
        sequence: Iterable of single-character bases (e.g. a ``str``).

    Returns:
        torch.Tensor: Float tensor of shape ``(channels, n_encoded)`` where
        ``channels`` is the width of the vectors stored in ``encoding_map``.
        The tensor is always 2-D, even when no character could be encoded
        (shape ``(channels, 0)``), so callers may rely on ``size(1)``.
    """
    encoded_seq = [encoding_map[base] for base in sequence if base in encoding_map]
    n_channels = len(next(iter(encoding_map.values())))
    # An explicit (rows, channels) reshape keeps the result 2-D for empty input;
    # torch.tensor([]) alone is 1-D and has no dimension 1.
    return (
        torch.tensor(encoded_seq, dtype=torch.float)
        .reshape(len(encoded_seq), n_channels)
        .t()
    )

def pad_sequences(sequences, target_length):
    """Bring every ``(channels, length)`` tensor to ``target_length`` and stack them.

    Shorter tensors are zero-padded on both sides (the extra column, if the
    deficit is odd, goes to the right); tensors that are already long enough
    keep only their first ``target_length`` columns.

    Returns:
        torch.Tensor: Stacked tensor of shape ``(n_sequences, channels, target_length)``.
    """
    def _fit(seq):
        deficit = target_length - seq.size(1)
        if deficit <= 0:
            return seq[:, :target_length]
        left = deficit // 2
        return torch.nn.functional.pad(seq, (left, deficit - left))

    return torch.stack([_fit(seq) for seq in sequences])

def collate_fn(batch):
    """Collate ``(sequence, label)`` pairs into a padded batch.

    The common length is the (integer-truncated) median of the sequence
    lengths in the batch; ``pad_sequences`` pads shorter items and cuts longer
    ones down to that length.

    Returns:
        tuple: ``(padded_sequences, labels)`` with labels stacked into one tensor.
    """
    columns = tuple(zip(*batch))
    sequences, labels = columns
    median_length = int(np.median([seq.size(1) for seq in sequences]))
    return pad_sequences(sequences, median_length), torch.stack(labels)

In [None]:
class DNASequencesDataset(Dataset):
    """Dataset of one-hot encoded DNA sequences with integer labels.

    Rows whose ``sequence`` contains the character ``'N'`` are dropped, the
    remaining sequences are encoded once up front with ``encode_sequence``,
    and the ``label`` column is stored as a ``torch.long`` tensor.
    """

    def __init__(self, dataframe):
        has_unknown_base = dataframe['sequence'].str.contains('N')
        self.data = dataframe[~has_unknown_base].reset_index(drop=True)
        self.encoded_sequences = list(map(encode_sequence, self.data['sequence']))
        self.labels = torch.tensor(self.data['label'].values, dtype=torch.long)

    def __len__(self):
        # One label per retained row.
        return len(self.labels)

    def __getitem__(self, idx):
        sequence = self.encoded_sequences[idx]
        label = self.labels[idx]
        return sequence, label

In [None]:
# Encode both splits up front.
train_dataset = DNASequencesDataset(df_train)
test_dataset = DNASequencesDataset(df_test)

# Only the training loader reshuffles between epochs; both share the same
# batch size and median-length collation.
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=collate_fn,
)

# Model

KAN Linear

In [None]:
import torch
import torch.nn.functional as F
import math


class KANLinear(torch.nn.Module):
    """Linear-like layer whose output is a base branch plus a B-spline branch.

    ``forward`` computes ``F.linear(base_activation(x), base_weight)`` and adds
    ``F.linear`` of the flattened B-spline bases of ``x`` with the flattened
    (optionally per-edge scaled) spline coefficients.

    Parameters / buffers created in ``__init__``:
        grid (buffer): knot positions, shape (in_features, grid_size + 2 * spline_order + 1).
        base_weight: shape (out_features, in_features).
        spline_weight: shape (out_features, in_features, grid_size + spline_order).
        spline_scaler: shape (out_features, in_features); only present when
            ``enable_standalone_scale_spline`` is true.
    """

    def __init__(
        self,
        in_features,
        out_features,
        grid_size=5,
        spline_order=3,
        scale_noise=0.1,
        scale_base=1.0,
        scale_spline=1.0,
        enable_standalone_scale_spline=True,
        base_activation=torch.nn.SiLU,
        grid_eps=0.02,
        grid_range=[-1, 1],
    ):
        super(KANLinear, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.grid_size = grid_size
        self.spline_order = spline_order

        # Uniform knot spacing over grid_range; the knot vector is extended by
        # spline_order extra steps on each side and shared (expanded) across
        # all input features.
        h = (grid_range[1] - grid_range[0]) / grid_size
        grid = (
            (
                torch.arange(-spline_order, grid_size + spline_order + 1) * h
                + grid_range[0]
            )
            .expand(in_features, -1)
            .contiguous()
        )
        self.register_buffer("grid", grid)

        # torch.Tensor(...) allocates uninitialized storage; the values are
        # filled in by reset_parameters() at the end of __init__.
        self.base_weight = torch.nn.Parameter(torch.Tensor(out_features, in_features))
        self.spline_weight = torch.nn.Parameter(
            torch.Tensor(out_features, in_features, grid_size + spline_order)
        )
        if enable_standalone_scale_spline:
            self.spline_scaler = torch.nn.Parameter(
                torch.Tensor(out_features, in_features)
            )

        self.scale_noise = scale_noise
        self.scale_base = scale_base
        self.scale_spline = scale_spline
        self.enable_standalone_scale_spline = enable_standalone_scale_spline
        # base_activation is a module class (not an instance); instantiate it here.
        self.base_activation = base_activation()
        self.grid_eps = grid_eps

        self.reset_parameters()

    def reset_parameters(self):
        """Initialize base_weight, spline_weight and (if present) spline_scaler.

        base_weight and spline_scaler use Kaiming-uniform initialization; the
        spline coefficients are obtained by least-squares fitting (curve2coeff)
        small uniform noise sampled at the interior grid points.
        """
        torch.nn.init.kaiming_uniform_(self.base_weight, a=math.sqrt(5) * self.scale_base)
        with torch.no_grad():
            # Noise in [-0.5, 0.5) * scale_noise / grid_size, one value per
            # (interior grid point, input feature, output feature).
            noise = (
                (
                    torch.rand(self.grid_size + 1, self.in_features, self.out_features)
                    - 1 / 2
                )
                * self.scale_noise
                / self.grid_size
            )
            # grid.T[spline_order:-spline_order] selects the grid_size + 1
            # un-extended knots as the sample locations for the fit.
            self.spline_weight.data.copy_(
                (self.scale_spline if not self.enable_standalone_scale_spline else 1.0)
                * self.curve2coeff(
                    self.grid.T[self.spline_order : -self.spline_order],
                    noise,
                )
            )
            if self.enable_standalone_scale_spline:
                # torch.nn.init.constant_(self.spline_scaler, self.scale_spline)
                torch.nn.init.kaiming_uniform_(self.spline_scaler, a=math.sqrt(5) * self.scale_spline)

    def b_splines(self, x: torch.Tensor):
        """
        Compute the B-spline bases for the given input tensor.

        Starts from order-0 indicator functions of the knot intervals and
        applies the Cox-de Boor recursion ``spline_order`` times.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, in_features).

        Returns:
            torch.Tensor: B-spline bases tensor of shape (batch_size, in_features, grid_size + spline_order).
        """
        assert x.dim() == 2 and x.size(1) == self.in_features

        grid: torch.Tensor = (
            self.grid
        )  # (in_features, grid_size + 2 * spline_order + 1)
        x = x.unsqueeze(-1)
        # Order 0: 1 where x falls in the half-open interval [t_i, t_{i+1}), else 0.
        bases = ((x >= grid[:, :-1]) & (x < grid[:, 1:])).to(x.dtype)
        # Each pass raises the order by one and drops one basis function.
        for k in range(1, self.spline_order + 1):
            bases = (
                (x - grid[:, : -(k + 1)])
                / (grid[:, k:-1] - grid[:, : -(k + 1)])
                * bases[:, :, :-1]
            ) + (
                (grid[:, k + 1 :] - x)
                / (grid[:, k + 1 :] - grid[:, 1:(-k)])
                * bases[:, :, 1:]
            )

        assert bases.size() == (
            x.size(0),
            self.in_features,
            self.grid_size + self.spline_order,
        )
        return bases.contiguous()

    def curve2coeff(self, x: torch.Tensor, y: torch.Tensor):
        """
        Compute the coefficients of the curve that interpolates the given points.

        Solved per input feature as a least-squares problem between the
        B-spline bases evaluated at ``x`` and the targets ``y``.

        Args:
            x (torch.Tensor): Input tensor of shape (batch_size, in_features).
            y (torch.Tensor): Output tensor of shape (batch_size, in_features, out_features).

        Returns:
            torch.Tensor: Coefficients tensor of shape (out_features, in_features, grid_size + spline_order).
        """
        assert x.dim() == 2 and x.size(1) == self.in_features
        assert y.size() == (x.size(0), self.in_features, self.out_features)

        A = self.b_splines(x).transpose(
            0, 1
        )  # (in_features, batch_size, grid_size + spline_order)
        B = y.transpose(0, 1)  # (in_features, batch_size, out_features)
        solution = torch.linalg.lstsq(
            A, B
        ).solution  # (in_features, grid_size + spline_order, out_features)
        result = solution.permute(
            2, 0, 1
        )  # (out_features, in_features, grid_size + spline_order)

        assert result.size() == (
            self.out_features,
            self.in_features,
            self.grid_size + self.spline_order,
        )
        return result.contiguous()

    @property
    def scaled_spline_weight(self):
        """Spline coefficients multiplied by the per-edge scaler, when one is enabled."""
        return self.spline_weight * (
            self.spline_scaler.unsqueeze(-1)
            if self.enable_standalone_scale_spline
            else 1.0
        )

    def forward(self, x: torch.Tensor):
        """Apply the layer to ``x`` of shape (..., in_features); returns (..., out_features)."""
        assert x.size(-1) == self.in_features
        original_shape = x.shape
        # Collapse all leading dimensions so b_splines sees a 2-D batch.
        x = x.reshape(-1, self.in_features)

        base_output = F.linear(self.base_activation(x), self.base_weight)
        # Bases and coefficients are flattened over (in_features, coeff) so the
        # spline branch is a single matrix product.
        spline_output = F.linear(
            self.b_splines(x).view(x.size(0), -1),
            self.scaled_spline_weight.view(self.out_features, -1),
        )
        output = base_output + spline_output

        output = output.reshape(*original_shape[:-1], self.out_features)
        return output

    @torch.no_grad()
    def update_grid(self, x: torch.Tensor, margin=0.01):
        """Re-derive the knot grid from a batch and refit the spline coefficients.

        The new grid blends (via ``grid_eps``) a uniform grid spanning
        [min(x) - margin, max(x) + margin] with a grid made of evenly spaced
        order statistics of ``x`` per feature, then is extended by
        ``spline_order`` uniform steps on each side. ``spline_weight`` is then
        refit with ``curve2coeff`` against the per-edge spline outputs computed
        on ``x`` before the grid change.

        Args:
            x (torch.Tensor): Input of shape (batch, in_features).
            margin (float): Padding added around the observed range for the uniform grid.
        """
        assert x.dim() == 2 and x.size(1) == self.in_features
        batch = x.size(0)

        splines = self.b_splines(x)  # (batch, in, coeff)
        splines = splines.permute(1, 0, 2)  # (in, batch, coeff)
        orig_coeff = self.scaled_spline_weight  # (out, in, coeff)
        orig_coeff = orig_coeff.permute(1, 2, 0)  # (in, coeff, out)
        unreduced_spline_output = torch.bmm(splines, orig_coeff)  # (in, batch, out)
        unreduced_spline_output = unreduced_spline_output.permute(
            1, 0, 2
        )  # (batch, in, out)

        # sort each channel individually to collect data distribution
        x_sorted = torch.sort(x, dim=0)[0]
        grid_adaptive = x_sorted[
            torch.linspace(
                0, batch - 1, self.grid_size + 1, dtype=torch.int64, device=x.device
            )
        ]

        uniform_step = (x_sorted[-1] - x_sorted[0] + 2 * margin) / self.grid_size
        grid_uniform = (
            torch.arange(
                self.grid_size + 1, dtype=torch.float32, device=x.device
            ).unsqueeze(1)
            * uniform_step
            + x_sorted[0]
            - margin
        )

        grid = self.grid_eps * grid_uniform + (1 - self.grid_eps) * grid_adaptive
        # Prepend / append spline_order extra knots spaced by uniform_step.
        grid = torch.concatenate(
            [
                grid[:1]
                - uniform_step
                * torch.arange(self.spline_order, 0, -1, device=x.device).unsqueeze(1),
                grid,
                grid[-1:]
                + uniform_step
                * torch.arange(1, self.spline_order + 1, device=x.device).unsqueeze(1),
            ],
            dim=0,
        )

        self.grid.copy_(grid.T)
        # NOTE(review): the fit targets above were computed with
        # scaled_spline_weight, but the refit coefficients are written to the
        # unscaled spline_weight - confirm this is intended when
        # enable_standalone_scale_spline is true.
        self.spline_weight.data.copy_(self.curve2coeff(x, unreduced_spline_output))

    def regularization_loss(self, regularize_activation=1.0, regularize_entropy=1.0):
        """
        Compute the regularization loss.

        This is a dumb simulation of the original L1 regularization as stated in the
        paper, since the original one requires computing absolutes and entropy from the
        expanded (batch, in_features, out_features) intermediate tensor, which is hidden
        behind the F.linear function if we want an memory efficient implementation.

        The L1 regularization is now computed as mean absolute value of the spline
        weights. The authors implementation also includes this term in addition to the
        sample-based regularization.
        """
        l1_fake = self.spline_weight.abs().mean(-1)
        regularization_loss_activation = l1_fake.sum()
        # Normalize the per-edge magnitudes into a distribution for the entropy term.
        p = l1_fake / regularization_loss_activation
        regularization_loss_entropy = -torch.sum(p * p.log())
        return (
            regularize_activation * regularization_loss_activation
            + regularize_entropy * regularization_loss_entropy
        )


class KAN(torch.nn.Module):
    """Stack of ``KANLinear`` layers sized by consecutive entries of ``layers_hidden``.

    ``layers_hidden = [d0, d1, ..., dn]`` produces ``n`` layers mapping
    ``d0 -> d1 -> ... -> dn``; every layer receives the same spline
    hyper-parameters.
    """

    def __init__(
        self,
        layers_hidden,
        grid_size=5,
        spline_order=3,
        scale_noise=0.1,
        scale_base=1.0,
        scale_spline=1.0,
        base_activation=torch.nn.SiLU,
        grid_eps=0.02,
        grid_range=[-1, 1],
    ):
        super().__init__()
        self.grid_size = grid_size
        self.spline_order = spline_order

        # Pair each width with its successor to get (in, out) per layer.
        layer_dims = zip(layers_hidden, layers_hidden[1:])
        self.layers = torch.nn.ModuleList(
            [
                KANLinear(
                    fan_in,
                    fan_out,
                    grid_size=grid_size,
                    spline_order=spline_order,
                    scale_noise=scale_noise,
                    scale_base=scale_base,
                    scale_spline=scale_spline,
                    base_activation=base_activation,
                    grid_eps=grid_eps,
                    grid_range=grid_range,
                )
                for fan_in, fan_out in layer_dims
            ]
        )

    def forward(self, x: torch.Tensor, update_grid=True):
        """Run ``x`` through every layer in order.

        When ``update_grid`` is true (the default), each layer's
        ``update_grid`` is called on its input right before the layer is applied.
        """
        hidden = x
        for kan_layer in self.layers:
            if update_grid:
                kan_layer.update_grid(hidden)
            hidden = kan_layer(hidden)
        return hidden

    def regularization_loss(self, regularize_activation=1.0, regularize_entropy=1.0):
        """Sum of the per-layer regularization losses."""
        total = 0
        for kan_layer in self.layers:
            total = total + kan_layer.regularization_loss(
                regularize_activation, regularize_entropy
            )
        return total

In [None]:
class Bilinear(nn.Module):
    """
    Thin wrapper around a single ``KANLinear`` layer mapping ``n`` features to ``out``.

    ``out`` defaults to ``n``. The ``bias`` argument is accepted but not used
    by the wrapped layer.
    """
    def __init__(self, n: int, out=None, bias=False):
        super().__init__()
        out_features = n if out is None else out
        self.fc = KANLinear(n, out_features)

    def forward(self, x):
        # Delegate straight to the wrapped layer.
        return self.fc(x)

class SELayer(nn.Module):
    """
    Squeeze-and-Excite gate built from two ``KANLinear`` layers.

    The input is averaged over its last axis, passed through
    ``oup -> inp // reduction -> oup`` and a sigmoid, and the resulting
    per-channel gate rescales the input.

    Parameters
    ----------
    inp : int
        Size used (divided by ``reduction``) for the bottleneck width.
    oup : int
        Number of input and output channels of the gate.
    reduction : int, optional
        Bottleneck divisor. Default is 4.
    """
    def __init__(self, inp, oup, reduction=4):
        super().__init__()
        bottleneck = inp // reduction
        # Registered but not called by forward(), which averages directly.
        self.avg_pool = nn.AdaptiveAvgPool1d(1)
        squeeze = KANLinear(oup, bottleneck)
        excite = KANLinear(bottleneck, oup)
        self.fc = nn.Sequential(squeeze, excite, nn.Sigmoid())

    def forward(self, x):
        n_batch, n_channels, _ = x.size()
        pooled = x.mean(dim=2)
        gate = self.fc(pooled).view(n_batch, n_channels, 1)
        return x * gate

# Main SeqNN model
class SeqNN(nn.Module):
    """
    LegNet-style 1D convolutional network for binary classification.

    The network is a stem convolution followed, for every consecutive pair in
    ``block_sizes``, by an inverted-residual block (1x1 expand, grouped conv,
    ``SELayer``, 1x1 project) whose output is concatenated with its input and
    passed through a resize convolution. A 1x1 ``mapper`` convolution,
    global average pooling and a sigmoid produce one probability per sample.

    Parameters
    ----------
    seqsize : int
        Stored on the instance; not used to build layers.
    use_single_channel, use_reverse_channel, use_multisubstate_channel : bool
        Each flag that is true adds one input channel on top of the 4
        nucleotide channels.
    block_sizes : sequence of int
        Channel width of the stem and of every resize block.
    ks : int
        Kernel size of the stem, grouped and resize convolutions.
    resize_factor : int
        Channel expansion factor inside each inverted-residual block.
    activation : callable
        Zero-argument factory for the activation module.
    filter_per_group : int
        Channels per group in the grouped convolution.
    se_reduction : int
        ``reduction`` argument forwarded to ``SELayer``.
    final_ch : int
        Output channels of the mapper (1 for the binary head used by ``forward``).
    bn_momentum : float
        Momentum of every ``BatchNorm1d``.
    """
    # Must be a tuple: ('resize_factor') without the trailing comma is just a str.
    __constants__ = ('resize_factor',)

    def __init__(self,
                 seqsize,
                 use_single_channel,
                 use_reverse_channel,
                 use_multisubstate_channel,
                 block_sizes=[256, 256, 128, 128, 64, 64, 32, 32],
                 ks=5,
                 resize_factor=4,
                 activation=nn.SiLU,
                 filter_per_group=2,
                 se_reduction=4,
                 final_ch=1,
                 bn_momentum=0.1):
        super().__init__()
        # Copy so the instance never aliases the shared default list (or the caller's list).
        block_sizes = list(block_sizes)
        self.block_sizes = block_sizes
        self.resize_factor = resize_factor
        self.se_reduction = se_reduction
        self.seqsize = seqsize
        self.use_single_channel = use_single_channel
        self.use_reverse_channel = use_reverse_channel
        self.use_multisubstate_channel = use_multisubstate_channel
        self.final_ch = final_ch
        self.bn_momentum = bn_momentum
        seqextblocks = OrderedDict()

        # 4 one-hot nucleotide channels plus one per enabled auxiliary channel.
        in_channels_first_block = 4
        if self.use_single_channel:
            in_channels_first_block += 1
        if self.use_reverse_channel:
            in_channels_first_block += 1
        if self.use_multisubstate_channel:
            in_channels_first_block += 1

        # Stem block.
        block = nn.Sequential(
            nn.Conv1d(
                in_channels=in_channels_first_block,
                out_channels=block_sizes[0],
                kernel_size=ks,
                padding='same',
                bias=False
            ),
            nn.BatchNorm1d(block_sizes[0], momentum=self.bn_momentum),
            activation()
        )
        seqextblocks['blc0'] = block

        # Building remaining blocks
        for ind, (prev_sz, sz) in enumerate(zip(block_sizes[:-1], block_sizes[1:])):
            expanded = sz * self.resize_factor
            block = nn.Sequential(
                # 1x1 expansion
                nn.Conv1d(prev_sz, expanded, kernel_size=1, padding='same', bias=False),
                nn.BatchNorm1d(expanded, momentum=self.bn_momentum),
                activation(),

                # grouped convolution over the expanded channels
                nn.Conv1d(expanded, expanded, kernel_size=ks,
                          groups=expanded // filter_per_group, padding='same', bias=False),
                nn.BatchNorm1d(expanded, momentum=self.bn_momentum),
                activation(),

                SELayer(prev_sz, expanded, reduction=self.se_reduction),

                # 1x1 projection back to the block's input width
                nn.Conv1d(expanded, prev_sz, kernel_size=1, padding='same', bias=False),
                nn.BatchNorm1d(prev_sz, momentum=self.bn_momentum),
                activation(),
            )
            seqextblocks[f'inv_res_blc{ind}'] = block

            # Input is the concatenation [x, block(x)], hence 2 * prev_sz channels.
            resize_block = nn.Sequential(
                nn.Conv1d(2 * prev_sz, sz, kernel_size=ks, padding='same', bias=False),
                nn.BatchNorm1d(sz, momentum=self.bn_momentum),
                activation()
            )
            seqextblocks[f'resize_blc{ind}'] = resize_block

        self.seqextractor = nn.ModuleDict(seqextblocks)

        self.mapper = nn.Sequential(
            nn.Conv1d(block_sizes[-1], self.final_ch, kernel_size=1, padding='same'),
            activation()
        )

        # Registered buffer 0..final_ch-1; forward() below does not read it.
        self.register_buffer('bins', torch.arange(start=0, end=self.final_ch, step=1, requires_grad=False))

    def feature_extractor(self, x):
        """Run the stem and every (inverted-residual, resize) pair; returns the feature map."""
        x = self.seqextractor['blc0'](x)
        for i in range(len(self.block_sizes) - 1):
            x = torch.cat([x, self.seqextractor[f'inv_res_blc{i}'](x)], dim=1)
            x = self.seqextractor[f'resize_blc{i}'](x)
        return x

    def forward(self, x):
        """Return one probability per sample for input of shape (batch, channels, length)."""
        f = self.feature_extractor(x)
        x = self.mapper(f)
        x = F.adaptive_avg_pool1d(x, 1)
        x = x.squeeze(2)
        # NOTE(review): with the default SiLU activation at the end of `mapper`,
        # the value fed to the sigmoid is bounded below by about -0.28, so the
        # returned probability cannot go below roughly 0.43 - confirm intended.
        prob = torch.sigmoid(x).squeeze(1)

        return prob


KAN Conv

In [None]:
#Util
def add_padding_1d(array: np.ndarray, padding: int) -> np.ndarray:
    """Return a zero-filled float array with ``array`` centred between ``padding`` zeros on each side."""
    length = array.shape[0]
    result = np.zeros(length + 2 * padding)
    # Copy the original values into the middle; the borders stay zero.
    result[padding:padding + length] = array
    return result


def calc_out_dims_1d(array, kernel_size, stride, dilation, padding):
    """Calculate output dimensions for 1D convolution.

    Args:
        array: Object whose ``shape`` is ``(batch_size, n_channels, length)``.
        kernel_size (int): Number of taps of the kernel.
        stride (int): Step between consecutive windows.
        dilation (int): Spacing between kernel taps.
        padding (int): Zero padding applied on each side.

    Returns:
        tuple: ``(out_size, batch_size, n_channels)``.
    """
    # The body previously read an undefined name `matrix`; the parameter is `array`.
    batch_size, n_channels, n = array.shape
    # n + 2p - k - (k - 1)(d - 1) == n + 2p - d(k - 1) - 1; integer floor division
    # replaces np.floor(...).astype(int) and yields a plain int.
    out_size = (n + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1
    return out_size, batch_size, n_channels


def multiple_convs_kan_conv1d(array,
                               kernels,
                               kernel_size,
                               out_channels,
                               stride=1,
                               dilation=1,
                               padding=0,
                               device="cuda") -> torch.Tensor:
    """Performs a 1D convolution with multiple kernels on the input array using specified stride, dilation, and padding.

    ``kernels`` is laid out output-channel-major: the kernels for output
    channel ``c`` are ``kernels[c * kern_per_out : (c + 1) * kern_per_out]``
    with ``kern_per_out = len(kernels) // out_channels``. Kernel ``k`` of a
    group is applied to input channel ``k`` (or to the only channel when the
    input has a single channel) and the results are summed.

    Args:
        array (torch.Tensor): 1D tensor of shape (batch_size, channels, length).
        kernels (list): List of kernel functions to be applied. Each is called
            with patches of shape (batch_size * length_out, 1, kernel_size) and
            must return batch_size * length_out values.
        kernel_size (int): Size of the 1D kernel.
        out_channels (int): Number of output channels.
        stride (int): Stride along the length of the array. Default is 1.
        dilation (int): Dilation rate along the length of the array. Default is 1.
        padding (int): Number of elements to pad on each side. Default is 0.
        device (str): Device to perform calculations on. Default is "cuda".

    Returns:
        torch.Tensor: Feature map after convolution with shape (batch_size, out_channels, length_out).

    Raises:
        ValueError: If the input has several channels and the number of kernels
            per output channel does not match the number of input channels.
    """
    batch_size, n_channels, length_in = array.shape
    # Extent covered by one dilated kernel window.
    span = dilation * (kernel_size - 1) + 1
    length_out = (length_in + 2 * padding - span) // stride + 1

    kern_per_out = len(kernels) // out_channels
    if n_channels > 1 and kern_per_out != n_channels:
        raise ValueError(
            f"expected {n_channels} kernels per output channel, got {kern_per_out}"
        )

    array_out = torch.zeros((batch_size, out_channels, length_out), device=device)

    array = F.pad(array, (padding, padding), mode='constant', value=0)
    # unfold has no dilation argument: take windows of the full dilated span
    # and keep every `dilation`-th element -> (batch, channels, length_out, kernel_size).
    conv_groups = array.unfold(2, span, stride)[..., ::dilation].contiguous()

    for c_out in range(out_channels):
        out_channel_accum = torch.zeros((batch_size, length_out), device=device)

        for k_idx in range(kern_per_out):
            kernel = kernels[c_out * kern_per_out + k_idx]
            channel = k_idx if n_channels > 1 else 0
            patches = conv_groups[:, channel].reshape(-1, 1, kernel_size)
            conv_result = kernel(patches)
            out_channel_accum += conv_result.reshape(batch_size, length_out)

        array_out[:, c_out, :] = out_channel_accum

    return array_out

In [None]:
def kan_conv1d(matrix: torch.Tensor,
               kernel,
               kernel_size: int,
               stride: int = 1,
               dilation: int = 1,
               padding: int = 0,
               device: str = "cpu") -> torch.Tensor:
    """
    Slide ``kernel`` along the last axis of ``matrix`` and collect one value per window.

    Args:
        matrix (torch.Tensor): 3D tensor (batch_size, channels, width) to be convolved.
        kernel: Object whose ``forward`` maps a window of shape
            (batch_size, channels, kernel_size) to (batch_size, channels, 1).
        kernel_size (int): Number of taps in each window.
        stride (int, optional): Step between windows. Defaults to 1.
        dilation (int, optional): Spacing between taps inside a window. Defaults to 1.
        padding (int, optional): Zero padding on both ends of the width axis. Defaults to 0.
        device (str): Device on which the output tensor is allocated.

    Returns:
        torch.Tensor: Feature map of shape (batch_size, channels, width_out).
    """
    n_batch, n_chan, in_width = matrix.shape
    # Extent of one dilated window.
    span = dilation * (kernel_size - 1) + 1
    out_width = (in_width + 2 * padding - span) // stride + 1

    result = torch.zeros((n_batch, n_chan, out_width), device=device)
    padded = torch.nn.functional.pad(matrix, (padding, padding))

    for position, offset in enumerate(range(0, out_width * stride, stride)):
        window = padded[:, :, offset:offset + kernel_size * dilation:dilation]
        result[:, :, position] = kernel.forward(window).squeeze(-1)

    return result

In [None]:
# NOTE(review): a class with this same name is defined again in the next cell
# of this file; once that cell runs, this definition is replaced by it.
class KAN_Convolutional_Layer_1D(torch.nn.Module):
    """Early version of the 1D KAN convolution layer: one KAN_Convolution_1D per (in, out) channel pair."""

    def __init__(self, in_channels=1, out_channels=1, kernel_size=5, stride=1, padding=0, dilation=1, device="cuda"):
        super(KAN_Convolutional_Layer_1D, self).__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.dilation = dilation
        self.device = device
        # NOTE(review): KAN_Convolution_1D as defined later in this file takes
        # grid_size as its fifth positional parameter, so `device` would be
        # bound to grid_size here - confirm against the intended signature.
        self.convs = torch.nn.ModuleList([KAN_Convolution_1D(kernel_size, stride, padding, dilation, device) for _ in range(in_channels * out_channels)])

    def forward(self, x: torch.Tensor):
        # NOTE(review): `i` runs over all in_channels * out_channels convs but is
        # used to index the channel axis of x, so it can exceed x.size(1) when
        # out_channels > 1; outputs are concatenated rather than summed - confirm intent.
        return torch.cat([conv(x[:, i, :].unsqueeze(1)) for i, conv in enumerate(self.convs)], dim=1)

In [None]:
class KAN_Convolutional_Layer_1D(torch.nn.Module):
    """1D convolution layer built from one ``KAN_Convolution_1D`` per (output, input) channel pair.

    The convolutions are stored output-channel-major: the module for output
    channel ``i`` and input channel ``j`` sits at index ``i * in_channels + j``.
    ``forward`` sums the per-input-channel results for every output channel.
    The ``device`` argument is accepted but not used; outputs are allocated on
    the input's device.
    """

    def __init__(
            self,
            in_channels: int = 1,
            out_channels: int = 1,
            kernel_size: int = 2,
            stride: int = 1,
            padding: int = 0,
            dilation: int = 1,
            grid_size: int = 5,
            spline_order: int = 3,
            scale_noise: float = 0.1,
            scale_base: float = 1.0,
            scale_spline: float = 1.0,
            base_activation=torch.nn.SiLU,
            grid_eps: float = 0.02,
            grid_range: tuple = [-1, 1],
            device: str = "cpu"
        ):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.dilation = dilation

        # Every per-pair convolution is built with the same hyper-parameters.
        conv_kwargs = dict(
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            dilation=dilation,
            grid_size=grid_size,
            spline_order=spline_order,
            scale_noise=scale_noise,
            scale_base=scale_base,
            scale_spline=scale_spline,
            base_activation=base_activation,
            grid_eps=grid_eps,
            grid_range=grid_range,
        )
        self.convs = torch.nn.ModuleList(
            [KAN_Convolution_1D(**conv_kwargs) for _ in range(in_channels * out_channels)]
        )

    def forward(self, x: torch.Tensor):
        """Map ``x`` of shape (batch, in_channels, length) to (batch, out_channels, output_length)."""
        n_batch, _, seq_len = x.shape
        window_span = self.dilation * (self.kernel_size - 1) + 1
        out_len = (seq_len + 2 * self.padding - window_span) // self.stride + 1
        result = torch.zeros((n_batch, self.out_channels, out_len), device=x.device)

        for out_idx in range(self.out_channels):
            channel_sum = torch.zeros((n_batch, out_len), device=x.device)
            first_conv = out_idx * self.in_channels
            for in_idx in range(self.in_channels):
                single_channel = x[:, in_idx, :].unsqueeze(1)
                partial = self.convs[first_conv + in_idx].forward(single_channel)
                channel_sum += partial.squeeze(1)  # drop the singleton channel axis
            result[:, out_idx, :] = channel_sum

        return result

class KAN_Convolution_1D(torch.nn.Module):
    """Single-kernel 1D KAN convolution.

    Holds one ``KANLinear`` mapping a window of ``kernel_size`` values to one
    output and slides it over the input with ``kan_conv1d``.
    """

    def __init__(
            self,
            kernel_size: int = 2,
            stride: int = 1,
            padding: int = 0,
            dilation: int = 1,
            grid_size: int = 50,
            spline_order: int = 3,
            scale_noise: float = 0.1,
            scale_base: float = 1.0,
            scale_spline: float = 1.0,
            base_activation=torch.nn.SiLU,
            grid_eps: float = 0.02,
            grid_range: tuple = [-1, 1]
        ):
        super().__init__()
        self.kernel_size, self.stride = kernel_size, stride
        self.padding, self.dilation = padding, dilation
        # The learnable kernel: kernel_size inputs -> 1 output.
        self.conv = KANLinear(
            in_features=kernel_size,
            out_features=1,
            grid_size=grid_size,
            spline_order=spline_order,
            scale_noise=scale_noise,
            scale_base=scale_base,
            scale_spline=scale_spline,
            base_activation=base_activation,
            grid_eps=grid_eps,
            grid_range=grid_range,
        )

    def forward(self, x: torch.Tensor):
        """Convolve ``x``; also records ``x.device`` on ``self.device``."""
        self.device = x.device
        return kan_conv1d(
            x,
            self.conv,
            self.kernel_size,
            stride=self.stride,
            dilation=self.dilation,
            padding=self.padding,
            device=self.device,
        )

Example with changed mapper

In [None]:
# class SeqNN(nn.Module):
#     """
#     SeqNN neural network for binary classification with modified mapper using KAN_Convolutional_Layer_1D.
#     """
#     def __init__(self,
#                  seqsize,
#                  use_single_channel,
#                  use_reverse_channel,
#                  use_multisubstate_channel,
#                  block_sizes=[256, 256, 128, 128, 64, 64, 32, 32],
#                  ks=5,
#                  resize_factor=4,
#                  activation=nn.SiLU,
#                  filter_per_group=2,
#                  se_reduction=4,
#                  final_ch=1,
#                  bn_momentum=0.1):
#         super().__init__()

#         self.block_sizes = block_sizes
#         self.resize_factor = resize_factor
#         self.se_reduction = se_reduction
#         self.seqsize = seqsize
#         self.use_single_channel = use_single_channel
#         self.use_reverse_channel = use_reverse_channel
#         self.use_multisubstate_channel = use_multisubstate_channel
#         self.final_ch = final_ch
#         self.bn_momentum = bn_momentum

#         seqextblocks = OrderedDict()
#         in_channels_first_block = 4  # 4 channels for A, T, C, G
#         if self.use_single_channel:
#             in_channels_first_block += 1
#         if self.use_reverse_channel:
#             in_channels_first_block += 1
#         if self.use_multisubstate_channel:
#             in_channels_first_block += 1

#         # First layer
#         block = nn.Sequential(
#             nn.Conv1d(
#                 in_channels=in_channels_first_block,
#                 out_channels=block_sizes[0],
#                 kernel_size=ks,
#                 padding='same',
#                 bias=False
#             ),
#             nn.BatchNorm1d(block_sizes[0], momentum=self.bn_momentum),
#             activation()
#         )
#         seqextblocks['blc0'] = block

#         # Building remaining blocks
#         for ind, (prev_sz, sz) in enumerate(zip(block_sizes[:-1], block_sizes[1:])):
#             block = nn.Sequential(
#                 nn.Conv1d(prev_sz, sz * self.resize_factor, kernel_size=1, padding='same', bias=False),
#                 nn.BatchNorm1d(sz * self.resize_factor, momentum=self.bn_momentum),
#                 activation(),

#                 nn.Conv1d(sz * self.resize_factor, sz * self.resize_factor, kernel_size=ks,
#                           groups=sz * self.resize_factor // filter_per_group, padding='same', bias=False),
#                 nn.BatchNorm1d(sz * self.resize_factor, momentum=self.bn_momentum),
#                 activation(),

#                 SELayer(prev_sz, sz * self.resize_factor, reduction=self.se_reduction),

#                 nn.Conv1d(sz * self.resize_factor, prev_sz, kernel_size=1, padding='same', bias=False),
#                 nn.BatchNorm1d(prev_sz, momentum=self.bn_momentum),
#                 activation(),
#             )
#             seqextblocks[f'inv_res_blc{ind}'] = block

#             resize_block = nn.Sequential(
#                 nn.Conv1d(2 * prev_sz, sz, kernel_size=ks, padding='same', bias=False),
#                 nn.BatchNorm1d(sz, momentum=self.bn_momentum),
#                 activation()
#             )
#             seqextblocks[f'resize_blc{ind}'] = resize_block

#         self.seqextractor = nn.ModuleDict(seqextblocks)

#         self.mapper = nn.Sequential(
#             KAN_Convolutional_Layer_1D(
#                 in_channels=block_sizes[-1],
#                 out_channels=self.final_ch,
#                 kernel_size=1,
#                 padding=0,
#                 device="cuda"
#             ),
#             activation()
#         )

#         self.register_buffer('bins', torch.arange(start=0, end=self.final_ch, step=1, requires_grad=False))

#     def feature_extractor(self, x):
#         x = self.seqextractor['blc0'](x)
#         for i in range(len(self.block_sizes) - 1):
#             x = torch.cat([x, self.seqextractor[f'inv_res_blc{i}'](x)], dim=1)
#             x = self.seqextractor[f'resize_blc{i}'](x)
#         return x

#     def forward(self, x):
#         f = self.feature_extractor(x)
#         x = self.mapper(f)
#         x = F.adaptive_avg_pool1d(x, 1)
#         x = x.squeeze(2)
#         prob = torch.sigmoid(x).squeeze(1)

#         return prob

In [None]:
# Prefer the GPU, but fall back to the CPU so the notebook still runs on
# machines without CUDA.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Plain 4-channel (one-hot only) binary classifier for 512-bp inputs.
model = SeqNN(
    seqsize=512,
    final_ch=1,
    use_single_channel=False,
    use_reverse_channel=False,
    use_multisubstate_channel=False,
)

# SeqNN.forward already returns sigmoid probabilities, so BCELoss (not the
# logits variant) is the matching criterion.
criterion = nn.BCELoss()
optimizer = optim.AdamW(
    params=model.parameters(),
    weight_decay=0.005,
    lr=5e-4,
)


In [None]:
# Move the model to the selected device; as the last expression of the cell,
# the returned module is also displayed.
model.to(device)

SeqNN(
  (seqextractor): ModuleDict(
    (blc0): Sequential(
      (0): Conv1d(4, 256, kernel_size=(5,), stride=(1,), padding=same, bias=False)
      (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU()
    )
    (inv_res_blc0): Sequential(
      (0): Conv1d(256, 1024, kernel_size=(1,), stride=(1,), padding=same, bias=False)
      (1): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU()
      (3): Conv1d(1024, 1024, kernel_size=(5,), stride=(1,), padding=same, groups=512, bias=False)
      (4): BatchNorm1d(1024, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (5): SiLU()
      (6): SELayer(
        (avg_pool): AdaptiveAvgPool1d(output_size=1)
        (fc): Sequential(
          (0): KANLinear(
            (base_activation): SiLU()
          )
          (1): KANLinear(
            (base_activation): SiLU()
          )
          (2): Sigmoid()
        )
      )
      (

In [None]:
# Count every element of every registered parameter tensor.
total_params = sum(map(torch.numel, model.parameters()))
print(f"Total number of parameters: {total_params}")

Total number of parameters: 4990177


In [None]:
def _binary_metrics(labels, probs, threshold=0.5):
    """Compute binary-classification metrics from labels and probabilities.

    Args:
        labels: Ground-truth 0/1 labels, one per sample.
        probs: Predicted positive-class probabilities, one per sample.
        threshold: Probability at or above which a sample is called positive.

    Returns:
        Tuple ``(accuracy, f1, auc, precision, recall, mcc)``.

    Note:
        ``roc_auc_score`` needs both classes to be present in ``labels``;
        its behaviour otherwise is whatever the installed scikit-learn does.
    """
    labels = np.asarray(labels).reshape(-1)
    probs = np.asarray(probs, dtype=float).reshape(-1)
    preds = (probs >= threshold).astype(int)

    accuracy = accuracy_score(labels, preds)
    # zero_division=0 yields the same 0.0 scikit-learn already returns when no
    # positive is predicted, but without the UndefinedMetricWarning.
    f1 = f1_score(labels, preds, zero_division=0)
    # ROC AUC is a ranking metric: it must be scored on the continuous
    # probabilities. Feeding it thresholded 0/1 predictions collapses it to
    # balanced accuracy.
    auc = roc_auc_score(labels, probs)
    precision = precision_score(labels, preds, zero_division=0)
    recall = recall_score(labels, preds, zero_division=0)
    mcc = matthews_corrcoef(labels, preds)

    return accuracy, f1, auc, precision, recall, mcc


def train(model, loader, criterion, optimizer, device, threshold=0.5):
    """Run one training epoch and report the loss plus classification metrics.

    Args:
        model: Module called as ``model(sequences)``; its output is passed
            directly to ``criterion`` and treated as per-sample probabilities.
        loader: Iterable of ``(sequences, labels)`` batches exposing
            ``loader.dataset`` (its length normalises the epoch loss).
        criterion: Loss called as ``criterion(probs, labels.float())``.
        optimizer: Optimizer stepped once per batch.
        device: Device the batches are moved to before the forward pass.
        threshold: Probability cut-off used to binarise predictions.

    Returns:
        Tuple ``(epoch_loss, accuracy, f1, auc, precision, recall, mcc)``.
        The metrics are computed on the outputs produced during the epoch,
        i.e. while the weights were still being updated.
    """
    model.train()
    running_loss = 0.0
    all_probs = []
    all_labels = []

    for sequences, labels in loader:
        sequences, labels = sequences.to(device), labels.to(device)
        optimizer.zero_grad()

        probs = model(sequences)
        loss = criterion(probs, labels.float())

        loss.backward()
        optimizer.step()

        # Weight the batch loss by batch size so the epoch value is a
        # per-sample average (assumes the criterion returns a batch mean).
        running_loss += loss.item() * sequences.size(0)
        all_probs.extend(probs.detach().cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

    epoch_loss = running_loss / len(loader.dataset)

    return (epoch_loss, *_binary_metrics(all_labels, all_probs, threshold))


def evaluate(model, loader, criterion, device, threshold=0.5):
    """Score the model on ``loader`` without updating any weights.

    Args:
        model: Module called as ``model(sequences)``; its output is passed
            directly to ``criterion`` and treated as per-sample probabilities.
        loader: Iterable of ``(sequences, labels)`` batches exposing
            ``loader.dataset`` (its length normalises the epoch loss).
        criterion: Loss called as ``criterion(probs, labels.float())``.
        device: Device the batches are moved to before the forward pass.
        threshold: Probability cut-off used to binarise predictions.

    Returns:
        Tuple ``(epoch_loss, accuracy, f1, auc, precision, recall, mcc)``.
    """
    model.eval()
    eval_loss = 0.0
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for sequences, labels in loader:
            sequences, labels = sequences.to(device), labels.to(device)

            probs = model(sequences)
            loss = criterion(probs, labels.float())
            # Same per-sample weighting as in train().
            eval_loss += loss.item() * sequences.size(0)

            all_probs.extend(probs.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    epoch_loss = eval_loss / len(loader.dataset)

    return (epoch_loss, *_binary_metrics(all_labels, all_probs, threshold))

Shin

In [None]:
num_epochs = 7
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Epoch 1/7
Train - Loss: 0.5691, Acc: 0.7962, F1: 0.8223, AUC: 0.7959, Precision: 0.7299, Recall: 0.9414, MCC: 0.6188
Val   - Loss: 0.6939, Acc: 0.5034, F1: 0.0000, AUC: 0.5000, Precision: 0.0000, Recall: 0.0000, MCC: 0.0000
Epoch 2/7
Train - Loss: 0.4617, Acc: 0.9206, F1: 0.9246, AUC: 0.9205, Precision: 0.8812, Recall: 0.9724, MCC: 0.8456
Val   - Loss: 0.4923, Acc: 0.7034, F1: 0.7701, AUC: 0.7055, Precision: 0.6261, Recall: 1.0000, MCC: 0.5072
Epoch 3/7
Train - Loss: 0.3911, Acc: 0.9741, F1: 0.9741, AUC: 0.9741, Precision: 0.9758, Recall: 0.9724, MCC: 0.9482
Val   - Loss: 0.3671, Acc: 1.0000, F1: 1.0000, AUC: 1.0000, Precision: 1.0000, Recall: 1.0000, MCC: 1.0000
Epoch 4/7
Train - Loss: 0.3751, Acc: 0.9793, F1: 0.9792, AUC: 0.9793, Precision: 0.9826, Recall: 0.9759, MCC: 0.9586
Val   - Loss: 0.3656, Acc: 0.9862, F1: 0.9863, AUC: 0.9863, Precision: 0.9730, Recall: 1.0000, MCC: 0.9728
Epoch 5/7
Train - Loss: 0.3567, Acc: 0.9827, F1: 0.9829, AUC: 0.9827, Precision: 0.9762, Recall: 0.9897,

Kou

In [None]:
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )

HDNA

In [None]:
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )

Endo

In [None]:
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )

g4chip

In [None]:
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )

g4cut

In [None]:
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )

g4seq

In [None]:
for epoch in range(num_epochs):
    # One optimisation pass over train_loader, then scoring on test_loader
    # (the latter is reported under the "Val" label).
    (
        train_loss, train_acc, train_f1, train_auc,
        train_precision, train_recall, train_mcc,
    ) = train(model, train_loader, criterion, optimizer, device)
    (
        val_loss, val_acc, val_f1, val_auc,
        val_precision, val_recall, val_mcc,
    ) = evaluate(model, test_loader, criterion, device)

    # Emit the three report lines with a single print call.
    print(
        f"Epoch {epoch+1}/{num_epochs}",
        f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}, F1: {train_f1:.4f}, AUC: {train_auc:.4f}, "
        f"Precision: {train_precision:.4f}, Recall: {train_recall:.4f}, MCC: {train_mcc:.4f}",
        f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}, "
        f"Precision: {val_precision:.4f}, Recall: {val_recall:.4f}, MCC: {val_mcc:.4f}",
        sep="\n",
    )