In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from tqdm import tqdm
torch.manual_seed(1)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
cpu = torch.device('cpu')
print('Running on device: {}'.format(device))

torch.cuda.empty_cache() 

In [None]:
X = []
Y = []
for p in tqdm(np.arange(0, 11), total = 11):
    X_p = torch.load('data_images/1face_X_part' + str(p) + '.pt', map_location = device)
    Y_p = torch.load('data_images/1face_Y_part' + str(p) + '.pt', map_location = device)
    X = X + [torch.stack(X_p)]
    Y = Y + Y_p
    del X_p

In [None]:
X = torch.stack(X)

In [None]:
X.shape

In [None]:
len(Y)

In [None]:
from torch.utils.data import TensorDataset, DataLoader
dataset = TensorDataset(X[0:5], torch.from_numpy(np.array(Y[0:5])))
train_data, val_data = torch.utils.data.random_split(dataset, [4, 1])
train_batch_size = 1
val_batch_size = 1
train_loader = DataLoader(train_data, shuffle=True, batch_size=train_batch_size)
val_loader = DataLoader(val_data, shuffle=True, batch_size=val_batch_size)

In [None]:
from torch import nn
class Identity(nn.Module):
    def __init__(self):
        super(Identity, self).__init__()
    def forward(self, x):
        return x
import torch
import timm
model_ft = timm.create_model("mixnet_m", pretrained=True)
model_ft.classifier = Identity()
batch, seqlen, c, h, w =  X.shape
X = X.reshape(batch*seqlen, c, h, w) # .shape
out = model_ft(X)

In [None]:
out.shape

In [None]:
out = out.reshape(batch, seqlen, out.shape[1])


In [None]:
out.shape

In [None]:
import torch.nn as nn

class DFDCNet(nn.Module):
    def __init__(self, input_size, output_size, hidden_dim, n_layers, drop_prob=0.5):
        super(DFDCNet, self).__init__()
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        
        self.mixnet = timm.create_model("mixnet_m", pretrained=True)
        for param in self.mixnet.parameters():
                param.requires_grad = True
            
        self.lstm = nn.LSTM(1000, hidden_dim, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(0.75)
        self.batchnorm = nn.BatchNorm1d(hidden_dim)
        self.elu = nn.ELU()
        self.fc1 = nn.Linear(hidden_dim, 32)
#         self.fc2 = nn.Linear(64, 32)
#         self.fc3 = nn.Linear(32, 16)
        self.fc4 = nn.Linear(32, output_size)
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x, hidden):
        batch_size, seqlen, c, h, w = x.size()
        x = x.reshape(batch_size*seqlen, c, h, w).float()
        x = self.mixnet(x)
        x = x.reshape(batch_size, seqlen, x.shape[1])
        lstm_out, hidden = self.lstm(x, hidden)
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        
        out = self.dropout(lstm_out)
#         out = self.batchnorm(out)
        out = self.fc1(out)
        out = self.elu(out)
#         out = self.fc2(out)
#         out = self.elu(out)
#         out = self.fc3(out)
#         out = self.elu(out)
        out = self.fc4(out)
        out = self.sigmoid(out)
        
        out = out.view(batch_size, -1)
        out = out[:,-1]
        return out, hidden
    
    def init_hidden(self, batch_size):
        weight = next(self.parameters()).data
        hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().to(device),
                      weight.new(self.n_layers, batch_size, self.hidden_dim).zero_().to(device))
        return hidden

In [None]:
input_size = 512
output_size = 1
hidden_dim = 64
n_layers = 5

model = DFDCNet(input_size, output_size, hidden_dim, n_layers)
model.to(device)
train_criterion = nn.BCELoss()
val_criterion = nn.BCELoss()

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
print(model)

In [None]:
# scheduler
from torch.optim.lr_scheduler import ReduceLROnPlateau
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=500, verbose=True)


In [None]:
epochs = 5
counter = 0
print_every = 1
clip = .5
valid_loss_min = np.Inf
val_loss = torch.tensor(np.Inf)
model.train()
for i in range(epochs):
    h = model.init_hidden(train_batch_size)
    
    for inputs, labels in train_loader:
        counter += 1
        h = tuple([e.data for e in h])
        inputs, labels = inputs.to(device), labels.to(device)
        model.zero_grad()
        output, h = model(inputs, h)
        loss = train_criterion(output.squeeze(), labels.float())
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        
        if counter%print_every == 0:
            val_h = model.init_hidden(val_batch_size)
            val_losses = []
            model.eval()
            for inp, lab in val_loader:
                val_h = tuple([each.data for each in val_h])
                inp, lab = inp.to(device), lab.to(device)
                out, val_h = model(inp, val_h)
                val_loss = val_criterion(out.squeeze(), lab.float())
                val_losses.append(val_loss.item())
                
            model.train()
            print("Epoch: {}/{}...".format(i+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()),
                  "Val Loss: {:.6f}".format(np.mean(val_losses)))
            if np.mean(val_losses) <= valid_loss_min:
                torch.save(model.state_dict(), './model_1face_unfroze.pt')
                print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min,np.mean(val_losses)))
                valid_loss_min = np.mean(val_losses)
    scheduler.step(val_loss.item())


# Reference
* https://github.com/ronghanghu/pytorch-gve-lrcn/blob/master/models/pretrained_models.py