# De-convolution

In [40]:
# %load '/Users/ivaylopetkantchin/Desktop/pythonImport.py'
import pandas as pd
import os 
import sys
import numpy as np
import matplotlib.pyplot as plt
plt.style.use('classic')
%matplotlib inline
import tifffile as tiff
%load_ext autoreload
%autoreload 2
# Plotly for graphs
import plotly as py
from plotly.offline import init_notebook_mode, iplot
import plotly.graph_objs as go
init_notebook_mode() # run at the start of every notebook

import warnings
warnings.filterwarnings("ignore")
import colorlover as cl

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Building signals

### 1-Parameters

In [2]:
from scipy.signal import gaussian

In [3]:
sigLen = 1800
psf_sigma = 7.0
noiseVal = 2.0
# Signal
f_true = np.zeros(sigLen)
peaks = {135:100,180:150,210:120,600:170,1350:90,900:140,1005:98,1410:130}
for peakPosi, peakValue in peaks.items():
    f_true[peakPosi] = peakValue

# Real signal
psf = gaussian(sigLen, psf_sigma)
y = np.convolve(f_true, psf, mode='same') + noiseVal*np.random.randn(sigLen)
y[y<0] = 0.0
x = list(range(sigLen))

tr = [
    go.Scatter(
        x=x,
        y=y,
        name='Signal'
    )
    ,
    go.Scatter(
        x=x,
        y=f_true,
        name='Target',
        marker=dict(color='green')
    )
]
iplot(tr)

In [4]:
sigLen = 600
psf_sigma = 7.0
noiseVal = 3
# Signal
f_true = np.zeros(sigLen)
peaks = {45:100,60:150,73:120,200:170,450:90,300:140,335:98,470:130}
for peakPosi, peakValue in peaks.items():
    f_true[peakPosi] = peakValue

# Real signal
psf = gaussian(sigLen, psf_sigma)
y = np.convolve(f_true, psf, mode='same') + noiseVal*np.random.randn(sigLen)
y[y<0] = 0.0

psf2 = gaussian(sigLen, 3.0)
y2 = np.convolve(f_true, psf2, mode='same') + noiseVal*np.random.randn(sigLen)
y2[y2<0] = 0.0

x = list(range(sigLen))

tr = [
    go.Scatter(
        x=x,
        y=y2,
        name='Old signal'
    )
    ,
    go.Scatter(
        x=x,
        y=y,
        name='New signal',
        marker=dict(color='black')
    )
    ,
    go.Scatter(
        x=x,
        y=f_true,
        name='Target',
        marker=dict(color='green')
    )
]

iplot(tr)

### 2 - De-convolution algorithm:
Starting from the signal and a given psf h

In [63]:
sigLen = 600
stdTr = 12.0
iterNb = 500

h = gaussian(sigLen, stdTr)
h = h/np.sum(h)
# Intialization
f = y
err = []
# Iterations
corrList = []
ratioList = []
flist = []
for i in range(iterNb):
    r = np.convolve(f,h, mode='same')
    corr = np.convolve(h, y/r, mode='same')
    f = f*corr
    if(i%(iterNb/10)==0):
        #         corrList.append(corr)
        #         ratioList.append(y/r)
        flist.append(f)
    err.append(np.sum((y - np.convolve(f,h, mode='same'))**2))

In [64]:
iplot([go.Scatter(x=list(range(sigLen)), y=np.sqrt(err))])

In [65]:
# colors = cl.scales['10']['div']['YlOrBr']
colors = cl.scales['10']['div']['RdYlBu'] 
tr1 = [go.Scatter(x=list(range(sigLen)),y=y,name='y',line=dict(width=5,color='black')),go.Scatter(x=list(range(sigLen)),y=f_true,name='Target',line=dict(width=2,color='green'))]
tr2 = [go.Scatter(x=list(range(sigLen)),y=cor,name='f, Iter nb: {}'.format(int(i*(iterNb/10))), line=dict(color=colo)) for i,(cor,colo) in enumerate(zip(flist,colors))]
tr = tr1+tr2
iplot(tr)

In [44]:
ryb = cl.scales['10']['div']['RdYlBu'] 

['rgb(165,0,38)',
 'rgb(215,48,39)',
 'rgb(244,109,67)',
 'rgb(253,174,97)',
 'rgb(254,224,144)',
 'rgb(224,243,248)',
 'rgb(171,217,233)',
 'rgb(116,173,209)',
 'rgb(69,117,180)',
 'rgb(49,54,149)']

