Collision

Notebook for collision modeling in ASPECT 2026

In [None]:
import os, sys, shutil, math
import numpy as np
from shutil import rmtree, copy
from matplotlib import pyplot as plt
from matplotlib import gridspec, cm
from PIL import Image, ImageDraw, ImageFont
from scipy.interpolate import interp1d
from scipy.interpolate import UnivariateSpline
import datetime
import subprocess

# Make the HaMaGeoLib package importable from this notebook
HaMaGeoLib_DIR = "/home/lochy/ASPECT_PROJECT/HaMaGeoLib"
hamageolib_abs_dir = os.path.abspath(HaMaGeoLib_DIR)
if hamageolib_abs_dir not in sys.path:
    sys.path.append(hamageolib_abs_dir)
from hamageolib.utils.exception_handler import my_assert
import hamageolib.utils.plot_helper as plot_helper

# Data directory holding the collision cases; it must already exist
local_Collision_dir = "/mnt/lochy/ASPECT_DATA/Collision0" # data directory
# remote_ThDSubduction_dir = "peloton:/group/billengrp-mpi-io/lochy/ThDSubduction"
assert os.path.isdir(local_Collision_dir)

# Directory for the temporary bash scripts (created on demand) and the results directory
py_temp_dir = os.path.join(HaMaGeoLib_DIR, "py_temp_files")
RESULT_DIR = os.path.join(HaMaGeoLib_DIR, 'results')
os.makedirs(py_temp_dir, exist_ok=True)

# Directory of the ParaView script templates
SCRIPT_DIR = os.path.join(HaMaGeoLib_DIR, "scripts")

# One temporary bash script per day, named after today's date (YYYY-MM-DD)
today_date = datetime.datetime.today().strftime("%Y-%m-%d")
py_temp_file = os.path.join(py_temp_dir, f"py_temp_{today_date}.sh")

# Write the header only when today's script does not exist yet,
# so an existing script is never overwritten by this cell
if not os.path.exists(py_temp_file):
    bash_header = """#!/bin/bash
# =====================================================
# Script: py_temp.sh
# Generated on: {date}
# Description: Temporary Bash script created by Python
# =====================================================

""".format(date=today_date)
    with open(py_temp_file, "w") as script_file:
        script_file.write(bash_header)

print(f"File ensured at: {py_temp_file}")

# Create Cases

In [None]:
# todo_config
# Build a case .prm file from a template by applying the configuration rules.
from gdmate.aspect.config_engine import RuleEngine
from gdmate.aspect.io import parse_parameters_to_dict, save_parameters_from_dict
from hamageolib.research.haoyuan_collision0.config import Collison0Config, GeometryRule

# Case directory; makedirs also creates a missing parent ("collision_test")
# and is a no-op when the directory already exists.
case_dir = "/mnt/lochy/ASPECT_DATA/Collision0/collision_test/test0"
os.makedirs(case_dir, exist_ok=True)

# Template to read and case file to write
template_prm = os.path.join(HaMaGeoLib_DIR, "hamageolib/research/haoyuan_collision0/files/01112026", "post_compressible_test.prm")
case_prm = os.path.join(case_dir, "case.prm")

# Parse the template into a dictionary; no world-builder dictionary is used here
with open(template_prm, "r") as fin:
    prm_dict = parse_parameters_to_dict(fin)
wb_dict = None

# Validate the configuration, then apply the rules to the parsed dictionaries
# (presumably apply_all updates prm_dict in place, since prm_dict is what gets saved - TODO confirm)
ruleEngine = RuleEngine([GeometryRule()])
config = Collison0Config()
ruleEngine.validate(config)
contexts = ruleEngine.apply_all(config, prm_dict, wb_dict)

# write case prm file
with open(case_prm, "w") as fout:
    save_parameters_from_dict(fout, prm_dict)
print("Saved prm file %s" % case_prm)

# Cases

