## Импорт

In [1]:
# Tabular data

import pandas as pd
import numpy as np


# Visualisation

import plotly.express as px
import plotly.io as pio
pio.templates.default = 'plotly_white'
pio.renderers.default = 'notebook'
from motorica.emg8.utils import fig_montage # custom plotting helper


# Pipeline

# constants
from motorica.emg8.constants import *
# data loading and labelling by actual gestures
from motorica.emg8.markers import TransMarker, BaseMarker, FullMarker
from motorica.emg8.pipeline import read_emg8, score_montages
# pipeline factory
from motorica.emg8.pipeline import create_logreg_pipeline
# individual transformers used in the exploratory plots
from motorica.emg8.pipeline import NoiseReduction, Gradients, RatioToPrev, DiffWithPrev


# Metrics
from sklearn.metrics import classification_report, f1_score
from motorica.emg8.pipeline import f1_score_shifted, get_total_shift, shift_target


# Rendering docstrings as formatted markdown
from IPython.display import Markdown as md

# File handling
import os

# Pretty-printing of pipeline parameters
from pprint import pprint

# Inference latency measurement
from time import perf_counter, time

# Model (pipeline) serialisation
import pickle

## Разметка данных

### Исходные данные

In [None]:
# Directory that holds the recorded montage files
DATA_DIR = 'data/plt_mb/all'
# Alternative datasets:
# DATA_DIR = 'data/plt_2'
# DATA_DIR = 'data/meeting'
# DATA_DIR = 'data/plt_misc'

# Alphabetically sorted names of every *.emg8 file found in DATA_DIR
montages = sorted(name for name in os.listdir(DATA_DIR) if name.endswith('.emg8'))
montages

In [None]:
# Pick the first montage and load its recording as a space-separated table.
montage = montages[0]
full_path = os.path.join(DATA_DIR, montage)
gestures_raw = pd.read_csv(full_path, sep=' ')
# Plot the OMG_CH columns together with the 'id' column passed as y_cmd.
fig_montage(
    gestures_raw[OMG_CH], y_cmd=gestures_raw['id'], 
    title=f"<i>{full_path}</i> – исходные данные"
).show()

In [None]:
# Apply NoiseReduction to the sensor channels and plot the result.
# NoiseReduction is imported with the other pipeline names at the top of the notebook.
gestures_omg = gestures_raw[OMG_CH]
# NOTE(review): the positional argument 5 is presumably the smoothing window — confirm
# against the NoiseReduction signature.
gestures_smth = NoiseReduction(5).fit_transform(gestures_omg.to_numpy())

# The title names the processing step so this plot is not mistaken for the raw-data plot.
fig_montage(
    gestures_smth, y_cmd=gestures_raw['id'],
    title=f"<i>{full_path}</i> – после подавления шума"
).show()

In [None]:
# Plot the output of DiffWithPrev applied to the smoothed channels.
# DiffWithPrev is imported with the other pipeline names at the top of the notebook.
# The title names the transformer so this plot is not mistaken for the raw-data plot.
fig_montage(
    DiffWithPrev(150, oper='add', avg='median').fit_transform(gestures_smth), y_cmd=gestures_raw['id'],
    title=f"<i>{full_path}</i> – признаки DiffWithPrev"
).show()

### Разметка по классам основных жестов

In [None]:
# Hold-out set = the last protocol cycle
n_holdout_groups = 1

marker = BaseMarker(use_peaks='std', bounds_shift=0)

# Load the selected montage, label it with the marker and split it into train / hold-out parts.
X_train, X_test, y_train, y_test, cv_groups, gestures = read_emg8(
    montage,
    dir=DATA_DIR,
    marker=marker,
    n_holdout_groups=n_holdout_groups,
)

# Index label just before the first row of the n_holdout_groups-th distinct GROUP_COL value
# counted from the end.
last_train_idx = gestures[GROUP_COL].drop_duplicates().index[-n_holdout_groups] - 1

print(
    f'X_train shape: {X_train.shape}',
    f'y_train shape: {y_train.shape}',
    f'X_test shape: {X_test.shape}',
    f'y_test shape: {y_test.shape}',
    sep='\n',
)

# Remember the class names of this marker for the classification reports further down.
GESTURES = marker.states.index

display(marker.states)

