In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install rasterio

Collecting rasterio
  Downloading rasterio-1.4.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.1 kB)
Collecting affine (from rasterio)
  Downloading affine-2.4.0-py3-none-any.whl.metadata (4.0 kB)
Collecting cligj>=0.5 (from rasterio)
  Downloading cligj-0.7.2-py3-none-any.whl.metadata (5.0 kB)
Collecting click-plugins (from rasterio)
  Downloading click_plugins-1.1.1-py2.py3-none-any.whl.metadata (6.4 kB)
Downloading rasterio-1.4.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (22.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m22.2/22.2 MB[0m [31m63.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading cligj-0.7.2-py3-none-any.whl (7.1 kB)
Downloading affine-2.4.0-py3-none-any.whl (15 kB)
Downloading click_plugins-1.1.1-py2.py3-none-any.whl (7.5 kB)
Installing collected packages: cligj, click-plugins, affine, rasterio
Successfully installed affine-2.4.0 click-plugins-1.1.1 cligj-0.7.2 rasterio-1.4.3


In [3]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
import rasterio
from sklearn.model_selection import train_test_split

# --- Config ---
IMAGE_DIR = "/content/drive/My Drive/HLS_Composite_Patches_2021"
LABEL_CSV = "/content/drive/My Drive/HLS_Composite_Patches_2021/HLS_Patch_Labels_2021.csv"
IMG_SIZE = (256, 256)
BATCH_SIZE = 32
EPOCHS = 30

# --- 1. Load label mapping ---
labels_df = pd.read_csv(LABEL_CSV)
labels_dict = dict(zip(labels_df["fileNamePrefix"], labels_df["label"]))

# --- 2. Collect image paths and match to labels ---
all_image_paths = [os.path.join(IMAGE_DIR, f) for f in os.listdir(IMAGE_DIR) if f.endswith('.tif')]

# Extract filename without extension to match CSV
filtered_paths = [
    p for p in all_image_paths
    if os.path.splitext(os.path.basename(p))[0] in labels_dict
]

print(f"Matched {len(filtered_paths)} images with labels.")

Matched 1650 images with labels.


In [4]:
# --- 3. Split train/test ---
train_paths, test_paths = train_test_split(filtered_paths, test_size=0.2, random_state=288)

# --- 4. Load function: RGB image + scalar label ---
def load_image_and_label(path):
    path_str = path.decode()
    with rasterio.open(path_str) as src:
        img = src.read(out_shape=(3, *IMG_SIZE))  # (3, H, W)
        img = np.transpose(img, (1, 2, 0))  # to (H, W, 3)
        img = np.nan_to_num(img.astype(np.float32))

    basename = os.path.splitext(os.path.basename(path_str))[0]
    label = np.float32(labels_dict[basename])

    return img, label

# --- 5. Create tf.data.Dataset ---
def create_dataset(paths, batch_size=BATCH_SIZE, shuffle=True):
    def _parse(path):
        image, label = tf.numpy_function(load_image_and_label, [path], [tf.float32, tf.float32])
        image.set_shape([*IMG_SIZE, 3])
        label.set_shape([])
        return image, label

    ds = tf.data.Dataset.from_tensor_slices(paths)
    ds = ds.map(_parse, num_parallel_calls=tf.data.AUTOTUNE)
    if shuffle:
        ds = ds.shuffle(100)
    ds = ds.batch(batch_size).prefetch(tf.data.AUTOTUNE)
    return ds

In [5]:
import matplotlib.pyplot as plt

train_ds = create_dataset(train_paths)
test_ds = create_dataset(test_paths, shuffle=False)

print("✅ Dataset ready.")

✅ Dataset ready.


In [6]:
# Test image
with rasterio.open(filtered_paths[0]) as src:
    print("Dtype:", src.dtypes)
    raw = src.read(1)  # first band
    print("Raw min/max:", raw.min(), raw.max())

Dtype: ('float64', 'float64', 'float64')
Raw min/max: -0.0026500000000000004 0.4601


In [7]:
from tensorflow.keras import layers, models

def build_image_regression_model(input_shape=(256, 256, 3)):
    inputs = layers.Input(shape=input_shape)

    x = layers.Conv2D(32, 3, activation='relu', padding='same')(inputs)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(64, 3, activation='relu', padding='same')(x)

    x = layers.GlobalAveragePooling2D()(x)

    x = layers.Dense(64, activation='relu')(x)
    outputs = layers.Dense(1)(x)

    return models.Model(inputs, outputs)


In [8]:
model = build_image_regression_model()

callbacks = [
    tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
    tf.keras.callbacks.ModelCheckpoint("best_model.keras", save_best_only=True),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=3,
        min_lr=1e-6
    )
]

# --- Compile ---
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3, clipnorm=1.0),
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=[tf.keras.metrics.RootMeanSquaredError()]
)

In [None]:
# UPDATED IMAGE REG BASE MODEL
history = model.fit(train_ds, validation_data=test_ds, epochs=EPOCHS, callbacks=callbacks)

