# GCL Dataset Machine Learning Training

This notebook trains a machine learning model on the GCL (Global Coverage Layer) shapefile datasets from 2010, 2015, and 2020.

## Dataset Overview
The GCL dataset contains shapefiles with geographic data across multiple years:
- GCL2010: Data from 2010
- GCL2015: Data from 2015  
- GCL2020: Data from 2020

## Workflow
1. **Print data structure** - Examine the shapefile structure and attributes
2. **Prepare dataset** - Load and preprocess the shapefile data for training
3. **Model architecture** - Define the ML model suitable for this data
4. **Train model** - Train the model on the prepared dataset
5. **Testing** - Evaluate model performance


In [None]:
# Import required libraries
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import geopandas as gpd
import matplotlib.pyplot as plt
import seaborn as sns
from shapely.geometry import Point, Polygon
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.decomposition import PCA
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import os
from pathlib import Path

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)

# Check GPU availability
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
print(f"PyTorch version: {torch.__version__}")
print("All imports successful!")


## 1. Print Data Structure

First, let's examine the structure of our GCL shapefiles to understand what attributes and data we're working with.


In [None]:
# Define paths to shapefiles
shapefile_years = ['2010', '2015', '2020']
shapefiles = {}

# Load each shapefile and examine its structure
for year in shapefile_years:
    shapefile_path = f'GCL{year}.shp'
    try:
        # Load the shapefile
        gdf = gpd.read_file(shapefile_path)
        shapefiles[year] = gdf
        
        print(f"\n{'='*50}")
        print(f"GCL{year} Shapefile Structure:")
        print(f"{'='*50}")
        print(f"Shape: {gdf.shape}")
        print(f"CRS: {gdf.crs}")
        print(f"\nColumns: {list(gdf.columns)}")
        print(f"\nData types:")
        print(gdf.dtypes)
        print(f"\nFirst 5 rows:")
        print(gdf.head())
        
        # Print summary statistics for numeric columns
        numeric_cols = gdf.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            print(f"\nSummary statistics:")
            print(gdf[numeric_cols].describe())
        
        # Print unique values for categorical columns (if any)
        categorical_cols = gdf.select_dtypes(include=['object']).columns
        categorical_cols = [col for col in categorical_cols if col != 'geometry']
        if len(categorical_cols) > 0:
            print(f"\nUnique values in categorical columns:")
            for col in categorical_cols:
                unique_vals = gdf[col].nunique()
                if unique_vals < 20:  # Only print if reasonable number of unique values
                    print(f"{col}: {gdf[col].unique()}")
                else:
                    print(f"{col}: {unique_vals} unique values")
                    
    except Exception as e:
        print(f"Error loading GCL{year}.shp: {e}")
        
print(f"\n{'='*50}")
print("Data loading complete!")
print(f"Successfully loaded {len(shapefiles)} shapefiles")


## 2. Prepare Dataset for Training

Now let's prepare the data for machine learning. We'll:
- Extract features from the geometries
- Combine data from different years 
- Create target variables based on temporal changes
- Split into training and testing sets


