In [2]:
pip install tensorflow-addons h5py tqdm



In [None]:
from google.colab import drive
drive.mount('/content/drive')

ValueError: mount failed

In [None]:
import numpy as np
import pickle
from tqdm import tqdm
import tensorflow as tf
from sklearn.decomposition import PCA
import os
import h5py
from PIL import Image

# GPU Configuration
physical_devices = tf.config.list_physical_devices('GPU')
if physical_devices:
    for device in physical_devices:
        try:
            tf.config.experimental.set_memory_growth(device, True)
            tf.config.set_logical_device_configuration(
                device, [tf.config.LogicalDeviceConfiguration(memory_limit=13000)]
            )
            print("Configured GPU with a memory limit of 13,000 MB.")
        except Exception as e:
            print(f"Error configuring GPU: {e}")
else:
    print("No GPU detected, running on CPU.")

try:
    tf.keras.mixed_precision.set_global_policy('mixed_float16')
    print("Mixed precision enabled for speedup.")
except ValueError:
    print("Mixed precision not supported, running with default precision.")

# File paths
spatial_fake_path = 'drive/MyDrive/features/spatial_valid_fake.pkl'
spatial_real_path = 'drive/MyDrive/features/spatial_valid_real.pkl'
landmark_fake_path = 'drive/MyDrive/features/landmarks_fake_valid.pkl'
landmark_real_path = 'drive/MyDrive/features/landmarks_real_valid.pkl'
fused_output_path = 'drive/MyDrive/features/fused_features.h5'
os.makedirs(os.path.dirname(fused_output_path), exist_ok=True)

# Function to load and preprocess images from file paths inside dictionaries
def load_and_preprocess_images(file_data, process_images=True):
    processed_images = []
    for data in tqdm(file_data, desc="Loading images"):
        try:
            image_path = data.get('image_name')  # Updated key for image path
            if process_images and image_path:
                image = Image.open(image_path).convert("RGB")  # Open and convert to RGB
                image = image.resize((224, 224))  # Resize to a fixed size
                image_array = np.array(image, dtype=np.float32) / 255.0  # Normalize
                processed_images.append(image_array)
            elif not process_images:
                features = data.get('features')  # Use features directly if no image loading
                if features is not None:
                    processed_images.append(features)
                else:
                    print(f"Warning: No features found in data: {data}")
            else:
                print(f"Warning: No image path found in data: {data}")
        except Exception as e:
            print(f"Error loading image {data.get('image_name', 'Unknown')}: {e}")
    return np.array(processed_images)

# Adjusted data loading with feature-only handling
print("Converting file paths to image arrays...")
spatial_fake = load_and_preprocess_images(spatial_fake_data, process_images=True)
spatial_real = load_and_preprocess_images(spatial_real_data, process_images=True)
landmark_fake = load_and_preprocess_images(landmark_fake_data, process_images=True)
landmark_real = load_and_preprocess_images(landmark_real_data, process_images=True)

# Function to load features
def load_features(file_path):
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
        return data

# Data augmentation function
def augment_data(features, target_size):
    current_size = len(features)
    augmented_samples = []
    while len(augmented_samples) + current_size < target_size:
        sample = features[np.random.randint(current_size)]
        noise = np.random.normal(0, 0.01, sample.shape)
        augmented_samples.append(sample + noise)
    return np.concatenate([features, np.stack(augmented_samples)], axis=0)

# Align features by padding
def align_features(features):
    max_shape = tuple(np.max([np.array(f).shape for f in features], axis=0))
    aligned_features = []
    for feature in features:
        feature = np.array(feature)
        pad_width = [(0, max_dim - cur_dim) for cur_dim, max_dim in zip(feature.shape, max_shape)]
        aligned_features.append(np.pad(feature, pad_width, mode='constant'))
    return np.stack(aligned_features, axis=0)

# PCA for dimensionality reduction
def apply_pca(features, variance_threshold=0.95):
    reshaped_features = features.reshape(features.shape[0], -1)
    pca = PCA(n_components=variance_threshold)
    reduced_features = pca.fit_transform(reshaped_features)
    print(f"PCA reduced dimensions from {reshaped_features.shape[1]} to {reduced_features.shape[1]} "
          f"(explained variance: {pca.explained_variance_ratio_.sum():.2f})")
    return reduced_features

# Load features
print("Loading features...")
spatial_fake_data = load_features(spatial_fake_path)
spatial_real_data = load_features(spatial_real_path)
landmark_fake_data = load_features(landmark_fake_path)
landmark_real_data = load_features(landmark_real_path)

# Convert dictionaries with file paths to image arrays
print("Converting file paths to image arrays...")
spatial_fake = load_and_preprocess_images(spatial_fake_data)
spatial_real = load_and_preprocess_images(spatial_real_data)
landmark_fake = load_and_preprocess_images(landmark_fake_data)
landmark_real = load_and_preprocess_images(landmark_real_data)

# Balance the dataset
print("Balancing the dataset...")
if len(spatial_fake) < len(spatial_real):
    spatial_fake = augment_data(spatial_fake, len(spatial_real))
if len(landmark_fake) < len(landmark_real):
    landmark_fake = augment_data(landmark_fake, len(landmark_real))

