In [1]:
import holoviews as hv
hv.extension('bokeh')
hv.opts.defaults(hv.opts.Curve(width=500), 
                 hv.opts.Image(width=500, colorbar=True, cmap='Viridis'))

In [2]:
import numpy as np
import scipy.signal
import scipy.fft
from IPython.display import Audio

# Sistemas para el procesamiento de señales

## Introducción

### Definición de sistema

Hasta ahora hemos realizado *análisis de señales*, es decir el estudio de las señales y sus propiedades en el dominio del tiempo y frecuencia

En esta unidad nos enfocaremos en el *procesamiento de señales*, es decir el diseño de **sistemas** que procesan una **señal de entrada** y producen una **señal de salida**

<img src="../images/system.png">

Usaremos

- $x[n]$ para denotar la señal (discreta) de entrada y $X[k]$ su espectro
- $y[n]$ para denotar la señal (discreta) de salida e $Y[k]$ su espectro
 


### Ejemplos de sistemas

Utilizando sistemas podemos modificar una señal para mejorar su calidad o remover efectos indeseados

- Un sistema para reducir el ruido de una señal de electroencefalograma (EEG)

<img src="../images/system-denoise-eeg.png" width="600">

- Un sistema para mejorar una imagen fuera de foco (sharpening)

<img src="../images/system-sharpen.jpg"  width="500">

- Un sistema para eliminar el eco de un audio

<img src="../images/system-echo.png"  width="500">


## Propiedades y ejemplos de sistemas


### Sistemas sin memoria

Diremos que un sistema $\Phi$ es un sistema **sin memoria** si 

$$
y[n] = \Phi(x[n]),
$$

es decir que la salida del sistema en un instante $n$ dado depende solo de la entrada en ese mismo instante

Veamos algunos ejemplos

**Sistema atenuador/amplificador ideal** 

$$
y[n] = A x[n], 
$$

donde $A>0$ se llama *ganancia*

Este sistema puede atenuar la entrada si $0<A<1$ y amplificar si $A>1$

**Sistema saturador (clamp)**

$$
y[n] = \begin{cases} B &x[n] > B \\x[n] & x[n] \in [-B, B]\\ -B & x[n] < -B\end{cases}
$$

Este sistema limita los valores de la señal de entrada en un rango fijo

**Sistema rectificador**

$$
y[n] = | x[n] |
$$

Este sistema eliminar la parte negativa de la señal de entrada

### Sistema Lineal

Diremos que un sistema $\Phi$ es lineal si cumple con las siguientes propiedades

**Homogeneidad**

Un cambio en la amplitud de la entrada produce un cambio equivalente en la salida

$$
\Phi(cx[n]) = c \Phi(x[n]) = c y[n]
$$

**Aditividad**

Señales que se suman en la entrada producen señales que se suman en la salida

$$
\Phi(x_1[n] + x_2[n]) = \Phi(x_1[n]) + \Phi(x_2[n]) = y_1[n] + y_2[n]
$$

Es decir que las señales pasan por el sistema sin interactuar entre ellas



**Otras propiedades de los sistemas lineales**

Producto de las propiedades anteriores se tiene que una cascada de sistemas lineales forma un sistema lineal equivalente y además la cascada de sistemas es **conmutativa**, es decir que el orden de los sistemas en la cascada no altera el resultado final

La siguiente figura ejemplifica esta propiedad

<img src="../images/system-conmu.png" width="400px">


Otra propiedad interesante de los sistemas lineales es que cumplen el **Principio de superposición:**

1. Si descomponemos una señal en $M$ componentes: $x[n] = x_1[n] + x_2[n] + \ldots +  x_M[n]$
1. Y aplicamos un **sistema lineal** a cada componente: $y_j[n] = \Phi(x_j[n])$
1. Podemos recuperar la salida total usando **aditividad**: $y_1[n] + y_2[n] + \ldots +  y_M[n] = y[n]$

