## PUC-Rio 
### Departamento de Engenharia Elétrica 


### Aula prática - Echo State Network

Forma mais prática (didática) para a implementação do Echo State Network

In [1]:
import torch
import torch.nn as nn

class ESN(nn.Module):
    def __init__(self, input_size, reservoir_size, output_size):
        super(ESN, self).__init__()
        self.input_size = input_size
        self.reservoir_size = reservoir_size
        self.output_size = output_size
        
        # Reservoir parameters
        self.reservoir = nn.Linear(reservoir_size, reservoir_size, bias=False)
        self.activation = nn.Tanh()
        self.initial_state = nn.Parameter(torch.randn(1, reservoir_size), requires_grad=False)
        
        # Readout parameters
        self.readout = nn.Linear(reservoir_size + input_size, output_size)
        
    def forward(self, x):
        batch_size = x.size(0)
        seq_length = x.size(1)
        
        # Initialize reservoir state
        reservoir_state = self.initial_state.expand(batch_size, -1)
        
        # Iterate through time steps
        for t in range(seq_length):
            # Update reservoir state
            input_data = torch.cat((x[:, t], reservoir_state), dim=1)
            reservoir_state = self.activation(self.reservoir(input_data))
        
        # Concatenate reservoir state and input
        input_data = torch.cat((x[:, -1], reservoir_state), dim=1)
        
        # Compute output
        output = self.readout(input_data)
        
        return output


Implementação do Echo State Network baseado em https://github.com/stefanonardo/pytorch-esn

Esta será a versão que usaremos no hands-on, já que possui diversas técnicas e hiperparâmetros utilizados. :-) 

Lembrando que em um Echo State Network, nós temos que assegurar alguns critérios:

- Os neurônios do reservatório devem ser esparsamente conectados. 

- Deve ser satisfeita a propriedade de Echo State. 

In [2]:
"""
This examples is not intended to be optimized. Its purpose is to show how to handle
big datasets with multiple sequences. The accuracy should be around 10%.
"""

import re
import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.nn.utils.rnn import PackedSequence
import torch.sparse


def apply_permutation(tensor, permutation, dim=1):
    # type: (Tensor, Tensor, int) -> Tensor
    return tensor.index_select(dim, permutation)


