# Trawl processes simulations

In [1]:
from scipy.stats import levy_stable
import matplotlib.pyplot as plt
import numpy as np
np.random.seed(seed=1)

The matrix `F` has shape (u-dim, k-dim, timesteps)

The Hurst exponent $H$ equals:

In [68]:
class TrawlProcess:
    def __init__(self, alpha, gamma, N=1000, n=125, m=1):
        assert(alpha > 1 and alpha <2)
        assert(gamma > 0 and gamma <1)
        assert(alpha > gamma + 1)
        self.H = (alpha-gamma)/alpha
        self.alpha = alpha
        self.gamma = gamma
        
        self.M = m*N
        self.N = N
        self.n = n
        self.scaling = np.power((1./(self.N**(2+self.gamma)*(1+self.gamma))), 1./self.alpha)
    
    def sample_paths(self, num_paths=1, plot=True):
        
        F = self._calculate_f()
        S_scaled = np.power(self._calculate_scale(), 1./self.alpha)
        
        
        
        stable_dist = levy_stable(self.alpha, 0.)
        paths = np.zeros((num_paths, self.n + 1))
        for j in range(num_paths):
            draw = stable_dist.rvs(size=(self.M, self.M))
            samples = draw * S_scaled
            #samples = samples.reshape(-1, 1, self.M, self.M)
            time_samples =  samples * F
            time_samples = self.scaling*time_samples
            paths[j, 1:] = np.sum(time_samples, axis=(1,2))
        if plot:
            plt.style.use('seaborn-dark')
            fig, ax = plt.subplots(1,1, figsize=(6,6))
            ax.set_xlim(0,1)
            ax.grid()
            
            for path in range(paths.shape[0]):
                ax.plot(np.linspace(0,1, self.n+1), paths[path], 
                        label='path '+str(path))
                ax.legend()
            plt.title("H = " + str(np.around(self.H, decimals=3)))
            plt.show()
        return paths
        
    
    def _calculate_scale(self):
        S = np.zeros((self.M, self.M), dtype=np.float64)
        for J in range(self.M):
            if J != 0:
                S[:, J] = (J**(-1-self.gamma) - (J+1)**(-1-self.gamma))
        return S
        
    def _calculate_f(self):
        F = np.zeros((self.M, self.M, self.n), dtype=np.float32)
        for timestep in range(self.n):
            # (K, J)
            mask1 =np.triu(np.ones((self.M, self.M), dtype=np.bool)) # J >= K
    
            mask2 = np.zeros((self.M, self.M),  dtype=np.bool)
            mask2[0: timestep*(self.N//self.n), :] = True # K < timestep * (N/n)
            mask3 = np.triu(np.ones((self.M, self.M), dtype=np.bool), -timestep*(self.N//self.n)) # K - timestep (N/n) <= J
            f = np.zeros((self.N,self.N), dtype=np.float64)
    
            m1 = mask1*mask2
            f1 = np.zeros((self.M,self.M), dtype=np.float64)
            # Get a matrix A1 with column values
            A = np.array(range(self.M))
            A = A.reshape(1,-1)
            B = np.ones((self.M,1))
            A1 = (B @ A) # A1.T[k,j] = k
            f1 = A1.T * m1
    
            m2 = (~mask1) * mask2 # J < K and 0 <= k < timestep* N/n
            f2 = m2 * A1 
    
            m3 = mask1 * (~mask2)
            f3 = m3 * timestep*(self.N//self.n)
    
            m4 = (~mask1) * (~mask2) * mask3
            f4 = (timestep * (self.N//self.n) + A1 - A1.T) * m4
            f = f1 + f2 + f3 + f4
            #print(f.shape)
            F[:, :, timestep] = f
        
        F = F.reshape(self.n, self.M, self.M)
        return F
   

In [69]:
class TrawlProcess2:
    def __init__(self, alpha, gamma, N=400, n=100, m=3):
        assert(alpha > 1 and alpha <2)
        assert(gamma > 0 and gamma <1)
        assert(alpha > gamma + 1)
        self.H = (alpha-gamma)/alpha
        self.alpha = alpha
        self.gamma = gamma
        self.m = m
        self.M = m*N
        self.N = N
        self.n = n
    
    def sample_path(self, plot=True):
        
        F = self._calculate_f()
        print("F matrix calculated!")
        S_scaled = np.power(self._calculate_scale(), 1./self.alpha)
        print("S_scaled matrix calculated!")
        stable_dist = levy_stable(self.alpha, 0.)
        draw = stable_dist.rvs(size=(self.M, self.M))
        paths=[]
        for j in range(self.m):
            
            samples = draw[:(j+1)*self.M, :(j+1)*self.M] * S_scaled[:(j+1)*self.M, :(j+1)*self.M]
            #samples = samples.reshape(-1, 1, self.M, self.M)
            time_samples =  samples * F[:, :(j+1)*self.M, :(j+1)*self.M]
            path = np.zeros((self.n + 1))
            path[1:] = np.sum(time_samples, axis=(1,2))
            paths.append(path)
            
        if plot:
            plt.style.use('seaborn-dark')
            #fig, ax = plt.subplots(1,1, figsize=(8,8))
            plt.xlim(0,1)
            plt.grid()
            
            for j in range(self.m):
                plt.plot(np.linspace(0,1, self.n+1), paths[j], 
                        label='path '+str(j))
                plt.show()
            plt.legend()
            plt.title("H = " + str(np.around(self.H, decimals=3)))
            plt.show()
            
            
        #return paths
        
    
    def _calculate_scale(self):
        S = np.zeros((self.M, self.M), dtype=np.float64)
        for J in range(self.M):
            if J != 0:
                S[:, J] = (1./(self.N**(2+self.gamma)*(1+self.gamma))) * (J**(-1-self.gamma) - (J+1)**(-1-self.gamma))
        return S
        
    def _calculate_f(self):
        F = np.zeros((self.M, self.M, self.n), dtype=np.float32)
        for timestep in range(self.n):
            # (K, J)
            mask1 =np.triu(np.ones((self.M, self.M), dtype=np.bool)) # J >= K
    
            mask2 = np.zeros((self.M, self.M),  dtype=np.bool)
            mask2[0: timestep*(self.N//self.n), :] = True # K < timestep * (N/n)
            mask3 = np.triu(np.ones((self.M, self.M), dtype=np.bool), -timestep*(self.N//self.n)) # K - timestep (N/n) <= J
            f = np.zeros((self.N,self.N), dtype=np.float64)
    
            m1 = mask1*mask2
            f1 = np.zeros((self.M,self.M), dtype=np.float64)
            # Get a matrix A1 with column values
            A = np.array(range(self.M))
            A = A.reshape(1,-1)
            B = np.ones((self.M,1))
            A1 = (B @ A) # A1.T[k,j] = k
            f1 = A1.T * m1
    
            m2 = (~mask1) * mask2 # J < K and 0 <= k < timestep* N/n
            f2 = m2 * A1 
    
            m3 = mask1 * (~mask2)
            f3 = m3 * timestep*(self.N//self.n)
    
            m4 = (~mask1) * (~mask2) * mask3
            f4 = (timestep * (self.N//self.n) + A1 - A1.T) * m4
            f = f1 + f2 + f3 + f4
            #print(f.shape)
            F[:, :, timestep] = f
        
        F = F.reshape(self.n, self.M, self.M)
        return F
   

In [70]:
trawl.scaling

0.00020344680185057536

In [None]:
trawl = TrawlProcess(1.9, 0.3)
#print(trawl.H)
S = trawl._calculate_scale()
paths = trawl.sample_paths(num_paths=3)
    

In [62]:
S.mean()

0.001999380032404981