In [1]:
import sympy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import display


In [2]:
def summing_sine_waves(time, frequencies, amplitudes, thetas, baseline=0):
    """Sums sine waves with different frequencies, amplitudes and phases into one wave.

    Returns:
        sinewaves (array): sum of sine waves
        fig (plotly figure): plot of the sum of sine waves
    """
    sinewaves = np.sum(
        amplitudes[:, np.newaxis] * np.sin(2 * np.pi * frequencies[:, np.newaxis] * time + thetas[:, np.newaxis]), axis=0) + baseline

    fig = go.Figure(
        go.Scatter(x=time, y=sinewaves, line=dict(color='royalblue')),
        layout=dict(
            xaxis=dict(title='Time (s)', tickmode='linear', tick0=0, dtick=10),
            yaxis=dict(title='Temperature (°C)'),
            autosize=False,
            height=300,
            width=900,
            margin=dict(l=20, r=20, t=20, b=20)))

    return sinewaves, fig


def multiple_sine_waves(time, frequencies, amplitudes, thetas, baseline=0, legend=False):
    """Plots multiple sine waves with different frequencies, amplitudes and phases."""
    fig = make_subplots(
        rows=len(frequencies),
        cols=1,
        shared_xaxes=True,
        shared_yaxes='all')

    fig.update_layout(
        height=300,
        width=900,
        margin=dict(l=20, r=20, t=20, b=20),
        showlegend=legend)

    sinewave = np.zeros(len(time))
    for i in range(len(frequencies)):
        sinewave = amplitudes[i] * np.sin(2 * np.pi *
                                          frequencies[i] * (time) + thetas[i]) + baseline
        fig.add_scatter(
            x=time,
            y=sinewave,
            row=i+1, col=1)
    fig.show()

    return fig


In [3]:

def symbolic_functions(frequencies, amplitudes, thetas, baseline=0, display_f=True, export_f=False):
    '''The same as summing_sine_waves but with sympy.

    Returns:
        f: lambdify function
        f_dot: lambdify derivative of f
    '''

    x = sympy.symbols('x')
    wavelines = sympy.S.Zero
    for i in range(len(frequencies)):
        wavelines += amplitudes[i] * \
            sympy.sin(2 * sympy.pi * frequencies[i] * x + thetas[i]) + baseline
    # nsimplify to simplify fractions
    f = sympy.nsimplify(wavelines / len(frequencies))
    f_dot = sympy.diff(f, x)

    # display functions in latex before lambdify
    if display_f:
        print("f(x) and f'(x) are:")
        display(f, f_dot)

    # export mathematical functions
    if export_f:
        with open('stimuli_f.txt', 'w') as f_file:
            f_file.write(str(f))
        with open('stimuli_f_dot.txt', 'w') as f_dot_file:
            f_dot_file.write(str(f_dot))

    # lamdify functions
    f = sympy.lambdify(x, f, 'numpy')
    f_dot = sympy.lambdify(x, f_dot, 'numpy')

    return f, f_dot


def stimuli_extra(f, f_dot, time, s_RoC=0.5):
    '''For plotly graphing of f(x), f'(x) and labels.

    Returns:
        labels: 0 for cooling, 1 for heating
        labels_alt: 0 for cooling, 1 for heating, 2 for RoC < s_RoC
    '''
    fig = go.Figure(
        layout=dict(
            xaxis=dict(title='Time (s)'),
            yaxis=dict(title='Temperature (°C) \ RoC (°C/s)')))

    fig.update_layout(
        autosize=False,
        width=900,
        margin=dict(l=20, r=20, t=20, b=20),
        xaxis=dict(
            tickmode='linear',
            tick0=0,
            dtick=10)
    )

    # 0 for cooling, 1 for heating
    labels = (f_dot(time) > 0).astype(int)
    # alternative: 0 for cooling, 1 for heating, 2 for RoC < s_RoC
    labels_alt = np.where(
        np.abs(f_dot(time)) > s_RoC,
        labels, 2)

    func = [f(time), f_dot(time), labels]
    func_names = "f(x)", "f'(x)", "Label"
    colors = 'royalblue', 'skyblue', 'springgreen'
    for idx, i in enumerate(func):
        fig.add_scatter(x=time, y=i, name=func_names[idx])
        fig.data[idx].line.color = colors[idx]

    fig.add_scatter(x=time, y=labels_alt,
                    name="Label (alt)", visible="legendonly")

    fig.show()

    return labels, labels_alt, fig