We now begin processing the simulation cases.
The first step is to enumerate the case directories and confirm that all specified paths exist on the filesystem.

Options:

* `do_post_process` - controls whether the post-processing code blocks that follow are run


In [None]:
# Master switch: the case-selection, ParaView-script and figure-finalization cells
# only run their bodies when this flag is True.
do_post_process = True

In [None]:
if do_post_process:
    # placeholder for a 3-D case
    case_name = None
    # Initial test from Fritz: case directory (relative to local_Collision_dir),
    # prm file, world-builder file and output directory name
    case_name_2d = "collision_test/Fritz_test0"
    prm_basename_2d = "init_compressible_test.prm"
    wb_basename_2d = "original.wb"
    output_directory = "isentropic_adiabat_new_boundary"

    # Resolve each selected case to an existing directory and make sure the
    # image output directory exists. os.makedirs raises on failure instead of
    # silently ignoring the exit status of an external "mkdir -p".
    local_dir = None
    local_dir_2d = None
    if case_name is not None:
        local_dir = os.path.join(local_Collision_dir, case_name)
        assert os.path.isdir(local_dir), "Case directory does not exist: %s" % local_dir
        print("local_dir:\n\t", local_dir)
        os.makedirs(os.path.join(local_dir, "img", "pv_outputs"), exist_ok=True)
    if case_name_2d is not None:
        local_dir_2d = os.path.join(local_Collision_dir, case_name_2d)
        assert os.path.isdir(local_dir_2d), "Case directory does not exist: %s" % local_dir_2d
        print("local_dir_2d:\n\t", local_dir_2d)
        os.makedirs(os.path.join(local_dir_2d, "img", "pv_outputs"), exist_ok=True)

## Generate ParaView script

We apply a combined workflow with **ParaView**.  
In this notebook, we generate the processing script.  
We then use ParaView to refine styling and export the final plots.

Options:

- `is_prepare_for_plot` — Generate the **ParaView** Python script (no data pre-processing).  
- `is_process_pyvista_for_plot` — Pre-process results with **PyVista** before exporting the ParaView script, then proceed in ParaView.


In [None]:
# When True (together with do_post_process), the next cell generates the ParaView script
is_prepare_for_plot = True
# NOTE(review): this flag is not read by any cell visible in this section of the notebook
is_process_pyvista_for_plot = False

In [None]:
if do_post_process and is_prepare_for_plot:

    from hamageolib.research.haoyuan_collision0.case_options import CASE_OPTIONS_TWOD
    from hamageolib.research.haoyuan_collision0.post_process import ProcessVtuFileTwoDStep, GenerateParaviewScript

    # A 2-d case directory must have been selected earlier in the notebook
    assert local_dir_2d is not None

    # Graphical output step to visualize
    graphical_step = 29

    # ParaView script(s) to generate, and whether the base script is required
    ofile_list = ["collision0.py"]
    require_base = True

    # Read the case configuration for the chosen step and write a summary table
    Case_Options_2d = CASE_OPTIONS_TWOD(
        local_dir_2d,
        case_file=prm_basename_2d,
        wb_basename=wb_basename_2d,
        output_directory=output_directory,
    )
    Case_Options_2d.Interpret(step=graphical_step)
    summary_csv = os.path.join(local_dir_2d, "summary.csv")
    Case_Options_2d.SummaryCaseVtuStep(summary_csv)

    # The index of the vtu file depends on whether all the initial adaptive steps are output
    pvtu_step = Case_Options_2d.get_pvtu_step(graphical_step)

    # Process the vtu output of that step, then generate the ParaView script(s)
    ProcessVtuFileTwoDStep(local_dir_2d, pvtu_step, Case_Options_2d)
    GenerateParaviewScript(local_dir_2d, Case_Options_2d, ofile_list, require_base=require_base)

## Automate figure finalization