fig_montage(
    gestures[OMG_CH],
    y_cmd=gestures[CMD_COL],
    y_act=gestures[TARGET],
    protocol_cycle=gestures[GROUP_COL],
    # Optional extra traces for inspecting marker internals (uncomment when needed):
    # grad2 = marker.grad2 * 10,
    # std_grad = marker.std1 / 4,
    # std_grad_ups = marker.peaks_std1 / 4,
    # std_grad_downs = marker.peaks_std1_neg / 4,
    title=f"<i>{full_path}</i> – разметка по фактическим границам жестов"
).show()

### Разметка расширенным набором классов: жесты и переходы

In [None]:
# Hold-out set = the last protocol cycle
n_holdout_groups = 1

full_marker = FullMarker(use_peaks='std', bounds_shift=0)

# Load the selected montage labelled with FullMarker and split it into train / hold-out parts.
# The split size comes from n_holdout_groups so that it always agrees with
# last_train_idx_full computed below.
X_train_full, X_test_full, y_train_full, y_test_full, cv_groups_full, gestures_full = read_emg8(
    montage, dir=DATA_DIR,
    n_holdout_groups=n_holdout_groups,
    marker=full_marker
)

# Index label just before the first row of the n_holdout_groups-th distinct GROUP_COL value
# counted from the end.
last_train_idx_full = gestures_full[GROUP_COL].drop_duplicates().index[-n_holdout_groups] - 1

print('X_train shape:', X_train_full.shape)
print('y_train shape:', y_train_full.shape)
print('X_test shape:', X_test_full.shape)
print('y_test shape:', y_test_full.shape)

display(full_marker.states)

# The command trace is passed as y_cmd, the same keyword every other fig_montage call
# in this notebook uses (it was previously passed as y_cmd_full).
fig_montage(
    gestures_full[OMG_CH],
    y_cmd=gestures_full[CMD_COL],
    y_act=gestures_full[TARGET],
    protocol_cycle=gestures_full[GROUP_COL],
    title=f"<i>{full_path}</i> – разметка жестов с переходами"
).show()

### Разметка только переходов

In [None]:
# Hold-out set = the last protocol cycle
n_holdout_groups = 1

trans_marker = TransMarker(use_peaks='std', bounds_shift=0)

# Load the selected montage labelled with TransMarker and split it into train / hold-out parts.
# The split size comes from n_holdout_groups so that it always agrees with
# last_train_idx_trans computed below.
X_train_trans, X_test_trans, y_train_trans, y_test_trans, cv_groups_trans, gestures_trans = read_emg8(
    montage, dir=DATA_DIR,
    n_holdout_groups=n_holdout_groups,
    marker=trans_marker
)

# Index label just before the first row of the n_holdout_groups-th distinct GROUP_COL value
# counted from the end.
last_train_idx_trans = gestures_trans[GROUP_COL].drop_duplicates().index[-n_holdout_groups] - 1

print('X_train shape:', X_train_trans.shape)
print('y_train shape:', y_train_trans.shape)
print('X_test shape:', X_test_trans.shape)
print('y_test shape:', y_test_trans.shape)

display(trans_marker.states)

fig_montage(
    gestures_trans[OMG_CH],
    y_cmd=gestures_trans[CMD_COL],
    y_act=gestures_trans[TARGET],
    protocol_cycle=gestures_trans[GROUP_COL],
    title=f"<i>{full_path}</i> – разметка только переходов"
).show()

## Отбор данных

### Кросс-валидация размеченных данных с помощью логистической регрессии

In [9]:
# Score the montages stored in DATA_DIR.
# The result is later rounded, plotted as a bar chart and indexed by montage file name
# (presumably a pandas Series of cross-validation F1 scores — confirm against score_montages).
montages_cv_scores = score_montages(DATA_DIR)

In [None]:
# Horizontal bar chart of the per-montage scores.
# The colour scale is solid "indianred" from 0 to CV_SCORE_OK and then blends to "royalblue" at 1.
fig = px.bar(
    montages_cv_scores.round(2),
    orientation='h',
    text_auto=True,
    width=700,
    height=400,
    color=montages_cv_scores,
    # color_continuous_scale='RdBu',
    color_continuous_scale=[(0, "indianred"), (CV_SCORE_OK, "indianred"), (1, "royalblue")],
    range_color=[0, 1],
    labels={'value': 'f1 macro', 'index': ''},
    title=f'F1-macro на кросс-валидации размеченных монтажей: <i>{DATA_DIR}</i>',
)
# Hide the colour bar and leave room for the title; both update calls return the same figure.
fig.update_coloraxes(showscale=False).update_layout(margin={'t': 70})
fig.show()