def cooling_segments(labels, sample_rate):
    '''Displays the number and length of cooling segments.'''
    change = np.concatenate([
        np.array([0]),  # as the sign cannot change with first value
        np.diff(labels > 0)*1], axis=0)

    # returns a list of indices where the conditions have been met
    change_idx = np.where(change == 1)[0]

    match labels[0]:  # label we started with (cooling or heating)
        # in case 0 the first change_idx starts the first heating segment,
        # that is why we start with i=1::2 and not i=0::2
        # (we don't prepend / append any values from np.diff here)
        case 0:
            segments = {
                idx: np.diff(change_idx)[i::2] for idx, i in enumerate(list(range(2))[::-1])
            }
        case 1:
            segments = {
                i: np.diff(change_idx)[i::2] for i in list(range(2))
            }

    # in seconds; only 1 column because jagged arrays can appear
    display(pd.DataFrame(
        {"Cooling segments [s]": segments[0]/sample_rate}
    ).describe().applymap('{:,.2f}'.format))

    return change, change_idx, segments


In [4]:
# Parameters
sample_rate = 10
start_time = 0
end_time = 60*5

time = np.arange(
    start_time,
    end_time,
    1/sample_rate)

baseline = np.array(32)

period_lengths = np.array([18, 60])  # in seconds; 0,3 : 1 gives a good ratio
frequencies = np.array(1./period_lengths)
amplitudes = np.array([4]*len(frequencies))
thetas = np.array([0*np.pi]*len(frequencies))
# for individual phases use for instance thetas = np.array([0,0])*np.pi

len(time)


3000

# Sine Waves


In [5]:
# Plot stimuli
wave, fig = summing_sine_waves(
    time, frequencies, amplitudes, thetas, baseline=baseline)
fig.add_hline(y=int(baseline))
fig.show()

_ = multiple_sine_waves(time, frequencies, amplitudes,
                        thetas, baseline=baseline)


In [6]:
# Mathematical representations
f, f_dot = symbolic_functions(
    frequencies, amplitudes, thetas, baseline=baseline, display_f=True, export_f=True)

# Plot derivative & labels
labels, labels_alt, fig_extra = stimuli_extra(f, f_dot, time, s_RoC=0.3)

# Analyze cooling segments
_ = cooling_segments(labels, sample_rate)


f(x) and f'(x) are:


2*sin(pi*x/30) + 2*sin(pi*x/9) + 32

pi*cos(pi*x/30)/15 + 2*pi*cos(pi*x/9)/9

Unnamed: 0,Cooling segments [s]
count,16.0
mean,9.08
std,1.12
min,7.5
25%,8.1
50%,9.1
75%,10.0
max,10.4


New idea:
1. fluctuating frequencies in high frequency track
1. all above pain threshold
1. 4 minutes
1. 10 - 20 s cooling
1. find out lowest RoC for pain decrease
1. constant plateaus to be able to calculate interaction
1. plateaus only in 25 to 75 % range of amplitude, maybe 1 per minute

TODO
- plateaus can be perceived differently if they follow a peak or not
-> plateaus only after peaks
- refactor cooling_segements with np.where if possible for cleaner code
- add labels for - and + RoC, for no pain and high pain (dont overinterpret, beware of psychological expectation effects)


In [166]:

import math
import random
import numpy as np
import pandas as pd
import scipy.signal
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import display