The figures generated from the previous **ParaView** script can be finalized using the following code block.
We use a "frame" figure created in Adobe Illustrator and overlay it on the ParaView plot.

Options:
- `file_name` — Name of the plot to finalize.
- `_time` — Model time associated with the plot.


In [None]:
# todo_collision
# Finalize a single ParaView figure; disabled by default.
finalize_visual = False

if do_post_process and finalize_visual:
    
    from hamageolib.research.haoyuan_collision0.post_process import finalize_visualization_2d_11022025 

    # options for file_name and time
    # file_name: name of the plot to finalize, e.g. "full_domain_viscosity" or "full_model_temperature"
    # _time: model time associated with the plot
    file_name = "full_domain_viscosity" # "full_domain_viscosity", "full_model_temperature"
    _time = 2.9014e+06

    # Frame image passed to the finalization helper.
    # NOTE(review): the animation section uses "full_domain_frame_trans_frame-01.png"
    # while this cell uses "full_domain_frame_trans-01.png" - confirm both files are intended.
    frame_png_file_with_ticks = "/home/lochy/Documents/papers/documented_files/collision/full_domain_frame_trans-01.png"
    output_image_file = finalize_visualization_2d_11022025(local_dir_2d, file_name, _time, frame_png_file_with_ticks, add_time=False)


## Animation

### 2-d case, basic

First generate the plotting scripts, then run them in the terminal. Assemble the final animation only after all figures have been generated. In practice, this section may need to be executed multiple times.


In [None]:
# Switches for the basic 2-d animation workflow:
#   animate_2d_case_basic - run the animation cells
#   debug_step0_animate_2d_case_basic - only handle the first resampled step
#   generate_paraview_scripts_for_animate_2d_case_basic - (re)generate the stepwise ParaView scripts
animate_2d_case_basic = True
debug_step0_animate_2d_case_basic = False
generate_paraview_scripts_for_animate_2d_case_basic = True

if animate_2d_case_basic:
    
    from hamageolib.research.haoyuan_collision0.case_options import CASE_OPTIONS_TWOD
    from hamageolib.research.haoyuan_collision0.post_process import ProcessVtuFileTwoDStep, GenerateParaviewScript

    # The case directory and file names are only defined by the "Cases" cell
    # when do_post_process is True; fail early with a clear message otherwise.
    assert do_post_process and local_dir_2d is not None, \
        "animate_2d_case_basic requires do_post_process = True and a selected 2-d case"

    # Assign a time interval for animation
    time_interval = 0.5e6
    animation_name= "ani_basic"
    max_depth = "1500"

    # Apply case options
    Case_Options_2d = CASE_OPTIONS_TWOD(local_dir_2d, case_file=prm_basename_2d, wb_basename=wb_basename_2d, output_directory=output_directory)
    Case_Options_2d.Interpret()
    # Case_Options_2d.SummaryCaseVtuStep(os.path.join(local_dir_2d, "summary.csv"))
    # Case_Options_2d.SummaryCaseVtuStepExport(os.path.join(local_dir_2d, "summary.csv"))

    # Resample the visualization table with time_interval; the following cells
    # read its "Time" column and use the "Vtu step" column as graphical steps
    resampled_df = Case_Options_2d.resample_visualization_df(time_interval)
    graphical_steps = resampled_df["Vtu step"].values

#### Generate **ParaView** scripts stepwise

Optional: generate stepwise **ParaView** scripts controlled by `generate_paraview_scripts_for_animate_2d_case_basic` option.
After this step, run the generated `py_temp_<date>.sh` script in a terminal to produce the visualizations.

