Computation of forward model for scatterers in a block in immersion.

Nicolas Budyn, 2017-2018


Input
-----

conf.yaml

Output
------

- Figures if ``aplt.conf['savefig'] == True``
- ``tfm_intensities.csv`` if ``save == True``

Model
-----

The response of a scatterer located at $y$, for a given mode, transmitter $i$ and receiver $j$, is:

$$
\begin{align}
F_{ij}(\omega) &= Q_i(\omega, y) Q'_j(\omega, y) S_{ij}(\omega, y) 
                  \exp(\iota \omega \tau_{ij}(y))  U(\omega) \\
&= H_{ij}(\omega) U(\omega)
\end{align}
$$
                  
                  
                  
$U(\omega)$ is the toneburst: variables ``toneburst`` in time domain and ``toneburst_f`` in frequency domain.

The scattering function $S_{ij}$ is defined in variable ``scat_obj``.

The transfer function $H_{ij}$ is stored in variable ``transfer_function_f``.

Data structure
--------------

``frame`` : Frame
    Frame that contains the response of the scatterers for all views.
    
``tfms`` : dict of arim.im.tfm.TfmResult
    TFM image of the contribution of a mode to the corresponding view.
    



In [None]:
import math
import logging
from collections import OrderedDict

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import yaml
from scipy.signal import hilbert
import scipy.fftpack

import arim, arim.model, arim.scat, arim.plot as aplt
import arim.models.block_in_immersion as bim
import arim.im, arim.signal  # for imaging
import arim.scat
import arim.io


In [None]:
# --- Output switches -------------------------------------------------------
# When True, the TFM intensities table is written to ``tfm_intensities.csv``.
save = True
# When True, the plotting cells also write their figures to disk.
aplt.conf["savefig"] = False

# --- Model switches --------------------------------------------------------
# True: multi-frequency transfer functions; False: single-frequency ones
# evaluated at the probe frequency.
use_multifreq = False
# Passed to ``bim.make_views`` (for scatterer).
max_number_of_reflection = 1
# Passed to ``bim.make_views`` for the scatterer and small-grid views.
tfm_unique_only = False
# Number of angles for the scattering precomputation; 0 to disable it.
numangles_for_scat_precomp = 120

# --- Logging ---------------------------------------------------------------
logging.basicConfig()
logger = logging.getLogger(__name__)
for _log in (logger, logging.getLogger("arim")):
    _log.setLevel(logging.INFO)

# --- Inspection configuration, read from the current directory -------------
conf = arim.io.load_conf(".")


#### Define inspection set-up

In [None]:
# Build the probe and the immersed block from the configuration.
probe = arim.io.probe_from_conf(conf)
examination_object = arim.io.block_in_immersion_from_conf(conf)

# Transmitter/receiver index of every scanline of the acquisition.
tx_list, rx_list = arim.ut.fmc(probe.numelements)
numscanlines = len(tx_list)

# The scatterer is a single oriented point at the configured location.
defect_centre = conf["scatterer"]["location"]
scatterer_xyz = [defect_centre[axis] for axis in ("x", "y", "z")]
scatterer = arim.geometry.default_oriented_points(
    arim.geometry.Points([scatterer_xyz], name="Scatterer")
)

# for imaging: small grid centred on the scatterer (zero extent along y)
small_grid = arim.Grid.grid_centred_at_point(
    *scatterer_xyz, 5e-3, 0.0, 5e-3, pixel_size=0.125e-3
)
small_grid.name = "Small grid"

# Overview of the set-up; one marker style per interface, in the same order.
interfaces = [
    probe.to_oriented_points(),
    examination_object.frontwall,
    examination_object.backwall,
    small_grid.to_oriented_points(),
    scatterer,
]
interface_markers = [".", "-", "-", "k.", "d"]
aplt.plot_interfaces(interfaces, show_last=True, markers=interface_markers)


#### Ray tracing for scatterer

