In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from IPython.display import display
import sympy

In [2]:
# Code modified from 
# https://datacrayon.com/posts/signal-processing/sp-basics/summing-sine-waves/

def summing_sine_waves(frequency, amplitude, theta, baseline=0):
    
    sinewave = np.zeros(len(time))
    for i in range(len(frequency)):
        sinewave += amplitude[i] * np.sin(2 * np.pi * frequency[i] * (time) + theta[i]) + baseline
    sinewave = sinewave/len(frequency)
    fig = go.Figure(
        layout=dict(
            xaxis=dict(title='Time (min)'),
            yaxis=dict(title='Temperature (°C)')))

    fig.update_layout(
        autosize=False,
        width=900,
        margin=dict(l=20, r=20, t=20, b=20),
        xaxis = dict(
            tickmode = 'linear',
            tick0 = 0,
            dtick = 1)
        )

    fig.add_scatter(x=time, y=sinewave)
    fig.data[0].line.color = 'royalblue'
            
    return sinewave,fig


def multiple_sine_waves(frequency, amplitude, theta, baseline=0, legend=False):

    fig = make_subplots(
        rows=len(frequency), 
        cols=1, 
        shared_xaxes=True,
        shared_yaxes='all')

    fig.update_layout(
        height=300,
        width=900,
        margin=dict(l=20, r=20, t=20, b=20),
        showlegend=legend)

    sinewaves = np.zeros(len(time))
    for i in range(len(frequency)):
        sinewaves = amplitude[i] * np.sin(2 * np.pi * frequency[i] * (time) + theta[i]) + baseline
        fig.add_scatter(
        x=time,
        y=sinewaves,
        row=i+1, col=1,
        name=f'wave {i+1}')
    fig.show()

    return fig


In [3]:
def symbolic_functions(frequency, amplitude, theta, baseline=0, display_functions=True):
    '''The same as above but with sympy
    Returns:
        f: lambdify function
        f_dot: lambdify derivative of f
    '''
    
    x = sympy.symbols('x')
    wavelines = 0
    for i in range(len(frequency)):
        wavelines += amplitude[i] * sympy.sin(2 * sympy.pi * frequency[i] * x + theta[i]) + baseline
    f = wavelines / len(frequency)
    f_dot = sympy.diff(f, x)
    
    if display_functions:
        print("f(x) and f'(x) are:")
        display(f,f_dot)
    
    f = sympy.lambdify(x, f, 'numpy')
    f_dot = sympy.lambdify(x, f_dot, 'numpy')
    return f, f_dot


def stimuli_extra(f,f_dot,time):
    '''For plotly graphing of f(x), f'(x) and labels
    Returns:
        labels: 0 for cooling, 1 for heating
    '''
    fig = go.Figure(
    layout=dict(
        xaxis=dict(title='Time (min)'),
        yaxis=dict(title='Temperature (°C) \ RoC (°C/min)')))

    fig.update_layout(
    autosize=False,
    width=900,
    margin=dict(l=20, r=20, t=20, b=20),
    xaxis = dict(
        tickmode= 'linear',
        tick0 = 0,
        dtick = 1)
    )
    
    # 0 for cooling, 1 for heating
    labels = (f_dot(time)>0).astype(int)

    func = [f(time),f_dot(time),labels]
    func_names = "f(x)","f'(x)","Label"
    colors = 'royalblue', 'skyblue', 'springgreen'
    for idx, i in enumerate(func):
        fig.add_scatter(x=time, y=i, name=func_names[idx])
        fig.data[idx].line.color = colors[idx]

    fig.show()
    
    return labels, fig

def cooling_segments(labels):
    '''Displays the number and length of cooling segments'''
    change = np.concatenate([
        np.array([0]), # sign cannot change with first value
        np.diff(labels>0)*1], axis=0)

    change_idx = np.where(change==1)[0]

    match labels[0]: # label we started with (cooling or heating)
        # in case 0 the first change_idx starts the first heating segment, 
        # that is why we start with i=1::2 and not i=0::2
        # (we don't prepend / append any values from np.diff here)
        case 0:
            segments = {
                idx:np.diff(change_idx)[i::2] for idx, i in enumerate(list(range(2))[::-1])
                }
        case 1:
            segments = {
                i:np.diff(change_idx)[i::2] for i in list(range(2))
                }

    # in seconds; only 1 column because jagged arrays can appear
    display(pd.DataFrame(
        {"segments 0 [sec]":segments[0]/sample_rate*60}
        ).describe().applymap('{:,.2f}'.format))
    
    return change, change_idx, segments

In [4]:
# Parameters
sample_rate = 100
start_time = 0
end_time = 15

time = np.arange(
    start_time,
    end_time,
    1/sample_rate)

baseline = 32

frequency = np.array([0.3,1])
amplitude = [4]*len(frequency)
theta = [0*np.pi]*len(frequency) 
# for individual phases use theta = np.array([0,0])*np.pi

# Sine Waves


In [5]:
# Plot stimuli
wave, fig = summing_sine_waves(frequency, amplitude, theta, baseline=baseline)
fig.add_hline(y=baseline)
fig.show()

_ = multiple_sine_waves(frequency, amplitude, theta, baseline=baseline)

In [6]:
# Mathematical representations
f, f_dot = symbolic_functions(frequency, amplitude, theta, baseline=baseline, display_functions=True)

# Plot derivative & labels
labels, fig_extra = stimuli_extra(f, f_dot, time)

# Analyze cooling segments
_ = cooling_segments(labels)

f(x) and f'(x) are:


2*sin(0.6*pi*x) + 2*sin(2*pi*x) + 32

1.2*pi*cos(0.6*pi*x) + 4*pi*cos(2*pi*x)

Unnamed: 0,segments 0 [sec]
count,15.0
mean,30.0
std,3.71
min,25.2
25%,27.0
50%,30.0
75%,33.0
max,34.8


In [7]:
# TODO
# - prepend / append values from np.diff to get the first and last segment
# - add epsilon-function to categorize stimuli with a small slope as a third neutral category
#
# - unit test for max length of segments (should never be higer than 35 sec (?)), using nbdev
# - fig.show("png") when script is finalized

# Test: Square Wave from Sine Waves


In [8]:
frequency = np.array(range(1,20,2))
amplitude = 1/frequency
theta = [0*np.pi]*len(frequency)

wave, fig = summing_sine_waves(frequency, amplitude, theta)
fig.show()
_ = multiple_sine_waves(frequency, amplitude, theta)

from IPython.display import Audio
Audio(wave, rate=10000)
