In [None]:
!pip install git+https://github.com/waltsims/k-wave-python.git

In [None]:
import io
import logging
import sys
from copy import deepcopy

import cv2
import matplotlib.pyplot as plt
import numpy as np
import requests
from kwave.data import Vector
from kwave.kgrid import kWaveGrid
from kwave.kmedium import kWaveMedium
from kwave.ksensor import kSensor
from kwave.ksource import kSource
from kwave.kspaceFirstOrder2D import kspaceFirstOrder2D
from kwave.options.simulation_execution_options import SimulationExecutionOptions
from kwave.options.simulation_options import SimulationOptions
from kwave.reconstruction.time_reversal import TimeReversal
from kwave.utils.colormap import get_color_map
from kwave.utils.conversion import cart2grid, grid2cart
from kwave.utils.filters import smooth
from kwave.utils.interp import interp_cart_data
from kwave.utils.io import load_image
from kwave.utils.mapgen import make_cart_circle, make_circle
from kwave.utils.matrix import resize, sort_rows
from kwave.utils.signals import add_noise, reorder_binary_sensor_data
from mpl_toolkits.axes_grid1.axes_divider import make_axes_locatable


pml_size: int = 20 # size of the PML in grid points
Nx: int = 256 - 2 * pml_size # number of grid points in the x direction
Ny: int = 256 - 2 * pml_size # number of grid points in the y direction

x: float = 10e-3 # total grid size [m]
y: float = 10e-3 # total grid size [m]

dx: float = x / float(Nx) # grid point spacing in the x direction [m]
dy: float = y / float(Ny) # grid point spacing in the y direction [m]

kgrid = kWaveGrid(Vector([Nx, Ny]), Vector([dx, dy]))

medium = kWaveMedium(sound_speed=1500.0)

kgrid.makeTime(medium.sound_speed)

# URL of the image on GitHub
url = 'https://raw.githubusercontent.com/waltsims/k-wave-python/refs/heads/master/tests/EXAMPLE_source_two.bmp'

# Fetch the image
response = requests.get(url)

# Check if the request was successful
if response.status_code == 200:
    # Read the image data from the response content
    image_data = io.BytesIO(response.content)

    image_array = np.frombuffer(image_data.getvalue(), dtype=np.uint8)

    # Decode the image using cv2.imdecode. This returns a NumPy array.
    p0 = cv2.imdecode(image_array, cv2.IMREAD_GRAYSCALE)

else:
    # Handle the case where the image could not be downloaded
    print(f"Error: Could not download image from {url}. Status code: {response.status_code}")

# Load the image using load_image
p0_magnitude = 2.0
p0 = p0_magnitude * p0

p0 = resize(p0, [Nx, Ny])
p0 = smooth(p0, True)

source = kSource()
source.p0 = p0

# =========================================================================
# DEFINE THE MATERIAL PROPERTIES
# =========================================================================

sensor = kSensor()

# define a centered Cartesian circular sensor
sensor_radius: float = 4.5e-3            # [m]
sensor_angle: float = 3.0 * np.pi / 2.0  # [rad]
sensor_pos = Vector([0, 0])              # [m]
num_sensor_points: int = 70
cart_sensor_mask = make_cart_circle(sensor_radius, num_sensor_points, sensor_pos, sensor_angle)

# put the cartesian points into the sensor.mask
sensor.mask = cart_sensor_mask

# set the record type: record the pressure waveform
sensor.record = ['p']

# set color map
cmap = get_color_map()

grid, _, reorder_index = cart2grid(kgrid, cart_sensor_mask)

cart_sensor_mask_reordered = cart_sensor_mask[:, np.squeeze(reorder_index.T).astype(int) - 1]

fig, ax = plt.subplots(1, 1)
im = ax.pcolormesh(np.squeeze(kgrid.x_vec)* 1e3,
                   np.squeeze(kgrid.y_vec)* 1e3, p0, shading='gouraud', cmap=cmap, vmin=-1, vmax=1)
ax.scatter(cart_sensor_mask[1, :] * 1e3, cart_sensor_mask[0, :] * 1e3, c='k', marker='o', s=8)
ax.yaxis.set_inverted(True)

In [None]:
# =========================================================================
# DEFINE THE SIMULATION PARAMETERS
# =========================================================================

DATA_CAST = 'single'
DATA_PATH = './'

input_filename = 'input.h5'
output_filename = 'output.h5'

# options for writing to file, but not doing simulations
simulation_options = SimulationOptions(
    data_cast=DATA_CAST,
    data_recast=True,
    save_to_disk=True,
    smooth_p0=False,
    input_filename=input_filename,
    output_filename=output_filename,
    save_to_disk_exit=False,
    data_path=DATA_PATH,
    pml_inside=False)

execution_options = SimulationExecutionOptions(
    is_gpu_simulation=False,
    delete_data=False,
    verbose_level=2)

# =========================================================================
# RUN THE FIRST SIMULATION
# =========================================================================