In [None]:
# Views (pairs of transmit/receive paths) between the probe and the scatterer.
views = bim.make_views(
    examination_object,
    probe.to_oriented_points(),
    scatterer,
    max_number_of_reflection,
    tfm_unique_only,
)
# views = {'L-L': views['L-L']}  # debug
print(f"Views: {', '.join(views)}")

# Ray tracing for the tx and rx paths of every view.
arim.ray.ray_tracing(views.values(), convert_to_fortran_order=True)


#### Ray tracing for frontwall and backwall echoes
Model:
- first frontwall reflection,
- LL, LT, TL and TT backwall reflections.

In [None]:
couplant_material = examination_object.couplant_material
block_material = examination_object.block_material

# Path of the frontwall echo.
frontwall_path = bim.frontwall_path(
    couplant_material,
    block_material,
    *probe.to_oriented_points(),
    *examination_object.frontwall,
)

# Paths of the backwall echoes, keyed by mode.
backwall_paths = bim.backwall_paths(
    couplant_material,
    block_material,
    probe.to_oriented_points(),
    examination_object.frontwall,
    examination_object.backwall,
)

# All wall paths in one dict: backwall paths first, then the frontwall.
wall_paths = {
    **{f"Backwall {mode}": path for mode, path in backwall_paths.items()},
    "Frontwall": frontwall_path,
}
arim.ray.ray_tracing_for_paths(wall_paths.values())
print(f"Wall paths: {', '.join(wall_paths)}")


#### Ray tracing for imaging

In [None]:
# Views between the probe and every pixel of the small grid; these are used
# to image the per-view scatterer responses.
grid_points = small_grid.to_oriented_points()
views_scat = bim.make_views(
    examination_object,
    probe.to_oriented_points(),
    grid_points,
    max_number_of_reflection,
    tfm_unique_only,
)
# views_scat = {'L-L': views_scat['L-L']}  # debug
arim.ray.ray_tracing(views_scat.values(), convert_to_fortran_order=True)


#### Toneburst and time vector

In [None]:
# Longest time of flight over all modelled contributions; it sets the
# duration of the simulated time window.
max_delay_scat = max(
    view.tx_path.rays.times.max() + view.rx_path.rays.times.max()
    for view in views.values()
)
max_delay_frontwall = frontwall_path.rays.times.max()
max_delay_backwall = max(path.rays.times.max() for path in backwall_paths.values())
max_delay = max(max_delay_scat, max_delay_frontwall, max_delay_backwall)

# A quarter of the probe period per sample;
# to adjust so that the whole toneburst is sampled.
dt = 0.25 / probe.frequency
# Leave room after the latest arrival for the toneburst itself.
_tmax = max_delay + 4 * conf["toneburst"]["num_cycles"] / probe.frequency

# Round the number of samples up to a length that is efficient for the FFT.
numsamples = scipy.fftpack.next_fast_len(math.ceil(_tmax / dt))
time = arim.Time(0.0, dt, numsamples)
freq_array = np.fft.rfftfreq(len(time), dt)
numfreq = len(freq_array)

# The centre frequency is taken from the probe object, like ``dt`` and
# ``_tmax`` above, so that the sampling and the toneburst cannot disagree.
toneburst = arim.model.make_toneburst(
    conf["toneburst"]["num_cycles"],
    probe.frequency,
    dt,
    numsamples,
    wrap=True,
)
toneburst_f = np.fft.rfft(toneburst)

# Envelope of the toneburst at the first sample; used later as the reference
# amplitude for the TFM intensities in decibel.
# NOTE(review): presumably ``wrap=True`` centres the toneburst on t=0 so that
# this is the peak of the envelope - confirm in arim.model.make_toneburst.
toneburst_ref = np.abs(hilbert(toneburst)[0])

# plot toneburst
plt.figure(figsize=(10, 5))
plt.subplot(1, 2, 1)
plt.plot(1e6 * time.samples, toneburst)
plt.title("toneburst (time domain)")
plt.xlabel("time (µs)")

