In [None]:
#| default_exp data.transforms
#| default_cls_lvl 3

In [None]:
#| export
from fastai.basics import *
from tsfast.data.core import TensorSequencesInput
from fastai.vision.augment import RandTransform

## 3. Transformations


### 3.1 Sequence Slicing Transformation

In [None]:
#| export 
class SeqSlice(Transform):
    '''Take a slice from an array-like object. Useful for e.g. shifting input and output'''
    def __init__(self, l_slc=None,r_slc=None):
        self.l_slc,self.r_slc = l_slc,r_slc
        
    def encodes(self, o): return o[self.l_slc:self.r_slc]

In [None]:
l_shift = SeqSlice(r_slc=-1)
arr = np.ones((5))
test_eq(l_shift(arr),arr[:-1])

### 3.2 Sequence Noise Injection Transformation

In [None]:
#| export
class SeqNoiseInjection(RandTransform):
    split_idx=0 #apply only to training data, if None it will be applied to all data
    '''Adds normal distributed noise to the tensor sequence with seperate mean and std for every signal'''
    def __init__(self, std=1e-1,mean=0.,p=1.0):
        super().__init__(p=p)
        self.std = tensor(std).type(torch.float)
        self.mean = tensor(mean).type(torch.float)
        
    def encodes(self, o:TensorSequencesInput): 
        if o.device != self.mean.device:
            self.std = self.std.to(o.device)
            self.mean = self.mean.to(o.device)
        #expand creates a view on a tensor and is therefore very fast compared to copy
        return o+torch.normal(mean=self.mean.expand_as(o), 
                              std=self.std.expand_as(o))

In [None]:
x = TensorSequencesInput(tensor([[1,1,1],[-1,-1,-1.0]]))
ns_mean = tensor([0.,10.1,3.1])
ns_std = tensor([1.,1.1,0.1])
x,x.shape

(TensorSequencesInput([[ 1.,  1.,  1.],
                       [-1., -1., -1.]]),
 torch.Size([2, 3]))

In [None]:
seq_noise = SeqNoiseInjection(std=ns_std,mean=ns_mean)
seq_noise(x)

TensorSequencesInput([[ 1.,  1.,  1.],
                      [-1., -1., -1.]])

In [None]:
seq_noise = SeqNoiseInjection(std=ns_std*10)
seq_noise(x)

TensorSequencesInput([[ 1.,  1.,  1.],
                      [-1., -1., -1.]])

In [None]:
#| export
class SeqNoiseInjection_Varying(RandTransform):
    split_idx=0
    '''Adds normal distributed noise to the tensor sequence with a normal distributed standard deviation for every application'''
    def __init__(self, std_std=0.1,p=1.0):
        super().__init__(p=p)
        self.std_std = tensor(std_std).type(torch.float)
        
    def encodes(self, o:TensorSequencesInput): 
        if o.device != self.std_std.device:
            self.std_std = self.std_std.to(o.device)
            
        #expand creates a view on a tensor and is therefore very fast compared to copy
        std = torch.normal(mean=0,std=self.std_std).abs()
        return o+torch.normal(mean=0,std=std.expand_as(o))

In [None]:
x = TensorSequencesInput(tensor([[0,0,0],[0,0,0]]))
ns_std = tensor([1.,1.1,0.1])
x,x.shape

(TensorSequencesInput([[0, 0, 0],
                       [0, 0, 0]]),
 torch.Size([2, 3]))

In [None]:
seq_noise = SeqNoiseInjection_Varying(std_std=ns_std)
seq_noise(x)

TensorSequencesInput([[0, 0, 0],
                      [0, 0, 0]])