## Проверка для каждого монтажа на тестовых данных

In [None]:
# Train / hold-out F1 for every montage in 'data/all' under both labelling schemes.
#
# Each montage yields one row of four values, in the column order the next cell expects:
# (train, test) for BaseMarker followed by (train, test) for FullMarker.
ALL_DATA_DIR = 'data/all'

all_montages = sorted(name for name in os.listdir(ALL_DATA_DIR) if name.endswith('.emg8'))
all_scores = []
model, _ = create_logreg_pipeline()
shift = get_total_shift(model)

# Loop-local names are deliberately distinct from `montage`, `marker`, `X_train`, `y_train`, ...
# so the single-montage data prepared in the labelling section is not overwritten here.
for montage_name in all_montages:
    montage_scores = []
    for marker_cls in (BaseMarker, FullMarker):
        # The marker is actually handed to read_emg8; it is built with the same settings
        # as in the labelling section.
        # NOTE(review): FullMarker labels are scored as-is here, while the inference cells
        # collapse them (negative -> 0, then modulo 10) before scoring — confirm which is intended.
        X_tr, X_te, y_tr, y_te, *_rest = read_emg8(
            montage_name, dir=ALL_DATA_DIR,
            n_holdout_groups=1,
            marker=marker_cls(use_peaks='std', bounds_shift=0)
        )
        model.fit(X_tr, y_tr)
        montage_scores.extend((
            f1_score_shifted(y_tr, model.predict(X_tr), shift),
            f1_score_shifted(y_te, model.predict(X_te), shift)
        ))
    all_scores.append(montage_scores)

# The following cell builds its table from `scores`: one row per montage, four values per row.
scores = all_scores

In [None]:
# Tabulate the collected scores: one row per montage, train/test F1 columns per marker.
scores = pd.DataFrame(
    scores,
    index=all_montages,
    columns=[
        f'f1 {split} ({marker_name})'
        for marker_name in ('BaseMarker', 'FullMarker')
        for split in ('train', 'test')
    ],
)
scores

## Оффлайн инференс на отложенной выборке

In [None]:
# Build a fresh pipeline, print its parameters and display the pipeline itself.
# pprint is imported with the other modules at the top of the notebook.
model, score = create_logreg_pipeline()
pprint(model.get_params())
model
# # saving the pipeline (serialisation)
# with open('pipeline_logreg.pkl', 'wb') as f:
#     pickle.dump(model, f)

# # restoring the pipeline from a file
# # SECURITY: pickle.load executes arbitrary code — only load files you created yourself.
# with open('pipeline_logreg.pkl', 'rb') as f:
#     model = pickle.load(f)

### Только основные классы (без разметки переходов)

In [None]:
# Main gesture classes only: transitions are neither labelled nor predicted.
# NOTE: X_train / X_test / y_train / y_test must still hold the BaseMarker split of `montage`
# produced in the labelling section.

model.set_params(model__use_trans='ignore')
model.fit(X_train, y_train)

# Sample-by-sample inference with per-sample latency.
# Only model.predict is inside the timed region, and results are collected in lists:
# growing arrays with np.append copies the whole array on every step and would be
# charged to the measured latency. perf_counter is used because it is the
# high-resolution clock intended for measuring short durations.
sample_preds = []
sample_durations = []

for i in range(X_test.shape[0]):
    start_time = perf_counter()
    sample_pred = model.predict(X_test[i])
    sample_durations.append(perf_counter() - start_time)
    sample_preds.append(sample_pred)

# Flat float arrays, matching what repeated np.append onto np.empty(0) produced.
y_test_pred = np.asarray(sample_preds, dtype=float).ravel()
comp_durations = np.asarray(sample_durations, dtype=float)

print(f"Максимальное время: {np.round(comp_durations.max() * 1000, 2)} мс")
print(f"Среднее время: {np.round(comp_durations.mean() * 1000, 2)} мс")

# Reference labels are shifted by the pipeline's total shift before comparison.
# GESTURES holds the BaseMarker class names captured right after labelling; it is used
# instead of `marker`, which is rebound elsewhere in the notebook.
print(classification_report(
    y_true = shift_target(y_test, get_total_shift(model)),
    y_pred = y_test_pred,
    target_names=GESTURES,
    zero_division=0
))

