In [1]:
import os
import warnings

import numpy as np
import pandas as pd
import rasterio
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications import VGG16
from tensorflow.keras.layers import Dense, Flatten, Dropout, Input
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence

In [None]:
# ==========================================
# 1. SETUP
# ==========================================
# Folder containing your Cluster Subfolders (e.g. /0001/dhs_0001_2023_Q1.tif).
# The default is the original author's local path; set the SENTINEL_TIF_DIR
# environment variable to point the notebook at a different machine's data
# without editing this cell.
tif_folder = os.environ.get(
    "SENTINEL_TIF_DIR",
    "/Users/ruben/Desktop/Thesis/TrainingData/Sentinel2/sample50-quarterly-2022",
)
# CSV with one row per cluster (columns DHSCLUST and NTL_Class are read later).
# Override with the NTL_LABELS_CSV environment variable if the file lives elsewhere.
labels_file = os.environ.get("NTL_LABELS_CSV", "viirs_ntl_labels_sample50.csv")

In [3]:
# ==========================================
# 2. CUSTOM TIF GENERATOR
# ==========================================
class SentinelGenerator(Sequence):
    """Keras ``Sequence`` that loads GeoTIFF tiles from disk one batch at a time.

    Each item is an ``(images, labels)`` pair: ``images`` is a float32 array of
    shape ``(batch, target_size[0], target_size[1], 3)`` with values in [0, 1],
    and ``labels`` is the matching slice of ``labels`` as a NumPy array.

    Args:
        image_paths: Sequence of paths to .tif files.
        labels: Sequence of labels aligned index-for-index with ``image_paths``.
        batch_size: Samples per batch (the final batch may be smaller).
        target_size: ``(height, width)`` every image is resized to.
        n_classes: Number of label classes. Stored on the instance only; the
            generator itself never reads it.
        **kwargs: Forwarded unchanged to the base-class constructor.

    Raises:
        ValueError: If ``image_paths`` and ``labels`` differ in length.
    """

    def __init__(self, image_paths, labels, batch_size=32, target_size=(224, 224), n_classes=3, **kwargs):
        # Newer Keras versions expect the base constructor to be called.
        super().__init__(**kwargs)
        if len(image_paths) != len(labels):
            raise ValueError(
                f"image_paths ({len(image_paths)}) and labels ({len(labels)}) must have the same length"
            )
        self.image_paths = image_paths
        self.labels = labels
        self.batch_size = batch_size
        self.target_size = target_size
        self.n_classes = n_classes

    def __len__(self):
        """Return the number of batches per epoch (last partial batch included)."""
        return int(np.ceil(len(self.image_paths) / self.batch_size))

    def __getitem__(self, idx):
        """Load and return batch number ``idx`` as ``(images, labels)`` arrays."""
        start = idx * self.batch_size
        stop = start + self.batch_size
        batch_x = self.image_paths[start:stop]
        batch_y = self.labels[start:stop]

        images = [self.load_and_process_tif(path) for path in batch_x]
        return np.array(images), np.array(batch_y)

    def load_and_process_tif(self, path):
        """Read bands 1-3 of ``path`` and return a normalised, resized image.

        The image is contrast-stretched between its 2nd and 98th percentiles,
        clipped to [0, 1] and resized to ``self.target_size``. If anything goes
        wrong while reading or processing the file, a warning is emitted and an
        all-zero (black) image of the target size is returned instead, so one
        bad tile does not abort training.

        Returns:
            float32 array of shape ``(target_size[0], target_size[1], 3)``.
        """
        try:
            with rasterio.open(path) as src:
                # One read for bands 1, 2, 3 -> (3, H, W); move bands last -> (H, W, 3).
                img = np.moveaxis(src.read([1, 2, 3]), 0, -1).astype(np.float32)

            # Percentile stretch so the network sees a visible image, not a
            # near-black square. nanpercentile ignores NaN pixels.
            p2, p98 = np.nanpercentile(img, (2, 98))
            # Flat (or all-NaN) image: fall back to a unit divisor. The previous
            # `p98 = 255` fallback still divided by zero when p2 == 255 and
            # produced a negative divisor for 16-bit data with p2 > 255.
            scale = (p98 - p2) if p98 > p2 else 1.0
            img = np.clip((img - p2) / scale, 0.0, 1.0)
            # Any remaining NaN pixels become black instead of poisoning the loss.
            img = np.nan_to_num(img, nan=0.0)

            # Resize to the network's input size (224x224 for VGG16 by default).
            return tf.image.resize(img, self.target_size).numpy().astype(np.float32)
        except Exception as exc:
            # Deliberate best-effort: report the failure and substitute a black
            # image. `Exception` (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            warnings.warn(
                f"Could not load {path!r} ({type(exc).__name__}: {exc}); using a black image.",
                RuntimeWarning,
                stacklevel=2,
            )
            return np.zeros((self.target_size[0], self.target_size[1], 3), dtype=np.float32)

