# In [1]:
import numpy as np
import pandas as pd
import cv2
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Flatten, Dropout, Conv2D, MaxPooling2D, UpSampling2D, Input, concatenate, BatchNormalization
from tensorflow.keras.preprocessing.image import img_to_array, load_img, ImageDataGenerator
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import tensorflow as tf
import os
import logging
from scipy.interpolate import splprep, splev
from shapely.geometry import LineString

# Configure the root logger once at import time: INFO level, timestamped records.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Input/output locations consumed by main().
train_folder = 'project_directory/train_folder'  # training images, passed to preprocess_doodles
test_csv_file = 'project_directory/test_folder/test.csv'  # test curves, passed to preprocess_doodles
output_csv_file = 'output.csv'  # where main() writes the processed curve coordinates

IMAGE_SIZE = (128, 128)  # image size main() hands to preprocess_doodles
NUM_CLASSES = 10  # number of output classes main() hands to build_model

def log_info(message):
    """Emit ``message`` through the root logger at INFO level."""
    logging.log(logging.INFO, message)

def load_images_from_folder(folder, image_size):
    """Load every image file found in the class sub-folders of ``folder``.

    Each immediate sub-directory of ``folder`` is treated as one class and its
    name is used as the label of every file it contains. Entries that are not
    directories (top level) or not regular files (inside a class folder) are
    skipped.

    Args:
        folder: Directory whose sub-directories each hold the images of one class.
        image_size: Size forwarded to ``load_img`` as ``target_size``.

    Returns:
        A ``(images, labels)`` tuple of NumPy arrays where ``labels[i]`` is the
        name of the sub-directory ``images[i]`` was loaded from. Both arrays
        are empty when no image file is found.
    """
    images = []
    labels = []
    # os.listdir() returns entries in arbitrary order; sorting makes the
    # sample order (and everything derived from it) reproducible across runs.
    for label in sorted(os.listdir(folder)):
        label_folder = os.path.join(folder, label)
        if not os.path.isdir(label_folder):
            continue
        for filename in sorted(os.listdir(label_folder)):
            img_path = os.path.join(label_folder, filename)
            # A nested directory cannot be opened as an image; skip it
            # instead of crashing the whole load.
            if not os.path.isfile(img_path):
                continue
            image = load_img(img_path, target_size=image_size, color_mode='rgb')
            images.append(img_to_array(image))
            labels.append(label)
    return np.array(images), np.array(labels)

def preprocess_doodles(train_folder, test_csv_file, image_size):
    """Load the training images and the test curves, then validate the training set.

    Args:
        train_folder: Folder handed to ``load_images_from_folder``.
        test_csv_file: CSV path handed to ``preprocess_test_csv``.
        image_size: Image size forwarded to both loaders.

    Returns:
        ``(train_images, test_images, train_labels, test_labels)`` where
        ``test_labels`` is whatever ``preprocess_test_csv`` returns as its
        second element.
    """
    train_images, train_labels = load_images_from_folder(train_folder, image_size)
    test_images, test_labels = preprocess_test_csv(test_csv_file, image_size)
    validate_input_data(train_images, train_labels)
    return train_images, test_images, train_labels, test_labels

def preprocess_test_csv(csv_file, image_size):
    """Rasterise every row of a header-less CSV into an image.

    Each row is reshaped into consecutive ``(first, second)`` coordinate pairs
    and drawn with ``bezier_to_image``.

    Args:
        csv_file: Path of the CSV file; it is read without a header row.
        image_size: Canvas size forwarded to ``bezier_to_image``.

    Returns:
        ``(images, None)``: the stacked images and ``None`` in the label slot.
    """
    frame = pd.read_csv(csv_file, header=None)
    rendered = [
        bezier_to_image(np.array(record).reshape(-1, 2), image_size)
        for _, record in frame.iterrows()
    ]
    return np.array(rendered), None

