This model is the adaptation of two equations one the Theis equation we covered in class the second is Terzaghi's One-Dimensional Consolidation Theory

#Theis equation
$s = \frac{Q}{4\pi T}w(u)$

#Terzaghi's one-dimensional consolidation equation:
Substituting into volumetric strain:
$\epsilon_v = -\frac{a_v}{1+e}\frac{\partial u_e}{\partial t}dt$

Volume change:
$dV = \epsilon_v dxdydz$

$dV = -\frac{a_v}{1+e}\frac{\partial u_e}{\partial t}dtdxdydz$

Equate Volume Change Expressions:
$-\frac{k}{\gamma_w}\frac{\partial^2 u_e}{\partial z^2}dxdydzdt = -\frac{a_v}{1+e}\frac{\partial u_e}{\partial t}dtdxdydz$

Collect terms:
$\frac{k}{\gamma_w}\frac{1+e}{a_v}\frac{\partial^2 u_e}{\partial z^2} = \frac{\partial u_e}{\partial t}$

Define coefficient of consolidation, $c_v$:
$c_v = \frac{k}{\gamma_w}\frac{1+e}{a_v}$

Terzaghi's one-dimensional consolidation equation:
$c_v\frac{\partial^2 u_e}{\partial z^2} = \frac{\partial u_e}{\partial t}$

Commonly the subscript "e" is dropped from the "u" term:
$c_v\frac{\partial^2 u}{\partial z^2} = \frac{\partial u}{\partial t}$

---

These equations make a lot of assumptions:

**Such as:**
* Ignores secondary compression (creep)
* No lateral deformation considered (1D only)
* Assumes constant soil properties (which change during consolidation)
* compression index is assumed constant -> yet actually varies with stress level
* void_ratio assumed constant -> yet Initial value may vary spatially
* Changes during consolidation not accounted for
* The drainage path parameter assumes ideal drainage conditions -> not always true



**Therefore it Doesn't account for:**
1. Soil layering
2. Non-uniform initial stress states
3. Temperature effects
4. Chemical effects on soil behavior
5. Large strain situations
6. Non-linear stress-strain behavior
7. Soil structure changes during consolidation
8. Variable loading rates
9. 3D effects near structures (1d) this matter most in built up areas
10. Spatial variability of soil properties

In [None]:
import numpy as np
import math
from scipy.integrate import solve_ivp, odeint

