In [None]:
import os
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import torch
from snr.calc_snr import str2snr_strategy
from dataset_creation.noisy_dataset import create_single_noisy_version
from dataset_creation.utils import get_n_random_noises
from utils.common import assert_path_exists

# Create a Noisy Dataset

## Requirements And Configuration

What is your user root directory? (For example, `/home/<username>/` on Linux machines.)

In [None]:
# Home directory of the current user; the datasets root below is built on top of it.
USER_ROOT_DIR = '/home/moshe/'
# Path check via the project helper (presumably fails when the folder is missing - confirm in utils.common).
assert_path_exists(path_str=USER_ROOT_DIR, name='USER_ROOT_DIR')
USER_ROOT_DIR

What is the root folder of your datasets?

In [None]:
# Root folder of all datasets, located under the user root directory.
DATASETS_ROOT_DIR = os.path.join(USER_ROOT_DIR, 'datasets/GFZ/')
assert_path_exists(path_str=DATASETS_ROOT_DIR, name='DATASETS_ROOT_DIR')
DATASETS_ROOT_DIR

The generated noisy traces should be a synthetic version of traces taken from one of the following datasets ('ethz', 'geofon'):

In [None]:
# Supported source datasets, and the one selected for this run.
DATASETS_ORIGINS = ['ethz', 'geofon']
dataset_origin = 'ethz'
# Guard against typos in the selection above.
assert dataset_origin in DATASETS_ORIGINS, (
    f'Expected dataset one of {DATASETS_ORIGINS}. Got {dataset_origin}.'
)

In [None]:
# Trace length in samples: 3001 if the dataset is built for PhaseNet, 6000 if it is for EQTransformer.
NUM_SAMPLES = 3001
# How many noisy versions to synthesize using a single real trace.
NUM_NOISY_VERSIONS_PER_TRACE = 1
# SNR level the synthesized noisy traces should have.
DESIRED_SNR = 7
# Sampling rate - PhaseNet and EQTransformer expect 100Hz.
SAMPLING_RATE = 100
# How many original traces to use for the noisy dataset (a slice from the start).
NUM_OF_ORIGINAL_TRACES = 2100

In [None]:
# Names of the supported ways to estimate a trace's Signal to Noise Ratio (SNR), and the one selected.
SNR_CALC_STRATEGY_STR_ALTERNATIVES = ['energy_ratio', 'max_amplitude_vs_rms_ratio']
SNR_CALC_STRATEGY_STR = 'energy_ratio'
assert SNR_CALC_STRATEGY_STR in SNR_CALC_STRATEGY_STR_ALTERNATIVES, (
    f'Expected one of {SNR_CALC_STRATEGY_STR_ALTERNATIVES}. Got {SNR_CALC_STRATEGY_STR}'
)
# Strategy used to estimate the trace SNR - ENERGY_RATIO or MAX_AMPLITUDE_VS_RMS_RATIO.
SNR_CALC_STRATEGY = str2snr_strategy(SNR_CALC_STRATEGY_STR)
SNR_CALC_STRATEGY

If the generated noise traces should support shifting experiments, they must be longer than the original traces.
Define how many 1-second shifts the dataset should enable.

In [None]:
# Number of shifts the generated noise has to accommodate.
NUM_SHIFTS = 6
# Noise window length: the trace length plus SAMPLING_RATE extra samples per shift.
AUGMENTED_WINDOW_SIZE = NUM_SAMPLES + NUM_SHIFTS * SAMPLING_RATE

Set the path of the **event** traces that will be used for synthesizing.

In [None]:
# Dataset folder; its name encodes the origin, the trace length and the SNR strategy chosen above.
DATASET_PATH = os.path.join(
    DATASETS_ROOT_DIR,
    f'noisy_datasets/{dataset_origin}_{NUM_SAMPLES}_sample_joachim_noises_{SNR_CALC_STRATEGY_STR}_snr/',
)
assert_path_exists(path_str=DATASET_PATH, name='DATASET_PATH')
DATASET_PATH

Set the path of the **noise** traces that will be used for synthesizing.

In [None]:
# Folder with the noise traces, located directly under the datasets root.
NOISES_PATH = os.path.join(DATASETS_ROOT_DIR, 'Noises')
assert_path_exists(path_str=NOISES_PATH, name='NOISES_PATH')
NOISES_PATH

In [None]:
# Files inside DATASET_PATH that hold the original traces and their labels.
dataset_traces_path = os.path.join(DATASET_PATH, 'original_dataset.pt')
dataset_labels_path = os.path.join(DATASET_PATH, 'original_labels.pt')

# Check both files before attempting to load them.
for checked_name, checked_path in (
    ('dataset_traces_path', dataset_traces_path),
    ('dataset_labels_path', dataset_labels_path),
):
    assert_path_exists(path_str=checked_path, name=checked_name)

