In [44]:
from scipy.ndimage import median_filter, gaussian_filter
import cv2
 
def load_lidar_raster(file_path):
    """Loads a LiDAR raster file and returns it as a NumPy array."""
    with rasterio.open(file_path) as src:
        data = src.read(1)  # Read the first band
        profile = src.profile  # Save metadata for future use
    return data, profile
 
def normalize_image(image):
    """Normalize image to range [0, 255] for better visualization and processing."""
    #image = (image - np.min(image)) / (np.max(image) - np.min(image)) * 255.0
    return image
 
def denoise_lidar(image, method='bilateral'):
    """Apply noise reduction to LiDAR raster using various filtering methods."""
    if method == 'median':
        return median_filter(image, size=3)
    elif method == 'gaussian':
        return gaussian_filter(image, sigma=1)
    elif method == 'bilateral':
        return cv2.bilateralFilter(image, d=9, sigmaColor=75, sigmaSpace=75)
    else:
        raise ValueError("Unknown denoising method: choose 'median', 'gaussian', or 'bilateral'")
 
def enhance_edges(image):
    """Apply edge enhancement using the Sobel filter."""
    sobel_x = cv2.Sobel(image, cv2.CV_64F, 1, 0, ksize=3)
    sobel_y = cv2.Sobel(image, cv2.CV_64F, 0, 1, ksize=3)
    edges = np.sqrt(sobel_x**2 + sobel_y**2)
    return normalize_image(edges)
 
def preprocess_lidar(image, method='bilateral'):
    """Preprocess LiDAR raster by denoising and enhancing edges."""
    image = normalize_image(image)
    denoised = denoise_lidar(image, method)
    edges = enhance_edges(denoised)
    return edges

In [45]:
#!pip install rasterio
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import rasterio
import matplotlib.pyplot as plt
import torch.optim as optim
from PIL import Image


hillshade_path = "./South_Clear_Creek/Lidar_DEM_Hillshade/South_Clear_Creek_BareEarth_Hillshade_1m.tif"
roads_path = "./South_Clear_Creek/Roads_Boundary/South_Clear_Creek_Roads_Mask.tif"
dem_path = "./South_Clear_Creek/Lidar_DEM_Hillshade/South_Clear_Creek_BareEarth_DEM_1m.tif"


with rasterio.open(hillshade_path) as src:
    hillshade = src.read().squeeze().astype(np.float32)
    hillshade = (hillshade - hillshade.min()) / (hillshade.max() - hillshade.min())

with rasterio.open(roads_path) as src:
    roads_mask = src.read().squeeze().astype(np.uint8)
    road_mask_binary = (roads_mask > 0).astype(np.uint8)
with rasterio.open(dem_path) as src:
    array = src.read(masked=True)
    metadata = src.profile
print(array.shape)
print(hillshade.shape)

def get_patches(image, mask, dem, patch_size=128, stride=128):
    img_patches = []
    mask_patches = []
    dem_patches = []  # To store patches of the DEM data
    h, w = image.shape
      # Remove the channel dimension (from shape (1, 13115, 10460) to (13115, 10460))
    for i in range(0, h - patch_size + 1, stride):
        print(i)
        for j in range(0, w - patch_size + 1, stride):
            # Extract patches for image, mask, and DEM
            img_patch = image[i:i+patch_size, j:j+patch_size]
            img_patch = preprocess_lidar(img_patch, method='bilateral')
            mask_patch = mask[i:i+patch_size, j:j+patch_size]
            # Filter out patches with too few valid mask values
            if np.sum(mask_patch) > 10:
                img_patches.append(np.expand_dims(img_patch, axis=0))  # Add image patch
                mask_patches.append(np.expand_dims(mask_patch, axis=0))  # Add mask patch

    return img_patches, mask_patches, dem_patches

img_patches, mask_patches, dem_patches = get_patches(hillshade, road_mask_binary, array)




(1, 13115, 10460)
(13115, 10460)
0
128
256
384
512
640
768
896
1024
1152
1280
1408
1536
1664
1792
1920
2048
2176
2304
2432
2560
2688
2816
2944
3072
3200
3328
3456
3584
3712
3840
3968
4096
4224
4352
4480
4608
4736
4864
4992
5120
5248
5376
5504
5632
5760
5888
6016
6144
6272
6400
6528
6656
6784
6912
7040
7168
7296
7424
7552
7680
7808
7936
8064
8192
8320
8448
8576
8704
8832
8960
9088
9216
9344
9472
9600
9728
9856
9984
10112
10240
10368
10496
10624
10752
10880
11008
11136
11264
11392
11520
11648
11776
11904
12032
12160
12288
12416
12544
12672
12800
12928