class SubsidenceModel:
    def __init__(self, aquifer_params, soil_layers):
        """Initialize model parameters"""
        # Aquifer parameters
        self.K = aquifer_params['K']
        self.S = aquifer_params['S']
        self.b = aquifer_params['b']
        self.Q = aquifer_params['Q']
        self.T = self.K * self.b  # Transmissivity

        # Soil layer parameters
        self.layers = []
        self.total_thickness = 0

        for layer in soil_layers:
            self.layers.append({
                'thickness': layer['thickness'],
                'void_ratio': layer['void_ratio'],
                'Cc': layer['compression_index'],
                'Cr': layer['recompression_index'],
                'OCR': layer['OCR'],
                'k': layer['k'],
                'mv': layer['mv'],
                'sigma0': layer['initial_effective_stress'],
                'sigma_p': layer['initial_effective_stress'] * layer['OCR']
            })
            self.total_thickness += layer['thickness']

    def _ensure_array(self, x):
        """Convert input to numpy array if it isn't already"""
        return np.asarray(x).reshape(-1)

    def calculate_theis_drawdown(self, r, t):
        """Calculate drawdown using Theis equation"""
        r = self._ensure_array(r)
        t = self._ensure_array(t)

        # Create proper meshgrid
        R, T = np.meshgrid(r, t)

        # Calculate u parameter
        u = (R**2 * self.S) / (4 * self.T * T)

        # Well function approximation
        W = np.zeros_like(u)

        # For small u (u < 0.01), use logarithmic approximation
        small_u = u < 0.01
        W[small_u] = -np.log(u[small_u]) - 0.5772

        # For larger u, use series approximation
        large_u = ~small_u
        for n in range(1, 5):
            W[large_u] += (-1)**n * u[large_u]**n / (n * math.factorial(n))

        drawdown = (self.Q / (4 * np.pi * self.T)) * W
        return drawdown

    def calculate_pressure_change(self, drawdown):
        """Convert drawdown to pressure change"""
        water_unit_weight = 9.81  # kN/m³
        return drawdown * water_unit_weight

    def calculate_layer_settlement(self, layer_idx, delta_pressure):
        """Calculate settlement for a single layer"""
        layer = self.layers[layer_idx]

        # Vectorized calculation of final stress
        final_stress = np.maximum(layer['sigma0'] + delta_pressure, 0.1)
        sigma_p = layer['sigma_p']

        # Create masks for different compression ranges
        recompression_mask = final_stress <= sigma_p
        virgin_mask = ~recompression_mask

        # Initialize settlement array
        settlement = np.zeros_like(final_stress)

        # Calculate settlement for recompression range
        if np.any(recompression_mask):
            settlement[recompression_mask] = (layer['thickness'] / (1 + layer['void_ratio'])) * \
                layer['Cr'] * np.log10(final_stress[recompression_mask] / layer['sigma0'])

        # Calculate settlement for virgin compression range
        if np.any(virgin_mask):
            settlement[virgin_mask] = (layer['thickness'] / (1 + layer['void_ratio'])) * \
                (layer['Cr'] * np.log10(sigma_p / layer['sigma0']) + \
                 layer['Cc'] * np.log10(final_stress[virgin_mask] / sigma_p))

        return settlement

    def calculate_total_settlement(self, r, t, include_consolidation=True):
        """Calculate total settlement at given distance and time"""
        r = self._ensure_array(r)
        t = self._ensure_array(t)

        drawdown = self.calculate_theis_drawdown(r, t)
        delta_pressure = self.calculate_pressure_change(drawdown)

        layer_settlements = []
        total_settlement = np.zeros_like(delta_pressure)

        for i in range(len(self.layers)):
            layer = self.layers[i]

            # Calculate immediate settlement
            settlement = self.calculate_layer_settlement(i, delta_pressure)

            if include_consolidation:
                # Calculate consolidation coefficient
                cv = (layer['k'] / 9.81) * (1 / layer['mv'])

                # Calculate time factor and degree of consolidation
                R, T = np.meshgrid(r, t)
                Tv = cv * T / (layer['thickness']**2)
                U = np.where(Tv < 0.2,
                           2 * np.sqrt(Tv / np.pi),
                           1 - (8 / (np.pi**2)) * np.exp(-((np.pi**2) / 4) * Tv))

                settlement = settlement * U

            layer_settlements.append(settlement)
            total_settlement += settlement

        return total_settlement, layer_settlements

    def generate_settlement_profile(self, distances, times):
        """Generate settlement profile for visualization"""
        distances = self._ensure_array(distances)
        times = self._ensure_array(times)

        total_settlement, _ = self.calculate_total_settlement(distances, times)
        X, T = np.meshgrid(distances, times)

        return X, T, total_settlement

def plot_settlement_profile(model, distances, times):
    """
    Create visualization of settlement profile

    Parameters:
    model: SubsidenceModel instance
    distances (array): Array of distances from well
    times (array): Array of times since pumping started
    """
    import matplotlib.pyplot as plt

    X, T, S = model.generate_settlement_profile(distances, times)

    plt.figure(figsize=(12, 8))
    contour = plt.contourf(X, T, S * 1000, levels=20, cmap='RdYlBu_r')
    plt.colorbar(contour, label='Settlement (mm)')
    plt.xlabel('Distance from well (m)')
    plt.ylabel('Time (days)')
    plt.title('Settlement Profile')

    # Add contour lines
    CS = plt.contour(X, T, S * 1000, levels=10, colors='black', linewidths=0.5)
    plt.clabel(CS, inline=True, fontsize=8, fmt='%.1f')

    # Log scale for distance axis
    plt.xscale('log')

    plt.grid(True, which="both", ls="-", alpha=0.2)
    plt.show()

    # Calculate statistics properly
    max_settlement = np.max(S) * 1000  # Convert to mm

    # Find settlement at 100m
    dist_idx = np.abs(distances - 100).argmin()  # Find closest distance to 100m
    time_idx = -1  # Last time point
    settlement_at_100m = S[time_idx, dist_idx] * 1000  # Convert to mm

    print(f"Maximum settlement: {max_settlement:.2f} mm")
    print(f"Settlement at ~100m after {times[-1]:.0f} days: {settlement_at_100m:.2f} mm")

    # Print additional statistics
    nearest_distance = distances[dist_idx]
    if abs(nearest_distance - 100) > 1:  # If the nearest distance is not exactly 100m
        print(f"Note: Settlement reported at {nearest_distance:.1f}m (nearest available distance to 100m)")

