In [1]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import pandas as pd
import csv
import os

2024-11-07 16:20:00.984498: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Load Model

In [2]:
BaseOptions = mp.tasks.BaseOptions
FaceLandmarker = mp.tasks.vision.FaceLandmarker
FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

options = FaceLandmarkerOptions(
    base_options=BaseOptions(model_asset_path='../model/face_landmarker.task'),
    running_mode=VisionRunningMode.IMAGE)

face_mesh_connections = mp.solutions.face_mesh.FACEMESH_TESSELATION

In [3]:
DIRECTORY = r"../Image_data/DATASET/train"
CATEGORIES = []

try:
    folders = os.listdir(DIRECTORY)
    print(f"Directories in '{DIRECTORY}':")
    for folder in folders:
        if os.path.isdir(os.path.join(DIRECTORY, folder)):
            CATEGORIES.append(folder)
except ValueError as e:
    print(e)

print("Categories:", CATEGORIES)

Directories in '../Image_data/DATASET/train':
Categories: ['7', '6', '1', '4', '3', '2', '5']


# Get file

In [16]:
# 指定處理類別
desired_category = '7'

def paths(desired_category):
    # Check if the category directory exists
    category_path = os.path.join(DIRECTORY, desired_category)

    if not os.path.isdir(category_path):
        print(f"Category '{desired_category}' does not exist in '{DIRECTORY}'.")
        # List available categories if the desired one is missing
        available_categories = [folder for folder in os.listdir(DIRECTORY) if os.path.isdir(os.path.join(DIRECTORY, folder))]
        print("Available categories:", available_categories)
        # Raise an error if the category does not exist
        raise FileNotFoundError(f"Category '{category_path}' not found.")

    # Create the output directory structure if it does not exist
    output_dir = 'output_data'
    landmarks_dir = os.path.join(output_dir, 'landmarks')
    connections_dir = os.path.join(output_dir, 'connections')

    # Ensure output directories exist
    for path in [landmarks_dir, connections_dir]:
        if not os.path.exists(path):
            os.makedirs(path)
            print(f"Created directory: {path}")

    # Specify the output file paths for landmarks and connections
    landmarks = os.path.join(landmarks_dir, f'face_landmarks_{desired_category}.csv')
    connections = os.path.join(connections_dir, f'connections_{desired_category}.csv')
    
    # Return the paths for further use
    return landmarks, connections, category_path

landmarks_file, connections_file, category_path = paths(desired_category)

In [None]:
# create CSV
with open(landmarks_file, 'w', newline='') as landmarks_csv, \
     open(connections_file, 'w', newline='') as connections_csv:
    
    landmarks_writer = csv.writer(landmarks_csv)
    connections_writer = csv.writer(connections_csv)
    
    landmarks_writer.writerow(["image_name", "category", "landmark_index", "x", "y", "z"])  # Landmarks
    connections_writer.writerow(["image_name", "category", "point1", "point2"])  # Connections
    
    # create FaceLandmarker
    with FaceLandmarker.create_from_options(options) as landmarker:
        folder = category_path
        print(f"Processing category: {desired_category}")
        
        # Iterate through each image in the specified category
        for image_name in os.listdir(folder):
            image_path = os.path.join(folder, image_name)
            frame = cv2.imread(image_path)
            
            if frame is None:
                print(f"Cannot read image: {image_name}")
                continue

            h, w = frame.shape[:2]
            # to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # create Mediapipe image
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
            
            # FaceLandmarker identify face
            face_landmarker_result = landmarker.detect(mp_image)
            
            # Get face feature
            face_landmarks_list = face_landmarker_result.face_landmarks

            if not face_landmarks_list:
                print(f"No face detected in image: {image_name}")
                continue

            # process face feature
            for face_landmarks in face_landmarks_list:
                # save landmarks
                for idx, landmark in enumerate(face_landmarks):
                    x = landmark.x * w
                    y = landmark.y * h
                    z = landmark.z * w
                    landmarks_writer.writerow([image_name, desired_category, idx, x, y, z])
                
                # save connection
                for connection in face_mesh_connections:
                    point1 = connection[0]
                    point2 = connection[1]
                    connections_writer.writerow([image_name, desired_category, point1, point2])
            
            print(f"Processed: {image_name} in category {desired_category}")

