# Digital Twins in Cardiology Tutorial

In [2]:
# Import libraries that get used in multiple cells
import time
from warnings import warn
import numpy as np

In [3]:
# LOAD FUNCTIONS
from conduction_system import EmptyConductionSystem, PurkinjeSystemVC
from ecg_functions import PseudoQRSTetFromStepFunction
from geometry_functions import EikonalGeometry
from propagation_models import EikonalDjikstraTet
from simulator_functions import SimulateECGwithLATmax, SimulateEP
from adapter_theta_params import AdapterThetaParams, RoundTheta
from discrepancy_functions import DiscrepancyHealthyQRS
from evaluation_functions import ParameterSimulator
from electrophysiology_functions import ElectrophysiologyUpstrokeStepFunction
from cellular_models import StepFunctionUpstrokeEP
from path_config import get_path_mapping
from io_functions import save_dictionary, write_geometry_to_ensight_with_fields, write_purkinje_vtk, \
        write_root_node_csv
from utils import get_vc_rt_name, \
    get_vc_ab_cut_name, get_fibre_speed_name, get_sheet_speed_name, \
    get_normal_speed_name, get_endo_dense_speed_name, get_endo_sparse_speed_name, \
    get_purkinje_speed_name, get_unique_lead_name_list, get_xyz_name_list, unfold_ecg_matrix
from postprocess_functions import visualise_ecg

In [4]:
import os

# Load the path configuration in the current server
if os.path.isfile('../.custom_config/.your_path_mapping.txt'):
    path_dict = get_path_mapping()
else:
    raise Exception('Missing data and results configuration file at: ../.custom_config/.your_path_mapping.txt')

In [5]:
# Step 0: Set the random seed for Reproducibility:
random_seed_value = 7  # Ensures reproducibility and turns off stochasticity
np.random.seed(seed=random_seed_value)  # Ensures reproducibility and turns off stochasticity

In [6]:
# Step 1: Define paths and other environment variables.
# Define the subject name used to navigate the file system
anatomy_subject_name = 'DTI004'
ecg_subject_name = 'DTI004'
print('anatomy_subject_name: ', anatomy_subject_name)
print('ecg_subject_name: ', ecg_subject_name)
# General settings:
source_resolution = 'coarse'
verbose = False
# Input Paths:
data_dir = path_dict["data_path"]
clinical_data_filename = 'clinical_data/' + ecg_subject_name + '_clinical_qrs_ecg.csv'
clinical_data_filename_path = data_dir + clinical_data_filename
geometric_data_dir = data_dir + 'geometric_data/'
# Output Paths:
ep_model_qrs = 'stepFunction'
# Module names:
propagation_module_name = 'propagation_module'
electrophysiology_module_name = 'electrophysiology_module'

anatomy_subject_name:  DTI004
ecg_subject_name:  DTI004


In [7]:
# Step 2: Create Cellular Electrophysiology model. In this case, it will use a step function as the AP's upstroke.
print('Step 2: Create Cellular Electrophysiology model, using a ToROrd APD dictionary.')
# Arguments for cellular model:
resting_vm_value = 0.
upstroke_vm_value = 1.
if ep_model_qrs == 'stepFunction':
    cellular_model = StepFunctionUpstrokeEP(resting_vm_value=resting_vm_value, upstroke_vm_value=upstroke_vm_value,
                                            verbose=verbose)
else:
    raise Exception('Uncontrolled cellular model for the inference of the activation properties from QRS signals!')

Step 2: Create Cellular Electrophysiology model, using a ToROrd APD dictionary.


In [None]:
t_start = time.time()
# Step 3: Generate an Eikonal-friendly geometry.
print('Step 3: Generate a cardiac geometry that can run the Eikonal.')
# Argument setup
vc_ab_cut_name = get_vc_ab_cut_name()
vc_rt_name = get_vc_rt_name()
vc_name_list = [vc_ab_cut_name, vc_rt_name]
# Only one celltype/no-celltype, because its using a step function as an action potential.
celltype_vc_info = {}
# Create geometry with a dummy conduction system to allow initialising the geometry.
print('This step can take up to 6 min')
geometry = EikonalGeometry(cellular_model=cellular_model, celltype_vc_info=celltype_vc_info,
                           conduction_system=EmptyConductionSystem(verbose=verbose),
                           geometric_data_dir=geometric_data_dir, resolution=source_resolution,
                           subject_name=anatomy_subject_name, vc_name_list=vc_name_list, verbose=verbose)