In [None]:
class GeospatialFeatureExtractor:
    """Extract features from geospatial data for ML training"""
    
    def __init__(self):
        self.scaler = StandardScaler()
        self.label_encoder = LabelEncoder()
        
    def extract_geometry_features(self, gdf):
        """Extract features from geometry objects"""
        features = pd.DataFrame()
        
        # Basic geometry features
        features['area'] = gdf.geometry.area
        features['perimeter'] = gdf.geometry.length
        features['centroid_x'] = gdf.geometry.centroid.x
        features['centroid_y'] = gdf.geometry.centroid.y
        
        # Shape complexity metrics
        features['shape_complexity'] = features['perimeter'] / (2 * np.sqrt(np.pi * features['area']))
        features['area_perimeter_ratio'] = features['area'] / features['perimeter']
        
        # Bounding box features
        bounds = gdf.geometry.bounds
        features['bbox_width'] = bounds['maxx'] - bounds['minx']
        features['bbox_height'] = bounds['maxy'] - bounds['miny']
        features['bbox_area'] = features['bbox_width'] * features['bbox_height']
        features['fill_ratio'] = features['area'] / features['bbox_area']
        
        return features
    
    def create_temporal_features(self, data_dict):
        """Create features based on temporal changes between years"""
        years = sorted(data_dict.keys())
        
        if len(years) < 2:
            print("Need at least 2 years of data for temporal features")
            return None
            
        # Extract features for each year
        feature_dfs = {}
        for year in years:
            gdf = data_dict[year]
            
            # Extract geometry features
            geom_features = self.extract_geometry_features(gdf)
            
            # Add year identifier
            geom_features['year'] = int(year)
            
            # Add non-geometry attributes
            non_geom_cols = [col for col in gdf.columns if col != 'geometry']
            for col in non_geom_cols:
                geom_features[f'attr_{col}'] = gdf[col]
            
            feature_dfs[year] = geom_features
            
        # Combine all years
        combined_df = pd.concat(feature_dfs.values(), ignore_index=True)
        
        # Create change-based features (comparing consecutive years)
        change_features = []
        for i in range(1, len(years)):
            prev_year = years[i-1]
            curr_year = years[i]
            
            prev_df = feature_dfs[prev_year]
            curr_df = feature_dfs[curr_year]
            
            # Calculate changes in numeric features
            numeric_cols = ['area', 'perimeter', 'shape_complexity']
            for col in numeric_cols:
                if col in prev_df.columns and col in curr_df.columns:
                    # Match geometries by index or location
                    change = curr_df[col].values[:min(len(prev_df), len(curr_df))] - \
                            prev_df[col].values[:min(len(prev_df), len(curr_df))]
                    
                    change_df = pd.DataFrame({
                        f'{col}_change': change,
                        'year_transition': f'{prev_year}_{curr_year}'
                    })
                    change_features.append(change_df)
        
        return combined_df, change_features
    
    def prepare_ml_dataset(self, data_dict, task='classification'):
        """Prepare final dataset for machine learning"""
        
        # Create temporal features
        combined_df, change_features = self.create_temporal_features(data_dict)
        
        if combined_df is None:
            return None, None, None, None
            
        # Select numeric features for training
        feature_cols = [col for col in combined_df.columns if col not in ['year'] and 
                       pd.api.types.is_numeric_dtype(combined_df[col])]
        
        X = combined_df[feature_cols]
        
        # Create target variable based on task
        if task == 'classification':
            # Example: Classify based on area quantiles
            if 'area' in combined_df.columns:
                y = pd.qcut(combined_df['area'], q=3, labels=['small', 'medium', 'large'])
                y = self.label_encoder.fit_transform(y)
            else:
                # Create synthetic target if no obvious target exists
                y = np.random.randint(0, 3, size=len(X))
        else:
            # Regression: predict area or another continuous variable
            if 'area' in combined_df.columns:
                y = combined_df['area']
            else:
                y = np.random.randn(len(X))
        
        # Handle missing values
        X = X.fillna(X.mean())
        
        # Scale features
        X_scaled = self.scaler.fit_transform(X)
        
        # Train-test split
        X_train, X_test, y_train, y_test = train_test_split(
            X_scaled, y, test_size=0.2, random_state=42
        )
        
        print(f"Dataset prepared:")
        print(f"  Total samples: {len(X)}")
        print(f"  Features: {len(feature_cols)}")
        print(f"  Training samples: {len(X_train)}")
        print(f"  Testing samples: {len(X_test)}")
        print(f"  Feature names: {feature_cols}")
        
        return X_train, X_test, y_train, y_test, feature_cols

# Create feature extractor and prepare dataset
extractor = GeospatialFeatureExtractor()

# Only proceed if we have loaded shapefiles
if len(shapefiles) > 0:
    X_train, X_test, y_train, y_test, feature_names = extractor.prepare_ml_dataset(
        shapefiles, task='classification'
    )
    
    if X_train is not None:
        print(f"\n✅ Dataset successfully prepared for training!")
        print(f"Training set shape: {X_train.shape}")
        print(f"Test set shape: {X_test.shape}")
        print(f"Number of classes: {len(np.unique(y_train))}")
    else:
        print("⚠️ Could not prepare dataset. Check data structure.")
else:
    print("⚠️ No shapefiles loaded. Cannot prepare dataset.")


## 3. Model Architecture

We'll implement both traditional ML models and a neural network for comparison. This allows flexibility in choosing the best approach for the geospatial data.