In [None]:
def predict_point(model, x, y, t):
    """
    Predict settlement at a specific point and time

    Parameters:
    x, y: coordinates in meters from well
    t: time in days

    Returns:
    float: settlement in meters
    """
    # Calculate radial distance
    r = np.sqrt(x**2 + y**2)

    # Get settlement prediction
    settlement, layers = model.calculate_total_settlement(r, t)

    # Extract scalar value from numpy array
    settlement_value = float(settlement.flatten()[0])

    # Print results
    print(f"Point ({x}m, {y}m) at time {t} days:")
    print(f"Distance from well: {r:.1f}m")
    print(f"Settlement: {settlement_value * 1000:.2f} mm")

    return settlement_value

def analyze_point_over_time(model, x, y, times):
    """
    Analyze settlement at a point over multiple times

    Parameters:
    x, y: coordinates in meters from well
    times: array of times in days
    """
    r = np.sqrt(x**2 + y**2)
    settlement, layers = model.calculate_total_settlement(r, times)

    # Convert to millimeters for display
    settlement_mm = settlement.flatten() * 1000

    print(f"\nSettlement analysis for point ({x}m, {y}m):")
    print(f"Distance from well: {r:.1f}m")
    print(f"Initial settlement: {settlement_mm[0]:.2f} mm")
    print(f"Final settlement: {settlement_mm[-1]:.2f} mm")
    print(f"Maximum settlement: {np.max(settlement_mm):.2f} mm")

    # Optional: plot settlement vs time
    import matplotlib.pyplot as plt

    plt.figure(figsize=(10, 6))
    plt.plot(times, settlement_mm, 'b-')
    plt.xlabel('Time (days)')
    plt.ylabel('Settlement (mm)')
    plt.title(f'Settlement at ({x}m, {y}m)')
    plt.grid(True)
    plt.show()

    return settlement_mm

In [None]:
aquifer_params = {
    'K': 1.0,  # m/day
    'S': 0.001,  # storage coefficient
    'b': 50.0,  # aquifer thickness (m)
    'Q': 3000.0  # pumping rate (m³/day)
}

soil_layers = [{
    'thickness': 30.0,
    'void_ratio': 0.75,
    'compression_index': 0.3,
    'recompression_index': 0.05,
    'OCR': 1.5,
    'k': 1e-8,
    'mv': 1e-4,
    'initial_effective_stress': 200.0
}]

In [None]:
model = SubsidenceModel(aquifer_params, soil_layers)

In [None]:
# Analyze a single point at a specific time
x, y = 40, 60  # Point coordinates
t = 365        # Time in days
settlement = predict_point(model, x, y, t)

# Analyze the same point over time
times = np.linspace(1, 365, 50)
settlement_history = analyze_point_over_time(model, x, y, times)

In [None]:
# Single point calculation
r = 100  # meters from well
t = 365  # days
settlement, layer_settlements = model.calculate_total_settlement(r, t)
print(f"Settlement at {r}m after {t} days: {settlement[0,0]:.4f}m")

# Generate and plot settlement profile
distances = np.linspace(1, 1000, 100)
times = np.linspace(1, 365, 25)
plot_settlement_profile(model, distances, times)

In [None]:
import numpy as np
from scipy.integrate import odeint

