# Detrended Fluctuation Analysis

Reference implementation (`nolds.dfa`): https://github.com/CSchoel/nolds/blob/master/nolds/measures.py

In [1]:
import numpy as np
import numba
import warnings
from entropy import utils

from nolds import dfa

# Silence polyfit rank warnings. np.RankWarning was removed from the top-level
# namespace in NumPy 2.0 (it now lives in np.exceptions), so look it up in a
# way that works on both old and new NumPy releases.
_rank_warning = getattr(np, 'RankWarning', None)
if _rank_warning is None:
    _rank_warning = np.exceptions.RankWarning
warnings.simplefilter('ignore', _rank_warning)

# Fixed seed => the uniform-noise test signal is reproducible across runs.
np.random.seed(1234567)
x = np.random.rand(1000)

In [2]:
def logarithmic_n(min_n, max_n, factor):
    """
    Creates a list of integers by successively multiplying a minimum value
    min_n by a factor > 1 until a maximum value max_n is reached.

    Non-integer results are rounded down. This includes min_n itself, so the
    returned list only ever contains Python ints.

    Args:
    min_n (float):
      minimum value (must be < max_n)
    max_n (float):
      maximum value (must be > min_n)
    factor (float):
      factor used to increase min_n (must be > 1)
    Returns:
    list of integers:
      floor(min_n), floor(min_n * factor), floor(min_n * factor^2), ...
      for all min_n * factor^i <= max_n, without duplicates
    """
    assert max_n > min_n
    assert factor > 1
    # stop condition: min * f^x = max
    # => f^x = max/min
    # => x = log(max/min) / log(f)
    max_i = int(np.floor(np.log(1.0 * max_n / min_n) / np.log(factor)))
    # Seed with the floored minimum so that a float min_n (e.g. 4.5) does not
    # leak a non-integer into the result.
    ns = [int(np.floor(min_n))]
    # i = 0 would reproduce floor(min_n), which is already in the list.
    for i in range(1, max_i + 1):
        n = int(np.floor(min_n * (factor ** i)))
        # flooring can map consecutive powers to the same integer => skip dupes
        if n > ns[-1]:
            ns.append(n)
    return ns


