In [None]:
import numpy as np
import plotly.graph_objects as go
from plotly.colors import sample_colorscale
from plottingtools_emilbowry.Plots import Plots


class LikelihoodPlots(Plots):

	@staticmethod
	def likelihoods(func):
		func.plot = 4
		return func

	@staticmethod
	def mixture_evolution(func):
		func.plot = 5
		return func

	@staticmethod
	def resonator(func):
		func.plot = 6
		return func

	@staticmethod
	def _createPulseAnnotations(observed_signal, marker_style=None, line_style=None):
		if marker_style is None:
			marker_style = dict(color="red", size=8)
		if line_style is None:
			line_style = dict(color="red", width=2)

		pulse_indices = np.where(observed_signal == 1)[0]

		marker_trace = go.Scatter(
			x=pulse_indices,
			y=np.ones_like(pulse_indices),
			mode="markers",
			marker=marker_style,
			showlegend=False,
		)

		shapes = []
		for x_val in pulse_indices:
			shapes.append(dict(type="line", x0=x_val, y0=0, x1=x_val, y1=1, line=line_style))

		return [marker_trace], shapes

	@staticmethod
	def graphLikelihoods(data, **kwargs):
		history, observed_signal = data[0][0]
		time_axis = data[-1]

		fig_parameters = dict(
			shared_xaxes=True, vertical_spacing=0.05, row_heights=[0.5, 0.5]
		)

		if not history:
			return {"traces": [[None], [None]], "layout": {}, "fig_parameters": fig_parameters}

		pulse_traces, pulse_shapes = LikelihoodPlots._createPulseAnnotations(
			observed_signal,
			marker_style=dict(color="black", size=6, symbol="circle-open"),
			line_style=dict(color="rgba(0,0,0,0.3)"),
		)

		traces_row1 = []
		final_likelihood = history[-1]
		traces_row1.append(
			go.Scatter(
				x=time_axis,
				y=final_likelihood,
				mode="lines",
				line=dict(color="blue", width=2),
				showlegend=False,
			)
		)
		traces_row1.extend(pulse_traces)

		traces_row2 = []
		inferred_cutoffs = np.where(observed_signal == 1)[0][1:]
		colors = sample_colorscale("Turbo", np.linspace(0, 1, len(history)))
		for i, state in enumerate(history):
			cutoff = inferred_cutoffs[i]
			future_mask = time_axis >= cutoff
			traces_row2.append(
				go.Scatter(
					x=time_axis[future_mask],
					y=state[future_mask],
					mode="lines",
					line=dict(color=colors[i], width=1.5),
					showlegend=False,
				)
			)

		layout = dict(
			height=700,
			showlegend=False,
			shapes=pulse_shapes,
			yaxis=dict(title_text="Likelihood", range=[0, 1.1 * np.max(final_likelihood)]),
			yaxis2=dict(title_text="Likelihood"),
			xaxis2=dict(title_text="Time (t)"),
		)

		return {
			"traces": [[traces_row1], [traces_row2]],
			"layout": layout,
			"fig_parameters": fig_parameters,
		}

	@staticmethod
	def graphMixtureEvolution(data, **kwargs):
		base_params = LikelihoodPlots.graphLikelihoods(data, **kwargs)

		periods, final_probs = data[0][1]
		trace_row3 = go.Bar(x=periods, y=final_probs, marker_color="green", showlegend=False)

		base_params["traces"].append([trace_row3])
		base_params["fig_parameters"]["row_heights"] = [0.33, 0.33, 0.33]
		base_params["fig_parameters"]["shared_xaxes"] = False
		base_params["fig_parameters"]["subplot_titles"] = (
			"Final Likelihood Forecast",
			"Evolution of Forecasts",
			"Period Probability",
		)

		base_params["layout"]["title_text"] = "Probabilistic Mixture Model Analysis"
		base_params["layout"]["height"] = 800
		base_params["layout"]["xaxis1"] = dict(showticklabels=False)
		base_params["layout"]["xaxis2"]["matches"] = "x1"
		base_params["layout"]["xaxis3"] = dict(title_text="Period (τ)")
		base_params["layout"]["yaxis3"] = dict(title_text="Probability", range=[-0.1, 1.1])
		base_params["layout"]["yaxis"]["range"] = [-0.1, 1.1]
		base_params["layout"]["yaxis2"]["range"] = [-0.1, 1.1]

		return base_params

	@staticmethod
	def graphResonator(data, **kwargs):
		likelihood, observed_signal = data[0][0]
		time_axis = np.arange(len(likelihood))

		pulse_traces, pulse_shapes = LikelihoodPlots._createPulseAnnotations(
			observed_signal,
			marker_style=dict(color="red", size=8),
			line_style=dict(color="red", width=2),
		)

		likelihood_trace = go.Scatter(
			x=time_axis,
			y=likelihood,
			mode="lines",
			line=dict(color="blue", width=2),
			showlegend=False,
		)

		# Correctly construct a flat list of traces
		all_traces = [likelihood_trace]
		all_traces.extend(pulse_traces)

		layout = dict(
			title="Harmonic Resonator Model",
			xaxis_title="Time (t)",
			yaxis_title="Normalized Likelihood",
			yaxis_range=[-0.05, 1.1],
			shapes=pulse_shapes,
		)

		# Ensure the structure forces subplot logic as we discussed previously
		return {"traces": [[all_traces], [None]], "layout": layout}

	_PLOT_MAPPING = {
		4: graphLikelihoods,
		5: graphMixtureEvolution,
		6: graphResonator,
	}
	PLOT_MAPPING = {**Plots.PLOT_MAPPING, **_PLOT_MAPPING}

