In [31]:
from ogs6py import ogs
import numpy as np
import ogs6py
import matplotlib.pyplot as plt
import time
import math
import gmsh
import os
import argparse
import re
import time
import csv
import pandas as pd
import glob
import pyvista as pv
from ogstools.msh2vtu import msh2vtu
import shutil
import xml.etree.ElementTree as ET

pi = math.pi
plt.rcParams["text.usetex"] = True

# Swelling


## Problem Description

![Schematic view of hydraulic fracturing problem and Boundary conditions](./figures/Model_propagating_straight.png#one-half "Schematic view of hydraulic fracturing problem and Boundary conditions.")

# Input Data

The simulations were run with the properties listed in the Table below.


| **Name**                       | **Range**                 | **Unit**   | **Symbol** |
|--------------------------------|---------------------------|------------|------------|
| _Young's modulus_              | 500 - 2500                | MPa        | $E$        |
| _Poisson's ratio_              | 0.16 - 0.35               | -          | $\nu$      |
| _Maximum swelling pressure_    | 3.2 - 13                  | MPa        | $\sigma$   |
| _Permeability_                 | 1 $\times$ 10$^{-14}$ - 1 $\times$ 10$^{-12}$ | m$^2$ | $k$        |
| _Entry Pressure_     | 1000 - 3500                 | Pa          | $p_b$   |


# Output Directory  and Project File

In [32]:
# file's name
prj_name = "swelling_staufen.prj"

out_dir = os.environ.get("OGS_TESTRUNNER_OUT_DIR", "_out")
os.makedirs(out_dir, exist_ok=True)

In [33]:
# Find all .vtu files in the current directory
vtu_files = glob.glob("*.vtu")

# Copy each .vtu file to the output directory
for vtu_file in vtu_files:
    shutil.copy(vtu_file, out_dir)

print(f"Copied {len(vtu_files)} .vtu files to {out_dir}")

Copied 10 .vtu files to _out


In [34]:
ogs_path ='/home/reza/OGS/ogs/build/bin'
ogs_util_path ='/home/reza/OGS/ogs/build/bin'

# Post-processing

In [35]:
def calculate_principal_stresses(sigma_xx, sigma_yy, sigma_xy):
    # Calculate principal stresses
    lambda_1 = (sigma_xx + sigma_yy) / 2 + np.sqrt(((sigma_xx - sigma_yy) / 2) ** 2 + sigma_xy ** 2)
    lambda_2 = (sigma_xx + sigma_yy) / 2 - np.sqrt(((sigma_xx - sigma_yy) / 2) ** 2 + sigma_xy ** 2)
    lambda_3 = np.zeros_like(lambda_1)  # sigma_zz = 0
    
    # Calculate maximum principal stress
    sigma_max = np.max(np.abs([lambda_1, lambda_2, lambda_3]), axis=0)
    return sigma_max

def calculate_principal_strains(epsilon_xx, epsilon_yy, epsilon_xy):
    # Calculate principal strains
    lambda_1 = (epsilon_xx + epsilon_yy) / 2 + np.sqrt(((epsilon_xx - epsilon_yy) / 2) ** 2 + epsilon_xy ** 2)
    lambda_2 = (epsilon_xx + epsilon_yy) / 2 - np.sqrt(((epsilon_xx - epsilon_yy) / 2) ** 2 + epsilon_xy ** 2)
    lambda_3 = np.zeros_like(lambda_1)  # In 2D, assuming epsilon_zz = 0
    
    # Calculate maximum principal strain
    epsilon_max = np.max(np.abs([lambda_1, lambda_2, lambda_3]), axis=0)
    return epsilon_max

def calculate_displacement_magnitude(disp_x, disp_y):
    disp_magnitude = np.sqrt(disp_x ** 2 + disp_y ** 2)
    return disp_magnitude

In [36]:
def post_processing(prefix, E, nu, K, p_b, swelling_stress_rate):
    reader = pv.get_reader(f"{out_dir}/{prefix}.pvd")

    data = []
    for time_value in reader.time_values:
        reader.set_active_time_value(time_value)
        mesh = reader.read()[0]
        
        xs = mesh.points[:, 0]
        ys = mesh.points[:, 1]
        porosity = mesh.point_data["porosity"]
        saturation = mesh.point_data["saturation"]
        disp = mesh.point_data["displacement"]
        sigma = mesh.point_data["sigma"]
        epsilon = mesh.point_data["epsilon"]

        disp_x = disp[:, 0]
        disp_y = disp[:, 1]
        sigma_xx = sigma[:, 0]
        sigma_yy = sigma[:, 1]
        sigma_zz = sigma[:, 2]
        sigma_xy = sigma[:, 3]
        epsilon_xx = epsilon[:, 0]
        epsilon_yy = epsilon[:, 1]
        epsilon_zz = epsilon[:, 2]
        epsilon_xy = epsilon[:, 3]

        epsilon_max = calculate_principal_strains(epsilon_xx, epsilon_yy, epsilon_xy)
        sigma_max = calculate_principal_stresses(sigma_xx, sigma_yy, sigma_xy)
        disp_magnitude = calculate_displacement_magnitude(disp_x, disp_y)

        E_values = [E] * len(xs)
        nu_values = [nu] * len(xs)
        K_values = [K] * len(xs)
        p_b_values = [p_b] * len(xs)
        swelling_stress_rate_values = [swelling_stress_rate] * len(xs)

        for i in range(len(xs)):
            data.append([
                time_value, E_values[i], nu_values[i], K_values[i], p_b_values[i], 
                swelling_stress_rate_values[i], porosity[i], saturation[i], xs[i], ys[i], 
                disp_magnitude[i], epsilon_max[i], sigma_max[i]
            ])

    with open(f'{out_dir}/{prefix}.csv', 'w', newline='') as csvfile:
        csvwriter = csv.writer(csvfile)
        csvwriter.writerow([
            'time_value', 'E', 'nu', 'K', 'p_b', 'swelling_stress_rate', 
            'porosity', 'saturation', 'xs', 'ys', 'disp_magnitude', 
            'epsilon_max', 'sigma_max'
        ])
        csvwriter.writerows(data)

In [37]:
class OGSModel:
    def __init__(self, prj_file):
        self.prj_file = prj_file
        self.tree = ET.parse(prj_file)
        self.root = self.tree.getroot()

    def _get_root(self):
        return self.root

    def _get_parameter_pointer(self, root, name, parameterpath):
        for parameter in root.findall(parameterpath):
            if parameter.find('name').text == name:
                return parameter
        return None

    def _set_type_value(self, parameterpointer, value, valuetag="values"):
        if parameterpointer is not None:
            value_element = parameterpointer.find(valuetag)
            if value_element is not None:
                value_element.text = ' '.join(map(str, value))
            else:
                ET.SubElement(parameterpointer, valuetag).text = ' '.join(map(str, value))

    def replace_parameter(self, name=None, value=None, valuetag="values"):
        root = self._get_root()
        parameterpath = ".//parameter"
        parameterpointer = self._get_parameter_pointer(root, name, parameterpath)
        self._set_type_value(parameterpointer, value, valuetag=valuetag)

    def replace_property_value(self, media_id, phase_type, property_name, values, valuetag="values"):
        root = self._get_root()
        for medium in root.findall(".//media/medium"):
            if medium.get('id') == media_id:
                for phase in medium.findall(".//phase"):
                    if phase.find('type').text == phase_type:
                        for property in phase.findall(".//property"):
                            if property.find('name').text == property_name:
                                target_element = property.find(valuetag)
                                if target_element is not None:
                                    target_element.text = ' '.join(map(str, values))
                                else:
                                    ET.SubElement(property, valuetag).text = ' '.join(map(str, values))

    def replace_property_value_general(self, xpath_query, value):
        root = self._get_root()
        target_element = root.find(xpath_query)
        if target_element is not None:
            target_element.text = str(value)

    def save(self, output_file):
        self.tree.write(output_file)


# Run the Simulation 


In [38]:
import pyvista as pv
pv.set_plot_theme("document")
if "PYVISTA_HEADLESS" in os.environ:
    pv.start_xvfb()
pv.set_jupyter_backend("static")

def swelling_numerical(E, nu, K, swelling_pressure, p_b, out_dir):
    prefix = f"swelling_E{E[0]}_nu{nu[0]}_K{K[0]}_p_b{p_b}_S{swelling_pressure[0]}"
    Screenoutput = f"log_swelling_E{E[0]}_nu{nu[0]}_K{K[0]}_p_b{p_b}_S{swelling_pressure[0]}"
    print(prefix)
    print(Screenoutput)
    
    model = OGSModel(prj_name)
    model.replace_parameter(name="E_l1", value=E)
    model.replace_parameter(name="nu1", value=nu)
    model.replace_parameter(name="permeability_l1", value=K) 
    model.replace_property_value_general(".//media/medium[@id='1']//property[name='saturation']//p_b", p_b)  # Ensure p_b is used as a scalar
    model.replace_property_value(media_id="1", phase_type="Solid", property_name="swelling_stress_rate", values=swelling_pressure, valuetag="swelling_pressures")

    model.save(f"{out_dir}/{prj_name}")
    print(f"Saved {prj_name}")
    
    model = ogs.OGS(INPUT_FILE=f"{out_dir}/{prj_name}", PROJECT_FILE=f"{out_dir}/{prj_name}", MKL=True, args=f"-o {out_dir}")
    model.replace_text(prefix, xpath="./time_loop/output/prefix")
    model.write_input() 

    t0 = time.time()
    print(">>> OGS started execution ... <<<")
    !{ogs_path}/ogs {out_dir}/{prj_name} -o {out_dir} > {out_dir}/{Screenoutput}
    tf = time.time()
    print(">>> OGS terminated execution  <<< Elapsed time: ", round(tf - t0, 2), " s.")
    
    print("Post-processing")
    post_processing(prefix, E[0], nu[0], K[0], p_b, swelling_pressure[0])

In [39]:
# Most probable values
most_probable_values = {
    'Young’s modulus (E)': 1000,
    'Poisson’s ratio (ν)': 0.2,
    'Maximum swelling pressure (σ)': 8,
    'Permeability (k)': 8e-13,
    'Air entry pressure (p_b)': 2000
}

# Ranges
ranges = {
    'Young’s modulus (E)': (500, 2500),
    'Poisson’s ratio (ν)': (0.16, 0.35),
    'Maximum swelling pressure (σ)': (3.2, 13),
    'Permeability (k)': (1e-14, 1e-12),
    'Air entry pressure (p_b)': (1000, 3500)
}

# Standard deviations as 1/3th of the range
std_devs = {param: (max_val - min_val) / 3 for param, (min_val, max_val) in ranges.items()}

def generate_values(mean, std_dev, min_val, max_val, num_values=35):
    values = np.random.normal(loc=mean, scale=std_dev, size=num_values-2)
    values = np.clip(values, min_val, max_val)
    values = np.concatenate(([min_val], values, [max_val]))
    values = np.unique(values)
    return np.sort(values)

# Generate 30 values for each parameter
generated_values = {}
for param, mean in most_probable_values.items():
    std_dev = std_devs[param]
    min_val, max_val = ranges[param]
    values = generate_values(mean, std_dev, min_val, max_val, num_values=30)
    generated_values[param] = values

# Convert values to Pa
swelling_pressure_values = (generated_values['Maximum swelling pressure (σ)'] * 1e6).tolist()
young_modulus_values = (generated_values['Young’s modulus (E)']* 1e6).tolist()
air_pressure_values = (generated_values['Air entry pressure (p_b)']).tolist()


# Create parameter sets from generated values
parameters = {
    "E": [[val, val, val] for val in young_modulus_values], 
    "nu": [[val, val, val] for val in generated_values["Poisson’s ratio (ν)"].tolist()], 
    "swelling_pressure": [[val, val, val] for val in swelling_pressure_values], 
    "K": [[val, val] for val in generated_values["Permeability (k)"].tolist()], 
    "p_b": [val for val in air_pressure_values], 

}

# Display the generated values for verification
for param, values in parameters.items():
    print(f"{param}:")
    if param == "p_b":
        for value in values:
            print(value)
    else:
        for value in values:
            print(" ".join(map(str, value)))
    print()


E:
500000000.0 500000000.0 500000000.0
558130765.6577073 558130765.6577073 558130765.6577073
680426909.9618686 680426909.9618686 680426909.9618686
738756960.0803174 738756960.0803174 738756960.0803174
748135403.8484746 748135403.8484746 748135403.8484746
773842967.3908312 773842967.3908312 773842967.3908312
801391386.6401898 801391386.6401898 801391386.6401898
856827745.7388512 856827745.7388512 856827745.7388512
1059210505.8510258 1059210505.8510258 1059210505.8510258
1093605171.123319 1093605171.123319 1093605171.123319
1126919564.8654525 1126919564.8654525 1126919564.8654525
1160202480.5444398 1160202480.5444398 1160202480.5444398
1413573744.6029289 1413573744.6029289 1413573744.6029289
1452143783.623852 1452143783.623852 1452143783.623852
1702363837.9829545 1702363837.9829545 1702363837.9829545
1736599991.4498615 1736599991.4498615 1736599991.4498615
1737120891.3143663 1737120891.3143663 1737120891.3143663
1816891336.6483352 1816891336.6483352 1816891336.6483352
1972936029.5342875 

In [None]:
# Most probable values for all parameters
most_probable_E = [1000e6, 1000e6, 1000e6]  # Convert to Pa
most_probable_nu = [0.2, 0.2, 0.2]
most_probable_swelling_pressure = [8e6, 8e6, 8e6]  # Convert to Pa
most_probable_K = [8e-13, 8e-13]
most_probable_p_b = 2000

# Display the parameter combinations
print("=====================================")
print("E (MPa)\t\tν\t\tK (m²)\t\tSwelling Pressure (Pa)\t\tAir entry pressure (Pa)")
print("=====================================")

# Vary E
for E in parameters["E"]:
    print(f"{E[0]:.2f}\t\t{most_probable_nu[0]:.2f}\t\t{most_probable_K[0]:.2e}\t\t{most_probable_swelling_pressure[0]:.2e}\t\t{most_probable_p_b:.2e}")
    swelling_numerical(E, most_probable_nu, most_probable_K, most_probable_swelling_pressure, most_probable_p_b, out_dir)

# Vary nu
for nu in parameters["nu"]:
    print(f"{most_probable_E[0]:.2f}\t\t{nu[0]:.2f}\t\t{most_probable_K[0]:.2e}\t\t{most_probable_swelling_pressure[0]:.2e}\t\t{most_probable_p_b:.2e}")
    swelling_numerical(most_probable_E, nu, most_probable_K, most_probable_swelling_pressure, most_probable_p_b, out_dir)

# Vary swelling_pressure
for swelling_pressure in parameters["swelling_pressure"]:
    print(f"{most_probable_E[0]:.2f}\t\t{most_probable_nu[0]:.2f}\t\t{most_probable_K[0]:.2e}\t\t{swelling_pressure[0]:.2e}\t\t{most_probable_p_b:.2e}")
    swelling_numerical(most_probable_E, most_probable_nu, most_probable_K    , swelling_pressure, most_probable_p_b, out_dir)

# Vary K
for K in parameters["K"]:
    print(f"{most_probable_E[0]:.2f}\t\t{most_probable_nu[0]:.2f}\t\t{K[0]:.2e}\t\t{most_probable_swelling_pressure[0]:.2e}\t\t{most_probable_p_b:.2e}")
    swelling_numerical(most_probable_E, most_probable_nu, K, most_probable_swelling_pressure, most_probable_p_b, out_dir)

# Vary p_b
for p_b in parameters["p_b"]:
    print(f"{most_probable_E[0]:.2f}\t\t{most_probable_nu[0]:.2f}\t\t{most_probable_K[0]:.2e}\t\t{most_probable_swelling_pressure[0]:.2e}\t\t{p_b:.2e}")
    swelling_numerical(most_probable_E, most_probable_nu, most_probable_K, most_probable_swelling_pressure, p_b, out_dir)


E (MPa)		ν		K (m²)		Swelling Pressure (Pa)		Air entry pressure (Pa)
500000000.00		0.20		8.00e-13		8.00e+06		2.00e+03
swelling_E500000000.0_nu0.2_K8e-13_p_b2000_S8000000.0
log_swelling_E500000000.0_nu0.2_K8e-13_p_b2000_S8000000.0
Saved swelling_staufen.prj
>>> OGS started execution ... <<<


# Combine  all valid CSV files into a single CSV file

In [None]:
# Combine all CSV files
csv_files = glob.glob(f"{out_dir}/*.csv")
dataframes = []

for file in csv_files:
    try:
        data = pd.read_csv(file)
        if not data.empty:
            dataframes.append(data)
        else:
            print(f"File '{file}' is empty. Skipping...")
    except pd.errors.EmptyDataError:
        print(f"File '{file}' is empty or has formatting issues. Skipping...")

if dataframes:
    combined_data = pd.concat(dataframes, ignore_index=True)
    combined_data.to_csv("combined_data.csv", index=False)
    print("CSV files successfully combined and saved as 'combined_data.csv'")
else:
    print("No valid CSV files found or all files are empty.")