In [None]:
if animate_2d_case_basic and generate_paraview_scripts_for_animate_2d_case_basic: 

    # Settings shared by every step: the script templates to read, the output
    # directory for the stepwise scripts and the graphical output interval
    py_script = "collision0.py"
    paraview_script = os.path.join(SCRIPT_DIR, 'paraview_scripts', 'Collision0', py_script)
    paraview_script_base = os.path.join(SCRIPT_DIR, 'paraview_scripts', 'base.py')
    ps_dir = os.path.join(local_dir_2d, 'paraview_scripts')
    odir = os.path.join(ps_dir, "stepwise")
    os.makedirs(odir, exist_ok=True)
    graphical_dt = float(resampled_df.attrs["Time between graphical output"])

    # Open py_temp_file for output. Mode 'w' truncates the file, so any previous
    # content is replaced. The with-block closes the file even when a step fails.
    with open(py_temp_file, 'w') as fout:
        fout.write("#!/bin/bash\n")

        # Run stepwise
        print("Start generating paraview scripts")
        for i, _time in enumerate(resampled_df["Time"].values):

            # debug run step 0
            if debug_step0_animate_2d_case_basic and i > 0:
                break

            # Stepwise configurations: round the time to a multiple of the
            # graphical output interval
            _time = float(_time)
            time_rounded = round(_time / graphical_dt) * graphical_dt
            step = int(graphical_steps[i])
            print("\tGenerating paraview scripts for step = %d, time = %.4e" % (step, time_rounded))

            # Apply stepwise configuration
            Case_Options_2d.options['GRAPHICAL_STEPS'] = [step]
            Case_Options_2d.options['GRAPHICAL_TIMES'] = [time_rounded]

            # Read the templates, substitute the options and save one script per step
            ofile = os.path.join(odir, 'slab_%d.py' % (step))
            Case_Options_2d.read_contents(paraview_script_base, paraview_script)
            Case_Options_2d.substitute()
            Case_Options_2d.save(ofile)

            # Write to py_temp file: one pvpython call per generated script
            fout.write("pvpython %s\n" % ofile)

    # Finish writing to py_temp file: make it executable
    subprocess.run(["chmod", "+x", py_temp_file])
    print("saved file: %s" % py_temp_file)

#### Finalize plot from **ParaView**

Ensure the previous step has completed. Use the following cell to finalize plots generated by **ParaView**.

In [None]:
if animate_2d_case_basic:
    
    from hamageolib.research.haoyuan_collision0.post_process import finalize_visualization_2d_11022025

    # file types
    file_name_list = ["full_domain_viscosity", "full_domain_temperature", "full_domain_density"]

    # Values shared by every step: the frame image and the graphical output interval
    frame_png_file_with_ticks = "/home/lochy/Documents/papers/documented_files/collision/full_domain_frame_trans_frame-01.png"
    graphical_dt = float(resampled_df.attrs["Time between graphical output"])

    print("Start Finalizing Plots")
    for i, _time in enumerate(resampled_df["Time"].values):

        # debug run step 0
        if debug_step0_animate_2d_case_basic and i > 0:
            break

        # Stepwise configurations: round the time to a multiple of the
        # graphical output interval
        time_rounded = round(_time / graphical_dt) * graphical_dt
        step = graphical_steps[i]
        print("\tFinalizing plots for step = %d, time = %.4e" % (step, time_rounded))

        # Finalize one figure per file type for this step
        for file_name in file_name_list:
            output_image_file = finalize_visualization_2d_11022025(local_dir_2d, file_name, time_rounded, frame_png_file_with_ticks, add_time=False)


#### Assemble and make animation

Using the generated figures (e.g., linear plots and finalized figures from **ParaView**):

1. Assemble them stepwise and export one combined figure per step.
2. Create an `.avi` animation from the combined figures.


