In [63]:
import db_builder.db_handler as dbh
import numpy as np
from torchvision import transforms, datasets
import logging
from getpass import getpass
from PIL import Image
import torchvision.models as models
import torch.nn as nn
import torch
import os
import cv2
import time
from random import randint, seed
from os import path
from collections import Counter

In [26]:
logging.getLogger('sqlalchemy').setLevel(logging.WARNING)
db_params = {
    'user': 'postgres',
    'password': getpass('Please enter DB pw'),  # enter your DB password
    'host': 'localhost',  # 'localhost' or IP address
    'port': '5432',  # '5432'
    'database': 'ttdatabase',  #tensionTerminator
}
toolcheck = dbh.DB_Conn(db_params)
toolcheck.connect()
engine = toolcheck.get_engine()

Connected to PostgreSQL, DB: ttdatabase


In [27]:
transforms_wt = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        [0.485, 0.456, 0.406],
        [0.229, 0.224, 0.225]
    )
])

In [28]:
transforms_person = transforms.Compose([
    transforms.Resize(150),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        [0.485, 0.456, 0.406],
        [0.229, 0.224, 0.225]
    )
])

In [29]:
def dataset_path(model_folder: str,timestamp: str):
    abs_path = os.getcwd()
    two_up =  path.abspath(path.join(abs_path ,"../.."))
    return path.join(two_up, 'data', model_folder, timestamp)

In [31]:
tool_dataset = datasets.ImageFolder(
    root=dataset_path('tool_finder', '10_11_2023_21_05_33'),
    transform=transforms_wt
)

bodyside_dataset = datasets.ImageFolder(
    root=dataset_path('bodyside_finder', '20_11_2023_20_33_30'),
    transform=transforms_wt
)

person_dataset = datasets.ImageFolder(
    root=dataset_path('person_test', '29_11_2023_07_24_46'),
    transform=transforms_person
)

In [32]:
tool_dataset.classes

['duoballs', 'trigger']

In [33]:
bodyside_dataset.classes

['left', 'middle', 'right']

In [34]:
person_dataset.classes

['Christina_Greiderer',
 'Christine_Lackinger',
 'Juergen_Zangerl',
 'Lukas_Prenner',
 'MartinPO_Feuerstein',
 'Martin_Hofer',
 'Philipp_Egger',
 'Pirmin_Aster',
 'Robert_Goller',
 'Suganthi_Manoharan']

In [35]:
transfer_tool_model = models.resnet152()
transfer_tool_model.fc = nn.Sequential(
    nn.Linear(transfer_tool_model.fc.in_features, 2048),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(2048, 1024),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(1024, 500),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(500, 2)
)
transfer_tool_model_state_dict = torch.load("C:\\Users\\Pirmin.000\\PycharmProjects\\IGP\\models\\tool_finder\\10_11_2023_21_05_33\\model.pt")
transfer_tool_model.load_state_dict(transfer_tool_model_state_dict)
transfer_tool_model.to("cuda")
transfer_tool_model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [36]:
transfer_bodyside_model = models.densenet201()
num_features = transfer_bodyside_model.classifier.in_features
transfer_bodyside_model.classifier = nn.Sequential(
    nn.Linear(num_features, 2048),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(2048,1024),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(1024,500),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(500,3)
)
transfer_model_state_dict = torch.load("C:\\Users\\Pirmin.000\\PycharmProjects\\IGP\models\\bodyside_finder\\20_11_2023_20_33_30\\model_26_11_2023_22_26_06.pt")
transfer_bodyside_model.load_state_dict(transfer_model_state_dict)
transfer_bodyside_model.to("cuda")
transfer_bodyside_model.eval()

