In [1]:
import base64
import json
from datetime import datetime
import time

import cv2  # type:ignore
from requests import post
from httpx import get
from uuid import getnode as get_mac

# Endpoint used by send_image_to_server() when action == "sanitation".
SERVER_URL_SANITATION = "http://higienize.senfio.com.br:8080/senfio/HM"
# Endpoint used by send_image_to_server() for every other action.
SERVER_URL_FACE = "http://higienize.senfio.com.br:8080/senfio/leito"
# Pending face images stored by storage_face_by_hands_sanitation(),
# keyed by the time.time() value taken when they were stored.
FACES_FOUND_HANDS_SANITATION = {}
# Pending face images stored by storage_face_by_get_face(), keyed the same way.
FACES_FOUND = {}
# Path prefixes used by the save_*_imagem_in_local() helpers when writing JPEGs.
PATH_TO_SAVE_BY_HANDS_SANITATION = "./hands/"
PATH_TO_SAVE_BY_GET_FACE = "./get_face/"

def get_datetime():
    """Return the current local date and time as a ``datetime`` object."""
    current_moment = datetime.now()
    return current_moment

def get_timestamp():
    """Return the current time in seconds since the epoch, as a float."""
    seconds_since_epoch = time.time()
    return seconds_since_epoch

def get_mac_str():
    """Return the node id from ``get_mac()`` formatted as ``AA:BB:CC:DD:EE:FF``.

    The integer is rendered as twelve zero-padded uppercase hex digits, which
    are then split into six colon-separated pairs.
    """
    hex_digits = format(get_mac(), "012X")
    octets = [hex_digits[start:start + 2] for start in range(0, 12, 2)]
    return ":".join(octets)

def cv2_to_base64(image) -> str:
    """Base64-encode the bytes-like ``image`` payload and return it as text."""
    encoded_bytes = base64.b64encode(image)
    return str(encoded_bytes, "utf8")

def storage_face_by_hands_sanitation(image_face):
    """Queue ``image_face`` in ``FACES_FOUND_HANDS_SANITATION`` under the current timestamp."""
    # Item assignment mutates the module-level dict in place, so no ``global``
    # declaration is needed.
    FACES_FOUND_HANDS_SANITATION[get_timestamp()] = image_face

def storage_face_by_get_face(image_face):
    """Queue ``image_face`` in ``FACES_FOUND`` under the current timestamp."""
    # Item assignment mutates the module-level dict in place, so no ``global``
    # declaration is needed.
    FACES_FOUND[get_timestamp()] = image_face

def send_image_to_server(action):
    """Upload the first pending face image of a queue and remove it from the queue.

    Args:
        action: ``"sanitation"`` selects ``FACES_FOUND_HANDS_SANITATION`` and
            ``SERVER_URL_SANITATION``; any other value selects ``FACES_FOUND``
            and ``SERVER_URL_FACE``.

    Returns:
        None. Returns early, without any request, when the selected queue is
        empty.
    """
    if action == "sanitation":
        pending_images = FACES_FOUND_HANDS_SANITATION
        url_to_send = SERVER_URL_SANITATION
    else:
        pending_images = FACES_FOUND
        url_to_send = SERVER_URL_FACE
    if not pending_images:
        return None

    # Take the first entry in the dict's iteration order.
    timestamp = next(iter(pending_images))
    image_face = pending_images[timestamp]

    _, encoded_image = cv2.imencode(".jpg", image_face)
    image_64 = cv2_to_base64(encoded_image.tobytes())
    data = {"images": [image_64], "timestamp": str(timestamp), "mac": str(get_mac_str())}
    try:
        # Keep the Response object itself: chaining .json() would replace it
        # with the decoded body, which has neither .ok nor .status_code, so the
        # status report below could never run.
        response = post(
            url=url_to_send,
            headers={"Content-type": "application/json"},
            data=json.dumps(data),
            timeout=10,
        )
        if response.ok:
            print(f"Imagem enviada! Status code: {response.status_code}")
        else:
            print(f"Imagem não enviada! Status code: {response.status_code}")
    except Exception as error:
        # Best effort: a failed upload must not stop the caller, but it is
        # reported instead of being silently discarded.
        print(f"Imagem não enviada! Erro: {error}")
    # The entry is dropped whether or not the upload succeeded. The dict is
    # mutated in place, so the module-level queue needs no reassignment.
    del pending_images[timestamp]