In [None]:
from abc import ABC, abstractmethod


class model:
	# def generateSyntheticData(regimes, miss_prob=0.0, false_alarm_prob=0.0):
	# 	true_signal = []
	# 	for num_events, period in regimes:
	# 		for _ in range(num_events):
	# 			true_signal.extend([0] * (period - 1))
	# 			true_signal.append(1)
	# 	true_signal = np.array(true_signal, dtype=int)
	# 	if miss_prob == 0 and false_alarm_prob == 0:
	# 		return true_signal, true_signal
	# 	observed_signal = true_signal.copy()
	# 	true_ones_indices = np.where(true_signal == 1)[0]
	# 	miss_mask = np.random.rand(len(true_ones_indices)) < miss_prob
	# 	indices_to_flip_to_zero = true_ones_indices[miss_mask]
	# 	observed_signal[indices_to_flip_to_zero] = 0
	# 	true_zeros_indices = np.where(true_signal == 0)[0]
	# 	false_alarm_mask = np.random.rand(len(true_zeros_indices)) < false_alarm_prob
	# 	indices_to_flip_to_one = true_zeros_indices[false_alarm_mask]
	# 	observed_signal[indices_to_flip_to_one] = 1
	# 	return observed_signal
	@staticmethod
	def generateSyntheticData(
		regimes, miss_prob=0.0, false_alarm_prob=0.0, jitter_std=0.1
	):
		true_signal = []
		for num_events, period in regimes:
			current_time = 0
			for _ in range(num_events):
				# Add jitter to the period (clipped to avoid <= 0)
				noisy_period = max(1, int(period + np.random.normal(0, jitter_std * period)))
				true_signal.extend([0] * (noisy_period - 1))
				true_signal.append(1)
				current_time += noisy_period
		true_signal = np.array(true_signal, dtype=int)

		# Apply false positives/negatives (unchanged)
		observed_signal = true_signal.copy()
		true_ones_indices = np.where(true_signal == 1)[0]
		miss_mask = np.random.rand(len(true_ones_indices)) < miss_prob
		observed_signal[true_ones_indices[miss_mask]] = 0

		true_zeros_indices = np.where(true_signal == 0)[0]
		false_alarm_mask = np.random.rand(len(true_zeros_indices)) < false_alarm_prob
		observed_signal[true_zeros_indices[false_alarm_mask]] = 1

		return observed_signal	# , true_signal  # Return both noisy and ground truth

	@abstractmethod
	def computeLikelihood(*args, **kwargs):
		pass

	def __init__(self, *args, observations=None, generator=None, **kwargs):
		if observations is not None:
			self.observed_signal = observations
			self.__call__(*args, **kwargs)	# short circuit generation

		elif generator is None:

			self.observed_signal = model.generateSyntheticData(*args, **kwargs)
		else:
			self.observed_signal = generator(*args, **kwargs)

	def __call__(self, **kwargs):
		_data, t_axis = self.computeLikelihood(self.observed_signal, **kwargs)
		idx = self.computeLikelihood.plot

		_data[0] = [_data[0], self.observed_signal]
		data = [_data, t_axis]
		params = LikelihoodPlots.PLOT_MAPPING[idx](data)
		fig = LikelihoodPlots.createGraph(params, display_graph=True)