DenseNet(
  (features): Sequential(
    (conv0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (norm0): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu0): ReLU(inplace=True)
    (pool0): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (denseblock1): _DenseBlock(
      (denselayer1): _DenseLayer(
        (norm1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu1): ReLU(inplace=True)
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (norm2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu2): ReLU(inplace=True)
        (conv2): Conv2d(128, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      )
      (denselayer2): _DenseLayer(
        (norm1): BatchNorm2d(96, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu

In [40]:
transfer_person_model = models.resnet152()
transfer_person_model.fc = nn.Sequential(
    nn.Linear(transfer_person_model.fc.in_features, 2048),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(2048, 1024),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(1024, 500),
    nn.ReLU(),
    nn.Dropout(),
    nn.Linear(500, 10)
)
transfer_person_model_state_dict = torch.load("C:\\Users\\Pirmin.000\\PycharmProjects\\IGP\\models\\person_test\\29_11_2023_07_24_46\\model_29_11_2023_17_31_06.pt")
transfer_person_model.load_state_dict(transfer_person_model_state_dict)
transfer_person_model.to("cuda")
transfer_person_model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [41]:
def checkImage(img, transfer_model, orig_set, transforms_wt):

    img_tensor = transforms_wt(img).unsqueeze(0)
    #plt.imshow(img_tensor)
    prediction = transfer_model(img_tensor.to("cuda"))
    predicted_probabilities = torch.softmax(prediction, dim=1)
    predicted_class_idx = torch.argmax(prediction).item()
    predicted_class = orig_set.classes[predicted_class_idx]

    # Get the confidence score for the predicted class
    confidence = predicted_probabilities[0, predicted_class_idx].item() * 100  # Convert to percentage


    return predicted_class, confidence

In [82]:
def trigger_crop(image):
    crop_box = (400, 450, 550, 550)
    cropped_image = transforms.functional.crop(image, *crop_box)
    return cropped_image

def video_to_image_converter(source_path: str, output_path: str, crop=False):
    class_list = []
    vidcap = cv2.VideoCapture(source_path)
    os.makedirs(output_path, exist_ok=True)
    count = 0

    total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = 15

    transform_test = transforms.ToPILImage()
    
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # Codec for MP4 format
    output_video = cv2.VideoWriter('output_video.mp4', fourcc, 30, (int(vidcap.get(3)), int(vidcap.get(4))))
    

    
    predicted_tool_class = None
    tool_confidence = None
    predicted_bodyside_class = None
    bodyside_confidence = None
    predicted_person_class = None
    person_confidence = None

    while count < total_frames:
        success, frame = vidcap.read()
        #plt.imshow(frame)

        if count % fps == 0:
            org_image = transform_test(frame)
            image = trigger_crop(org_image)
            image = np.asarray(image)
            cv2.imwrite(f"tmp/crop_{count}.png", image)
            cv2.imwrite(f"tmp/{count}.png", image)

            crop_image = Image.open(f"tmp/crop_{count}.png")
            image = Image.open(f"tmp/{count}.png")
            predicted_tool_class, tool_confidence = checkImage(crop_image, transfer_tool_model, tool_dataset, transforms_wt)
            predicted_bodyside_class, bodyside_confidence = checkImage(image, transfer_bodyside_model, bodyside_dataset, transforms_wt)
            predicted_person_class, person_confidence = checkImage(image, transfer_person_model, person_dataset, transforms_person)
            class_list.append(predicted_person_class)
            
            
        text = f"Predicted tool: {predicted_tool_class}, {tool_confidence:.2f}\n"\
               f"Predicted bodyside: {predicted_bodyside_class}, {bodyside_confidence:.2f}\n"\
               f"Predicted person: {predicted_person_class.split('_')[1]}, {person_confidence:.2f}"

        # Split the text into lines to adjust the positioning
        lines = text.split('\n')
        y_position = 50  # Initial y-position
        
        # Write each line separately
        for line in lines:
            cv2.putText(
                frame,
                line,
                (20, y_position),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (0, 0, 255),
                2,
                cv2.LINE_AA,
            )
            y_position += 30  # Increase y-position for the next line

        output_video.write(frame)

        cv2.imshow('Frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        count += 1

    print(Counter(class_list))
    vidcap.release()
    output_video.release()
    cv2.destroyAllWindows()


In [83]:
# Id 129 -150 ist not labeled
seed(int(time.time()))
rand_num = randint(129, 150)
print(f"Randomly picked loop id: {rand_num}")

#Pirmin 132, 137, 141 ,129

directory = "tmp"
if not os.path.exists(directory):
    os.makedirs(directory)
    
video_source = toolcheck.get_filepath_by_loop_id(137)
video_to_image_converter(video_source, directory, True)

Randomly picked loop id: 139
2023-11-29 19:06:55,989 INFO sqlalchemy.engine.Engine SELECT file_path FROM video WHERE loop_id = %(loop_id)s AND device = 'rgbCam'
2023-11-29 19:06:55,990 INFO sqlalchemy.engine.Engine [cached since 3651s ago] {'loop_id': 137}
Counter({'Pirmin_Aster': 47, 'Martin_Hofer': 15, 'Suganthi_Manoharan': 2, 'Juergen_Zangerl': 1})