In [None]:
#| export
class SeqNoiseInjection_Grouped(RandTransform):
    split_idx=0
    '''Adds normal distributed noise to the tensor sequence with a normal distributed standard deviation for every application, every group gert'''
    def __init__(self, std_std,std_idx,p=1.0):
        super().__init__(p=p)
        self.std_std = tensor(std_std).type(torch.float)
        self.std_idx = tensor(std_idx).type(torch.long)
        
    def encodes(self, o:TensorSequencesInput): 
        if o.device != self.std_std.device:
            self.std_std = self.std_std.to(o.device)
            
        #expand creates a view on a tensor and is therefore very fast compared to copy
        std = torch.normal(mean=0,std=self.std_std).abs()[self.std_idx]
        return o+torch.normal(mean=0,std=std.expand_as(o))

In [None]:
x = TensorSequencesInput(tensor([[0,0,0],[0,0,0]]))
ns_std = tensor([1.,1.1,0.1])
x,x.shape

(TensorSequencesInput([[0, 0, 0],
                       [0, 0, 0]]),
 torch.Size([2, 3]))

In [None]:
seq_noise = SeqNoiseInjection_Grouped(std_std=[3.,0],std_idx=[0,0,1])
seq_noise(x)

TensorSequencesInput([[0, 0, 0],
                      [0, 0, 0]])

### 3.2 Sequence Bias Injection Transformation

In [None]:
#| export
class SeqBiasInjection(RandTransform):
    split_idx=0
    '''Adds a normal distributed offset to the tensor sequence with seperate mean and std for every signal'''
    def __init__(self, std=1e-1,mean=0.,p=1.0):
        super().__init__(p=p)
        self.std = tensor(std).type(torch.float)
        self.mean = tensor(mean).type(torch.float)
        
    def encodes(self, o:TensorSequencesInput): 
        if o.device != self.mean.device:
            self.std = self.std.to(o.device)
            self.mean = self.mean.to(o.device)
        
        #expand creates a view on a tensor and is therefore very fast compared to copy
        mean=self.mean.repeat((o.shape[0],1,1)).expand((o.shape[0],1,o.shape[2]))
        std= self.std.repeat((o.shape[0],1,1)).expand((o.shape[0],1,o.shape[2]))
        n = torch.normal(mean=mean, std=std).expand_as(o)
        return o+n

In [None]:
x = TensorSequencesInput(tensor([[[1,1,1],[-1,-1,-1.0]]]))
ns_mean = tensor([0.,10.1,3.1])
ns_std = tensor([1.,1.1,0.1])
seq_bias = SeqBiasInjection(std=ns_std,mean=ns_std)
seq_bias(x)[...,0]

TensorSequencesInput([[ 1., -1.]])

In [None]:
seq_bias.mean

tensor([1.0000, 1.1000, 0.1000])

In [None]:
seq_bias = SeqBiasInjection(std=ns_std*10)
seq_bias(x)

TensorSequencesInput([[[ 1.,  1.,  1.],
                       [-1., -1., -1.]]])

### 3.3 Normalization
`Normalize` is programmed for `TensorImage` as an input tensor. It gets. At init the variable axes need to be chosen correspondingly to the shape of your tensor.

In [None]:
#| export
@Normalize
def encodes(self, x:TensorSequencesInput): 
    if x.device != self.mean.device:
        self.mean = self.mean.to(x.device)
        self.std = self.std.to(x.device)
    return (x-self.mean) / self.std

@Normalize
def decodes(self, x:TensorSequencesInput):
    if x.device != self.mean.device:
        self.mean = self.mean.to(x.device)
        self.std = self.std.to(x.device)
    return (x*self.std + self.mean)

In [None]:
norm = Normalize.from_stats(mean=ns_mean,std=ns_std,dim=1,ndim=2,cuda=False)
x,norm(x)

(TensorSequencesInput([[[ 1.,  1.,  1.],
                        [-1., -1., -1.]]]),
 TensorSequencesInput([[[  1.0000,  -8.2727, -21.0000],
                        [ -1.0000, -10.0909, -41.0000]]]))

In [None]:
#| include: false
import nbdev
nbdev.nbdev_export()