In [None]:
# --- Constants for the Model ---


class waveformSuperposition(model):

	FUTURE_HORIZON = 500
	RECENCY_DECAY = 0.85	# Decay factor for the entire likelihood signal
	WAVEFORM_WIDTH_FACTOR = 0.1	# Width of gaussian as a fraction of the period
	WAVEFORM_AMPLITUDE = 1.0	# Amplitude for each new waveform

	@staticmethod
	def generateWaveform(time_axis, center, width, amplitude):
		return amplitude * np.exp(-0.5 * ((time_axis - center) / width) ** 2)

	@staticmethod
	@LikelihoodPlots.likelihoods
	def computeLikelihood(
		observed_signal, cached_time=[0], **kwargs
	):	# hotfix for sensible plot
		total_length = len(observed_signal) + waveformSuperposition.FUTURE_HORIZON
		likelihood = np.zeros(total_length)
		time_axis = np.arange(total_length)

		pulse_times = np.where(observed_signal == 1)[0]

		likelihood_history = []

		if len(pulse_times) < 2:
			return likelihood_history, time_axis

		for i in range(1, len(pulse_times)):
			t_current = pulse_times[i]
			t_previous = pulse_times[i - 1]

			period = t_current - t_previous
			if period == 0:
				continue
			if period != cached_time[0] and cached_time[0] != 0:
				if period % cached_time[0] == 0:
					period = cached_time[0]
			cached_time[0] = period

			next_expected_time = t_current + period
			width = max(1, period * waveformSuperposition.WAVEFORM_WIDTH_FACTOR)

			new_waveform = waveformSuperposition.generateWaveform(
				time_axis, next_expected_time, width, waveformSuperposition.WAVEFORM_AMPLITUDE
			)

			likelihood = (waveformSuperposition.RECENCY_DECAY * likelihood) + new_waveform
			likelihood_history.append(likelihood.copy())

		return [likelihood_history], time_axis