plt.subplot(1, 2, 2)
plt.plot(1e-6 * np.fft.rfftfreq(len(toneburst), dt), abs(toneburst_f))
plt.title("toneburst (frequency domain)")
plt.xlabel("frequency (MHz)")
if aplt.conf["savefig"]:
    plt.savefig("toneburst")


#### Compute transfer functions

In [None]:
# Keyword arguments shared by the scatterer and wall transfer-function
# computations: the element width plus the switches of the model terms.
model_options = {
    "probe_element_width": probe.dimensions.x[0],
    "use_directivity": True,
    "use_beamspread": True,
    "use_transrefl": True,
    "use_attenuation": True,
}


##### For scatterer

In [None]:
# Scattering model, built from the block material and the scatterer
# specifications of the configuration.
scat_obj = arim.scat.scat_factory(
    material=examination_object.block_material, **conf["scatterer"]["specs"]
)
scat_angle = np.deg2rad(conf["scatterer"]["angle_deg"])

# Accumulator for the transfer function of the scatterer, summed over views:
# one row per scanline, one column per frequency.
# ``np.complex128`` is used because the ``np.complex_`` alias was removed in
# NumPy 2.0.
transfer_function_f = np.zeros((numscanlines, numfreq), np.complex128)
# TFM image of each view's own contribution, in the order the views are
# yielded.
tfms_scat = OrderedDict()


if use_multifreq:
    # Multi frequency model
    transfer_function_iterator = bim.multifreq_scat_transfer_functions(
        views,
        tx_list,
        rx_list,
        freq_array=freq_array,
        scat_obj=scat_obj,
        scat_angle=scat_angle,
        numangles_for_scat_precomp=numangles_for_scat_precomp,
        **model_options,
    )
else:
    # Single frequency model
    transfer_function_iterator = bim.singlefreq_scat_transfer_functions(
        views,
        tx_list,
        rx_list,
        freq_array=freq_array,
        scat_obj=scat_obj,
        scat_angle=scat_angle,
        frequency=probe.frequency,
        numangles_for_scat_precomp=numangles_for_scat_precomp,
        **model_options,
    )

with arim.helpers.timeit("Main loop for scatterer"):
    for viewname, partial_transfer_func in transfer_function_iterator:
        transfer_function_f += partial_transfer_func

        # imaging: time-domain response of this view alone, wrapped in a
        # Frame so that it can be imaged.
        # NOTE(review): rfft_to_hilbert presumably returns the analytic
        # (complex) time-domain signal - confirm in arim.signal.
        partial_response = arim.signal.rfft_to_hilbert(
            partial_transfer_func * toneburst_f, numsamples
        )
        partial_frame = arim.Frame(
            partial_response, time, tx_list, rx_list, probe, examination_object
        )

        # Image the contribution of this view with the matching imaging view.
        tfms_scat[viewname] = arim.im.tfm.tfm_for_view(
            partial_frame,
            small_grid,
            views_scat[viewname],
            interpolation=("lanczos", 3),
            fillvalue=0.0,
        )

# At this stage, transfer_function_f contains the transfer function for scatterer for all views


##### For walls

In [None]:
# Accumulator for the transfer function of the wall echoes, summed over all
# wall paths: one row per scanline, one column per frequency.
# ``np.complex128`` is used because the ``np.complex_`` alias was removed in
# NumPy 2.0.
transfer_function_wall_f = np.zeros((numscanlines, numfreq), np.complex128)

if use_multifreq:
    transfer_function_iterator = bim.multifreq_wall_transfer_functions(
        wall_paths, tx_list, rx_list, freq_array, **model_options
    )
else:
    transfer_function_iterator = bim.singlefreq_wall_transfer_functions(
        wall_paths, tx_list, rx_list, probe.frequency, freq_array, **model_options
    )

# Contribution of each wall path, kept separately by path name.
transfer_function_wall_f_dict = {}
with arim.helpers.timeit("Main loop for walls:"):
    for pathname, partial_transfer_func in transfer_function_iterator:
        transfer_function_wall_f += partial_transfer_func
        transfer_function_wall_f_dict[pathname] = partial_transfer_func


#### Compute the response in frequency then time domain