class Reservoir(nn.Module):

    def __init__(self, mode, input_size, hidden_size, num_layers, leaking_rate,
                 spectral_radius, w_ih_scale,
                 density, bias=True, batch_first=False):
        super(Reservoir, self).__init__()
        self.mode = mode
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.leaking_rate = leaking_rate
        self.spectral_radius = spectral_radius
        self.w_ih_scale = w_ih_scale
        self.density = density
        self.bias = bias
        self.batch_first = batch_first

        self._all_weights = []
        for layer in range(num_layers):
            layer_input_size = input_size if layer == 0 else hidden_size

            w_ih = nn.Parameter(torch.Tensor(hidden_size, layer_input_size))
            w_hh = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
            b_ih = nn.Parameter(torch.Tensor(hidden_size))
            layer_params = (w_ih, w_hh, b_ih)

            param_names = ['weight_ih_l{}{}', 'weight_hh_l{}{}']
            if bias:
                param_names += ['bias_ih_l{}{}']
            param_names = [x.format(layer, '') for x in param_names]

            for name, param in zip(param_names, layer_params):
                setattr(self, name, param)
            self._all_weights.append(param_names)

        self.reset_parameters()

    def _apply(self, fn):
        ret = super(Reservoir, self)._apply(fn)
        return ret

    def reset_parameters(self):
        weight_dict = self.state_dict()
        for key, value in weight_dict.items():
            if key == 'weight_ih_l0':
                nn.init.uniform_(value, -1, 1)
                value *= self.w_ih_scale[1:]
            elif re.fullmatch('weight_ih_l[^0]*', key):
                nn.init.uniform_(value, -1, 1)
            elif re.fullmatch('bias_ih_l[0-9]*', key):
                nn.init.uniform_(value, -1, 1)
                value *= self.w_ih_scale[0]
            elif re.fullmatch('weight_hh_l[0-9]*', key):
                w_hh = torch.Tensor(self.hidden_size * self.hidden_size)
                w_hh.uniform_(-1, 1)
                if self.density < 1:
                    zero_weights = torch.randperm(
                        int(self.hidden_size * self.hidden_size))
                    zero_weights = zero_weights[
                                   :int(
                                       self.hidden_size * self.hidden_size * (
                                                   1 - self.density))]
                    w_hh[zero_weights] = 0
                w_hh = w_hh.view(self.hidden_size, self.hidden_size)
                abs_eigs = torch.abs(torch.linalg.eigvals(w_hh))
                weight_dict[key] = w_hh * (self.spectral_radius / torch.max(abs_eigs))

        self.load_state_dict(weight_dict)

    def check_input(self, input, batch_sizes):
        # type: (Tensor, Optional[Tensor]) -> None
        expected_input_dim = 2 if batch_sizes is not None else 3
        if input.dim() != expected_input_dim:
            raise RuntimeError(
                'input must have {} dimensions, got {}'.format(
                    expected_input_dim, input.dim()))
        if self.input_size != input.size(-1):
            raise RuntimeError(
                'input.size(-1) must be equal to input_size. Expected {}, got {}'.format(
                    self.input_size, input.size(-1)))

    def get_expected_hidden_size(self, input, batch_sizes):
        # type: (Tensor, Optional[Tensor]) -> Tuple[int, int, int]
        if batch_sizes is not None:
            mini_batch = batch_sizes[0]
            mini_batch = int(mini_batch)
        else:
            mini_batch = input.size(0) if self.batch_first else input.size(1)
        expected_hidden_size = (self.num_layers, mini_batch, self.hidden_size)
        return expected_hidden_size

    def check_hidden_size(self, hx, expected_hidden_size, msg='Expected hidden size {}, got {}'):
        # type: (Tensor, Tuple[int, int, int], str) -> None
        if hx.size() != expected_hidden_size:
            raise RuntimeError(msg.format(expected_hidden_size, tuple(hx.size())))

    def check_forward_args(self, input, hidden, batch_sizes):
        # type: (Tensor, Tensor, Optional[Tensor]) -> None
        self.check_input(input, batch_sizes)
        expected_hidden_size = self.get_expected_hidden_size(input, batch_sizes)

        self.check_hidden_size(hidden, expected_hidden_size)

    def permute_hidden(self, hx, permutation):
        # type: (Tensor, Optional[Tensor]) -> Tensor
        if permutation is None:
            return hx
        return apply_permutation(hx, permutation)

    def forward(self, input, hx=None):
        is_packed = isinstance(input, PackedSequence)
        if is_packed:
            input, batch_sizes, sorted_indices, unsorted_indices = input
            max_batch_size = int(batch_sizes[0])
        else:
            batch_sizes = None
            max_batch_size = input.size(0) if self.batch_first else input.size(1)
            sorted_indices = None
            unsorted_indices = None

        if hx is None:
            hx = input.new_zeros(self.num_layers, max_batch_size,
                                 self.hidden_size, requires_grad=False)
        else:
            # Each batch of the hidden state should match the input sequence that
            # the user believes he/she is passing in.
            hx = self.permute_hidden(hx, sorted_indices)

        flat_weight = None

        self.check_forward_args(input, hx, batch_sizes)
        func = AutogradReservoir(
            self.mode,
            self.input_size,
            self.hidden_size,
            num_layers=self.num_layers,
            batch_first=self.batch_first,
            train=self.training,
            variable_length=is_packed,
            flat_weight=flat_weight,
            leaking_rate=self.leaking_rate
        )
        output, hidden = func(input, self.all_weights, hx, batch_sizes)
        if is_packed:
            output = PackedSequence(output, batch_sizes, sorted_indices, unsorted_indices)
        return output, self.permute_hidden(hidden, unsorted_indices)

    def extra_repr(self):
        s = '({input_size}, {hidden_size}'
        if self.num_layers != 1:
            s += ', num_layers={num_layers}'
        if self.bias is not True:
            s += ', bias={bias}'
        if self.batch_first is not False:
            s += ', batch_first={batch_first}'
        s += ')'
        return s.format(**self.__dict__)

    def __setstate__(self, d):
        super(Reservoir, self).__setstate__(d)
        self.__dict__.setdefault('_data_ptrs', [])
        if 'all_weights' in d:
            self._all_weights = d['all_weights']
        if isinstance(self._all_weights[0][0], str):
            return
        num_layers = self.num_layers
        self._all_weights = []
        for layer in range(num_layers):
            weights = ['weight_ih_l{}{}', 'weight_hh_l{}{}', 'bias_ih_l{}{}']
            weights = [x.format(layer) for x in weights]
            if self.bias:
                self._all_weights += [weights]
            else:
                self._all_weights += [weights[:2]]

    @property
    def all_weights(self):
        return [[getattr(self, weight) for weight in weights] for weights in
                self._all_weights]


def AutogradReservoir(mode, input_size, hidden_size, num_layers=1,
                      batch_first=False, train=True,
                      batch_sizes=None, variable_length=False, flat_weight=None,
                      leaking_rate=1):
    if mode == 'RES_TANH':
        cell = ResTanhCell
    elif mode == 'RES_RELU':
        cell = ResReLUCell
    elif mode == 'RES_ID':
        cell = ResIdCell

    if variable_length:
        layer = (VariableRecurrent(cell, leaking_rate),)
    else:
        layer = (Recurrent(cell, leaking_rate),)

    func = StackedRNN(layer,
                      num_layers,
                      False,
                      train=train)

    def forward(input, weight, hidden, batch_sizes):
        if batch_first and batch_sizes is None:
            input = input.transpose(0, 1)

        nexth, output = func(input, hidden, weight, batch_sizes)

        if batch_first and not variable_length:
            output = output.transpose(0, 1)

        return output, nexth

    return forward


