In [1]:
import cv2
import numpy as np

In [2]:
PATH = 'pedestrian'

In [3]:
f = open(f'{PATH}/temporalROI.txt','r') 
line = f.readline()
roi_start , roi_end = line.split() 
roi_start = int(roi_start) 
roi_end = int(roi_end)

I_start = cv2.imread(f'{PATH}/input/in%06d.jpg' % roi_start)
HEIGHT, WIDTH, _ = I_start.shape

In [4]:
def vis_flow(u, v, YX, name):
    magnitude, angle = cv2.cartToPolar(u, v)
    hsv_image = np.zeros((YX[0], YX[1], 3), dtype=np.uint8)
    hsv_image[..., 0] = angle * 90 / np.pi
    hsv_image[..., 1] = cv2.normalize(magnitude, None, 0, 255, cv2.NORM_MINMAX)
    hsv_image[..., 2] = 255
    img = cv2.cvtColor(hsv_image, cv2.COLOR_HSV2RGB)

    cv2.imshow("optical flow", img)
    cv2.waitKey(3)

## Wyznaczanie przypływu optycznego za pomocą metody Farnebacka

In [5]:
img = cv2.imread(f'{PATH}/input/in%06d.jpg' % roi_start)
prev = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
for i in range(roi_start + 1, roi_end, 1):
    img = cv2.imread(f'{PATH}/input/in%06d.jpg' % i)
    next = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    flow_instance = cv2.FarnebackOpticalFlow_create()
    flow = flow_instance.calc(prev, next, None)
    u, v = flow[..., 0], flow[..., 1]
    u, v = flow[..., 0], flow[..., 1]
    prev = next
    vis_flow(u, v, (HEIGHT, WIDTH), 'Optical Flow Farneback')

## Detekcja kierunku ruchu i klasyfikacja obiektów z wykorzystaniem przepływu optycznego

In [6]:
def indexation(I, stats, mean_magnitude, std_magnitude, mean_angle, std_angle):
    
    I_VIS = I
    if (stats.shape [0] > 1):
        tab = stats [1: ,4]
        for i in range(len(tab)):
            pi = i + 1
            area = stats[pi, 2] * stats[pi, 3]

            if area > 1000:
                cv2.rectangle(I_VIS ,(stats[pi ,0],stats[pi ,1]) ,(stats[pi ,0]+ stats[pi ,2], stats[pi,1]+ stats[pi ,3]) ,(255,0,0) ,2)
                cv2.putText(I_VIS ,f'{mean_magnitude[pi]:.2f}',( stats[pi ,0],stats[pi ,1]),cv2.FONT_HERSHEY_SIMPLEX ,0.5 ,(255 ,0 ,0))
                cv2.putText(I_VIS ,f'{std_magnitude[pi]:.2f}',(stats[pi ,0], stats[pi,1] + (stats[pi,1] + stats[pi ,3])//4),cv2.FONT_HERSHEY_SIMPLEX ,0.5 ,(255 ,0 ,0))
                cv2.putText(I_VIS ,f'{mean_angle[pi]:.2f}',( stats[pi ,0], stats[pi,1] + (stats[pi,1] + stats[pi ,3])//2),cv2.FONT_HERSHEY_SIMPLEX ,0.5 ,(255 ,0 ,0))
                cv2.putText(I_VIS ,f'{std_angle[pi]:.2f}',( stats[pi ,0], stats[pi ,1] + stats[pi, 3]),cv2.FONT_HERSHEY_SIMPLEX ,0.5 ,(255 ,0 ,0))

    return I_VIS

In [7]:
def filtration(img):
    kernel = np.ones((3,3),np.uint8)
    M = cv2.medianBlur(img, 3)
    M = cv2.erode(M, kernel, iterations=1)
    M = cv2.dilate(M, kernel, iterations=3)
    return M

In [8]:
from statistics import mean, stdev

fgbg = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=30, detectShadows=False)
prev_frame = cv2.imread(f'{PATH}/input/in%06d.jpg' % roi_start, cv2.IMREAD_GRAYSCALE)

for i in range(roi_start + 1, 472, 1):
    frame = cv2.imread(f'{PATH}/input/in%06d.jpg' % i)
    gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

    fgmask = fgbg.apply(gray_frame, learningRate=0.01)

    fgmask = filtration(fgmask)

    ret, labels, stats, centroids = cv2.connectedComponentsWithStats(fgmask)

    magnitudes = [[]for _ in range(ret)]
    angles = [[] for _ in range(ret)]
    if ret > 0:
        flow = cv2.calcOpticalFlowFarneback(prev_frame, gray_frame, None, 0.5, 3, 15, 3, 5, 1.2, 0)
        magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])

        for y in range(0, labels.shape[0]):
            for x in range(0, labels.shape[1]):
                
                if labels[y, x] > 0:
                    if magnitude[y, x] > 1:
                        angles[labels[y, x]].append(angle[y,x])
                        magnitudes[labels[y, x]].append(magnitude[y, x])
        
        mean_magnitude = []
        std_dev_magnitude = []
        mean_angle = []
        std_dev_angle = []

        for magn in magnitudes:
            if len(magn) > 1:
                mean_magn = mean(magn)
                mean_magnitude.append(mean_magn)
                try:
                    std_dev_magnitude.append(stdev(magn, mean_magn))
                except AssertionError:
                    std_dev_magnitude.append(0)
            else:
                mean_magnitude.append(0)
                std_dev_magnitude.append(0)
        for ang in angles:
            if len(ang) > 1:
                mean_ang = mean(ang)
                mean_angle.append(mean_ang)
                try:
                    std_dev_angle.append(stdev(ang, mean_ang))
                except AssertionError:
                    std_dev_angle.append(0)
            else:
                mean_angle.append(0)
                std_dev_angle.append(0)

    prev_frame = gray_frame.copy()
    img  = indexation(frame, stats, mean_magnitude, std_dev_magnitude, mean_angle, std_dev_angle)
    cv2.imshow("indeksacja", img)
    cv2.imshow("maska", fgmask)
    cv2.waitKey(3)