fig = fig_montage(
    pd.DataFrame(X_test),
    y_cmd=gestures.loc[last_train_idx + 1:, CMD_COL].reset_index(drop=True),
    # y_true=y_test,
    y_pred=y_test_pred,
    title=f"{full_path}<br>Результаты предсказаний в инференсе <b>без использования переходов</b>"
)
fig.show()

### Предсказание основных классов, но с использованием разметки переходов

In [None]:
# Trained on the FullMarker labelling (gestures plus transition classes),
# evaluated against the main gesture classes.

model.set_params(model__use_trans='drop')
model.fit(X_train_full, y_train_full)

# Work on a copy so the labels produced by the labelling cell stay intact.
# Negative labels are zeroed and every label is then reduced modulo 10
# (presumably this maps transition codes onto main gesture ids — TODO confirm).
y_test_main = y_test_full.copy()
y_test_main[y_test_main < 0] = 0
y_test_main %= 10

# Sample-by-sample inference with per-sample latency.
# Only model.predict is inside the timed region, and results are collected in lists:
# growing arrays with np.append copies the whole array on every step and would be
# charged to the measured latency. perf_counter is used because it is the
# high-resolution clock intended for measuring short durations.
sample_preds = []
sample_durations = []

for i in range(X_test_full.shape[0]):
    start_time = perf_counter()
    sample_pred = model.predict(X_test_full[i])
    sample_durations.append(perf_counter() - start_time)
    sample_preds.append(sample_pred)

# Flat float arrays, matching what repeated np.append onto np.empty(0) produced.
y_test_pred = np.asarray(sample_preds, dtype=float).ravel()
comp_durations = np.asarray(sample_durations, dtype=float)

print(f"Максимальное время: {np.round(comp_durations.max() * 1000, 2)} мс")
print(f"Среднее время: {np.round(comp_durations.mean() * 1000, 2)} мс")

# Reference labels are shifted by the pipeline's total shift before comparison.
print(classification_report(
    y_true = shift_target(y_test_main, get_total_shift(model)),
    y_pred = y_test_pred,
    target_names=GESTURES,
    zero_division=0
))

fig = fig_montage(
    pd.DataFrame(X_test_full),
    y_cmd=gestures_full.loc[last_train_idx_full + 1:, CMD_COL].reset_index(drop=True),
    # y_true=y_test_main,
    y_pred=y_test_pred,
    title=f"{full_path}<br>Результаты предсказаний в инференсе <b>с использованием дополнительных классов переходов</b>"
)
# fig.update_xaxes(visible=False)
# fig.update_yaxes(visible=False)
fig.show()

### Предсказание только переходов

In [None]:
# Trained and evaluated on the TransMarker labelling (transitions only).

model.set_params(model__use_trans='keep', gradients__oper='replace')
model.fit(X_train_trans, y_train_trans)

# Sample-by-sample inference with per-sample latency.
# Only model.predict is inside the timed region, and results are collected in lists:
# growing arrays with np.append copies the whole array on every step and would be
# charged to the measured latency. perf_counter is used because it is the
# high-resolution clock intended for measuring short durations.
sample_preds = []
sample_durations = []

for i in range(X_test_trans.shape[0]):
    start_time = perf_counter()
    sample_pred = model.predict(X_test_trans[i])
    sample_durations.append(perf_counter() - start_time)
    sample_preds.append(sample_pred)

# Flat float arrays, matching what repeated np.append onto np.empty(0) produced.
y_test_pred = np.asarray(sample_preds, dtype=float).ravel()
comp_durations = np.asarray(sample_durations, dtype=float)

print(f"Максимальное время: {np.round(comp_durations.max() * 1000, 2)} мс")
print(f"Среднее время: {np.round(comp_durations.mean() * 1000, 2)} мс")

# Reference labels are shifted by the pipeline's total shift before comparison.
print(classification_report(
    y_true = shift_target(y_test_trans, get_total_shift(model)),
    y_pred = y_test_pred,
    target_names=trans_marker.states.index,
    zero_division=0
))

# The command trace is sliced from gestures_trans, the same frame last_train_idx_trans
# was computed from, so the slice and the split index always refer to the same table.
fig = fig_montage(
    pd.DataFrame(X_test_trans),
    y_cmd=gestures_trans.loc[last_train_idx_trans + 1:, CMD_COL].reset_index(drop=True),
    y_true=y_test_trans, y_pred=y_test_pred,
    title=f"{full_path}<br>Результаты предсказаний в инференсе <b>с предсказанием только переходов</b>"
)
fig.show()