class AdvancedSubsidenceModel:
    def __init__(self, layers, void_ratios, compression_indices,
                 recompression_indices, OCRs, k_values, mv_values,
                 initial_effective_stresses):
        """Initialize model parameters"""
        self.layers = np.array(layers)
        self.e0 = np.array(void_ratios)
        self.Cc = np.array(compression_indices)
        self.Cr = np.array(recompression_indices)
        self.OCR = np.array(OCRs)
        self.k = np.array(k_values)
        self.mv = np.array(mv_values)
        self.sigma0 = np.array(initial_effective_stresses)
        self.sigma_p = self.sigma0 * self.OCR
        self.num_layers = len(layers)
        self.current_void_ratios = self.e0.copy()

    def _calculate_cv(self, layer_idx, current_stress):
        """Calculate stress-dependent coefficient of consolidation"""
        gamma_w = 9.81
        k = self.k[layer_idx]
        current_stress = max(current_stress, 0.1)  # Prevent negative or zero stress
        mv = self.mv[layer_idx] * np.exp(-0.0005 * (current_stress - self.sigma0[layer_idx]))
        return (k / gamma_w) * (1 / mv)

    def _calculate_nonlinear_settlement(self, layer_idx, delta_pressure):
        """Calculate settlement considering nonlinear compression behavior"""
        if isinstance(delta_pressure, np.ndarray):
            delta_pressure = float(delta_pressure)

        # Ensure minimum positive stress to avoid log10 issues
        final_stress = max(self.sigma0[layer_idx] + delta_pressure, 0.1)
        precons_pressure = self.sigma_p[layer_idx]

        if final_stress <= precons_pressure:
            # Recompression range
            settlement = (self.layers[layer_idx] / (1 + self.e0[layer_idx])) * \
                        self.Cr[layer_idx] * np.log10(final_stress / self.sigma0[layer_idx])
        else:
            # Both recompression and virgin compression
            settlement = (self.layers[layer_idx] / (1 + self.e0[layer_idx])) * \
                        (self.Cr[layer_idx] * np.log10(precons_pressure / self.sigma0[layer_idx]) + \
                         self.Cc[layer_idx] * np.log10(final_stress / precons_pressure))

        strain = settlement / self.layers[layer_idx]
        self.current_void_ratios[layer_idx] = self.e0[layer_idx] - \
                                             ((1 + self.e0[layer_idx]) * strain)

        return settlement

    def _consolidation_equation(self, u, t, cv_func, H):
        """Solve 1D consolidation equation with variable cv"""
        d2u_dz2 = np.gradient(np.gradient(u))
        cv = cv_func(np.mean(u)) if callable(cv_func) else cv_func
        return cv * d2u_dz2

    def calculate_spatial_settlement(self, x, y, well_locations, Q, t, aquifer_props):
        """Calculate spatially variable settlement"""
        X, Y = np.meshgrid(x, y)
        settlement = np.zeros_like(X)

        for (well_x, well_y), q in zip(well_locations, Q):
            r = np.sqrt((X - well_x)**2 + (Y - well_y)**2)
            r = np.maximum(r, 0.1)  # Minimum distance

            u = (r**2 * aquifer_props['S']) / (4 * aquifer_props['T'] * t)
            u = np.maximum(u, 1e-10)  # Prevent log of zero
            W = -np.log(u) - 0.5772
            drawdown = q / (4 * np.pi * aquifer_props['T']) * W

            delta_p = drawdown * 9.81

            for layer in range(self.num_layers):
                layer_settlement = np.zeros_like(X)
                for i in range(X.shape[0]):
                    for j in range(X.shape[1]):
                        layer_settlement[i,j] = self._calculate_nonlinear_settlement(
                            layer, max(delta_p[i,j], -self.sigma0[layer]/2))
                settlement += layer_settlement

        return settlement

    def calculate_spatial_settlement1(self, x, y, well_locations, Q, t, aquifer_props):
        """
        Calculate spatially variable settlement using finite difference
        solution for confined aquifer flow.
        """
        # Ensure `t` is an array
        if isinstance(t, (int, float)):
            t = [t]
        t = np.array(t)

        dx = x[1] - x[0]  # Assume uniform spacing
        dy = y[1] - y[0]
        nx, ny = len(x), len(y)

        # Aquifer properties
        T = aquifer_props['T']  # Transmissivity
        S = aquifer_props['S']  # Storativity
        dt = t[1] - t[0] if len(t) > 1 else 1.0  # Time step (default to 1.0 if single value)

        # Initialize hydraulic head (h) and source terms (Q)
        h = np.zeros((nx, ny))
        q_grid = np.zeros((nx, ny))
        for (well_x, well_y), q in zip(well_locations, Q):
            i = np.argmin(np.abs(x - well_x))
            j = np.argmin(np.abs(y - well_y))
            q_grid[i, j] += q

        # Finite difference coefficients
        alpha_x = T / (S * dx**2)
        alpha_y = T / (S * dy**2)

        # Time-stepping loop to solve the diffusion equation
        for time_step in t:
            h_new = h.copy()
            for i in range(1, nx - 1):
                for j in range(1, ny - 1):
                    h_new[i, j] = h[i, j] + dt * (
                        alpha_x * (h[i + 1, j] - 2 * h[i, j] + h[i - 1, j]) +
                        alpha_y * (h[i, j + 1] - 2 * h[i, j] + h[i, j - 1]) +
                        q_grid[i, j] / S
                    )
            h = h_new

        # Convert hydraulic head change to drawdown and settlement
        drawdown = np.maximum(0, h.max() - h)  # Positive drawdown
        delta_p = drawdown * 9.81  # Convert drawdown to pressure change

        # Calculate settlement for each layer
        settlement = np.zeros((nx, ny))
        for layer in range(self.num_layers):
            layer_settlement = np.zeros_like(drawdown)
            for i in range(nx):
                for j in range(ny):
                    layer_settlement[i, j] = self._calculate_nonlinear_settlement(
                        layer, delta_p[i, j]
                    )
            settlement += layer_settlement

        return settlement


    def calculate_multilayer_settlement(self, delta_pressures, include_secondary=True,
                                      Ca=0.01, t_secondary=365*10):
        """
        Calculate total settlement for all layers including secondary compression

        Parameters:
        delta_pressures (list): Pressure change for each layer (kPa)
        include_secondary (bool): Include secondary compression
        Ca (float): Secondary compression index
        t_secondary (float): Time for secondary compression calculation (days)

        Returns:
        float: Total settlement (m)
        """
        total_settlement = 0

        for i in range(self.num_layers):
            # Primary consolidation
            settlement = self._calculate_nonlinear_settlement(i, delta_pressures[i])

            # Add secondary compression
            if include_secondary:
                t_p = self.layers[i]**2 / self._calculate_cv(i, self.sigma0[i])  # Time for primary
                secondary = self.layers[i] * Ca * np.log10(t_secondary / t_p)
                settlement += secondary

            total_settlement += settlement

        return total_settlement

    def time_dependent_multilayer(self, t, cv_stress_dependent=True):
        """Calculate time-dependent settlement"""
        settlements = np.zeros((self.num_layers, len(t)))

        for i in range(self.num_layers):
            cv_init = self._calculate_cv(i, float(self.sigma0[i]))

            # Calculate final settlement for this layer
            final_settlement = self._calculate_nonlinear_settlement(
                i, float(self.sigma0[i] * 0.2))  # assumption 20% increase probably should be a parameter

            # Calculate degree of consolidation using simplified Terzaghi solution
            Tv = cv_init * t / (self.layers[i]**2)  # Time factor

            # Calculate U (degree of consolidation)
            U = np.zeros_like(t)
            for idx, tv in enumerate(Tv):
                if tv < 0.2:
                    U[idx] = 2 * np.sqrt(tv / np.pi)
                else:
                    U[idx] = 1 - (8 / (np.pi**2)) * np.exp(-((np.pi**2) / 4) * tv)

            # Apply degree of consolidation to final settlement
            settlements[i] = U * final_settlement

        return np.sum(settlements, axis=0)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