sensor_data_original = kspaceFirstOrder2D(medium=deepcopy(medium),
                                          kgrid=deepcopy(kgrid),
                                          source=deepcopy(source),
                                          sensor=deepcopy(sensor),
                                          simulation_options=deepcopy(simulation_options),
                                          execution_options=deepcopy(execution_options))

In [None]:
# plot the simulated sensor data

sensor_data_reordered = reorder_binary_sensor_data(sensor_data_original['p'].T, reorder_index=reorder_index)

fig, ax = plt.subplots(1, 1)
im = ax.pcolormesh(sensor_data_reordered,
                   shading='gouraud', cmap=cmap, vmin=-1, vmax=1)
ax.set_aspect('auto', adjustable='box')
ax.set_ylabel('Sensor')
ax.set_xlabel('Time')
ax.yaxis.set_inverted(True)
ax_divider = make_axes_locatable(ax)
cax = ax_divider.append_axes("right", size="3%", pad="2%")
cbar = fig.colorbar(im, cax=cax, ticks=[-1, -0.5, 0, 0.5, 1])
cbar.ax.set_yticklabels(['-1', '-0.5', '0', '0.5', '1'])
cbar.ax.tick_params(labelsize=8)

In [None]:
# =========================================================================
# SECOND
# =========================================================================

# add noise to the recorded sensor data
signal_to_noise_ratio: float = 40.0	# [dB]
sensor_data = add_noise(sensor_data_original['p'].T, signal_to_noise_ratio, 'peak')

# create a second computation grid for the reconstruction to avoid the
# inverse crime
Nx: int = 300              # number of grid points in the x direction
Ny: int = 300              # number of grid points in the y direction
dx: float = x / float(Nx)  # grid point spacing in the x direction [m]
dy: float = y / float(Ny)  # grid point spacing in the y direction [m]
kgrid_recon = kWaveGrid(Vector([Nx, Ny]), Vector([dx, dy]))

# use the same time array for the reconstruction
kgrid_recon.setTime(kgrid.Nt, kgrid.dt)

source = kSource()
del sensor

temp, _, _ = cart2grid(kgrid_recon, cart_sensor_mask)

sensor = kSensor()

sensor.mask = temp
sensor.recorded_pressure = sensor_data

# set the record type: record the pressure waveform
sensor.record = ['p']

tr = TimeReversal(kgrid_recon, medium, sensor, compensation_factor=1.0)
p0_recon = tr(kspaceFirstOrder2D, simulation_options, execution_options)

In [None]:
# plot result
fig, ax = plt.subplots()
ax = plt.pcolormesh(p0_recon)
plt.show()

In [None]:
# =========================================================================
# THIRD
# =========================================================================

# create a binary sensor mask of an equivalent continuous circle, this has the enlarged number of sensor points
sensor_radius_grid_points: int = round(sensor_radius / kgrid_recon.dx)
binary_sensor_mask = make_circle(Vector([Nx, Ny]), Vector([Nx // 2, Ny // 2]), sensor_radius_grid_points, sensor_angle)

td = interp_cart_data(kgrid=kgrid_recon,
                      cart_sensor_data=sensor_data.T,
                      cart_sensor_mask=cart_sensor_mask,
                      binary_sensor_mask=binary_sensor_mask)


del sensor
sensor = kSensor()
sensor.mask = binary_sensor_mask
sensor.record = ['p']
sensor.recorded_pressure = interp_cart_data(kgrid=kgrid_recon,
                                            cart_sensor_data=sensor_data_reordered,
                                            cart_sensor_mask=cart_sensor_mask,
                                            binary_sensor_mask=binary_sensor_mask)

# sensor defines the source
tr = TimeReversal(kgrid_recon, medium, sensor, compensation_factor=1.0)
p0_recon_interp = tr(kspaceFirstOrder2D, simulation_options, execution_options)

In [None]:
fig, ax = plt.subplots()
im = plt.pcolormesh(np.squeeze(kgrid_recon.x_vec),
                    np.squeeze(kgrid_recon.y_vec), p0_recon_interp,
                    cmap=cmap, vmin=-1, vmax=1)
ax.yaxis.set_inverted(True)
ax.set_title('Reconstructed Pressure Distribution with Interpolation')


# plot a profile for comparison
slice_pos = 4.5e-3  # [m] location of the slice from top of grid [m]
i = int(round(slice_pos / kgrid.dx))
j = int(round(slice_pos / kgrid_recon.dx))
fig, ax = plt.subplots()
ax.plot(np.squeeze(kgrid.y_vec) * 1e3, p0[i,: ], 'k--', label='Initial Pressure')
ax.plot(np.squeeze(kgrid_recon.y_vec) * 1e3, np.transpose(p0_recon)[:, j], 'r-', label='Point Reconstruction')
ax.plot(np.squeeze(kgrid_recon.y_vec) * 1e3, np.transpose(p0_recon_interp)[:, j], 'b-', label='Interpolated Reconstruction')
ax.set_xlabel('y-position [mm]')
ax.set_ylabel('Pressure')
ax.set_ylim(0, 2.1)
ax.legend()