In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the checkpoint and video paths used
# later in this notebook all live under /content/drive/MyDrive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install bar-chart-race

Collecting bar-chart-race
  Downloading bar_chart_race-0.1.0-py3-none-any.whl.metadata (4.2 kB)
Downloading bar_chart_race-0.1.0-py3-none-any.whl (156 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m156.8/156.8 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: bar-chart-race
Successfully installed bar-chart-race-0.1.0


In [None]:
import warnings


def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments and returns None."""
    return None


# Replace warnings.warn with the no-op so code that calls `warnings.warn(...)`
# through the module attribute emits nothing for the rest of the session.
# NOTE(review): warnings.filterwarnings("ignore") is the supported way to do
# this; kept as a monkeypatch here to preserve the existing behaviour.
warnings.warn = warn

import matplotlib.pyplot as plt
import matplotlib.animation as animation
import bar_chart_race as bcr
import cv2
import numpy as np

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class FusionModel(nn.Module):
    """Two-stream 3D-CNN that fuses RGB and optical-flow features into 2 logits.

    Input to ``forward`` is a 5-D tensor ``(batch, 64, H, W, 5)`` whose last
    axis holds 3 RGB values followed by 2 optical-flow values. Each stream is
    processed by four (spatial conv -> temporal conv -> spatial max-pool)
    blocks, the streams are multiplied element-wise, pooled over time, passed
    through a merging block and finally through three fully connected layers.

    The output is ``(batch, 2)`` raw logits (no softmax is applied here).

    H and W must shrink to 1x1 by the end of the merging block so that the
    flattened feature has the 128 values ``fc1`` expects; 224x224 input does
    (the stream output is 14x14 for that size).

    The layer order inside every ``nn.Sequential`` is kept exactly as in the
    hand-written version so ``state_dict`` keys (e.g. ``layer_RGB.5.weight``)
    stay compatible with existing checkpoints.
    """

    # Clip length assumed by the reshape in forward().
    NUM_FRAMES = 64

    def __init__(self):
        super(FusionModel, self).__init__()
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

        # Spatial-only pooling for the two streams; spatio-temporal pooling for the merge block.
        self.maxpool_channel = nn.MaxPool3d(kernel_size=(1, 2, 2))
        self.maxpool_merge = nn.MaxPool3d(kernel_size=(2, 2, 2))

        # RGB stream: 3 input channels, ReLU everywhere.
        self.layer_RGB = nn.Sequential(
            *self._st_block(3, 16, self.relu, self.maxpool_channel),    # layer1
            *self._st_block(16, 16, self.relu, self.maxpool_channel),   # layer2
            *self._st_block(16, 32, self.relu, self.maxpool_channel),   # layer3
            *self._st_block(32, 32, self.relu, self.maxpool_channel),   # layer4
        )

        # Optical-flow stream: 2 input channels; the last block uses sigmoid,
        # so its output lies in (0, 1) when it multiplies the RGB features.
        self.layer_OPT = nn.Sequential(
            *self._st_block(2, 16, self.relu, self.maxpool_channel),      # layer1
            *self._st_block(16, 16, self.relu, self.maxpool_channel),     # layer2
            *self._st_block(16, 32, self.relu, self.maxpool_channel),     # layer3
            *self._st_block(32, 32, self.sigmoid, self.maxpool_channel),  # layer4
        )

        # Merging block applied to the fused features.
        self.layer_merge = nn.Sequential(
            *self._st_block(32, 64, self.relu, self.maxpool_merge),
            *self._st_block(64, 64, self.relu, self.maxpool_merge),
            *self._st_block(64, 128, self.relu, nn.MaxPool3d(kernel_size=(2, 3, 3))),
        )

        # Fully Connected Layers
        self.fc1 = nn.Linear(128, 128)
        self.dropout = nn.Dropout(0.2)
        self.fc2 = nn.Linear(128, 32)
        self.fc3 = nn.Linear(32, 2)

        # Initialize weights
        self.__init_weight()

    @staticmethod
    def _st_block(in_channels, out_channels, activation, pool):
        """Return [1x3x3 spatial conv, act, 3x1x1 temporal conv, act, pool] as a list of modules."""
        return [
            nn.Conv3d(in_channels, out_channels, kernel_size=(1, 3, 3),
                      stride=(1, 1, 1), padding=(0, 1, 1)),  # spatial convolution
            activation,
            nn.Conv3d(out_channels, out_channels, kernel_size=(3, 1, 1),
                      stride=(1, 1, 1), padding=(1, 0, 0)),  # temporal convolution
            activation,
            pool,
        ]

    def forward(self, x):
        """Compute class logits for a batch of clips.

        Args:
            x: tensor of shape ``(batch, NUM_FRAMES, H, W, 5)``.

        Returns:
            Tensor of shape ``(batch, 2)`` with unnormalised logits.

        Raises:
            ValueError: if ``x`` is not 5-D, does not have ``NUM_FRAMES``
                frames on axis 1, or does not have 5 values on the last axis.
                Without this check the ``view(-1, ...)`` below would silently
                fold a wrongly shaped input into a different batch size.
        """
        if x.dim() != 5 or x.shape[1] != self.NUM_FRAMES or x.shape[-1] != 5:
            raise ValueError(
                "FusionModel expects input of shape (batch, %d, H, W, 5), got %s"
                % (self.NUM_FRAMES, tuple(x.shape))
            )

        rgb = x[..., :3]   # Separate RGB data
        opt = x[..., 3:5]  # Separate optical flow data

        # Reshape tensors into (batch, channel, frames, height, width).
        # NOTE(review): view() reinterprets the channels-last memory instead of
        # permuting the axes, so this is not a true transpose. It is left
        # unchanged on purpose: weights trained with this forward pass depend
        # on this exact layout - confirm before replacing it with permute().
        rgb = rgb.contiguous().view(-1, 3, self.NUM_FRAMES, rgb.shape[2], rgb.shape[3])
        opt = opt.contiguous().view(-1, 2, self.NUM_FRAMES, opt.shape[2], opt.shape[3])

        # Pass the RGB data through the blocks of RGB layers
        rgb = self.layer_RGB(rgb)

        # Pass the optical flow data through the blocks of optical flow layers
        opt = self.layer_OPT(opt)

        # Fuse by performing elementwise multiplication of rgb and opt tensors.
        fused = torch.mul(rgb, opt)
        # Max-pool the fused features over time (functional form: no module is
        # constructed on every call; stride defaults to the kernel size).
        fused = F.max_pool3d(fused, kernel_size=(8, 1, 1))

        # Pass the fused data through the merging block
        merged = self.layer_merge(fused)

        # Fully Connected Layers on the flattened (batch, 128) feature
        x = merged.view(merged.size(0), -1)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)

        return x

    def __init_weight(self):
        """Kaiming-normal initialise every Conv3d weight and zero its bias (if it has one)."""
        for m in self.modules():
            if isinstance(m, nn.Conv3d):
                nn.init.kaiming_normal_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

In [None]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F

# class FusionModel(nn.Module):
#     def __init__(self):
#         super(FusionModel, self).__init__()
#         self.relu=nn.ReLU(inplace=True)
#         self.maxpool_channel = nn.MaxPool3d(kernel_size=(1,2,2), stride=(1,2,2))
#         self.maxpool_merge = nn.MaxPool3d(kernel_size=(2,2,2), stride=(2,2,2))
#         ## Hint: Please refer to above table for constructing layers
#         # Construct block of RGB layers which takes RGB channel(3) as input
#         self.layer_RGB = nn.Sequential(
#             #layer1
#             nn.Conv3d(in_channels=3, out_channels=16,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=16, out_channels=16,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#             #layer2
#             nn.Conv3d(in_channels=16, out_channels=16,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=16, out_channels=16,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#             #layer3
#             nn.Conv3d(in_channels=16, out_channels=32,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=32, out_channels=32,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#             #layer4
#             nn.Conv3d(in_channels=32, out_channels=32,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=32, out_channels=32,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#         )

#         # Construct block of optical flow layers which takes the optical flow channel(2) as input
#         self.layer_OPT = nn.Sequential(
#             #layer1
#             nn.Conv3d(in_channels=2, out_channels=16,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=16, out_channels=16,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#             #layer2
#             nn.Conv3d(in_channels=16, out_channels=16,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=16, out_channels=16,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#             #layer3
#             nn.Conv3d(in_channels=16, out_channels=32,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=32, out_channels=32,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#             #layer4
#             nn.Conv3d(in_channels=32, out_channels=32,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=32, out_channels=32,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_channel,
#         )

#         # Construct pooling
#         self.layer_POOL = nn.MaxPool3d(kernel_size = (8,1,1), stride=(8,1,1))

#         # Construct merging Block
#         self.layer_merge = nn.Sequential(
#             nn.Conv3d(in_channels=32, out_channels=64,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=64, out_channels=64,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_merge,
#             nn.Conv3d(in_channels=64, out_channels=64,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=64, out_channels=64,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_merge,
#             nn.Conv3d(in_channels=64, out_channels=128,
#                        kernel_size=(1,3,3), stride=(1,1,1),padding=(0,1,1)),
#             self.relu,
#             nn.Conv3d(in_channels=128, out_channels=128,
#                        kernel_size=(3,1,1), stride=(1,1,1), padding=(1,0,0)),
#             self.relu,
#             self.maxpool_merge,
#         )

#         # Fully Connected Layers
#         self.fc1 = nn.Linear(128, 128)
#         self.dropout = nn.Dropout(0.2)
#         self.fc2 = nn.Linear(128, 32)
#         self.fc3 = nn.Linear(32, 2)

#         # Initialize weights
#         self.__init_weight()

#     def forward(self, x):
#         x = x.transpose(2,4)
#         x = x.transpose(3,4)
#         x = x.transpose(1,2) #[1, 5, 64, 224, 224]
#         rgb = x[:,:3,:,:,:] #[1, 3, 64, 224, 224]
#         opt = x[:,3:5,:,:,:]

#         # # Pass through the RGB data through the blocks of RGB layers
#         rgb = self.layer_RGB(rgb)

#         # # Pass the optical flow data through the blocks of optical flow layers
#         opt = self.layer_OPT(opt)

#         # # Fuse by performing elementwise multiplication of rgb and opt tensors.
#         fused = torch.multiply(rgb, opt)
#         # # Perform maxpooling of fused
#         fused = self.layer_POOL(fused)

#         # # Pass through the fused data into merging block
#         merged = self.layer_merge(fused) #([1, 32, 8, 14, 14])

#         x = merged.contiguous().view(merged.shape[0],-1)
#         x = self.fc1(x)
#         x = self.relu(x)
#         x = self.dropout(x)
#         x = self.fc2(x)
#         x = self.relu(x)
#         x = self.fc3(x)
#         return x

#     def __init_weight(self):
#         for m in self.modules():
#             if isinstance(m, (nn.Conv3d, nn.Linear)):  # initialize only Conv3d and Linear layers
#                 torch.nn.init.kaiming_uniform_(m.weight, a=0, mode='fan_in', nonlinearity='relu')
#                 if m.bias is not None:
#                     m.bias.data.fill_(0.01)

In [None]:
# Prefer the GPU, but fall back to the CPU so the notebook also runs on a CPU-only runtime.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = FusionModel().to(device)
# The model is only used for inference here: eval mode turns the Dropout layer
# off so repeated runs on the same clip give the same prediction.
model.eval()
# use your pretrained model path
# map_location restores the checkpoint onto whichever device was selected above.
checkpoint = torch.load('/content/drive/MyDrive/base_model', map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])

<All keys matched successfully>

In [None]:
def uniform_sampling(video, target_frames=64):
    """Pick ``target_frames`` frames from ``video`` at a uniform stride.

    Frames are taken every ``ceil(len(video) / target_frames)`` positions
    starting at index 0. If that yields fewer than ``target_frames`` frames,
    the result is padded with the last ``num_pad`` frames of ``video``; when
    the video is shorter than the padding needed, the out-of-range positions
    are filled with the first frame instead.

    Args:
        video: indexable sequence of frames (list or array) supporting
            ``len()`` and negative indexing.
        target_frames: number of frames to return.

    Returns:
        ``np.float32`` array stacking the selected frames along axis 0.

    Raises:
        ValueError: if ``video`` is empty.
    """
    num_frames = len(video)
    if num_frames == 0:
        raise ValueError("uniform_sampling() needs a video with at least one frame")

    # sampling stride
    interval = int(np.ceil(num_frames / target_frames))
    indices = list(range(0, num_frames, interval))

    # pad from the tail of the video; positions that would fall before the
    # first frame use frame 0 (explicit bounds check instead of a bare except)
    num_pad = target_frames - len(indices)
    if num_pad > 0:
        indices.extend(i if -i <= num_frames else 0 for i in range(-num_pad, 0))

    return np.array([video[i] for i in indices], dtype=np.float32)


def normalize(data):
    """Standardise ``data`` to zero mean and unit standard deviation.

    The mean and standard deviation are computed over the whole array.

    Args:
        data: numpy array.

    Returns:
        ``(data - mean) / std``. When ``std`` is exactly 0 (constant input)
        the centred data, which is all zeros, is returned instead of dividing
        by zero and producing NaNs.
    """
    mean = data.mean()
    std = data.std()
    centered = data - mean
    if std == 0:
        return centered
    return centered / std

In [None]:
# test video path
file_path ="/content/drive/MyDrive/Assault018_x264.mp4" # use your path

cap = cv2.VideoCapture(file_path)
if not cap.isOpened():
    raise IOError(f"Could not open video: {file_path}")
# Get number of frames
len_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

frames = []  # resized RGB frames read so far
flows = []   # optical flow for each frame in `frames` (same length)
preds = []   # one softmax vector per processed frame
resize = (224, 224)

# Inference only: make sure Dropout is disabled, whatever mode the model was left in.
model.eval()

prev_frame = None
try:
    # no_grad: no autograd graph is needed, which saves memory on every forward pass
    with torch.no_grad():
        for frame_idx in range(len_frames - 1):
            ret, frame = cap.read()
            if not ret:
                # the reported frame count can exceed what is actually decodable
                break
            frame = cv2.resize(frame, resize, interpolation=cv2.INTER_AREA)
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)

            gray_img = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
            if prev_frame is None:
                # first frame: flow is computed against itself
                prev_frame = gray_img

            flow = cv2.calcOpticalFlowFarneback(prev_frame, gray_img, None, 0.5, 3, 15, 3, 5, 1.2,
                                                cv2.OPTFLOW_FARNEBACK_GAUSSIAN)
            prev_frame = gray_img
            # subtract the mean in order to eliminate the movement of camera
            flow[..., 0] -= np.mean(flow[..., 0])
            flow[..., 1] -= np.mean(flow[..., 1])
            # normalize each component in optical flow
            flow[..., 0] = cv2.normalize(flow[..., 0], None, 0, 255, cv2.NORM_MINMAX)
            flow[..., 1] = cv2.normalize(flow[..., 1], None, 0, 255, cv2.NORM_MINMAX)
            flows.append(flow)

            # Sample 64 frames uniformly from everything seen so far, then
            # normalize rgb images and optical flows, respectively.
            # `frames` and `flows` have the same length, so both calls select
            # the same positions. Sampling first avoids re-stacking every
            # frame read so far on each iteration.
            rgb_clip = normalize(uniform_sampling(video=frames, target_frames=64))
            flow_clip = normalize(uniform_sampling(video=flows, target_frames=64))

            # (1, 64, H, W, 5): RGB channels followed by the two flow channels
            data = np.concatenate((rgb_clip, flow_clip), axis=-1)[np.newaxis]

            pred = model(torch.as_tensor(data, dtype=torch.float32, device=device))
            pred = nn.functional.softmax(pred, dim=-1)
            preds.append(pred[0].detach().cpu().numpy())
finally:
    # release the capture even if a frame fails mid-way
    cap.release()

In [None]:
import pandas as pd

# Class names, in the order of the model's two outputs as used for the columns below.
group_list = ["Violence", "Non-Violence"]

# Quick shape check: one row per processed frame, one column per class.
print(np.asarray(preds).shape)

# Tabulate the per-frame probabilities; leaving `df` as the last expression renders the table.
df = pd.DataFrame(preds, columns=group_list)

df

(187, 2)


Unnamed: 0,Violence,Non-Violence
0,0.012782,0.987218
1,0.025008,0.974992
2,0.027100,0.972900
3,0.011326,0.988674
4,0.015445,0.984555
...,...,...
182,0.999724,0.000276
183,0.999734,0.000266
184,0.999649,0.000351
185,0.998663,0.001337


In [None]:
# Animate the per-frame class probabilities in `df` as a two-bar horizontal chart.
race_options = dict(
    n_bars=2,
    figsize=(4, 4),
    label_bars=False,
    sort='desc',
    # title='Violence detection',
    fixed_order=['Violence', 'Non-Violence'],
    orientation='h',
    fixed_max=True,
    # one animation period per row of `df`, i.e. per video frame, in milliseconds
    period_length=int(1000/fps),
)
bcr.bar_chart_race(df=df[:], **race_options)

In [None]:
import cv2
import numpy as np

# Video path. This must be the same video the predictions in `preds` were
# computed from, so each frame lines up with its prediction (the previous
# value had a doubled ".mp4.mp4" extension).
file_path = "/content/drive/MyDrive/Assault018_x264.mp4"

# Read the video with OpenCV
cap = cv2.VideoCapture(file_path)
if not cap.isOpened():
    raise IOError(f"Could not open video: {file_path}")
len_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = int(cap.get(cv2.CAP_PROP_FPS))

# Output path and writer; the writer's frame size must match the resized frames
resize = (224, 224)
output_path = "/content/drive/MyDrive/violence_overlapped_diff.avi"
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(output_path, fourcc, fps, resize)

try:
    if not out.isOpened():
        raise IOError(f"Could not open video writer: {output_path}")

    # Process frame by frame: one prediction per frame, in order
    for pred in preds:
        ret, frame = cap.read()
        if not ret:
            break

        # Resize the frame
        frame = cv2.resize(frame, resize)

        # Fetch the probabilities
        violence_prob = pred[0]
        non_violence_prob = pred[1]

        # Draw the text labels (colours are BGR: red for violence, green for non-violence)
        cv2.putText(frame, f"Violence: {violence_prob:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
        cv2.putText(frame, f"Non-Violence: {non_violence_prob:.2f}", (10, 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Draw the progress bars, scaled so probability 1.0 is `bar_length` pixels wide
        bar_length = 200
        violence_bar = int(bar_length * violence_prob)
        non_violence_bar = int(bar_length * non_violence_prob)

        cv2.rectangle(frame, (10, 100), (10 + violence_bar, 120), (0, 0, 255), -1)
        cv2.rectangle(frame, (10, 130), (10 + non_violence_bar, 150), (0, 255, 0), -1)

        # Write the frame
        out.write(frame)
finally:
    # Release resources even if processing fails part-way
    cap.release()
    out.release()

print("Processed video saved at:", output_path)


Processed video saved at: /content/drive/MyDrive/violence_overlapped_diff.avi
