<a href="https://colab.research.google.com/github/suvaisnav/Video-Anomaly-Detection-Usig-Deep-Learning/blob/main/Video_Anomaly_Detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pickle5

Mount Google Drive, which is used for reading the inputs and storing the outputs

In [None]:
from google.colab import drive
drive.mount('/content/drive')

RGB Feature extraction code using I3D

In [None]:
! git clone https://github.com/v-iashin/video_features.git
! pip install omegaconf==2.0.6

In [None]:
%cd video_features

In [None]:
from models.i3d.extract_i3d import ExtractI3D
from utils.utils import build_cfg_path, action_on_extraction
from omegaconf import OmegaConf
import torch

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Show the accelerator name as the cell output. get_device_name raises when no
# CUDA device is present, so it is only queried on the 'cuda' path.
torch.cuda.get_device_name(0) if device == 'cuda' else 'cpu (no CUDA device available)'

In [None]:
import os  # os.listdir is used below and no earlier cell imports os

feature_type = 'i3d'
folder_name = '/content/drive/MyDrive/normal'
args = OmegaConf.load(build_cfg_path(feature_type))
# Queue every entry of folder_name for extraction; sorted() makes the order
# reproducible because os.listdir returns entries in arbitrary order.
args.video_paths = [''.join([folder_name, '/', i]) for i in sorted(os.listdir(folder_name))]
args.extraction_fps = 32
args.flow_type = 'raft'
# Load the model
extractor = ExtractI3D(args)
model, class_head = extractor.load_model(device)

In [None]:
import pickle5 as pickle

In [None]:
# One record per video: the RGB feature array, a constant label of 0 and the
# length of the feature array's first axis. The list is created here so the
# cell works on a fresh kernel and does not accumulate duplicates when re-run.
ls = []
for video_path in args.video_paths:
    print(f'Extracting for {video_path}')
    features = extractor.extract(device, model, class_head, video_path)
    rgb_features = features['rgb']
    ls.append({'feature': rgb_features, 'label': 0, 'num': rgb_features.shape[0]})
# Persist all records in a single pickle file.
with open('/content/drive/MyDrive/data/train/data_1.pkl', 'wb') as handle:
    pickle.dump(ls, handle, protocol=pickle.HIGHEST_PROTOCOL)

Install PyTorch 1.2.0 and torchvision 0.4.0 (CUDA 9.2 builds)

In [None]:
!pip3 install torch==1.2.0+cu92 torchvision==0.4.0+cu92 -f https://download.pytorch.org/whl/torch_stable.html

Model 

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import pdb


class Attention(nn.Module):
    """Attention head that scores every input row with a value in (1e-5, 1 + 1e-5).

    The input is reshaped to 1024 channels, projected to 256 channels with a
    1x1 convolution, passed through ReLU and mapped to one scalar per row.
    """

    def __init__(self, opt):
        """
        :param opt: option namespace forwarded unchanged to GCN
        """
        super().__init__()
        self.conv = nn.Conv2d(1024, 256, 1)
        self.gcn = GCN(opt, 256, 256)
        self.fc = nn.Linear(256, 1)

    def forward(self, x):
        """
        :param x: tensor whose elements can be viewed as (-1, 1024, 1, 1)
        :return: (mask, inverse_mask); mask is sigmoid(fc(.)) + 1e-5 and
                 inverse_mask is its element-wise reciprocal
        """
        projected = self.conv(x.view(-1, 1024, 1, 1))
        # x = torch.tanh(x)
        activated = F.relu(projected, inplace=False)
        flat = activated.clone().view(-1, 256)
        # NOTE(review): the GCN is evaluated but its output is discarded; the
        # mask below is computed from `flat` directly. Confirm this is intended.
        self.gcn(flat)
        mask = torch.sigmoid(self.fc(flat)) + 1e-5
        return mask, torch.reciprocal(mask)