class StimuliFunction():
    def __init__(self, desired_duration, frequencies, amplitudes, baseline_temp=32, random_phase=True, seed=None):
        if seed is None:
            self.seed = np.random.randint(0, 1000)
        else:
            self.seed = seed
        random.seed(self.seed), np.random.seed(self.seed)

        # Sinusoidal waves where [0] is the baseline and [1] the modulation which gets added on top
        self.frequencies = np.array(frequencies)
        self.periods = 1/self.frequencies
        self.amplitudes = np.array(amplitudes)
        self.thetas = np.zeros(len(frequencies))  # not used for now
        self.baseline_temp = baseline_temp

        # Duration and sampling (without add_ methods)
        self.desired_duration = desired_duration
        # the "true" duration is a multiple of the period of the modulation
        self.duration = math.ceil(
            self.desired_duration / self.periods[1]) * self.periods[1]
        self.sample_rate = 10

        # Additional variables
        self.modulation_n_periods = self.duration / self.periods[1]
        # if True, the phase of the modulation is randomized
        self.random_phase = random_phase

        # Summing self.baseline and self.modulation create the stimuli function self.wave
        self.create_baseline()
        self.create_modulation()
        self.wave = np.array(self.baseline) + np.array(self.modulation)
        self._wave_dot = self.wave_dot
        self.peaks, _ = scipy.signal.find_peaks(
            self.wave, height=self.baseline_temp)

    def create_baseline(self):
        time = np.arange(0, self.duration, 1/self.sample_rate)
        self.baseline = self.amplitudes[0] * np.sin(
            time * 2 * np.pi * self.frequencies[0] + self.thetas[0]) + self.baseline_temp
        return self.baseline

    def create_modulation(self):
        # has to be created period-wise as the frequency varies with every period
        modulation_random_factor = self.noise_that_sums_to_0(
            n=int(self.modulation_n_periods),
            factor=0.6 if self.random_phase else 0)
        self.modulation = []
        for i in range(int(self.modulation_n_periods)):
            period = self.periods[1] + modulation_random_factor[i]
            frequency = 1/period
            time_temp = np.arange(0, 1/frequency, 1/self.sample_rate)
            # wave_temp has to be inverted every second period to get a sinosoidal wave
            if i % 2 == 0:
                wave_temp = self.amplitudes[1] * \
                    np.sin(np.pi * frequency * time_temp)
            else:
                wave_temp = self.amplitudes[1] * \
                    np.sin(np.pi * frequency * time_temp) * -1
            self.modulation.extend(wave_temp)
        self.modulation = np.array(self.modulation)
        return self.modulation
      
    @property
    def wave_dot(self):
        self._wave_dot = np.gradient(self.wave, 1/self.sample_rate) # dx in seconds
        return self._wave_dot

    def add_prolonged_peaks(self, time_to_be_added_per_peak, percetage_of_peaks):
        peaks_chosen = np.random.choice(self.peaks, int(
            len(self.peaks) * percetage_of_peaks), replace=False)
        wave_new = []
        for i in range(len(self.wave)):
            wave_new.append(self.wave[i])
            if i in peaks_chosen:
                wave_new.extend([self.wave[i]] *
                                time_to_be_added_per_peak * self.sample_rate)
        self.wave = np.array(wave_new)
        return self.wave

    def add_plateaus(self, n_plateaus, duration_of_plateau):
        q25, q75 = np.percentile(self.wave, 25), np.percentile(self.wave, 75)
        # only for IQR temp and only when temp is rising
        idx_iqr_values = [
            i 
            for i in range(len(self.wave)) 
            if self.wave[i] > q25 and self.wave[i] < q75 and self.wave_dot[i] > 0.02
        ]
        idx_plateaus = np.sort(np.random.choice(
            idx_iqr_values, n_plateaus, replace=False))
        wave_new = []
        for i in range(len(self.wave)):
            wave_new.append(self.wave[i])
            if i in idx_plateaus:
                wave_new.extend([self.wave[i]] * duration_of_plateau * self.sample_rate)
        self.wave = np.array(wave_new)
        return self.wave

    def noise_that_sums_to_0(self, n, factor):
        """Returns noise vector to be added to the period of the modulation."""
        # create noise for n/2
        noise = np.random.uniform(
            -factor * self.periods[1], 
            factor * self.periods[1],
            size = int(n/2))
        # double the noise with inverted values to sum to 0
        noise = np.concatenate((noise, -noise))
        # add 0 if length of n is odd
        if n % 2 == 1:
            noise = np.append(noise, 0)
        random.shuffle(noise)
        return np.round(noise)

    def plot(self, wave, baseline_temp=False):
        time = np.array(range(len(wave))) / self.sample_rate
        fig = go.Figure(
            go.Scatter(x=time, y=wave, line=dict(color='royalblue')),
            layout=dict(
                xaxis=dict(
                    title='Time (s)', tickmode='linear',
                    tick0=0, dtick=10),
                yaxis=dict(
                    title='Temperature (°C)'),
                    autosize=False,
                    height=300,
                    width=900,
                    margin=dict(l=20, r=20, t=20, b=20)))
        if baseline_temp:
            fig.add_hline(y=int(self.baseline_temp))
        return fig



