In [2]:
import pytorch_lightning as pl
from torch import nn 
import torch 



def cal_accuracy(probs, target):
    """Return the fraction of rows whose highest-scoring class equals ``target``.

    Args:
        probs: tensor of per-class scores with the class axis at ``dim=1``.
        target: tensor of class indices, one entry per row of ``probs``.

    Returns:
        A 0-dim float tensor: the number of matches divided by
        ``target.size(0)``.
    """
    hits = (probs.argmax(dim=1) == target).sum().float()
    return hits / float(target.size(0))
class TN(nn.Module):
    """Recurrent tensor-network unit.

    Each step reshapes the incoming vector into a ``rank x rank`` matrix,
    multiplies the running state by it, squashes the product with ``tanh``
    and passes it through two linear maps.

    Args:
        rank: width of the running state; inputs must carry ``rank * rank``
            values per example.
        output_size: width of the per-step output projection.
    """

    def __init__(self, rank, output_size):
        super(TN, self).__init__()

        self.rank = rank
        self.output_size = output_size
        # Kept only for backward compatibility. It is a guess made at
        # construction time and goes stale once the module is moved with
        # ``.to()`` / ``.cuda()`` (or by a Lightning trainer), so nothing in
        # this class relies on it any more.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.i2h = nn.Linear(self.rank, self.rank)
        self.h2o = nn.Linear(self.rank, output_size)

    def _param_device(self):
        """Device the parameters currently live on."""
        return self.i2h.weight.device

    def forward(self, data, m):
        """Advance the running state by one step.

        Args:
            data: tensor viewable as ``(-1, rank, rank)``.
            m: running state; it is batch-matrix-multiplied with the reshaped
                ``data``, so its last dimension must be ``rank``.

        Returns:
            ``(hidden, output)`` where ``hidden = i2h(tanh(m @ unit))`` and
            ``output = h2o(hidden)``.
        """
        unit = data.contiguous().view(-1, self.rank, self.rank)
        m = torch.tanh(torch.einsum("bij,bjk->bik", m, unit))

        hidden = self.i2h(m)
        output = self.h2o(hidden)
        return hidden, output

    def init_m1(self, batch_size):
        """All-ones initial state of shape ``(batch_size, 1, rank)``.

        Created on the parameters' current device so it can be combined with
        this module's inputs wherever the module has been moved.
        """
        return torch.ones(batch_size, 1, self.rank, device=self._param_device())

    def init_m2(self):
        """Return a brand-new ``Linear(rank, output_size)`` on every call."""
        return nn.Linear(self.rank, self.output_size)

    def init_hidden(self, batch_size):
        """All-zeros tensor of shape ``(batch_size, rank)`` on the parameters' device."""
        return torch.zeros(batch_size, self.rank, device=self._param_device())