In [18]:
landmarks_file = f'output_data/landmarks/face_landmarks_{desired_category}.csv'
connections_file = f'output_data/connections/connections_{desired_category}.csv'

landmarks_df = pd.read_csv(landmarks_file)
connections_df = pd.read_csv(connections_file)

output_folder = f'output_data/adjacency/adjacency_{desired_category}'
os.makedirs(output_folder, exist_ok=True)

In [None]:
# If 'image_name' column is not present in connections data, retrieve connection indices in advance
if 'image_name' not in connections_df.columns:
    point_indices_1 = connections_df['point1'].astype(int).values
    point_indices_2 = connections_df['point2'].astype(int).values
else:
    connections_grouped = connections_df.groupby('image_name')

for image_name in landmarks_df['image_name'].unique():
    # Filter landmarks for the current image
    image_landmarks_df = landmarks_df[landmarks_df['image_name'] == image_name].reset_index(drop=True)
    
    # Extract coordinates and the number of landmarks for the image
    points_coordinates = image_landmarks_df[['x', 'y', 'z']].values
    num_points = len(points_coordinates)
    
    # Initialize the adjacency matrix
    adjacency_matrix = np.zeros((num_points, num_points))
    
    if 'image_name' in connections_df.columns:
        # Filter connection data for the current image
        if image_name in connections_grouped.groups:
            image_connections_df = connections_grouped.get_group(image_name)
            point_indices_1 = image_connections_df['point1'].astype(int).values
            point_indices_2 = image_connections_df['point2'].astype(int).values
        else:
            print(f"No connections found for image: {image_name}")
            continue
    else:
        pass  # Already retrieved outside, no need to repeat

    # Ensure indices are within valid range
    valid_indices = (point_indices_1 < num_points) & (point_indices_2 < num_points)
    point_indices_1 = point_indices_1[valid_indices]
    point_indices_2 = point_indices_2[valid_indices]
    
    # Retrieve the corresponding coordinates
    coords1 = points_coordinates[point_indices_1]
    coords2 = points_coordinates[point_indices_2]
    
    # Compute distances for all connection pairs
    distances = np.linalg.norm(coords1 - coords2, axis=1)
    
    # Fill the adjacency matrix with distances
    adjacency_matrix[point_indices_1, point_indices_2] = distances
    adjacency_matrix[point_indices_2, point_indices_1] = distances  # Symmetric
    
    adjacency_df = pd.DataFrame(adjacency_matrix)
    output_path = os.path.join(output_folder, f'adjacency_matrix_{image_name}.csv')
    adjacency_df.to_csv(output_path, index=False)
    print(f"Saved adjacency matrix for {image_name} to {output_path}")

# Test

In [20]:
import os

# Define the directories for images and adjacency matrices
IMAGE_DIRECTORY = r"../Image_data/DATASET/train/" + desired_category
ADJACENCY_DIRECTORY = r"output_data/adjacency/adjacency_" + desired_category

def verify_file_pairing(image_dir, adjacency_dir):
    """
    Verify if each adjacency matrix file has a corresponding image file.
    """
    # List all adjacency matrix files
    adjacency_files = [f for f in os.listdir(adjacency_dir) if f.endswith('.csv')]

    unmatched_files = []

    for adjacency_file in adjacency_files:
        # Extract the base name without extensions for matching
        base_name = os.path.splitext(adjacency_file.replace('adjacency_matrix_', '').replace('.csv', ''))[0]

        # Construct the corresponding image file name
        image_file = f"{base_name}.jpg"
        image_path = os.path.join(image_dir, image_file)

        if not os.path.exists(image_path):
            unmatched_files.append((adjacency_file, image_file))

    if unmatched_files:
        print("The following adjacency matrices do not have matching image files:")
        for adj, img in unmatched_files:
            print(f"Adjacency Matrix: {adj} | Expected Image: {img}")
    else:
        print("All adjacency matrices have corresponding image files.")

# Run the verification
verify_file_pairing(IMAGE_DIRECTORY, ADJACENCY_DIRECTORY)


All adjacency matrices have corresponding image files.