def Recurrent(inner, leaking_rate):
    def forward(input, hidden, weight, batch_sizes):
        output = []
        steps = range(input.size(0))
        for i in steps:
            hidden = inner(input[i], hidden, leaking_rate, *weight)
            # hack to handle LSTM
            output.append(hidden[0] if isinstance(hidden, tuple) else hidden)

        output = torch.cat(output, 0).view(input.size(0), *output[0].size())

        return hidden, output

    return forward


def VariableRecurrent(inner, leaking_rate):
    def forward(input, hidden, weight, batch_sizes):
        output = []
        input_offset = 0
        last_batch_size = batch_sizes[0]
        hiddens = []
        flat_hidden = not isinstance(hidden, tuple)
        if flat_hidden:
            hidden = (hidden,)
        for batch_size in batch_sizes:
            step_input = input[input_offset:input_offset + batch_size]
            input_offset += batch_size

            dec = last_batch_size - batch_size
            if dec > 0:
                hiddens.append(tuple(h[-dec:] for h in hidden))
                hidden = tuple(h[:-dec] for h in hidden)
            last_batch_size = batch_size

            if flat_hidden:
                hidden = (inner(step_input, hidden[0], leaking_rate, *weight),)
            else:
                hidden = inner(step_input, hidden, leaking_rate, *weight)

            output.append(hidden[0])
        hiddens.append(hidden)
        hiddens.reverse()

        hidden = tuple(torch.cat(h, 0) for h in zip(*hiddens))
        assert hidden[0].size(0) == batch_sizes[0]
        if flat_hidden:
            hidden = hidden[0]
        output = torch.cat(output, 0)

        return hidden, output

    return forward


def StackedRNN(inners, num_layers, lstm=False, train=True):
    num_directions = len(inners)
    total_layers = num_layers * num_directions

    def forward(input, hidden, weight, batch_sizes):
        assert (len(weight) == total_layers)
        next_hidden = []
        all_layers_output = []

        for i in range(num_layers):
            all_output = []
            for j, inner in enumerate(inners):
                l = i * num_directions + j

                hy, output = inner(input, hidden[l], weight[l], batch_sizes)
                next_hidden.append(hy)
                all_output.append(output)

            input = torch.cat(all_output, input.dim() - 1)
            all_layers_output.append(input)

        all_layers_output = torch.cat(all_layers_output, -1)
        next_hidden = torch.cat(next_hidden, 0).view(
            total_layers, *next_hidden[0].size())

        return next_hidden, all_layers_output

    return forward


def ResTanhCell(input, hidden, leaking_rate, w_ih, w_hh, b_ih=None):
    hy_ = torch.tanh(F.linear(input, w_ih, b_ih) + F.linear(hidden, w_hh))
    hy = (1 - leaking_rate) * hidden + leaking_rate * hy_
    return hy


def ResReLUCell(input, hidden, leaking_rate, w_ih, w_hh, b_ih=None):
    hy_ = F.relu(F.linear(input, w_ih, b_ih) + F.linear(hidden, w_hh))
    hy = (1 - leaking_rate) * hidden + leaking_rate * hy_
    return hy


def ResIdCell(input, hidden, leaking_rate, w_ih, w_hh, b_ih=None):
    hy_ = F.linear(input, w_ih, b_ih) + F.linear(hidden, w_hh)
    hy = (1 - leaking_rate) * hidden + leaking_rate * hy_
    return hy

In [4]:
import torch


def prepare_target(target, seq_lengths, washout, batch_first=False):
    """ Preprocess target for offline training.

    Args:
        target (seq_len, batch, output_size): tensor containing
            the features of the target sequence.
        seq_lengths: list of lengths of each sequence in the batch.
        washout: number of initial timesteps during which output of the
            reservoir is not forwarded to the readout. One value per sample.
        batch_first: If ``True``, then the input and output tensors are provided
            as (batch, seq, feature). Default: ``False``

    Returns:
        tensor containing the features of the batch's sequences rolled out along
        one axis, minus the washouts and the padded values.
    """

    if batch_first:
        target = target.transpose(0, 1)
    n_sequences = target.size(1)
    target_dim = target.size(2)
    train_len = sum(torch.tensor(seq_lengths) - torch.tensor(washout)).item()

    new_target = torch.zeros(train_len, target_dim, device=target.device)

    idx = 0
    for s in range(n_sequences):
        batch_len = seq_lengths[s] - washout[s]
        new_target[idx:idx + batch_len, :] = target[washout[s]:seq_lengths[s], s, :]
        idx += batch_len

    return new_target


