In [None]:
"""
Hybrid Geostatistical–Deep Learning Framework for Geochemical Characterization in Historical Mine Tailings

Author: Keyumars Anvari
Supervisor: Professor Jörg Benndorf
Affiliation: Department of Mine Surveying and Geodesy,
             TU Bergakademie Freiberg, 09599 Freiberg, Germany

Purpose:
--------
This code demonstrates the GCNN–RNN hybrid workflow described in our paper (see Algorithm 1 and Methods).
The example uses synthetic data—replace these with your own geospatial/geochemical samples for real applications.
"""

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import logging
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from scipy.spatial.distance import cdist
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Input, Conv1D, LSTM, Dense, Dropout, Bidirectional, LeakyReLU
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, Callback
import warnings
warnings.filterwarnings('ignore')

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def set_random_seeds(seed=42):
    np.random.seed(seed)
    random.seed(seed)
    tf.random.set_seed(seed)
set_random_seeds()

# ======== Algorithm 1 Step 1: Data Preparation ========
# (Replace with your own data for research use)
df_known = pd.DataFrame({
    'X': np.random.rand(50),
    'Y': np.random.rand(50),
    'Concentration': np.random.rand(50)
})
df_unknown = pd.DataFrame({
    'X': np.random.rand(20),
    'Y': np.random.rand(20)
})

# ======== Algorithm 1 Step 2: Compute Geostatistical Feature ========
def spherical_variogram(h, nugget, sill, range_):
    h = np.asarray(h)
    gamma = np.where(h <= range_,
                     nugget + (sill - nugget) * (1.5 * (h / range_) - 0.5 * (h / range_) ** 3),
                     sill)
    return gamma

def compute_covariance_matrix(coordinates, variogram_params):
    distances = cdist(coordinates, coordinates)
    gamma = spherical_variogram(distances, variogram_params['nugget'], variogram_params['sill'], variogram_params['range'])
    return variogram_params['sill'] - gamma

variogram_params = {'nugget': 0.1, 'sill': 1.0, 'range': 0.5}
coords_all = pd.concat([df_known[['X', 'Y']], df_unknown[['X', 'Y']]], ignore_index=True)
cov_matrix = compute_covariance_matrix(coords_all.values, variogram_params)
avg_cov = cov_matrix.mean(axis=1)
coords_all['Average_Covariance'] = avg_cov
df_known['Average_Covariance'] = coords_all['Average_Covariance'].iloc[:len(df_known)].values
df_unknown['Average_Covariance'] = coords_all['Average_Covariance'].iloc[len(df_known):].values

# ======== Algorithm 1 Step 3: Build Model Inputs ========
X = df_known[['Concentration', 'X', 'Y', 'Average_Covariance']].values
y = df_known[['Concentration']].values
X_unknown = df_unknown[['X', 'Y', 'Average_Covariance']].values
scaler_X = StandardScaler().fit(X)
scaler_y = StandardScaler().fit(y)
X_scaled = scaler_X.transform(X)
y_scaled = scaler_y.transform(y)
X_unknown_scaled = scaler_X.transform(np.hstack([np.zeros((X_unknown.shape[0], 1)), X_unknown]))

# ======== Algorithm 1 Step 4: Split for Training/Validation ========
X_train, X_val, y_train, y_val = train_test_split(X_scaled, y_scaled, test_size=0.2, random_state=42)
X_train = X_train.reshape(-1, 1, X_train.shape[1])
X_val = X_val.reshape(-1, 1, X_val.shape[1])

# ======== Algorithm 1 Step 5: Define and Train GCNN–RNN Model ========
class NaNDetector(Callback):
    def on_batch_end(self, batch, logs=None):
        loss = logs.get('loss') if logs else None
        if loss is not None and np.isnan(loss):
            logging.error(f"NaN loss detected at batch {batch}. Stopping training.")
            self.model.stop_training = True

model = Sequential([
    Input(shape=(1, X_train.shape[2])),
    Conv1D(filters=32, kernel_size=1, activation='linear', kernel_regularizer=l2(1e-4)),
    LeakyReLU(),
    Dropout(0.2),
    Bidirectional(LSTM(32, return_sequences=False)),
    Dropout(0.2),
    Dense(32, activation='relu'),
    Dense(1, activation='linear')
])
model.compile(optimizer=Adam(1e-4), loss='mse', metrics=['mae'])

callbacks = [
    EarlyStopping(monitor='val_loss', patience=30, restore_best_weights=True),
    ReduceLROnPlateau(monitor='val_loss', patience=10, factor=0.5, min_lr=1e-6),
    NaNDetector()
]
history = model.fit(
    X_train, y_train,
    epochs=200,
    batch_size=8,
    validation_data=(X_val, y_val),
    callbacks=callbacks,
    verbose=1
)

# ======== Algorithm 1 Step 6: Predict on Unknowns and Visualize ========
X_unknown_for_pred = X_unknown_scaled.reshape(-1, 1, X_unknown_scaled.shape[1])
pred_unknown_scaled = model.predict(X_unknown_for_pred)
pred_unknown = scaler_y.inverse_transform(pred_unknown_scaled)

plt.figure(figsize=(6,4))
plt.hist(pred_unknown, bins=15, alpha=0.7)
plt.title("Predicted Concentrations (Unknown Points)")
plt.xlabel("Predicted Value")
plt.ylabel("Frequency")
plt.show()

# Optionally: Save predictions
# df_unknown['Predicted_Concentration'] = pred_unknown.flatten()
# df_unknown.to_csv('predictions_demo.csv', index=False)