La siguiente figura ejemplifica esta propiedad

<img src="../images/system-superpos.png" width="400px">
    


### Sistemas con memoria

Un sistema $\Phi$ es un sistema con memoria si su salida actual depende sólo de la entrada actual, las entradas anteriores o las salidas anteriores

$$
\begin{align}
y[n] = \Phi(x[n], & x[n-1], x[n-2], \ldots, x[0], \\ \nonumber
& y[n-1], y[n-2], \ldots, y[0]) \nonumber
\end{align}
$$

esto también se conoce como **sistema causal**

Un **sistema con memoria no-causal** usa entradas futuras (es decir $x[n+1]$, $x[n+2]$, ...) y por ende solo se puede implementar de forma offline, es decir una vez que sea ha observado toda la señal

A continuación veremos algunos ejemplos de sistemas con memoria causales

**Sistema con un retardo (delay)**

Definido como 

$$
y[n] = x[n-m],
$$

donde
- la salida depende solo de "una" entrada anterior
- el valor de m define que tan "antigua" es la entrada pasada

El *delay* o retarno no afecta la amplitud de los componentes frecuenciales de la señal pero si su fase, como muestra la siguiente figura

In [3]:
n = np.arange(0, 200, step=1)
x = lambda m: np.sin(2.0*np.pi*0.05*(n-m)) 
f = scipy.fft.rfftfreq(d=1, n=len(n))

p = []
for m in [0, 4, 8]:
    x_delayed = x(m)
    p.append(hv.Curve((n, x_delayed), 'Tiempo [s]', 'Señal'))
    X = scipy.fft.rfft(x_delayed)
    Xm = np.absolute(X) 
    p.append(hv.Curve((f, Xm), 'Frecuencia [Hz]', 'Espectro de amplitud'))
    Xp = np.angle(X)
    p.append(hv.Curve((f, Xm*Xp/np.amax(Xm)), 'Frecuencia [Hz]', 'Espectro de fase'))

In [4]:
hv.Layout(p).cols(3).opts(hv.opts.Curve(width=250, height=200))

**Sistema reverberador o eco**

Definido como

$$
y[n] = x[n] + A x[n-m],
$$

donde
- la salida depende de una entrada "pasada" y la entrada actual
- la ganancia controla si el "eco" se atenua o amplifica


Al contrario del sistema anterior, el eco si puede modificar el espectro de amplitud. 

Notemos el efecto de interferencia constructiva y destructiva al modificar el retardo, como muestra la siguiente animación 

In [5]:
n = np.arange(0, 200, step=1)
x = lambda m, A=1: A*np.sin(2.0*np.pi*0.05*(n-m)) 
f = scipy.fft.rfftfreq(d=1, n=len(n))

buffer = {}
for m in range(0, 40, 2):
    xm = x(m)
    buffer[m] = (xm, np.abs(scipy.fft.rfft(x(0) + xm)))    

In [6]:
hMap1 = hv.HoloMap(kdims='m')
hMap2 = hv.HoloMap(kdims='m')
hMap3 = hv.HoloMap(kdims='m')

for m, (xm, X) in buffer.items():     
    hMap1[m] = hv.Curve((n, xm), 'Tiempo', 'x', label='Ax[n-m]')    
    hMap2[m] = hv.Curve((n, x(0) + xm), 'Tiempo', 'y')
    hMap3[m] = hv.Curve((f, X), 'Frecuencia', 'Espectro')

p_clean = hv.Curve((n, x(0)), 'Tiempo', label='x[n]').opts(width=500, height=200)
plot = (p_clean * hMap1 + hMap2 + hMap3).cols(1).opts(hv.opts.Curve(height=200), 
                                               hv.opts.Overlay(legend_position='top_right'))

hv.output(plot, holomap='gif', fps=5)

El siguiente video muestra un experimento que ejemplifica la interferencia destructiva en una onda mecánica: https://www.youtube.com/watch?v=IU8xeJlJ0mk