In [34]:
tr=[go.Scatter(x=list(range(sigLen)),y=cor,name='Iter nb: {}'.format(int(i*(iterNb/10)))) for i,cor in enumerate(ratioList)]
iplot(tr)
tr=[go.Scatter(x=list(range(sigLen)),y=cor,name='Iter nb: {}'.format(int(i*(iterNb/10)))) for i,cor in enumerate(corrList)]
iplot(tr)
tr=[go.Scatter(x=list(range(sigLen)),y=y,name='y')] + [go.Scatter(x=list(range(sigLen)),y=cor,name='f, Iter nb: {}'.format(int(i*(iterNb/10)))) for i,cor in enumerate(flist)]
iplot(tr)

In [5]:
def deconvAlg(y, stdTr, iterNb=500):
    '''
    Performs an iterative de-convolution algorithm with the std of the psf known.
    ----
    Inputs:
    y = signal to deconvolve, numpy.array
    stdTr = std of the gaussian psf, float
    iterNb = Number of iteration of the algorithm, int
    ----
    Outputs:
    f = deconvolved signal, numpy.array
    err = list of errors at every step of the algorithm, list
    '''
    # Setting the psf
    h = gaussian(sigLen, stdTr)
    h = h/np.sum(h)
    # Intialization
    f = y
    err = []
    # Iterations
    for i in range(iterNb):
        r = np.convolve(f,h, mode='same')
        corr = np.convolve(h, y/r, mode='same')
        corr = corr
        f = f*corr
        err.append(np.sum((y - np.convolve(f,h, mode='same'))**2))
    return f, err

def blindDeconv(y, stdRange, iterNb):
    '''
    Performs blind de-convolution for a 1D signal y.
    It will look for the optimal psf std in a given range.
    ----
    Inputs:
    y = original sigal, numpy.array,
    stdRange = range of std values for th blind de-convolution, list like
    iterNb = Number of iterations for the de-convolution algorithm, int
    ----
    Outputs:
    f = de-convolved signals, numpy.array
    std_opt = Optimal std found for the de-convolution, float
    '''
    # Table that stores the different final errors for each std in stdRange
    finalErrList = []
    for std in stdRange:
        f, err = deconvAlg(y, std, iterNb=iterNb)
        finalErrList.append([std, err[-1], f])

    finalErrList = pd.DataFrame(finalErrList, columns=['Std', 'finalErr', 'Results'])
    # Here we use the second derivative (acce) technique to get the optimal std
    deriv = finalErrList.finalErr.values[1:] - finalErrList.finalErr.values[:-1]
    acce = deriv[1:] - deriv[:-1]
    acceArgMax = np.argmax(acce)
    std_acce = finalErrList.Std.values[:-2]
    # We fit a polynom to get a better precision on the std
    top3acce_y = acce[acceArgMax-1:acceArgMax+2]
    top3acce_x = std_acce[acceArgMax-1:acceArgMax+2]
    polynomCoeffs = np.polyfit(top3acce_x, top3acce_y, 2)
    std_opt = -polynomCoeffs[1]/(2*polynomCoeffs[0])
    # Finnaly, we redo a deconvolution with the optimal std found
    f, err = deconvAlg(y, std_opt, iterNb=iterNb)
    
    return f, std_opt
    
def windowNormalizer(y, f, window=100):
    '''
    Normalize result f according to y signal after dividing it in smaller windowed signals.
    ----
    Inputs:
    y = original sigal, numpy.array
    f = de-convoluted signal, numpy.array
    ----
    Outputs:
    f = amplitude corrected signal, numpy.array
    '''
    nbOfWindows = int(1.0*len(f)/window)
    for i in range(nbOfWindows):
        maxOnWindow = np.max(f[i*window:min((i+1)*window, len(f))])
        maxOnY = np.max(y[i*window:min((i+1)*window, len(y))])
        f[i*window:min((i+1)*window, len(f))] *= (maxOnY/maxOnWindow)
    
    return f

In [6]:
stdTry = 7.0
iterNb = 500

f, err = deconvAlg(y, stdTry, iterNb=2000)

In [7]:
# visu error
tr = [
    go.Scatter(
        x=list(range(len(err)))
        ,
        y=err
    )
]
lay = go.Layout(
    title='MSE',
    xaxis=dict(title='Algo iteration'),
    yaxis=dict(title='MSE')
)