Epoch 1/30
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m616s[0m 8s/step - loss: 875.6575 - root_mean_squared_error: 29.5609 - val_loss: 272.7048 - val_root_mean_squared_error: 16.5138 - learning_rate: 0.0010
Epoch 2/30
[1m42/42[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m356s[0m 8s/step - loss: 184.5692 - root_mean_squared_error: 13.5469 - val_loss: 123.7397 - val_root_mean_squared_error: 11.1238 - learning_rate: 0.0010
Epoch 3/30
[1m 3/42[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m4:27[0m 7s/step - loss: 149.9756 - root_mean_squared_error: 12.2004

In [None]:
loss, rmse = model.evaluate(test_ds)
print(f"Test RMSE: {rmse:.4f}")

In [None]:
from tensorflow.keras.applications import ResNet50
from tensorflow.keras import layers, models

def build_resnet50_regression_model(input_shape=(256, 256, 3)):
    base = ResNet50(
        input_shape=input_shape,
        weights='imagenet',
        include_top=False  # Remove ImageNet classifier head
    )

    x = layers.GlobalAveragePooling2D()(base.output)
    x = layers.Dense(64, activation='relu')(x)
    x = layers.Dropout(0.2)(x)
    output = layers.Dense(1)(x)  # Single scalar output

    model = models.Model(inputs=base.input, outputs=output)
    return model

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

callbacks = [
    EarlyStopping(patience=7, restore_best_weights=True),
    ModelCheckpoint("best_model.keras", save_best_only=True),
    tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',1
        factor=0.5,
        patience=3,
        min_lr=1e-6
    )
]

model2 = build_resnet50_regression_model()

# --- Compile ---
model2.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4, clipnorm=1.0),
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=[tf.keras.metrics.RootMeanSquaredError()]
)

In [None]:
history2 = model2.fit(train_ds, validation_data=test_ds, epochs=EPOCHS, callbacks=callbacks)

In [None]:
loss, rmse = model2.evaluate(test_ds)
print(f"Test RMSE: {rmse:.4f}")

In [None]:
# import matplotlib.pyplot as plt
# import numpy as np

# Take a batch from test set
# for images, labels in test_ds.take(1):
#     preds = model2.predict(images)

#     # Plot predictions vs. ground truth
#     for i in range(len(images)):
#         img = images[i].numpy()
#         label = labels[i].numpy()
#         pred = preds[i][0]  # shape: (batch_size, 1)

#         # Normalize image for display (only for visualization)
#         img_disp = (img - img.min()) / (img.max() - img.min() + 1e-8)

#         plt.figure(figsize=(5, 5))
#         plt.imshow(img_disp)
#         plt.title(f"Predicted: {pred:.3f} | True: {label:.3f}")
#         plt.axis('off')
#         plt.show()

In [None]:
# for images, labels in test_ds:
#     preds = model2.predict(images)
#     for i in range(len(images)):
#         img = images[i].numpy()
#         label = labels[i].numpy()
#         pred = preds[i][0]  # shape: (batch_size, 1)
#         print(pred, label)


In [None]:
# def plot_training_curves(history):
#     plt.figure(figsize=(10, 5))

#     # Plot loss
#     plt.subplot(1, 2, 1)
#     plt.plot(history.history['loss'], label='Train Loss')
#     plt.plot(history.history['val_loss'], label='Val Loss')
#     plt.xlabel('Epoch')
#     plt.ylabel('MSE Loss')
#     plt.title('Training & Validation Loss')
#     plt.legend()

#     # Plot RMSE
#     if 'rmse' in history.history:
#         plt.subplot(1, 2, 2)
#         plt.plot(history.history['rmse'], label='Train RMSE')
#         plt.plot(history.history['val_rmse'], label='Val RMSE')
#         plt.xlabel('Epoch')
#         plt.ylabel('RMSE')
#         plt.title('Training & Validation RMSE')
#         plt.legend()

#     plt.tight_layout()
#     plt.show()

In [None]:
# plot_training_curves(history)

In [None]:
# plot_training_curves(history2)

In [None]:
# NEW TEST DATASETS
# --- Config ---
LOWER_DIR = "/content/drive/My Drive/HLS_Composite_Patches_2021_CRElower"
LOWER_LABEL_CSV = "/content/drive/My Drive/HLS_Composite_Patches_2021/HLS_Patch_Labels_2021_lower.csv"

UPPER_DIR = "/content/drive/My Drive/HLS_Composite_Patches_2021_CREupper"
UPPER_LABEL_CSV = "/content/drive/My Drive/HLS_Composite_Patches_2021/HLS_Patch_Labels_2021_upper.csv"


def map_test_set(IMAGE_DIR, LABEL_CSV):

  labels_df = pd.read_csv(LABEL_CSV)
  labels_dict = dict(zip(labels_df["fileNamePrefix"], labels_df["label"]))

  all_image_paths = [os.path.join(IMAGE_DIR, f) for f in os.listdir(IMAGE_DIR) if f.endswith('.tif')]

  # Extract filename without extension to match CSV
  filtered_paths = [
      p for p in all_image_paths
      if os.path.splitext(os.path.basename(p))[0] in labels_dict
  ]
  print(f"Matched {len(filtered_paths)} images with labels.")
  return filtered_paths



In [None]:
test_cre_lower = map_test_set(LOWER_DIR, LOWER_LABEL_CSV)
test_cre_upper = map_test_set(UPPER_DIR, UPPER_LABEL_CSV)

In [None]:
test_lower = create_dataset(test_cre_lower, shuffle=False)
test_upper = create_dataset(test_cre_upper, shuffle=False)

In [None]:
loss, rmse = model.evaluate(test_lower)
print(f"Test RMSE (base model, lower bound): {rmse:.4f}")

In [None]:
loss, rmse = model.evaluate(test_upper)
print(f"Test RMSE (base model, upper bound): {rmse:.4f}")

In [None]:
loss, rmse = model2.evaluate(test_lower)
print(f"Test RMSE (ResNet model, lower bound): {rmse:.4f}")

In [None]:
loss, rmse = model2.evaluate(test_upper)
print(f"Test RMSE (ResNet model, upper bound): {rmse:.4f}")