# Align features
print("Aligning features...")
aligned_spatial_fake = align_features(spatial_fake)
aligned_spatial_real = align_features(spatial_real)
aligned_landmark_fake = align_features(landmark_fake)
aligned_landmark_real = align_features(landmark_real)

# Fuse features
print("Fusing features...")
fused_fake = np.concatenate([aligned_spatial_fake, aligned_landmark_fake], axis=-1)
fused_real = np.concatenate([aligned_spatial_real, aligned_landmark_real], axis=-1)

# Apply PCA
print("Applying PCA for dimensionality reduction...")
pca_fake = apply_pca(fused_fake)
pca_real = apply_pca(fused_real)

# Save fused features
print("Saving fused features...")
with h5py.File(fused_output_path, 'w') as h5f:
    h5f.create_dataset('pca_fake_features', data=pca_fake)
    h5f.create_dataset('pca_real_features', data=pca_real)

print(f"PCA-reduced features successfully saved to {fused_output_path}.")


Output hidden; open in https://colab.research.google.com to view.

In [None]:
import os
import pickle
import h5py
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.mixed_precision import set_global_policy

# Set mixed precision policy
set_global_policy('mixed_float16')

# File paths
spatial_fake_path = 'drive/MyDrive/features/spatial_valid_fake.pkl'
spatial_real_path = 'drive/MyDrive/features/spatial_valid_real.pkl'
landmark_fake_path = 'drive/MyDrive/features/landmarks_fake_valid.pkl'
landmark_real_path = 'drive/MyDrive/features/landmarks_real_valid.pkl'
fused_output_path = 'drive/MyDrive/features/fused_features.h5'

# Load features from pickle files and convert to NumPy array
def load_features(file_path):
    with open(file_path, 'rb') as f:
        data = pickle.load(f)
    if isinstance(data, dict):
        data = np.array([np.array(v, dtype=np.float32) for v in data.values()], dtype=np.float32)  # Ensure consistent dtype
    elif isinstance(data, list):
        data = np.array([np.array(item, dtype=np.float32) for item in data], dtype=np.float32)  # Handle list of arrays
    else:
        data = np.array(data, dtype=np.float32)  # Convert to NumPy array with float32 dtype
    return data

# Ensure shape compatibility by padding or truncating features
def align_shapes(features, target_shape):
    features = np.atleast_2d(features)  # Ensure at least 2D
    current_shape = features.shape
    if current_shape[1] > target_shape[1]:
        features = features[:, :target_shape[1]]  # Truncate
    elif current_shape[1] < target_shape[1]:
        padding = target_shape[1] - current_shape[1]
        features = np.pad(features, ((0, 0), (0, padding)), mode='constant', constant_values=0)  # Pad with zeros
    return features

# Determine the target shape dynamically
def get_target_shape(*feature_arrays):
    max_width = 0
    for array in feature_arrays:
        if array.size == 0:  # Check if array is empty
            raise ValueError("One of the feature arrays is empty.")
        array = np.atleast_2d(array)  # Ensure at least 2D
        max_width = max(max_width, array.shape[1])
    return (None, max_width)  # None for the first dimension

# Fuse features by concatenation
def fuse_features(spatial_features, landmark_features):
    return np.concatenate([spatial_features, landmark_features], axis=1)

# Main fusion process
def process_and_fuse(spatial_fake_path, spatial_real_path, landmark_fake_path, landmark_real_path, fused_output_path):
    # Load features
    with ThreadPoolExecutor(max_workers=4) as executor:
        spatial_fake_future = executor.submit(load_features, spatial_fake_path)
        spatial_real_future = executor.submit(load_features, spatial_real_path)
        landmark_fake_future = executor.submit(load_features, landmark_fake_path)
        landmark_real_future = executor.submit(load_features, landmark_real_path)

        spatial_fake = spatial_fake_future.result()
        spatial_real = spatial_real_future.result()
        landmark_fake = landmark_fake_future.result()
        landmark_real = landmark_real_future.result()

    # Determine target shape
    target_shape = get_target_shape(spatial_fake, spatial_real, landmark_fake, landmark_real)

    # Align shapes
    spatial_fake = align_shapes(spatial_fake, target_shape)
    spatial_real = align_shapes(spatial_real, target_shape)
    landmark_fake = align_shapes(landmark_fake, target_shape)
    landmark_real = align_shapes(landmark_real, target_shape)

    # Fuse features
    fused_fake = fuse_features(spatial_fake, landmark_fake)
    fused_real = fuse_features(spatial_real, landmark_real)

    # Save fused features to an H5 file
    with h5py.File(fused_output_path, 'w') as h5f:
        h5f.create_dataset('fused_fake', data=fused_fake, compression='gzip', dtype='float32')
        h5f.create_dataset('fused_real', data=fused_real, compression='gzip', dtype='float32')

    print(f"Fused features saved to {fused_output_path}")

# GPU configuration
def configure_gpu():
    gpus = tf.config.experimental.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
            print(f"Configured {len(gpus)} GPU(s) for memory growth.")
        except RuntimeError as e:
            print(e)

if __name__ == "__main__":
    configure_gpu()
    process_and_fuse(spatial_fake_path, spatial_real_path, landmark_fake_path, landmark_real_path, fused_output_path)


Configured 1 GPU(s) for memory growth.


TypeError: float() argument must be a string or a real number, not 'dict'