In [158]:
import pandas as pd
import numpy as np

from sklearn.preprocessing import MinMaxScaler

import plotly.express as px
import plotly.io as pio
pio.templates.default = 'plotly_dark'

## Знакомство с данными

### Кодировка жестов

- `Neutral/NOGO` (все нули) - нейтральное положение кисти, кисть в расслабленном состоянии

- `Thumb` (*Thumb* равен 1) - сгиб большого пальца

- `Grab` (*Thumb*, *Index*, *Middle*, *Ring*, *Pinky* равны 1) - схват, кисть сжата в кулак

- `Open` (*Thumb_stretch*, *Index_stretch*, *Middle_stretch*, *Ring_stretch*, *Pinky_stretch* равны 1) - открытая ладонь, пальцы выпрямлены

- `OK` (*Thumb*, *Index* равны 1) - жест “Окей”

- `Pistol` *(Middle*, *Ring*, *Pinky* равны 1) - жест “Пистолет”

In [159]:
gest_labels = {
# Thumb Index Middle Ring Pinky Thumb_stretch Index_stretch Middle_stretch Ring_stretch Pinky_stretch
    '0000000000': 0, # 'NOGO'
    '1000000000': 1, # 'Thumb'
    '1111100000': 2, # 'Grab'
    '0000011111': 3, # 'Open'
    '1100000000': 4, # 'OK'
    '0011100000': 5  # 'Pistol'
}

### Дополнение метаданных

Условимся называть ***монтажом*** набор измерений, представленный в одном файле `.palm`.

Для чтения монтажей будем использовать предоставленную заказчиком функцию `read_omg_csv()`, установив дефолтные значения аргументов так, чтобы они сразу соответствовали актуальной структуре данных:

In [160]:
def read_omg_csv(
    path_palm_data: str, 
    n_omg_channels: int = 50,    # '0', ..., '49' - каналы OMG датчиков 
    n_acc_channels: int = 3,     # 'ACC0', 'ACC1', 'ACC2' - акселерометр (потенциальные факторы)
    n_gyr_channels: int = 3,     # 'GYR0', 'GYR1', 'GYR2' - гироскоп (потенциальные факторы) 
    n_mag_channels: int = 0,     #  отсутствуют в данных
    n_enc_channels: int = 6,     # 'ENC0'...'ENC5' - не используются ???
    drop_enc: bool = True,       # нужно ли удалить столбцы 'ENC'
    button_ch: bool = True,      # 'BUTTON' - не используется
    drop_button: bool = True,    # нужно ли удалить столбец 'BUTTON'
    sync_ch: bool = True,        # 'SYNC' - синхронизация данных с протоколом
    timestamp_ch: bool = True,   # 'ts' - метка времени
    drop_timestamp: bool = False,# нужно ли удалить столбец 'ts'
    label_ch: bool = False       # присутствует ли столец с меткой (кодом) жеста
) -> pd.DataFrame:
    
    '''
    Reads CSV data for OMG data
    NB: data must be separated by " " separator

        Parameters:
                path_palm_data  (str): path to csv data file
                n_omg_channels  (int): Number of OMG channels
                n_acc_channels  (int): Number of Accelerometer channels, default = 0
                n_gyr_channels  (int): Number of Gyroscope channels, default = 0
                n_mag_channels  (int): Number of Magnetometer channels, default = 0
                n_enc_channels  (int): Number of Encoder channels, default = 0
                button_ch      (bool): If button channel is present, default = True
                sync_ch        (bool): If synchronization channel is present, default = True
                timestamp_ch   (bool): If timestamp channel is present, default = True
                label_ch       (bool): If label channel is present, default = False

        Returns:
                df_raw (pd.DataFrame): Parsed pandas Dataframe with OMG data
    '''
    
    df_raw = pd.read_csv(path_palm_data, sep=' ', 
                         header=None, 
                         skipfooter=1, 
                         skiprows=1, 
                         engine='python')
    columns = np.arange(n_omg_channels).astype('str').tolist()
    
    for label, label_count in zip(['ACC', 'GYR', 'MAG', 'ENC'], 
                                  [n_acc_channels, n_gyr_channels, n_mag_channels, n_enc_channels]):
        columns = columns + ['{}{}'.format(label, i) for i in range(label_count)]
        
    if button_ch:
        columns = columns + ['BUTTON']
        
    if sync_ch:
        columns = columns + ['SYNC']
        
    if timestamp_ch:
        columns = columns + ['ts']

    if label_ch:
        columns = columns + ['label']
        
    df_raw.columns = columns

    if drop_enc:
        enc_columns = [f"ENC{i}" for i in range(n_enc_channels)]
        df_raw.drop(enc_columns, axis=1, inplace=True)

    if drop_button:
        df_raw.drop('BUTTON', axis=1, inplace=True)

    if drop_timestamp:
        df_raw.drop('ts', axis=1, inplace=True)
    
    return df_raw

