In [1]:
# 모듈 생성
import json
import os
import cv2
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
from tensorflow import random # random seed 설정용
from sklearn.model_selection import train_test_split
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import matplotlib.pyplot as plt

In [2]:
def getKeys():
    path = '[라벨]General_train_B_1/district/'
    direct = os.listdir(path)
    folder = []
    for d in direct:
        folder.append(path+d)

    get_on = []
    get_off = []
    keys = []

    for f in folder:
        path = f+'/'
        for i in os.listdir(path):
            with open(path+i,'r') as f:
                info = json.load(f)
                for i in range(len(info['annotations'])):
                    if len(info['annotations'][i]) == 0: pass
                    else: 
                        annotations = info['annotations'][i]
                        keypoints = annotations.get('keypoints')
                        keys.append(keypoints)
                        get_on.append(annotations.get('get_on'))
                        get_off.append(annotations.get('get_off'))
    keys = np.array(keys).reshape(-1,16,1,3)
    return keys, get_on, get_off

In [3]:
def getTarget():
    target = []
    _, get_on, get_off = getKeys()

    for i in range(len(get_on)):
        if get_on[i] == True: target.append(0)
        elif get_off[i] == True: target.append(1)
        elif (get_on[i] == False) and (get_off[i] == False): target.append(2)
    target = np.array(target).reshape(-1,1)
    return target

In [4]:
# 8:2 비율
train_X, test_X, train_y, test_y = train_test_split(getKeys()[0], getTarget(),
                                                  stratify=getTarget(),
                                                  test_size=0.2,
                                                  random_state=11)

In [5]:
train_X, val_X, train_y, val_y = train_test_split(train_X, train_y,
                                                  stratify=train_y,
                                                  test_size=0.2,
                                                  random_state=11)

In [6]:
train_X.shape, test_X.shape, val_X.shape

((16856, 16, 1, 3), (5268, 16, 1, 3), (4214, 16, 1, 3))

---

In [7]:
random.set_seed(11)

model=Sequential()
model.add(Conv2D(3, kernel_size=3, padding='same', input_shape=(16, 1, 3))) # 3차원
# 1차원으로 데이터 형태 변환 Layer => Flatten
model.add(Flatten())
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.3))
model.add(Dense(3, activation='softmax')) # 1차원

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 16, 1, 3)          84        
                                                                 
 flatten (Flatten)           (None, 48)                0         
                                                                 
 dense (Dense)               (None, 64)                3136      
                                                                 
 dropout (Dropout)           (None, 64)                0         
                                                                 
 dense_1 (Dense)             (None, 3)                 195       
                                                                 
Total params: 3,415
Trainable params: 3,415
Non-trainable params: 0
_________________________________________________________________


In [8]:
# 검증 데이터에 대한 loss 값이 3번 연속 개선되지 않으면 학습 중지
stopCB = EarlyStopping(monitor='val_loss', patience=3)

model.compile(loss='sparse_categorical_crossentropy',
              optimizer='adam',
              metrics='accuracy')

hist = model.fit(train_X, train_y,
                 batch_size=36,
                 validation_data = (val_X, val_y),
                 epochs=20,
                 callbacks=[stopCB])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20


In [9]:
model.evaluate(test_X, test_y)



[0.2974095344543457, 0.8969248533248901]

---

In [10]:
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True)

In [11]:
hub_model = hub.load('https://tfhub.dev/google/movenet/multipose/lightning/1')
movenet = hub_model.signatures['serving_default']

In [121]:
cap = cv2.VideoCapture('KakaoTalk_20221115_102417080.mp4')
pre_val = ['get_on', 'get_off', 'nothing']
while cap.isOpened():
    ret, frame = cap.read()
    point = []
# frame = cv2.imread('image3.jpg')    
    # Resize image
    img = frame.copy()
    img = tf.image.resize_with_pad(tf.expand_dims(img, axis=0), 192,256)
    input_img = tf.cast(img, dtype=tf.int32)

    # Detection section
    results = movenet(input_img)
    keypoints_with_scores = results['output_0'].numpy()[:,:,:51].reshape((6,17,3))

    # Render keypoints 
    loop_through_people(frame, keypoints_with_scores, EDGES, 0.1)

    result = model.predict(np.array(point).reshape(-1,16,1,3))

    cv2.putText(frame, pre_val[result.argmax()], (200, 200), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1, lineType=cv2.LINE_AA)
    cv2.imshow('Movenet Multipose', frame)
    #plt.figure(figsize=(15,20))
    #plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        
    if cv2.waitKey(1) & 0xFF==ord('q'):
        break
