Climate resiliance and risk managment Model

Installation of pacjages that will be needed for this model development

In [None]:
import pickle
import joblib
import json
from pathlib import Path
from datetime import datetime
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, roc_auc_score
from sklearn.feature_selection import SelectKBest, f_classif
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

Class for all Risk assessment Models.

In [None]:
class BaseRiskModel:
    def __init__(self, model_name, model_type='classification'):
        self.model_name = model_name
        self.model_type = model_type
        self.model = None
        self.scaler = StandardScaler()
        self.feature_selector = None
        self.is_trained = False
        self.features = None
        self.training_history = []
        self.model_params = {}

    def prepare_data(self, data):
        """Prepare data for model training/prediction"""
        raise NotImplementedError("Must implement prepare_data method")   
    
    def train(self, X, y):
        """Train the risk model"""
        raise NotImplementedError("Must implement train method")
    
    def predict(self, X):
        """Make risk predictions"""
        if not self.is_trained:
            raise ValueError("Model must be trained before making predictions")
        return self.model.predict(X)
    
    def predict_proba(self, X):
        """Get prediction probabilities"""
        if not self.is_trained:
            raise ValueError("Model must be trained before making predictions")
        if hasattr(self.model, 'predict_proba'):
            return self.model.predict_proba(X)
        else:
            return None
        
    def evaluate(self, X_test, y_test):
        """Evaluate model performance"""
        predictions = self.predict(X_test)
        probabilities = self.predict_proba(X_test)
        
        evaluation = {
            'accuracy': accuracy_score(y_test, predictions),
            'classification_report': classification_report(y_test, predictions),
            'confusion_matrix': confusion_matrix(y_test, predictions).tolist(),
            'model_name': self.model_name,
            'test_samples': len(X_test),
            'evaluation_date': datetime.now().isoformat()
        }
        
        # Add AUC score if probabilities available
        if probabilities is not None and len(np.unique(y_test)) == 2:
            evaluation['auc_score'] = roc_auc_score(y_test, probabilities[:, 1])
        
        return evaluation
    
    def save_model(self, filepath):
        """Save trained model to file"""
        model_data = {
            'model': self.model,
            'scaler': self.scaler,
            'feature_selector': self.feature_selector,
            'model_name': self.model_name,
            'model_type': self.model_type,
            'features': self.features,
            'is_trained': self.is_trained,
            'training_history': self.training_history,
            'model_params': self.model_params,
            'save_date': datetime.now().isoformat()
        }
        
        filepath = Path(filepath)
        if filepath.suffix == '.pkl':
            with open(filepath, 'wb') as f:
                pickle.dump(model_data, f)
        elif filepath.suffix == '.joblib':
            joblib.dump(model_data, filepath)
        
        print(f"Model saved to: {filepath}")
    
    def load_model(self, filepath):
        """Load trained model from file"""
        filepath = Path(filepath)
        if filepath.suffix == '.pkl':
            with open(filepath, 'rb') as f:
                model_data = pickle.load(f)
        elif filepath.suffix == '.joblib':
            model_data = joblib.load(filepath)
        
        self.model = model_data['model']
        self.scaler = model_data.get('scaler', StandardScaler())
        self.feature_selector = model_data.get('feature_selector', None)
        self.model_name = model_data['model_name']
        self.model_type = model_data['model_type']
        self.features = model_data['features']
        self.is_trained = model_data['is_trained']
        self.training_history = model_data.get('training_history', [])
        self.model_params = model_data.get('model_params', {})
        
        print(f"Model loaded from: {filepath}")


LANDSLIDE RISK MODEL

In [None]:
class RwandaLandslideModel():
    """Advanced landslide risk prediction model for Rwanda"""

    def __init__(self, model_algorithm ='random_forest'):

        super().__init__("Rwanda_Landslide_Model", "classificattion")
        self.algorithm = model_algorithm
        self.rwanda_params = self.__setup_rwanda_parameters()
        self._initialize_model()

    def __setup_rwanda_parameters(self):
        """setup Rwanda specific landslide parameters"""
        return {
            'elevation_range': (900, 4507), #Rwanda's elevation range (m)
            'critical_slope': 25, #Critical slope angle (degrees)
            'rainfall_threshold': 50, #Critical 24-hour rainfall (mm)
            'soil_saturation_threshold': 0.8, #Critical soil moisture
            'vegetation_threshold': 0.3, #Critical NDVI threshold
            'geolgy_weights': {
                'volcanic': 0.6, #More stable
                'sedimentary': 0.3, #Moderate stability
                'weathered': 1.0, #Less stable
                'alluvial': 0.7 #Variable stability
            }
        }
    
    def _initialize_model(self):
        """ Initialize the underlying ML model """
        if self.algorithm == 'random_forest':
            self.model = RandomForestClassifier(
                n_estimators= 200,
                max_depth= 15,
                min_samples_split= 10,
                min_samples_leaf=5,
                random_state= 42,
                class_weight='balanced'
            )
        elif self.algorithm == 'gradient_boosting':
            self.model = GradientBoostingClassifier(
                n_estimators= 150,
                learning_rate= 0.1,
                max_depth= 8,
                random_state= 42            
                )
        elif self.algorithm == 'Logistic_Regression':
            self.model = LogisticRegression( 
                random_state= 42,
                class_weight='balanced',
                max_iter= 1000
            )
    
    def generate_training_data(self, n_samples= 5000):
        """Generate realistic training data for Rwanda landslidies"""
        print(f"Generating {n_samples} training samples for landslide model...")
        
        np.random.seed(42)
        data = []

        # Rwanda's geographic regions with different landslide susceptibility
        regions = {
            'Northern_Mountains': {'base_risk': 0.7, 'elevation_mean': 2200, 'slope_mean': 30},
            'Central_Plateau': {'base_risk': 0.4, 'elevation_mean': 1600, 'slope_mean': 20},
            'Eastern_Hills': {'base_risk': 0.5, 'elevation_mean': 1400, 'slope_mean': 25},
            'Western_Ridges': {'base_risk': 0.6, 'elevation_mean': 1800, 'slope_mean': 28},
            'Southern_Valleys': {'base_risk': 0.3, 'elevation_mean': 1300, 'slope_mean': 15}
        }

        for i in range(n_samples):
            #Select random region

            region_name, region_params = np.random.choice(list(regions.items()))

            #Generate geographic features
            elevation = np.random.normal(region_params['elevation_mean'], 200)
            elevation = np.clip(elevation, 900, 4500)
            slope = np.random.normal(region_params['slope_mean'], 8)
            slope = np.clip(slope, 0, 60)

            aspect = np.random.uniform(0,360)

            #Generate climate features
            #Rainfall patterns (more in wet season)

            season_factor = np.sin((i % 365) * (2 * np.pi / 365) * 0.3 + 1)
            daily_rainfall = np.random.gamma(2, 3) * season_factor
            rainfall_24h = daily_rainfall + np.random.gamma(1, 10) #Extrem events
            rainfall_72h =  rainfall_24h + np.random.gamma(1, 15)
            antecedent_rainfall = np.random.gamma(3, 8) #previous week rainfall

            # Soil and vegetation features
            soil_depth = np.ranndom.gamma(2,1.5) #meters
            soil_type = np.random.choice(['volcanic', 'sedimentary', 'weathered', 'alluvial'])
            soil_moisture = 0.3 + 0.4 * (rainfall_24h / 100) + np.random.normal(0,0.1)
            soil_moisture = np.clip(soil_moisture, 0.1, 0.95)

            #Human factors
            


