In [None]:
#@title Installs
!pip install torch torchvision transformers open_clip_torch

In [None]:
#@title Define RoofNet Path
import os
from google.colab import drive
from pathlib import Path

# pandas is needed by this cell; it was previously imported only in a later
# cell, so a fresh "Restart & Run All" failed here with a NameError.
import pandas as pd

# UPDATE BELOW
ROOFNET_DIR = ""


# === Load Dataset & Compute Class Weights ===
ROOFNET_SUBSET_DIR  = ""
CSV_PATH = ""
df = pd.read_csv(CSV_PATH)

# The class of an image is the first component of its relative path
# (the text before the first '/'). Compute it once and reuse it.
image_classes = df['image'].apply(lambda x: x.split('/')[0])
class_names = sorted(image_classes.unique())
class_to_idx = {cls: idx for idx, cls in enumerate(class_names)}
df['label'] = image_classes.map(class_to_idx)

In [None]:
# @title Load packages and RemoteCLIP download model weights
# Models and code adapted from RemoteCLIP: https://github.com/ChenDelong1999/RemoteCLIP?tab=readme-ov-file
from huggingface_hub import hf_hub_download
import torch, open_clip
from PIL import Image
from IPython.display import display

# Fetch each listed RemoteCLIP checkpoint from the Hugging Face Hub into the
# local ./checkpoints cache and report where it was stored.
for model_name in ('ViT-L-14',):
    checkpoint_path = hf_hub_download(
        repo_id="chendelong/RemoteCLIP",
        filename=f"RemoteCLIP-{model_name}.pt",
        cache_dir='checkpoints',
    )
    print(f'{model_name} is downloaded to {checkpoint_path}.')

In [None]:
# @title Load in RemoteClip Model
model_name = 'ViT-L-14'
model, preprocess_train, preprocess = open_clip.create_model_and_transforms(model_name)
tokenizer = open_clip.get_tokenizer(model_name)

# Resolve the checkpoint through the Hub cache instead of hard-coding the
# snapshot hash: the hash-named directory changes whenever the upstream repo
# gets a new revision. hf_hub_download returns the cached file path and only
# downloads when the file is not already present in cache_dir.
checkpoint_path = hf_hub_download("chendelong/RemoteCLIP", f"RemoteCLIP-{model_name}.pt", cache_dir='checkpoints')
path_to_your_checkpoints = os.path.dirname(checkpoint_path)

# Load on CPU first; the model is moved to the training device later.
ckpt = torch.load(checkpoint_path, map_location="cpu")
message = model.load_state_dict(ckpt)
print(message)

<All keys matched successfully>


In [None]:
#@title Finetune RemoteCLIP for 5 epochs using class rebalancing
import os
import torch
import numpy as np
import pandas as pd
from tqdm import tqdm
from PIL import Image
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
import open_clip

# Run on the GPU when CUDA is available, otherwise fall back to the CPU,
# then place the model on the chosen device.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
model = model.to(device)