cap.release()
cv2.destroyAllWindows()

0 (360, 330, 0)
1 (364, 289, 0)
2 (363, 288, 0)
3 (364, 293, 0)
4 (367, 290, 0)
5 (364, 380, 0)
6 (378, 363, 0)
7 (355, 483, 0)
8 (385, 443, 0)
9 (346, 546, 0)
10 (380, 540, 0)
11 (357, 579, 0)
12 (377, 566, 0)
13 (347, 790, 0)
14 (380, 775, 0)
15 (368, 993, 0)
[[360 330   0]
 [364 289   0]
 [363 288   0]
 [364 293   0]
 [367 290   0]
 [364 380   0]
 [378 363   0]
 [355 483   0]
 [385 443   0]
 [346 546   0]
 [380 540   0]
 [357 579   0]
 [377 566   0]
 [347 790   0]
 [380 775   0]
 [368 993   0]]
0 (400, 758, 0)
1 (358, 549, 0)
2 (381, 540, 0)
3 (347, 753, 0)
4 (381, 750, 0)
5 (359, 902, 0)
6 (365, 886, 0)
0 (369, 309, 0)
1 (368, 299, 0)
2 (369, 289, 0)
3 (361, 307, 0)
4 (370, 301, 0)
5 (359, 362, 0)
6 (376, 345, 0)
7 (370, 387, 0)
8 (378, 395, 0)
[[400 758   0]
 [358 549   0]
 [381 540   0]
 [347 753   0]
 [381 750   0]
 [359 902   0]
 [365 886   0]
 [369 309   0]
 [368 299   0]
 [369 289   0]
 [361 307   0]
 [370 301   0]
 [359 362   0]
 [376 345   0]
 [370 387   0]
 [378 395   0]]


In [88]:
# Function to loop through each person detected and render
def loop_through_people(frame, keypoints_with_scores, edges, confidence_threshold):
    for person in keypoints_with_scores:
        draw_connections(frame, person, edges, confidence_threshold)
        draw_keypoints(frame, person, confidence_threshold)

In [110]:
def draw_keypoints(frame, keypoints, confidence_threshold):
    i = 0
    y, x, c = frame.shape
    shaped = np.squeeze(np.multiply(keypoints, [y,x,1]))

    for kp in shaped:
        if (len(point) <= 15):
            ky, kx, kp_conf = kp
            if kp_conf > confidence_threshold:
                cv2.circle(frame, (int(kx), int(ky)), 4, (0,255,0), -1)
                cv2.putText(frame, str(i), (int(kx), int(ky)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 1, lineType=cv2.LINE_AA)
                point.append((int(kx), int(ky), 0))
                print(i, (int(kx), int(ky), 0))
                i+=1
        elif len(point) > 16: break

In [98]:
EDGES = {
    (0, 1): 'm',
    (0, 2): 'c',
    (1, 3): 'm',
    (2, 4): 'c',
    (0, 5): 'm',
    (0, 6): 'c',
    (5, 7): 'm',
    (7, 9): 'm',
    (6, 8): 'c',
    (8, 10): 'c',
#    (5, 6): 'y',
    (5, 11): 'm',
    (6, 12): 'c',
#    (11, 12): 'y',
    (11, 13): 'm',
    (13, 15): 'm',
    (12, 14): 'c',
    (14, 16): 'c'
}

In [99]:
def draw_connections(frame, keypoints, edges, confidence_threshold):
    y, x, _ = frame.shape
    shaped = np.squeeze(np.multiply(keypoints, [y,x,1]))
    
    for edge, _ in edges.items():
        p1, p2 = edge
        y1, x1, c1 = shaped[p1]
        y2, x2, c2 = shaped[p2]
        
        if (c1 > confidence_threshold) & (c2 > confidence_threshold):      
            cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0,0,255), 1)