In [None]:
# Tests for methods that manipulate with horizons
import sys
import warnings
import numpy as np

warnings.filterwarnings('ignore')

sys.path.append('../..') # for running py-script
sys.path.append('../../..') # for running this notebook directly

from seismiqb import Field, Horizon, plot_image

In [None]:
""" You can manage notebook execution kwargs which relates to cube and horizon for the test:

CUBE_PATH : str or None
    Path to an existed seismic cube.
HORIZON_PATH : str or None
    Path to an existed seismic horizon.

Visualizations in saved execution notebooks are controlled with:

SCALE : int
    Figures scale.
SHOW_FIGURES : bool
    Whether to show additional figures.
    Showing some figures can be useful for finding out the reason for the failure of tests.
"""
# Workspace constants
TESTS_ROOT_DIR = './'

# Visualization parameters
SCALE = 1
SHOW_FIGURES = True

In [None]:
# Tests parameters
OUTPUT_DIR = os.path.join(TESTS_ROOT_DIR, f"horizon_test_files")
CUBE_PATH = os.path.join(OUTPUT_DIR, 'test_cube.sgy')
HORIZON_PATH = os.path.join(OUTPUT_DIR, 'test_horizon')

In [None]:
def test_horizons_equality(horizon_1, horizon_2, test_name):
    " Check two horizons equality."
    assert np.array_equal(horizon_1.full_matrix, horizon_2.full_matrix), f"{test_name} test failed"
    assert np.array_equal(horizon_1.points, horizon_2.points), f"{test_name} test failed"

In [None]:
%%time
field = Field(CUBE_PATH)

horizon = Horizon(HORIZON_PATH, field=field)

horizon.show(show=SHOW_FIGURES)

# get_cube_values

In [None]:
%%time
# Make a flat horizon in the cube
depth = field.shape[-1]//2
window = field.shape[-1]//20

constant_matrix = np.ones_like(horizon.full_matrix) * depth

constant_horizon = Horizon(constant_matrix, i_min=0, x_min=0,
                           field=horizon.field, name="const")

# Get values along constant horizon
horizon_values = constant_horizon.get_cube_values(window=window)
horizon_values = np.nan_to_num(horizon_values, constant_horizon.FILL_VALUE)

