In [1]:
import torch
import torchvision 
from tqdm import tqdm
import matplotlib

device = "cuda:0" if torch.cuda.is_available() else "cpu"
transform = torchvision.transforms.Compose([torchvision.transforms.ToTensor(),
                                torchvision.transforms.Normalize(mean = [0.5],std = [0.5])])

path = './data/' 
trainData = torchvision.datasets.MNIST(path,train = True,transform = transform,download = True)
testData = torchvision.datasets.MNIST(path,train = False,transform = transform)

BATCH_SIZE = 256  
trainDataLoader = torch.utils.data.DataLoader(dataset = trainData,batch_size = BATCH_SIZE,shuffle = True)
testDataLoader = torch.utils.data.DataLoader(dataset = testData,batch_size = BATCH_SIZE)

class Net(torch.nn.Module):
    def __init__(self):
        super(Net,self).__init__()
        self.model = torch.nn.Sequential(
            torch.nn.Conv2d(in_channels = 1,out_channels = 16,kernel_size = 3,stride = 1,padding = 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size = 2,stride = 2),
            torch.nn.Conv2d(in_channels = 16,out_channels = 32,kernel_size = 3,stride = 1,padding = 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(kernel_size = 2,stride = 2),
            torch.nn.Conv2d(in_channels = 32,out_channels = 64,kernel_size = 3,stride = 1,padding = 1),
            torch.nn.ReLU(),
            torch.nn.Flatten(),
            torch.nn.Linear(in_features = 7 * 7 * 64,out_features = 128),
            torch.nn.ReLU(),
            torch.nn.Linear(in_features = 128,out_features = 10),
            torch.nn.Softmax(dim=1)
        )
        
    def forward(self,input):
        output=self.model(input)
        return output


In [2]:
net = Net().to(device)
lossF = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters())
EPOCHS = 10

for epoch in range(1,EPOCHS + 1):
    processBar = tqdm(trainDataLoader,unit = 'step')
    net.train(True)
    for step,(trainImgs,labels) in enumerate(processBar):
        trainImgs = trainImgs.to(device)
        labels = labels.to(device)
        net.zero_grad()
        outputs = net(trainImgs)
        loss = lossF(outputs,labels)
        predictions = torch.argmax(outputs, dim = 1)
        accuracy = torch.sum(predictions == labels)/labels.shape[0]
        loss.backward()
        optimizer.step()
        processBar.set_description("[%d/%d] Loss: %.4f, Acc: %.4f" % 
                                    (epoch,EPOCHS,loss.item(),accuracy.item()))
        
torch.save(net.state_dict(), 'model.pth')

[1/10] Loss: 1.7162, Acc: 0.7396: 100%|██████████| 235/235 [00:18<00:00, 13.02step/s]
[2/10] Loss: 1.5094, Acc: 0.9479: 100%|██████████| 235/235 [00:16<00:00, 14.41step/s]
[3/10] Loss: 1.4741, Acc: 0.9896: 100%|██████████| 235/235 [00:16<00:00, 13.98step/s]
[4/10] Loss: 1.4842, Acc: 0.9792: 100%|██████████| 235/235 [00:16<00:00, 13.95step/s]
[5/10] Loss: 1.4618, Acc: 1.0000: 100%|██████████| 235/235 [00:16<00:00, 14.26step/s]
[6/10] Loss: 1.4849, Acc: 0.9792: 100%|██████████| 235/235 [00:16<00:00, 14.26step/s]
[7/10] Loss: 1.4653, Acc: 1.0000: 100%|██████████| 235/235 [00:16<00:00, 14.06step/s]
[8/10] Loss: 1.4614, Acc: 1.0000: 100%|██████████| 235/235 [00:17<00:00, 13.82step/s]
[9/10] Loss: 1.4842, Acc: 0.9792: 100%|██████████| 235/235 [00:16<00:00, 14.46step/s]
[10/10] Loss: 1.4842, Acc: 0.9792: 100%|██████████| 235/235 [00:18<00:00, 12.97step/s]


In [3]:
import cv2
import numpy as np

model = Net()
model_path = 'model.pth'
model.load_state_dict(torch.load(model_path))
model.eval()