**Sistemas con múltiples ecos**

Pueden combinarse más retardos para hacer un sistema reverberante más complejo

Por ejemplo el siguiente sistema

$$
y[n] = x[n] + A_1 x[n-m_1] + A_2 x[n-m_2] + A_3 x[n-m_3] + \ldots,
$$

que lo implementamos como

In [7]:
Fs = 44100; 
n = np.arange(0, 4, step=1.0/Fs) 
x = lambda m: np.sin(2.0*np.pi*880*(n-m))*np.exp(-(n-m)**2/0.5**2)*np.heaviside(n-m, 0)
y = x(0) + 0.5*x(1.) + 0.25*x(2.) + 0.125*x(3.)

Da como resultado gráfico lo siguiente:

In [8]:
hv.Curve((n, y), 'Tiempo [s]', 'Señal con eco')

y el resultado sonoro es:

In [9]:
Audio(y, rate=Fs, normalize=False)

## Sistema de respuesta finita al impulso (FIR)


Generalizando el ejemplo de sistema lineal reverberante a $L$ retardos llegamos a 

$$
\begin{align}
y[n] &= h[0] x[n] + h[1] x[n-1] + h[2] x[n-2] + \ldots + h[L] x[n-L] \nonumber \\
&= \sum_{j=0}^{L} h[j] x[n-j] \nonumber \\
&= (h* x)[n] \nonumber 
\end{align}
$$

que se puede modelar como una convolución discreta entre $h$ y $x$

Este sistema se conoce como

- Sistema FIR (finite impulse response)
- Sistema MA (moving average)
- Sistema todo-zeros 

y es de orden L (posee L+1 coeficientes)



**Intepretación como media movil (MA)**

El sistema FIR es equivalente a una media movil ponderada que se aplica sobre la entrada, donde los coeficientes del sistema son los ponderadores 

Por ejemplo sea un sistema de 3 coeficientes $h[0]=a$, $h[1]=b$ y $h[2]=c$

$$
\begin{align}
y[n] = (h*x)[n] &= \sum_{j=0}^{2} h[j] x[n-j] \nonumber \\
&= a x[n] + b x[n-1] + c x[n-2] \nonumber
\end{align}
$$

donde cada salida se calcula a partir de 

$$
\overbrace{x[0], x[1], x[2]}^{y[2]} , x[3], x[4], \ldots
$$
$$
x[0], \overbrace{x[1], x[2] , x[3]}^{y[3]}, x[4], \ldots
$$
$$
x[0], x[1], \overbrace{x[2] , x[3], x[4]}^{y[4]}, \ldots
$$

Un detalle es que para obtener el valor de $y[0]$ e $y[1]$ se deben establecer "condiciones de borde", como por ejemplo $x[-2] = x[-1]= 0$

A continuación veremos algunos ejemplos de aplicaciones usando filtros FIR sencillos

### Eliminando ruido blanco aditivo

Si tenemos una señal contaminada con ruido blanco aditivo como la que se muestra a continuación

In [10]:
np.random.seed(0)
n = np.arange(0, 100, step=1)
C = 5*np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/10**2)
# Señal limpia (lo que no conocemos)
x_clean = np.random.multivariate_normal(np.zeros_like(n), C) 
# Datos: Señal + ruido (lo que medimos)
x = x_clean + 2*np.random.randn(len(n))

hv.Scatter((n, x), 'Tiempo', 'Datos (señal + ruido)').opts(width=500, height=200)

Podemos usar un sistema FIR promediador para "suavizar la contaminación"

Sea un sistema FIR con $L$ coeficientes idénticos e iguales a $1/L$

In [11]:
L = 10 
h = np.ones(shape=(L,))/L
y = scipy.signal.convolve(x, h, mode='same', method='auto')

La siguiente gráfica interactiva muestra el resultado de convolucionar este sistema con los datos contaminados