class mixtureModel(model):
	FUTURE_HORIZON = 500
	MIN_PERIOD = 20
	MAX_PERIOD = 100
	PERIOD_UNCERTAINTY_FACTOR = 0.05

	@staticmethod
	def generateWaveform(time_axis, center, width, amplitude):
		return amplitude * np.exp(-0.5 * ((time_axis - center) / width) ** 2)

	@staticmethod
	def generateLikelihoodFromProbs(probabilities, periods, last_pulse_time, time_axis):
		likelihood = np.zeros_like(time_axis, dtype=float)
		for i, period in enumerate(periods):
			probability = probabilities[i]
			if probability < 1e-6:
				continue
			center = last_pulse_time + period
			width = max(1, period * mixtureModel.PERIOD_UNCERTAINTY_FACTOR * 5)
			waveform = mixtureModel.generateWaveform(time_axis, center, width, probability)
			likelihood += waveform
		return likelihood

	@staticmethod
	@LikelihoodPlots.mixture_evolution
	def computeLikelihood(observed_signal, **kwargs):
		pulse_times = np.where(observed_signal == 1)[0]
		total_length = len(observed_signal) + mixtureModel.FUTURE_HORIZON
		time_axis = np.arange(total_length)

		candidate_periods = np.arange(mixtureModel.MIN_PERIOD, mixtureModel.MAX_PERIOD + 1)
		period_probabilities = np.ones_like(candidate_periods, dtype=float)
		period_probabilities /= period_probabilities.sum()

		likelihood_history = []

		if len(pulse_times) < 2:
			return likelihood_history, time_axis, candidate_periods, period_probabilities

		for i in range(1, len(pulse_times)):
			delta_t = pulse_times[i] - pulse_times[i - 1]

			update_variance = (candidate_periods * mixtureModel.PERIOD_UNCERTAINTY_FACTOR) ** 2
			update_likelihoods = np.exp(
				-0.5 * ((delta_t - candidate_periods) ** 2 / update_variance)
			)
			period_probabilities *= update_likelihoods

			if period_probabilities.sum() > 0:
				period_probabilities /= period_probabilities.sum()
			else:
				period_probabilities.fill(1.0 / len(period_probabilities))

			current_likelihood = mixtureModel.generateLikelihoodFromProbs(
				period_probabilities, candidate_periods, pulse_times[i], time_axis
			)

			likelihood_history.append(current_likelihood)

		return [likelihood_history, [candidate_periods, period_probabilities]], time_axis


class resonatorModel(model):

	FUTURE_HORIZON = 500
	DAMPING_FACTOR = 0.85	# Controls decay rate
	DEFAULT_PERIOD = 50

	def getResonatorCoefficients(period, damping):
		omega = 2.0 * np.pi / period
		c1 = 2.0 * damping * np.cos(omega)
		c2 = -(damping**2)
		return c1, c2

	@staticmethod
	@LikelihoodPlots.resonator
	def computeLikelihood(observed_signal, **kwargs):
		total_length = len(observed_signal) + resonatorModel.FUTURE_HORIZON
		raw_oscillation = np.zeros(total_length + 1)

		input_signal = np.pad(observed_signal, (0, resonatorModel.FUTURE_HORIZON), "constant")
		pulse_times = np.where(observed_signal == 1)[0]

		if len(pulse_times) < 1:
			return raw_oscillation

		period = float(resonatorModel.DEFAULT_PERIOD)
		damping_factor = (resonatorModel.DAMPING_FACTOR) ** (1 / period)
		c1, c2 = resonatorModel.getResonatorCoefficients(period, damping_factor)
		last_pulse_time = 0

		for t in range(1, total_length):
			if input_signal[t] > 0:
				current_pulse_time = t
				if last_pulse_time > 0:
					new_period = current_pulse_time - last_pulse_time
					if new_period > 1:
						old_period = period
						period = new_period
						damping_factor = damping_factor ** (old_period / new_period)
						c1, c2 = resonatorModel.getResonatorCoefficients(period, damping_factor)
				last_pulse_time = current_pulse_time
				raw_oscillation[t] = 1.0
				omega = 2.0 * np.pi / period
				raw_oscillation[t - 1] = (damping_factor) ** 2 * np.cos(omega)

			raw_oscillation[t + 1] = c1 * raw_oscillation[t] + c2 * raw_oscillation[t - 1]
		likelihood = np.maximum(0, raw_oscillation)

		return [likelihood], None	# None as convention since we didnt define a time axis


if __name__ == "__main__":
	signal_regimes = [(20, 40), (20, 75)]
	obs = model.generateSyntheticData(
		regimes=signal_regimes, miss_prob=0.1, false_alarm_prob=0.0
	)
	# waveFormSuperPosition(regimes=signal_regimes, miss_prob=0.1, false_alarm_prob=0
	waveformSuperposition(observations=obs)

	mixtureModel(observations=obs)

	resonatorModel(observations=obs)