def send_request_to_led(function):
    """Send a fire-and-forget GET to ``http://localhost:8080/<function>``.

    Args:
        function: path segment appended to the base URL (converted with ``str``).

    The request uses a 0.01 s timeout and every request failure is ignored, so
    this call never raises for network reasons.
    """
    server = "http://localhost:8080/" + str(function)
    try:
        get(server, timeout=0.01)
    except Exception:
        # Deliberately silent: with such a short timeout failures are expected.
        # Catching Exception (not a bare except) lets KeyboardInterrupt and
        # SystemExit propagate so the kernel can still be interrupted.
        pass

def save_hands_sanitation_imagem_in_local():
    """Write the first pending sanitation image to disk and drop it from the queue.

    Does nothing when ``FACES_FOUND_HANDS_SANITATION`` is empty. The file name is
    ``PATH_TO_SAVE_BY_HANDS_SANITATION`` followed by the entry's timestamp and
    ``.jpg``.
    """
    if len(FACES_FOUND_HANDS_SANITATION) == 0:
        return None
    timestamp, image_to_save = next(iter(FACES_FOUND_HANDS_SANITATION.items()))
    target_path = f"{PATH_TO_SAVE_BY_HANDS_SANITATION}{str(timestamp)}.jpg"
    cv2.imwrite(target_path, image_to_save)
    print("Salvando imagem em: " + target_path)
    del FACES_FOUND_HANDS_SANITATION[timestamp]

def save_face_imagem_in_local():
    """Write the first pending face image to disk and drop it from the queue.

    Does nothing when ``FACES_FOUND`` is empty. The file name is
    ``PATH_TO_SAVE_BY_GET_FACE`` followed by the entry's timestamp and ``.jpg``.
    """
    if len(FACES_FOUND) == 0:
        return None
    timestamp, image_to_save = next(iter(FACES_FOUND.items()))
    target_path = f"{PATH_TO_SAVE_BY_GET_FACE}{str(timestamp)}.jpg"
    cv2.imwrite(target_path, image_to_save)
    print("Salvando imagem em: " + target_path)
    del FACES_FOUND[timestamp]

import numpy as np  # type:ignore
import PIL.Image
import torch  # type:ignore
import torch2trt  # type:ignore
import torchvision.transforms as transforms  # type:ignore
import trt_pose.coco  # type:ignore
import trt_pose.models  # type:ignore
from jetcam.csi_camera import CSICamera  # type:ignore
from torch2trt import TRTModule  # type:ignore
from trt_pose.draw_objects import DrawObjects  # type:ignore
from trt_pose.parse_objects import ParseObjects  # type:ignore

# Load the pose description; its "keypoints" and "skeleton" entries size the model.
with open("human_pose.json", "r") as f:
    human_pose = json.load(f)

TOPOLOGY = trt_pose.coco.coco_category_to_topology(human_pose)
NUM_PARTS = len(human_pose["keypoints"])
NUM_LINKS = len(human_pose["skeleton"])
# Build the PyTorch pose network on the GPU in eval mode, then load its weights.
MODEL = trt_pose.models.resnet18_baseline_att(NUM_PARTS, 2 * NUM_LINKS).cuda().eval()
MODEL_WEIGHTS = "resnet18_baseline_att_224x224_A_epoch_249.pth"
MODEL.load_state_dict(torch.load(MODEL_WEIGHTS))
WIDTH = 224
HEIGHT = 224

# All-zero example input; also reused by get_fps() as the benchmark input.
DATA = torch.zeros((1, 3, HEIGHT, WIDTH)).cuda()

# Convert the PyTorch model with torch2trt (fp16 mode) using DATA as the example input.
# NOTE(review): this conversion runs every time the cell executes; presumably it
# could be skipped when OPTIMIZED_MODEL already exists on disk — confirm.
TRT_MODEL = torch2trt.torch2trt(
    MODEL, [DATA], fp16_mode=True, max_workspace_size=1 << 25
)