def washout_tensor(tensor, washout, seq_lengths, bidirectional=False, batch_first=False):
    tensor = tensor.transpose(0, 1) if batch_first else tensor.clone()
    if type(seq_lengths) == list:
        seq_lengths = seq_lengths.copy()
    if type(seq_lengths) == torch.Tensor:
        seq_lengths = seq_lengths.clone()

    for b in range(tensor.size(1)):
        if washout[b] > 0:
            tmp = tensor[washout[b]:seq_lengths[b], b].clone()
            tensor[:seq_lengths[b] - washout[b], b] = tmp
            tensor[seq_lengths[b] - washout[b]:, b] = 0
            seq_lengths[b] -= washout[b]

            if bidirectional:
                tensor[seq_lengths[b] - washout[b]:, b] = 0
                seq_lengths[b] -= washout[b]

    if type(seq_lengths) == list:
        max_len = max(seq_lengths)
    else:
        max_len = max(seq_lengths).item()

    return tensor[:max_len], seq_lengths

In [5]:
import torch
import torch.nn as nn
#from torch.nn.utils.rnn import PackedSequence, pad_packed_sequence
#from .reservoir import Reservoir
#from ..utils import washout_tensor


class ESN(nn.Module):
    """ Applies an Echo State Network to an input sequence. Multi-layer Echo
    State Network is based on paper
    Deep Echo State Network (DeepESN): A Brief Survey - Gallicchio, Micheli 2017

    Args:
        input_size: The number of expected features in the input x.
        hidden_size: The number of features in the hidden state h.
        output_size: The number of expected features in the output y.
        num_layers: Number of recurrent layers. Default: 1
        nonlinearity: The non-linearity to use ['tanh'|'relu'|'id'].
            Default: 'tanh'
        batch_first: If ``True``, then the input and output tensors are provided
            as (batch, seq, feature). Default: ``False``
        leaking_rate: Leaking rate of reservoir's neurons. Default: 1
        spectral_radius: Desired spectral radius of recurrent weight matrix.
            Default: 0.9
        w_ih_scale: Scale factor for first layer's input weights (w_ih_l0). It
            can be a number or a tensor of size '1 + input_size' and first element
            is the bias' scale factor. Default: 1
        lambda_reg: Ridge regression's shrinkage parameter. Default: 1
        density: Recurrent weight matrix's density. Default: 1
        w_io: If 'True', then the network uses trainable input-to-output
            connections. Default: ``False``
        readout_training: Readout's traning algorithm ['gd'|'svd'|'cholesky'|'inv'].
            If 'gd', gradients are accumulated during backward
            pass. If 'svd', 'cholesky' or 'inv', the network will learn readout's
            parameters during the forward pass using ridge regression. The
            coefficients are computed using SVD, Cholesky decomposition or
            standard ridge regression formula. 'gd', 'cholesky' and 'inv'
            permit the usage of mini-batches to train the readout.
            If 'inv' and matrix is singular, pseudoinverse is used.
        output_steps: defines how the reservoir's output will be used by ridge
            regression method ['all', 'mean', 'last'].
            If 'all', the entire reservoir output matrix will be used.
            If 'mean', the mean of reservoir output matrix along the timesteps
            dimension will be used.
            If 'last', only the last timestep of the reservoir output matrix
            will be used.
            'mean' and 'last' are useful for classification tasks.

    Inputs: input, washout, h_0, target
        input (seq_len, batch, input_size): tensor containing the features of
            the input sequence. The input can also be a packed variable length
            sequence. See `torch.nn.utils.rnn.pack_padded_sequence`
        washout (batch): number of initial timesteps during which output of the
            reservoir is not forwarded to the readout. One value per batch's
            sample.
        h_0 (num_layers, batch, hidden_size): tensor containing
             the initial reservoir's hidden state for each element in the batch.
             Defaults to zero if not provided.

        target (seq_len*batch - washout*batch, output_size): tensor containing
            the features of the batch's target sequences rolled out along one
            axis, minus the washouts and the padded values. It is only needed
            for readout's training in offline mode. Use `prepare_target` to
            compute it.

    Outputs: output, h_n
        - output (seq_len, batch, hidden_size): tensor containing the output
        features (h_k) from the readout, for each k.
        - **h_n** (num_layers * num_directions, batch, hidden_size): tensor
          containing the reservoir's hidden state for k=seq_len.
    """

    def __init__(self, input_size, hidden_size, output_size, num_layers=1,
                 nonlinearity='tanh', batch_first=False, leaking_rate=1,
                 spectral_radius=0.9, w_ih_scale=1, lambda_reg=0, density=1,
                 w_io=False, readout_training='svd', output_steps='all'):
        super(ESN, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.num_layers = num_layers
        if nonlinearity == 'tanh':
            mode = 'RES_TANH'
        elif nonlinearity == 'relu':
            mode = 'RES_RELU'
        elif nonlinearity == 'id':
            mode = 'RES_ID'
        else:
            raise ValueError("Unknown nonlinearity '{}'".format(nonlinearity))
        self.batch_first = batch_first
        self.leaking_rate = leaking_rate
        self.spectral_radius = spectral_radius
        if type(w_ih_scale) != torch.Tensor:
            self.w_ih_scale = torch.ones(input_size + 1)
            self.w_ih_scale *= w_ih_scale
        else:
            self.w_ih_scale = w_ih_scale

        self.lambda_reg = lambda_reg
        self.density = density
        self.w_io = w_io
        if readout_training in {'gd', 'svd', 'cholesky', 'inv'}:
            self.readout_training = readout_training
        else:
            raise ValueError("Unknown readout training algorithm '{}'".format(
                readout_training))

        self.reservoir = Reservoir(mode, input_size, hidden_size, num_layers,
                                   leaking_rate, spectral_radius,
                                   self.w_ih_scale, density,
                                   batch_first=batch_first)

        if w_io:
            self.readout = nn.Linear(input_size + hidden_size * num_layers,
                                     output_size)
        else:
            self.readout = nn.Linear(hidden_size * num_layers, output_size)
        if readout_training == 'offline':
            self.readout.weight.requires_grad = False

        if output_steps in {'all', 'mean', 'last'}:
            self.output_steps = output_steps
        else:
            raise ValueError("Unknown task '{}'".format(
                output_steps))

        self.XTX = None
        self.XTy = None
        self.X = None

    def forward(self, input, washout, h_0=None, target=None):
        with torch.no_grad():

            output, hidden = self.reservoir(input, h_0)

            if self.batch_first:
                seq_lengths = output.size(0) * [output.size(1)]
            else:
                seq_lengths = output.size(1) * [output.size(0)]

            if self.batch_first:
                output = output.transpose(0, 1)

            output, seq_lengths = washout_tensor(output, washout, seq_lengths)

            if self.w_io:

                input_lengths = [input.size(0)] * input.size(1)

                if self.batch_first:
                    input = input.transpose(0, 1)

                input, _ = washout_tensor(input, washout, input_lengths)
                output = torch.cat([input, output], -1)

            if self.readout_training == 'gd' or target is None:
                with torch.enable_grad():
                    output = self.readout(output)

                    if self.batch_first:
                        output = output.transpose(0, 1)

                    # Uncomment if you want packed output.
                    # if is_packed:
                    #     output = pack_padded_sequence(output, seq_lengths,
                    #                                   batch_first=self.batch_first)

                    return output, hidden

            else:
                batch_size = output.size(1)

                X = torch.ones(target.size(0), 1 + output.size(2), device=target.device)
                row = 0
                for s in range(batch_size):
                    if self.output_steps == 'all':
                        X[row:row + seq_lengths[s], 1:] = output[:seq_lengths[s],
                                                          s]
                        row += seq_lengths[s]
                    elif self.output_steps == 'mean':
                        X[row, 1:] = torch.mean(output[:seq_lengths[s], s], 0)
                        row += 1
                    elif self.output_steps == 'last':
                        X[row, 1:] = output[seq_lengths[s] - 1, s]
                        row += 1

                if self.readout_training == 'cholesky':
                    if self.XTX is None:
                        self.XTX = torch.mm(X.t(), X)
                        self.XTy = torch.mm(X.t(), target)
                    else:
                        self.XTX += torch.mm(X.t(), X)
                        self.XTy += torch.mm(X.t(), target)

                elif self.readout_training == 'svd':
                    # Scikit-Learn SVD solver for ridge regression.
                    U, s, V = torch.svd(X)
                    idx = s > 1e-15  # same default value as scipy.linalg.pinv
                    s_nnz = s[idx][:, None]
                    UTy = torch.mm(U.t(), target)
                    d = torch.zeros(s.size(0), 1, device=X.device)
                    d[idx] = s_nnz / (s_nnz ** 2 + self.lambda_reg)
                    d_UT_y = d * UTy
                    W = torch.mm(V, d_UT_y).t()

                    self.readout.bias = nn.Parameter(W[:, 0])
                    self.readout.weight = nn.Parameter(W[:, 1:])
                elif self.readout_training == 'inv':
                    self.X = X
                    if self.XTX is None:
                        self.XTX = torch.mm(X.t(), X)
                        self.XTy = torch.mm(X.t(), target)
                    else:
                        self.XTX += torch.mm(X.t(), X)
                        self.XTy += torch.mm(X.t(), target)

                return None, None

    def fit(self):
        if self.readout_training in {'gd', 'svd'}:
            return

        if self.readout_training == 'cholesky':
            W = torch.linalg.solve(self.XTy,
                                   self.XTX + self.lambda_reg * torch.eye(
                                       self.XTX.size(0), device=self.XTX.device))[0].t()
            self.XTX = None
            self.XTy = None

            self.readout.bias = nn.Parameter(W[:, 0])
            self.readout.weight = nn.Parameter(W[:, 1:])
        elif self.readout_training == 'inv':
            I = (self.lambda_reg * torch.eye(self.XTX.size(0))).to(
                self.XTX.device)
            A = self.XTX + I
            X_rank = torch.linalg.matrix_rank(A).item()

            if X_rank == self.X.size(0):
                W = torch.mm(torch.inverse(A), self.XTy).t()
            else:
                W = torch.mm(torch.pinverse(A), self.XTy).t()

            self.readout.bias = nn.Parameter(W[:, 0])
            self.readout.weight = nn.Parameter(W[:, 1:])

            self.XTX = None
            self.XTy = None

    def reset_parameters(self):
        self.reservoir.reset_parameters()
        self.readout.reset_parameters()

### 1. Preparando os dados

Para o primeiro experimento, vamos utilizar uma ESN para modelar a dinâmica de um dispositivo de CD player, tendo as entradas as forças aplicadas (nos dois eixos, x e y) pelo 'braço' do CD, e como saídas as posições (x e y)

In [6]:
import torch.nn
import numpy as np
import time

dtype = torch.float
torch.set_default_dtype(dtype)

filename = 'CD_player_arm.dat'

data = np.loadtxt(filename, dtype=np.float32)

X_data = np.expand_dims(data[:, :2], axis=1)
Y_data = np.expand_dims(data[:, 2:], axis=1)

X_data = torch.from_numpy(X_data)
Y_data = torch.from_numpy(Y_data)


FileNotFoundError: CD_player_arm.dat not found.

Fazendo uma divisão para treinamento e teste. Neste caso, vamos usar os primeiros 80% dos dados para treinamento, e o restante para o <b> teste </b>.

In [7]:
s = int(0.8*X_data.shape[0])

trX = X_data[:s]
trY = Y_data[:s]
tsX = X_data[s:]
tsY = Y_data[s:]
tsY = tsY.reshape(tsY.shape[0], tsY.shape[2])

Lembrando que, para o ESN, temos o tempo de washout. Isso significa que vamos esperar um determinado período para 'passar' a condição inicial aleatória e começarmos a computar as saídas do reservatório para o treinamento da matriz de pesos da saída. 

In [8]:
washout = [100]

# Training
trY_flat = prepare_target(trY.clone(), [trX.size(0)], washout)

In [9]:
print(f'Dimensão do treinamento era {trY.shape[0], trY.shape[2]} e agora é {trY_flat.shape[0], trY_flat.shape[1]} usando washout = {washout[0]}')

Dimensão do treinamento era (1638, 2) e agora é (1538, 2) usando washout = 100


### 2. Definindo o modelo

In [10]:
input_size = trX.shape[2]
output_size = trY.shape[2]
hidden_size = 100
density = 0.9

In [11]:
model = ESN(input_size, hidden_size, output_size, density=density)

_ = model(trX, washout, None, trY_flat)

### 3. Treinamento da Echo State Network

In [12]:
loss_fcn = torch.nn.MSELoss()

In [13]:
%%timeit

model.fit()
output, hidden = model(trX, washout)

301 ms ± 6.19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
model.fit()
output, hidden = model(trX, washout)

In [15]:
print("Training error:", loss_fcn(output, trY[washout[0]:]).item())


Training error: 0.008149297907948494


### 4. Teste da Echo State Network

In [16]:
# Test
output, hidden = model(tsX, [0], hidden)
print("Test error:", loss_fcn(output, tsY).item())

Test error: 0.48554036021232605


  return F.mse_loss(input, target, reduction=self.reduction)


In [17]:
import numpy as np

def root_relative_squared_error(y_true, y_pred):
    """
    Calculates the Root Relative Squared Error (RRSE).

    Args:
        y_true (array-like): Array of true values.
        y_pred (array-like): Array of predicted values.

    Returns:
        float: RRSE value.
    """
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    # Calculate the mean squared error (MSE)
    mse = np.mean((y_true - y_pred) ** 2)

    # Calculate the squared difference between the actual values and the mean of the actual values
    mean_actual = np.mean(y_true)
    squared_difference = np.mean((y_true - mean_actual) ** 2)

    # Calculate the root relative squared error (RRSE)
    rrse = np.sqrt(mse / squared_difference)

    return rrse

In [18]:
output = output.detach()
output = torch.squeeze(output, dim=1)

In [19]:
rrse_var1 = root_relative_squared_error(output[:,0], tsY[:,0])
rrse_var2 = root_relative_squared_error(output[:,1], tsY[:,1])

mae = np.array(torch.mean(abs(output - tsY),dim=0))

In [20]:
print(f'Output #1 - RRSE = {rrse_var1}, MAE = {mae[0]} \nOutput #2 - RRSE = {rrse_var2}, MAE = {mae[1]}')

Output #1 - RRSE = 0.3855954110622406, MAE = 0.0916481763124466 
Output #2 - RRSE = 0.1852477788925171, MAE = 0.09282106906175613


Sabemos que esse modelo, assim como o MLP, depende muito da inicialização. Vamos rodar algumas vezes. :)