In [None]:
# Frequency-domain response: sum of the scatterer and wall transfer
# functions, multiplied by the toneburst spectrum.
response_scanlines_f = (transfer_function_f + transfer_function_wall_f) * toneburst_f

# Back to the time domain, one signal per scanline (last axis is time).
response_scanlines = arim.signal.rfft_to_hilbert(
    response_scanlines_f, numsamples, axis=-1
)
real_response_scanlines = np.real(response_scanlines)

# Frame holding the response of the scatterer and the walls for all views.
frame = arim.Frame(
    response_scanlines, time, tx_list, rx_list, probe, examination_object
)

# Plot the real part of one scanline as a sanity check.
idx = 31
time_us = frame.time.samples * 1e6
scanline_label = f"tx={frame.tx[idx]}, rx={frame.rx[idx]}"

plt.figure()
plt.plot(time_us, np.real(frame.scanlines[idx]), label=scanline_label)
plt.xlabel("time (µs)")
plt.title("time-domain response")
plt.legend()
if aplt.conf["savefig"]:
    plt.savefig("time_domain_response")


In [None]:
# Display the simulated frame with arim's pulse-echo B-scan plotting helper.
aplt.plot_bscan_pulse_echo(frame)


#### Check reciprocity

In [None]:
# Pair of elements whose two reciprocal scanlines are compared visually.
tx = 1
rx = 19

# Index of the scanline (tx -> rx) and of its reciprocal (rx -> tx).
idx1 = np.flatnonzero((tx_list == tx) & (rx_list == rx))[0]
idx2 = np.flatnonzero((tx_list == rx) & (rx_list == tx))[0]

real_response_scanlines = np.real(response_scanlines)

time_us = time.samples * 1e6
plt.figure()
for scanline_idx in (idx1, idx2):
    plt.plot(
        time_us,
        real_response_scanlines[scanline_idx],
        label=f"tx={tx_list[scanline_idx]}, rx={rx_list[scanline_idx]}",
    )
plt.legend()
plt.xlabel("time (µs)")
plt.title("reciprocity - signals must overlap perfectly")
if aplt.conf["savefig"]:
    plt.savefig("reciprocity")

# Quantitative check over all pairs: arrange the scanlines as a
# (numelements, numelements, time) array, then compare it with the same
# array with its first two axes swapped.
cube_shape = (probe.numelements, probe.numelements, len(time))
response_scanlines_1 = real_response_scanlines.reshape(cube_shape)
response_scanlines_2 = np.swapaxes(response_scanlines_1, 0, 1)
# Largest absolute difference over time, for each pair.
error_reciprocity = np.max(np.abs(response_scanlines_1 - response_scanlines_2), axis=-1)
worst_error = np.max(error_reciprocity)
worst_scanline = np.argmax(error_reciprocity)
logger.info(f"Reciprocity error: {worst_error} on scanline {worst_scanline}")


#### Measure TFM intensities

In [None]:
# Flat index of the grid pixel closest to the scatterer.
scatterer_idx = small_grid.closest_point(*scatterer.points[0])

intensity_columns = [
    "view",
    "intensity_at_centre",
    "max_intensity",
    "x_max_intensity",
    "y_max_intensity",
    "z_max_intensity",
]

# One row per view: intensity at the scatterer and at the brightest pixel
# (both in decibel relative to the toneburst reference), followed by the
# coordinates of the brightest pixel.
rows = []
for viewname, tfm in tfms_scat.items():
    max_tfm_idx = np.argmax(np.abs(tfm.res))
    amplitude_at_centre = np.abs(tfm.res.flat[scatterer_idx])
    amplitude_at_max = np.abs(tfm.res.flat[max_tfm_idx])
    rows.append(
        (
            viewname,
            arim.ut.decibel(amplitude_at_centre, toneburst_ref),
            arim.ut.decibel(amplitude_at_max, toneburst_ref),
            small_grid.x.flat[max_tfm_idx],
            small_grid.y.flat[max_tfm_idx],
            small_grid.z.flat[max_tfm_idx],
        )
    )