In [46]:
class UNet(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, init_features=64):
        super(UNet, self).__init__()
        features = init_features

        self.encoder1 = self.conv_block(in_channels, features)
        self.pool1 = nn.MaxPool2d(2)

        self.encoder2 = self.conv_block(features, features * 2)
        self.pool2 = nn.MaxPool2d(2)

        self.encoder3 = self.conv_block(features * 2, features * 4)
        self.pool3 = nn.MaxPool2d(2)

        self.encoder4 = self.conv_block(features * 4, features * 8)
        self.pool4 = nn.MaxPool2d(2)

        self.bottleneck = self.conv_block(features * 8, features * 16)

        self.upconv4 = nn.ConvTranspose2d(features * 16, features * 8, kernel_size=2, stride=2)
        self.decoder4 = self.conv_block(features * 16, features * 8)

        self.upconv3 = nn.ConvTranspose2d(features * 8, features * 4, kernel_size=2, stride=2)
        self.decoder3 = self.conv_block(features * 8, features * 4)

        self.upconv2 = nn.ConvTranspose2d(features * 4, features * 2, kernel_size=2, stride=2)
        self.decoder2 = self.conv_block(features * 4, features * 2)

        self.upconv1 = nn.ConvTranspose2d(features * 2, features, kernel_size=2, stride=2)
        self.decoder1 = self.conv_block(features * 2, features)

        self.final_conv = nn.Conv2d(features, out_channels, kernel_size=1)

    def forward(self, x):
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool1(enc1))
        enc3 = self.encoder3(self.pool2(enc2))
        enc4 = self.encoder4(self.pool3(enc3))

        bottleneck = self.bottleneck(self.pool4(enc4))

        dec4 = self.upconv4(bottleneck)
        dec4 = torch.cat((dec4, enc4), dim=1)
        dec4 = self.decoder4(dec4)

        dec3 = self.upconv3(dec4)
        dec3 = torch.cat((dec3, enc3), dim=1)
        dec3 = self.decoder3(dec3)

        dec2 = self.upconv2(dec3)
        dec2 = torch.cat((dec2, enc2), dim=1)
        dec2 = self.decoder2(dec2)

        dec1 = self.upconv1(dec2)
        dec1 = torch.cat((dec1, enc1), dim=1)
        dec1 = self.decoder1(dec1)

        return self.final_conv(dec1)

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

class RoadDataset(Dataset):
    def __init__(self, images, masks):
        self.images = images
        self.masks = masks
    

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        x = torch.tensor(self.images[idx], dtype=torch.float32)
        y = torch.tensor(self.masks[idx], dtype=torch.float32)

        return x, y


In [47]:

img_patches = np.array(img_patches)
mask_patches = np.array(mask_patches)
#dem_patches = np.array(dem_patches)

In [48]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset


# Split dataset into 80% train, 20% test
train_img, test_img, train_mask, test_mask= train_test_split(
    img_patches, mask_patches,  test_size=0.2, random_state=42
)

# Create dataset instances
train_dataset = RoadDataset(train_img, train_mask)
test_dataset = RoadDataset(test_img, test_mask)

# Create DataLoaders
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

# --- Model setup ---
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = UNet(in_channels=1, out_channels=1).to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0

    for batch in train_loader:
        x, y = batch
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad()
        output = model(x)
        loss = criterion(output, y)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Compute train accuracy
        predicted = (torch.sigmoid(output) > 0.5).float()  # Thresholding
        correct_train += (predicted == y).sum().item()
        total_train += y.numel()

    avg_loss = running_loss / len(train_loader)
    train_accuracy = correct_train / total_train

    # --- Validation Phase ---
    model.eval()
    test_loss = 0.0
    correct_test = 0
    total_test = 0

    with torch.no_grad():
        for batch in test_loader:
            x, y = batch
            x, y = x.to(device), y.to(device)

            output = model(x)
            loss = criterion(output, y)
            test_loss += loss.item()

            # Compute test accuracy
            predicted = (torch.sigmoid(output) > 0.5).float()
            correct_test += (predicted == y).sum().item()
            total_test += y.numel()

    avg_test_loss = test_loss / len(test_loader)
    test_accuracy = correct_test / total_test

    print(f"Epoch {epoch+1}/{epochs}, "
          f"Loss: {avg_loss:.4f}, Accuracy: {train_accuracy:.4f}, "
          f"Test Loss: {avg_test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")

Epoch 1/10, Loss: 0.1990, Accuracy: 0.9853, Test Loss: 0.1008, Test Accuracy: 0.9913
Epoch 2/10, Loss: 0.0678, Accuracy: 0.9920, Test Loss: 0.0558, Test Accuracy: 0.9914
Epoch 3/10, Loss: 0.0490, Accuracy: 0.9920, Test Loss: 0.0469, Test Accuracy: 0.9914
Epoch 4/10, Loss: 0.0431, Accuracy: 0.9920, Test Loss: 0.0435, Test Accuracy: 0.9914
Epoch 5/10, Loss: 0.0405, Accuracy: 0.9920, Test Loss: 0.0426, Test Accuracy: 0.9913
Epoch 6/10, Loss: 0.0390, Accuracy: 0.9920, Test Loss: 0.0406, Test Accuracy: 0.9914
Epoch 7/10, Loss: 0.0382, Accuracy: 0.9920, Test Loss: 0.0400, Test Accuracy: 0.9914
Epoch 8/10, Loss: 0.0373, Accuracy: 0.9920, Test Loss: 0.0385, Test Accuracy: 0.9914
Epoch 9/10, Loss: 0.0368, Accuracy: 0.9920, Test Loss: 0.0396, Test Accuracy: 0.9914
Epoch 10/10, Loss: 0.0361, Accuracy: 0.9920, Test Loss: 0.0383, Test Accuracy: 0.9914