## Генерализация модели на все монтажи пилота

In [None]:
# Train one model on the pooled training parts of every montage whose
# cross-validation score is at least CV_SCORE_OK.
model, _ = create_logreg_pipeline(
    gradients__n_lags=7,
)

# pooled_marker = TransMarker(use_peaks='std', bounds_shift=0)
pooled_marker = FullMarker(use_peaks='std', bounds_shift=0)

# Per-montage pieces are collected in lists and stacked once after the loop; stacking
# inside the loop would re-copy everything accumulated so far on every iteration.
# Each list starts with an empty float array so the stacked result keeps the same
# shape and dtype as before, even when no montage passes the threshold.
X_train_parts = [np.empty((0, N_OMG_CH))]
X_test_parts = [np.empty((0, N_OMG_CH))]
y_train_parts = [np.empty(0)]
y_test_parts = [np.empty(0)]

# Loop-local names are deliberately distinct from `montage`, `marker`, `X_train`, `y_train`, ...
# so the single-montage data prepared in the labelling section is not overwritten here.
for montage_name in montages:
    if montages_cv_scores[montage_name] < CV_SCORE_OK:
        continue
    X_tr, X_te, y_tr, y_te, *_rest = read_emg8(
        montage_name, dir=DATA_DIR, marker=pooled_marker, n_holdout_groups=1
    )
    X_train_parts.append(X_tr)
    X_test_parts.append(X_te)
    y_train_parts.append(y_tr)
    y_test_parts.append(y_te)

X_train_all = np.vstack(X_train_parts)
X_test_all = np.vstack(X_test_parts)
y_train_all = np.hstack(y_train_parts)
y_test_all = np.hstack(y_test_parts)

model.fit(X_train_all, y_train_all)

In [16]:
# Predict the pooled hold-out set and bring the reference labels into comparable form.
y_pred_all = model.predict(X_test_all)
# Zero out negative labels, then reduce every label modulo 10
# (presumably this maps transition codes onto main gesture ids — TODO confirm).
y_test_all[y_test_all < 0] = 0
y_test_all %= 10
# NOTE(review): y_test_all is overwritten with its shifted version, so re-running this cell
# without re-running the previous one applies shift_target a second time.
y_test_all = shift_target(y_test_all, get_total_shift(model))

In [None]:
# Per-class report for the pooled hold-out set.
# zero_division=0 matches the other reports in this notebook: a class that is never
# predicted gets a score of 0 without emitting an undefined-metric warning.
print(classification_report(y_test_all, y_pred_all, target_names=GESTURES, zero_division=0))

In [None]:
# Plot the pooled hold-out signals with the reference (shifted) and predicted labels.
# The figure is the cell's last expression, so the notebook renders it.
fig_montage(X_test_all, y_true=y_test_all, y_pred=y_pred_all)

---

In [19]:
# # # marker = TransMarker(use_peaks='std', bounds_shift=0)

# model, _ = create_logreg_pipeline(
#     gradients__n_lags=4,
# )

# marker = FullMarker(use_peaks='std', bounds_shift=0)

# for holdout_montage in montages:

#     X_train_all = np.empty((0, N_OMG_CH))
#     y_train_all = np.empty(0)

#     X_test_all, _, y_test_all, *_ = read_emg8(holdout_montage, dir=DATA_DIR, marker=marker, n_holdout_groups=0)

#     for montage in montages:
#         if montage == holdout_montage:
#             continue
#         X_train, _, y_train, *_ = read_emg8(montage, dir=DATA_DIR, marker=marker, n_holdout_groups=0)
#         X_train_all = np.vstack([X_train_all, X_train])
#         y_train_all = np.hstack([y_train_all, y_train])

#     model.fit(X_train_all, y_train_all)

#     y_pred_all = model.predict(X_test_all)
#     y_test_all[y_test_all < 0] = 0
#     y_test_all %= 10
#     y_test_all = shift_target(y_test_all, get_total_shift(model))
#     print(holdout_montage)
#     print(classification_report(y_test_all, y_pred_all, target_names=GESTURES))

#     fig_montage(X_test_all, y_true=y_test_all, y_pred=y_pred_all).write_html(holdout_montage + '.html')