In [21]:
n = 10
loss_train = []
loss_test = []
rrse_v1 = []
rrse_v2 = [] 
mae_ = []


for i in range(n):
    model = ESN(input_size, hidden_size, output_size, density=density)

    _ = model(trX, washout, None, trY_flat)

    model.fit()
    output_train, hidden = model(trX, washout)

    
    # Test
    output, hidden = model(tsX, [0], hidden)

    loss_train.append(loss_fcn(output_train, trY[washout[0]:]).item())

    loss_test.append(loss_fcn(output, tsY).item())

    output = torch.squeeze(output.detach(), dim=1)
        
    rrse_v1.append(root_relative_squared_error(output[:,0], tsY[:,0]))
    rrse_v2.append(root_relative_squared_error(output[:,1], tsY[:,1]))

    mae_.append(np.array(torch.mean(abs(output - tsY),dim=0)))

In [22]:
loss_train = np.array(loss_train)
loss_test = np.array(loss_test)
rrse_v1 = np.array(rrse_v1)
rrse_v2 = np.array(rrse_v2)
mae_ = np.array(mae_)


In [23]:
print(f'Output #1 - RRSE = {np.mean(rrse_v1)} +- {np.std(rrse_v1)}, MAE = {np.mean(mae_,axis=0)[0]} +- {np.std(mae_,axis=0)[0]} \nOutput #2 - RRSE = {np.mean(rrse_v2)} +- {np.std(rrse_v2)}, MAE = {np.mean(mae_,axis=0)[1]} +- {np.std(mae_,axis=0)[1]}')

