In [None]:
import os
import shutil
import glob
import csv
import cv2
from tqdm import tqdm

In [None]:
# Source datasets are read from ROOT_DIR; everything this notebook produces
# is written under OUTPUT_DIR.
ROOT_DIR = 'raw_data'
OUTPUT_DIR = '1024'

# Output layout: one folder for images, one for masks, and a CSV that lists
# each image/mask pair together with its split.
IMAGE_DIR, MASK_DIR, CSV_PATH = (
    os.path.join(OUTPUT_DIR, leaf) for leaf in ("images", "masks", "split.csv")
)

# exist_ok=True keeps this cell safe to re-run when the folders already exist.
os.makedirs(IMAGE_DIR, exist_ok=True)
os.makedirs(MASK_DIR, exist_ok=True)

In [None]:
def copy(src_img, src_mask, dst_img, dst_mask, split, writer):
    """Copy an image/mask pair to their destinations and log the pair.

    Args:
        src_img: Path of the image file to copy.
        src_mask: Path of the mask file to copy.
        dst_img: Destination path for the image.
        dst_mask: Destination path for the mask.
        split: Value stored in the ``split`` column of the logged row.
        writer: Object with a ``writerow(dict)`` method (e.g. ``csv.DictWriter``).
    """
    # Image first, then mask.
    for source, target in ((src_img, dst_img), (src_mask, dst_mask)):
        shutil.copy(source, target)

    # Only the file names (not the full paths) are recorded.
    record = {
        'filename': os.path.basename(dst_img),
        'maskname': os.path.basename(dst_mask),
        'split': split,
    }
    writer.writerow(record)

In [None]:
def deepglobe(writer, target_size=(1024, 1024)):
    """Resize the DeepGlobe training pairs into the output folders and log them.

    Every ``*_sat.jpg`` under ``ROOT_DIR/DeepGlobe/train`` is paired with the
    ``*_mask.png`` of the same stem. The image is read in colour and the mask
    as grayscale; both are resized to ``target_size`` and written to
    ``IMAGE_DIR`` / ``MASK_DIR`` as ``deepglobe_{idx:05d}.jpg`` / ``.png``.
    One CSV row with ``split='train'`` is written per successfully saved pair.

    Pairs that cannot be processed (mask missing, unreadable file, failed
    write) are reported and skipped; no CSV row is written for them.

    Args:
        writer: Object with a ``writerow(dict)`` method (e.g. ``csv.DictWriter``)
            accepting the keys ``filename``, ``maskname`` and ``split``.
        target_size: Output size passed to ``cv2.resize`` as ``dsize``
            (OpenCV interprets it as ``(width, height)``).
    """
    print("🔹 Processing DeepGlobe...")
    sat_suffix = '_sat.jpg'
    mask_suffix = '_mask.png'

    dg_train = os.path.join(ROOT_DIR, 'DeepGlobe', 'train')
    sat_imgs = sorted(glob.glob(os.path.join(dg_train, '*' + sat_suffix)))
    for idx, img_path in enumerate(tqdm(sat_imgs)):
        # Swap only the trailing suffix. str.replace would also rewrite any
        # directory component that happens to contain "_sat.jpg".
        mask_path = img_path[:-len(sat_suffix)] + mask_suffix
        if not os.path.exists(mask_path):
            print(f"❌ Mask not found for {img_path}")
            continue

        img = cv2.imread(img_path, cv2.IMREAD_COLOR)
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

        # cv2.imread signals failure by returning None rather than raising.
        if img is None or mask is None:
            print(f"❌ Failed to read image or mask for {img_path}, {mask_path}")
            continue

        # Nearest-neighbour for the mask so resizing never invents new label values.
        img_resized = cv2.resize(img, target_size, interpolation=cv2.INTER_LINEAR)
        mask_resized = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)

        # idx is the position in the sorted listing, so skipped pairs leave gaps
        # in the numbering.
        img_name = f"deepglobe_{idx:05d}.jpg"
        mask_name = f"deepglobe_{idx:05d}.png"

        dst_img = os.path.join(IMAGE_DIR, img_name)
        dst_mask = os.path.join(MASK_DIR, mask_name)

        # cv2.imwrite reports failure through its boolean return value; only
        # log the pair when both files were actually saved.
        if not (cv2.imwrite(dst_img, img_resized) and cv2.imwrite(dst_mask, mask_resized)):
            print(f"❌ Failed to write image or mask for {dst_img}, {dst_mask}")
            continue

        writer.writerow({
            'filename': img_name,
            'maskname': mask_name,
            'split': 'train'
        })
    print("✅ DeepGlobe processing complete.")