# Get values from the cube
geometry_values = field.geometry[:, :, depth-window//2:depth+window//2]

assert np.allclose(
    horizon_values[constant_horizon.presence_matrix],
    geometry_values[constant_horizon.presence_matrix]
), "get_cube_values test failed"

# Filtering

In [None]:
%%time
# from_subset
# Make horizon which is cutted by i_line and calculate its coverage
# Create filtering matrix
cut_i_line = horizon.shape[0]//3
filtering_matrix = np.ones_like(horizon.full_matrix).astype(bool)
filtering_matrix[:cut_i_line] = False
filtering_matrix = filtering_matrix & horizon.presence_matrix

# Matrix coverage
filtering_matrix_coverage = np.sum(filtering_matrix)
nonzero_traces = (np.prod(horizon.field.spatial_shape) - np.sum(horizon.field.zero_traces))
filtering_matrix_coverage /= nonzero_traces

# New horizon filtered by matrix
filtered_horizon = Horizon(storage=horizon.full_matrix, field=horizon.field, name="filtered")
filtered_horizon = filtered_horizon.from_subset(filtering_matrix)

filtered_horizon.show(show=SHOW_FIGURES)

assert np.isclose(
    filtering_matrix_coverage, filtered_horizon.coverage, atol=1e-3
), "from_subset test failed"

In [None]:
%%time
filtered_horizon = Horizon(horizon.full_matrix, field=horizon.field, name="filtered")
filtered_horizon.filter()

# Check that zero_traces filled by FILL_VALUE
filtered_horizon_full_matrix = filtered_horizon.matrix_put_on_full(filtered_horizon.matrix)
zero_traces_area = filtered_horizon_full_matrix[filtered_horizon.field.zero_traces.astype(bool)]

assert np.all(zero_traces_area==filtered_horizon.FILL_VALUE), "filter test failed"

# Check that other traces didn't change
non_zero_traces_area = filtered_horizon_full_matrix[
    ~filtered_horizon.field.zero_traces.astype(bool)
]

original_matrix = horizon.full_matrix[~horizon.field.zero_traces.astype(bool)]

filtered_horizon.show(show=SHOW_FIGURES)

assert np.array_equal(non_zero_traces_area, original_matrix), "filter test failed"

In [None]:
%%time
# filter_matrix
# cut horizon by iline and check that cutted area is empty and other area is the same
i_line_cut = horizon.shape[0]//3

filtered_horizon = Horizon(horizon.full_matrix, field=horizon.field, name="filtered")

filtering_matrix = np.zeros_like(horizon.matrix)
filtering_matrix[:i_line_cut] = 1

filtered_horizon.filter_matrix(filtering_matrix)

filtered_horizon.show(show=SHOW_FIGURES)

# cutted area contains only FILL_VALUE
assert np.all(
    filtered_horizon.full_matrix[:i_line_cut]==horizon.FILL_VALUE
), "filter_matrix test failed"

assert np.array_equal(
    filtered_horizon.points,
    horizon.points[horizon.points[:, 0] >= i_line_cut]
), "filter_matrix test failed" # this area didn't change

In [None]:
%%time
filtered_horizon = Horizon(horizon.full_matrix, field=horizon.field, name="filtered")

filtered_horizon.filter_points(filtering_matrix)

filtered_horizon.show(show=SHOW_FIGURES)

# cutted area is not exists and existed part is the same as not cutted area
assert np.array_equal(
    filtered_horizon.points, horizon.points[horizon.points[:, 0] >= i_line_cut]
), "filter_points test failed"

In [None]:
%%time
# Make constant horizon with spikes:
spike_point = (horizon.shape[0]//2, horizon.shape[1]//2)

horizon_matrix = np.ones_like(horizon.full_matrix) * 1000
horizon_matrix[spike_point] += 5

horizon_with_spikes = Horizon(horizon_matrix, field=horizon.field, name="constant_with_spike")
horizon_with_spikes.filter()

# Save matrix state before despiking
mtrx_before_despiking = horizon_with_spikes.full_matrix.copy()

# Get mask of areas with cutted spikes (only for dilation=1)
dilation = 1
spikes_mask_median = np.zeros_like(horizon_with_spikes.full_matrix)

for i in range(-dilation, dilation+1):
    for x in range(-dilation, dilation+1):
        if i == 0 or x == 0:
            spikes_mask_median[(spike_point[0]+i, spike_point[1]+x)] = 1

spikes_mask_gradient = spikes_mask_median.copy() # it has i_line and x_line shifts
spikes_mask_gradient[1:] += spikes_mask_median[:-1].copy()
spikes_mask_gradient[:, 1:] += spikes_mask_median[:, :-1].copy()

spikes_mask_median = spikes_mask_median.astype(bool)
spikes_mask_gradient = spikes_mask_gradient.astype(bool)

spikes = {
    "median": spikes_mask_median,
    "gradient": spikes_mask_gradient
}

for mode, spikes_mask in spikes.items():
    despiked_constant_horizon = Horizon(horizon_with_spikes.full_matrix,
                                        field=horizon.field, name="despiked_constant")

    despiked_constant_horizon.filter_spikes(mode=mode, dilation=dilation)

    assert np.all(
        despiked_constant_horizon.full_matrix[spikes_mask]==despiked_constant_horizon.FILL_VALUE
    ), f"despike with mode {mode} test failed" # spikes were despiked

    assert np.array_equal(
        despiked_constant_horizon.full_matrix[~spikes_mask],
        mtrx_before_despiking[~spikes_mask]
    ), f"despike with mode {mode} test failed" # other values are unchanged

    print(f"Despiking mode {mode} was successfully tested.")

In [None]:
%%time
# Split horizon in two parts by cut_i_line
cut_i_line = horizon.shape[0]//3
filtering_matrix = np.zeros_like(horizon.full_matrix)
filtering_matrix[cut_i_line, :] = 1

horizon_with_holes = Horizon(horizon.full_matrix, field=horizon.field, name="with_holes")

horizon_with_holes.filter(filtering_matrix)

horizon_with_holes.filter_disconnected_regions()

assert np.array_equal(
    horizon_with_holes.full_matrix[cut_i_line+1:],
    horizon.full_matrix[cut_i_line+1:]
), "filter_disconnected_regions test failed" # after cut_line nothing to change

assert np.all(
    horizon_with_holes.full_matrix[:cut_i_line+1]==horizon_with_holes.FILL_VALUE
), "filter_disconnected_regions test failed" # before cut_line horizon is empty

# Specific manipulations

In [None]:
filtered_horizon = Horizon(horizon.full_matrix, field=horizon.field, name="filtered")
filtered_horizon.filter()

frequency = 100

def calculate_grid_coverage(horizon, frequencies=100, width=1, **kwargs):
    """ Approximate calculation of coverage of regular grid.

    horizon : instance of the `Horizon` class
        Seismic horizon.
    frequencies : int or sequence of two integers
        Grid frequencies by (iline, xline).
        If int, then grid frequencies are equals for ilines and xlines.
    width : int or sequence of two integers
        Grid width by (iline, xline).
        If int, then grid widths are equals for ilines and xlines.
    """
    if isinstance(frequencies, int):
        frequencies = (frequencies, frequencies)

    i_min = kwargs.get('i_min', horizon.i_min)
    x_min = kwargs.get('x_min', horizon.x_min)

    # Calculate amounts of grid *_lines
    amount_of_ilines = (horizon.i_length - i_min) // frequencies[0]
    amount_of_xlines = (horizon.x_length - x_min) // frequencies[1]

    # On borders we have a line with half of grid
    if (horizon.i_length - i_min) % frequencies[0] > 0:
        amount_of_ilines += 0.5
    else:
        amount_of_ilines -= 0.5

    if (horizon.x_length - x_min) % frequencies[1] > 0:
        amount_of_xlines += 0.5
    else:
        amount_of_xlines -= 0.5

    # Count amount of traces for grid with width=1
    intersection_traces = amount_of_ilines * amount_of_xlines
    unit_grid_traces = amount_of_ilines * horizon.x_length + amount_of_xlines * horizon.i_length

    # Count amount of traces for grid with preset width
    grid_traces = width * unit_grid_traces
    intersection_traces = width * width *intersection_traces
    grid_traces -= intersection_traces

    # Calculate approximate amount of traces in grid in horizon area
    coverage = grid_traces / horizon.size

    return coverage

## Carcass

In [None]:
%%time
carcass = filtered_horizon.make_carcass(margin=0, frequencies=frequency,
                                        regular=True, apply_smoothing=True)

carcass.show(show=SHOW_FIGURES, load_kwargs={"enlarge": True})

approximate_grid_coverage = calculate_grid_coverage(
    horizon=filtered_horizon, frequencies=frequency,
    width=3, i_min=0, x_min=0
)

assert np.isclose(carcass.coverage, approximate_grid_coverage, atol=2e-3), "carcass test failed"

## thin_out

In [None]:
%%time
thined_horizon = Horizon(filtered_horizon.full_matrix, field=filtered_horizon.field, name="thined")
thined_horizon.thin_out(factor=(frequency, frequency), threshold=np.min(filtered_horizon.shape)//10)

thined_horizon.show(show=SHOW_FIGURES, load_kwargs={"enlarge": True})

approximate_grid_coverage = calculate_grid_coverage(
    horizon=filtered_horizon, frequencies=frequency, width=1,
    i_min=filtered_horizon.i_min, x_min=filtered_horizon.x_min
)

assert np.isclose(
    thined_horizon.coverage, approximate_grid_coverage, atol=2e-3
), "thin_out test failed"

# Interpolate

In [None]:
%%time
interpolated_horizon = Horizon(thined_horizon.full_matrix,
                               field=filtered_horizon.field, name="interpolated")
interpolated_horizon.interpolate()

interpolated_horizon.show(show=SHOW_FIGURES, load_kwargs={"enlarge": True})

approximate_grid_coverage = calculate_grid_coverage(
    horizon=filtered_horizon, frequencies=frequency, width=3,
    i_min=filtered_horizon.i_min, x_min=filtered_horizon.x_min
)

assert np.isclose(
    interpolated_horizon.coverage, approximate_grid_coverage, atol=2e-3
), "interpolate test failed"

## Holes

In [None]:
%%time
filtering_matrix = filtered_horizon.make_random_holes_matrix(seed=13)
filtering_matrix = filtered_horizon.matrix_put_on_full(filtering_matrix)

horizon_with_holes = Horizon(filtered_horizon.full_matrix,
                             field=filtered_horizon.field,
                             name="with_holes")
horizon_with_holes.filter(filtering_matrix)

if SHOW_FIGURES:
    plot_image(filtering_matrix, cmap="viridis", scale=SCALE, title="Holes matrix")
    horizon_with_holes.show(scale=SCALE)

assert (
    horizon_with_holes.coverage < filtered_horizon.coverage
), "make_random_holes_matrix test failed"