In [49]:
torch.save(model.state_dict(), "trained_unet_64_stride.pth")


In [50]:
def get_pytorch_patches(image, patch_size=128, stride=128):
    patches = []
    positions = []
    H, W = image.shape
    for i in range(0, H - patch_size + 1, stride):
        for j in range(0, W - patch_size + 1, stride):
            patch = image[i:i+patch_size, j:j+patch_size]
            patch = np.expand_dims(patch, axis=(0))  # shape: (1, H, W) for PyTorch
            patches.append(patch)
            positions.append((i, j))
    return patches, positions, (H, W)


In [None]:
def predict_large_image(image, model, device, patch_size=128, stride=128):
    model.eval()
    patches, positions, (H, W) = get_pytorch_patches(image, patch_size, stride)
    predictions = []
    
    with torch.no_grad():
        for patch in patches:
            x = torch.tensor(patch, dtype=torch.float32).unsqueeze(0).to(device)  # shape: (1, 1, 256, 256)
            y = model(x)
            pred = torch.sigmoid(y).squeeze().cpu().numpy()  # (256, 256)
            predictions.append(pred)

    # Reassemble full-size prediction
    full_pred = np.zeros((H, W), dtype=np.float32)
    count_map = np.zeros((H, W), dtype=np.float32)

    for pred, (i, j) in zip(predictions, positions):
        full_pred[i:i+patch_size, j:j+patch_size] += pred
        count_map[i:i+patch_size, j:j+patch_size] += 1

    full_pred /= np.maximum(count_map, 1)
    binary_mask = (full_pred > 0.005).astype(np.uint8)
    return binary_mask


In [52]:
!pip install opencv-python
import rasterio
from PIL import Image


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = UNet(in_channels=1, out_channels=1).to(device)

model.load_state_dict(torch.load("trained_unet.pth", map_location=device))

model.to(device)

hillshade_path = "./South_Clear_Creek/Lidar_DEM_Hillshade/South_Clear_Creek_BareEarth_Hillshade_1m.tif"
with rasterio.open(hillshade_path) as src:
    image = src.read(1)
image = (image - image.min()) / (image.max() - image.min())

# Predict full mask
predicted_mask = predict_large_image(image, model, device)

# Save
Image.fromarray(predicted_mask * 255).save("pytorch_predicted_mask_with_dem.png")





[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: C:\Users\brice\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


KeyboardInterrupt: 

In [None]:
with rasterio.open('Upper_Willow_Creek_BareEarth_Hillshade_1m_1_uint8.tif') as src:
    arr = src.read().squeeze().astype(np.uint8)
patches, positions, (H, W) = get_pytorch_patches(image, 128, 128)
    
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 
model = UNet(in_channels=1, out_channels=1).to(device)
 
model.load_state_dict(torch.load("trained_unet_64_stride.pth", map_location=device))
 
model.to(device)
 
hillshade_path = "Upper_Willow_Creek_BareEarth_Hillshade_1m_1.tif"
with rasterio.open(hillshade_path) as src:
    image = src.read(1)
image = (image - image.min()) / (image.max() - image.min())
 
# Predict full mask
predicted_mask = predict_large_image(image, model, device)
 
# Save
Image.fromarray(predicted_mask * 255).save("pytorch_predicted_mask_3.png")
 




AttributeError: 'list' object has no attribute 'shape'

In [None]:
import numpy as np
from PIL import Image
import rasterio
import cv2
 
with rasterio.open("Upper_Willow_Creek_BareEarth_Hillshade_1m_1.tif") as src:
    hillshade = src.read(1).astype(np.float32)
    hillshade = (hillshade - hillshade.min()) / (hillshade.max() - hillshade.min())
 
prediction = np.array(Image.open("pytorch_predicted_mask_3.png").convert("L"))
prediction = (prediction > 127).astype(np.uint8)  # white = road
 
if prediction.shape != hillshade.shape:
    prediction = cv2.resize(prediction, (hillshade.shape[1], hillshade.shape[0]), interpolation=cv2.INTER_NEAREST)
 
hillshade_rgb = np.stack([hillshade]*3, axis=-1)
overlay = hillshade_rgb.copy()
overlay[prediction == 1] = [0.0, 0.0, 1.0]  # red overlay where prediction = 1
 
overlay_uint8 = (overlay * 255).astype(np.uint8)
Image.fromarray(overlay_uint8).save("hillshade_with_predicted_roads.png")
 
print("Saved overlay image as 'hillshade_with_predicted_roads.png'")

  overlay_uint8 = (overlay * 255).astype(np.uint8)


Saved overlay image as 'hillshade_with_predicted_roads.png'