cap = cv2.VideoCapture('tem.mp4')
track_id = 0
tracks = {}
output_video_path = 'output_video.mp4'
np.random.seed(0)
colors = [(0,0,0),(0,0,255),(0,255,0),(255,0,0),(255,255,0),(255,0,255),(0,255,255),(255,255,255),(50,150,200),(200,150,50)]

backSub = cv2.createBackgroundSubtractorMOG2()

def detect_digits(frame, tracks, track_id):

    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    w0,h0=gray.shape
    _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)
    contours, tem1= cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    out=torch.randn(1,1,28,28)
    for contour in contours:
        x, y, w, h = cv2.boundingRect(contour)
        if w<30 and h<30:
             continue
        tem=gray[max(0,y-10):min(w0,y+h+10),max(0,x-10):max(h0,x+w+10)]
        tem=cv2.resize(tem, (28, 28), interpolation=cv2.INTER_AREA)
        tem=torch.from_numpy(tem)
        tem=tem.view(1,1,28,28)
        out=torch.cat((out,tem),dim=0)
        tracks[track_id] = {'x': x, 'y': y, 'w': w, 'h': h, 'contour': contour}
        track_id += 1
    out=out[1:,:,:,:]
    num=model(out)
    _, max_indices = torch.max(num, dim=1)
    for i in range(len(max_indices)):
        tracks[i].update({'num':max_indices[i].item()})


fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    fgMask = backSub.apply(frame)
    detect_digits(frame, tracks, track_id)
    for track_id, rect in tracks.items():
        x, y, w, h,num = rect['x'], rect['y'], rect['w'], rect['h'],rect['num']
        cv2.rectangle(frame, (x, y), (x + w, y + h), colors[num], 2)
        cv2.putText(frame, str(num), (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, colors[num], 2)
    track_id = 0
    tracks = {}
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    out.write(frame)

cap.release()
out.release()
cv2.destroyAllWindows()


In [4]:

import numpy as np
import cv2

def gaussian2d_labels(sz,sigma):
    w,h=sz
    xs, ys = np.meshgrid(np.arange(w), np.arange(h))
    center_x, center_y = w / 2, h / 2
    dist = ((xs - center_x) ** 2 + (ys - center_y) ** 2) / (sigma**2)
    labels = np.exp(-0.5*dist)
    return labels
    
def cos_window(sz):
    """
    width, height = sz
    j = np.arange(0, width)
    i = np.arange(0, height)
    J, I = np.meshgrid(j, i)
    cos_window = np.sin(np.pi * J / width) * np.sin(np.pi * I / height)
    """
    cos_window = np.hanning(int(sz[1]))[:, np.newaxis].dot(np.hanning(int(sz[0]))[np.newaxis, :])
    return cos_window

class BaseCF:
    def __init__(self):
        raise NotImplementedError

    def init(self,first_frame,bbox):
        raise NotImplementedError

    def update(self,current_frame):
        raise NotImplementedError

class MOSSE(BaseCF):
    def __init__(self,interp_factor=0.125,sigma=2.):
        super(MOSSE).__init__()
        self.interp_factor=interp_factor
        self.sigma=sigma

    def init(self,first_frame,bbox):
        if len(first_frame.shape)!=2:
            assert first_frame.shape[2]==3
            first_frame=cv2.cvtColor(first_frame,cv2.COLOR_BGR2GRAY)
        first_frame=first_frame.astype(np.float32)/255
        x,y,w,h=tuple(bbox)
        self._center=(x+w/2,y+h/2)
        self.w,self.h=w,h
        w,h=int(round(w)),int(round(h))
        self.cos_window=cos_window((w,h))
        self._fi=cv2.getRectSubPix(first_frame,(w,h),self._center)
        self._G=np.fft.fft2(gaussian2d_labels((w,h),self.sigma))
        self.crop_size=(w,h)
        self._Ai=np.zeros_like(self._G)
        self._Bi=np.zeros_like(self._G)
        for _ in range(8):
            fi=self._rand_warp(self._fi)
            Fi=np.fft.fft2(self._preprocessing(fi,self.cos_window))
            self._Ai+=self._G*np.conj(Fi)
            self._Bi+=Fi*np.conj(Fi)


    def update(self,current_frame,vis=False):
        if len(current_frame.shape)!=2:
            assert current_frame.shape[2]==3
            current_frame=cv2.cvtColor(current_frame,cv2.COLOR_BGR2GRAY)
        current_frame=current_frame.astype(np.float32)/255
        Hi=self._Ai/self._Bi
        fi=cv2.getRectSubPix(current_frame,(int(round(self.w)),int(round(self.h))),self._center)
        fi=self._preprocessing(fi,self.cos_window)
        Gi=Hi*np.fft.fft2(fi)
        gi=np.real(np.fft.ifft2(Gi))
        if vis is True:
            self.score=gi
        curr=np.unravel_index(np.argmax(gi, axis=None),gi.shape)
        dy,dx=curr[0]-(self.h/2),curr[1]-(self.w/2)
        x_c,y_c=self._center
        x_c+=dx
        y_c+=dy
        self._center=(x_c,y_c)
        fi=cv2.getRectSubPix(current_frame,(int(round(self.w)),int(round(self.h))),self._center)
        fi=self._preprocessing(fi,self.cos_window)
        Fi=np.fft.fft2(fi)
        self._Ai=self.interp_factor*(self._G*np.conj(Fi))+(1-self.interp_factor)*self._Ai
        self._Bi=self.interp_factor*(Fi*np.conj(Fi))+(1-self.interp_factor)*self._Bi
        return [self._center[0]-self.w/2,self._center[1]-self.h/2,self.w,self.h]

    def _preprocessing(self,img,cos_window,eps=1e-5):
        img=np.log(img+1)
        img=(img-np.mean(img))/(np.std(img)+eps)
        return cos_window*img

    def _rand_warp(self,img):
        h, w = img.shape[:2]
        C = .1
        ang = np.random.uniform(-C, C)
        c, s = np.cos(ang), np.sin(ang)
        W = np.array([[c + np.random.uniform(-C, C), -s + np.random.uniform(-C, C), 0],
                      [s + np.random.uniform(-C, C), c + np.random.uniform(-C, C), 0]])
        center_warp = np.array([[w / 2], [h / 2]])
        tmp = np.sum(W[:, :2], axis=1).reshape((2, 1))
        W[:, 2:] = center_warp - center_warp * tmp
        warped = cv2.warpAffine(img, W, (w, h), cv2.BORDER_REFLECT)
        return warped
    



In [5]:
def track_and_save_video(video_path, output_path, initial_bbox):
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Error opening video file")
        return
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
    ret, frame = cap.read()
    if not ret:
        print("Failed to read the first frame of the video.")
        return
    tracker=[]
    for i in initial_bbox:
        tracker1 = MOSSE()
        tracker1.init(frame, i)
        tracker.append(tracker1)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        bbox=[]
        for i in range(len(initial_bbox)):
            bbox= tracker[i].update(frame)
            x, y, w, h = bbox
            cv2.rectangle(frame, (int(x), int(y)), (int(x + w), int(y + h)), (0, 255, 0), 2)
            out.write(frame)
            cv2.imshow('Tracking', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

    cap.release()
    out.release()
    cv2.destroyAllWindows()

video_path = 'pedestrians.avi'  
output_path = 'catch.avi' 
def select_roi_from_video(video_path):
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("Error opening video file")
        return None

    ret, frame = cap.read()
    if not ret:
        print("Failed to read the first frame of the video.")
        return None
    

    cap.release()
    roi = cv2.selectROI(frame, showCrosshair=True, fromCenter=False)
    x, y, w, h = roi
    cv2.rectangle(frame, (x, y), (x+w, y+h), (0, 255, 0), 2)
    cv2.imshow('Selected ROI', frame)
    cv2.waitKey(0)
    cv2.destroyAllWindows()
    roi_image = frame[int(y):int(y+h), int(x):int(x+w)]
    return roi_image,[x, y, w, h]

n=int(input('选取数:'))
roi_image=[]
pos=[]
for i in range(n):
    roi_image1,pos1 = select_roi_from_video(video_path)
    roi_image.append(roi_image1)
    pos.append(pos1)

    if roi_image is not None:
        cv2.imshow('Cropped ROI', roi_image[-1])
        cv2.waitKey(0)
        cv2.destroyAllWindows()
initial_bbox = pos  

track_and_save_video(video_path, output_path, initial_bbox)