In [12]:
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')

for m in range(0, 100-L, 5): 
    c = np.zeros_like(n, dtype=np.float64); 
    c[m:m+L] = h
    hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')    
    hMap2[m] = hv.Curve((n[:m], y[:m]), 'Tiempo', label='Señal filtrada')
    
p_clean = hv.Curve((n, x_clean), 'Tiempo', 'Señal', label='Señal limpia').opts(color='k', height=200)
(hMap1 + hMap2 * p_clean).cols(1).opts(hv.opts.Curve(height=200), 
                                       hv.opts.Overlay(legend_position='top_left'))

:::{note}

Este filtro promedia los datos vecinos resultando una versión suavizada de los mismos. Esta versión suavizada se aproxima a la "señal limpia" que está escondida en el ruido. 

:::

En general, mientras más "largo" sea el filtro mayor será el efecto de suavizado

### Encontrando cambios en una señal

Sea la siguiente señal escalonada

In [13]:
n = np.arange(0, 100, step=1)
x = np.zeros_like(n, dtype=np.float64)
x[20:] += 1.
x[40:] += 1.
x[80:] -= 1.

hv.Curve((n, x), 'Tiempo', 'Datos').opts(width=500, height=200)

Si nos interesa encontrar cambios en la señal podemos usar un sistema diferenciador

In [14]:
h = np.array([0.5, -0.5])
y = scipy.signal.convolve(x, h, mode='same', method='auto')

La siguiente gráfica interactiva muestra el resultado de convolucionar este sistema con la señal escalonada

In [15]:
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')

for m in range(0, 100-len(h), 5): 
    c = np.zeros_like(n, dtype=np.float64); 
    c[m:m+len(h)] = h
    hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')    
    hMap2[m] = hv.Curve((n[:m], y[:m]), 'Tiempo', 'Convolución')    

(hMap1 + hMap2).cols(1).opts(hv.opts.Curve(height=200))

:::{note}

Los pulsos en la convolución están asociados a un cambio (ascenso o descenso) en la señal original
            
:::

En un caso más general, este filtro nos da información de la velocidad con que cambia la señal

### Remover tendencias

En el siguiente ejemplo tenemos una señal sinusoidal "montada" sobre otra señal que cambia más lentamente.

In [16]:
np.random.seed(0); 
n = np.arange(0, 100, step=1)
C = np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/30**2)
x = np.sin(2.0*np.pi*0.1*n) + 5*np.random.multivariate_normal(np.zeros_like(n), C)

hv.Curve((n, x), 'Tiempo', 'Señal').opts(height=200)

En general nos referimos a la señal lenta como la "tendencia"

A continuación veremos un filtro para separar la parte rápida (sinusoide) de la parte lenta (tendencia) de este señal de ejemplo. En próximas lecciones veremos como diseñar este tipo de filtros

