In [55]:
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.utils.data as Data
import torchvision
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import math
import pandas as pd
import time
#from utils.utils import *
import os
from torch.nn.utils import weight_norm
from contiguous_params import ContiguousParams

from sklearn.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    log_loss,
    precision_score,
    recall_score,
)

class PositionalEncoding(nn.Module):
    "Implement the PE function."
    def __init__(self, d_model, dropout, max_len=128):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0., max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0., d_model, 2) *
                             -(math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        pe = pe.transpose(1,2)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + Variable(self.pe[:, :x.size(1)],requires_grad=False)
        # x = x + Variable(self.pe, requires_grad=False)
        return self.dropout(x)

class SelfAttention(nn.Module):
    def __init__(self, k, heads = 8, drop_rate = 0):
        super(SelfAttention, self).__init__()
        self.k, self.heads = k, heads
        # map k-dimentional input to k*heads dimentions
        self.tokeys    = nn.Linear(k, k * heads, bias = False)
        self.toqueries = nn.Linear(k, k * heads, bias = False)
        self.tovalues  = nn.Linear(k, k * heads, bias = False)
        # set dropout rate
        self.dropout_attention = nn.Dropout(drop_rate)
        # squeeze to k dimentions through linear transformation
        self.unifyheads = nn.Linear(heads * k, k)
        
    def forward(self, x):
        
        b, t, k = x.size()
        h = self.heads
        queries = self.toqueries(x).view(b, t, h, k)
        keys    = self.tokeys(x).view(b, t, h, k)
        values  = self.tovalues(x).view(b, t, h, k)
        # squeeze head into batch dimension
        queries = queries.transpose(1, 2).contiguous().view(b * h, t, k)
        keys    = keys.transpose(1, 2).contiguous().view(b * h, t, k)
        values  = values.transpose(1, 2).contiguous().view(b * h, t, k)
        # normalize the dot products
        queries = queries / (k ** (1/4))
        keys = keys / (k ** (1/4))
        # matrix multiplication
        dot  = torch.bmm(queries, keys.transpose(1,2))
        # softmax normalization
        dot = F.softmax(dot, dim=2)
        dot = self.dropout_attention(dot)
        out = torch.bmm(dot, values).view(b, h, t, k)
        # swap h, t back, unify heads
        out = out.transpose(1, 2).contiguous().view(b, t, h*k)
        
        return self.unifyheads(out) # (b, t, k)

class TransformerBlock(nn.Module):
    def __init__(self, k, heads, drop_rate):
        super(TransformerBlock, self).__init__()

        self.attention = SelfAttention(k, heads = heads, drop_rate = drop_rate)
        self.norm1 = nn.LayerNorm(k)

        self.mlp = nn.Sequential(
            nn.Linear(k, 4*k),
            nn.ReLU(),
            nn.Linear(4*k, k)
        )
        self.norm2 = nn.LayerNorm(k)
        self.dropout_forward = nn.Dropout(drop_rate)

    def forward(self, x):
        
        # perform self-attention
        attended = self.attention(x)
        # perform layer norm
        x = self.norm1(attended + x)
        # feedforward and layer norm
        feedforward = self.mlp(x)
        
        return self.dropout_forward(self.norm2(feedforward + x))

class Chomp2d(nn.Module):
    def __init__(self, chomp_size):
        super(Chomp2d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        return x[:, :, :, :-self.chomp_size].contiguous()

class IMU_Fusion_Block(nn.Module):
    def __init__(self, input_2Dfeature_channel, input_channel, 
                 feature_channel, kernel_size_grav,
                 scale_num, dataset_name):
        super(IMU_Fusion_Block, self).__init__()
        
        self.scale_num         = scale_num
        self.input_channel     = input_channel
        self.tcn_grav_convs    = []
        self.tcn_gyro_convs    = []
        self.tcn_acc_convs     = []
        
        for i in range(self.scale_num):
            
            dilation_num_grav = i+1
            
            padding_grav      = (kernel_size_grav - 1) * dilation_num_grav
            kernel_size_gyro  = padding_grav
            kernel_size_acc   = padding_grav + 1
            
            tcn_grav = nn.Sequential(
                weight_norm(nn.Conv2d(input_2Dfeature_channel, feature_channel, 
                                      (1,kernel_size_grav), 1, (0,padding_grav), 
                                      dilation=dilation_num_grav)),
                Chomp2d(padding_grav),
                nn.ReLU(),
                )
            
            
            if kernel_size_gyro == 1:
                tcn_gyro = nn.Sequential(
                    weight_norm(nn.Conv2d(input_2Dfeature_channel, feature_channel, 
                                          (1,1), 1, (0,0), 
                                          dilation=1)),
                    nn.ReLU(),
                    )
            else:
                tcn_gyro = nn.Sequential(
                    weight_norm(nn.Conv2d(input_2Dfeature_channel, feature_channel, 
                                          (1,kernel_size_gyro), 1, (0,(kernel_size_gyro-1)*1), 
                                          dilation=1)),
                    Chomp2d((kernel_size_gyro-1)*1),
                    nn.ReLU(),
                    )
            
            tcn_acc = nn.Sequential(
                weight_norm(nn.Conv2d(input_2Dfeature_channel, feature_channel, 
                                      (1,kernel_size_acc), 1, (0,(kernel_size_acc-1)*1), 
                                      dilation=1)),
                Chomp2d((kernel_size_acc-1)*1),
                nn.ReLU(),
                )
            
            setattr(self, 'tcn_grav_convs%i' % i, tcn_grav)
            self.tcn_grav_convs.append(tcn_grav)
            setattr(self, 'tcn_gyro_convs%i' % i, tcn_gyro)
            self.tcn_gyro_convs.append(tcn_gyro)
            setattr(self, 'tcn_acc_convs%i' % i, tcn_acc)
            self.tcn_acc_convs.append(tcn_acc)
        
        self.attention = nn.Sequential(
                nn.Linear(3*feature_channel, 1),
                # nn.Tanh()
                nn.PReLU()
                )
        
    def forward(self, x):
        
        x_grav = x[:,:,0:3,:]
        x_gyro = x[:,:,3:6,:]
        x_acc  = x[:,:,6:9,:]
    
        for i in range(self.scale_num):
            
            out_grav = self.tcn_grav_convs[i](x_grav).unsqueeze(4)
            out_gyro = self.tcn_gyro_convs[i](x_gyro).unsqueeze(4)
            out_acc  = self.tcn_acc_convs[i](x_acc)
            
            if i == 0:
                out_attitude = torch.cat([out_grav, out_gyro], dim=4)
                out_dynamic  = out_acc
            else:
                out_attitude = torch.cat([out_attitude, out_grav], dim=4)
                out_attitude = torch.cat([out_attitude, out_gyro], dim=4)
                out_dynamic  = torch.cat([out_dynamic, out_acc], dim=2)
                
        # (batch_size, time_length, sensor_num*scale_num, 3(xyz), feature_chnnl)
        out_attitude = out_attitude.permute(0,3,4,2,1)
        # (batch_size, time_length, sensor_num*scale_num, 3(xyz)*feature_chnnl)
        out_attitude = out_attitude.reshape(out_attitude.shape[0], out_attitude.shape[1], out_attitude.shape[2], -1)
        # time-step-wise sensor attention, sensor_attn:(batch_size, time_length, sensor_num*scale_num, 1)
        sensor_attn  = self.attention(out_attitude).squeeze(3)
        sensor_attn  = F.softmax(sensor_attn, dim=2).unsqueeze(-1)
        out_attitude = sensor_attn * out_attitude
        
        # used for normalization
        norm_num     = torch.mean(sensor_attn.squeeze(-1), dim=1)
        norm_num     = torch.pow(norm_num, 2)
        norm_num     = torch.sqrt(torch.sum(norm_num, dim=1))
        norm_num     = (pow(self.scale_num,0.5)/norm_num).unsqueeze(1).unsqueeze(2).unsqueeze(3)
        
        out_attitude = out_attitude * norm_num
        
        # (batch_size, time_length, sensor_num*scale_num, 3(xyz), feature_chnnl)
        out_attitude = out_attitude.reshape(out_attitude.shape[0], out_attitude.shape[1], out_attitude.shape[2], 3, -1)
        # (batch_size, time_length, sensor_num*scale_num*3(xyz), feature_chnnl)
        out_attitude = out_attitude.reshape(out_attitude.shape[0], out_attitude.shape[1], out_attitude.shape[2]*3, -1)
        # (batch_size, feature_chnnl, sensor_num*scale_num*3(xyz), time_length)
        out_attitude = out_attitude.permute(0,3,2,1)
        
        # concatenate all the different scales
        out_attitude = torch.split(out_attitude, 6, dim=2)
        for j in range(len(out_attitude)):
            per_scale_attitude = torch.split(out_attitude[j], 3, dim=2)
            for k in range(len(per_scale_attitude)):
                if k == 0:
                    per_attitude   = per_scale_attitude[k]
                else:
                    per_attitude   = per_attitude + per_scale_attitude[k]
            if j == 0:
                all_attitude = per_attitude
            else:
                all_attitude = torch.cat([all_attitude, per_attitude], dim=2)
        out_attitude = all_attitude
        
        out          = torch.cat([out_attitude, out_dynamic], dim = 2)
        
        return out, sensor_attn

class If_ConvTransformer(nn.Module):
    def __init__(self, input_2Dfeature_channel, input_channel, feature_channel,
                 kernel_size, kernel_size_grav, scale_num, feature_channel_out,
                 multiheads, drop_rate, dataset_name, data_length, num_class):
        
        super(If_ConvTransformer, self).__init__()
        
        self.IMU_fusion_block = IMU_Fusion_Block(input_2Dfeature_channel, input_channel, feature_channel,
                                                 kernel_size_grav, scale_num, dataset_name)
        
        self.conv2 = nn.Sequential(
            nn.Conv2d(feature_channel, feature_channel, (1,kernel_size), 1, (0,kernel_size//2)),
            nn.BatchNorm2d(feature_channel),
            nn.ReLU(),
            # nn.MaxPool2d(2)
            )
        
        self.conv3 = nn.Sequential(
            nn.Conv2d(feature_channel, feature_channel, (1,kernel_size), 1, (0,kernel_size//2)),
            nn.BatchNorm2d(feature_channel),
            nn.ReLU(),
            # nn.MaxPool2d(2)
            )
        
        self.conv4 = nn.Sequential(
            nn.Conv2d(feature_channel, feature_channel, (1,kernel_size), 1, (0,kernel_size//2)),
            nn.BatchNorm2d(feature_channel),
            nn.ReLU(),
            # nn.MaxPool2d(2)
            )
        
        if input_channel  == 12:
            reduced_channel = 6
        else:
            reduced_channel = 3
        
        self.transition = nn.Sequential(
            nn.Conv1d(feature_channel*(input_channel-reduced_channel)*scale_num, feature_channel_out, 1, 1),
            nn.BatchNorm1d(feature_channel_out),
            nn.ReLU()
            )
        
        self.position_encode = PositionalEncoding(feature_channel_out, drop_rate, data_length)
        
        self.transformer_block1 = TransformerBlock(feature_channel_out, multiheads, drop_rate)
        
        self.transformer_block2 = TransformerBlock(feature_channel_out, multiheads, drop_rate)
        
        self.global_ave_pooling = nn.AdaptiveAvgPool1d(1)
        
        self.linear = nn.Linear(feature_channel_out, num_class)

    def forward(self, x):
        
        # hidden = None
        batch_size = x.shape[0]
        feature_channel = x.shape[1]
        input_channel = x.shape[2]
        data_length = x.shape[-1]
        
        x, out_attn = self.IMU_fusion_block(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        print(x.shape)
        x = x.view(batch_size, -1, data_length)
        print(x.shape)
        x = self.transition(x)
        
        x = self.position_encode(x)
        x = x.permute(0,2,1)
        
        x = self.transformer_block1(x)
        x = self.transformer_block2(x)
        x = x.permute(0,2,1)
        
        x = self.global_ave_pooling(x).squeeze()
        
        output = self.linear(x)
        
        return output, out_attn
    
def train_op(network, EPOCH, BATCH_SIZE, LR,
             train_x, train_y, val_x, val_y, X_test, y_test,
             output_directory_models, log_training_duration, test_split):
    # prepare training_data
    if train_x.shape[0] % BATCH_SIZE == 1:
        drop_last_flag = True
    else:
        drop_last_flag = False
    torch_dataset = Data.TensorDataset(torch.FloatTensor(train_x), torch.tensor(train_y).long())
    train_loader = Data.DataLoader(dataset = torch_dataset,
                                    batch_size = BATCH_SIZE,
                                    shuffle = True,
                                    drop_last = drop_last_flag
                                   )
    
    # init lr&train&test loss&acc log
    lr_results = []
    
    loss_train_results = []
    accuracy_train_results = []
    
    loss_validation_results = []
    accuracy_validation_results = []
    macro_f1_val_results        = []
    
    loss_test_results = []
    accuracy_test_results = []
    macro_f1_test_results       = []
    
    # prepare optimizer&scheduler&loss_function
    parameters = ContiguousParams(network.parameters())
    optimizer = torch.optim.Adam(parameters.contiguous(),lr = LR)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', factor=0.5, 
                                                           patience=5,
                                                           min_lr=LR/10, verbose=True)
    # loss_function = nn.CrossEntropyLoss(reduction='sum')
    loss_function = LabelSmoothingCrossEntropy()
    
    # save init model    
    output_directory_init = output_directory_models+'init_model.pkl'
    torch.save(network.state_dict(), output_directory_init)   # only save the init parameters
    
    training_duration_logs = []
    start_time = time.time()
    for epoch in range (EPOCH):
        
        epoch_tau = epoch+1
        tau = max(1 - (epoch_tau - 1) / 50, 0.5)
        for m in network.modules():
            if hasattr(m, '_update_tau'):
                m._update_tau(tau)
                # print(a)
        
        for step, (x,y) in enumerate(train_loader):
            
            batch_x = x.cuda()
            batch_y = y.cuda()
            output_bc = network(batch_x)[0]
            
            # cal the sum of pre loss per batch 
            loss = loss_function(output_bc, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        
        # test per epoch
        network.eval()
        test_flag = True
        # loss_train:loss of training set; accuracy_train:pre acc of training set
        loss_train, accuracy_train, _ = get_test_loss_acc(network, loss_function, train_x, train_y, test_split)
        loss_validation, accuracy_validation, macro_f1_val = get_test_loss_acc(network, loss_function, val_x, val_y, test_split)
        loss_test, accuracy_test, macro_f1_test = get_test_loss_acc(network, loss_function, X_test, y_test, test_split)
        test_flag = False
        network.train()
        
        # update lr
        scheduler.step(accuracy_validation)
        lr = optimizer.param_groups[0]['lr']
        
        ######################################dropout#####################################
        # loss_train, accuracy_train = get_loss_acc(network.eval(), loss_function, train_x, train_y, test_split)
        
        # loss_validation, accuracy_validation = get_loss_acc(network.eval(), loss_function, test_x, test_y, test_split)
        
        # network.train()
        ##################################################################################
        
        # log lr&train&validation loss&acc per epoch
        lr_results.append(lr)
        loss_train_results.append(loss_train)    
        accuracy_train_results.append(accuracy_train)
        
        loss_validation_results.append(loss_validation)    
        accuracy_validation_results.append(accuracy_validation)
        macro_f1_val_results.append(macro_f1_val)
        
        loss_test_results.append(loss_test)    
        accuracy_test_results.append(accuracy_test)
        macro_f1_test_results.append(macro_f1_test)
        
        # print training process
        if (epoch+1) % 1 == 0:
            print('Epoch:', (epoch+1), '|lr:', lr,
                  '| train_loss:', loss_train, 
                  '| train_acc:', accuracy_train, 
                  '| validation_loss:', loss_validation, 
                  '| validation_acc:', accuracy_validation)
        
        save_models(network, output_directory_models, 
                    loss_train, loss_train_results, 
                    accuracy_validation, accuracy_validation_results,
                    start_time, training_duration_logs)
    
    # log training time 
    per_training_duration = time.time() - start_time
    log_training_duration.append(per_training_duration)
    
    # save last_model
    output_directory_last = output_directory_models+'last_model.pkl'
    torch.save(network.state_dict(), output_directory_last)   # save only the init parameters
    
    # log history
    history = log_history(EPOCH, lr_results, loss_train_results, accuracy_train_results, 
                          loss_validation_results, accuracy_validation_results,
                          loss_test_results, accuracy_test_results,
                          output_directory_models)
    
    plot_learning_history(EPOCH, history, output_directory_models)
    
    return(history, per_training_duration, log_training_duration)
from ptflops import get_model_complexity_info

In [59]:
input_channel = 12
dataset_name = "HAPT"
data_length = 128
nb_classes = 6
model = If_ConvTransformer(1, input_channel, 64, 5, 3, 2, 128, 1, 0.2, dataset_name, data_length, nb_classes)

In [60]:
input = torch.randn(1, 1,input_channel , data_length)

In [61]:
t = model(input)

torch.Size([1, 64, 12, 128])
torch.Size([1, 768, 128])


In [54]:
macs, params = get_model_complexity_info(model, (1,input_channel , data_length), as_strings=True,
                                       print_per_layer_stat=True, verbose=True)
print('{:<30}  {:<8}'.format('Computational complexity: ', macs))
print('{:<30}  {:<8}'.format('Number of parameters: ', params))

torch.Size([1, 64, 12, 128])
torch.Size([1, 768, 128])
If_ConvTransformer(
  559.5 k, 100.000% Params, 159.65 MMac, 94.647% MACs, 
  (IMU_fusion_block): IMU_Fusion_Block(
    2.24 k, 0.401% Params, 899.46 KMac, 0.533% MACs, 
    (tcn_grav_convs0): Sequential(
      320, 0.057% Params, 124.42 KMac, 0.074% MACs, 
      (0): Conv2d(320, 0.057% Params, 99.84 KMac, 0.059% MACs, 1, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 2))
      (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, )
      (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, )
    )
    (tcn_gyro_convs0): Sequential(
      256, 0.046% Params, 98.88 KMac, 0.059% MACs, 
      (0): Conv2d(256, 0.046% Params, 74.3 KMac, 0.044% MACs, 1, 64, kernel_size=(1, 2), stride=(1, 1), padding=(0, 1))
      (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, )
      (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, )
    )
    (tcn_acc_convs0): Sequential(
      320, 0.057% Params, 124.42 KMac, 0.074% MACs, 
      (0): Conv2

In [None]:
text = "If_ConvTransformer( 559.5 k, 100.000% Params, 159.65 MMac, 94.647% MACs, (IMU_fusion_block): IMU_Fusion_Block( 2.24 k, 0.401% Params, 899.46 KMac, 0.533% MACs, (tcn_grav_convs0): Sequential( 320, 0.057% Params, 124.42 KMac, 0.074% MACs, (0): Conv2d(320, 0.057% Params, 99.84 KMac, 0.059% MACs, 1, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 2)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, ) ) (tcn_gyro_convs0): Sequential( 256, 0.046% Params, 98.88 KMac, 0.059% MACs, (0): Conv2d(256, 0.046% Params, 74.3 KMac, 0.044% MACs, 1, 64, kernel_size=(1, 2), stride=(1, 1), padding=(0, 1)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, ) ) (tcn_acc_convs0): Sequential( 320, 0.057% Params, 124.42 KMac, 0.074% MACs, (0): Conv2d(320, 0.057% Params, 99.84 KMac, 0.059% MACs, 1, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 2)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, ) ) (tcn_grav_convs1): Sequential( 320, 0.057% Params, 125.95 KMac, 0.075% MACs, (0): Conv2d(320, 0.057% Params, 101.38 KMac, 0.060% MACs, 1, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 4), dilation=(2, 2)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, ) ) (tcn_gyro_convs1): Sequential( 384, 0.069% Params, 150.34 KMac, 0.089% MACs, (0): Conv2d(384, 0.069% Params, 125.76 KMac, 0.075% MACs, 1, 64, kernel_size=(1, 4), stride=(1, 1), padding=(0, 3)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, ) ) (tcn_acc_convs1): Sequential( 448, 0.080% Params, 176.64 KMac, 0.105% MACs, (0): Conv2d(448, 0.080% Params, 152.06 KMac, 0.090% MACs, 1, 64, kernel_size=(1, 5), stride=(1, 1), padding=(0, 4)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, ) ) (attention): Sequential( 194, 0.035% Params, 98.82 KMac, 0.059% MACs, (0): Linear(193, 0.034% Params, 98.31 KMac, 0.058% MACs, in_features=192, out_features=1, bias=True) (1): PReLU(1, 0.000% Params, 512.0 Mac, 0.000% MACs, num_parameters=1) ) ) (conv2): Sequential( 20.67 k, 3.695% Params, 31.85 MMac, 18.883% MACs, (0): Conv2d(20.54 k, 3.672% Params, 31.56 MMac, 18.708% MACs, 64, 64, kernel_size=(1, 5), stride=(1, 1), padding=(0, 2)) (1): BatchNorm2d(128, 0.023% Params, 196.61 KMac, 0.117% MACs, 64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) (2): ReLU(0, 0.000% Params, 98.3 KMac, 0.058% MACs, ) ) (conv3): Sequential( 20.67 k, 3.695% Params, 31.85 MMac, 18.883% MACs, (0): Conv2d(20.54 k, 3.672% Params, 31.56 MMac, 18.708% MACs, 64, 64, kernel_size=(1, 5), stride=(1, 1), padding=(0, 2)) (1): BatchNorm2d(128, 0.023% Params, 196.61 KMac, 0.117% MACs, 64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) (2): ReLU(0, 0.000% Params, 98.3 KMac, 0.058% MACs, ) ) (conv4): Sequential( 20.67 k, 3.695% Params, 31.85 MMac, 18.883% MACs, (0): Conv2d(20.54 k, 3.672% Params, 31.56 MMac, 18.708% MACs, 64, 64, kernel_size=(1, 5), stride=(1, 1), padding=(0, 2)) (1): BatchNorm2d(128, 0.023% Params, 196.61 KMac, 0.117% MACs, 64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) (2): ReLU(0, 0.000% Params, 98.3 KMac, 0.058% MACs, ) ) (transition): Sequential( 98.69 k, 17.639% Params, 12.65 MMac, 7.499% MACs, (0): Conv1d(98.43 k, 17.593% Params, 12.6 MMac, 7.470% MACs, 768, 128, kernel_size=(1,), stride=(1,)) (1): BatchNorm1d(256, 0.046% Params, 32.77 KMac, 0.019% MACs, 128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True) (2): ReLU(0, 0.000% Params, 16.38 KMac, 0.010% MACs, ) ) (position_encode): PositionalEncoding( 0, 0.000% Params, 0.0 Mac, 0.000% MACs, (dropout): Dropout(0, 0.000% Params, 0.0 Mac, 0.000% MACs, p=0.2, inplace=False) ) (transformer_block1): TransformerBlock( 197.89 k, 35.369% Params, 25.26 MMac, 14.978% MACs, (attention): SelfAttention( 65.66 k, 11.736% Params, 8.39 MMac, 4.973% MACs, (tokeys): Linear(16.38 k, 2.928% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=False) (toqueries): Linear(16.38 k, 2.928% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=False) (tovalues): Linear(16.38 k, 2.928% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=False) (dropout_attention): Dropout(0, 0.000% Params, 0.0 Mac, 0.000% MACs, p=0.2, inplace=False) (unifyheads): Linear(16.51 k, 2.951% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=True) ) (norm1): LayerNorm(256, 0.046% Params, 16.38 KMac, 0.010% MACs, (128,), eps=1e-05, elementwise_affine=True) (mlp): Sequential( 131.71 k, 23.541% Params, 16.84 MMac, 9.986% MACs, (0): Linear(66.05 k, 11.805% Params, 8.39 MMac, 4.974% MACs, in_features=128, out_features=512, bias=True) (1): ReLU(0, 0.000% Params, 65.54 KMac, 0.039% MACs, ) (2): Linear(65.66 k, 11.736% Params, 8.39 MMac, 4.973% MACs, in_features=512, out_features=128, bias=True) ) (norm2): LayerNorm(256, 0.046% Params, 16.38 KMac, 0.010% MACs, (128,), eps=1e-05, elementwise_affine=True) (dropout_forward): Dropout(0, 0.000% Params, 0.0 Mac, 0.000% MACs, p=0.2, inplace=False) ) (transformer_block2): TransformerBlock( 197.89 k, 35.369% Params, 25.26 MMac, 14.978% MACs, (attention): SelfAttention( 65.66 k, 11.736% Params, 8.39 MMac, 4.973% MACs, (tokeys): Linear(16.38 k, 2.928% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=False) (toqueries): Linear(16.38 k, 2.928% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=False) (tovalues): Linear(16.38 k, 2.928% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=False) (dropout_attention): Dropout(0, 0.000% Params, 0.0 Mac, 0.000% MACs, p=0.2, inplace=False) (unifyheads): Linear(16.51 k, 2.951% Params, 2.1 MMac, 1.243% MACs, in_features=128, out_features=128, bias=True) ) (norm1): LayerNorm(256, 0.046% Params, 16.38 KMac, 0.010% MACs, (128,), eps=1e-05, elementwise_affine=True) (mlp): Sequential( 131.71 k, 23.541% Params, 16.84 MMac, 9.986% MACs, (0): Linear(66.05 k, 11.805% Params, 8.39 MMac, 4.974% MACs, in_features=128, out_features=512, bias=True) (1): ReLU(0, 0.000% Params, 65.54 KMac, 0.039% MACs, ) (2): Linear(65.66 k, 11.736% Params, 8.39 MMac, 4.973% MACs, in_features=512, out_features=128, bias=True) ) (norm2): LayerNorm(256, 0.046% Params, 16.38 KMac, 0.010% MACs, (128,), eps=1e-05, elementwise_affine=True) (dropout_forward): Dropout(0, 0.000% Params, 0.0 Mac, 0.000% MACs, p=0.2, inplace=False) ) (global_ave_pooling): AdaptiveAvgPool1d(0, 0.000% Params, 16.38 KMac, 0.010% MACs, output_size=1) (linear): Linear(774, 0.138% Params, 774.0 Mac, 0.000% MACs, in_features=128, out_features=6, bias=True) )"
to_profile = ["(IMU_fusion_block)","(conv2)","(conv3)","(conv4)","(transition)"]#,"(transformer_block1)","(transformer_block2)","(global_ave_pooling)","(linear)"]
per = 0
for key in to_profile:
    extracted_numbers = find_percentage_before_MACs(text,key)
    #print(float(extracted_numbers[-1]))
    per = per + float(extracted_numbers[-1])
per

In [38]:
import re

def extract_numbers_from_string(s):
    """
    Extract all numbers from the given string.

    :param s: The string to extract numbers from.
    :return: A list of numbers (as strings) extracted from the string.
    """
    # Regular expression pattern for matching numbers
    pattern = r'\d+\.\d+|\d+'

    # Finding all occurrences of the pattern in the string
    numbers = re.findall(pattern, s)

    return numbers

# Example string
text = "IMU_Fusion_Block( 2.24 k, 0.401% Params, 899.46 KMac, 0.533% MACs"

# Extract numbers
extracted_numbers = extract_numbers_from_string(text)
print(extracted_numbers)

['2.24', '0.401', '899.46', '0.533']


In [40]:
import re

def find_percentage_before_MACs(text, substring):
    """
    Find the percentage value before the first occurrence of 'MACs' after a given substring.

    :param text: The text in which to search.
    :param substring: The substring after which to find the percentage.
    :return: The percentage found or a message if not found.
    """
    # Constructing the regular expression pattern
    pattern = rf"{re.escape(substring)}.*?(\d+\.\d+%).*?MACs"
    
    # Searching for the pattern in the text
    match = re.search(pattern, text)
    start = match.start()
    end = match.end()
    text = text[start:end]

    return extract_numbers_from_string(text)
    # Returning the found percentage or a message if not found
    #return match.group(1) if match else "No match found"

# Example usage
text = "(IMU_fusion_block): IMU_Fusion_Block( 2.24 k, 0.401% Params, 899.46 KMac, 0.533% MACs, (tcn_grav_convs0): Sequential( 320, 0.057% Params, 124.42 KMac, 0.074% MACs, (0): Conv2d(320, 0.057% Params, 99.84 KMac, 0.059% MACs, 1, 64, kernel_size=(1, 3), stride=(1, 1), padding=(0, 2)) (1): Chomp2d(0, 0.000% Params, 0.0 Mac, 0.000% MACs, ) (2): ReLU(0, 0.000% Params, 24.58 KMac, 0.015% MACs, )"
substring = "tcn_grav_convs0"

percentage = find_percentage_before_MACs(text, substring)
print(percentage[-4:])

['320', '0.057', '124.42', '0.074']