In [None]:
def massachusetts(writer, target_size=(1024, 1024)):
    """Resize the Massachusetts Roads pairs into the output folders and log them.

    For each subset in ``train``, ``val`` and ``test``, every ``*.tiff`` image
    under ``ROOT_DIR/Massachusetts_Roads_Dataset/tiff/<subset>`` is paired with
    ``<subset>_labels/<same stem>.tif``. The image is read in colour and the
    mask as grayscale; both are resized to ``target_size`` and written to
    ``IMAGE_DIR`` / ``MASK_DIR`` as ``mass_{counter:05d}.jpg`` / ``.png``.
    One CSV row carrying the subset name as ``split`` is written per
    successfully saved pair.

    Pairs that cannot be processed (mask missing, unreadable file, failed
    write) are reported and skipped; they produce no CSV row and do not
    consume a counter value.

    Args:
        writer: Object with a ``writerow(dict)`` method (e.g. ``csv.DictWriter``)
            accepting the keys ``filename``, ``maskname`` and ``split``.
        target_size: Output size passed to ``cv2.resize`` as ``dsize``
            (OpenCV interprets it as ``(width, height)``).
    """
    print("🔹 Processing Massachusetts...")

    mass_root = os.path.join(ROOT_DIR, 'Massachusetts_Roads_Dataset', 'tiff')
    subsets = ['train', 'val', 'test']
    # One running counter across all subsets keeps output names unique.
    counter = 0

    for subset in subsets:
        img_dir = os.path.join(mass_root, subset)
        mask_dir = os.path.join(mass_root, f"{subset}_labels")

        img_paths = sorted(glob.glob(os.path.join(img_dir, "*.tiff")))
        for img_path in tqdm(img_paths):
            # Images use the ".tiff" extension while their labels use ".tif".
            base_name = os.path.splitext(os.path.basename(img_path))[0]
            mask_path = os.path.join(mask_dir, f"{base_name}.tif")
            if not os.path.exists(mask_path):
                print(f"❌ Mask not found for {img_path}")
                continue

            img = cv2.imread(img_path, cv2.IMREAD_COLOR)
            mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)

            # cv2.imread signals failure by returning None rather than raising.
            if img is None or mask is None:
                print(f"❌ Failed to read image or mask for {img_path}, {mask_path}")
                continue

            # Nearest-neighbour for the mask so resizing never invents new label values.
            img_resized = cv2.resize(img, target_size, interpolation=cv2.INTER_LINEAR)
            mask_resized = cv2.resize(mask, target_size, interpolation=cv2.INTER_NEAREST)

            img_name = f"mass_{counter:05d}.jpg"
            mask_name = f"mass_{counter:05d}.png"

            dst_img = os.path.join(IMAGE_DIR, img_name)
            dst_mask = os.path.join(MASK_DIR, mask_name)

            # cv2.imwrite reports failure through its boolean return value; only
            # log the pair (and advance the counter) when both files were saved.
            if not (cv2.imwrite(dst_img, img_resized) and cv2.imwrite(dst_mask, mask_resized)):
                print(f"❌ Failed to write image or mask for {dst_img}, {dst_mask}")
                continue

            writer.writerow({
                'filename': img_name,
                'maskname': mask_name,
                'split': subset
            })

            counter += 1

    print("✅ Massachusetts processing complete.")

In [None]:
def main():
    """Run both dataset converters and record every output pair in the split CSV.

    The CSV at ``CSV_PATH`` is recreated from scratch with the columns
    ``filename``, ``maskname`` and ``split``; DeepGlobe rows are written
    first, Massachusetts rows second.
    """
    print("🚀 Starting dataset preparation...")

    columns = ['filename', 'maskname', 'split']
    # newline='' is what the csv module expects for files it writes to.
    with open(CSV_PATH, mode='w', newline='') as handle:
        rows = csv.DictWriter(handle, fieldnames=columns)
        rows.writeheader()

        # Both converters share the same writer, so their rows land in one file.
        for convert in (deepglobe, massachusetts):
            convert(rows)

    print(f"\n✅ Done! Dataset prepared in `{OUTPUT_DIR}` folder.")
    print(f"📄 CSV split info saved at: {CSV_PATH}")


if __name__ == "__main__":
    main()

In [None]:
import pandas as pd
import numpy as np

CSV_PATH = '1024/split.csv'


def assign_random_splits(df, train_frac=0.7, val_frac=0.15, seed=42, key='filename'):
    """Return a shuffled copy of ``df`` whose ``split`` column is reassigned.

    Rows are first put into a canonical order (sorted by ``key``) and only
    then shuffled with a fixed seed. The assignment therefore depends on the
    set of rows, not on the order they arrive in, so feeding the function its
    own output (e.g. re-running the cell on the CSV it just overwrote)
    reproduces the same split instead of reshuffling it.

    After shuffling, the first ``int(n * train_frac)`` rows become ``'train'``,
    the next ``int(n * val_frac)`` rows ``'val'``, and all remaining rows
    ``'test'``. Any previous value of ``split`` is discarded.

    Args:
        df: DataFrame containing at least the ``key`` column.
        train_frac: Fraction of rows labelled ``'train'``.
        val_frac: Fraction of rows labelled ``'val'``.
        seed: ``random_state`` passed to ``DataFrame.sample``.
        key: Column used to establish the canonical row order.

    Returns:
        A new DataFrame with a fresh ``0..n-1`` index; ``df`` is not modified.
    """
    # Canonical order first: this is what makes the result repeatable.
    ordered = df.sort_values(key, kind='stable').reset_index(drop=True)
    shuffled = ordered.sample(frac=1, random_state=seed).reset_index(drop=True)

    n_total = len(shuffled)
    n_train = int(n_total * train_frac)
    n_val = int(n_total * val_frac)

    # Start with everything as 'test', then overwrite the leading positions.
    # Positional slices avoid the inclusive-end arithmetic of label-based .loc.
    labels = np.full(n_total, 'test', dtype=object)
    labels[:n_train] = 'train'
    labels[n_train:n_train + n_val] = 'val'
    shuffled['split'] = labels
    return shuffled


# True when this cell runs in a notebook kernel or as a script; keeps the file
# I/O from firing if the code is imported instead.
if __name__ == "__main__":
    df = assign_random_splits(pd.read_csv(CSV_PATH))

    # Overwrite the CSV in place with the new split assignment.
    df.to_csv(CSV_PATH, index=False)

    print(f"✅ Split completed: {df['split'].value_counts().to_dict()}")