# Persist the converted model's state dict.
OPTIMIZED_MODEL = "resnet18_baseline_att_224x224_A_epoch_249_trt.pth"
torch.save(TRT_MODEL.state_dict(), OPTIMIZED_MODEL)

# Rebind TRT_MODEL to a fresh TRTModule loaded from the file just written.
TRT_MODEL = TRTModule()
TRT_MODEL.load_state_dict(torch.load(OPTIMIZED_MODEL))
# Per-channel values subtracted from / divided into each frame by preprocess().
MEAN = torch.Tensor([0.485, 0.456, 0.406]).cuda()
STD = torch.Tensor([0.229, 0.224, 0.225]).cuda()
DEVICE = torch.device("cuda")
# Camera whose frames feed execute(); setting running = True starts it.
CAMERA = CSICamera(
    width=WIDTH, height=HEIGHT, capture_width=3264, capture_height=2464, capture_fps=21
)
CAMERA.running = True
# State shared across frames by the verify_* functions:
# WRIST_IN_BBOX gets one True per frame in which a wrist is inside the red bbox.
WRIST_IN_BBOX = []
# ACCEPTABLE_DISTANCE gets one bool per frame from verify_wrist_union().
ACCEPTABLE_DISTANCE = []
# VERIFIED_WRIST gets a True once WRIST_IN_BBOX has collected three entries.
VERIFIED_WRIST = []

PARSE_OBJECTS = ParseObjects(TOPOLOGY)
DRAW_OBJECTS = DrawObjects(TOPOLOGY)

def get_fps():
    """Benchmark ``TRT_MODEL`` on ``DATA`` and return the rate as a whole number.

    Times 50 forward passes, with a CUDA stream synchronization before and
    after the loop, prints the measured frames per second and returns that
    value truncated to ``int``.
    """
    iterations = 50
    started_at = time.time()
    torch.cuda.current_stream().synchronize()
    completed = 0
    while completed < iterations:
        TRT_MODEL(DATA)
        completed += 1
    torch.cuda.current_stream().synchronize()
    elapsed = time.time() - started_at
    fps = float(iterations) / elapsed
    print(f"FPS: {fps}")
    return int(fps)

# The benchmark runs twice and only the second result is kept.
# NOTE(review): presumably the first run acts as a warm-up — confirm; if not,
# one of the two calls can be removed.
APPLICATION_FPS = get_fps()
APPLICATION_FPS = get_fps()