Output #1 - RRSE = 0.4070626199245453 +- 0.017119862139225006, MAE = 0.09466187655925751 +- 0.0033661064226180315 
Output #2 - RRSE = 0.197372704744339 +- 0.005639947485178709, MAE = 0.09902842342853546 +- 0.0028421322349458933


In [24]:
np.mean(mae_,axis=0)

array([0.09466188, 0.09902842], dtype=float32)

### 5. Mudando hiperparâmetros

Mudando a esparsidade das conexões 

In [25]:
n = 15
loss_train = []
loss_test = []
rrse_v1 = []
rrse_v2 = [] 
mae_ = []

density = 0.6

for i in range(n):
    model = ESN(input_size, hidden_size, output_size, density=density)

    _ = model(trX, washout, None, trY_flat)

    model.fit()
    output_train, hidden = model(trX, washout)

    
    # Test
    output_, hidden = model(tsX, [0], hidden)

    loss_train.append(loss_fcn(output_train, trY[washout[0]:]).item())

    loss_test.append(loss_fcn(output_, tsY).item())

    output = torch.squeeze(output_.detach(), dim=1)
        
    rrse_v1.append(root_relative_squared_error(output[:,0], tsY[:,0]))
    rrse_v2.append(root_relative_squared_error(output[:,1], tsY[:,1]))

    mae_.append(np.array(torch.mean(abs(output - tsY),dim=0)))