tr2 = [
    go.Scatter(
        x=x,
        y=y,
        name='Signal'
    )
    ,
    go.Scatter(
        x=x,
        y=f_true,
        name='Target'
    )
    ,
    go.Scatter(
        x=x,
        #         y=windowNormalizer(y,f),
        y=f,
        name='Result: {} iterations'.format(iterNb)
    )  
]
lay2 = go.Layout(
    title='All signals',
    xaxis=dict(title='Time'),
    yaxis=dict(title='Value')
)

fig = go.Figure(data=tr, layout=lay)
fig2 = go.Figure(data=tr2, layout=lay2)
iplot(fig)
iplot(fig2)

## 3 - De-convulution: Not knowing the PSF
We try the algorithm for different PSF functions and see the errors

In [8]:
stdRange = np.linspace(4.0,9.0,20)
finalErrList = []

for std in stdRange:
    f, err = deconvAlg(y, std, iterNb=500)
    finalErrList.append([std, err[-1], f])

finalErrList = pd.DataFrame(finalErrList, columns=['Std', 'finalErr', 'Results'])

deriv = finalErrList.finalErr.values[1:] - finalErrList.finalErr.values[:-1]
acce = deriv[1:] - deriv[:-1]

### Fiting a 2-D polynom for better maximum estimation
Using numpy.polyfit we obtain the polynom:

$$
poly = polynomCoeffs[0]*x^2 + polynomCoeffs[1]*x + polynomCoeffs[2]
$$

and the maximum is where the first derivative is null:

$$
std_{max} = -polynomCoeffs[1]\div(2*polynomCoeffs[0])
$$

In [9]:
acceArgMax = np.argmax(acce)
std_acce = finalErrList.Std.values[:-2]

top3acce_y = acce[acceArgMax-1:acceArgMax+2]
top3acce_x = std_acce[acceArgMax-1:acceArgMax+2]

polynomCoeffs = np.polyfit(top3acce_x, top3acce_y, 2)
std_max = -polynomCoeffs[1]/(2*polynomCoeffs[0])
print('Solution std: {}'.format(std_max))

Solution std: 7.009925331852017


### Re-doing a de-convolution for $std_{max}$

In [10]:
f, err = deconvAlg(y, std_max, iterNb=2000)

In [11]:
tr = [
    go.Scatter(
        x=finalErrList.Std,
        y=finalErrList.finalErr,
        yaxis='y',
        name='Errors'
    )
    ,
    go.Scatter(
        x=finalErrList.Std.values[:-2],
        y=acce,
        yaxis='y2',
        name='Second derivative'
    )
    ,
    go.Scatter(
        x=[std_max,std_max],
        y=[np.min(acce),np.max(acce)+10],
        yaxis='y2',
        mode='lines',
        name='Polyfit maximum'
    )
]

lay = go.Layout(
    title='Error distribution with std',
    xaxis=dict(title='std'),
    yaxis=dict(#title='final err',
                autorange=True,
                showgrid=False,
                zeroline=False,
                showline=False,
                autotick=True,
                ticks='',
                showticklabels=False
              ),
    yaxis2=dict(
        autorange=True,
        showgrid=False,
        zeroline=False,
        showline=False,
        autotick=True,
        ticks='',
        showticklabels=False,
        #         title='Acceleration',
            #         titlefont=dict(
            #             color='rgb(148, 103, 189)'
            #         ),
            #         tickfont=dict(
            #             color='rgb(148, 103, 189)'
            #         ),
                    overlaying='y',
            #         side='right'
    )
)

tr2 = [
    go.Scatter(
        x=x,
        y=y,
        name='Signal'
    )
    ,
    go.Scatter(
        x=x,
        y=windowNormalizer(y,f),
        name='Result: {} iterations'.format(iterNb)
    )
    ,
    go.Scatter(
        x=x,
        y=f_true,
        name='Target'
    )
]
lay2 = go.Layout(
    title='All signals: std_pred={}, std_true={}'.format((int(std_max*100.)/100.),psf_sigma),
    xaxis=dict(title='Time'),
    yaxis=dict(title='Value')
)

fig = go.Figure(data=tr, layout=lay)
fig2 = go.Figure(data=tr2, layout=lay2)
iplot(fig)
iplot(fig2)