In [3]:
import os 
import sys
import random
import torch
import numpy as np
from torchvision import transforms
from PIL import Image, ImageFilter, ImageEnhance
import os
import shutil
import os
import shutil
import random
from PIL import Image
import pandas as pd

current_file_dir = os.getcwd()

root_dir = os.path.dirname(current_file_dir)

sys.path.append(root_dir)
dataset_dir = os.path.join(root_dir, 'data')


processed_dir = os.path.join(dataset_dir, 'processed')
if not os.path.exists(processed_dir):
    os.makedirs(processed_dir)

In [None]:



def generate_occluded_dataset_dir(org_dataset_dir, occluded_dataset_name='occluded', occulude_ratio=0.6):
    mask_dir = os.path.join(org_dataset_dir, 'masked')
    cam_dir = os.path.join(org_dataset_dir, 'cam')
    pc_dir = os.path.join(org_dataset_dir, 'pc')

    csv_path = os.path.join(org_dataset_dir, 'trainval.csv')
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"CSV file not found at {csv_path}")

    train_validate_df = pd.read_csv(csv_path)
    trainable_files = train_validate_df[train_validate_df['trainable'] == 1]['filename'].tolist()

    if not trainable_files:
        raise ValueError("No trainable files found in CSV.")

    total_samples = len(trainable_files)
    num_occluded = int(total_samples * occulude_ratio)
    occlude_indices = random.sample(range(total_samples), num_occluded)

    # Create occluded dataset directory (one level up from original)
    occluded_dataset_dir = os.path.join(os.path.dirname(org_dataset_dir), occluded_dataset_name)
    occluded_cam_dir = os.path.join(occluded_dataset_dir, 'cam')
    occluded_pc_dir = os.path.join(occluded_dataset_dir, 'pc')
    occluded_masked_dir = os.path.join(occluded_dataset_dir, 'masked')

    os.makedirs(occluded_cam_dir, exist_ok=True)
    os.makedirs(occluded_pc_dir, exist_ok=True)
    os.makedirs(occluded_masked_dir, exist_ok=True)

    # Sample image to determine size
    sample_image_path = os.path.join(cam_dir, trainable_files[0])
    if not os.path.exists(sample_image_path):
        raise FileNotFoundError(f"Sample image not found at {sample_image_path}")
    try:
        image = Image.open(sample_image_path).convert('RGB')
    except Exception as e:
        shutil.rmtree(occluded_dataset_dir)
        raise RuntimeError(f"Error reading sample image {sample_image_path}") from e

    empty_image = Image.new('RGB', image.size, (0, 0, 0))
    processed_files = []

    try:
        for idx in occlude_indices:
            original_name = trainable_files[idx]
            base_name, ext = os.path.splitext(original_name)
            occluded_name = f"{base_name}_occluded{ext}"

            # Paths to original files
            cam_path = os.path.join(cam_dir, original_name)
            pc_path = os.path.join(pc_dir, original_name)
            mask_path = os.path.join(mask_dir, original_name)
            for path in [cam_path, pc_path, mask_path]:
                if not os.path.exists(path):
                    raise FileNotFoundError(f"Required file not found: {path}")

            # Random occlusion target for each file
            target_to_occlude = random.choice(['cam', 'point_cloud'])

            if target_to_occlude == 'cam':
                empty_image.save(os.path.join(occluded_cam_dir, occluded_name))
                shutil.copy(pc_path, os.path.join(occluded_pc_dir, occluded_name))
            elif target_to_occlude == 'point_cloud':
                shutil.copy(cam_path, os.path.join(occluded_cam_dir, occluded_name))
                empty_image.save(os.path.join(occluded_pc_dir, occluded_name))
            else:
                raise ValueError(f"Invalid occlusion target: {target_to_occlude}")

            shutil.copy(mask_path, os.path.join(occluded_masked_dir, occluded_name))
            processed_files.append(occluded_name)

    except Exception as e:
        shutil.rmtree(occluded_dataset_dir, ignore_errors=True)
        raise RuntimeError(f"Failed while processing occluded dataset: {str(e)}") from e

    return occluded_dataset_dir, sorted(processed_files)


In [None]:
occluded_dataset_dir, processed_files = generate_occluded_dataset_dir(
    org_dataset_dir=os.path.join(processed_dir, 'lidarseg_images', 'trainval'),
    occluded_dataset_name='occluded',
    occulude_ratio=0.6
)
print(f"Occluded dataset created at: {occluded_dataset_dir}")
print(f"Processed files: {processed_files}")

In [None]:
len(processed_files)

In [None]:
## random split train and val
train_files = random.sample(processed_files, int(len(processed_files) * 0.8))
val_files = list(set(processed_files) - set(train_files))
train_df = pd.DataFrame({'filename': train_files, 'trainable': 1})
val_df = pd.DataFrame({'filename': val_files, 'trainable': 0})

#merge train and val df
trainval_df = pd.concat([train_df, val_df], ignore_index=True)
trainval_df.to_csv(os.path.join(occluded_dataset_dir, 'trainval.csv'), index=False)


In [5]:
base_dataset_dir = os.path.join(processed_dir, 'lidarseg_images', 'trainval')

df = pd.read_csv(os.path.join(base_dataset_dir, 'trainval.csv'))

## report dataset statistics
def report_dataset_statistics(df):
    trainable_count = df[df['trainable'] == 1].shape[0]
    non_trainable_count = df[df['trainable'] == 0].shape[0]
    print(f"Total samples: {df.shape[0]}")
    print(f"Trainable samples: {trainable_count}")
    print(f"Non-trainable samples: {non_trainable_count}")



report_dataset_statistics(df)

Total samples: 34149
Trainable samples: 27319
Non-trainable samples: 6830


In [6]:
occluded_dataset_dir = os.path.join(processed_dir, 'lidarseg_images', 'occcluded')

df = pd.read_csv(os.path.join(base_dataset_dir, 'trainval.csv'))

## report dataset statistics
def report_dataset_statistics(df):
    trainable_count = df[df['trainable'] == 1].shape[0]
    non_trainable_count = df[df['trainable'] == 0].shape[0]
    print(f"Total samples: {df.shape[0]}")
    print(f"Trainable samples: {trainable_count}")
    print(f"Non-trainable samples: {non_trainable_count}")



report_dataset_statistics(df)

Total samples: 34149
Trainable samples: 27319
Non-trainable samples: 6830