from datetime import datetime, timedelta

def run_subsidence_analysis():
    """Useing the model with sample paramters"""

    layers = [5.0, 10.0, 8.0]
    void_ratios = [0.7, 0.6, 0.9]
    compression_indices = [0.2, 0.1, 0.25]
    recompression_indices = [0.02, 0.01, 0.025]
    OCRs = [1.5, 1.2, 1.3]
    k_values = [1e-4, 1e-3, 1e-4]
    mv_values = [1e-4, 5e-5, 1.2e-4]

    # Calculate initial stresses
    unit_weight = 18
    depths = np.cumsum(layers)
    initial_effective_stresses = [(depth - layer/2) * unit_weight
                                for depth, layer in zip(depths, layers)]

    model = AdvancedSubsidenceModel(
        layers=layers,
        void_ratios=void_ratios,
        compression_indices=compression_indices,
        recompression_indices=recompression_indices,
        OCRs=OCRs,
        k_values=k_values,
        mv_values=mv_values,
        initial_effective_stresses=initial_effective_stresses
    )

    # Reduced pumping rates
    well_locations = [(700, 600), (300, 150), (150, 500)]
    pumping_rates = [-800, -850, -150]

    x = np.linspace(0, 1000, 50)
    y = np.linspace(0, 1000, 50)

    aquifer_props = {
        'T': 100,
        'S': 1e-4
    }

    # Calculate spatial settlement
    settlement_spatial = model.calculate_spatial_settlement(
        x, y, well_locations, pumping_rates, t=365, aquifer_props=aquifer_props
    )
    print(settlement_spatial)
    print(settlement_spatial1)


    # Calculate time-dependent settlement
    t = np.logspace(0, np.log10(365*2), 100)  # 0 to 2 years
    settlement_time = model.time_dependent_multilayer(t, cv_stress_dependent=False)

    # Calculate total settlement
    delta_pressures = [10, 20, 90]
    total_settlement = model.calculate_multilayer_settlement(
        delta_pressures,
        include_secondary=True,
        Ca=0.01,
        t_secondary=365*10
    )

    return {
        'spatial': (x, y, settlement_spatial),
        'temporal': (t, settlement_time),
        'total': total_settlement
    }
