Commit

swm updated
ryanhammonds committed Jun 3, 2021
1 parent f8b899d commit 0f3f6f2
Showing 2 changed files with 101 additions and 132 deletions.
225 changes: 97 additions & 128 deletions neurodsp/rhythm/swm.py
@@ -8,8 +8,8 @@
###################################################################################################

@multidim()
def sliding_window_matching(sig, fs, win_len, win_spacing, max_iterations=500,
temperature=1, window_starts_custom=None):
def sliding_window_matching(sig, fs, win_len, win_spacing, max_iterations=100,
window_starts_custom=None, var_thresh=None):
"""Find recurring patterns in a time series using the sliding window matching algorithm.
Parameters
@@ -22,21 +22,20 @@ def sliding_window_matching(sig, fs, win_len, win_spacing, max_iterations=500,
Window length, in seconds. This is L in the original paper.
win_spacing : float
Minimum spacing between windows, in seconds. This is G in the original paper.
max_iterations : int, optional, default: 500
max_iterations : int, optional, default: 100
Maximum number of iterations of potential changes in window placement.
temperature : float, optional, default: 1
Controls the probability of accepting a new window.
window_starts_custom : 1d array, optional
window_starts_custom : 1d array, optional, default: None
Custom pre-set locations of initial windows.
var_thresh : float, optional, default: None
Removes initial windows whose variance falls below this threshold. This speeds up
runtime in proportion to the number of low-variance windows in the data.
Returns
-------
avg_window : 1d array
The average pattern discovered in the input signal.
windows : 2d array
Putative patterns discovered in the input signal.
window_starts : 1d array
Indices at which each window begins for the final set of windows.
costs : 1d array
Cost function value at each iteration.
References
----------
@@ -53,173 +52,143 @@ def sliding_window_matching(sig, fs, win_len, win_spacing, max_iterations=500,
- The `win_spacing` parameter also determines the number of windows that are used.
- If looking at high frequency activity, you may want to apply a highpass filter,
so that the algorithm does not converge on a low frequency motif.
- This implementation is a minimal version, as compared to the original implementation
in [2], which has more available options.
- This version may also be much slower than the original, as this implementation does not
currently include the Markov Chain Monte Carlo sampling to calculate distances.
- This implementation is a minimal, modified version, as compared to the original
implementation in [2], which has more available options.
- This version has the following changes to speed up convergence:
1. Each iteration works like an epoch: all windows are moved once, in
random order. The original implementation randomly selects windows and
does not guarantee even resampling.
2. New window acceptance is determined via increased correlation coefficients
and reduced variance across windows.
3. Phase optimization / realignment to escape local minima.
Examples
--------
Search for recurring patterns using sliding window matching in a simulated beta signal:
>>> from neurodsp.sim import sim_combined
>>> sig = sim_combined(n_seconds=10, fs=500,
... components={'sim_powerlaw': {'f_range': (2, None)},
... 'sim_bursty_oscillation': {'freq': 20,
... 'enter_burst': .25,
... 'leave_burst': .25}})
>>> avg_window, window_starts, costs = sliding_window_matching(sig, fs=500, win_len=0.05,
... win_spacing=0.20)
>>> components = {'sim_bursty_oscillation': {'freq': 20, 'phase': 'min'},
... 'sim_powerlaw': {'f_range': (2, None)}}
>>> sig = sim_combined(10, fs=500, components=components, component_variances=(1, .05))
>>> windows, starts = sliding_window_matching(sig, fs=500, win_len=0.05, win_spacing=0.05,
... max_iterations=100, var_thresh=.5)
"""

# Compute window length and spacing in samples
win_n_samps = int(win_len * fs)
spacing_n_samps = int(win_spacing * fs)
win_len = int(win_len * fs)
win_spacing = int(win_spacing * fs)

# Initialize window positions
if window_starts_custom is None:
window_starts = np.arange(0, len(sig) - win_n_samps, 2 * spacing_n_samps)
window_starts = np.arange(0, len(sig) - win_len, win_spacing).astype(int)
else:
window_starts = window_starts_custom
n_windows = len(window_starts)

windows = np.array([sig[start:start+win_len] for start in window_starts])

# Calculate initial cost
costs = np.zeros(max_iterations)
costs[0] = _compute_cost(sig, window_starts, win_n_samps)
# New window bounds
lower_bounds, upper_bounds = _compute_bounds(window_starts, win_spacing, 0, len(sig) - win_len)

for iter_num in range(1, max_iterations):
# Remove low variance windows to speed up runtime
if var_thresh is not None:

# Find a new, random window start
window_starts_temp = _find_new_window_idx(np.copy(window_starts), len(sig) - win_n_samps,
spacing_n_samps)
thresh = np.array([np.var(sig[loc:loc+win_len]) > var_thresh for loc in window_starts])

# Calculate the cost & the change in the cost function
cost_temp = _compute_cost(sig, window_starts_temp, win_n_samps)
delta_cost = cost_temp - costs[iter_num - 1]
windows = windows[thresh]
window_starts = window_starts[thresh]
lower_bounds = lower_bounds[thresh]
upper_bounds = upper_bounds[thresh]

# Calculate the acceptance probability
p_accept = np.exp(-delta_cost / float(temperature))
# Modified SWM procedure
window_idxs = np.arange(len(windows)).astype(int)

# Accept update, based on the calculated acceptance probability
if np.random.rand() < p_accept:
corrs, variance = _compute_cost(sig, window_starts, win_len)
mae = np.mean(np.abs(windows - windows.mean(axis=0)))

# Update costs & windows
costs[iter_num] = cost_temp
window_starts = window_starts_temp
for _ in range(max_iterations):

else:
# Randomly shuffle order of windows
np.random.shuffle(window_idxs)

# Update costs
costs[iter_num] = costs[iter_num - 1]
for win_idx in window_idxs:

# Calculate average window
avg_window = np.zeros(win_n_samps)
for w_ind in range(n_windows):
avg_window = avg_window + sig[window_starts[w_ind]:window_starts[w_ind] + win_n_samps]
avg_window = avg_window / float(n_windows)
# Find a new, random window start
_window_starts = window_starts.copy()
_window_starts[win_idx] = np.random.choice(np.arange(lower_bounds[win_idx],
upper_bounds[win_idx]+1))

return avg_window, window_starts, costs
# Accept new window if correlation increases and variance decreases
_corrs, _variance = _compute_cost(sig, _window_starts, win_len)

if _corrs[win_idx].sum() > corrs[win_idx].sum() and _variance < variance:

def _compute_cost(sig, window_starts, win_n_samps):
"""Compute the cost, which is proportional to the difference between pairs of windows.
corrs = _corrs.copy()
variance = _variance
window_starts = _window_starts.copy()
lower_bounds, upper_bounds = _compute_bounds(window_starts, win_spacing, 0, len(sig) - win_len)

Parameters
----------
sig : 1d array
Time series.
window_starts : list of int
The list of window start definitions.
win_n_samps : int
The length of each window, in samples.
# Phase optimization
_window_starts = window_starts.copy()

Returns
-------
cost: float
The calculated cost of the current set of windows.
"""
for shift in np.arange(-int(win_len/2), int(win_len/2)):

# Get all windows and z-score them
n_windows = len(window_starts)
windows = np.zeros((n_windows, win_n_samps))
for ind, window in enumerate(window_starts):
temp = sig[window:window_starts[ind] + win_n_samps]
windows[ind] = (temp - np.mean(temp)) / np.std(temp)
_starts = _window_starts + shift

# Calculate distances for all pairs of windows
dists = []
for ind1 in range(n_windows):
for ind2 in range(ind1 + 1, n_windows):
window_diff = windows[ind1] - windows[ind2]
dist_temp = np.sum(window_diff**2) / float(win_n_samps)
dists.append(dist_temp)
# Skip window shifts that are out of bounds
if (_starts[0] < 0) or (_starts[-1] > len(sig) - win_len):
continue

# Calculate cost function, which is the average difference, roughly
cost = np.sum(dists) / float(2 * (n_windows - 1))
_windows = np.array([sig[start:start+win_len] for start in _starts])

return cost
_mae = np.mean(np.abs(_windows - _windows.mean(axis=0)))

if _mae < mae:
window_starts = _starts.copy()
windows = _windows.copy()
mae = _mae

def _find_new_window_idx(windows, max_start, spacing_n_samps, tries_limit=1000):
"""Find a new sample for the starting window.
lower_bounds, upper_bounds = _compute_bounds(window_starts, win_spacing, 0, len(sig) - win_len)

return windows, window_starts
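
Since this commit drops the `avg_window` return value, the average motif can be recovered by averaging the returned windows. A minimal usage sketch, mirroring the docstring example above (the import path is assumed from the package layout):

import numpy as np
from neurodsp.sim import sim_combined
from neurodsp.rhythm import sliding_window_matching

# Simulate a bursty beta oscillation on top of aperiodic activity
components = {'sim_bursty_oscillation': {'freq': 20, 'phase': 'min'},
              'sim_powerlaw': {'f_range': (2, None)}}
sig = sim_combined(10, fs=500, components=components, component_variances=(1, .05))

# Run the modified SWM, then average windows to get the recurring pattern
windows, starts = sliding_window_matching(sig, fs=500, win_len=0.05, win_spacing=0.05,
                                          max_iterations=100, var_thresh=.5)
avg_window = windows.mean(axis=0)  # stands in for the removed `avg_window` return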


def _compute_cost(sig, window_starts, win_n_samps):
"""Compute the cost, as corrleation coefficients and variance across windows.
Parameters
----------
windows : 1d array
Indices at which each window begins.
max_start : int
The largest possible start index for the window.
spacing_n_samps : int
The minimum spacing required between windows, in samples.
tries_limit : int, optional, default: 1000
The maximum number of iterations to attempt to find a new window.
sig : 1d array
Time series.
window_starts : list of int
The list of window start definitions.
win_n_samps : int
The length of each window, in samples.
Returns
-------
windows : 1d array
Indices, including the updated window, at which each window begins.
corrs : 2d array
Window correlation matrix.
variance : float
Sum of the variance across windows.
"""

new_win = None

for _ in range(tries_limit):

# Randomly select current window
current_idx = np.random.choice(range(len(windows)))
current_win = windows[current_idx]

# Find adjacent window starts
if current_idx != 0:
lower_bound = windows[current_idx - 1] + spacing_n_samps
else:
lower_bound = 0

if current_idx != len(windows) - 1:
upper_bound = windows[current_idx + 1] - spacing_n_samps
else:
upper_bound = max_start
windows = np.array([sig[start:start+win_n_samps] for start in window_starts])

# Create allowed random positions/indices
new_wins = np.arange(lower_bound, upper_bound + 1)
new_wins = new_wins[np.where((new_wins >= 0) & (new_wins <= max_start))[0]]
corrs = np.corrcoef(windows)

# Drop the current window index so it can't be randomly sampled
new_wins = np.delete(new_wins, np.where(new_wins == current_win)[0][0])
variance = windows.var(axis=1).sum()

if len(new_wins) == 0:
# All new samples are too close to adjacent window starts, try another random position
continue
return corrs, variance
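
As a toy illustration of the new cost terms (a sketch, not part of the commit): two similar windows and one noisy window produce a correlation matrix with a single large off-diagonal entry.

import numpy as np

rng = np.random.RandomState(0)
base = np.sin(np.linspace(0, 2 * np.pi, 25))
windows = np.array([base,
                    base + 0.1 * rng.randn(25),
                    rng.randn(25)])

corrs = np.corrcoef(windows)          # pairwise correlations between windows
variance = windows.var(axis=1).sum()  # summed within-window variance

# corrs[0, 1] is near 1, while corrs[0, 2] and corrs[1, 2] are near 0.
# A proposed window move is accepted when the sum of its row in `corrs`
# increases and the total `variance` decreases.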

# Randomly select allowed sample position
new_win= np.random.choice(new_wins)

break
def _compute_bounds(window_starts, win_spacing, start=None, end=None):
"""Compute the allowed lower and upper bound for each window start, given spacing constraints."""

if new_win is None:
raise RuntimeError('SWM algorithm has difficulty finding a new window. \
Try increasing the spacing parameter.')
lower_bounds = window_starts[:-1] + win_spacing
lower_bounds = np.insert(lower_bounds, 0, start)

windows[current_idx] = new_win
upper_bounds = window_starts[1:] - win_spacing
upper_bounds = np.insert(upper_bounds, len(upper_bounds), end)

return windows
return lower_bounds, upper_bounds
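
For reference, a small worked example of the bounds helper, with illustrative values in samples:

import numpy as np

window_starts = np.array([0, 100, 250])
win_spacing = 25  # minimum spacing between windows, in samples

lower, upper = _compute_bounds(window_starts, win_spacing, start=0, end=400)
# lower -> [ 0,  25, 125]: each start may move down to win_spacing past its left neighbor
# upper -> [75, 225, 400]: and up to win_spacing short of its right neighbor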
8 changes: 4 additions & 4 deletions neurodsp/tests/rhythm/test_swm.py
@@ -11,12 +11,12 @@ def test_sliding_window_matching(tsig):

win_len, win_spacing = 1, 0.5

pattern, starts, costs = sliding_window_matching(tsig, FS, win_len, win_spacing)
assert pattern.shape[-1] == int(FS * win_len)
windows, starts = sliding_window_matching(tsig, FS, win_len, win_spacing, var_thresh=0.1)
assert windows.shape[-1] == int(FS * win_len)

def test_sliding_window_matching_2d(tsig2d):

win_len, win_spacing = 1, 0.5

pattern, starts, costs = sliding_window_matching(tsig2d, FS, win_len, win_spacing)
assert pattern.shape[-1] == int(FS * win_len)
windows, starts = sliding_window_matching(tsig2d, FS, win_len, win_spacing, var_thresh=0.1)
assert windows.shape[-1] == int(FS * win_len)
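
To exercise the updated tests locally, something along these lines should work, assuming a development install with pytest available:

pytest neurodsp/tests/rhythm/test_swm.py -v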