In [47]:
class resonatorModel(model):

	FUTURE_HORIZON = 500
	DAMPING_FACTOR = 0.85	# Controls decay rate
	DEFAULT_PERIOD = 50

	def getResonatorCoefficients(period, damping):
		omega = 2.0 * np.pi / period
		c1 = 2.0 * damping * np.cos(omega)
		c2 = -(damping**2)
		return c1, c2

	@staticmethod
	def computeLikelihood(observed_signal, **kwargs):
		total_length = len(observed_signal) + resonatorModel.FUTURE_HORIZON
		raw_oscillation = np.zeros(total_length + 1)

		input_signal = np.pad(observed_signal, (0, resonatorModel.FUTURE_HORIZON), "constant")
		pulse_times = np.where(observed_signal == 1)[0]

		if len(pulse_times) < 1:
			return raw_oscillation

		period = float(resonatorModel.DEFAULT_PERIOD)
		damping_factor = (resonatorModel.DAMPING_FACTOR) ** (1 / period)
		c1, c2 = getResonatorCoefficients(period, damping_factor)
		last_pulse_time = 0

		for t in range(1, total_length):
			if input_signal[t] > 0:
				current_pulse_time = t
				if last_pulse_time > 0:
					new_period = current_pulse_time - last_pulse_time
					if new_period > 1:
						old_period = period
						period = new_period
						damping_factor = damping_factor ** (old_period / new_period)
						c1, c2 = getResonatorCoefficients(period, damping_factor)
				last_pulse_time = current_pulse_time
				raw_oscillation[t] = 1.0
				omega = 2.0 * np.pi / period
				raw_oscillation[t - 1] = (damping_factor) ** 2 * np.cos(omega)

			raw_oscillation[t + 1] = c1 * raw_oscillation[t] + c2 * raw_oscillation[t - 1]
		likelihood = np.maximum(0, raw_oscillation)

		return [likelihood], None

	@staticmethod
	def _graph(data, **kwargs):
		params = LikelihoodPlots.graphResonator(data)
		return params


if __name__ == "__main__":
	signal_regimes = [(10, 50), (10, 80)]
	resonatorModel(regimes=signal_regimes, miss_prob=0.1, false_alarm_prob=0.0)

In [48]:
import numpy as np


def generateSyntheticDataWithBAC(regimes, miss_prob=0.05, false_alarm_prob=0.01):
	"""'regimes' is a list of tuples: [(num_events, period), ...]"""

	true_signal = []
	for num_events, period in regimes:
		for _ in range(num_events):
			true_signal.extend([0] * (period - 1))
			true_signal.append(1)
	true_signal = np.array(true_signal, dtype=int)

	observed_signal = true_signal.copy()

	true_ones_indices = np.where(true_signal == 1)[0]
	miss_mask = np.random.rand(len(true_ones_indices)) < miss_prob
	indices_to_flip_to_zero = true_ones_indices[miss_mask]
	observed_signal[indices_to_flip_to_zero] = 0

	true_zeros_indices = np.where(true_signal == 0)[0]
	false_alarm_mask = np.random.rand(len(true_zeros_indices)) < false_alarm_prob
	indices_to_flip_to_one = true_zeros_indices[false_alarm_mask]
	observed_signal[indices_to_flip_to_one] = 1

	return observed_signal, true_signal


if __name__ == "__main__":
	SIGNAL_REGIMES = [
		(20, 30),
		(15, 70),
		(25, 45),
	]

	observations, true_signal = generateSyntheticDataWithBAC(
		SIGNAL_REGIMES, miss_prob=0.05, false_alarm_prob=0.01
	)

	num_true_events = np.sum(true_signal)
	num_observed_events = np.sum(observations)

In [49]:
import numpy as np
import plotly.graph_objects as go

# --- Constants for the Model ---
FUTURE_HORIZON = 500
DAMPING_FACTOR = 0.995	# How slowly the resonance decays (closer to 1 is slower)
DEFAULT_PERIOD = 50	# Initial guess for the period