def bezier_to_image(bezier_points, image_size):
    """Draw a polyline through ``bezier_points`` on a black RGB canvas.

    Args:
        bezier_points: Sequence of points; element 0 of each point is used as
            the x coordinate and element 1 as the y coordinate. Both are
            multiplied by the canvas extent, so they are presumably
            normalised to [0, 1] - confirm against the data source.
        image_size: ``(rows, columns)`` of the canvas, i.e. the shape given to
            ``np.zeros``.

    Returns:
        A ``uint8`` RGB image with consecutive points joined by white,
        1-pixel-wide line segments.
    """
    height, width = image_size[0], image_size[1]
    canvas = np.zeros((height, width), dtype=np.uint8)
    # OpenCV points are (x, y): x runs along the columns (width) and y along
    # the rows (height). Scaling x by the row count, as done previously, is
    # only correct for square canvases.
    pixels = [(int(point[0] * width), int(point[1] * height)) for point in bezier_points]
    for start, end in zip(pixels, pixels[1:]):
        cv2.line(canvas, start, end, 255, 1)
    return cv2.cvtColor(canvas, cv2.COLOR_GRAY2RGB)

def build_model(input_shape, num_classes):
    """Build and compile the CNN classifier.

    The network is three 3x3 convolutions (32, 64, 64 filters), each followed
    by batch normalisation, with 2x2 max pooling after the first two, then a
    64-unit dense layer with batch normalisation and 50% dropout, and a
    softmax output of ``num_classes`` units.

    Args:
        input_shape: Shape of one input sample, given to the first layer.
        num_classes: Number of units in the softmax output layer.

    Returns:
        The model compiled with Adam, sparse categorical cross-entropy and an
        accuracy metric.
    """
    weight_decay = 0.01

    def conv(filters, **extra):
        # 3x3 ReLU convolution with its own L2 kernel regulariser.
        return Conv2D(filters, (3, 3), activation='relu',
                      kernel_regularizer=l2(weight_decay), **extra)

    layers = [
        conv(32, input_shape=input_shape),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        conv(64),
        BatchNormalization(),
        MaxPooling2D((2, 2)),
        conv(64),
        BatchNormalization(),
        Flatten(),
        Dense(64, activation='relu', kernel_regularizer=l2(weight_decay)),
        BatchNormalization(),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    ]
    classifier = Sequential(layers)
    classifier.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

def build_unet(input_size=(128, 128, 3)):
    """Build and compile a small two-level U-Net.

    The encoder has 64- and 128-filter stages separated by 2x2 max pooling,
    followed by a 256-filter bottleneck. The decoder upsamples twice and
    concatenates the matching encoder output at each level. The final 1x1
    convolution emits 3 sigmoid channels.

    Args:
        input_size: Shape of one input sample.

    Returns:
        The model compiled with Adam and mean-squared-error loss.
    """
    def double_conv(tensor, filters):
        # Two rounds of (3x3 same-padded ReLU conv with L2 -> batch norm).
        for _ in range(2):
            tensor = Conv2D(filters, 3, activation='relu', padding='same',
                            kernel_regularizer=l2(0.01))(tensor)
            tensor = BatchNormalization()(tensor)
        return tensor

    image_in = Input(input_size)

    # Encoder
    enc1 = double_conv(image_in, 64)
    enc2 = double_conv(MaxPooling2D(pool_size=(2, 2))(enc1), 128)
    bottleneck = double_conv(MaxPooling2D(pool_size=(2, 2))(enc2), 256)

    # Decoder with skip connections to the encoder stages
    dec2 = double_conv(concatenate([UpSampling2D(size=(2, 2))(bottleneck), enc2]), 128)
    dec1 = double_conv(concatenate([UpSampling2D(size=(2, 2))(dec2), enc1]), 64)

    reconstruction = Conv2D(3, 1, activation='sigmoid')(dec1)

    unet = Model(inputs=image_in, outputs=reconstruction)
    unet.compile(optimizer='adam', loss='mse')
    return unet