In [None]:
if animate_2d_case_basic:

    print("Start making animation")

    # Load modules
    from hamageolib.research.haoyuan_2d_subduction.workflow_scripts import create_avi_from_images

    # Initiation
    prep_dir = os.path.join(local_dir_2d, "img", "prep")
    ani_file_paths = [] # path of the figures
    time_values = resampled_df["Time"].values
    graphical_dt = float(resampled_df.attrs["Time between graphical output"])

    # Layout of the combined figure, in overlay order. Each entry is
    # (plot name, slice position, colorbar image, colorbar position).
    # Positions are (x, y) pixels on the canvas.
    panel_layout = [
        # viscosity
        (file_name_list[0], (0, 100),
         "/home/lochy/Documents/papers/documented_files/collision/color_viscosity_18_24-01.png", (300, 650)),
        # temperature
        (file_name_list[1], (1500, 100),
         "/home/lochy/Documents/papers/documented_files/collision/color_temperature_0_2000.png", (1900, 700)),
        # density
        (file_name_list[2], (0, 900),
         "/home/lochy/Documents/papers/documented_files/collision/color_density_3000_4000-01.png", (300, 1450)),
    ]
    panel_scale_factor = 1.5  # every slice and colorbar uses the same scale factor

    # Time stamp settings
    position = (25, 0)  # text position (x, y)
    font_path = "/usr/share/fonts/truetype/msttcorefonts/times.ttf"  # Path to Times New Roman font
    font_size = 56

    # Loop the steps to get job done
    for i, _time in enumerate(time_values):

        # debug run step 0
        if debug_step0_animate_2d_case_basic and i > 0:
            break

        # do this if there is error at the last step
        # (the last resampled step is always left out of the animation)
        if i == time_values.size - 1:
            break

        # Stepwise configurations: round the time to a multiple of the
        # graphical output interval
        time_rounded = round(_time / graphical_dt) * graphical_dt
        step = graphical_steps[i]
        print("\tAssembling plots for step = %d, time = %.4e" % (step, time_rounded))

        # Collect each slice followed by its colorbar; every file must exist
        image_files = []; image_positions = []; cropping_regions = []; image_scale_factors = []
        for plot_name, slice_position, colorbar_file, colorbar_position in panel_layout:
            slice_file = os.path.join(prep_dir, "%s_t%.4e.png" % (plot_name, time_rounded))
            for image_file, image_position in ((slice_file, slice_position), (colorbar_file, colorbar_position)):
                assert os.path.isfile(image_file), "Missing image file: %s" % image_file
                image_files.append(image_file)
                image_positions.append(image_position)
                cropping_regions.append(None)
                image_scale_factors.append(panel_scale_factor)

        # Combine images
        # NOTE(review): the combined file name uses the raw _time while the input
        # slices use time_rounded - confirm this is intended.
        output_image_file = os.path.join(prep_dir, "%s_t%.4e.png" % (animation_name, _time))
        # Remove existing output image to ensure a clean overlay
        if os.path.isfile(output_image_file):
            os.remove(output_image_file)
        # Call overlay function
        plot_helper.overlay_images_on_blank_canvas(
            canvas_size=(3000, 1700),  # Size of the blank canvas in pixels (width, height)
            image_files=image_files,  # List of image file paths to overlay
            image_positions=image_positions,  # Positions of each image on the canvas
            cropping_regions=cropping_regions,  # Optional cropping regions for the images
            image_scale_factors=image_scale_factors,  # Scaling factors for resizing the images
            output_image_file=output_image_file  # Path to save the final combined image
        )

        # Add time stamp (model time divided by 1e6), written onto the combined image in place
        text = "t = %.1f Ma" % (time_rounded / 1e6)
        plot_helper.add_text_to_image(output_image_file, output_image_file, text, position, font_path, font_size)

        ani_file_paths.append(output_image_file)

    # Generate animation (skipped in the single-step debug mode)
    if not debug_step0_animate_2d_case_basic:
        ani_dir = os.path.join(local_dir_2d, "img", "animation")
        os.makedirs(ani_dir, exist_ok=True)
        output_file = os.path.join(ani_dir, "%s.avi" % animation_name)
        # presumably the last argument is the frame rate - TODO confirm against create_avi_from_images
        create_avi_from_images(ani_file_paths, output_file, 1)