## Load Dataset

In [None]:
# Load traces first, then labels, keeping only the first NUM_OF_ORIGINAL_TRACES entries of each.
dataset, labels = (
    torch.load(saved_path)[:NUM_OF_ORIGINAL_TRACES]
    for saved_path in (dataset_traces_path, dataset_labels_path)
)

# First dimension counts traces/labels; the last dimension of the traces is the sample axis.
num_traces, num_samples = dataset.shape[0], dataset.shape[-1]
num_labels = labels.shape[0]

# Sanity checks: one label per trace, and the trace length matches the configuration.
assert num_labels == num_traces, f'Expected traces equal num labels.Got {num_traces} traces and {num_labels} labels'
assert num_samples == NUM_SAMPLES, f'Expected {NUM_SAMPLES} in each trace. Got {num_samples}.'

print(f'Loaded {num_traces} traces and corresponding labels.')

## Create a Noisy Dataset

In [None]:
# Accumulators: one entry per noisy version, except the unused indices which are pooled into one flat list.
noised_traces_list, noised_traces_labels_list = [], []
augmented_noise_traces_list, factors_list = [], []
indices_not_used_list = []

pbar = tqdm(range(NUM_NOISY_VERSIONS_PER_TRACE))
for version_idx in pbar:
    # Request num_traces noise traces, each AUGMENTED_WINDOW_SIZE samples long (longer than the
    # event traces so that shifting experiments are possible).
    # NOTE(review): squeeze() drops every singleton dimension, so it would also drop the leading
    # dimension if num_traces == 1 - confirm this case never occurs.
    augmented_noise_traces_created: torch.Tensor = get_n_random_noises(
        num_noises=num_traces,
        desired_window_size=AUGMENTED_WINDOW_SIZE,
        noises_path=NOISES_PATH,
        force_resample=True,
        filename='aaa',
        sampling_rate=SAMPLING_RATE,
        silent_exception_prints=True,
    ).squeeze()

    # Build one noisy version of the whole dataset at the desired SNR.
    (
        version_noised_traces,
        version_labels,
        version_full_noise_traces,
        version_factors,
        version_not_included_indices,
    ) = create_single_noisy_version(
        original_traces=dataset,
        original_labels=labels,
        augmented_noise_traces=augmented_noise_traces_created,
        desired_snr=DESIRED_SNR,
        snr_strategy=SNR_CALC_STRATEGY,
    )

    noised_traces_list.append(version_noised_traces)
    # Labels and factors get a trailing dimension so they can be stacked with torch.vstack later.
    noised_traces_labels_list.append(version_labels.unsqueeze(dim=1))
    augmented_noise_traces_list.append(version_full_noise_traces)
    factors_list.append(version_factors.unsqueeze(dim=1))
    indices_not_used_list.extend(version_not_included_indices)
    pbar.set_description(f'Lists len {len(noised_traces_list), len(noised_traces_labels_list), len(augmented_noise_traces_list), len(factors_list)}')

In [None]:
# Merge the per-version results into single tensors, stacked along the first dimension.
noised_traces = torch.vstack(noised_traces_list)
augmented_noise_traces = torch.vstack(augmented_noise_traces_list)

# Labels and factors were unsqueezed before being collected; squeeze removes the singleton dimensions again.
noised_traces_labels = torch.vstack(noised_traces_labels_list).squeeze()
noising_factors = torch.vstack(factors_list).squeeze()

# De-duplicate the pooled indices of traces that were not used.
indices_not_used = torch.tensor(list(set(indices_not_used_list)))

# Display the resulting shapes as a quick sanity check.
(
    noised_traces.shape,
    noised_traces_labels.shape,
    augmented_noise_traces.shape,
    noising_factors.shape,
    indices_not_used.shape,
)

## Save Noisy Dataset To Files

In [None]:
# Output folder for the noisy dataset generated at DESIRED_SNR.
noisy_dataset_path = os.path.join(DATASET_PATH, f'noisy_dataset_snr_{DESIRED_SNR}')
assert_path_exists(path_str=noisy_dataset_path, name='noisy_dataset_path')
noisy_dataset_path

In [None]:
def _noisy_dataset_file(filename):
    """Return the path of `filename` inside the noisy dataset output folder."""
    return os.path.join(noisy_dataset_path, filename)


# Persist every artifact of the noisy dataset next to each other in the output folder.
torch.save(noised_traces, _noisy_dataset_file('traces.pt'))
torch.save(augmented_noise_traces, _noisy_dataset_file('full_noise_traces.pt'))
torch.save(noised_traces_labels, _noisy_dataset_file('labels.pt'))
torch.save(noising_factors, _noisy_dataset_file('factors.pt'))
# NOTE(review): this file name has no '.pt' extension, unlike the others - kept as is, confirm with the readers of this file.
torch.save(indices_not_used, _noisy_dataset_file('indices_not_used'))

