<a href="https://colab.research.google.com/github/frtrigg5/A-new-signature-model/blob/main/ModelConstruction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **REQUIRED PACKAGES**

The package signatory computes the signature (check https://pypi.org/project/signatory/ to see its documentation).

It requires an older version of torch.

In [None]:
!pip uninstall torch -y
!pip install torch==1.7.1

After running the first cell you will need to restart the runtime.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from torch.utils.tensorboard import SummaryWriter

In [None]:
!pip install signatory==1.2.6.1.7.1
!pip install fbm

In [None]:
import signatory
import fbm
from scipy import optimize
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# **MODEL DEFINITION**

New layers construction:

In [None]:

'''phi function: needed for computing the tensor normalization'''

def phi(x, C, a):
    """Evaluate the scalar function used by the tensor normalization.

    Args:
        x: squared norm of the signature.
        C: threshold parameter; it controls how strict the normalization
            is. Values of ``x`` up to ``C`` are returned unchanged.
        a: exponent of the damped branch, typically fixed to 1 in order
            to avoid too many hyperparameters.

    Returns:
        ``x`` when ``x <= C``; otherwise
        ``C + C**(1 + a) * (C**(-a) - x**(-a)) / a``.
    """
    if x > C:
        return C + (C ** (1 + a)) * (C ** (-a) - x ** (-a)) / a
    return x



'''dilatation function: it determines the normalization constant lambda (see graphical representation of the model, layer 6)'''

def dilatation(x, C, a, M, d):
    """Compute the normalization constant lambda for each signature in a batch.

    For every row the function builds the polynomial
    ``p(l) = 1 - phi(1 + |x|^2) + sum_j c_j * l**(2*j)``, where ``c_j`` is
    the squared norm of the level-``j`` block of the signature, and finds
    its root in ``[0, 2]`` with Brent's method. The root is capped at 1.

    Args:
        x: tensor of shape batch x signature length. Level ``j`` is taken
            to occupy ``d**j`` consecutive entries, levels 1..M in order.
        C, a: parameters forwarded to ``phi``.
        M: truncation level (called L in the paper).
        d: dimension of the time series the signature was computed from
            (if time augmentation is used, the dimension is increased by 1).

    Returns:
        A float64 tensor of shape batch x 1 holding lambda for every row,
        placed on the same device as ``x``.
    """
    # .cpu() so that CUDA inputs can be converted to NumPy as well.
    x_np = x.detach().cpu().numpy()
    batch = x_np.shape[0]

    # Start/stop offsets of every signature level, computed with integer
    # arithmetic: level j spans [bounds[j-1], bounds[j]). This avoids the
    # float division by (d - 1), which fails for d == 1.
    bounds = [0]
    for j in range(1, M + 1):
        bounds.append(bounds[-1] + d ** j)

    powers = 2 * np.arange(M + 1)  # exponents 0, 2, 4, ..., 2M
    coefficients = np.zeros((batch, M + 1))
    normalizz = np.zeros((batch, 1))

    for i in range(batch):
        norm_quad = 1 + np.sum(x_np[i] ** 2)  # signature norm squared
        coefficients[i, 0] = 1 - phi(norm_quad, C, a)
        for j in range(1, M + 1):
            coefficients[i, j] = np.sum(x_np[i, bounds[j - 1]:bounds[j]] ** 2)

        def polin(lam, row=coefficients[i]):
            # Even polynomial in lam whose root is the dilatation factor.
            return np.dot(lam ** powers, row)

        # The factor is never allowed to exceed 1.
        normalizz[i] = min(optimize.brentq(polin, 0, 2), 1)

    return torch.from_numpy(normalizz).to(x.device)


'''grad computation of dilatation function (corollary B.14 in the paper)
'''
class Normalization(torch.autograd.Function):
    """Autograd wrapper around ``dilatation`` with a hand-written gradient.

    The forward pass solves, for every row ``s`` of the input, the equation
    ``1 + sum_k s_k**2 * lam**(2*m_k) = phi(1 + |s|**2)`` for ``lam``
    (``m_k`` is the signature level of component ``k``; ``a`` is fixed to 1).
    The backward pass differentiates that equation implicitly.
    """

    @staticmethod
    def forward(ctx, input, C, M, d, exponents):
        """Return the normalization constant of every signature in the batch.

        Args:
            input: tensor of shape batch x signature length.
            C: phi function parameter (``a`` is fixed to 1 here).
            M: truncation level.
            d: dimension of the time series.
            exponents: vector as long as the signature; it holds d times 1,
                d^2 times 2, ..., d^M times the value M.

        Returns:
            float32 tensor of shape batch x 1.
        """
        norm = dilatation(input, C, 1, M, d)
        ctx.save_for_backward(input, exponents)
        ctx.C = C
        ctx.M = M
        ctx.d = d
        ctx.norm = norm  # batch x 1
        return norm.to(torch.float32)

    @staticmethod
    def backward(ctx, grad_output):
        """Propagate ``grad_output`` (grad of upper layers) to ``input``.

        Implicit differentiation of the forward equation with respect to
        the component ``s_k`` gives

            d lam / d s_k = s_k * (phi'(x) - lam**(2*m_k))
                            / sum_j m_j * s_j**2 * lam**(2*m_j - 1)

        with ``x = 1 + |s|**2``. Only ``input`` receives a gradient.
        """
        input, exponents = ctx.saved_tensors
        sig = input.to(torch.float64)
        exps = exponents.to(dtype=torch.float64, device=sig.device)
        lam = ctx.norm.to(dtype=torch.float64, device=sig.device)  # batch x 1

        sig_sq = sig ** 2
        norm_sq = 1 + torch.sum(sig_sq, 1, keepdim=True)  # batch x 1

        # Derivative of phi (a = 1) with respect to its argument x:
        # 1 on the identity branch (x <= C), C**2 / x**2 on the damped branch.
        phi_prime = torch.where(
            norm_sq > ctx.C,
            (ctx.C ** 2) / norm_sq ** 2,
            torch.ones_like(norm_sq),
        )

        lam_pow = lam ** (2 * exps)            # batch x length_sign
        lam_pow_minus1 = lam ** (2 * exps - 1)  # batch x length_sign

        numerator = sig * (phi_prime - lam_pow)
        denominator = torch.sum(exps * sig_sq * lam_pow_minus1, 1, keepdim=True)

        # A zero denominator (e.g. an all-zero signature) would give NaN;
        # the gradient is set to zero for those rows instead.
        degenerate = denominator == 0
        safe_denominator = torch.where(
            degenerate, torch.ones_like(denominator), denominator
        )
        gradient = torch.where(
            degenerate.expand_as(numerator),
            torch.zeros_like(numerator),
            numerator / safe_denominator,
        )

        grad_input = (grad_output.to(gradient.device) * gradient).to(input.dtype)
        return grad_input, None, None, None, None


'Triangular: transforms a vector into a lower triangular matrix'
class Triangular(torch.nn.Module):
    """Scatter a flat vector into a square matrix at fixed positions.

    The positions are given by ``x_indices`` (rows) and ``y_indices``
    (columns); all remaining entries of the matrix are zero.
    """

    def __init__(self, dim, L2, x_indices, y_indices):
        """Store the matrix geometry.

        Args:
            dim: dimension of the starting time series.
            L2: number of new time instants.
            x_indices: row index of every entry of the input vector.
            y_indices: column index of every entry of the input vector.
        """
        super(Triangular, self).__init__()
        self.dim = dim
        self.L2 = L2
        self.x_indices = x_indices
        self.y_indices = y_indices

    def forward(self, x):
        """Build the batch of matrices.

        Args:
            x: tensor of shape batch x len(x_indices).

        Returns:
            Tensor of shape batch x (L2*dim) x (L2*dim) with the same dtype
            and device as ``x``.
        """
        size = self.L2 * self.dim
        rows = torch.as_tensor(self.x_indices, device=x.device).long()
        cols = torch.as_tensor(self.y_indices, device=x.device).long()
        # Allocate with x's dtype/device so the indexed assignment below
        # never mixes dtypes or devices.
        A = x.new_zeros((x.shape[0], size, size))
        A[:, rows, cols] = x
        return A


'''Preparation with time augmentation: combines original time series and values sampled, then applies time augmentation'''

class PreparationWithTimeAugmentation(torch.nn.Module):
    """Merge the original and the sampled values and append the time channel."""

    def __init__(self, order, timesteps_cut, dim, extended_order):
        """Store the reordering information.

        Args:
            order: permutation applied to the time instants.
            timesteps_cut: number of time steps, sum of known and new ones.
            dim: dimension of the starting time series.
            extended_order: permutation applied to the concatenated values
                (``order`` expanded to account for ``dim``).
        """
        super(PreparationWithTimeAugmentation, self).__init__()
        self.order = order
        self.extended_order = extended_order
        self.cut = timesteps_cut
        self.d = dim

    def forward(self, x, y):
        """Build the time-augmented path.

        Args:
            x: batch x features tensor; the first ``timesteps_cut`` columns
                are the time instants (known ones first, then new ones), the
                remaining columns are the starting time series values.
            y: batch x features tensor with the new sampled values.

        Returns:
            Tensor of shape batch x timesteps_cut x (dim + 1); the last
            channel holds the reordered time instants.
        """
        timesteps = x[:, :self.cut]
        # Starting values followed by the sampled ones.
        values = torch.cat((x[:, self.cut:], y), 1)

        # Reorder the values, then split them into (time step, channel).
        value_order = torch.as_tensor(
            self.extended_order, device=values.device
        ).long()
        values = values[:, value_order]
        values = values.reshape([values.shape[0], self.cut, self.d])

        # Add the time component as an extra channel.
        time_order = torch.as_tensor(self.order, device=timesteps.device).long()
        timesteps = timesteps[:, time_order]
        return torch.cat((values, timesteps.unsqueeze(2)), 2)



The Model
IMPORTANT: THE MODEL IS BUILT IN A SPECIFIC WAY, SO IT REQUIRES THE INPUT TO BE STRUCTURED AS FOLLOWS:
- FIRST, THE KNOWN TIME INSTANTS
- SECOND, THE NEW TIME INSTANTS
- LAST, THE TIME SERIES VALUES: IF THE TIME SERIES IS D-DIMENSIONAL, THE D VALUES OF THE FIRST ELEMENT OF THE TIME SERIES COME FIRST, FOLLOWED BY THE D VALUES OF THE SECOND ELEMENT, AND SO ON

In [None]:
class MyModel(torch.nn.Module):
    """Classifier on the mean normalized signature of K augmented paths.

    Two linear layers produce the mean and a square-root-covariance matrix
    from which new values at the new time instants are sampled. Each sample
    is merged with the input series, time-augmented, mapped to its truncated
    signature and normalized; the K normalized signatures are averaged and
    fed to a final linear layer followed by a log-softmax.
    """

    def __init__(self, L1, L2, dim, order, extended_order, alp, level,
                 number_classes, C, a, K):
        """Build the layers and the index tables.

        Args:
            L1: number of known time instants, i.e. length of the time series.
            L2: number of new time instants.
            dim: dimension of the starting time series.
            order: PreparationWithTimeAugmentation concatenates the starting
                values and the sampled ones; ``order`` reorganizes them (for
                example L1=100, L2=99 gives order=[0,100,1,101,2,102,...] if
                the new time instants are the middle points).
            extended_order: as ``order`` but takes into account the dimension
                of the time series (for example dim=2, L1=100, L2=99: the
                starting series has 200 values and the new values are 198,
                so extended_order=[0,1,200,201,2,3,202,203,...]).
            alp: how many subdiagonals (main diagonal included) of the lower
                triangular matrix are non zero; alp=L2 means no zero in the
                lower part.
            level: signature truncation level.
            number_classes: number of labels in the classification problem.
            C, a: phi function parameters. ``a`` is only stored; the
                normalization layer uses a=1.
            K: number of augmented paths generated.
        """
        super(MyModel, self).__init__()
        self.K = K
        self.C = C
        self.a = a
        self.alp = alp
        self.L1 = L1
        self.L2 = L2
        self.dim = dim
        self.order = order
        self.extended_order = extended_order
        self.level = level
        self.number_classes = number_classes

        # Entries of one dim x dim lower-triangular block, and number of
        # blocks on the alp non-zero (sub)diagonals. Integer arithmetic is
        # exact here: products of consecutive integers are even.
        tril_size = int(dim * (dim + 1) // 2)
        n_blocks = int(
            L2 * (L2 + 1) // 2 - (L2 - alp) * (L2 - alp + 1) // 2
        )
        # Number of free entries of the lower triangular matrix.
        self.MatrixEl = n_blocks * tril_size

        # Number of components of the signature (time augmentation adds one
        # channel): sum of (dim + 1)**j for j = 1..level.
        level_sizes = [int(dim + 1) ** j for j in range(1, level + 1)]
        self.outputSigDim = sum(level_sizes)

        # Exponents used by the normalization: the value j repeated
        # (dim + 1)**j times, for j = 1..level.
        self.exponents = torch.repeat_interleave(
            torch.arange(1, level + 1, dtype=torch.float64),
            torch.tensor(level_sizes, dtype=torch.long),
        )

        # Needed to reshape the first layer output into a matrix
        # (in the Triangular layer).
        self.tril_indices = torch.tril_indices(row=dim, col=dim, offset=0)

        # Block coordinates, one subdiagonal after the other: subdiagonal i
        # holds the L2 - i blocks (i, 0), (i + 1, 1), ..., (L2 - 1, L2 - 1 - i).
        self.x_indices = torch.zeros(n_blocks, dtype=torch.int32)
        self.y_indices = torch.zeros(n_blocks, dtype=torch.int32)
        start = 0
        for i in range(alp):
            stop = start + (L2 - i)
            self.x_indices[start:stop] = torch.arange(i, L2, 1)
            self.y_indices[start:stop] = torch.arange(0, L2 - i, 1)
            start = stop

        # Entry coordinates: every block is expanded into the positions of
        # its lower triangle, block after block.
        self.x_indicesFull = (
            self.x_indices.unsqueeze(1) * dim + self.tril_indices[0].unsqueeze(0)
        ).reshape(-1).to(torch.int32)
        self.y_indicesFull = (
            self.y_indices.unsqueeze(1) * dim + self.tril_indices[1].unsqueeze(0)
        ).reshape(-1).to(torch.int32)

        in_features = L1 * (dim + 1) + L2  # L1 + L2 time instants + L1*dim values
        self.meanLayer = torch.nn.Linear(in_features, L2 * dim)
        self.sqrtCovLayer = torch.nn.Linear(in_features, self.MatrixEl)
        # Single linear read-out. If more layers are stacked here, activation
        # functions should be added between them, otherwise going deeper is
        # useless.
        self.finaLayer1 = torch.nn.Linear(self.outputSigDim, number_classes)
        self.Normalization = ()  # unused placeholder, kept for compatibility
        self.N = torch.distributions.Normal(0, 1)
        self.Sig = signatory.Signature(level)
        self.LogSoftmax = torch.nn.LogSoftmax(1)

        # Parameter-free helper layers, built once instead of at every
        # forward call.
        self.triangular = Triangular(
            dim, L2, self.x_indicesFull, self.y_indicesFull
        )
        self.preparation = PreparationWithTimeAugmentation(
            order, L1 + L2, dim, extended_order
        )

    def forward(self, x):
        """Return the log-probabilities of every class.

        Args:
            x: tensor of shape batch x (L1*(dim+1) + L2): known time
                instants, new time instants, then the time series values.

        Returns:
            Tensor of shape batch x number_classes (log-softmax output).
        """
        batch = x.shape[0]

        self.mean = self.meanLayer(x)
        sqrtCov = self.sqrtCovLayer(x)
        self.sqrtCovMatrix = self.triangular(sqrtCov)

        # K standard normal draws per sample, mapped to the new values.
        self.epsilon = self.N.sample(
            torch.Size([batch, self.L2 * self.dim, self.K])
        )
        self.NewValues = (
            torch.bmm(self.sqrtCovMatrix, self.epsilon) + self.mean.unsqueeze(2)
        )

        self.Signatures = torch.zeros(batch, self.outputSigDim, self.K)
        for i in range(self.K):
            Path = self.preparation(x, self.NewValues[:, :, i])
            Sig = self.Sig(Path)
            norm = Normalization.apply(
                Sig.to(torch.float64), self.C, self.level, self.dim + 1,
                self.exponents,
            )
            # Dilate level j of the signature by norm**j.
            self.Signatures[:, :, i] = (
                Sig * (norm ** self.exponents)
            ).type(torch.float32)

        self.MeanSig = torch.mean(self.Signatures, 2)
        output = self.finaLayer1(self.MeanSig)
        output = self.LogSoftmax(output)
        return output


Example of order and extended order construction

In [None]:
# Example configuration:
#   division = 1 means the middle points are taken as new time instants,
#                so L2 = L1 - 1
#   number   = L1
#   begin    = first time step
#   end      = last known time step
begin, end, number, division, dim = 0, 1, 100, 1, 1

# Known time steps, then `division` interior points of every interval
# between two consecutive known time steps.
Known_times = torch.linspace(begin, end, number)
New_times = torch.zeros(division * (number - 1))
for i in range(number - 1):
    interval = torch.linspace(Known_times[i], Known_times[i + 1], division + 2)
    New_times[division * i:division * (i + 1)] = interval[1:division + 1]

# Length of known values and new values.
L1 = Known_times.shape[0]
L2 = New_times.shape[0]

# `order` is the permutation that sorts the concatenated time steps.
timesteps = torch.cat((Known_times, New_times), 0)
timesteps_sorted, order = torch.sort(timesteps)

# Expand every entry of `order` into the `dim` positions of its values.
extended_order = (
    order.unsqueeze(1) * dim + torch.arange(dim).unsqueeze(0)
).reshape(-1).to(torch.float32)