def create_masks(images, mask_percentage=0.2):
    """Create random binary pixel masks, one per image.

    For every image a fraction ``mask_percentage`` of its pixel positions is
    drawn without replacement and set to 0 across all channels; every other
    entry is 1.

    Args:
        images: Array of shape ``(n, height, width)`` or
            ``(n, height, width, channels)``; any channel count is accepted.
        mask_percentage: Fraction of pixels to zero out in each mask.

    Returns:
        An array with the same shape and dtype as ``images`` containing only
        0s and 1s.
    """
    images = np.asarray(images)
    # np.ones() always allocates a C-contiguous array, so the reshape below
    # is guaranteed to be a view and the in-place writes reach ``masks``.
    masks = np.ones(images.shape, dtype=images.dtype)
    if images.shape[0] == 0:
        return masks

    n_pixels = images.shape[1] * images.shape[2]
    n_mask = int(n_pixels * mask_percentage)
    # Derive the channel count from the data instead of assuming RGB.
    n_channels = int(np.prod(images.shape[3:]))
    per_pixel = masks.reshape(images.shape[0], n_pixels, n_channels)
    for pixel_rows in per_pixel:
        hidden = np.random.choice(n_pixels, n_mask, replace=False)
        pixel_rows[hidden] = 0
    return masks