def generateSyntheticData(regimes, miss_prob=0.0, false_alarm_prob=0.0):
	true_signal = []
	for num_events, period in regimes:
		for _ in range(num_events):
			true_signal.extend([0] * (period - 1))
			true_signal.append(1)
	true_signal = np.array(true_signal, dtype=int)
	if miss_prob == 0 and false_alarm_prob == 0:
		return true_signal, true_signal
	observed_signal = true_signal.copy()
	true_ones_indices = np.where(true_signal == 1)[0]
	miss_mask = np.random.rand(len(true_ones_indices)) < miss_prob
	indices_to_flip_to_zero = true_ones_indices[miss_mask]
	observed_signal[indices_to_flip_to_zero] = 0
	true_zeros_indices = np.where(true_signal == 0)[0]
	false_alarm_mask = np.random.rand(len(true_zeros_indices)) < false_alarm_prob
	indices_to_flip_to_one = true_zeros_indices[false_alarm_mask]
	observed_signal[indices_to_flip_to_one] = 1
	return observed_signal, true_signal


def getResonatorCoefficients(period, damping):
	# Calculates IIR filter coefficients for a resonator
	omega = 2.0 * np.pi / period
	c1 = 2.0 * damping * np.cos(omega)
	c2 = -(damping**2)
	return c1, c2


def computeLikelihoodResonator(observed_signal):
	total_length = len(observed_signal) + FUTURE_HORIZON
	likelihood = np.zeros(total_length)

	# Pad observed signal to act as input for the full duration
	input_signal = np.pad(observed_signal, (0, FUTURE_HORIZON), "constant")
	pulse_times = np.where(observed_signal == 1)[0]

	if len(pulse_times) < 1:
		return likelihood

	c1, c2 = getResonatorCoefficients(DEFAULT_PERIOD, DAMPING_FACTOR)
	last_pulse_time = 0

	# The filter calculation loop starts from t=2
	for t in range(2, total_length):
		if input_signal[t - 1] > 0:	# Check if a pulse just occurred
			current_pulse_time = t - 1
			if last_pulse_time > 0:
				period = current_pulse_time - last_pulse_time
				if period > 1:
					c1, c2 = getResonatorCoefficients(period, DAMPING_FACTOR)
			last_pulse_time = current_pulse_time

		likelihood[t] = c1 * likelihood[t - 1] + c2 * likelihood[t - 2] + input_signal[t]

	return likelihood


def plotResults(observed_signal, likelihood):
	time_axis = np.arange(len(likelihood))

	fig = go.Figure()

	# Plot observed signal as stem plot
	marker_line_stems = dict(
		x=np.where(observed_signal == 1)[0],
		y=observed_signal[observed_signal == 1],
		mode="markers",
		name="Observed Pulses",
		marker=dict(color="red", size=8),
		showlegend=True,
	)
	for i in range(len(marker_line_stems["x"])):
		fig.add_shape(
			type="line",
			x0=marker_line_stems["x"][i],
			y0=0,
			x1=marker_line_stems["x"][i],
			y1=marker_line_stems["y"][i],
			line=dict(color="red", width=2),
		)
	fig.add_trace(go.Scatter(**marker_line_stems))

	# Plot likelihood function
	fig.add_trace(
		go.Scatter(
			x=time_axis,
			y=likelihood,
			mode="lines",
			name="Likelihood L(t)",
			line=dict(color="blue", width=2),
		)
	)

	fig.update_layout(
		title="Harmonic Resonator Model",
		xaxis_title="Time (t)",
		yaxis_title="Value / Likelihood",
		legend_title="Signal",
	)
	fig.show()


if __name__ == "__main__":
	# Regimes: (number of events, period)
	signal_regimes = [(20, 50), (15, 80)]

	observed_signal, _ = generateSyntheticData(
		regimes=signal_regimes, miss_prob=0.2, false_alarm_prob=0.0
	)

	likelihood_function = computeLikelihoodResonator(observed_signal)

	plotResults(observed_signal, likelihood_function)