In [None]:
# PyTorch Dataset and DataLoader
class GeospatialDataset(Dataset):
    """PyTorch Dataset for geospatial features"""
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)
        self.y = torch.LongTensor(y)
        
    def __len__(self):
        return len(self.X)
        
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Neural Network Architecture
class GeospatialNN(nn.Module):
    """Neural network for geospatial classification"""
    def __init__(self, input_dim, hidden_dims=[128, 64, 32], num_classes=3, dropout=0.2):
        super(GeospatialNN, self).__init__()
        
        layers = []
        prev_dim = input_dim
        
        # Build hidden layers
        for hidden_dim in hidden_dims:
            layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.BatchNorm1d(hidden_dim),
                nn.Dropout(dropout)
            ])
            prev_dim = hidden_dim
            
        # Output layer
        layers.append(nn.Linear(prev_dim, num_classes))
        
        self.model = nn.Sequential(*layers)
        
    def forward(self, x):
        return self.model(x)

# Model factory for creating different architectures
class ModelFactory:
    """Factory for creating different model types"""
    
    @staticmethod
    def create_random_forest(n_estimators=100):
        return RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=10,
            random_state=42,
            n_jobs=-1
        )
    
    @staticmethod
    def create_gradient_boosting(n_estimators=100):
        return GradientBoostingClassifier(
            n_estimators=n_estimators,
            learning_rate=0.1,
            max_depth=5,
            random_state=42
        )
    
    @staticmethod
    def create_neural_network(input_dim, num_classes=3):
        return GeospatialNN(
            input_dim=input_dim,
            hidden_dims=[128, 64, 32],
            num_classes=num_classes,
            dropout=0.2
        )

# Initialize models if data is available
if 'X_train' in locals() and X_train is not None:
    # Get dimensions
    input_features = X_train.shape[1]
    num_classes = len(np.unique(y_train))
    
    print(f"Creating models for {input_features} features and {num_classes} classes...")
    
    # Traditional ML models
    rf_model = ModelFactory.create_random_forest()
    gb_model = ModelFactory.create_gradient_boosting()
    
    # Neural network
    nn_model = ModelFactory.create_neural_network(input_features, num_classes)
    
    print("✅ Models created successfully!")
    print(f"  - Random Forest")
    print(f"  - Gradient Boosting")
    print(f"  - Neural Network: {sum(p.numel() for p in nn_model.parameters())} parameters")
else:
    print("⚠️ No training data available. Load data first before creating models.")


In [None]:
# Training utilities
def train_neural_network(model, train_loader, val_loader, epochs=50, lr=0.001):
    """Train neural network model"""
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=5, factor=0.5)
    
    train_losses = []
    val_losses = []
    val_accuracies = []
    
    model.to(device)
    
    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()
            
            train_loss += loss.item()
            
        avg_train_loss = train_loss / len(train_loader)
        train_losses.append(avg_train_loss)
        
        # Validation phase
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                val_loss += loss.item()
                
                _, predicted = torch.max(outputs.data, 1)
                total += batch_y.size(0)
                correct += (predicted == batch_y).sum().item()
                
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = 100 * correct / total
        val_losses.append(avg_val_loss)
        val_accuracies.append(val_accuracy)
        
        scheduler.step(avg_val_loss)
        
        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch+1}/{epochs}] - Train Loss: {avg_train_loss:.4f}, "
                  f"Val Loss: {avg_val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")
    
    return train_losses, val_losses, val_accuracies

# Training execution
if 'X_train' in locals() and X_train is not None:
    print("Starting model training...")
    
    # 1. Train Random Forest
    print("\n📊 Training Random Forest...")
    rf_model.fit(X_train, y_train)
    rf_pred = rf_model.predict(X_test)
    rf_accuracy = accuracy_score(y_test, rf_pred)
    print(f"Random Forest Accuracy: {rf_accuracy:.4f}")
    
    # 2. Train Gradient Boosting
    print("\n📊 Training Gradient Boosting...")
    gb_model.fit(X_train, y_train)
    gb_pred = gb_model.predict(X_test)
    gb_accuracy = accuracy_score(y_test, gb_pred)
    print(f"Gradient Boosting Accuracy: {gb_accuracy:.4f}")
    
    # 3. Train Neural Network
    print("\n📊 Training Neural Network...")
    
    # Create DataLoaders
    train_dataset = GeospatialDataset(X_train, y_train)
    test_dataset = GeospatialDataset(X_test, y_test)
    
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    # Train the model
    train_losses, val_losses, val_accuracies = train_neural_network(
        nn_model, train_loader, test_loader, epochs=50
    )
    
    # Evaluate Neural Network
    nn_model.eval()
    nn_predictions = []
    nn_true = []
    
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X = batch_X.to(device)
            outputs = nn_model(batch_X)
            _, predicted = torch.max(outputs.data, 1)
            nn_predictions.extend(predicted.cpu().numpy())
            nn_true.extend(batch_y.numpy())
    
    nn_accuracy = accuracy_score(nn_true, nn_predictions)
    print(f"\nNeural Network Final Accuracy: {nn_accuracy:.4f}")
    
    # Summary of results
    print("\n" + "="*50)
    print("MODEL PERFORMANCE SUMMARY")
    print("="*50)
    print(f"Random Forest:      {rf_accuracy:.4f}")
    print(f"Gradient Boosting:  {gb_accuracy:.4f}")
    print(f"Neural Network:     {nn_accuracy:.4f}")
    print("="*50)
    
    # Store results
    model_results = {
        'Random Forest': {'accuracy': rf_accuracy, 'predictions': rf_pred},
        'Gradient Boosting': {'accuracy': gb_accuracy, 'predictions': gb_pred},
        'Neural Network': {'accuracy': nn_accuracy, 'predictions': nn_predictions}
    }