class Classification(nn.Module):
    """Single linear layer (1024 -> 1) followed by a sigmoid."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(1024, 1)

    def forward(self, x):
        """
        :param x: tensor whose last dimension is 1024
        :return: sigmoid of the linear projection, last dimension 1
        """
        logits = self.fc(x)
        return torch.sigmoid(logits)


class Network(nn.Module):
    """Combines the attention head with the shared classifier.

    The classifier is applied three times: to the mask-weighted mean of the
    rows of x, to the inverse-mask-weighted mean, and to every row of x.
    """

    def __init__(self, opt):
        """
        :param opt: option namespace forwarded to Attention
        """
        super().__init__()
        self.attention = Attention(opt)
        self.classification = Classification()

    def forward(self, x):
        """
        :param x: feature rows of one video; presumably (segments, 1024), since
                  both sub-modules work on 1024 channels -- confirm with the caller
        :return: (video_score, inverse_video_score, mask, segments_scores)
        """
        mask, inverse_mask = self.attention(x)

        # Weighted mean over dim 0 using the attention mask as weights.
        weighted = (x * mask).sum(dim=0, keepdim=True) / mask.sum()
        video_score = self.classification(weighted)

        # Same pooling with the reciprocal mask as weights.
        inverse_weighted = (x * inverse_mask).sum(dim=0, keepdim=True) / inverse_mask.sum()
        inverse_video_score = self.classification(inverse_weighted)

        # Per-row scores from the same classifier.
        segments_scores = self.classification(x)
        return video_score, inverse_video_score, mask, segments_scores


class GCN(nn.Module):
    """Graph-convolution layer over the rows of a (t, c) input.

    The adjacency used for aggregation is the sum of
      * a fixed, row-normalised band matrix of the given width (when opt.A), and
      * a learned similarity matrix theta(x) @ phi(x)^T (when opt.C), which is
        either masked to the band and normalised (opt.CM) or soft-maxed.

    Options read from ``opt``: ``A``, ``C``, ``CM``, ``residual``, ``width``.
    """

    def __init__(self, opt, in_channels, out_channels):
        """
        :param opt: option namespace (see class docstring)
        :param in_channels: size of the last dimension of the input rows
        :param out_channels: size of the last dimension of the output rows
        """
        super(GCN, self).__init__()
        self.opt = opt
        if self.opt.C:
            self.theta = nn.Linear(in_channels, in_channels)
            self.phi = nn.Linear(in_channels, in_channels)
        self.conv_d = nn.Linear(in_channels, out_channels)
        if opt.residual:
            self.down = nn.Sequential(nn.Conv1d(in_channels, out_channels, kernel_size=1))

    def forward(self, x):
        """
        :param x: 2-D tensor of shape (t, in_channels)
        :return: tensor of shape (t, out_channels)
        """
        t, c = x.size()
        # Build the band matrices on the same device as the input instead of
        # assuming CUDA.
        A, M = self.generate_A(t, self.opt.width, device=x.device)
        M = M.detach()
        if self.opt.A:
            A = A.detach()
        else:
            # Keep A a tensor so torch.mm below also works when opt.C is off.
            A = torch.zeros_like(A)
        if self.opt.C:
            theta = self.theta(x)
            phi = self.phi(x)
            C = torch.mm(theta, phi.permute(1, 0))
            if self.opt.CM:
                # Softmax restricted to the band: subtract the row maximum for
                # numerical stability, zero out off-band entries, renormalise.
                tmp = torch.exp(C - torch.max(C * M, dim=-1, keepdim=True)[0]) * M
                A = A + tmp / tmp.sum(dim=-1, keepdim=True)
            else:
                A = A + F.softmax(C, dim=-1)
        out = self.conv_d(torch.mm(A, x))
        if self.opt.residual:
            # The previous residual branch called 3-D permute/bmm on this 2-D
            # input and always raised. Conv1d expects (batch, channels, length),
            # so the rows are moved to the length axis and back.
            skip = self.down(x.t().unsqueeze(0)).squeeze(0).t()
            out = out + skip
        return out

    @staticmethod
    def generate_A(dim, width=3, device=None):
        """Build the band adjacency matrix and its row-normalised version.

        Entry (i, j) of the band is 1 when j - i lies in
        [min_value, min_value + width - 1] with min_value = -(width - 1) // 2,
        and 0 otherwise.

        :param dim: number of rows/columns
        :param width: number of diagonals in the band
        :param device: device for the result; None selects 'cuda' when
                       available and 'cpu' otherwise
        :return: (A, M) where M is the 0/1 band matrix and A is M with every
                 row divided by its sum
        """
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        min_value = -(width - 1) // 2
        max_value = min_value + width - 1
        idx = torch.arange(dim, device=device)
        # offset[i, j] = j - i
        offset = idx.unsqueeze(0) - idx.unsqueeze(1)
        M = (offset >= min_value).to(torch.float32) * (offset <= max_value).to(torch.float32)
        A = M / M.sum(dim=1, keepdim=True)
        return A, M






**INPUT_DATA**


In [None]:
import os
import random
import numpy as np
import pickle5 as pickle

num_classes = 1


class InputData(object):
    """Endless batch iterator over a folder of pickle files.

    Every file is expected to unpickle to a list of dicts with the keys
    'feature', 'label' and 'num'. Files are consumed one after another; when
    the last file is exhausted the iteration wraps around to the first one.
    """

    def __init__(self, folder_name, shuffle=True):
        """
        Note: Existing non data files in the folder will raise an exception
        :param folder_name: The name of folder only including data files
        :param shuffle: Whether shuffle data in each files or not
        """
        # sorted(): os.listdir returns entries in arbitrary order, which would
        # make the shuffle=False order differ between machines and runs.
        self.files_list = [''.join([folder_name, '/', i]) for i in sorted(os.listdir(folder_name))]
        self.num_file = len(self.files_list)
        self.shuffle = shuffle
        self.__reorder_files()
        self.current_file_index = 0
        self.current_video_index = 0
        self.__load_current_file()

    def __reorder_files(self):
        """Draw a new file order (a random permutation when shuffling)."""
        if self.shuffle:
            self.order_files = random.sample(list(range(self.num_file)), self.num_file)
            self.files_list = [self.files_list[i] for i in self.order_files]
        else:
            self.order_files = list(range(self.num_file))

    def __load_current_file(self):
        """Load the file at current_file_index into self.data (shuffled if requested)."""
        # NOTE: pickle.load can execute arbitrary code; only point this class
        # at folders whose files you created yourself.
        with open(self.files_list[self.current_file_index], 'rb') as f:
            self.data = pickle.load(f)
        self.num_feature = len(self.data)
        if self.shuffle:
            self.order_feature = random.sample(list(range(self.num_feature)), self.num_feature)
            self.data = [self.data[i] for i in self.order_feature]
        else:
            self.order_feature = list(range(self.num_feature))

    def __advance_file(self):
        """Move to the next file, wrapping (and re-ordering) after the last one."""
        self.current_file_index += 1
        if self.current_file_index == self.num_file:
            self.__reorder_files()
            self.current_file_index = 0
        self.__load_current_file()
        self.current_video_index = 0

    def __check_index(self, size):
        """Return the next `size` records, crossing as many file boundaries as needed.

        The previous implementation only looked one file ahead, so a batch
        larger than the remainder of the current file plus the whole next file
        came back short and made next_batch fail with an IndexError.

        :raises ValueError: if a full pass over all files yields no record
        """
        batch = []
        files_without_data = 0
        while True:
            start = self.current_video_index
            chunk = self.data[start: start + (size - len(batch))]
            batch.extend(chunk)
            self.current_video_index = start + len(chunk)
            if len(batch) >= size:
                return batch
            # Guard against spinning forever when every file is empty.
            files_without_data = 0 if chunk else files_without_data + 1
            if files_without_data > self.num_file:
                raise ValueError('No records found in any data file')
            self.__advance_file()

    def next_batch(self, size):
        """
        :param size: number of records to return
        :return: (feature, labels, dims) where feature is a list of the
                 records' 'feature' values, labels is a float32 array of
                 shape (size, 1) holding 0. for label 0 and 1. otherwise,
                 and dims is a float32 array of the records' 'num' values
        """
        data = self.__check_index(size)
        feature = []
        labels = []
        dims = []
        for i in range(size):
            record = data[i]
            feature.append(record['feature'])
            labels.append([0.] if record['label'] == 0 else [1.])
            dims.append(record['num'])
        return feature, np.array(labels, dtype=np.float32), np.array(dims, dtype=np.float32)

**TRAIN**  
`train_feature_code_path` is the path of the folder that holds the pickle files of the training data.  
`test_feature_code_path` is the path of the folder that holds the pickle files of the test data.


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
import csv
import argparse

import pdb

train_feature_code_path = '/content/drive/MyDrive/t1'
test_feature_code_path = '/content/drive/MyDrive/ttest'


def parse_args():
    """Parse the training hyper-parameters from the command line.

    Unknown arguments (e.g. those injected by the notebook kernel) are ignored.
    Note that --A and --CM use ``store_false``: they default to True and
    passing the flag switches them off.

    :return: argparse.Namespace with the parsed options
    """
    def _str2bool(value):
        # argparse's type=bool treats every non-empty string, including
        # "False", as True; parse the text explicitly instead.
        if isinstance(value, bool):
            return value
        lowered = str(value).strip().lower()
        if lowered in ('true', 't', 'yes', 'y', '1'):
            return True
        if lowered in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise argparse.ArgumentTypeError('expected a boolean, got {!r}'.format(value))

    def _gpu_ids(value):
        # type=list split the raw string into characters, so "0,1" produced
        # ['0', ',', '1']. Accept a comma-separated list; a value without
        # commas is still read one digit per GPU as before.
        tokens = value.split(',') if ',' in value else list(value)
        try:
            return [int(tok) for tok in tokens if tok.strip()]
        except ValueError:
            raise argparse.ArgumentTypeError('expected GPU ids such as "0,1", got {!r}'.format(value))

    parser = argparse.ArgumentParser()
    # input for training
    parser.add_argument('--batch_size', default=10, type=int)
    parser.add_argument('--iterations', default=10, type=int)
    parser.add_argument('--epochs', default=100, type=int)
    parser.add_argument('--lr', default=0.5e-3, type=float)
    parser.add_argument('--restore', default=False, type=_str2bool)
    parser.add_argument('--sal_coe', default=0.5, type=float)
    parser.add_argument('--weight_decay', default=0.2e-5, type=float)
    parser.add_argument('--sal_ratio', default=0.3, type=float)
    parser.add_argument('--save_path', default='/content/drive/MyDrive/train_out_3/checkpoints/', type=str)
    parser.add_argument('--gpu_list', default=[0], type=_gpu_ids)
    parser.add_argument('--TEST', default=True, type=_str2bool)
    parser.add_argument('--A', action='store_false')
    parser.add_argument('--C', action='store_true')
    parser.add_argument('--CM', action='store_false')
    parser.add_argument('--residual', action='store_true')
    parser.add_argument('--num_gcn', default=1, type=int)
    parser.add_argument('--width', default=3, type=int)
    args, unknown = parser.parse_known_args()
    return args


def tower_loss(net, features, labels, dims, args):
    """Compute the three batch losses used by train().

    For every video the network is run once and three terms are collected:
      * the binary cross-entropy between the video score and its label,
      * label * cross-entropy between the inverse video score and target 0,
      * a saliency term: mean over the non-zero entries of
        max(0, (sigmoid(segment score) - mask)^2 - sal_ratio^2).

    NOTE(review): Classification.forward in this notebook already applies a
    sigmoid, yet the scores are passed to binary_cross_entropy_with_logits and
    through torch.sigmoid again here -- confirm this is intended.

    :param net: module returning (video_score, inverse_video_score, mask, seg_scores)
    :param features: list of numpy arrays, one per video
    :param labels: numpy array indexed as labels[i, 0] per video
    :param dims: not used by this function
    :param args: namespace providing sal_ratio, sal_coe and batch_size
    :return: (classification loss,
              inverse loss including the weighted saliency term,
              weighted saliency term alone), each divided by args.batch_size
    """
    # Run on whatever device the network lives on instead of assuming CUDA.
    params = net.parameters() if hasattr(net, 'parameters') else iter(())
    first_param = next(params, None)
    if first_param is not None:
        device = first_param.device
    else:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    labels = torch.from_numpy(labels).to(device)
    # Loop-invariant constants.
    zero = torch.tensor(0., device=device, requires_grad=False)
    zero_target = torch.tensor([[0.]], requires_grad=False, device=device)

    loss = []
    inverse_loss = []
    sum_sal_loss = []
    for i in range(len(features)):
        feature = torch.from_numpy(features[i]).to(device=device, dtype=torch.float32)
        video_score, inverse_video_score, mask, seg_scores = net(feature)
        entropy_loss = F.binary_cross_entropy_with_logits(video_score, labels[i: i+1, :])
        margin = torch.max(zero, (torch.sigmoid(seg_scores) - mask) ** 2 - args.sal_ratio ** 2)
        # Average only over entries that exceed the margin; the epsilon avoids 0/0.
        count_nonzero = (margin != 0.).sum().detach().to(torch.float32)
        sal_loss = torch.sum(margin) / (count_nonzero + 1e-6)
        inverse_entropy_loss = labels[i, 0] * F.binary_cross_entropy_with_logits(inverse_video_score, zero_target)
        loss.append(entropy_loss)
        inverse_loss.append(inverse_entropy_loss + args.sal_coe * sal_loss)
        sum_sal_loss.append(args.sal_coe * sal_loss)
    # The batch means are formed once after the loop; the inverse-loss mean was
    # previously rebuilt on every iteration and left unbound for an empty batch.
    return (sum(loss) / args.batch_size,
            sum(inverse_loss) / args.batch_size,
            sum(sum_sal_loss) / args.batch_size)


def train():
    """Train the network and save a checkpoint after every epoch with index > 30.

    Two Adam optimizers are used: `optimizer` updates all parameters from the
    classification loss, `optimizer_ass` updates only the attention parameters
    after the inverse loss has been back-propagated as well. When args.TEST is
    set, test(args) is run afterwards if it has been defined.
    """
    args = parse_args()
    print('Hyper-parameters:')
    d_args = vars(args)
    for i in d_args:
        print('{}: {}'.format(i, d_args[i]))
    gpu_list = args.gpu_list
    # NOTE(review): this is set after torch has been imported; if CUDA was
    # already initialised by an earlier cell it presumably has no effect.
    os.environ["CUDA_VISIBLE_DEVICES"] = ','.join([str(i) for i in gpu_list])
    net = Network(args)
    net.to('cuda')
    net.train()
    optimizer = torch.optim.Adam(net.parameters(), lr=args.lr, weight_decay=args.weight_decay)
    optimizer_ass = torch.optim.Adam(net.attention.parameters(), lr=args.lr)
    train_data = InputData(train_feature_code_path, shuffle=True)
    if not os.path.exists(args.save_path):
        os.makedirs(args.save_path)
    for i in range(args.epochs):
        print('[*] Current epochs: %d ---' % i)
        sum_loss = 0.
        sum_inverse_loss = 0.0
        sum_sum_sal_loss = 0.0
        for j in range(args.iterations):
            list_features, numpy_labels, numpy_dims = train_data.next_batch(size=args.batch_size)
            loss, inverse_loss, sum_sal_loss = tower_loss(net, list_features, numpy_labels, numpy_dims, args)
            optimizer.zero_grad()
            optimizer_ass.zero_grad()
            # retain_graph: the same graph is back-propagated again below.
            loss.backward(retain_graph=True)
            optimizer.step()
            # NOTE(review): anomaly detection is a debugging aid that is
            # switched on (globally) on every iteration; the statement order
            # is kept as-is because the two-step update depends on it.
            torch.autograd.set_detect_anomaly(True)
            inverse_loss.backward()
            optimizer_ass.step()
            sum_loss += loss.item()
            sum_inverse_loss += inverse_loss.item()
            sum_sum_sal_loss += sum_sal_loss.item()
        print('Loss: {:.3f}, Inverse Loss: {:.3f}, sal_loss: {:.3f}'.format(sum_loss / args.iterations, sum_inverse_loss / args.iterations, sum_sum_sal_loss / args.iterations))
        # Checkpoints are written for epochs 31 and later; test() starts
        # reading at epoch 31 accordingly.
        if i > 30:
            print(i)
            torch.save(net.state_dict(), args.save_path + '{}.param'.format(i))
    if args.TEST:
        # test() lives in a later notebook cell. On a top-to-bottom run it is
        # not defined yet, which used to end training with a NameError.
        evaluate = globals().get('test')
        if callable(evaluate):
            evaluate(args)
        else:
            print('[!] test() is not defined yet; run the Test cell to evaluate the saved checkpoints.')


if __name__ == '__main__':
    train()


Install xlsxwriter, which the test cell imports

In [None]:
!pip install xlsxwriter

Manually create the ground-truth labels

In [None]:
import numpy as np
# Hand-written labels for two sequences: 30 entries with entries 2 and 3
# marked 1, and 24 entries with the last three marked 1.
a = [0] * 30
a[2] = 1
a[3] = 1
a1 = [0] * 24
a1[21] = a1[22] = a1[23] = 1

# Repeat every label 60 times.
ra1 = [label for label in a for _ in range(60)]
ra2 = [label for label in a1 for _ in range(60)]

# Only the second sequence is exported through b2; the array for the first
# sequence (np.array(ra1)) is intentionally left out, as in the original cell.
b1 = np.array(ra2)
b2 = [b1]
print(b2)


Test 

In [None]:
from sklearn import metrics

import numpy as np
import pickle5 as pickle
import torch
import argparse
import os

import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import xlsxwriter
import pdb
from scipy.interpolate import interp1d
from torch.autograd import Variable

def parse_args():
    """Parse the options used by the test cell.

    Unknown arguments (e.g. those injected by the notebook kernel) are ignored.
    Note that --A, --B, --BM and --CM use ``store_false``: they default to
    True and passing the flag switches them off.

    :return: argparse.Namespace with the parsed options
    """
    def _str2bool(value):
        # argparse's type=bool treats every non-empty string, including
        # "False", as True; parse the text explicitly instead.
        if isinstance(value, bool):
            return value
        lowered = str(value).strip().lower()
        if lowered in ('true', 't', 'yes', 'y', '1'):
            return True
        if lowered in ('false', 'f', 'no', 'n', '0', ''):
            return False
        raise argparse.ArgumentTypeError('expected a boolean, got {!r}'.format(value))

    def _gpu_ids(value):
        # type=list split the raw string into characters, so "0,1" produced
        # ['0', ',', '1']. Accept a comma-separated list; a value without
        # commas is still read one digit per GPU as before.
        tokens = value.split(',') if ',' in value else list(value)
        try:
            return [int(tok) for tok in tokens if tok.strip()]
        except ValueError:
            raise argparse.ArgumentTypeError('expected GPU ids such as "0,1", got {!r}'.format(value))

    parser = argparse.ArgumentParser()
    # input for training
    parser.add_argument('--batch_size', default=10, type=int)
    parser.add_argument('--iterations', default=9, type=int)
    parser.add_argument('--epochs', default=80, type=int)
    parser.add_argument('--lr', default=0.5e-3, type=float)
    parser.add_argument('--restore', default=False, type=_str2bool)
    parser.add_argument('--sal_coe', default=0.5, type=float)
    parser.add_argument('--weight_decay', default=0.2e-5, type=float)
    parser.add_argument('--sal_ratio', default=0.3, type=float)
    parser.add_argument('--save_path', default='/content/drive/MyDrive/train_out_3/checkpoints/', type=str)
    parser.add_argument('--gpu_list', default=[0], type=_gpu_ids)
    parser.add_argument('--TEST', default=True, type=_str2bool)

    parser.add_argument('--A', action='store_false')
    parser.add_argument('--B', action='store_false')
    parser.add_argument('--C', action='store_true')
    parser.add_argument('--BM', action='store_false')
    parser.add_argument('--CM', action='store_false')
    parser.add_argument('--residual', action='store_true')
    parser.add_argument('--num_gcn', default=1, type=int)
    parser.add_argument('--width', default=3, type=int)
    args, unknown = parser.parse_known_args()
    return args

def test(args):
    """Evaluate the saved checkpoints of epochs 31 .. args.epochs - 1.

    For every checkpoint the test videos are scored, the per-segment scores
    are linearly interpolated to 60 values per segment and compared with the
    module-level ground truth ``b2`` (one array per test video) through ROC
    AUC. The masks and scores of the evaluated checkpoint are written to
    ./record.xlsx and the ROC curve to ./test.png; both files are overwritten
    for every checkpoint.

    :param args: namespace providing epochs, save_path and the Network options
    :return: the best AUC over the evaluated checkpoints (0 if none was found)
    """
    def draw_roc(tpr, fpr, auc):
        """Plot the ROC curve and save it as ./test.png."""
        plt.figure()
        lw = 2
        plt.plot(fpr, tpr, color='darkorange', lw=lw, label='ROC curve (area = %0.4f)' % auc)
        plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.0])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver operating characteristic example')
        plt.legend(loc="lower right")
        plt.savefig("./test.png")
        plt.cla()
        plt.clf()
        plt.close()

    # The unused `gts.npy` load (np.load with allow_pickle=True) was dropped:
    # the ground truth actually used below is the module-level list `b2`.
    test_features = InputData('/content/drive/MyDrive/ttest', shuffle=False)
    # Score exactly as many videos as there are ground-truth arrays; the loop
    # was hard-coded to 2 and failed with an IndexError when b2 held one array.
    num_videos = len(b2)
    net = Network(args).to('cuda')
    net.eval()
    best_auc = 0
    best_epoch = 0
    with torch.no_grad():
        for i in range(31, args.epochs):
            # Same path scheme that train() uses when saving.
            checkpoint = args.save_path + '{}.param'.format(i)
            if not os.path.exists(checkpoint):
                print('[!] Skipping epoch {}: checkpoint {} not found'.format(i, checkpoint))
                continue
            print(net.load_state_dict(torch.load(checkpoint)))

            workbook = xlsxwriter.Workbook('./record.xlsx')
            try:
                mask_sheet = workbook.add_worksheet('mask')
                score_sheet = workbook.add_worksheet('score')
                cell_format = workbook.add_format({'font_color': 'red'})
                cell_format2 = workbook.add_format({'font_color': 'blue'})

                pred = []
                y = []
                for j in range(num_videos):
                    features = torch.from_numpy(test_features.next_batch(1)[0][0]).float().cuda()
                    video_scores, inverse_video_scores, masks, segments_scores = net(features)
                    # Row j of the 'mask' sheet: mean mask in column 0, then the
                    # per-segment masks with the maximum highlighted.
                    row = np.squeeze(masks.cpu().numpy(), axis=1)
                    mask_sheet.write_row(j, 1, row.tolist())
                    mask_sheet.write(j, 0, np.mean(row), cell_format2)
                    mask_sheet.conditional_format(j, np.argmax(row)+1, j, np.argmax(row)+1, {'type': 'no_errors', 'format': cell_format})
                    # Row j of the 'score' sheet: per-segment scores, maximum highlighted.
                    row = np.squeeze(segments_scores.cpu().numpy(), axis=1)
                    score_sheet.write_row(j, 0, row.tolist())
                    score_sheet.conditional_format(j, np.argmax(row), j, np.argmax(row), {'type': 'no_errors', 'format': cell_format})
                    scores = np.squeeze(segments_scores.cpu().numpy())
                    video_score = video_scores.cpu().numpy()
                    # NOTE(review): Classification applies a sigmoid, so this
                    # value lies in (0, 1) and the branch looks unreachable.
                    if video_score[0, 0] < -2:
                        scores += video_score[0, 0]
                    # Up-sample to 60 predictions per segment to match the
                    # 60-fold repetition used when building b2.
                    x = np.arange(0, scores.shape[0])
                    f = interp1d(x, scores, kind='linear', axis=0, fill_value='extrapolate')
                    scale_x = np.arange(0, scores.shape[0], 1 / 60)
                    pred += list(f(scale_x))
                    y += b2[j].tolist()
                fpr, tpr, _ = metrics.roc_curve(y, pred)
                auc = metrics.auc(fpr, tpr)
                draw_roc(tpr, fpr, auc)
                print('Epoch: {}, AUC: {}'.format(i, auc))
                if auc > best_auc:
                    best_auc = auc
                    best_epoch = i
            finally:
                # Always release the workbook, also when scoring fails.
                workbook.close()
    print('Best_Epoch: {}, Best_AUC: {}'.format(best_epoch, best_auc))
    return best_auc

# Standalone evaluation: parse the options, echo them and evaluate the saved
# checkpoints with test().
args = parse_args()
print('Hyper-parameters:')
d_args = vars(args)
for option_name, option_value in d_args.items():
    print('{}: {}'.format(option_name, option_value))
test(args)