loss_train = np.array(loss_train)
loss_test = np.array(loss_test)
rrse_v1 = np.array(rrse_v1)
rrse_v2 = np.array(rrse_v2)
mae_ = np.array(mae_)

print(f'Output #1 - RRSE = {np.mean(rrse_v1)} +- {np.std(rrse_v1)}, MAE = {np.mean(mae_,axis=0)[0]} +- {np.std(mae_,axis=0)[0]} \nOutput #2 - RRSE = {np.mean(rrse_v2)} +- {np.std(rrse_v2)}, MAE = {np.mean(mae_,axis=0)[1]} +- {np.std(mae_,axis=0)[1]}')


Output #1 - RRSE = 0.4210814833641052 +- 0.015861159190535545, MAE = 0.09549697488546371 +- 0.0033339334186166525 
Output #2 - RRSE = 0.1910683512687683 +- 0.006454088259488344, MAE = 0.09664402902126312 +- 0.003075381275266409


Mudando o Leaking Rate

In [26]:
n = 15
loss_train = []
loss_test = []
rrse_v1 = []
rrse_v2 = [] 
mae_ = []

density = 0.6
leaking_rate = 0.8

for i in range(n):
    model = ESN(input_size, hidden_size, output_size, density=density, leaking_rate=leaking_rate)

    _ = model(trX, washout, None, trY_flat)

    model.fit()
    output_train, hidden = model(trX, washout)

    
    # Test
    output_, hidden = model(tsX, [0], hidden)

    loss_train.append(loss_fcn(output_train, trY[washout[0]:]).item())

    loss_test.append(loss_fcn(output_, tsY).item())

    output = torch.squeeze(output_.detach(), dim=1)
        
    rrse_v1.append(root_relative_squared_error(output[:,0], tsY[:,0]))
    rrse_v2.append(root_relative_squared_error(output[:,1], tsY[:,1]))

    mae_.append(np.array(torch.mean(abs(output - tsY),dim=0)))

loss_train = np.array(loss_train)
loss_test = np.array(loss_test)
rrse_v1 = np.array(rrse_v1)
rrse_v2 = np.array(rrse_v2)
mae_ = np.array(mae_)

print(f'Output #1 - RRSE = {np.mean(rrse_v1)} +- {np.std(rrse_v1)}, MAE = {np.mean(mae_,axis=0)[0]} +- {np.std(mae_,axis=0)[0]} \nOutput #2 - RRSE = {np.mean(rrse_v2)} +- {np.std(rrse_v2)}, MAE = {np.mean(mae_,axis=0)[1]} +- {np.std(mae_,axis=0)[1]}')