else:
    print("⚠️ No training data available. Please load and prepare data first.")


In [None]:
# Test 1: Confusion Matrices
def plot_confusion_matrices(model_results, y_test):
    """Plot confusion matrices for all models"""
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    for idx, (model_name, results) in enumerate(model_results.items()):
        cm = confusion_matrix(y_test, results['predictions'])
        
        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[idx])
        axes[idx].set_title(f'{model_name}\nAccuracy: {results["accuracy"]:.3f}')
        axes[idx].set_xlabel('Predicted')
        axes[idx].set_ylabel('Actual')
    
    plt.tight_layout()
    plt.show()

# Execute if models are trained
if 'model_results' in locals():
    plot_confusion_matrices(model_results, y_test)
else:
    print("⚠️ Train models first before testing.")


In [None]:
# Test 2: Feature Importance Analysis
def plot_feature_importance(rf_model, feature_names, top_n=10):
    """Plot feature importance from Random Forest model"""
    importances = rf_model.feature_importances_
    indices = np.argsort(importances)[::-1][:top_n]
    
    plt.figure(figsize=(10, 6))
    plt.title(f'Top {top_n} Most Important Features')
    plt.bar(range(top_n), importances[indices])
    plt.xticks(range(top_n), [feature_names[i] for i in indices], rotation=45, ha='right')
    plt.ylabel('Feature Importance')
    plt.tight_layout()
    plt.show()

# Execute if Random Forest is trained
if 'rf_model' in locals() and 'feature_names' in locals():
    plot_feature_importance(rf_model, feature_names)
else:
    print("⚠️ Train Random Forest model first.")


In [None]:
# Test 3: Training Curves for Neural Network
def plot_training_curves(train_losses, val_losses, val_accuracies):
    """Plot training and validation curves"""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
    
    # Loss curves
    epochs = range(1, len(train_losses) + 1)
    ax1.plot(epochs, train_losses, 'b-', label='Training Loss')
    ax1.plot(epochs, val_losses, 'r-', label='Validation Loss')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.set_title('Training and Validation Loss')
    ax1.legend()
    ax1.grid(True)
    
    # Accuracy curve
    ax2.plot(epochs, val_accuracies, 'g-', label='Validation Accuracy')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy (%)')
    ax2.set_title('Validation Accuracy')
    ax2.legend()
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()

# Execute if neural network was trained
if 'train_losses' in locals() and 'val_losses' in locals():
    plot_training_curves(train_losses, val_losses, val_accuracies)
else:
    print("⚠️ Train neural network first to see training curves.")


In [None]:
# Test 4: Detailed Classification Reports
def print_classification_reports(model_results, y_test, class_names=None):
    """Print detailed classification reports for all models"""
    if class_names is None:
        class_names = [f'Class {i}' for i in range(len(np.unique(y_test)))]
    
    for model_name, results in model_results.items():
        print(f"\n{'='*50}")
        print(f"Classification Report: {model_name}")
        print('='*50)
        print(classification_report(y_test, results['predictions'], 
                                  target_names=class_names))

# Execute if models are trained
if 'model_results' in locals():
    # Define class names based on the classification task
    class_names = ['Small', 'Medium', 'Large']  # Adjust based on your actual classes
    print_classification_reports(model_results, y_test, class_names)