def stimuli_extra(f, f_dot, time, s_RoC=0.5):
    '''For plotly graphing of f(x), f'(x) and labels.

    Returns:
        labels: 0 for cooling, 1 for heating
        labels_alt: 0 for cooling, 1 for heating, 2 for RoC < s_RoC
    '''
    fig = go.Figure(
        layout=dict(
            xaxis=dict(title='Time (s)'),
            yaxis=dict(title='Temperature (°C) \ RoC (°C/s)')))

    fig.update_layout(
        autosize=False,
        width=900,
        margin=dict(l=20, r=20, t=20, b=20),
        xaxis=dict(
            tickmode='linear',
            tick0=0,
            dtick=10))
    
    # 0 for cooling, 1 for heating
    labels = (f_dot > 0).astype(int)
    # alternative: 0 for cooling, 1 for heating, 2 for RoC < s_RoC
    labels_alt = np.where(
        np.abs(f_dot) > s_RoC,
        labels, 2)

    func = [f, f_dot, labels]
    func_names = "f(x)", "f'(x)", "Label"
    colors = 'royalblue', 'skyblue', 'springgreen'
    for idx, i in enumerate(func):
        fig.add_scatter(x=time, y=i, name=func_names[idx])
        fig.data[idx].line.color = colors[idx]

    fig.add_scatter(x=time, y=labels_alt,
                    name="Label (alt)", visible="legendonly")

    fig.show()

    return labels, labels_alt, fig


def cooling_segments(labels, sample_rate):
    '''Displays the number and length of cooling segments.'''
    change = np.concatenate([
        np.array([0]),  # as the sign cannot change with first value
        np.diff(labels > 0)*1], axis=0)

    # returns a list of indices where the conditions have been met
    change_idx = np.where(change == 1)[0]

    match labels[0]:  # label we started with (cooling or heating)
        # in case 0 the first change_idx starts the first heating segment,
        # that is why we start with i=1::2 and not i=0::2
        # (we don't prepend / append any values from np.diff here)
        case 0:
            segments = {
                idx: np.diff(change_idx)[i::2] for idx, i in enumerate(list(range(2))[::-1])
            }
        case 1:
            segments = {
                i: np.diff(change_idx)[i::2] for i in list(range(2))
            }

    # in seconds; only 1 column because jagged arrays can appear
    display(pd.DataFrame(
        {"Cooling segments [s]": segments[0]/sample_rate}
    ).describe().applymap('{:,.2f}'.format))

    return change, change_idx, segments

In [221]:
amplitudes = [1.5, 2]
periods = [67, 10]  # 1 : 3 gives a good result
frequencies = 1./np.array(periods)
duration = 200

test = StimuliFunction(duration, frequencies, amplitudes, baseline_temp=32, random_phase=True, seed=764)
print(f"Seed: {test.seed}")

test.add_plateaus(n_plateaus=4, duration_of_plateau=15)
test.plot(test.wave, baseline_temp=True)


Seed: 764


In [223]:
# Plot derivative & labels
labels, labels_alt, _ = stimuli_extra(
    test.wave, 
    test.wave_dot,
    np.array(range(len(test.wave))) / test.sample_rate, 
    s_RoC=0.2)

# Analyze cooling segments
_ = cooling_segments(labels, test.sample_rate)


Unnamed: 0,Cooling segments [s]
count,14.0
mean,11.38
std,3.39
min,5.4
25%,8.6
50%,12.35
75%,14.57
max,14.9