intensities_df = pd.DataFrame(rows, columns=intensity_columns).set_index("view")

if save:
    intensities_df.to_csv("tfm_intensities.csv")

# Show only the two intensity columns as the cell output.
intensities_df.iloc[:, :2]


#### Plot TFM of defect only

In [None]:
# Common decibel scale for all per-view images; it yields one
# (ref_db, clim) pair per image.
scale = aplt.common_dynamic_db_scale([tfm.res for tfm in tfms_scat.values()])

# One subplot per view, three per row.
ncols = 3
nrows = math.ceil(len(tfms_scat) / ncols)
fig, axes = plt.subplots(nrows, ncols, figsize=(10, 30), sharex=True, sharey=True)

for i, ((viewname, tfm), ax) in enumerate(zip(tfms_scat.items(), axes.flat)):
    ref_db, clim = next(scale)
    amp = intensities_df.loc[viewname]
    row, col = divmod(i, ncols)

    plt.sca(ax)
    aplt.plot_tfm(
        tfm,
        ax=ax,
        clim=clim,
        scale="db",
        ref_db=ref_db,
        title=f"TFM {viewname}",
        interpolation="none",
        savefig=False,
    )

    # Axis labels only on the last row and on the first column.
    plt.xlabel("x (mm)" if row == nrows - 1 else "")
    plt.ylabel("z (mm)" if col == 0 else "")

    # Mark the brightest pixel and the true scatterer location.
    plt.plot(amp["x_max_intensity"], amp["z_max_intensity"], "1m")
    plt.plot(scatterer.points.x, scatterer.points.z, "dm")

if aplt.conf["savefig"]:
    fig.savefig("tfm")


## Full TFM

In [None]:
# Imaging grid from the configuration, with zero extent along y.
grid = arim.geometry.Grid(**conf["grid"], ymin=0.0, ymax=0.0)

# Views between the probe and every pixel of the full grid.
full_grid_points = grid.to_oriented_points()
views_imaging = bim.make_views(
    examination_object,
    probe.to_oriented_points(),
    full_grid_points,
    max_number_of_reflection,
    tfm_unique_only=True,
)
arim.ray.ray_tracing(views_imaging.values(), convert_to_fortran_order=True)


In [None]:
# TFM image of the full frame (scatterer and walls) for every imaging view,
# keyed by view name. Each computation is timed and reported through
# ``logger``.
tfms = {}
for view in views_imaging.values():
    with arim.helpers.timeit(f"TFM {view.name}", logger=logger):
        tfms[view.name] = arim.im.tfm.tfm_for_view(
            frame, grid, view, fillvalue=0.0, interpolation=("lanczos", 3)
        )


In [None]:
# Size of the box centred on the defect (metres, like the other lengths here).
size_box_x = 5e-3
size_box_z = 5e-3

# Grid points inside the box around the defect.
reference_area = grid.points_in_rectbox(
    xmin=defect_centre["x"] - size_box_x / 2,
    xmax=defect_centre["x"] + size_box_x / 2,
    zmin=defect_centre["z"] - size_box_z / 2,
    zmax=defect_centre["z"] + size_box_z / 2,
)

# Area handed to the common decibel scale. ``None`` is the value that was
# effectively in use; set it to ``reference_area`` to use the box around
# the defect instead.
# NOTE(review): presumably the area restricts the pixels from which the
# reference amplitude is taken - confirm in arim.plot.common_dynamic_db_scale.
scale_area = None
scale = aplt.common_dynamic_db_scale(
    [tfm.res for tfm in tfms.values()], scale_area, db_range=40.0
)

# One figure per view; the index in the filename follows the order of ``tfms``.
for i, (viewname, tfm) in enumerate(tfms.items()):
    ref_db, clim = next(scale)

    aplt.plot_tfm(
        tfm,
        clim=clim,
        scale="db",
        ref_db=ref_db,
        title=f"TFM {viewname}",
        filename=f"tfm_{i:02}_{viewname}",
    )