## Plot Noising Example

In [None]:
# Pick one noisy example together with its label and noising factor.
idx = 6
trace = noised_traces[idx]
label = noised_traces_labels[idx]
factor = noising_factors[idx]
# The augmented noise is longer than the trace; crop it to the trace length.
noise = augmented_noise_traces[idx, :, :trace.shape[-1]]
# Scaled noise, computed once and reused by the first two panels.
scaled_noise = factor * noise

fig, (ax_orig, ax_noise, ax_noised) = plt.subplots(1, 3, figsize=(20, 8), sharey='all')

# Only the first channel ([0]) is plotted in every panel.
# Left: the original trace, recovered by subtracting the scaled noise from the noised trace.
ax_orig.plot((trace - scaled_noise)[0])
ax_orig.vlines(x=label, ymin=-1, ymax=1, label='Onset', linestyles='dashed')
ax_orig.set_title('Original Trace')
ax_orig.set_xlabel('Sample')
ax_orig.set_ylabel('Amplitude')
ax_orig.legend()  # the 'Onset' label is only visible once a legend is drawn

# Middle: the scaled noise that was added.
ax_noise.plot(scaled_noise[0])
ax_noise.set_title('Noise added')
ax_noise.set_xlabel('Sample')

# Right: the resulting noised trace with the same onset marker.
ax_noised.plot(trace[0])
ax_noised.set_title('Noised Trace')
ax_noised.set_xlabel('Sample')
ax_noised.vlines(x=label, ymin=-1, ymax=1, label='Onset', linestyles='dashed')
ax_noised.legend();

## A Unified Version of The Noisy Datasets

For benchmarking metrics, it is preferable to create the datasets for several SNR levels using the same noise traces. For each SNR level, a single noisy version is created.

First, prepare the noise that will be used for all SNR levels.

In [None]:
# Prepare the full noise traces once; the same noise is reused for every SNR level.
# Requests num_traces noise traces, each AUGMENTED_WINDOW_SIZE samples long.
# NOTE(review): squeeze() drops every singleton dimension, so it would also drop the leading
# dimension if num_traces == 1 - confirm this case never occurs.
augmented_noise_traces_created: torch.Tensor = get_n_random_noises(
    num_noises=num_traces,
    desired_window_size=AUGMENTED_WINDOW_SIZE,
    noises_path=NOISES_PATH,
    force_resample=True,
    filename='aaa',
    sampling_rate=SAMPLING_RATE,
    silent_exception_prints=True,
).squeeze()

In [None]:
# SNR levels to generate: 2 through 10 inclusive.
desired_snr_list = list(range(2, 11))

# Check every output folder up front, before the generation loop starts.
noisy_dataset_paths = {}
for desired_snr in desired_snr_list:
    noisy_dataset_path = os.path.join(DATASET_PATH, f'noisy_dataset_snr_{desired_snr}')
    assert_path_exists(path_str=noisy_dataset_path, name='noisy_dataset_path')
    noisy_dataset_paths[desired_snr] = noisy_dataset_path

pbar = tqdm(desired_snr_list)
for desired_snr in pbar:
    # Every SNR level is synthesized from the same pre-generated noise traces.
    (
        noised_traces,
        noised_traces_labels,
        full_noise_traces_used,
        factors,
        indices_not_used_list,
    ) = create_single_noisy_version(
        original_traces=dataset,
        original_labels=labels,
        augmented_noise_traces=augmented_noise_traces_created,
        desired_snr=desired_snr,
        snr_strategy=SNR_CALC_STRATEGY,
    )

    indices_not_used = torch.tensor(list(set(indices_not_used_list)))
    noisy_dataset_path = noisy_dataset_paths[desired_snr]
    torch.save(noised_traces, os.path.join(noisy_dataset_path, 'traces.pt'))
    # Save the noise and factors produced for THIS SNR level, not the leftovers of the
    # single-SNR cells earlier in the notebook.
    torch.save(full_noise_traces_used, os.path.join(noisy_dataset_path, 'full_noise_traces.pt'))
    torch.save(noised_traces_labels, os.path.join(noisy_dataset_path, 'labels.pt'))
    torch.save(factors, os.path.join(noisy_dataset_path, 'factors.pt'))
    torch.save(indices_not_used, os.path.join(noisy_dataset_path, 'indices_not_used'))
    pbar.set_description(f'SNR {desired_snr}: {noised_traces.shape[0]} noised traces created and saved.')
    # Drop this level's results before the next iteration.
    del noised_traces, noised_traces_labels, full_noise_traces_used, factors, indices_not_used_list, indices_not_used