def train_model(model, images, labels):
    """Train ``model`` on augmented images with a held-out validation set.

    Args:
        model: Compiled Keras model to fit.
        images: Array of training images.
        labels: Class label per image. Non-numeric labels (e.g. folder names)
            are mapped to integer ids in sorted order of the distinct values,
            because the sparse categorical loss needs integer targets.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    labels = np.asarray(labels)
    if labels.dtype.kind in 'OUS':
        # String/object labels cannot be fed to the loss; encode them as
        # 0..k-1 following the sorted order of the unique label values.
        _, labels = np.unique(labels, return_inverse=True)

    # ``validation_split`` is only supported by Model.fit for array/tensor
    # input, not for generators, so hold out the 10% explicitly. The split is
    # shuffled so the validation set is not a single class when the samples
    # arrive grouped by class.
    train_x, val_x, train_y, val_y = train_test_split(
        images, labels, test_size=0.1, random_state=42
    )

    # No datagen.fit() call: it only computes statistics for the
    # featurewise/ZCA options, none of which are enabled here.
    datagen = ImageDataGenerator(
        rotation_range=15,
        width_shift_range=0.15,
        height_shift_range=0.15,
        zoom_range=0.15,
        shear_range=0.1,
        brightness_range=[0.8, 1.2],
        horizontal_flip=True,
        fill_mode='nearest'
    )

    checkpoint = ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True, mode='min')
    early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    return model.fit(
        datagen.flow(train_x, train_y, batch_size=32),
        epochs=50,
        validation_data=(val_x, val_y),
        callbacks=[checkpoint, early_stop],
    )

def train_inpainting_model(images, epochs=50, batch_size=32):
    """Train a U-Net to reconstruct images from randomly masked copies.

    A fresh set of random masks is drawn for every epoch, so the network sees
    a different corruption of each image each time.

    Args:
        images: Array of training images; its per-sample shape sizes the U-Net.
        epochs: Number of single-epoch fits to run.
        batch_size: Batch size used for each fit.

    Returns:
        The trained inpainting model.
    """
    inpainter = build_unet(input_size=images.shape[1:])

    for epoch in range(epochs):
        log_info(f"Epoch {epoch+1}/{epochs}")
        corrupted = images * create_masks(images)
        inpainter.fit(corrupted, images, batch_size=batch_size, epochs=1, verbose=1)

    return inpainter

def detect_symmetry(image, threshold=0.95):
    """Report whether ``image`` is mirror-symmetric about its vertical centre line.

    The image is converted to 8-bit grayscale and the left half is compared,
    pixel for pixel, with the mirrored right half. For odd widths the centre
    column is ignored so both halves always have the same shape.

    Args:
        image: RGB image array.
        threshold: Minimum fraction of exactly matching pixels.

    Returns:
        True when the matching fraction exceeds ``threshold``; False for
        images too narrow to have two halves.
    """
    gray = cv2.cvtColor(image.astype(np.uint8), cv2.COLOR_RGB2GRAY)
    half = gray.shape[1] // 2
    if half == 0:
        return False
    left = gray[:, :half]
    # Take the LAST ``half`` columns (not everything from width // 2 on) so an
    # odd width does not leave the right side one column wider than the left.
    mirrored_right = gray[:, -half:][:, ::-1]
    similarity = np.mean(left == mirrored_right)
    return similarity > threshold

def detect_rotational_symmetry(image, threshold=0.95):
    """Report whether ``image`` looks the same after a 180-degree rotation.

    Args:
        image: RGB image array.
        threshold: Minimum fraction of grayscale pixels that must be exactly
            equal between the image and its rotated copy.

    Returns:
        The result of comparing the matching fraction against ``threshold``.
    """
    gray = cv2.cvtColor(image.astype(np.uint8), cv2.COLOR_RGB2GRAY)
    half_turn = cv2.rotate(gray, cv2.ROTATE_180)
    matching = (gray == half_turn).sum()
    return matching / gray.size > threshold

def apply_symmetry(image):
    """Make ``image`` mirror-symmetric by reflecting its left half onto the right.

    For odd widths the centre column is kept between the two halves so the
    result always has the same shape as the input (previously one column was
    lost, which broke stacking with other images).

    Args:
        image: Array whose second axis is the horizontal (column) axis.

    Returns:
        A new array of the same shape as ``image`` whose right half is the
        mirror image of its left half.
    """
    width = image.shape[1]
    half = width // 2
    left_half = image[:, :half]
    pieces = [left_half]
    if width % 2:
        pieces.append(image[:, half:half + 1])
    pieces.append(left_half[:, ::-1])
    return np.concatenate(pieces, axis=1)

def complete_incomplete_curves(images, model):
    """Fill the pure-white pixels of each image with the inpainting model's output.

    Pixels whose channels all equal 255 are treated as the region to fill:
    they are zeroed before prediction and replaced by the model output
    afterwards. Images detected as mirror- or rotationally symmetric (tested
    on the unmodified input) are then re-symmetrised.

    Args:
        images: Batch of images, shape ``(n, height, width, channels)``.
        model: Model exposing ``predict`` that maps a batch of masked images
            to a batch of completed images of the same shape.

    Returns:
        Array of completed images; an empty array when ``images`` is empty.
    """
    images = np.asarray(images)
    if len(images) == 0:
        return np.array([])

    # 1.0 where a pixel is pure white, broadcast over the channel axis.
    masks = np.all(images == 255, axis=-1, keepdims=True).astype(np.float32)
    masked_images = images * (1 - masks)
    # One batched predict() call instead of one call per image: predict() is
    # built for batch processing and is slow when invoked inside a Python loop.
    completed = model.predict(masked_images)
    blended = masked_images + completed * masks

    completed_images = []
    for original, result in zip(images, blended):
        is_symmetric = detect_symmetry(original)
        is_rotational_symmetric = detect_rotational_symmetry(original)

        if is_symmetric:
            result = apply_symmetry(result)

        if is_rotational_symmetric:
            result = apply_symmetry(cv2.rotate(result, cv2.ROTATE_180))

        completed_images.append(result)

    return np.array(completed_images)

def save_bezier_coordinates_to_csv(bezier_coordinates, output_csv_file):
    """Write ``bezier_coordinates`` to ``output_csv_file``, one sequence per row.

    The data is wrapped in a DataFrame and written with a header row of
    column labels but without the index column.
    """
    pd.DataFrame(bezier_coordinates).to_csv(output_csv_file, index=False)

def process_curve_points(points, method='spline'):
    """Regularise a 2-D curve given as an ``(n, 2)`` array of points.

    Args:
        points: Array of shape ``(n, 2)``.
        method: ``'spline'`` resamples an interpolating parametric spline at
            100 evenly spaced parameter values; ``'polynomial'`` replaces the
            second column with a cubic least-squares fit over the first
            column; ``'fourier'`` is not implemented; any other value returns
            ``points`` unchanged.

    Returns:
        The processed points. For ``'spline'`` this is a ``(100, 2)`` array,
        or the de-duplicated input as floats when fewer than two distinct
        consecutive points remain.

    Raises:
        NotImplementedError: If ``method`` is ``'fourier'``.
    """
    if method == 'spline':
        pts = np.asarray(points, dtype=float)
        if len(pts) > 1:
            # splprep rejects consecutive duplicate points when s=0 (they
            # produce repeated parameter values), so drop them first.
            moved = np.any(np.diff(pts, axis=0) != 0, axis=1)
            pts = pts[np.concatenate(([True], moved))]
        if len(pts) < 2:
            # Nothing to interpolate through.
            return pts
        # splprep needs more points than the spline degree; fall back to a
        # lower degree for very short curves instead of raising.
        degree = min(3, len(pts) - 1)
        tck, _ = splprep(pts.T, s=0, k=degree)
        resampled = splev(np.linspace(0, 1, 100), tck)
        return np.array(resampled).T
    if method == 'polynomial':
        coeffs = np.polyfit(points[:, 0], points[:, 1], deg=3)
        poly = np.poly1d(coeffs)
        return np.vstack([points[:, 0], poly(points[:, 0])]).T
    if method == 'fourier':
        # Previously this branch fell through and returned None, which only
        # surfaced later as an unrelated error; fail loudly instead.
        raise NotImplementedError("Fourier-based regularization is not implemented.")
    return points  # Default to no regularization

def simplify_curve(points, tolerance=0.01):
    """Reduce the number of vertices of a polyline with Shapely's ``simplify``.

    Args:
        points: Sequence of 2-D points describing the polyline.
        tolerance: Distance tolerance passed to ``LineString.simplify``.

    Returns:
        An ``(m, 2)`` array with the coordinates of the simplified line.
        Inputs with fewer than two points cannot form a line and are returned
        unchanged (as an array).
    """
    points = np.asarray(points)
    if len(points) < 2:
        return points
    simplified = LineString(points).simplify(tolerance, preserve_topology=True)
    # Read the vertices through ``.coords``: converting the geometry itself
    # with np.array() no longer yields coordinates in Shapely 2.x.
    return np.array(simplified.coords)

def validate_input_data(images, labels, expected_shape=(128, 128, 3)):
    """Check that images and labels are mutually consistent.

    Args:
        images: Array of images with the sample axis first.
        labels: Array of labels, one per image.
        expected_shape: Required per-image shape. Defaults to the previously
            hard-coded ``(128, 128, 3)``.

    Raises:
        ValueError: If the sample counts differ, if the per-image shape is
            not ``expected_shape``, or if ``labels`` is not one-dimensional.
    """
    expected_shape = tuple(expected_shape)
    if images.shape[0] != labels.shape[0]:
        raise ValueError("Number of images does not match number of labels.")
    if images.shape[1:] != expected_shape:
        raise ValueError(f"Image shape {images.shape[1:]} is not as expected {expected_shape}.")
    if labels.ndim != 1:
        raise ValueError("Labels should be a 1D array.")

def plot_curves(original, processed, title='Curve'):
    """Show the original and processed curves overlaid in a single figure.

    Args:
        original: ``(n, 2)`` array drawn as a dashed gray line.
        processed: ``(m, 2)`` array drawn as a solid blue line.
        title: Title shown above the plot.
    """
    import matplotlib.pyplot as plt

    _, axes = plt.subplots(figsize=(6, 3))
    curves = (
        (original, {'label': 'Original', 'linestyle': '--', 'color': 'gray'}),
        (processed, {'label': 'Processed', 'color': 'blue'}),
    )
    for curve, line_style in curves:
        axes.plot(curve[:, 0], curve[:, 1], **line_style)
    axes.set_title(title)
    axes.legend()
    plt.show()

def detect_edges(image):
    """Return the array indices of the Canny edge pixels of ``image``.

    The image is converted to 8-bit grayscale and run through Canny with
    thresholds 100 and 200.

    Returns:
        An ``(n, 2)`` integer array of indices into the edge map, in array
        axis order (first axis index, then second axis index).
    """
    # NOTE(review): indices are in (row, column) order, not (x, y) - confirm
    # this is what downstream curve processing expects.
    gray = cv2.cvtColor(image.astype(np.uint8), cv2.COLOR_RGB2GRAY)
    edge_map = cv2.Canny(gray, 100, 200)
    return np.transpose(np.nonzero(edge_map > 0))

def evaluate_model(model, X_test, y_test):
    """Predict classes for ``X_test`` and print a classification report.

    Args:
        model: Model exposing ``predict`` that returns per-class scores.
        X_test: Test inputs.
        y_test: True labels, or ``None`` when the test set is unlabeled. In
            that case the report is skipped with a warning instead of failing.

    Returns:
        The predicted class index for each test sample.
    """
    predictions = model.predict(X_test)
    predicted_labels = np.argmax(predictions, axis=1)
    if y_test is None:
        # There is nothing to compare against; classification_report would
        # raise on a None target.
        logging.warning("No ground-truth labels supplied; skipping classification report.")
        return predicted_labels
    print("Classification Report:")
    print(classification_report(y_test, predicted_labels))
    return predicted_labels

def main():
    """Run the whole pipeline end to end.

    Steps: load the data, train the classifier, train the inpainting model,
    complete the test images, trace and regularise their edges, write the
    resulting coordinates to ``output_csv_file``, and evaluate the classifier
    when test labels exist.
    """
    log_info("Loading and preprocessing data...")
    X_train, X_test, y_train, y_test = preprocess_doodles(train_folder, test_csv_file, IMAGE_SIZE)

    log_info("Building and training model...")
    model = build_model(X_train.shape[1:], NUM_CLASSES)
    train_model(model, X_train, y_train)

    log_info("Training inpainting model...")
    inpainting_model = train_inpainting_model(X_train)

    log_info("Completing incomplete curves...")
    completed_images = complete_incomplete_curves(X_test, inpainting_model)

    log_info("Processing curves...")
    # A cubic interpolating spline needs at least four points.
    min_spline_points = 4
    bezier_coordinates = []
    for image in completed_images:
        points = detect_edges(image)
        if len(points) < min_spline_points:
            # Too few edge pixels (e.g. a blank image) to fit a curve. Emit an
            # empty row so output rows stay aligned with the test samples,
            # rather than aborting before anything is saved.
            bezier_coordinates.append(np.empty(0))
            continue
        points = process_curve_points(points, method='spline')
        points = simplify_curve(points)
        bezier_coordinates.append(points.flatten())

    log_info("Saving bezier coordinates...")
    save_bezier_coordinates_to_csv(bezier_coordinates, output_csv_file)

    if y_test is None:
        # The test CSV carries no labels, so there is nothing to score against.
        log_info("No test labels available; skipping evaluation.")
    else:
        log_info("Evaluating model...")
        evaluate_model(model, X_test, y_test)

if __name__ == "__main__":
    main()


# Output: 2024-08-10 01:39:34,912 - INFO - Loading and preprocessing data...