# === Dataset Definition ===
class RoofDataset(Dataset):
    """Image/prompt dataset described by a CSV file.

    The CSV must contain an ``image`` column (path relative to ``img_dir``;
    the text before the first '/' is the class name) and a ``prompt`` column
    (the text paired with the image).

    Args:
        csv_file: Path of the CSV file to read.
        img_dir: Directory that the ``image`` paths are relative to.
        transform: Optional callable applied to the RGB PIL image. When it is
            None the PIL image is returned unchanged.
    """

    def __init__(self, csv_file, img_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.transform = transform
        # Class name of every row: first path component of the image path.
        self.classes = self.data['image'].apply(lambda x: x.split('/')[0])

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, prompt, class_name)`` for the row at position ``idx``."""
        row = self.data.iloc[idx]
        img_path = os.path.join(self.img_dir, row['image'])
        # Close the file handle as soon as the pixels have been converted so
        # that long training runs do not accumulate open files.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        # transform defaults to None, so it must not be called unconditionally.
        if self.transform is not None:
            image = self.transform(image)
        return image, row['prompt'], row['image'].split('/')[0]  # returns class name

# === Compute class weights
# Every sample is weighted by the inverse of its class frequency, and the
# sampler draws len(sample_weights) items with replacement using those weights.
class_counts = df['label'].value_counts().sort_index()
weights = 1.0 / class_counts
sample_weights = df['label'].map(weights).to_numpy()
sampler = WeightedRandomSampler(
    weights=sample_weights,
    num_samples=len(sample_weights),
    replacement=True,
)

train_dataset = RoofDataset(
    CSV_PATH,
    ROOFNET_SUBSET_DIR,
    transform=preprocess_train,
)
train_loader = DataLoader(
    train_dataset,
    sampler=sampler,
    batch_size=32,
)

# === Training Setup ===
optimizer = optim.AdamW(model.parameters(), lr=1e-5)
loss_fn = nn.CrossEntropyLoss(label_smoothing=0.1)
temperature = 0.07
# `img_dir` only exists as a RoofDataset attribute, never as a notebook-level
# name, so take the directory from the dataset instance to avoid a NameError.
save_path = os.path.join(train_dataset.img_dir, "best_clip_model_balanced.pth")
best_loss = float("inf")

# === Training Loop ===
for epoch in range(5):
    model.train()
    total_loss = 0

    # The class name returned by the dataset is not needed by the symmetric
    # contrastive objective below, so it is ignored here.
    for images, texts, _class_names in tqdm(train_loader):
        images = images.to(device)
        tokenized_texts = tokenizer(texts).to(device)

        image_features = model.encode_image(images)
        text_features = model.encode_text(tokenized_texts)
        normalized_image_features = image_features / image_features.norm(dim=-1, keepdim=True)
        normalized_text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Cosine-similarity logits between every image and every text in the batch.
        logits = (normalized_image_features @ normalized_text_features.T) / temperature
        # The i-th image is paired with the i-th text; build the targets once
        # and average the image->text and text->image cross-entropy terms.
        targets = torch.arange(len(images), device=device)
        loss = (loss_fn(logits, targets) + loss_fn(logits.T, targets)) / 2

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch {epoch} Avg Loss: {avg_loss:.4f}")
    # Keep only the checkpoint with the lowest average training loss so far.
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), save_path)
        print(f"New best model saved at epoch {epoch} with loss {avg_loss:.4f}")

In [None]:
#@title Material Descriptions
# Short natural-language description for every roof-material class key.
material_descriptions = dict(
    Thatch="thatch roof",
    GreenVegetative="roof with vegetation on it",
    StoneSlates="dark stone slate roof",
    ClayTiles="clay / ceramic tile roof ",
    AsphaltTiles="asphalt shingle pitched roof",
    ConcreteTiles="concrete / cement tile roof",
    WoodTiles="wood shingle roof",
    MetalSheetMaterials="corrugated or tiled metal roof (silver / dark / painted)",
    PolycarbonateSheetMaterials="polycarbonate roof",
    GlassSheetMaterials="glass roof (clear or mirrored)",
    AmorphousConcrete="flat concrete roof",
    AmorphousAsphalt="asphalt-coated roof (bitumen layer or rolled roofing)",
    AmorphousMembrane="membrane roof (bright EPDM/TPO)",
    AmorphousFabric="tensile fabric roof (PVC / PTFE / canvas)",
    Unknown="unknown material, image may be too low resolution or obstructed",
)

In [None]:
#@title FineTuned Image Classification
import os
import torch
import shutil
from torchvision import transforms
from PIL import Image
import open_clip

# === CONFIG ===
# NOTE(review): this line previously joined onto DATASET_DIR, which is not
# defined anywhere in the notebook (NameError on a fresh kernel). ROOFNET_DIR
# is the root directory the notebook does define - confirm it is the intended
# location of the fine-tuned weights.
model_weights_path = os.path.join(ROOFNET_DIR, "") # <-- Update with your model
output_base_dir = "" # <-- Update with your output directory
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Your list of material classes (labels). One output sub-folder is created per
# entry, and the index of the best-scoring prompt is looked up in this list.
material_classes = [
    "Thatch", "StoneSlates", "ClayTiles", "AsphaltTiles",
    "ConcreteTiles", "WoodTiles", "MetalSheetMaterials", "PolycarbonateSheetMaterials",
    "GlassSheetMaterials", "AmorphousConcrete", "AmorphousAsphalt",
    "AmorphousMembrane", "AmorphousFabric", "Unknown", "GreenVegetative"
]

# Build the ViT-L-14 architecture without fetching pretrained weights: every
# parameter is overwritten by the fine-tuned state dict loaded below, so
# downloading the LAION checkpoint first was wasted work. This matches how the
# model is created earlier in the notebook before the RemoteCLIP weights load.
model, _, _ = open_clip.create_model_and_transforms('ViT-L-14')

# Inference-time preprocessing: fixed 224x224 resize followed by per-channel
# mean/std normalisation.
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.48145466, 0.4578275, 0.40821073],
                         std=[0.26862954, 0.26130258, 0.27577711])
])
tokenizer = open_clip.get_tokenizer('ViT-L-14')

# Load your fine-tuned weights
state_dict = torch.load(model_weights_path, map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()

# Ensure output folders exist
for material in material_classes:
    os.makedirs(os.path.join(output_base_dir, material), exist_ok=True)

# === Functions ===

def build_prompts(city_name):
    """Return one "<material> in <city_name>" prompt per entry of material_classes."""
    prompts = []
    for material in material_classes:
        prompts.append(f"{material} in {city_name}")
    return prompts

def extract_city_name_from_filename(filename):
    """Derive a title-cased city name from an image file name.

    The file stem (name without directory and final extension) is cut at the
    first matching marker, checked in this order: a '-' character, the
    '_height' suffix marker, the '_imsat' suffix marker. Underscores in the
    remaining part are replaced by spaces and the result is title-cased.

    Args:
        filename: File name or path of the image.

    Returns:
        The city name as a string. When no marker is present the whole stem
        is used, instead of failing with an UnboundLocalError.
    """
    base = Path(filename).stem
    if '-' in base:
        city_part = base.split('-')[0]
    elif 'height' in base:
        city_part = base.split('_height')[0]
    elif 'imsat' in base:
        city_part = base.split('_imsat')[0]
    else:
        # No known marker: fall back to the full stem so callers always get a name.
        city_part = base
    return city_part.replace('_', ' ').title()

def already_classified(img_name):
    """Return True when a file named img_name exists in any material output folder."""
    return any(
        os.path.exists(os.path.join(output_base_dir, material, img_name))
        for material in material_classes
    )

def predict_and_move(image_path, city_name):
    """Classify one image with the CLIP model and move it into its material folder.

    Images whose pixel area (width * height) is 1000 or less are skipped.
    Otherwise the image is scored against one prompt per material class (see
    build_prompts) and moved into output_base_dir/<best material>.

    Args:
        image_path: Path of the image file to classify.
        city_name: City name inserted into every prompt.

    Returns:
        None. The outcome is reported with print().
    """
    # Close the file handle before the file is moved; an open handle can make
    # the move fail on some platforms.
    with Image.open(image_path) as raw_image:
        # Check area
        width, height = raw_image.size
        area = width * height

        if area <= 1000:
            print(f"{os.path.basename(image_path)} too small ({area} px), skipping.")
            return

        rgb_image = raw_image.convert("RGB")

    # Preprocess for CLIP (adds the batch dimension)
    image = preprocess(rgb_image).unsqueeze(0).to(device)

    # Build prompts
    prompts = build_prompts(city_name)
    tokenized_prompts = tokenizer(prompts).to(device)

    # Encode, normalise and compare without tracking gradients
    with torch.no_grad():
        image_features = model.encode_image(image)
        text_features = model.encode_text(tokenized_prompts)

        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_features /= text_features.norm(dim=-1, keepdim=True)

        similarities = (100.0 * image_features @ text_features.T).squeeze(0)

    best_idx = similarities.argmax().item()
    predicted_material = material_classes[best_idx]

    # Move to correct folder
    target_dir = os.path.join(output_base_dir, predicted_material)
    try:
        shutil.move(image_path, target_dir)
    except shutil.Error:
        # shutil.move raises shutil.Error when the destination already exists.
        print(f"{os.path.basename(image_path)} already present.")
    except OSError as exc:
        # Any other filesystem failure is reported as such instead of being
        # mislabelled as "already present"; processing continues.
        print(f"{os.path.basename(image_path)} could not be moved: {exc}")
    else:
        print(f"{os.path.basename(image_path)} classified as {predicted_material} and moved.")

# === Main Loop ===
# NOTE(review): XBD_DATASET_DIR_TIER3 is not defined anywhere in the visible
# notebook - it must be set to the folder of images to classify before this
# cell is run.
input_images_dir = os.path.join(XBD_DATASET_DIR_TIER3)

# A single pass over the folder is enough: the previous outer loop over
# material_classes re-scanned the same directory once per class without ever
# using the loop variable.
for img_file in sorted(os.listdir(input_images_dir)):
    if not img_file.lower().endswith(('.jpg', '.jpeg', '.png')):
        continue
    # Skip the model entirely when the image already sits in an output folder.
    if already_classified(img_file):
        print(f"{img_file} already present.")
        continue
    full_path = os.path.join(input_images_dir, img_file)
    city_name = extract_city_name_from_filename(img_file)
    predict_and_move(full_path, city_name)