<a href="https://colab.research.google.com/github/zooodung/Face_Image_Emotion_Classification/blob/JS/FP_PREPROCESSING.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Settings

### Drive Mount

In [None]:
# Mount Google Drive inside the Colab runtime; every dataset path used by this
# notebook lives under /content/drive/.
from google.colab import drive
drive.mount('/content/drive/')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).


### Import

In [None]:
! pip install mtcnn

Collecting mtcnn
  Downloading mtcnn-0.1.1-py3-none-any.whl (2.3 MB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.4/2.3 MB[0m [31m11.5 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m2.3/2.3 MB[0m [31m43.5 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m31.8 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: mtcnn
Successfully installed mtcnn-0.1.1


In [None]:
import json
import numpy as np
import glob
import os
import matplotlib.pyplot as plt
import cv2
import pandas as pd
from matplotlib.colors import ListedColormap
from sklearn.preprocessing import StandardScaler
from skimage import color
import dlib
from google.colab.patches import cv2_imshow
from mtcnn import MTCNN
from torch.utils.data import DataLoader, Dataset
import torch


### PATH

In [None]:
# Every dataset location hangs off a single Drive folder. Trailing slashes are kept
# because the rest of the notebook builds file paths by plain string concatenation.
DATA_ROOT = '/content/drive/MyDrive/Project/Data/'

# Annotation JSON files
TRAIN_JSON_PATH = DATA_ROOT + 'label/train/'
VALID_JSON_PATH = DATA_ROOT + 'label/val/'

# Source images
TRAIN_IMG_PATH = DATA_ROOT + 'img/train/'
VALID_IMG_PATH = DATA_ROOT + 'img/val/'

# Segmentation archives (.npz)
TRAIN_SEGMENT_PATH = DATA_ROOT + 'segmentation/train/'
VALID_SEGMENT_PATH = DATA_ROOT + 'segmentation/val/'

# Destination folders for the preprocessed images
TRAIN_AFTER_PREPROCESSING_IMG_PATH = DATA_ROOT + 'PREPROCESSED_IMG/train/'
VALID_AFTER_PREPROCESSING_IMG_PATH = DATA_ROOT + 'PREPROCESSED_IMG/val/'

### Data Load

In [None]:
def _read_label_json(directory, filename):
    """Parse one EUC-KR encoded annotation JSON file and return its content."""
    with open(directory + filename, "r", encoding='euc-kr') as f:
        return json.load(f)

# Training annotations, one file per emotion
anger_label = _read_label_json(TRAIN_JSON_PATH, "train_anger.json")
happy_label = _read_label_json(TRAIN_JSON_PATH, "train_happy.json")
panic_label = _read_label_json(TRAIN_JSON_PATH, "train_panic.json")
sadness_label = _read_label_json(TRAIN_JSON_PATH, "train_sadness.json")

# Validation annotations, one file per emotion
anger_label_val = _read_label_json(VALID_JSON_PATH, "val_anger.json")
happy_label_val = _read_label_json(VALID_JSON_PATH, "val_happy.json")
panic_label_val = _read_label_json(VALID_JSON_PATH, "val_panic.json")
sadness_label_val = _read_label_json(VALID_JSON_PATH, "val_sadness.json")

# Segmentation archives; mask_img later indexes them by image filename
anger_segment = np.load(TRAIN_SEGMENT_PATH + 'train_anger.npz')
happy_segment = np.load(TRAIN_SEGMENT_PATH + 'train_happy.npz')
panic_segment = np.load(TRAIN_SEGMENT_PATH + 'train_panic.npz')
sadness_segment = np.load(TRAIN_SEGMENT_PATH + 'train_sadness.npz')

anger_segment_val = np.load(VALID_SEGMENT_PATH + 'val_anger.npz')
happy_segment_val = np.load(VALID_SEGMENT_PATH + 'val_happy.npz')
panic_segment_val = np.load(VALID_SEGMENT_PATH + 'val_panic.npz')
sadness_segment_val = np.load(VALID_SEGMENT_PATH + 'val_sadness.npz')

## 전처리 함수

### Label 검증 (train, val)

In [None]:
def print_faceExp_count(counts, emotion):
    """Print how many samples had 0, 1, 2 or 3 annotators agreeing with `emotion`.

    Args:
        counts: Mapping with the keys 'zero', 'one', 'two' and 'three'.
        emotion: Emotion label shown in the report header.
    """
    zero, one, two, three = (counts[key] for key in ('zero', 'one', 'two', 'three'))
    report = [
        f"Counts for emotion '{emotion}'",
        f"  Zero annotation same   : {zero}",
        f"  One annotation same    : {one}",
        f"  Two annotations same   : {two}",
        f"  Three annotations same : {three}",
        f"  Sum of two, three : {two + three}",
        f"  Sum of all        : {zero + one + two + three}",
    ]
    print("\n".join(report))

def verify_faceExp_consistent(label, emotion):
  """Keep the samples on which at least two of the three annotators chose `emotion`.

  Args:
    label: Sequence of annotation records; each record carries 'annot_A',
      'annot_B' and 'annot_C' entries with a 'faceExp' value.
    emotion: The 'faceExp' value the annotators are compared against.

  Returns:
    List of the records with two or three agreeing annotators, in input order.
    The agreement histogram is printed through print_faceExp_count.
  """
  bucket_names = ('zero', 'one', 'two', 'three')
  counts = dict.fromkeys(bucket_names, 0)
  verified_data = []

  for sample in label:
    # Number of annotators (0..3) whose label equals the requested emotion
    agreeing = sum(sample[annot]['faceExp'] == emotion for annot in ('annot_A', 'annot_B', 'annot_C'))
    counts[bucket_names[agreeing]] += 1
    if agreeing >= 2:  # majority agreement -> keep the sample
      verified_data.append(sample)

  print_faceExp_count(counts, emotion)

  return verified_data

In [None]:
# Keep, per emotion, only the training samples on which at least two annotators agree
verified_anger = verify_faceExp_consistent(anger_label, '분노')
verified_happy = verify_faceExp_consistent(happy_label, '기쁨')
verified_panic = verify_faceExp_consistent(panic_label, '당황')
verified_sadness = verify_faceExp_consistent(sadness_label, '슬픔')

# Balance the classes: truncate every emotion to the size of the smallest verified set.
# This replaces the hard-coded 1102, which is exactly that minimum for the dataset
# this notebook was run on, so the result is unchanged while staying correct if the data changes.
TRAIN_SAMPLES_PER_CLASS = min(map(len, (verified_anger, verified_happy, verified_panic, verified_sadness)))

anger = verified_anger[:TRAIN_SAMPLES_PER_CLASS]
happy = verified_happy[:TRAIN_SAMPLES_PER_CLASS]
panic = verified_panic[:TRAIN_SAMPLES_PER_CLASS]
sadness = verified_sadness[:TRAIN_SAMPLES_PER_CLASS]

Counts for emotion '분노'
  Zero annotation same   : 151
  One annotation same    : 231
  Two annotations same   : 398
  Three annotations same : 720
  Sum of two, three : 1118
  Sum of all        : 1500
Counts for emotion '기쁨'
  Zero annotation same   : 7
  One annotation same    : 13
  Two annotations same   : 76
  Three annotations same : 1398
  Sum of two, three : 1474
  Sum of all        : 1494
Counts for emotion '당황'
  Zero annotation same   : 170
  One annotation same    : 228
  Two annotations same   : 386
  Three annotations same : 716
  Sum of two, three : 1102
  Sum of all        : 1500
Counts for emotion '슬픔'
  Zero annotation same   : 146
  One annotation same    : 234
  Two annotations same   : 414
  Three annotations same : 706
  Sum of two, three : 1120
  Sum of all        : 1500


In [None]:
# Keep, per emotion, only the validation samples on which at least two annotators agree
verified_anger_val = verify_faceExp_consistent(anger_label_val, '분노')
verified_happy_val = verify_faceExp_consistent(happy_label_val, '기쁨')
verified_panic_val = verify_faceExp_consistent(panic_label_val, '당황')
verified_sadness_val = verify_faceExp_consistent(sadness_label_val, '슬픔')

# Balance the classes: truncate every emotion to the size of the smallest verified set.
# This replaces the hard-coded 195, which is exactly that minimum for the dataset
# this notebook was run on, so the result is unchanged while staying correct if the data changes.
VALID_SAMPLES_PER_CLASS = min(map(len, (verified_anger_val, verified_happy_val, verified_panic_val, verified_sadness_val)))

anger_val = verified_anger_val[:VALID_SAMPLES_PER_CLASS]
happy_val = verified_happy_val[:VALID_SAMPLES_PER_CLASS]
panic_val = verified_panic_val[:VALID_SAMPLES_PER_CLASS]
sadness_val = verified_sadness_val[:VALID_SAMPLES_PER_CLASS]

Counts for emotion '분노'
  Zero annotation same   : 22
  One annotation same    : 45
  Two annotations same   : 71
  Three annotations same : 162
  Sum of two, three : 233
  Sum of all        : 300
Counts for emotion '기쁨'
  Zero annotation same   : 3
  One annotation same    : 3
  Two annotations same   : 13
  Three annotations same : 281
  Sum of two, three : 294
  Sum of all        : 300
Counts for emotion '당황'
  Zero annotation same   : 40
  One annotation same    : 47
  Two annotations same   : 79
  Three annotations same : 134
  Sum of two, three : 213
  Sum of all        : 300
Counts for emotion '슬픔'
  Zero annotation same   : 45
  One annotation same    : 60
  Two annotations same   : 69
  Three annotations same : 126
  Sum of two, three : 195
  Sum of all        : 300


### IMG 데이터 전처리 함수 정의

In [None]:
# Return the image with everything except the face blacked out
def mask_img(filename, path, segment):
  """Load an image as RGB and zero every pixel whose segmentation label is 0, 1, 2, 4 or 5.

  Args:
    filename: Image file name; also the key of its label map inside `segment`.
    path: Directory prefix, concatenated directly with `filename`.
    segment: Mapping from file name to a per-pixel label map with the image's
      height and width (presumably label 3 is the face region; verify against
      the segmentation data).

  Returns:
    A new RGB array in which the pixels of the listed labels are set to 0.

  Raises:
    FileNotFoundError: if OpenCV cannot read the image file.
  """
  image = cv2.imread(path + filename)
  if image is None:
    # cv2.imread signals failure by returning None instead of raising
    raise FileNotFoundError(f"Could not read image file: {path + filename}")
  image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

  # Fetch the label map once; with an .npz archive every lookup re-reads the array
  label_map = segment[filename]

  new_img = image.copy()
  new_img[np.isin(label_map, (0, 1, 2, 4, 5))] = 0

  return new_img

# Crop the image using the bounding boxes stored in the label data
def crop_face(label_data, img):
  """Crop `img` to the mean of the three annotators' bounding boxes.

  Args:
    label_data: Annotation record whose 'annot_A', 'annot_B' and 'annot_C'
      entries each hold a 'boxes' dict with 'minX', 'minY', 'maxX', 'maxY'.
    img: Image array indexed as [row, column, ...].

  Returns:
    The cropped view of `img`. It can be empty when the averaged box has no
    overlap with the image.
  """
  boxes = [label_data[annot]['boxes'] for annot in ('annot_A', 'annot_B', 'annot_C')]
  height, width = img.shape[:2]

  def _mean_coord(key, upper):
    # Average the three annotations, truncate to int, then clamp into the image.
    # Without the clamp a negative coordinate would be read as "count from the
    # end" by the slice below and silently yield a wrong or empty crop.
    value = int(np.mean([box[key] for box in boxes]))
    return min(max(value, 0), upper)

  min_x = _mean_coord('minX', width)
  max_x = _mean_coord('maxX', width)
  min_y = _mean_coord('minY', height)
  max_y = _mean_coord('maxY', height)

  return img[min_y:max_y, min_x:max_x]

# Extract the bounding box and facial keypoints with MTCNN
def extract_face_info_mtcnn(label_data, img):
  """Detect the face in `img` with MTCNN and return its box and keypoints.

  Args:
    label_data: Annotation record; only label_data['filename'] is read.
    img: Image array passed to MTCNN.detect_faces.

  Returns:
    dict with 'filename', 'bounding_box' and 'keypoints' of the detection with
    the highest 'confidence' (one face per image is expected).

  Raises:
    ValueError: if no face is detected. Previously this case surfaced as
      "local variable 'face_info' referenced before assignment".
  """
  # Build the detector once and reuse it; constructing MTCNN for every image
  # repeats its model setup needlessly.
  detector = getattr(extract_face_info_mtcnn, '_detector', None)
  if detector is None:
    detector = MTCNN()
    extract_face_info_mtcnn._detector = detector

  faces = detector.detect_faces(img)
  if not faces:
    raise ValueError(f"MTCNN detected no face in {label_data['filename']}")

  # With several detections keep the most confident one instead of whichever came last
  best_face = max(faces, key=lambda face: face.get('confidence', 0.0))

  return {
    'filename': label_data['filename'],
    'bounding_box': best_face['box'],
    'keypoints': best_face['keypoints']
  }

# Rotate the image so that 'left_eye' and 'right_eye' can be brought onto a horizontal line
def rotate_image(image, angle):
  """Rotate `image` by `angle` degrees about its centre, keeping the canvas size.

  Args:
    image: Image array; its first two dimensions are read as (height, width).
    angle: Rotation angle in degrees, forwarded to cv2.getRotationMatrix2D.

  Returns:
    The rotated image produced by cv2.warpAffine, same width and height as the input.
  """
  rows, cols = image.shape[:2]
  pivot = (cols / 2, rows / 2)  # (x, y) of the image centre

  # Scale factor 1.0: pure rotation, no zoom
  return cv2.warpAffine(image, cv2.getRotationMatrix2D(pivot, angle, 1.0), (cols, rows))

# Compute the tilt of the eye line
def calculate_angle(face_info):
  """Return the angle, in degrees, of the line from the left eye to the right eye.

  Args:
    face_info: dict whose 'keypoints' entry holds (x, y) points under
      'left_eye' and 'right_eye'.
  """
  keypoints = face_info['keypoints']
  start, end = keypoints['left_eye'], keypoints['right_eye']

  delta_x = end[0] - start[0]
  delta_y = end[1] - start[1]

  radians = np.arctan2(delta_y, delta_x)
  return radians * 180 / np.pi

# Landmark-based normalisation (fix the eye distance / centre on the nose tip / 224x224 output by default)
def normalize_face(image, landmarks, output_size=(224, 224), eye_distance_ratio=0.3):
    """Scale and shift a face so the eye distance is fixed and the nose tip is centred.

    Scaling and translation are applied as one affine warp straight into the
    output canvas. The earlier two-step version rendered the scaled image into
    a canvas of the *input* size first, which cut content off whenever the
    scale factor was above 1, and it interpolated the pixels twice.

    Args:
        image: Image array to warp.
        landmarks: dict with (x, y) points under 'left_eye', 'right_eye' and 'nose'.
        output_size: (width, height) of the result, passed to cv2.warpAffine.
        eye_distance_ratio: Target eye distance as a fraction of the output width.

    Returns:
        The warped image of size `output_size`.

    Raises:
        ValueError: if both eye landmarks coincide, so no scale can be derived.
    """
    left_eye = np.asarray(landmarks['left_eye'], dtype=float)
    right_eye = np.asarray(landmarks['right_eye'], dtype=float)
    nose_tip = np.asarray(landmarks['nose'], dtype=float)

    # Distance between the eyes in the input image
    eye_distance = np.linalg.norm(left_eye - right_eye)
    if eye_distance == 0:
        raise ValueError("left_eye and right_eye coincide; cannot normalise the face scale")

    # Scale factor that maps the measured eye distance onto the desired one
    desired_eye_distance = eye_distance_ratio * output_size[0]
    scale = desired_eye_distance / eye_distance

    # Translation that puts the scaled nose tip at the centre of the output canvas
    offset_x = output_size[0] / 2 - nose_tip[0] * scale
    offset_y = output_size[1] / 2 - nose_tip[1] * scale

    M = np.array([[scale, 0, offset_x], [0, scale, offset_y]])
    normalized_image = cv2.warpAffine(image, M, output_size)

    return normalized_image

# Dlib face detector and 68-point landmark predictor used by extract_face_info_dlib
DLIB_LANDMARK_MODEL_PATH = '/content/drive/MyDrive/Project/Data/shape_predictor_68_face_landmarks.dat'

detector = dlib.get_frontal_face_detector()
predictor = dlib.shape_predictor(DLIB_LANDMARK_MODEL_PATH)

def extract_face_info_dlib(img):
  """Locate a face with the module-level dlib `detector` and return its landmarks.

  Args:
    img: Image array handed to the dlib detector and predictor.

  Returns:
    The result of `predictor(img, face)` for the last face the detector
    reports (a single face per image is expected).

  Raises:
    ValueError: if the detector reports no face. Previously this case failed
      with an unbound `landmarks` variable.
  """
  # Second argument is forwarded unchanged from the original call
  # (the upsampling count, per the dlib detector API).
  faces = detector(img, 1)

  if len(faces) == 0:
    raise ValueError("dlib detected no face in the image")

  # Same choice as before when several faces are found: the last one wins
  return predictor(img, faces[len(faces) - 1])

# Group the dlib facial landmarks by face region
def label_landmarks_by_region(landmarks):
  """Split the landmark points into named facial regions.

  Args:
    landmarks: Object whose `parts()` returns the sliceable sequence of
      landmark points (indices 0-67 are used).

  Returns:
    dict mapping each region name to the slice of points belonging to it.
  """
  # (region name, first index, one-past-last index)
  region_bounds = (
      ("face_outline", 0, 17),
      ("right_eyebrow", 17, 22),
      ("left_eyebrow", 22, 27),
      ("nose", 27, 36),
      ("right_eye", 36, 42),
      ("left_eye", 42, 48),
      ("mouth", 48, 68),
  )
  return {name: landmarks.parts()[start:stop] for name, start, stop in region_bounds}

## ProcessedImageData 클래스

In [None]:
class ProcessedImageData:
  """Plain container bundling one processed sample.

  Attributes:
    img_info: Information extracted for the image (presumably the landmark
      regions; confirm against the caller).
    img: The image itself.
    emotion: Emotion label attached to the sample.
  """

  def __init__(self, img_info, img, emotion):
    self.img_info, self.img, self.emotion = img_info, img, emotion

## Train, Val 전처리

In [None]:
def _rotate_keypoints(keypoints, image_shape, angle):
  """Map landmark coordinates through the rotation that rotate_image applies.

  rotate_image turns the picture about its centre, so points measured before
  the rotation have to go through the same affine matrix to stay on the face.
  """
  height, width = image_shape[:2]
  matrix = cv2.getRotationMatrix2D((width / 2, height / 2), angle, 1.0)
  return {
      name: tuple(matrix @ np.array([point[0], point[1], 1.0]))
      for name, point in keypoints.items()
  }


def preprocess_data(path, segment_data, label_data, length, export_path):
  """Mask, crop, align and normalise face images and write them to `export_path`.

  For each of the first `length` records the image is masked with its
  segmentation map, cropped to the annotated box, rotated so the eyes are
  level, normalised with normalize_face and saved under its original file
  name. A record that fails at any step is reported and skipped so one bad
  image does not stop the batch.

  Args:
    path: Directory prefix of the source images.
    segment_data: Mapping from file name to segmentation map (see mask_img).
    label_data: Sequence of annotation records with a 'filename' entry.
    length: Number of records to process; capped at len(label_data).
    export_path: Directory prefix for the written images; created if missing.
  """
  os.makedirs(export_path, exist_ok=True)  # cv2.imwrite does not create folders

  total = min(length, len(label_data))
  failed = 0

  for i in range(total):
    filename = label_data[i]['filename']
    try:
      img_masked = mask_img(filename, path, segment_data)
      img_cropped = crop_face(label_data[i], img_masked)
      img_info = extract_face_info_mtcnn(label_data[i], img_cropped)

      angle = calculate_angle(img_info)
      img_rotated = rotate_image(img_cropped, angle)
      # The keypoints were detected before the rotation; move them along with
      # the pixels, otherwise the scale/centre below is computed from stale positions.
      keypoints_rotated = _rotate_keypoints(img_info['keypoints'], img_cropped.shape, angle)
      img_normalized = normalize_face(img_rotated, keypoints_rotated)

      # The pipeline works on RGB (see mask_img) while cv2.imwrite expects BGR
      bgr_image = cv2.cvtColor(img_normalized, cv2.COLOR_RGB2BGR)
      if not cv2.imwrite(export_path + filename, bgr_image):
        # imwrite reports failure through its return value only
        raise IOError(f"cv2.imwrite could not save {export_path + filename}")
      print(path, " :", i)

    except Exception as e:
      # Deliberate best effort: report the sample and continue with the next one
      failed += 1
      print(f"Unexpected error processing {filename}: {e}")

  print(f"{path}: saved {total - failed} of {total} images ({failed} failed)")

In [None]:
# Run the pipeline once per training emotion; the source and export folders
# share the emotion sub-directory name.
for emotion_dir, emotion_segment, emotion_labels in (
    ('anger/', anger_segment, anger),
    ('happy/', happy_segment, happy),
    ('panic/', panic_segment, panic),
    ('sadness/', sadness_segment, sadness),
):
  preprocess_data(
      TRAIN_IMG_PATH + emotion_dir,
      emotion_segment,
      emotion_labels,
      len(emotion_labels),
      TRAIN_AFTER_PREPROCESSING_IMG_PATH + emotion_dir,
  )

[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 773
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 774
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 775
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 776
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 777
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 778
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 779
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 780
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 781
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 782
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 783
true
/content/drive/MyDrive/Project/Data/img/train/sadness/sadness/  : 784
true
/content/drive/MyDrive/Project/Data/img/train

In [None]:
# Run the pipeline once per validation emotion; the source and export folders
# share the emotion sub-directory name.
for emotion_dir, emotion_segment, emotion_labels in (
    ('anger/', anger_segment_val, anger_val),
    ('happy/', happy_segment_val, happy_val),
    ('panic/', panic_segment_val, panic_val),
    ('sadness/', sadness_segment_val, sadness_val),
):
  preprocess_data(
      VALID_IMG_PATH + emotion_dir,
      emotion_segment,
      emotion_labels,
      len(emotion_labels),
      VALID_AFTER_PREPROCESSING_IMG_PATH + emotion_dir,
  )

true
/content/drive/MyDrive/Project/Data/img/val/anger/anger/  : 0




true
/content/drive/MyDrive/Project/Data/img/val/anger/anger/  : 1




[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 60
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 61
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 62
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 63
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 64
Unexpected error processing mjzi1250ecc9fac4dfb4eaf4a9b17e90eb5be2912c1c8b9cd2f5810a1c24e3jz0.jpg: local variable 'face_info' referenced before assignment
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 66
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 67
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 68
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 69
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 70
true
/content/drive/MyDrive/Project/Data/img/val/happy/happy/  : 71
true
/content/drive/MyDrive/Project/Data/img/va