In [4]:
# ==========================================
# 3. PREPARE FILE LIST
# ==========================================
print("Matching files to labels...")
labels_df = pd.read_csv(labels_file)
# Lookup table {cluster ID: NTL class} built from the labels CSV.
label_map = dict(zip(labels_df['DHSCLUST'], labels_df['NTL_Class']))

all_paths = []
all_labels = []
all_clusters = []  # cluster ID of each entry in all_paths; drives the group-wise split below

# Walk through folders to find every quarterly/monthly TIF.
for root, dirs, files in os.walk(tif_folder):
    # os.walk yields entries in filesystem order; sorting makes the file list
    # (and therefore the seeded split) reproducible across machines.
    dirs.sort()
    for file in sorted(files):
        if not (file.endswith(".tif") and "dhs_" in file):
            continue
        try:
            # Extract Cluster ID from filename (e.g. dhs_0001_...): part 1 after splitting on '_'.
            cluster_id = int(file.split('_')[1])
        except (IndexError, ValueError):
            # Filename does not follow the dhs_<id>_... pattern; skip it.
            continue
        if cluster_id not in label_map:
            continue
        all_paths.append(os.path.join(root, file))
        all_labels.append(label_map[cluster_id])
        all_clusters.append(cluster_id)

print(f"Found {len(all_paths)} valid training images.")

unique_clusters = sorted(set(all_clusters))
if len(unique_clusters) < 2:
    raise ValueError(
        f"Need labelled images from at least 2 clusters to build a train/validation split; "
        f"found {len(unique_clusters)} under {tif_folder!r}."
    )

# Split into Training and Validation BY CLUSTER. Every TIF of a cluster shares
# one label, so a per-image split would put tiles of the same cluster on both
# sides and inflate validation accuracy.
train_clusters, val_clusters = train_test_split(unique_clusters, test_size=0.2, random_state=42)
val_clusters = set(val_clusters)

# Shuffle image order with a fixed seed so batches mix clusters.
order = np.random.default_rng(42).permutation(len(all_paths))
X_train, X_val, y_train, y_val = [], [], [], []
for i in order:
    if all_clusters[i] in val_clusters:
        X_val.append(all_paths[i])
        y_val.append(all_labels[i])
    else:
        X_train.append(all_paths[i])
        y_train.append(all_labels[i])

print(
    f"Cluster-wise split: {len(X_train)} train images ({len(train_clusters)} clusters), "
    f"{len(X_val)} validation images ({len(val_clusters)} clusters)."
)

# Create Generators
train_gen = SentinelGenerator(X_train, y_train, batch_size=32, n_classes=3)
val_gen = SentinelGenerator(X_val, y_val, batch_size=32, n_classes=3)

Matching files to labels...


FileNotFoundError: [Errno 2] No such file or directory: 'dhs_ntl_labels.csv'

In [None]:
# ==========================================
# 4. BUILD VGG16 PROXY MODEL
# ==========================================
def build_proxy_model(input_shape=(224, 224, 3), n_classes=3, learning_rate=0.0001):
    """Build and compile a VGG16-based classifier for the night-lights proxy task.

    The ImageNet-pretrained VGG16 convolutional base is frozen; only the new
    head (a 4096-unit dense layer named ``feature_vector``, dropout, and a
    softmax output) is trainable.

    Args:
        input_shape: ``(height, width, channels)`` of the input images.
        n_classes: Number of output classes (default 3: Dark, Dim, Bright).
        learning_rate: Learning rate for the Adam optimizer.

    Returns:
        A compiled Keras ``Model`` using sparse categorical cross-entropy, so
        labels are expected as integer class indices.

    NOTE(review): ImageNet VGG16 weights are usually paired with
    ``keras.applications.vgg16.preprocess_input``, whereas SentinelGenerator
    feeds RGB values scaled to [0, 1] -- confirm this mismatch is intended.
    """
    # Load VGG16 (pre-trained on ImageNet) without its original classifier head.
    base_model = VGG16(weights='imagenet', include_top=False, input_shape=input_shape)

    # Freeze the base layers (we only train the new head first).
    for layer in base_model.layers:
        layer.trainable = False

    x = base_model.output
    x = Flatten()(x)

    # The "Feature Vector" layer (4096 units); named so it can be looked up later.
    x = Dense(4096, activation='relu', name='feature_vector')(x)
    x = Dropout(0.5)(x)

    # The classification head: one softmax unit per class.
    predictions = Dense(n_classes, activation='softmax')(x)

    model = Model(inputs=base_model.input, outputs=predictions)
    model.compile(optimizer=Adam(learning_rate=learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# Defaults reproduce the original configuration: 224x224x3 input, 3 classes, lr=1e-4.
model = build_proxy_model()

In [None]:
# ==========================================
# 5. TRAIN
# ==========================================
# Announce the run, then fit the model on the training generator while
# evaluating on the validation generator after every epoch.
print("Starting Proxy Training...")
# 10 epochs is usually enough for proxy tasks
history = model.fit(x=train_gen, validation_data=val_gen, epochs=10, verbose=1)

# Write the trained model to disk in HDF5 format and confirm.
model.save("cnn_viirs_proxy.h5")
print("Model saved as 'cnn_viirs_proxy.h5'. Phase 1 Complete!")