def dfa2(x, nvals=None, overlap=False):
    """
    Detrended fluctuation analysis using plain NumPy (no Numba).

    The signal is turned into a "walk" (cumulative sum of deviations from the
    mean), the walk is cut into windows of each size in nvals, a straight line
    is removed from every window, and the slope of log(fluctuation) against
    log(window size) is returned.

    Args:
    x (array-like of float):
      one-dimensional input signal
    nvals (array-like of int, optional):
      window sizes to evaluate; any list, tuple or ndarray is accepted.
      Defaults to logarithmic_n(4, 0.1 * N, 1.2) with N = len(x).
    overlap (bool):
      if True, consecutive windows are shifted by n // 2 instead of n
    Returns:
    float:
      slope of the log-log fit computed by utils._slope_lstsq, or np.nan if
      every window size produced a zero fluctuation
    """
    x = np.array(x)
    N = x.size

    if nvals is None:
        nvals = logarithmic_n(4, 0.1 * N, 1.2)
    # Normalise to an ndarray: .size and index-array selection below would
    # fail on a plain list or tuple supplied by the caller.
    nvals = np.asarray(nvals)

    # create the signal profile
    # (cumulative sum of deviations from the mean => "walk")
    walk = np.cumsum(x - x.mean())

    fluctuations = np.zeros(nvals.size)

    for i_n, n in enumerate(nvals):
        # subdivide data into chunks of size n
        if overlap:
            # step size n/2 instead of n
            d = np.array([walk[i:i + n] for i in range(0, len(walk) - n, n // 2)])
        else:
            # non-overlapping windows => we can simply do a reshape
            d = walk[:N - (N % n)]
            d = d.reshape((N // n, n))
        # calculate local trends as first-order polynomials
        ran_n = np.arange(n, dtype=np.float64)
        ran_n_mean = ran_n.mean()
        d_len = d.shape[0]
        d_mean = d.mean(1)
        trend = np.empty((d_len, ran_n.size))
        for i in range(d_len):
            win_slope = utils._slope_lstsq(ran_n, d[i])
            # a least-squares line passes through the point of means
            win_intercept = d_mean[i] - win_slope * ran_n_mean
            trend[i, :] = np.polyval([win_slope, win_intercept], ran_n)

        # calculate standard deviation ("fluctuation") of walks in d around trend
        flucs = np.sqrt(np.sum((d - trend) ** 2, axis=1) / n)
        # calculate mean fluctuation over all subsequences
        fluctuations[i_n] = flucs.sum() / flucs.size

    # filter zeros from fluctuations (log(0) would break the final fit)
    nonzero = np.nonzero(fluctuations)[0]
    fluctuations = fluctuations[nonzero]
    nvals = nvals[nonzero]

    if len(fluctuations) == 0:
        # all fluctuations are zero => we cannot fit a line
        return np.nan
    return utils._slope_lstsq(np.log(nvals), np.log(fluctuations))

In [3]:
# Compare the nolds reference with the NumPy port on non-overlapping windows.
print(dfa(x, overlap=False))
print(dfa2(x, overlap=False))

0.5330608987017387
0.5330608987017413


In [4]:
# Same comparison with half-overlapping windows.
print(dfa(x, overlap=True))
print(dfa2(x, overlap=True))

0.5347468094425519
0.5347468094425555


In [5]:
# Time the nolds reference against dfa2 (dfa2 defaults to overlap=False).
%timeit dfa(x, overlap=False)
%timeit dfa2(x)

86.2 ms ± 4.97 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
16 ms ± 677 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


## Numba

In [6]:
from math import floor, log

@numba.jit('i8[:](f8, f8, f8)', nopython=True)
def log_n(min_n, max_n, factor):
    """
    Numba (nopython) version of logarithmic_n.

    Multiplies min_n by successive powers of factor, rounds each product
    down, and keeps only values larger than the previous kept one.

    Args:
    min_n (float):
      starting value
    max_n (float):
      upper bound used to derive the largest exponent
    factor (float):
      multiplicative step
    Returns:
    1-D int64 array:
      the kept values; min_n itself is converted to int64 when the array is
      built
    """
    # largest exponent i with min_n * factor**i <= max_n
    max_i = int(floor(log(1.0 * max_n / min_n) / log(factor)))
    ns = [min_n]
    for i in range(max_i + 1):
        n = int(floor(min_n * (factor ** i)))
        # flooring can yield repeats => only keep strictly increasing values
        if n > ns[-1]:
            ns.append(n)
    return np.array(ns, dtype=np.int64)

@numba.jit('f8(f8[:])', nopython=True)
def dfa3(x):
    """
    Detrended fluctuation analysis compiled with Numba (nopython mode).

    Follows the same steps as dfa2 for non-overlapping windows, with window
    sizes fixed to log_n(4, 0.1 * N, 1.2).

    NOTE(review): utils._slope_lstsq is called from nopython code, so it is
    presumably itself Numba-compiled -- confirm against the entropy package.

    Args:
    x (1-D float64 array):
      input signal (type fixed by the explicit Numba signature)
    Returns:
    float:
      slope of the log-log fit of fluctuation against window size, or np.nan
      if every window size produced a zero fluctuation
    """
    N = len(x)
    nvals = log_n(4, 0.1 * N, 1.2)
    # signal profile: cumulative sum of deviations from the mean ("walk")
    walk = np.cumsum(x - x.mean())
    fluctuations = np.zeros(len(nvals))

    for i_n, n in enumerate(nvals):
        # drop the tail that does not fill a whole window, then split into rows
        d = np.reshape(walk[:N - (N % n)], (N // n, n))
        ran_n = np.array([float(na) for na in range(n)])
        ran_n_mean = ran_n.mean()
        d_len = len(d)
        trend = np.empty((d_len, ran_n.size))
        for i in range(d_len):
            sl = utils._slope_lstsq(ran_n, d[i])
            di_mean = d[i].mean()
            # a least-squares line passes through the point of means
            inter = di_mean - sl * ran_n_mean
            y = np.zeros_like(ran_n)
            # Equivalent to np.polyval function
            for p in [sl, inter]:
                y = y * ran_n + p
            trend[i, :] = y

        # calculate standard deviation ("fluctuation") of walks in d around trend
        flucs = np.sqrt(np.sum((d - trend) ** 2, axis=1) / n)
        # calculate mean fluctuation over all subsequences
        fluctuations[i_n] = flucs.sum() / flucs.size

    # Filter zero (log(0) would break the final fit)
    nonzero = np.nonzero(fluctuations)[0]
    fluctuations = fluctuations[nonzero]
    nvals = nvals[nonzero]

    if len(fluctuations) == 0:
        # all fluctuations are zero => we cannot fit a line
        alpha = np.nan
    else:
        alpha = utils._slope_lstsq(np.log(nvals), np.log(fluctuations))
    return alpha

In [7]:
# Compare all three implementations on non-overlapping windows.
print(dfa(x, overlap=False))
print(dfa2(x))
print(dfa3(x))

0.5330608987017387
0.5330608987017413
0.5330608987017409


In [8]:
# Time the nolds reference, the NumPy port and the Numba-compiled version.
%timeit dfa(x, overlap=False)
%timeit dfa2(x)
%timeit dfa3(x)

92.8 ms ± 4.17 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
16.7 ms ± 1.04 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
883 µs ± 53.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [9]:
# Second test signal: 3000 samples of a sine wave with a period of 100 samples.
PURE_SINE = np.sin(2 * np.pi * 1 * np.arange(3000) / 100)

# Compare all three implementations on the deterministic signal.
print(dfa(PURE_SINE, overlap=False))
print(dfa2(PURE_SINE))
print(dfa3(PURE_SINE))

1.615840712681188
1.6158407126811893
1.6158407126811898