# Create visualization
def plot_results(results):
    fig = plt.figure(figsize=(15, 10))

    # Spatial settlement plot
    ax1 = fig.add_subplot()
    x, y, settlement_spatial = results['spatial']
    X, Y = np.meshgrid(x, y)

    # Custom colormap from white to dark blue
    colors = ['#FFFFFF', '#E3F2FD', '#90CAF9', '#2196F3', '#1565C0']
    n_bins = 100
    cmap = LinearSegmentedColormap.from_list("custom", colors, N=n_bins)

    contour = ax1.contourf(X, Y, -settlement_spatial*1000, levels=20, cmap=cmap)
    ax1.set_title('Spatial Distribution of Settlement')
    ax1.set_xlabel('Distance (m)')
    ax1.set_ylabel('Distance (m)')
    plt.colorbar(contour, ax=ax1, label='Settlement (mm)')

    # Time-dependent settlement plot
    # ax2 = fig.add_subplot(122)
    # t, settlement_time = results['temporal']
    # ax2.semilogx(t, -settlement_time*1000)
    # ax2.set_title('Time-Dependent Settlement')
    # ax2.set_xlabel('Time (days)')
    # ax2.set_ylabel('Settlement (mm)')
    # ax2.grid(True)

    plt.tight_layout()
    return fig

def generate_subsidence_report():
    results = run_subsidence_analysis()

    # Print summary statistics
    print("\nSubsidence Analysis Report")
    print("-" * 50)
    print(f"Maximum settlement: {-np.min(results['spatial'][2])*1000:.1f} mm")
    print(f"Average settlement: {-np.mean(results['spatial'][2])*1000:.1f} mm")
    print(f"Total multi-layer settlement: {results['total']*1000:.1f} mm")
    print(f"Settlement after 1 year: {results['temporal'][1][-1]*1000:.1f} mm")

    # Create visualization
    fig = plot_results(results)
    return fig


fig = generate_subsidence_report()