else:
    print("⚠️ Train models first before generating reports.")


In [None]:
# Test 5: Model Comparison Visualization
def compare_models_barplot(model_results):
    """Create a bar plot comparing model accuracies"""
    model_names = list(model_results.keys())
    accuracies = [results['accuracy'] for results in model_results.values()]
    
    plt.figure(figsize=(8, 6))
    bars = plt.bar(model_names, accuracies)
    
    # Color bars based on performance
    colors = ['green' if acc > 0.8 else 'orange' if acc > 0.6 else 'red' 
              for acc in accuracies]
    for bar, color in zip(bars, colors):
        bar.set_color(color)
    
    plt.ylabel('Accuracy')
    plt.title('Model Performance Comparison')
    plt.ylim(0, 1.0)
    
    # Add accuracy values on top of bars
    for i, (name, acc) in enumerate(zip(model_names, accuracies)):
        plt.text(i, acc + 0.01, f'{acc:.3f}', ha='center', va='bottom')
    
    plt.grid(axis='y', alpha=0.3)
    plt.tight_layout()
    plt.show()

# Execute if models are trained
if 'model_results' in locals():
    compare_models_barplot(model_results)
else:
    print("⚠️ Train models first before comparing.")


In [None]:
# Test 6: Make Predictions on New Data
def predict_new_samples(model, scaler, feature_names, n_samples=5):
    """Generate and predict on new synthetic samples"""
    print("Generating predictions on new samples...\n")
    
    # Generate synthetic samples similar to the training data
    new_samples = np.random.randn(n_samples, len(feature_names))
    
    # Scale the samples
    new_samples_scaled = scaler.transform(new_samples)
    
    # Make predictions
    if isinstance(model, nn.Module):
        # Neural network prediction
        model.eval()
        with torch.no_grad():
            inputs = torch.FloatTensor(new_samples_scaled).to(device)
            outputs = model(inputs)
            _, predictions = torch.max(outputs, 1)
            predictions = predictions.cpu().numpy()
            probabilities = torch.softmax(outputs, dim=1).cpu().numpy()
    else:
        # Scikit-learn model prediction
        predictions = model.predict(new_samples_scaled)
        probabilities = model.predict_proba(new_samples_scaled)
    
    # Display results
    class_names = ['Small', 'Medium', 'Large']
    for i in range(n_samples):
        print(f"Sample {i+1}:")
        print(f"  Predicted class: {class_names[predictions[i]]}")
        print(f"  Class probabilities:")
        for j, class_name in enumerate(class_names):
            print(f"    {class_name}: {probabilities[i][j]:.3f}")
        print()

# Execute prediction test
if 'rf_model' in locals() and 'extractor' in locals():
    print("Random Forest Predictions:")
    predict_new_samples(rf_model, extractor.scaler, feature_names, n_samples=3)
    
    if 'nn_model' in locals():
        print("\nNeural Network Predictions:")
        predict_new_samples(nn_model, extractor.scaler, feature_names, n_samples=3)
else:
    print("⚠️ Train models first before making predictions.")


In [None]:
# Save trained models
import pickle
import joblib

# Create models directory
models_dir = Path('trained_models')
models_dir.mkdir(exist_ok=True)

# Save models if they are trained
if 'rf_model' in locals():
    # Save Random Forest
    joblib.dump(rf_model, models_dir / 'random_forest_model.joblib')
    print("✅ Random Forest model saved")
    
if 'gb_model' in locals():
    # Save Gradient Boosting
    joblib.dump(gb_model, models_dir / 'gradient_boosting_model.joblib')
    print("✅ Gradient Boosting model saved")
    
if 'nn_model' in locals():
    # Save Neural Network
    torch.save({
        'model_state_dict': nn_model.state_dict(),
        'input_dim': nn_model.model[0].in_features,
        'num_classes': nn_model.model[-1].out_features,
    }, models_dir / 'neural_network_model.pt')
    print("✅ Neural Network model saved")
    
if 'extractor' in locals():
    # Save the scaler and feature names
    joblib.dump({
        'scaler': extractor.scaler,
        'label_encoder': extractor.label_encoder,
        'feature_names': feature_names if 'feature_names' in locals() else None
    }, models_dir / 'preprocessors.joblib')
    print("✅ Preprocessors saved")
    
print(f"\nAll models saved to: {models_dir.absolute()}")
