<a href="https://colab.research.google.com/github/yannsusu/SwiftMart/blob/main/Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/facebookresearch/TimeSformer.git
%cd TimeSformer


Cloning into 'TimeSformer'...
remote: Enumerating objects: 392, done.[K
remote: Counting objects: 100% (148/148), done.[K
remote: Compressing objects: 100% (74/74), done.[K
remote: Total 392 (delta 100), reused 74 (delta 74), pack-reused 244 (from 1)[K
Receiving objects: 100% (392/392), 336.91 KiB | 11.23 MiB/s, done.
Resolving deltas: 100% (176/176), done.
/content/TimeSformer


In [4]:
#!pip install git+https://github.com/facebookresearch/TimeSformer.git
!pip install opencv-python
!pip install simplejson
!pip install fvcore
!pip install torch==2.0.1 torchvision torchaudio
!pip install transformers==4.30.1



In [19]:
import numpy as np
import cv2
from PIL import Image
import torch
from timesformer.models.vit import TimeSformer

In [8]:
def compute_optical_flow(prev_frame, curr_frame):
  prev_frame_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
  curr_frame_gray = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)

  flow = cv2.calcOpticalFlowFarneback(prev_frame_gray, curr_frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

  return flow

In [9]:
def flow_to_image(flow):
  h, w = flow.shape[:2]
  hsv = np.zeros((h, w, 3), dtype = np.uint8)

  mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

  hsv[..., 0] = ang * 180 / np.pi / 2

  hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)

  hsv[..., 1] = 255

  return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

In [10]:
def stack_frames(frames, num_frames=8):
    if len(frames) < num_frames:
        frames = [frames[0]] * (num_frames - len(frames)) + frames

    frames = [cv2.resize(frame, (224, 224)) for frame in frames]

    frames_np = np.array(frames, dtype=np.float32)

    frames_tensor = torch.from_numpy(frames_np)
    frames_tensor = frames_tensor.permute(3, 0, 1, 2)
    frames_tensor = frames_tensor.float()
    frames_tensor = frames_tensor.unsqueeze(0)

    return frames_tensor

In [11]:
model = TimeSformer(
    img_size = 224,
    num_classes = 3,
    num_frames = 8,
    attention_type = "divided_space_time"
)

model.eval()

Downloading: "https://github.com/rwightman/pytorch-image-models/releases/download/v0.1-vitjx/jx_vit_base_p16_224-80ecf9dd.pth" to /root/.cache/torch/hub/checkpoints/jx_vit_base_p16_224-80ecf9dd.pth


TimeSformer(
  (model): VisionTransformer(
    (dropout): Dropout(p=0.0, inplace=False)
    (patch_embed): PatchEmbed(
      (proj): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
    )
    (pos_drop): Dropout(p=0.0, inplace=False)
    (time_drop): Dropout(p=0.0, inplace=False)
    (blocks): ModuleList(
      (0): Block(
        (norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (attn): Attention(
          (qkv): Linear(in_features=768, out_features=2304, bias=True)
          (proj): Linear(in_features=768, out_features=768, bias=True)
          (proj_drop): Dropout(p=0.0, inplace=False)
          (attn_drop): Dropout(p=0.0, inplace=False)
        )
        (temporal_norm1): LayerNorm((768,), eps=1e-06, elementwise_affine=True)
        (temporal_attn): Attention(
          (qkv): Linear(in_features=768, out_features=2304, bias=True)
          (proj): Linear(in_features=768, out_features=768, bias=True)
          (proj_drop): Dropout(p=0.0, inplace=False)
   

In [12]:
def classify_action(frames):
    input_sensor = stack_frames(frames)

    with torch.no_grad():
        outputs = model(input_sensor)

    predicted_idx = torch.argmax(outputs, dim=-1).item()
    return ["Taking", "Putting", "Dropped"][predicted_idx]

In [13]:
def estimate_direction(flow):
    h, w = flow.shape[:2]
    ang = np.arctan2(flow[..., 1], flow[..., 0])
    left_ratio = np.sum(ang < 0) / (h * w)
    return "left" if left_ratio > 0.5 else "right"


In [None]:
cap = cv2.VideoCapture("/content/Test.mp4")
if not cap.isOpened():
    print("Error: Could not open video.")

ret, prev_frame = cap.read()
frames = []

while cap.isOpened():
    ret, next_frame = cap.read()
    if not ret:
      break

    flow = compute_optical_flow(prev_frame, next_frame)
    flow_img = flow_to_image(flow)

    frames.append(flow_img)

    direction = estimate_direction(flow)

    if len(frames) >= 8:
        action = classify_action(frames)
        print(f"action: {action}, direction:{direction}")

    prev_frame = next_frame

cap.release()

action: Putting, direction:right
action: Putting, direction:right
action: Putting, direction:right
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
action: Putting, direction:left