In [17]:
L = 5
h = -np.ones(shape=(L,))/L 
h[L//2] += 1

y = scipy.signal.convolve(x, h, mode='same', method='auto')

In [18]:
hMap1 = hv.HoloMap(kdims='Instante')
hMap2 = hv.HoloMap(kdims='Instante')

for m in range(0, 100-len(h), 5): 
    c = np.zeros_like(n, dtype=np.float64); 
    c[m:m+len(h)] = h
    hMap1[m] = hv.Curve((n, c), 'Tiempo', 'Filtro')    
    hMap2[m] = hv.Curve((n[0:m], y[0:m]), 'Tiempo', 'Convolución').opts(ylim=(-0.5, 0.5))
    
(hMap1 + hMap2).cols(1).opts(hv.opts.Curve(height=200))

## Convolución con scipy

Podemos convolucionar una señal en Python usando 

```python
scipy.signal.convolve(in1, # Señal de entrada
                      in2, # Coeficientes del sistema
                      mode='full', 
                      method='auto'  
                     )
```

donde el argumento `method` puede ser

- `direct`: Realiza la convolución en el dominio del tiempo
- `fft`: Realiza la convolución multiplicando los espectros
- `auto`: Se decide automaticamente en base al largo de las señales


y el argumento `mode` indica donde se hace la convolución

Para ejemplificar la influencia de este argumento consideremos una señal $x=[a,b,c]$ y un sistema $h=[d, e]$ 

- Si uso `mode=valid` el resultado será $y=[ad+be, bd+ce]$
- Si uso `mode=same` el resultado será $y=[ae, ad+be, bd+ce]$, es decir se agregan ceros al principio de $x$ tal que $y$ sea del mismo largo que $x$
- Si uso `mode=full` el resultado será $y=[ae, ad+be, bd+ce, cd]$, es decir se agregan ceros al principio y al final de $x$ 



### Eliminando ruido versión 2.0

Considerando la misma señal contaminada con ruido blanco que vimos anteriormente utilizaremos `scipy.signal.convolve` para convolucionar con un filtro que suavice el ruido.

Probaremos dos sistemas FIR

- coeficientes idénticos e iguales a $1/L$, es decir una ventana rectangular
- coeficientes que decaen suavemente a cero, como por ejemplo una ventana de Hamming

con distintos valores de $L$ (largo del filtro)

En este caso los valores los obtenemos usando la función de scipy

```python

scipy.signal.get_window(window, # String con el nombre de la ventana
                        Nx, # Entero con el largo de la ventana 
                        ...)
```

La ventana rectangular se llama `boxcar` mientras que la de Hamming se llama `hamming`. En la documentación de la función se puede revisar otras ventanas disponibles

In [23]:
# La señal de ejemplo de la sección 7.3.1
np.random.seed(0)
n = np.arange(0, 100, step=1)
C = 5*np.exp(-0.5*(n[:, np.newaxis] - n[:, np.newaxis].T)**2/10**2)
x_clean = np.random.multivariate_normal(np.zeros_like(n), C) 
x = x_clean + 2*np.random.randn(len(n))

# La filtramos con distintas funciones y L para luego visualizar el resultado
filters = {}
results = {}
for L in [5, 10, 20, 30, 40]:
    for filter_name in ['boxcar', 'hamming']:    
        h = scipy.signal.get_window(filter_name, L) 
        h = h/np.sum(h)
        if not L in filters:
            filters[L] = {}
            results[L] = {}
        filters[L][filter_name] = h
        results[L][filter_name] = scipy.signal.convolve(x, h, mode='same', method='auto')

In [24]:
hMap1 = hv.HoloMap(kdims='L')
hMap2 = hv.HoloMap(kdims='L')

for L, curves in filters.items():
    p = []
    for filter_name, h in curves.items():
        p.append(hv.Curve((range(L), h), 'Largo del filtro (L)', 'Filtro', label=filter_name))
    hMap1[L] = hv.Overlay(p)

for L, curves in results.items():
    p = []
    for filter_name, y in curves.items():
        p.append(hv.Curve((n, y), 'n', 'Señal', label=filter_name).opts(line_width=2, alpha=0.75))
    hMap2[L] = hv.Overlay(p)
    
p_clean = hv.Curve((n, x_clean), 'Tiempo', 'Señal', label='Señal limpia').opts(color='k')
p_data = hv.Scatter((n, x), 'Tiempo', 'Datos (señal + ruido)').opts(width=500, height=200)
(p_data + hMap1 + hMap2 * p_clean).cols(1).opts(hv.opts.Curve(height=250), 
                                       hv.opts.Overlay(legend_position='top_right'))

:::{note}

Mientras más larga es la ventana (L) mayor es el suavizado. Adicionalmente la ventana de Hamming produce un filtrado más suave que la rectangular

:::

En la lección siguiente veremos como diseñar un filtro, es decir como calcular los valores de `h` para resolver una tarea en particular