In [161]:
cols_omg = list(map(str, range(50)))

cols_gyr = [f'GYR{i}' for i in range(3)]

cols_acc = [f'ACC{i}' for i in range(3)]

Подгрузим предоставленное заказчиком описание (`meta_information.csv`) имеющихся данных:

In [162]:
meta_info = pd.read_csv('data/meta_information.csv', index_col=1).drop('Unnamed: 0', axis=1)
display(meta_info.head(3))

print("Количество монтажей:", meta_info.shape[0])
print("Количество пилотов, с которых снимались данные:", meta_info['pilote_id'].nunique())

Unnamed: 0_level_0,pilote_id,last_train_idx,len(train),len(test)
montage,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2023-05-15_16-16-08.palm,1,23337,23337,5810
2023-05-15_17-12-24.palm,1,23336,23336,5803
2023-06-05_16-12-38.palm,1,17939,17939,4431


Количество монтажей: 31
Количество пилотов, с которых снимались данные: 4


Дополним метаданные следующей информацией:

- периодичность измерений (*мс*) – разность между соседними метками времени

- количество измерений на один жест

- количество выполненных жестов

Попутно добавим в файлы с измерениями метки выполняемых жестов (по факту поступления команды в соответствии с протоколом).

In [163]:
# Поочередно для каждого набора данных добавим в метаданные:
for montage in meta_info.index:
    data = read_omg_csv(f"data/{montage}")
    protocol = pd.read_csv(f"data/{montage}.protocol.csv")
    
    # 1) периодичность измерений – разность между соседними метками времени
    ts_delta = (data['ts'].shift(-1) - data['ts']).value_counts().index.to_list()
    meta_info.loc[montage, 'ts_delta'] = ts_delta

    # 2) среднее кол-во измерений на один (не нейтральный) жест
    ticks_per_gest = data.groupby('SYNC')['ts'].count().median().round(2)
    meta_info.loc[montage, 'ticks_per_gest'] = ticks_per_gest

    # 3) кол-во выполненных жестов
    n_gestures = protocol.shape[0]
    meta_info.loc[montage, 'n_gestures'] = n_gestures

    # проставим в измерения метки жестов и пронацию (Pronation)
    protocol['label'] = protocol.iloc[:, 1: 11].astype(int).astype(str).sum(axis=1).apply(lambda x: gest_labels[x])
    data = pd.merge(
        data, protocol[['epoch', 'Pronation', 'label']], 
        how='left', left_on='SYNC', right_on='epoch'
    ).drop('epoch', axis=1)
    data.to_csv(f"montages/{montage}.label", sep=',', index=None)

meta_info.index = [montage + '.label' for montage in meta_info.index]
meta_info['pilote_id'] = meta_info['pilote_id'].astype(str)
meta_info.to_csv('montages/meta_info_extended.csv')

In [164]:
print(f"Периодичность измерений: {'/'.join(map(str, meta_info['ts_delta'].unique()))} мс")
print(f"Медианное количество измерений на один жест: \
{', '.join(map(str, sorted(meta_info['ticks_per_gest'].astype(int).unique())))}")

Периодичность измерений: 33.0 мс
Медианное количество измерений на один жест: 19, 30, 31, 46, 60, 61


In [165]:
fig_data = meta_info.sort_values(['pilote_id', 'ticks_per_gest'], ascending=False).reset_index()
fig_data['index'] = fig_data['index'].apply(lambda x: x.split('.')[0])
fig = px.bar(
    fig_data,
    x = 'ticks_per_gest',
    color = 'pilote_id',
    width=660, height=800,
    labels={'ticks_per_gest': 'количество измерений на один жест',
            'pilote_id': 'пилот',
            '_index': 'монтаж'},
    text='index',
    title='Количество измерений на жест в предоставленных данных'
)
fig.update_yaxes(showticklabels=False)
fig.show()

Видим, что:

1. Периодичность измерений во всех монтажах одинакова и составляет 33 мс

2. В разных монтажах команды на выполнение жестов поступают с разной скоростью: от 19 до 61 измерения на один жест. При этом большинству монтажей соответствует скорость 30 измерений на один жест (одна команда в секунду)

### Датчики с высоким и низким уровнем сигнала

Посмотрим на некоторый интервал рядов показаний датчиков какого-либо монтажа:

In [213]:
np.random.seed(42)
data = pd.read_csv('montages/' + np.random.choice(meta_info.index)).loc[:1000, cols_omg]
fig = px.line(data, width=1000, height=600)
fig.update_traces(line=dict(width=1))
fig.show()

Как можно убедиться, одни датчики выдают более *сильный* сигнал, чем другие. Причем сигналы датчиков *сильной* группы на первый взгляд кажутся более информативными, чем шумные и "невнятные" сигналы *слабых* датчиков.

Посмотрим на медианные значения датчиков в приведенном фрагменте данных:

In [187]:
data_median = data.median()

