In [3]:
import cv2
import torch
import torch.nn as nn
import torchvision.models as models
import numpy as np

In [4]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [None]:
class GaugeNet(nn.Module):
    def __init__(self, num_keypoints=4):
        super(GaugeNet, self).__init__()
        # 백본 ResNet-18
        resnet = models.resnet18(weights=models.ResNet18_Weights.DEFAULT)
        
        # resnet의 마지막 avgpool과 fc 레이어를 제외한 특징 추출부만 사용
        self.backbone = nn.Sequential(*list(resnet.children())[:-2]) 
        
        # resnnt18의 최종 출력 채널은 512
        # 고해상도 히트맵 복원을 위한 upsampling
        self.deconv = nn.Sequential(
            nn.ConvTranspose2d(512, 256, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, num_keypoints, kernel_size=1) 
        )
        
    def forward(self, x):
        x = self.backbone(x) # 출력 크기: [Batch, 512, H/32, W/32]
        x = self.deconv(x)   # 출력 크기: [Batch, num_keypoints, H/4, W/4]
        return x

# 모델 초기화
model = GaugeNet(num_keypoints=4).to(device).eval()
if device.type == 'cuda':
    model = model.half()

# GStreamer 파이프라인 (448x448)
def gstreamer_pipeline(sensor_id=0, flip_method=2):
    return (
        f"nvarguscamerasrc sensor-id={sensor_id} ! "
        f"video/x-raw(memory:NVMM), width=1280, height=720, framerate=30/1 ! "
        f"nvvidconv left=280 right=1000 top=0 bottom=720 flip-method={flip_method} ! " 
        f"video/x-raw, width=448, height=448, format=BGRx ! "      
        f"videoconvert ! video/x-raw, format=BGR ! appsink"
    )

def run():
    cap = cv2.VideoCapture(gstreamer_pipeline(), cv2.CAP_GSTREAMER)
    
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break

        # 전처리
        img_input = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img_input = img_input.astype(np.float32) / 255.0
        tensor = torch.from_numpy(img_input).permute(2, 0, 1).unsqueeze(0).to(device)
        if device.type == 'cuda': tensor = tensor.half()

        # 추론
        with torch.no_grad():
            heatmaps = model(tensor)

        # 바늘 중심, 끝점 추출
        hm = heatmaps[0].cpu().float().numpy()
        points = []
        for i in range(hm.shape[0]):
            _, conf, _, max_loc = cv2.minMaxLoc(hm[i])
            # 히트맵 좌표를 원본 이미지 좌표로 변환
            x = int(max_loc[0] * (448 / hm.shape[2]))
            y = int(max_loc[1] * (448 / hm.shape[1]))
            points.append((x, y))
            cv2.circle(frame, (x, y), 5, (255, 0, 0), -1)

        # 중심점 끝점 연결
        if len(points) >= 2:
            cv2.line(frame, points[0], points[1], (0, 255, 0), 2)

        cv2.imshow("Gauge", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

In [None]:
run()