class TN_layer(nn.Module):
    """Embed a token sequence and fold it through a recurrent ``TN`` unit.

    Args:
        rank: state width of the ``TN`` unit; every token is embedded into
            ``rank * rank`` values.
        vocab_size: number of rows in the embedding table (index 0 is the
            padding index).
        output_size: forwarded to ``TN``.
    """

    def __init__(self, rank, vocab_size, output_size):
        super(TN_layer, self).__init__()
        self.tn = TN(rank, output_size)
        self.rank = rank
        self.embedding = nn.Embedding(vocab_size, self.rank * self.rank, padding_idx=0)

        # NOTE(review): this dropout is registered but never applied in
        # ``forward`` -- confirm whether that is intentional.
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Run the recurrence over every position along dimension 1 of ``x``.

        Args:
            x: integer token ids, indexed as ``(batch, seq_len)``.

        Returns:
            ``(hidden_tensor, final_hidden)``: the per-step states
            concatenated along dimension 1, and the state after the last step.
        """
        batch_size = x.size(0)
        seq_len = x.size(1)

        encoding = self.embedding(x)

        # Align the initial state with the embeddings so the recurrence works
        # wherever the module has been moved, not only on the device that was
        # guessed when the TN unit was constructed.
        m = self.tn.init_m1(batch_size).to(encoding.device)

        hiddens = []
        for i in range(seq_len):
            # The per-step output projection is not used by this layer.
            m, _ = self.tn(encoding[:, i, :], m)
            hiddens.append(m)
        final_hidden = m
        hidden_tensor = torch.cat(hiddens, 1)
        return hidden_tensor, final_hidden


class TN_model_for_classfication(nn.Module):
    """Sequence classifier: a ``TN_layer`` encoder followed by a linear head.

    Args:
        rank: state width of the tensor-network encoder.
        vocab_size: size of the token vocabulary.
        output_size: number of classes scored by the head.
    """

    def __init__(self, rank, vocab_size, output_size):
        super(TN_model_for_classfication, self).__init__()

        self.rank = rank
        self.output_size = output_size
        self.vocab_size = vocab_size

        self.tn = TN_layer(self.rank, self.vocab_size, output_size)
        self.fc = nn.Linear(self.rank, output_size)
        # Kept for backward compatibility only; it is a construction-time
        # guess and is not updated when the module is moved.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Registered but not applied in ``forward``, which returns raw scores.
        self.softmax = nn.Softmax(dim=1)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x, lens):
        """Score each sequence in ``x``.

        Args:
            x: integer token ids passed straight to the encoder.
            lens: accepted for interface compatibility; not used.

        Returns:
            The linear head applied to the encoder's final state, with that
            state's dimension 1 squeezed out.
        """
        seq_output, hidden = self.tn(x)
        output = self.fc(hidden.squeeze(1))

        return output

    def init_hidden(self, batch_size):
        """Return an all-zeros tensor of shape ``(1, batch_size, rank)``.

        The previous body read ``self.n_layers`` and ``self.hidden_dim``,
        neither of which this class defines, so every call raised
        ``AttributeError``. The shape keeps the ``(layers, batch, width)``
        layout the old code was reaching for, using one layer and the state
        width ``rank``. The tensor is created on the parameters' device.
        """
        n_layers = 1
        return torch.zeros(
            n_layers, batch_size, self.rank, device=self.fc.weight.device
        )

class LitModel(pl.LightningModule):
    """Minimal Lightning module: ``Linear(256, 512)`` followed by ``BatchNorm1d(512)``."""

    def __init__(self):
        super().__init__()
        layers = [nn.Linear(256, 512), nn.BatchNorm1d(512)]
        self.net = nn.Sequential(*layers)
        # Optional sample input of shape (10, 256).
        self.example_input_array = torch.zeros(10, 256)

    def forward(self, x):
        """Apply the linear layer and batch norm to ``x``."""
        out = self.net(x)
        return out

class Baselignthing(pl.LightningModule):
    """Shared Lightning train/validation loop for the classifiers in this notebook.

    The class itself never assigns ``self.model``; every method below reads
    it, so subclasses must set it in their own ``__init__``.
    """

    def __init__(
        self,
    ):
        super().__init__()

    def forward(self, x, lengths):
        """Return ``self.model(x, lengths)``."""
        return self.model(x, lengths)

    def configure_optimizers(self):
        """Adam over all parameters with a learning rate of 1e-3."""
        return torch.optim.Adam(self.parameters(), lr=1e-3)

    def training_step(self, train_batch, batch_idx):
        """Compute and log cross-entropy loss and accuracy for one batch.

        ``train_batch`` is unpacked as ``(text, label, lengths)``. Returns a
        dict holding ``"loss"`` and ``"train_acc"``.
        """
        text, label, lengths = train_batch
        scores = self.model(text, lengths)

        batch_loss = nn.CrossEntropyLoss()(scores, label)
        batch_acc = cal_accuracy(scores, label)
        self.log("train_loss", batch_loss)
        self.log("acc", batch_acc)
        return {"loss": batch_loss, "train_acc": batch_acc}

    def validation_step(self, val_batch, batch_idx):
        """Compute and log loss and accuracy for one batch; return the accuracy."""
        text, label, lengths = val_batch
        scores = self.model(text, lengths)

        batch_loss = nn.CrossEntropyLoss()(scores, label)
        batch_acc = cal_accuracy(scores, label)
        self.log("val_loss", batch_loss)
        self.log("val_acc", batch_acc)
        return batch_acc

    def training_epoch_end(self, train_step_outputs) -> None:
        """Print and log the mean of the per-step ``"train_acc"`` values."""
        step_accs = [step["train_acc"] for step in train_step_outputs]
        epoch_acc = torch.stack(step_accs).mean()
        print("train_epoch_acc:", epoch_acc)
        self.log("train_epoch_acc", epoch_acc)

    def validation_epoch_end(self, validation_step_outputs):
        """Print and log the mean of the per-step validation accuracies."""
        epoch_acc = torch.stack(validation_step_outputs).mean()
        print("val_epoch_acc:", epoch_acc)
        self.log("val_epoch_acc", epoch_acc)

class litTNLM(Baselignthing):
    """Lightning wrapper around ``TN_model_for_classfication``.

    Args:
        rank: state width of the tensor-network encoder.
        vocab_size: size of the token vocabulary.
        output_size: number of classes scored by the model.
    """

    def __init__(self, rank, vocab_size, output_size):
        super().__init__()
        # Forward the caller's ``output_size``. It used to be ignored in
        # favour of a hard-coded 2, so any other class count was silently
        # dropped.
        self.model = TN_model_for_classfication(
            rank=rank, vocab_size=vocab_size, output_size=output_size
        )

In [3]:
# The token embedding holds vocab * rank**2 weights, so these two settings
# dominate the model's parameter count.
vocab = 10_000
model = litTNLM(
    rank=100,
    vocab_size=vocab,
    output_size=2,
)

In [4]:
from pytorch_lightning.utilities.model_summary import ModelSummary
# Left as the cell's last expression so the notebook displays the per-layer
# parameter table for `model` (the litTNLM instance built in the previous cell).
ModelSummary(model, max_depth=4)

  | Name               | Type                       | Params
------------------------------------------------------------------
0 | model              | TN_model_for_classfication | 100 M 
1 | model.tn           | TN_layer                   | 100 M 
2 | model.tn.tn        | TN                         | 10.3 K
3 | model.tn.tn.i2h    | Linear                     | 10.1 K
4 | model.tn.tn.h2o    | Linear                     | 202   
5 | model.tn.embedding | Embedding                  | 100 M 
6 | model.tn.dropout   | Dropout                    | 0     
7 | model.fc           | Linear                     | 202   
8 | model.softmax      | Softmax                    | 0     
9 | model.log_softmax  | LogSoftmax                 | 0     
------------------------------------------------------------------
100 M     Trainable params
0         Non-trainable params
100 M     Total params
400.042   Total estimated model params size (MB)

In [5]:
class CNN(nn.Module):
    """Text classifier with three parallel convolutions and max-over-time pooling.

    Args:
        vocab_dim: number of rows in the embedding table (index 0 is padding).
        e_dim: embedding width; also the width of every convolution kernel.
        h_dim: number of output channels per convolution.
        o_dim: number of classes scored by the final linear layer.
    """

    def __init__(self, vocab_dim, e_dim, h_dim, o_dim):
        super(CNN, self).__init__()
        self.emb = nn.Embedding(vocab_dim, e_dim, padding_idx=0)
        self.dropout = nn.Dropout(0.2)
        # Kernels span 3, 4 and 5 consecutive tokens across the full
        # embedding width.
        self.conv1 = nn.Conv2d(1, h_dim, (3, e_dim))
        self.conv2 = nn.Conv2d(1, h_dim, (4, e_dim))
        self.conv3 = nn.Conv2d(1, h_dim, (5, e_dim))
        self.fc = nn.Linear(h_dim * 3, o_dim)

    def forward(self, x):
        """Score each sequence in ``x``.

        Args:
            x: integer token ids, indexed as ``(batch, seq_len)``.

        Returns:
            Raw class scores of shape ``(batch, o_dim)``; no softmax is applied.
        """
        convs = (self.conv1, self.conv2, self.conv3)
        embed = self.dropout(self.emb(x)).unsqueeze(1)

        # A sequence shorter than the tallest kernel used to crash inside
        # Conv2d. Zero-pad the token axis up to that height instead. Zero
        # vectors are what the padding row (index 0) is initialised to, so
        # this behaves like appending padding tokens to the input.
        min_len = max(conv.kernel_size[0] for conv in convs)
        missing = min_len - embed.size(2)
        if missing > 0:
            embed = nn.functional.pad(embed, (0, 0, 0, missing))

        pooled = []
        for conv in convs:
            feature_map = torch.relu(conv(embed).squeeze(3))
            # Max over every position: one value per channel.
            pooled.append(
                torch.max_pool1d(feature_map, feature_map.size(2)).squeeze(2)
            )

        pool = self.dropout(torch.cat(pooled, 1))
        return self.fc(pool)


class litCNN(Baselignthing):
    """Lightning wrapper around ``CNN``.

    The wrapped network takes only the token ids, so the training and
    validation steps are overridden to drop the ``lengths`` element of each
    batch.

    Args:
        vocab_dim, e_dim, h_dim, o_dim: forwarded unchanged to ``CNN``.
    """

    def __init__(self, vocab_dim, e_dim, h_dim, o_dim):
        super().__init__()
        self.model = CNN(vocab_dim, e_dim, h_dim, o_dim)

        # Record the constructor arguments as hyperparameters.
        self.save_hyperparameters()

    def forward(self, x, lengths=None):
        """Return ``self.model(x)``.

        ``lengths`` is accepted and ignored so this module can be called with
        the same ``(x, lengths)`` signature as the base class's ``forward``;
        calling it with ``x`` alone still works.
        """
        return self.model(x)

    def _loss_and_accuracy(self, batch):
        """Return ``(loss, accuracy)`` for a ``(text, label, lengths)`` batch."""
        text, label, _lengths = batch
        predictions = self.model(text)
        criterion = nn.CrossEntropyLoss()

        loss = criterion(predictions, label)
        acc = cal_accuracy(predictions, label)
        return loss, acc

    def training_step(self, train_batch, batch_idx):
        """Log and return the training loss and accuracy for one batch."""
        loss, acc = self._loss_and_accuracy(train_batch)
        self.log("train_loss", loss)
        self.log("acc", acc)
        return {"loss": loss, "train_acc": acc}

    def validation_step(self, val_batch, batch_idx):
        """Log the validation loss and accuracy for one batch; return the accuracy."""
        loss, acc = self._loss_and_accuracy(val_batch)
        self.log("val_loss", loss)
        self.log("val_acc", acc)
        return acc

In [6]:
# Build the CNN baseline and print its module tree.
model = litCNN(vocab_dim=10000, e_dim=300, h_dim=64, o_dim=2)
print(model)
# Alternative view: ModelSummary(model, max_depth=4)

litCNN(
  (model): CNN(
    (emb): Embedding(10000, 300, padding_idx=0)
    (dropout): Dropout(p=0.2, inplace=False)
    (conv1): Conv2d(1, 64, kernel_size=(3, 300), stride=(1, 1))
    (conv2): Conv2d(1, 64, kernel_size=(4, 300), stride=(1, 1))
    (conv3): Conv2d(1, 64, kernel_size=(5, 300), stride=(1, 1))
    (fc): Linear(in_features=192, out_features=2, bias=True)
  )
)


In [27]:
class RNN(nn.Module):
    """Single recurrent cell: ``hidden = tanh(data @ Wih + last_hidden @ Whh)``.

    A linear read-out over ``[data, last_hidden]`` is also computed and
    returned on every step.

    Args:
        embed_size: width of each input vector.
        hidden_size: width of the hidden state.
        output_size: width of the read-out.
    """

    def __init__(self, embed_size, hidden_size, output_size):
        super(RNN, self).__init__()

        self.hidden_size = hidden_size
        input_size = embed_size + hidden_size
        # Kept for backward compatibility only; it is a construction-time
        # guess and goes stale when the module is moved.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # ``torch.FloatTensor(n, m)`` hands back uninitialised memory, so the
        # recurrent weights could start as arbitrary values (including
        # NaN/inf). Allocate with ``torch.empty`` and initialise explicitly.
        self.Wih = nn.Parameter(torch.empty(embed_size, hidden_size))
        self.Whh = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.h2o = nn.Linear(input_size, output_size)
        self._init_recurrent_weights()

    def _init_recurrent_weights(self):
        """Fill ``Wih`` and ``Whh`` from ``U(-1/sqrt(hidden), 1/sqrt(hidden))``."""
        bound = self.hidden_size ** -0.5 if self.hidden_size > 0 else 0.0
        nn.init.uniform_(self.Wih, -bound, bound)
        nn.init.uniform_(self.Whh, -bound, bound)

    def forward(self, data, last_hidden):
        """Advance the cell by one step.

        Args:
            data: 2-D input of shape ``(batch, embed_size)``.
            last_hidden: 2-D previous state of shape ``(batch, hidden_size)``.

        Returns:
            ``(output, hidden)``: the read-out computed from
            ``[data, last_hidden]`` and the new hidden state.
        """
        combined = torch.cat((data, last_hidden), 1)
        hidden = torch.tanh(
            torch.mm(data, self.Wih) + torch.mm(last_hidden, self.Whh)
        )
        output = self.h2o(combined)
        return output, hidden

    def initHidden(self, batch_size):
        """Return a Kaiming-uniform random state of shape ``(batch_size, hidden_size)``.

        The state is moved to the parameters' current device.

        NOTE(review): the draw is random on every call, so repeated forward
        passes start from different states even in eval mode -- confirm this
        is intended.
        """
        state = nn.init.kaiming_uniform_(torch.empty(batch_size, self.hidden_size))
        return state.to(self.Wih.device)


class RNN_layer(nn.Module):
    """Embed a token sequence and unroll the ``RNN`` cell over it.

    Args:
        vocab_size: number of rows in the embedding table (index 0 is padding).
        embed_size: embedding width.
        hidden_dim: hidden-state width of the cell.
        output_size: read-out width of the cell.
    """

    def __init__(self, vocab_size, embed_size, hidden_dim, output_size):
        super(RNN_layer, self).__init__()
        self.rnn = RNN(embed_size, hidden_dim, output_size)
        self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=0)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        """Run the cell over every position along dimension 1 of ``x``.

        Args:
            x: integer token ids, indexed as ``(batch, seq_len)``.

        Returns:
            ``(hidden_tensor, final_hidden, output)``: every step's hidden
            state stacked along a new dimension 1, the last hidden state, and
            the cell's read-out from the last step.
        """
        batch_size = x.size(0)
        seq_len = x.size(1)

        embedded = self.dropout(self.embedding(x))

        # Put the initial state on the same device as the embedded inputs so
        # the recurrence works wherever the module has been moved.
        hidden = self.rnn.initHidden(batch_size).to(embedded.device)

        hiddens = []
        for i in range(seq_len):
            output, hidden = self.rnn(embedded[:, i, :], hidden)
            hiddens.append(hidden.unsqueeze(1))
        final_hidden = hidden
        hidden_tensor = torch.cat(hiddens, 1)
        return hidden_tensor, final_hidden, output


class RNN_Model_for_classfication(nn.Module):
    """Sequence classifier: ``RNN_layer`` encoder plus a linear head on its last state."""

    def __init__(self, vocab_size, embed_size, hidden_dim, output_size):
        super(RNN_Model_for_classfication, self).__init__()

        self.hidden_dim = hidden_dim
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = torch.device(device_name)

        self.vocab_size = vocab_size
        # Encoder first, then the classification head.
        self.rnn = RNN_layer(self.vocab_size, embed_size, hidden_dim, output_size)
        self.fc = nn.Linear(hidden_dim, output_size)
        # Registered but not applied in ``forward``, which returns raw scores.
        self.softmax = nn.Softmax(dim=1)
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x, lens):
        """Return the head's scores for the encoder's final hidden state.

        ``lens`` is accepted but not used.
        """
        _all_states, last_state, _readout = self.rnn(x)
        return self.fc(last_state)


class litSimpleRNN(Baselignthing):
    """Lightning wrapper around ``RNN_Model_for_classfication``.

    All four constructor arguments are forwarded unchanged to the wrapped
    model and recorded as hyperparameters.
    """

    def __init__(self, vocab_size, embed_size, hidden_dim, output_size):
        super().__init__()
        self.model = RNN_Model_for_classfication(
            vocab_size=vocab_size,
            embed_size=embed_size,
            hidden_dim=hidden_dim,
            output_size=output_size,
        )

        # Record the constructor arguments as hyperparameters.
        self.save_hyperparameters()

In [28]:
# Build the simple-RNN baseline and print the name of every registered parameter.
model = litSimpleRNN(vocab_size=10000, embed_size=300, hidden_dim=256, output_size=2)
for name, _param in model.named_parameters():
    print(name)

model.rnn.rnn.Wih
model.rnn.rnn.Whh
model.rnn.rnn.h2o.weight
model.rnn.rnn.h2o.bias
model.rnn.embedding.weight
model.fc.weight
model.fc.bias