Output #1 - RRSE = 0.4314011037349701 +- 0.02539672888815403, MAE = 0.09914489090442657 +- 0.0033575049601495266 
Output #2 - RRSE = 0.19682073593139648 +- 0.0044317287392914295, MAE = 0.09940026700496674 +- 0.002229820005595684


In [28]:
n = 15
loss_train = []
loss_test = []
rrse_v1 = []
rrse_v2 = [] 
mae_ = []

density = 0.6
leaking_rate = 0.95
spectral_radius = 0.7

for i in range(n):
    model = ESN(input_size, hidden_size, output_size, density=density, leaking_rate=leaking_rate, spectral_radius=spectral_radius)

    _ = model(trX, washout, None, trY_flat)

    model.fit()
    output_train, hidden = model(trX, washout)

    
    # Test
    output_, hidden = model(tsX, [0], hidden)

    loss_train.append(loss_fcn(output_train, trY[washout[0]:]).item())

    loss_test.append(loss_fcn(output_, tsY).item())

    output = torch.squeeze(output_.detach(), dim=1)
        
    rrse_v1.append(root_relative_squared_error(output[:,0], tsY[:,0]))
    rrse_v2.append(root_relative_squared_error(output[:,1], tsY[:,1]))

    mae_.append(np.array(torch.mean(abs(output - tsY),dim=0)))

loss_train = np.array(loss_train)
loss_test = np.array(loss_test)
rrse_v1 = np.array(rrse_v1)
rrse_v2 = np.array(rrse_v2)
mae_ = np.array(mae_)

print(f'Output #1 - RRSE = {np.mean(rrse_v1)} +- {np.std(rrse_v1)}, MAE = {np.mean(mae_,axis=0)[0]} +- {np.std(mae_,axis=0)[0]} \nOutput #2 - RRSE = {np.mean(rrse_v2)} +- {np.std(rrse_v2)}, MAE = {np.mean(mae_,axis=0)[1]} +- {np.std(mae_,axis=0)[1]}')


Output #1 - RRSE = 0.41981709003448486 +- 0.02124977670609951, MAE = 0.09687023609876633 +- 0.003925939556211233 
Output #2 - RRSE = 0.19524364173412323 +- 0.00787263922393322, MAE = 0.09818563610315323 +- 0.002922265324741602


Podemos testar o mesmo modelo em diferentes problemas para identificação de sistemas. Por exemplo, vamos aplicar ESN em outro benchmark, disponível em https://homes.esat.kuleuven.be/~smc/daisy/

In [29]:
import torch.nn
import numpy as np
import time

dtype = torch.float
torch.set_default_dtype(dtype)

filename = 'cstr.dat'

data = np.loadtxt(filename, dtype=np.float32)

X_data = np.expand_dims(data[:, :2], axis=1)
Y_data = np.expand_dims(data[:, 2:], axis=1)

X_data = torch.from_numpy(X_data)
Y_data = torch.from_numpy(Y_data)


In [33]:
n = 15
loss_train = []
loss_test = []
rrse_v1 = []
rrse_v2 = [] 
mae_ = []

density = 0.6
leaking_rate = 0.8

for i in range(n):
    model = ESN(input_size, hidden_size, output_size, density=density, leaking_rate=leaking_rate)

    _ = model(trX, washout, None, trY_flat)

    model.fit()
    output_train, hidden = model(trX, washout)

    
    # Test
    output_, hidden = model(tsX, [0], hidden)

    loss_train.append(loss_fcn(output_train, trY[washout[0]:]).item())

    loss_test.append(loss_fcn(output_, tsY).item())

    output = torch.squeeze(output_.detach(), dim=1)
        
    rrse_v1.append(root_relative_squared_error(output[:,0], tsY[:,0]))
    rrse_v2.append(root_relative_squared_error(output[:,1], tsY[:,1]))

    mae_.append(np.array(torch.mean(abs(output - tsY),dim=0)))

loss_train = np.array(loss_train)
loss_test = np.array(loss_test)
rrse_v1 = np.array(rrse_v1)
rrse_v2 = np.array(rrse_v2)
mae_ = np.array(mae_)

print(f'Output #1 - RRSE = {np.mean(rrse_v1)} +- {np.std(rrse_v1)}, MAE = {np.mean(mae_,axis=0)[0]} +- {np.std(mae_,axis=0)[0]} \nOutput #2 - RRSE = {np.mean(rrse_v2)} +- {np.std(rrse_v2)}, MAE = {np.mean(mae_,axis=0)[1]} +- {np.std(mae_,axis=0)[1]}')


  return F.mse_loss(input, target, reduction=self.reduction)


Output #1 - RRSE = 0.4295271635055542 +- 0.026224739849567413, MAE = 0.0987132117152214 +- 0.0029563955031335354 
Output #2 - RRSE = 0.1988455206155777 +- 0.006065335590392351, MAE = 0.09996839612722397 +- 0.003667193464934826