fig = px.bar(data_median, width=1000, height=400)
fig.update_layout(showlegend=False)
fig.show()

print("Датчики с высоким уровнем сигнала:")
print(*data_median[data_median > data_median.mean()].index)


Датчики с высоким уровнем сигнала:
3 4 5 6 12 13 16 17 21 22 27 28 30 31 38 39


Однако в других монтажах картина может быть иной.

Чтобы проверить это, визуализируем медианы показаний датчиков по всем монтажам:

In [226]:
mont_medians = []
for montage in meta_info.index:
    data = pd.read_csv('montages/' + montage)[cols_omg]
    mont_medians.append(data.median().values.tolist())
mont_medians = np.array(mont_medians)
mont_medians = MinMaxScaler().fit_transform(mont_medians.T).T

mont_medians = np.append(mont_medians, [mont_medians.max(axis=0)], axis=0)

fig_data = pd.DataFrame(
    mont_medians,
    index=[name.split('.')[0] for name in meta_info.index] + ['максимум по монтажам'],
    columns=cols_omg
)

In [227]:
fig = px.imshow(
    fig_data,
    width=1200, height=700,
    title="Медианы показаний датчиков по монтажам"
)
fig.update_coloraxes(showscale=False)
fig.update_layout(margin=dict(l=50, r=50, t=70, b=40), title_x=0.5)
fig.show()

Можно увидеть, что в разных монтажах разные датчики являются *сильными*.

Несколько датчиков оказались слабыми во всех монтажах:

In [238]:
max_in_montages = fig_data.iloc[-1, :]
low_val_sensors = max_in_montages[max_in_montages < 0.2].index.tolist()
print(len(low_val_sensors), 'датчиков являются "слабыми" во всех монтажах:', *low_val_sensors)

17 датчиков являются "слабыми" во всех монтажах: 2 14 19 24 25 29 32 40 41 42 43 44 45 46 47 48 49


In [154]:
for montage in meta_info.index:

    pilote_id = meta_info.loc[montage, 'pilote_id']
    title = ', '.join([
        montage.split('.')[0], 'pilote ' + pilote_id, 
    ])

    data = pd.read_csv("montages/" + montage).drop(['SYNC', 'ts'], axis=1)

    data_omg = data[cols_omg]
    omg_medians = data_omg.median()
    #omg_noise_to_signal = data_omg.std() / data_omg.mean()
    #omg_medians -= omg_medians.min()

    fig = px.bar(
        omg_medians, width=900, height=250,
        title=title,
        #color=omg_noise_to_signal,
        #log_y=True
    )
    fig.update_layout(showlegend=False, margin=dict(l=50, t=50, b=30))
    fig.add_hline(omg_medians.mean(), line=dict(width=1, dash='dot'))
    fig.show()

In [48]:
def gradient_maximums(
    X: np.ndarray | pd.DataFrame,
    sample_size: int = 61,
    spacing: int = 5,
    scale: bool = True,
    window: int = 5,
    ingnore_n_left: int = 50,
    offset = 0
) -> np.ndarray[int]:
    '''
    Возвращает список номеров строк двумерного массива *X*, 
    соответствующих локальным максимумам суммарного градиента столбцов в условных наблюдениях.
    ## Параметры
    **X**: *np.ndarray* | *pd.DataFrame*<br>двумерный массив, в столбцах которого записаны числовые последовательности

    **sample_size**: *int, default=100*<br>размер кадра

    **spacing**: *int, default=5*<br>параметр `spacing` для функции `numpy.gradient()`

    **scale**: *bool, default=True*<br>выполнять ли предварительное шкалирование

    **smoothing_window**: *int, default=10*<br>ширина окна для сглаживания последовательностей по медиане; `0` - без сглаживания

    **ingnore_n_left**: *int, default=50*<br>не искать точку максимума в *ingnore_n_left* измерений слева
    
    **offset**: *int, default=0*<br>Смещение (ручная подстройка): сдвинуть итоговый результат на заданное кол-во шагов
    
    ## Возвращаемый результат
    Массив индексов измерений соответстующих локальным максимумам суммарного градиента
    '''
    res = []

    X_copy = np.array(X)

    # Сглаживание
    if window:
        X_copy = pd.DataFrame(X_copy).rolling(window).median()

    # Шкалирование
    if scale:
        X_copy = MinMaxScaler().fit_transform(X_copy)

    # Вычисляем абсолютный суммарный градиент
    grad = np.sum(np.abs(np.gradient(X_copy, spacing, axis=0)), axis=1)
    grad = np.nan_to_num(grad)

    # Проходим все наблюдения по порядку
    for start in range(0, X_copy.shape[0], sample_size):
        # и добавляем к результату точку максимума градиента внутри текущего наблюдения
        max_i = np.argmax(grad[start + ingnore_n_left: start + sample_size]) + ingnore_n_left
        res.append(start + max_i)

    return np.array(res) + offset