print('The loading of the geometry and generation of the cardiac conduction system took ', round((time.time()-t_start)/60., 2), ' min.')

Step 3: Generate a cardiac geometry that can run the Eikonal.
This step can take up to 6 min


  warn('This part of the code needs to be deleted! Ruben already fixed this for future usage of the code! 19/01/2024')  # TODO DELETE LINE
(Error is small) Ranges of ventricular field ab_cut are not as expected!
Expected range was [0.0, 1.0], given values ranged from 0.000528066 to 1.0
  warn('Careful! Ventricular field ' + vc_name + ' has an error in the ranges, but within the tolerace!'
  warn('Please!! Urgently, come to function check_vc_field_ranges in utils.py and uncomment the following call')
(Error too large) Ranges of ventricular field rt are not as expected!
Expected range was [-1.0, 1.0], given values ranged from 1.70142e-06 to 0.999991
  warn('\n(Error too large) Ranges of ventricular field ' + vc_name + ' are not as expected!\nExpected range was '


Remomve this many nodes: 0
Remomve this many nodes: 0
Running in single thread!


In [None]:
# Step 4: Create conduction system for the propagation model to be initialised.
print('Step 4: Create rule-based Purkinje network using ventricular coordinates.')
# Arguments for Conduction system:
approx_djikstra_purkinje_max_path_len = 200
lv_inter_root_node_distance = 3.  # 1.5 cm    # TODO: Calibrate this hyper-parameter using sensitivity analysis
rv_inter_root_node_distance = 3.  # 1.5 cm    # TODO: Calibrate this hyper-parameter using sensitivity analysis
# Create conduction system
conduction_system = PurkinjeSystemVC(
    approx_djikstra_purkinje_max_path_len=approx_djikstra_purkinje_max_path_len,
    geometry=geometry, lv_inter_root_node_distance=lv_inter_root_node_distance,
    rv_inter_root_node_distance=rv_inter_root_node_distance,
    verbose=verbose)
# Assign conduction_system to its geometry
geometry.set_conduction_system(conduction_system)

In [None]:
t_start = time.time()
# Step 5: Create Eikonal instance. Eikonal will require a conduction and an Eikonal-friendly mesh on creation.
print('Step 5: Create propagation model instance.')
# Arguments for propagation model:
fibre_speed_name = get_fibre_speed_name()
sheet_speed_name = get_sheet_speed_name()
normal_speed_name = get_normal_speed_name()
endo_dense_speed_name = get_endo_dense_speed_name()
endo_sparse_speed_name = get_endo_sparse_speed_name()
purkinje_speed_name = get_purkinje_speed_name()
speed_parameter_name_list_in_order = [fibre_speed_name, sheet_speed_name, normal_speed_name, endo_dense_speed_name,
                                endo_sparse_speed_name, purkinje_speed_name]
nb_speed_parameters = len(speed_parameter_name_list_in_order)
nb_candidate_root_nodes = geometry.get_nb_candidate_root_node()
candidate_root_node_names = ['r' + str(root_i) for root_i in range(nb_candidate_root_nodes)]
propagation_parameter_name_list_in_order = speed_parameter_name_list_in_order + candidate_root_node_names
propagation_model = EikonalDjikstraTet(
    endo_dense_speed_name=endo_dense_speed_name, endo_sparse_speed_name=endo_sparse_speed_name,
    fibre_speed_name=fibre_speed_name, geometry=geometry, module_name=propagation_module_name,
    nb_speed_parameters=nb_speed_parameters, normal_speed_name=normal_speed_name,
    parameter_name_list_in_order=propagation_parameter_name_list_in_order, purkinje_speed_name=purkinje_speed_name,
    sheet_speed_name=sheet_speed_name, verbose=verbose)
# # Save hyperparameters for reproducibility
# hyperparameter_dict['fibre_speed_name'] = fibre_speed_name
# hyperparameter_dict['sheet_speed_name'] = sheet_speed_name
# hyperparameter_dict['normal_speed_name'] = normal_speed_name
# hyperparameter_dict['endo_dense_speed_name'] = endo_dense_speed_name
# hyperparameter_dict['endo_sparse_speed_name'] = endo_sparse_speed_name
# hyperparameter_dict['purkinje_speed_name'] = purkinje_speed_name
# hyperparameter_dict['nb_speed_parameters'] = nb_speed_parameters
# hyperparameter_dict['propagation_parameter_name_list_in_order'] = propagation_parameter_name_list_in_order
# # Clear Arguments to prevent Argument recycling
# nb_speed_parameters = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
# Step 6: Create Whole organ Electrophysiology model.
print('Step 6: Create ECG calculation method.')
# Arguments for electrophysiology:
# Create electrophysiology instance
electrophysiology_model = ElectrophysiologyUpstrokeStepFunction(
    cellular_model=cellular_model, module_name=electrophysiology_module_name, propagation_model=propagation_model,
    verbose=verbose)
# Save hyperparameters for reproducibility
# # Clear Arguments to prevent Argument recycling
# cellular_model = None
# propagation_model = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
# Step 7: Create ECG calculation method. In this case, the ecg will calculate only the QRS and will use a step
# function as the AP's upstroke.
print('Step 7: Create ECG calculation method. Using step function.')
# Arguments for ECG calculation:
filtering = True
max_len_qrs = 256  # This hyper-paramter is used when paralelising the ecg computation, because it needs a structure to synchronise the results from the multiple threads.
max_len_ecg = max_len_qrs
normalise = True
zero_align = True
frequency = 1000  # Hz
if frequency != 1000:
    warn('The hyper-parameter frequency is only used for filtering! If you dont use 1000 Hz in any time-series in the code, the other hyper-parameters will not give the expected outcome!')
low_freq_cut = 0.001  # 0.5
high_freq_cut = 100  # 150
# I_name = 'I'
# II_name = 'II'
# v3_name = 'V3'
# v5_name = 'V5'
lead_names = get_unique_lead_name_list()
nb_leads = len(lead_names)
# Read clinical data
clinical_ecg_raw = np.genfromtxt(clinical_data_filename_path, delimiter=',')     # No offset allowed in clinical QRS recordings
# TODO revert this change, this is only needed when using an ECG that has the full beat instead of a trimmed QRS, and the
# TODO cutting point will change from subject to subject, either have an automatic delineator, or preprocess the QRS beforehand
print('clinical_ecg_raw ', clinical_ecg_raw.shape)
# Sometimes the ECG gets saved using the saving function that presses it into a single line in the file
if len(clinical_ecg_raw.shape) == 1:
    clinical_ecg_raw = np.squeeze(unfold_ecg_matrix(data=clinical_ecg_raw, nb_leads=nb_leads))
clinical_ecg_raw = clinical_ecg_raw[:, :max_len_qrs]  # TODO remove this line
print('clinical_ecg_raw ', clinical_ecg_raw.shape)
# Create ECG model
ecg_model = PseudoQRSTetFromStepFunction(electrode_positions=geometry.electrode_xyz, filtering=filtering,
                                         frequency=frequency, high_freq_cut=high_freq_cut, lead_names=lead_names,
                                         low_freq_cut=low_freq_cut,
                                         max_len_qrs=max_len_qrs, nb_leads=nb_leads, nodes_xyz=geometry.node_xyz,
                                         normalise=normalise, reference_ecg=clinical_ecg_raw, tetra=geometry.tetra,
                                         tetra_centre=geometry.get_tetra_centre(), verbose=verbose,
                                         zero_align=zero_align)
clinical_ecg = ecg_model.preprocess_ecg(clinical_ecg_raw)
# # Save hyperparameters for reproducibility
# hyperparameter_dict['filtering'] = filtering
# hyperparameter_dict['frequency'] = frequency
# hyperparameter_dict['high_freq_cut'] = high_freq_cut
# hyperparameter_dict['lead_names'] = lead_names
# hyperparameter_dict['low_freq_cut'] = low_freq_cut
# hyperparameter_dict['max_len_qrs'] = max_len_qrs
# hyperparameter_dict['max_len_ecg'] = max_len_ecg
# hyperparameter_dict['nb_leads'] = nb_leads
# hyperparameter_dict['normalise'] = normalise
# hyperparameter_dict['zero_align'] = zero_align
# # Clear Arguments to prevent Argument recycling
# clinical_data_filename_path = None
# clinical_ecg_raw = None
# filtering = None
# frequency = None
# high_freq_cut = None
# geometry = None  # Clear Geometry
# lead_names = None
# max_len_ecg = None
# max_len_qrs = None
# max_len_st = None
# nb_leads = None
# normalise = None
# zero_align = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
# Step 8: Define instance of the simulation method.
print('Step 8: Define instance of the simulation method.')
simulator_ecg = SimulateECGwithLATmax(ecg_model=ecg_model, electrophysiology_model=electrophysiology_model, verbose=verbose)
simulator_ep = SimulateEP(electrophysiology_model=electrophysiology_model, verbose=verbose)
# Save hyperparameters for reproducibility
# Clear Arguments to prevent Argument recycling
# electrophysiology_model = None
# ecg_model = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
# Step 9: Define instance of the simulation method.
print('Step 9: Define instance of the simulation method.')
simulator_ecg = SimulateECGwithLATmax(ecg_model=ecg_model, electrophysiology_model=electrophysiology_model, verbose=verbose)
simulator_ep = SimulateEP(electrophysiology_model=electrophysiology_model, verbose=verbose)
simulator_ep = SimulateEP(electrophysiology_model=electrophysiology_model, verbose=verbose)
# Clear Arguments to prevent Argument recycling
# electrophysiology_model = None
# ecg_model = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
# Step 9: Define Adapter to translate between theta and parameters.
print('Step 9: Define Adapter to translate between theta and parameters.')
# Arguments for Adapter:
parameter_name_list_in_order = propagation_parameter_name_list_in_order
continuous_theta_name_list_in_order = [sheet_speed_name, endo_dense_speed_name, endo_sparse_speed_name]
theta_name_list_in_order = continuous_theta_name_list_in_order + candidate_root_node_names
parameter_fixed_value_dict = {}
# parameter_fixed_value_dict[fibre_speed_name] = 0.065  # Taggart et al. (2000)
# parameter_fixed_value_dict[normal_speed_name] = 0.048  # Taggart et al. (2000)
# parameter_fixed_value_dict[purkinje_speed_name] = 0.3  # Made up value (cm/ms)

# parameter_fixed_value_dict[fibre_speed_name] = 0.0065  # Taggart et al. (2000)
# parameter_fixed_value_dict[normal_speed_name] = 0.0048  # Taggart et al. (2000)
# parameter_fixed_value_dict[purkinje_speed_name] = 0.9  # Made up value (cm/ms)

physiological_rules_larger_than_dict = {}
physiological_rules_larger_than_dict[endo_dense_speed_name] = [endo_sparse_speed_name]  # Define custom rules to constrain which parameters must be larger than others.
# [sheet_speed_name, endo_dense_speed_name, endo_sparse_speed_name]
sheet_speed_resolution = 0.001
endo_dense_speed_resolution = 0.001
endo_sparse_speed_resolution = 0.001
nb_discrete_theta = len(candidate_root_node_names)
theta_adjust_function_list_in_order = [RoundTheta(resolution=sheet_speed_resolution),
                                       RoundTheta(resolution=endo_dense_speed_resolution),
                                       RoundTheta(resolution=endo_sparse_speed_resolution)]
for root_i in range(nb_discrete_theta):
    theta_adjust_function_list_in_order.append(None)
if len(theta_adjust_function_list_in_order) != len(theta_name_list_in_order):
    print('theta_name_list_in_order ', len(theta_name_list_in_order))
    print('theta_adjust_function_list_in_order ', len(theta_adjust_function_list_in_order))
    raise Exception('Different number of adjusting functions and theta for the inference')
# Distribute parameters into modules
destination_module_name_list_in_order = [propagation_module_name]
parameter_destination_module_dict = {}
parameter_destination_module_dict[propagation_module_name] = propagation_parameter_name_list_in_order
print('Caution: these rules have only been enabled for the inferred parameters!')   # TODO: modify this to also enable rules for fixed parameters (e.g., fibre_speed >= sheet_speed)
# Create an adapter that can translate between theta and parameters
adapter = AdapterThetaParams(destination_module_name_list_in_order=destination_module_name_list_in_order,
                             parameter_fixed_value_dict=parameter_fixed_value_dict,
                             parameter_name_list_in_order=parameter_name_list_in_order,
                             parameter_destination_module_dict=parameter_destination_module_dict,
                             theta_adjust_function_list_in_order=theta_adjust_function_list_in_order,
                             theta_name_list_in_order=theta_name_list_in_order,
                             physiological_rules_larger_than_dict=physiological_rules_larger_than_dict,
                             verbose=verbose)
# Save hyperparameters for reproducibility
# hyperparameter_dict['continuous_theta_name_list_in_order'] = continuous_theta_name_list_in_order
# hyperparameter_dict['destination_module_name_list_in_order'] = destination_module_name_list_in_order
# hyperparameter_dict['endo_dense_speed_resolution'] = endo_dense_speed_resolution
# hyperparameter_dict['endo_sparse_speed_resolution'] = endo_sparse_speed_resolution
# hyperparameter_dict['nb_discrete_theta'] = nb_discrete_theta
# hyperparameter_dict['parameter_destination_module_dict'] = parameter_destination_module_dict
# hyperparameter_dict['parameter_fixed_value_dict'] = parameter_fixed_value_dict
# hyperparameter_dict['parameter_name_list_in_order'] = parameter_name_list_in_order
# hyperparameter_dict['physiological_rules_larger_than_dict'] = physiological_rules_larger_than_dict
# hyperparameter_dict['theta_name_list_in_order'] = theta_name_list_in_order
# hyperparameter_dict['sheet_speed_resolution'] = sheet_speed_resolution
# # Clear Arguments to prevent Argument recycling
# candidate_root_node_names = None
# destination_module_name_list_in_order = None
# endo_dense_speed_name = None
# endo_dense_speed_resolution = None
# endo_sparse_speed_name = None
# endo_sparse_speed_resolution = None
# nb_discrete_theta = None
# fibre_speed_name = None
# normal_speed_name = None
# parameter_fixed_value_dict = None
# speed_parameter_name_list_in_order = None
# theta_adjust_function_list_in_order = None
# sheet_speed_name = None
# sheet_speed_resolution = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
# Step 10: Create evaluator_ecg.
print('Step 10: Create evaluator_ecg.')
evaluator_ecg = ParameterSimulator(adapter=adapter, simulator=simulator_ecg, verbose=verbose)
evaluator_ep = ParameterSimulator(adapter=adapter, simulator=simulator_ep, verbose=verbose)
# Clear Arguments to prevent Argument recycling.
# adapter = None
# # discrepancy_metric = None
# simulator_ecg = None
# simulator_ep = None
# clinical_ecg = None
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
parameter_value_dict = {}
# parameter_value_dict[fibre_speed_name] = 0.065  # Taggart et al. (2000)
# parameter_value_dict[normal_speed_name] = 0.048  # Taggart et al. (2000)
# parameter_value_dict[purkinje_speed_name] = 0.3  # Made up value (cm/ms)
# parameter_value_dict[sheet_speed_name] = 0.52 # approx Taggart et al. (2000)
# parameter_value_dict[endo_dense_speed_name] = 0.17  # Made up value (cm/ms)
# parameter_value_dict[endo_sparse_speed_name] = 0.1  # Made up value (cm/ms)
# for discrete_parameter_name in candidate_root_node_names:
#     parameter_value_dict[discrete_parameter_name] = 0 #1
# parameter_value_dict[candidate_root_node_names[-1]] = 1


parameter_value_dict[fibre_speed_name] = 0.001  # Taggart et al. (2000)
parameter_value_dict[normal_speed_name] = 0.001  # Taggart et al. (2000)
parameter_value_dict[purkinje_speed_name] = 0.9  # Made up value (cm/ms)
parameter_value_dict[sheet_speed_name] = 0.001 # approx Taggart et al. (2000)
parameter_value_dict[endo_dense_speed_name] = 0.001  # Made up value (cm/ms)
parameter_value_dict[endo_sparse_speed_name] = 0.001  # Made up value (cm/ms)
for discrete_parameter_name in candidate_root_node_names:
    parameter_value_dict[discrete_parameter_name] = 1
# parameter_value_dict[candidate_root_node_names[-1]] = 1


print(parameter_value_dict)
parameter_particle = []
for parameter_name in parameter_name_list_in_order:
    parameter_particle.append(parameter_value_dict[parameter_name])
parameter_particle = np.array(parameter_particle)
print(round(time.time()-t_start, 5))

In [None]:
t_start = time.time()
lat_simulation, vm_simulation = evaluator_ep.simulate_parameter_particle(
    parameter_particle=parameter_particle)
print(round(time.time()-t_start, 5))

In [None]:
# Define some new functions ot plot the activation maps on the 3D geometries
import plotly.graph_objects as go
import plotly.express as px

def list_faces(t):
  t.sort(axis=1)
  n_t, m_t= t.shape 
  f = np.empty((4*n_t, 3) , dtype=int)
  i = 0
  for j in range(4):
    f[i:i+n_t,0:j] = t[:,0:j]
    f[i:i+n_t,j:3] = t[:,j+1:4]
    i=i+n_t
  return f

def extract_unique_triangles(t):
  _, indxs, count  = np.unique(t, axis=0, return_index=True, return_counts=True)
  return t[indxs[count==1]]

def extract_surface(t):
  f=list_faces(t)
  f=extract_unique_triangles(f)
  return f


def plot_geometry(xyz, surf):
    fig = go.Figure(data=[
        go.Mesh3d(
            x=xyz[:, 0],
            y=xyz[:, 1],
            z=xyz[:, 2],
            colorbar=dict(title=dict(text='z')),
            colorscale=px.colors.sequential.Viridis,
            # Intensity of each vertex, which will be interpolated and color-coded
            intensity=lat_simulation,
            # i, j and k give the vertices of triangles
            # here we represent the 4 triangles of the tetrahedron surface
            i=surf[:, 0],
            j=surf[:, 1],
            k=surf[:, 2],
            name='y',
            showscale=True
        )
    ])
    fig.update_layout( width=1000,
            height=1000)
    
    fig.show()


In [None]:
# Visualise the activation map
plot_geometry(xyz=geometry.get_node_xyz(), surf=extract_surface(geometry.get_tetra()))

In [None]:
t_start = time.time()
predicted_data_particle = evaluator_ecg.simulate_parameter_particle(
    parameter_particle=parameter_particle)
predicted_ecg = predicted_data_particle[0]
max_lat = predicted_data_particle[1]
print(round(time.time()-t_start, 5))

In [None]:
import matplotlib.pyplot as plt

t_start = time.time()
# Step 14: Visualise ECGs for the final population.
print('Step 14: Visualise ECGs for the final population.')
# Initialise arguments for plotting
axes = None
fig = None
# Plot the ECG inference population
# print('population_ecg ', population_ecg.shape)
# axes, fig = visualise_ecg(ecg_list=population_ecg, lead_name_list=lead_names, axes=axes,
#                           ecg_color='k', fig=fig, label_list=None,
#                           linewidth=0.1)
axes, fig = visualise_ecg(ecg_list=[predicted_ecg], lead_name_list=lead_names, axes=axes,
                          ecg_color='magenta', fig=fig, label_list=['Simulated'],
                          linewidth=2.)
# Plot the clinical trace after the last iteration
axes, fig = visualise_ecg(ecg_list=[clinical_ecg], lead_name_list=lead_names, axes=axes,
                          ecg_color='lime', fig=fig, label_list=['Clinical'],
                          linewidth=2.)
axes[-1].legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=20)
axes[0].set_title(anatomy_subject_name)
plt.show(block=False)
print(round(time.time()-t_start, 5))