def preprocess(image):
    """Convert a BGR frame into a normalized tensor with a leading batch axis.

    Steps: BGR -> RGB, wrap as a PIL image, convert with torchvision's
    ``to_tensor``, move to ``DEVICE``, then subtract ``MEAN`` and divide by
    ``STD`` per channel in place.

    Args:
        image: frame accepted by ``cv2.cvtColor`` with ``COLOR_BGR2RGB``.

    Returns:
        The normalized tensor indexed with ``[None, ...]`` (one extra leading axis).
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    pil_frame = PIL.Image.fromarray(rgb_frame)
    # Read the module-level DEVICE instead of rebinding the global on every
    # frame; it is already set once during setup.
    tensor = transforms.functional.to_tensor(pil_frame).to(DEVICE)
    tensor.sub_(MEAN[:, None, None]).div_(STD[:, None, None])
    return tensor[None, ...]

def seek_peaks(topology, image, persons, normalized_peaks):
    """Draw the first person's keypoints and links on ``image`` and collect key joints.

    Only entry 0 of ``persons[0]`` is inspected. Each keypoint with a
    non-negative peak index is drawn as a circle; each topology row whose two
    keypoints (columns 2 and 3) both have non-negative peak indices is drawn as
    a line.

    Args:
        topology: indexable with ``shape[0]`` rows; columns 2 and 3 of a row are
            used as keypoint indices.
        image: array drawn on in place; ``shape[0]`` / ``shape[1]`` are used as
            height / width.
        persons: ``persons[0][0][k]`` is the peak index of keypoint ``k``
            (negative means not found).
        normalized_peaks: ``normalized_peaks[0][k][p]`` holds two fractions that
            are scaled by height (index 0) and width (index 1).

    Returns:
        ``(wrist_coordinates, face_coordinates, amount_peaks)`` where
        ``wrist_coordinates`` is ``[x_left, y_left, x_right, y_right]``,
        ``face_coordinates`` is
        ``[left_shoulder, left_eye, right_eye, right_shoulder]`` as ``(x, y)``
        tuples, and ``amount_peaks`` counts the keypoints found. Joints that
        were not found stay at 0 / ``(0, 0)``.
    """
    green = (0, 255, 0)
    # keypoint index -> slot in face_coordinates
    # (left shoulder 5, left eye 1, right eye 2, right shoulder 6)
    face_slot_by_landmark = {5: 0, 1: 1, 2: 2, 6: 3}
    # keypoint index -> position of the x value in wrist_coordinates
    # (left wrist 9, right wrist 10)
    wrist_offset_by_landmark = {9: 0, 10: 2}

    peaks = normalized_peaks[0]
    person = persons[0][0]
    frame_height = image.shape[0]
    frame_width = image.shape[1]

    def to_pixel(peak):
        # Peaks are stored as (row fraction, column fraction).
        return (
            round(float(peak[1]) * frame_width),
            round(float(peak[0]) * frame_height),
        )

    wrist_coordinates = [0, 0, 0, 0]
    face_coordinates = [(0, 0)] * 4
    amount_peaks = 0

    for landmark in range(person.shape[0]):
        peak_number = int(person[landmark])
        if peak_number < 0:
            continue
        amount_peaks += 1
        point = to_pixel(peaks[landmark][peak_number])
        cv2.circle(image, point, 3, green, 2)
        if landmark in face_slot_by_landmark:
            face_coordinates[face_slot_by_landmark[landmark]] = point
        if landmark in wrist_offset_by_landmark:
            offset = wrist_offset_by_landmark[landmark]
            wrist_coordinates[offset] = point[0]
            wrist_coordinates[offset + 1] = point[1]

    for link_index in range(topology.shape[0]):
        link = topology[link_index]
        c_a = link[2]
        c_b = link[3]
        if person[c_a] >= 0 and person[c_b] >= 0:
            start_point = to_pixel(peaks[c_a][person[c_a]])
            end_point = to_pixel(peaks[c_b][person[c_b]])
            cv2.line(image, start_point, end_point, green, 2)

    return wrist_coordinates, face_coordinates, amount_peaks

# BBOX: [A B
#       D C]
def join_bbox(bbox_max, bbox_min, margin=15):
    """Return the axis-aligned box enclosing both input boxes, grown by ``margin``.

    Args:
        bbox_max: array of ``(x, y)`` points exposing ``tolist()``.
        bbox_min: array of ``(x, y)`` points exposing ``tolist()``.
        margin: amount subtracted from the minimum and added to the maximum on
            both axes. Defaults to 15, the value that used to be hard-coded.

    Returns:
        A 4x2 integer ``numpy`` array in the order
        ``[min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]``.
        Coordinates are not clipped and may be negative.
    """
    contour_points = bbox_max.tolist() + bbox_min.tolist()
    x_points = [point[0] for point in contour_points]
    y_points = [point[1] for point in contour_points]

    min_x = min(x_points) - margin
    min_y = min(y_points) - margin
    max_x = max(x_points) + margin
    max_y = max(y_points) + margin

    # np.int0 was only an alias of np.intp and was removed in NumPy 2.0;
    # building the array with dtype=np.intp gives the same result everywhere.
    return np.array(
        [[min_x, min_y], [max_x, min_y], [max_x, max_y], [min_x, max_y]],
        dtype=np.intp,
    )

def identify_lines(image):
    """Find the two largest red regions in ``image`` and return their joint box.

    The frame is thresholded in HSV (hue 0-10, saturation and value 150-255).
    The minimum-area rectangles of the two largest contours are merged with
    ``join_bbox`` and the resulting box is drawn on ``image`` in place.

    Args:
        image: BGR frame; modified in place when a box is found.

    Returns:
        A 4x2 integer array with the box corners, or an all-zero 4x2 array when
        fewer than two red contours exist or OpenCV fails on them.
    """
    empty_bbox = np.zeros((4, 2), dtype=np.intp)

    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    lower = np.array([0, 150, 150])
    upper = np.array([10, 255, 255])
    mask = cv2.inRange(hsv, lower, upper)
    # [-2] selects the contour list under both the 3-value (OpenCV 3) and
    # 2-value (OpenCV 4) return conventions of findContours.
    red_contours = cv2.findContours(
        mask.copy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
    )[-2]

    # The expected "nothing to see" case is handled explicitly instead of
    # relying on an IndexError being swallowed.
    if len(red_contours) < 2:
        return empty_bbox

    try:
        largest, second_largest = sorted(
            red_contours, key=cv2.contourArea, reverse=True
        )[:2]
        # astype(np.intp) replaces np.int0, which was removed in NumPy 2.0.
        max_red_box = cv2.boxPoints(cv2.minAreaRect(largest)).astype(np.intp)
        min_red_box = cv2.boxPoints(cv2.minAreaRect(second_largest)).astype(np.intp)
        red_bbox = join_bbox(max_red_box, min_red_box)
        cv2.drawContours(image, [red_bbox], 0, (255, 0, 255), 2)
    except Exception:
        # Best effort for a per-frame callback: fall back to the empty box, but
        # let KeyboardInterrupt / SystemExit through (no bare except).
        return empty_bbox
    return red_bbox

def verify_wrist_in_bbox(wrist, bbox):
    """Tell whether the integer point ``wrist`` lies strictly inside ``bbox``.

    ``bbox`` is read as four corners: ``bbox[0]`` supplies the lower x and y
    bounds, ``bbox[1][0]`` the upper x bound and ``bbox[2][1]`` the upper y
    bound. Points on the border are outside.
    """
    x_inside = wrist[0] in range(bbox[0][0] + 1, bbox[1][0])
    if not x_inside:
        return False
    return wrist[1] in range(bbox[0][1] + 1, bbox[2][1])

def verify_wrists_in_bbox(image, wrist_coordinates):
    """Track for how many frames a wrist has been inside the red box.

    When either wrist is strictly inside the box returned by
    ``identify_lines(image)``, a ``True`` is appended to ``WRIST_IN_BBOX``. On
    the third entry a ``True`` is appended to ``VERIFIED_WRIST`` and
    ``WRIST_IN_BBOX`` is cleared. When neither wrist is inside, both lists are
    cleared.

    Args:
        image: frame passed to ``identify_lines`` (which may draw on it).
        wrist_coordinates: ``[x_left, y_left, x_right, y_right]``.
    """
    bbox = identify_lines(image)
    # Plain ints replace np.int0 (removed in NumPy 2.0) and let the
    # ``in range(...)`` test inside verify_wrist_in_bbox use range's
    # constant-time membership check.
    left_wrist = (int(wrist_coordinates[0]), int(wrist_coordinates[1]))
    right_wrist = (int(wrist_coordinates[2]), int(wrist_coordinates[3]))

    wrist_inside = verify_wrist_in_bbox(left_wrist, bbox) or verify_wrist_in_bbox(
        right_wrist, bbox
    )
    if not wrist_inside:
        del VERIFIED_WRIST[:]
        del WRIST_IN_BBOX[:]
        return

    WRIST_IN_BBOX.append(True)
    if WRIST_IN_BBOX.count(True) == 3:
        print("pulso por 3x seguidas")
        VERIFIED_WRIST.append(True)
        del WRIST_IN_BBOX[:]

def verify_hands_sanitation(acceptable_distance, image_face, image):
    """Decide whether the collected frames amount to a hand-sanitation event.

    The event is accepted when the number of ``True`` entries is at least
    ``int(len(acceptable_distance) * 0.8)``. On acceptance the face image is
    queued, an "H" is drawn on ``image`` and the LED service is notified. The
    list is always emptied in place before returning.

    Args:
        acceptable_distance: list of per-frame booleans; cleared in place.
        image_face: face image queued on acceptance.
        image: frame annotated on acceptance.
    """
    hits = acceptable_distance.count(True)
    needed = int(len(acceptable_distance) * 0.8)
    sanitized = hits >= needed

    comparison = " >= " if sanitized else " < "
    print(str(hits) + comparison + str(needed))

    if sanitized:
        print("Higienização")
        storage_face_by_hands_sanitation(image_face)
        cv2.putText(image, "H", (112, 112), cv2.FONT_HERSHEY_SIMPLEX, 3, (255, 0, 0), 1, cv2.LINE_AA)
        send_request_to_led("hands")
    else:
        print("NÃO higienização")
    acceptable_distance.clear()

def verify_wrist_union(wrist_coordinates, image_face, image):
    """Record whether the wrists are close together and evaluate the window when full.

    One boolean is appended to ``ACCEPTABLE_DISTANCE`` per call: ``True`` when
    both the horizontal and the vertical wrist distance are greater than 0 and
    at most 30. Once ``5 * APPLICATION_FPS`` entries have been collected,
    ``verify_hands_sanitation`` evaluates (and clears) the list and
    ``VERIFIED_WRIST`` is cleared.

    Args:
        wrist_coordinates: ``[x_left, y_left, x_right, y_right]``.
        image_face: forwarded to ``verify_hands_sanitation``.
        image: forwarded to ``verify_hands_sanitation``.
    """
    diff_axis_ok = 30
    # Plain ints replace np.int0, which was removed in NumPy 2.0.
    x_distance = abs(int(wrist_coordinates[2]) - int(wrist_coordinates[0]))
    y_distance = abs(int(wrist_coordinates[3]) - int(wrist_coordinates[1]))

    # A distance of exactly 0 on either axis is rejected.
    # NOTE(review): presumably this is meant to discard frames where both wrists
    # are missing (reported as 0, 0) — confirm; it also rejects perfectly
    # aligned wrists.
    wrists_together = (
        0 < x_distance <= diff_axis_ok and 0 < y_distance <= diff_axis_ok
    )
    ACCEPTABLE_DISTANCE.append(wrists_together)

    if len(ACCEPTABLE_DISTANCE) >= 5 * APPLICATION_FPS:
        verify_hands_sanitation(ACCEPTABLE_DISTANCE, image_face, image)
        del VERIFIED_WRIST[:]

def get_face(image, face_coordinates):
    """Crop the face region from ``image`` using eye and shoulder keypoints.

    The crop spans horizontally between the two shoulders and vertically from
    30 pixels above the higher eye (never above row 0) down to the lower
    shoulder. A successful crop is resized to 160x160, queued with
    ``storage_face_by_get_face`` and announced to the LED service.

    Args:
        image: frame to crop from.
        face_coordinates: ``[left_shoulder, left_eye, right_eye, right_shoulder]``
            as ``(x, y)`` tuples; ``(0, 0)`` marks a keypoint that was not found.

    Returns:
        The resized face image; the raw crop when resizing/queuing failed; or
        ``image[0, 0]`` (a single pixel) when either eye is missing.
    """
    required_size = (160, 160)
    left_shoulder = face_coordinates[0]
    left_eye = face_coordinates[1]
    right_eye = face_coordinates[2]
    right_shoulder = face_coordinates[3]

    if left_eye == (0, 0) or right_eye == (0, 0):
        return image[0, 0]

    start_x = min(left_shoulder[0], right_shoulder[0])
    end_x = max(left_shoulder[0], right_shoulder[0])
    # Clamp at 0: a negative slice start would be counted from the bottom of
    # the frame and yield an empty or unrelated crop when the eyes are within
    # 30 px of the top edge.
    start_y = max(0, min(left_eye[1], right_eye[1]) - 30)
    end_y = max(left_shoulder[1], right_shoulder[1])
    image_face = image[start_y:end_y, start_x:end_x]
    try:
        image_face = cv2.resize(
            image_face, required_size, interpolation=cv2.INTER_AREA
        )
        storage_face_by_get_face(image_face)
        send_request_to_led("bed_entry")
    except Exception:
        # Best effort: e.g. an empty crop cannot be resized. The un-resized crop
        # is returned in that case. KeyboardInterrupt / SystemExit still
        # propagate because this is not a bare except.
        pass
    return image_face

def verify_people_on_local(amount_peaks, image):
    """Stamp ``image`` with "P" or "N" depending on how many keypoints were found.

    With at least 3 keypoints, "P" is drawn at (45, 45) and the LED service is
    notified with ``"person"``; otherwise "N" is drawn at the same position and
    no request is sent.
    """
    anchor = (45, 45)
    amount_peaks_necessary = 3

    if amount_peaks < amount_peaks_necessary:
        # Nobody detected.
        cv2.putText(image, "N", anchor, cv2.FONT_HERSHEY_SIMPLEX, 2, (28, 184, 255), 2, cv2.LINE_AA)
        return

    # Somebody detected.
    cv2.putText(image, "P", anchor, cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 0, 0), 2, cv2.LINE_AA)
    send_request_to_led("person")

from jetcam.utils import bgr8_to_jpeg
import ipywidgets
from IPython.display import display

# JPEG widgets updated by execute(): image_w receives the annotated frame,
# image_f receives the face image.
image_w = ipywidgets.Image(format='jpeg')
image_f = ipywidgets.Image(format='jpeg')

def execute(change):
    """Process one camera frame end to end and refresh both image widgets.

    Pipeline: preprocess -> ``TRT_MODEL`` -> ``PARSE_OBJECTS`` -> ``seek_peaks``
    -> ``get_face``, then either ``verify_wrist_union`` (once
    ``VERIFIED_WRIST`` holds a ``True``) or ``verify_wrists_in_bbox``, followed
    by ``verify_people_on_local``. The annotated frame is resized to 600x600
    for ``image_w``; the face image goes to ``image_f``.

    Args:
        change: mapping whose ``"new"`` entry is the frame; this is how the
            function is invoked both directly and through ``CAMERA.observe``.
    """
    image = change["new"]
    # The face is cropped from an untouched copy because seek_peaks and the
    # verify_* helpers draw on ``image`` in place.
    image_copy = image.copy()

    data = preprocess(image)
    cmap, paf = TRT_MODEL(data)
    cmap, paf = cmap.detach().cpu(), paf.detach().cpu()
    _, persons, peaks = PARSE_OBJECTS(cmap, paf)
    # wrist_coordinates: [x_left, y_left, x_right, y_right]
    # face_coordinates: [left shoulder, left eye, right eye, right shoulder]
    wrist_coordinates, face_coordinates, amount_peaks = seek_peaks(
        TOPOLOGY, image, persons, peaks
    )
    image_face = get_face(image_copy, face_coordinates)

    if VERIFIED_WRIST.count(True) >= 1:
        verify_wrist_union(wrist_coordinates, image_face, image)
    else:
        verify_wrists_in_bbox(image, wrist_coordinates)
    verify_people_on_local(amount_peaks, image)

    try:
        image_f.value = bgr8_to_jpeg(image_face)
    except Exception:
        # Best effort: get_face may return something that cannot be encoded
        # (no face found, empty crop). The widget then keeps its previous
        # image. KeyboardInterrupt / SystemExit are no longer swallowed.
        pass
    image = cv2.resize(image, (600, 600), interpolation=cv2.INTER_AREA)
    image_w.value = bgr8_to_jpeg(image)

# Start-up: announce the app, notify the LED service, process the current frame
# once by hand, then register execute() as an observer of CAMERA's "value".
print("-" * 5 + " Iniciando App SENFIO - Jetson " + "-" * 5)
send_request_to_led("init")
execute({"new": CAMERA.value})
CAMERA.observe(execute, names="value")

In [24]:
# Show the widget that execute() fills with the annotated 600x600 frame.
display(image_w)

In [28]:
# Show the widget that execute() fills with the face image.
display(image_f)

Image(value=b'', format='jpeg')

In [None]:
# Uploader loop: keeps draining both queues until the kernel is interrupted.
while True:
    send_image_to_server("face")
    send_image_to_server("sanitation")
    if not FACES_FOUND and not FACES_FOUND_HANDS_SANITATION:
        # Both queues are empty: pause briefly instead of spinning at full CPU
        # while waiting for new images to be queued.
        time.sleep(0.05)

In [34]:
# Detach every observer from CAMERA, including execute() registered at start-up.
CAMERA.unobserve_all()