# Dataset Analysis

In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
import seaborn as sns

## Creating a Full Dataset

In [None]:
column_names = [
    "timestamp",
    "activity_id", 
    "heart_rate",
    "IMU_hand_temperature",
    "IMU_hand_3D_acceleration_data_16g_1",
    "IMU_hand_3D_acceleration_data_16g_2",
    "IMU_hand_3D_acceleration_data_16g_3",
    "IMU_hand_3D_acceleration_data_6g_1",
    "IMU_hand_3D_acceleration_data_6g_2",
    "IMU_hand_3D_acceleration_data_6g_3",
    "IMU_hand_3D_gyroscope_data_1",
    "IMU_hand_3D_gyroscope_data_2",
    "IMU_hand_3D_gyroscope_data_3",
    "IMU_hand_3D_magnetometer_data_1",
    "IMU_hand_3D_magnetometer_data_2",
    "IMU_hand_3D_magnetometer_data_3",
    "IMU_hand_orientation_1",
    "IMU_hand_orientation_2",
    "IMU_hand_orientation_3",
    "IMU_hand_orientation_4",
    "IMU_chest_temperature",
    "IMU_chest_3D_acceleration_data_16g_1",
    "IMU_chest_3D_acceleration_data_16g_2",
    "IMU_chest_3D_acceleration_data_16g_3",
    "IMU_chest_3D_acceleration_data_6g_1",
    "IMU_chest_3D_acceleration_data_6g_2",
    "IMU_chest_3D_acceleration_data_6g_3",
    "IMU_chest_3D_gyroscope_data_1",
    "IMU_chest_3D_gyroscope_data_2",
    "IMU_chest_3D_gyroscope_data_3",
    "IMU_chest_3D_magnetometer_data_1",
    "IMU_chest_3D_magnetometer_data_2",
    "IMU_chest_3D_magnetometer_data_3",
    "IMU_chest_orientation_1",
    "IMU_chest_orientation_2",
    "IMU_chest_orientation_3",
    "IMU_chest_orientation_4",
    "IMU_ankle_temperature",
    "IMU_ankle_3D_acceleration_data_16g_1",
    "IMU_ankle_3D_acceleration_data_16g_2",
    "IMU_ankle_3D_acceleration_data_16g_3",
    "IMU_ankle_3D_acceleration_data_6g_1",
    "IMU_ankle_3D_acceleration_data_6g_2",
    "IMU_ankle_3D_acceleration_data_6g_3",
    "IMU_ankle_3D_gyroscope_data_1",
    "IMU_ankle_3D_gyroscope_data_2",
    "IMU_ankle_3D_gyroscope_data_3",
    "IMU_ankle_3D_magnetometer_data_1",
    "IMU_ankle_3D_magnetometer_data_2",
    "IMU_ankle_3D_magnetometer_data_3",
    "IMU_ankle_orientation_1",
    "IMU_ankle_orientation_2",
    "IMU_ankle_orientation_3",
    "IMU_ankle_orientation_4"
]

col_for_drop = [col for col in column_names if col.endswith(tuple(f"orientation_{i}" for i in range(1, 5)))]
col_for_drop += [col for col in column_names if col.endswith(tuple(f"acceleration_data_6g_{i}" for i in range(1, 4)))]

In [None]:
person1 = pd.read_csv("Protocol/subject101.dat", delimiter=" ", names=column_names, header=None)
person2 = pd.read_csv("Protocol/subject102.dat", delimiter=" ", names=column_names, header=None)
person3 = pd.read_csv("Protocol/subject103.dat", delimiter=" ", names=column_names, header=None)
person4 = pd.read_csv("Protocol/subject104.dat", delimiter=" ", names=column_names, header=None)
person5 = pd.read_csv("Protocol/subject105.dat", delimiter=" ", names=column_names, header=None)
person6 = pd.read_csv("Protocol/subject106.dat", delimiter=" ", names=column_names, header=None)
person7 = pd.read_csv("Protocol/subject107.dat", delimiter=" ", names=column_names, header=None)
person8 = pd.read_csv("Protocol/subject108.dat", delimiter=" ", names=column_names, header=None)
person9 = pd.read_csv("Protocol/subject109.dat", delimiter=" ", names=column_names, header=None)

person1_additional = pd.read_csv("Optional/subject101.dat", delimiter=" ", names=column_names, header=None)
person5_additional = pd.read_csv("Optional/subject105.dat", delimiter=" ", names=column_names, header=None)
person6_additional = pd.read_csv("Optional/subject106.dat", delimiter=" ", names=column_names, header=None)
person8_additional = pd.read_csv("Optional/subject108.dat", delimiter=" ", names=column_names, header=None)
person9_additional = pd.read_csv("Optional/subject109.dat", delimiter=" ", names=column_names, header=None)

## Data Preprocessing

***
### Заполнение пульса:
***

In [None]:
df_list = [person1, person2, person3, person4, person5, person6, person7, person8, person9, 
           person1_additional, person5_additional, person6_additional, person8_additional, person9_additional]

for df in df_list:
    hr_positions = np.flatnonzero(df['heart_rate'].notna())
    if len(hr_positions) < 2:
        continue
    first_idx, second_idx = hr_positions[0], hr_positions[1]
    hr_1 = df['heart_rate'].iat[first_idx]
    hr_2 = df['heart_rate'].iat[second_idx]
    for pos in hr_positions[2:]:
        fill_val = (hr_1 + hr_2) / 2
        df.loc[first_idx + 1 : second_idx, 'heart_rate'] = (
            df.loc[first_idx + 1 : second_idx, 'heart_rate'].fillna(fill_val)
        )
        first_idx, second_idx = second_idx, pos
        hr_1, hr_2 = hr_2, df['heart_rate'].iat[pos]
    df.loc[second_idx + 1 :, 'heart_rate'] = (
        df.loc[second_idx + 1 :, 'heart_rate'].fillna(hr_2)
    )

## Dataset Splitting with Temporal Consistency

***
### Создание df_all с учетом деления на людей и временной согласованности:
***

In [None]:
all_classes = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 16, 17, 18, 19, 20, 24]

subjects = {
    1: [person1, person1_additional],
    2: [person2],
    3: [person3],
    4: [person4],
    5: [person5, person5_additional],
    6: [person6, person6_additional],
    7: [person7],
    8: [person8, person8_additional],
    9: [person9, person9_additional]
}

df_all = pd.DataFrame()
for subject_id, dfs in subjects.items():
    df_subject = pd.concat(dfs, axis=0, ignore_index=True)
    df_subject['subject_id'] = subject_id
    df_all = pd.concat([df_all, df_subject], axis=0, ignore_index=True)

df_all = df_all.drop(columns=col_for_drop)
df_all = df_all.dropna()
df_all = df_all[df_all["activity_id"] != 0]

# Проверка активностей:
unique_activities = sorted(df_all['activity_id'].unique())
print(f"Unique activities in combined dataset: {unique_activities}")
if set(unique_activities) == set(all_classes):
    print("All required classes are present in the combined dataset.")
else:
    print(f"Warning: Missing classes {set(all_classes) - set(unique_activities)}")

***
### Мини-EDA для датасета:
***

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import chi2_contingency
import warnings
warnings.filterwarnings('ignore')

print("Размер датасета:", df_all.shape)
print("\nПервые 5 строк:")
print(df_all.head())
print("\nТипы данных:")
print(df_all.dtypes)

numeric_cols = df_all.select_dtypes(include=[np.number]).columns.tolist()
numeric_cols.remove('activity_id')
numeric_cols.remove('subject_id')
categorical_cols = ['activity_id', 'subject_id']
print("\nЧисловые признаки:", numeric_cols)
print("Категориальные признаки:", categorical_cols)

In [None]:
# Матрица корреляций для числовых признако
plt.figure(figsize=(12, 8))
corr_matrix = df_all[numeric_cols].corr()
sns.heatmap(corr_matrix, annot=False, cmap='coolwarm', vmin=-1, vmax=1)
plt.title('Correlation Matrix of Numeric Features')
plt.show()

corr_pairs = corr_matrix.unstack()
high_corr = corr_pairs[(abs(corr_pairs) > 0.7) & (abs(corr_pairs) < 1)].sort_values(ascending=False)
print("\nВысокие корреляции (|corr| > 0.7):")
print(high_corr)

In [None]:
# Распределение activity_id
plt.figure(figsize=(10, 6))
sns.countplot(x='activity_id', data=df_all, order=sorted(df_all['activity_id'].unique()))
plt.title('Distribution of Activity Classes')
plt.xlabel('Activity ID')
plt.ylabel('Count')
plt.xticks(rotation=45)
plt.show()

# Распределение activity_id по субъектам
plt.figure(figsize=(12, 6))
sns.countplot(x='activity_id', hue='subject_id', data=df_all, order=sorted(df_all['activity_id'].unique()))
plt.title('Activity Distribution by Subject')
plt.xlabel('Activity ID')
plt.ylabel('Count')
plt.legend(title='Subject ID')
plt.xticks(rotation=45)
plt.show()

# Доля классов
class_counts = df_all['activity_id'].value_counts(normalize=True)
print("\nДоля каждого класса activity_id:")
print(class_counts)

In [None]:
accel_cols = [col for col in numeric_cols if 'accel' in col and 'hand' in col]
print("Выбранные признаки для анализа динамики:", accel_cols)

subject_1 = df_all[df_all['subject_id'] == 1].copy()
subject_1['time'] = subject_1.index

plt.figure(figsize=(14, 6))
for col in accel_cols:
    sns.lineplot(x='time', y=col, hue='activity_id', data=subject_1, alpha=0.5)
plt.title('Temporal Dynamics of Hand Accelerometer Features (Subject 1)')
plt.xlabel('Time Index')
plt.ylabel('Acceleration')
plt.legend(title='Activity ID', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.show()

In [None]:
# Boxplot для топ-признаков
plt.figure(figsize=(14, 6))
for i, col in enumerate(top_numeric_cols[:3], 1):
    plt.subplot(1, 3, i)
    sns.boxplot(x='activity_id', y=col, data=df_all)
    plt.title(f'Boxplot of {col}')
    plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# Violin Plot
plt.figure(figsize=(14, 6))
for i, col in enumerate(top_numeric_cols[:3], 1):
    plt.subplot(1, 3, i)
    sns.violinplot(x='activity_id', y=col, data=df_all)
    plt.title(f'Violin Plot of {col}')
    plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

In [None]:
from statsmodels.graphics.mosaicplot import mosaic

# Мозаичный плот для activity_id и subject_i
plt.figure(figsize=(10, 6))
mosaic(df_all, ['activity_id', 'subject_id'], title='Mosaic Plot of Activity ID vs Subject ID')
plt.show()

In [None]:
# Проверка аномалий (например, экстремальные значения)
from scipy.stats import zscore
z_scores = df_all[numeric_cols].apply(zscore)
outliers = (z_scores.abs() > 3).sum()
print("\nКоличество выбросов (|z-score| > 3) по признакам:")
print(outliers[outliers > 0])

***
### Разделение на трейн и тест с учетом времени (классы активности, которые не попали - потом добавляются по маске и заново сортируются по людям и времени для сохранения порядка):
***

In [None]:
import pandas as pd

all_classes = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 16, 17, 18, 19, 20, 24]

# Убедимся, что df_all отсортирован по времени
df_all = df_all.sort_values(by=['subject_id', 'timestamp'])

train_df = pd.DataFrame()
test_df = pd.DataFrame()

# Сортируем subject_id по первой временной метке
subject_order = df_all.groupby('subject_id')['timestamp'].min().sort_values().index

for subject_id in subject_order:
    df_subject = df_all[df_all['subject_id'] == subject_id]
    split_idx = int(len(df_subject) * 0.8)
    if split_idx == 0 or split_idx >= len(df_subject):
        print(f"Warning: Subject {subject_id} has insufficient data for splitting.")
        continue
    train_part = df_subject.iloc[:split_idx]
    test_part = df_subject.iloc[split_idx:]
    train_df = pd.concat([train_df, train_part], axis=0, ignore_index=True)
    test_df = pd.concat([test_df, test_part], axis=0, ignore_index=True)

print(f"Train set size: {len(train_df)}, Test set size: {len(test_df)}")
train_classes = sorted(train_df['activity_id'].unique())
test_classes = sorted(test_df['activity_id'].unique())
print(f"Train classes: {train_classes}")
print(f"Test classes: {test_classes}")

missing_train = set(all_classes) - set(train_classes)
missing_test = set(all_classes) - set(test_classes)
if missing_train or missing_test:
    print(f"Warning: Missing classes in train: {missing_train}, test: {missing_test}")
    for activity in missing_train.union(missing_test):
        activity_data = df_all[df_all['activity_id'] == activity]
        if len(activity_data) > 0:
            split_idx = int(len(activity_data) * 0.8)
            if split_idx == 0:
                split_idx = 1
            if split_idx >= len(activity_data):
                split_idx = len(activity_data) - 1
            train_part = activity_data.iloc[:split_idx]
            test_part = activity_data.iloc[split_idx:]
            train_df = pd.concat([train_df, train_part], axis=0, ignore_index=True)
            test_df = pd.concat([test_df, test_part], axis=0, ignore_index=True)

# Сортируем итоговые наборы данных по subject_id, а затем по времен
train_df = train_df.sort_values(by=['subject_id', 'timestamp'])
test_df = test_df.sort_values(by=['subject_id', 'timestamp'])

train_classes = sorted(train_df['activity_id'].unique())
test_classes = sorted(test_df['activity_id'].unique())
print(f"Final train classes: {train_classes}")
print(f"Final test classes: {test_classes}")
if set(train_classes) == set(test_classes) == set(all_classes):
    print("All classes are present in both train and test sets.")
else:
    print(f"Warning: Still missing classes in train: {set(all_classes) - set(train_classes)}, test: {set(all_classes) - set(test_classes)}")

## Feature Engineering

In [None]:
for df in [train_df, test_df]:
    df["sum_temperature"] = (
        df["IMU_hand_temperature"] +
        df["IMU_chest_temperature"] +
        df["IMU_ankle_temperature"]
    )
    df["sum_hand_acceleration"] = (
        df["IMU_hand_3D_acceleration_data_16g_1"] +
        df["IMU_hand_3D_acceleration_data_16g_2"] +
        df["IMU_hand_3D_acceleration_data_16g_3"]
    )
    df["sum_chest_acceleration"] = (
        df["IMU_chest_3D_acceleration_data_16g_1"] +
        df["IMU_chest_3D_acceleration_data_16g_2"] +
        df["IMU_chest_3D_acceleration_data_16g_3"]
    )
    df["sum_ankle_acceleration"] = (
        df["IMU_ankle_3D_acceleration_data_16g_1"] +
        df["IMU_ankle_3D_acceleration_data_16g_2"] +
        df["IMU_ankle_3D_acceleration_data_16g_3"]
    )
    df["heart_rate_rolling_mean"] = df["heart_rate"].rolling(window=5, min_periods=1).mean()

## Encoding and Preparation for Modeling

In [None]:
X_train = train_df.drop(columns=["activity_id", "subject_id"])
y_train = train_df["activity_id"]
X_test = test_df.drop(columns=["activity_id", "subject_id"])
y_test = test_df["activity_id"]

from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
y_train_enc = le.fit_transform(y_train)
y_test_enc = le.transform(y_test)

## Sequence Preparation for HMM (пока не нужно)

In [None]:
import numpy as np

def create_sequences(X, y, window_size=100):
    sequences = []
    labels = []
    X_values = X.values if hasattr(X, 'values') else X
    y_values = np.asarray(y)
    
    for i in range(0, len(X) - window_size + 1, window_size):
        sequences.append(X_values[i:i + window_size])
        labels.append(np.bincount(y_values[i:i + window_size].astype(int)).argmax())
    return np.array(sequences), np.array(labels)

X_train_seq, y_train_seq = create_sequences(X_train, y_train_enc)
X_test_seq, y_test_seq = create_sequences(X_test, y_test_enc)

print(f"Train sequences: {X_train_seq.shape}, Test sequences: {X_test_seq.shape}")

***
### Обучение моделек на подвыборке датасета:
***

***
#### XGBoost:
***

In [None]:
import numpy as np
from sklearn.preprocessing import LabelEncoder, RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
from xgboost import XGBClassifier
from sklearn.utils import resample

# Подвыборка 30% тренировочных данных для ускорения
X_train_sub, y_train_sub = resample(X_train, y_train, n_samples=int(0.3 * len(X_train)), random_state=42)

# Кодирование меток
train_classes = np.unique(y_train_sub)
le = LabelEncoder().fit(train_classes)
y_train_enc = le.transform(y_train_sub)

# Фильтрация тестового набора (только классы из тренировочного)
test_mask = np.isin(y_test, train_classes)
X_test_filt = X_test[test_mask]
y_test_filt = y_test[test_mask]

# Пайплайн с масштабированием и моделью
pipeline = Pipeline([
    ('scaler', RobustScaler()),
    ('classifier', XGBClassifier(
        objective='multi:softprob',
        n_estimators=100,  # Уменьшено для скорости
        learning_rate=0.1,
        max_depth=10,  # Уменьшено
        random_state=42
    ))
])

# Обучение
pipeline.fit(X_train_sub, y_train_enc)

# Предсказания
y_pred_enc = pipeline.predict(X_test_filt)
y_pred = le.inverse_transform(y_pred_enc)

# Оценка
print("XGBoost Classification Report:")
print(classification_report(y_test_filt, y_pred))

***
#### XGBoost Random Search:
***

In [None]:
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBClassifier
from sklearn.utils import resample

# Подвыборка 30% тренировочных данных
X_train_sub, y_train_sub = resample(X_train, y_train, n_samples=int(0.3 * len(X_train)), random_state=42)

# Кодирование меток
train_classes = np.unique(y_train_sub)
le = LabelEncoder().fit(train_classes)
y_train_enc = le.transform(y_train_sub)

# Фильтрация тестового набора
mask = np.isin(y_test, train_classes)
X_test_filt = X_test[mask]
y_test_filt = y_test[mask]

# Модель
model = XGBClassifier(objective='multi:softprob', random_state=42)

# Упрощенный поиск гиперпараметров
param_dist = {
    'n_estimators': [50, 100, 200],
    'learning_rate': [0.05, 0.1]
}

# Стохастический поиск
random_search = RandomizedSearchCV(
    estimator=model,
    param_distributions=param_dist,
    n_iter=4,  # Меньше итераций
    scoring='accuracy',
    cv=2,  # Меньше фолдов
    n_jobs=-1,
    verbose=1
)

# Обучение
random_search.fit(X_train_sub, y_train_enc)

# Лучшая модель
best_model = random_search.best_estimator_
print("Best Hyperparameters:", random_search.best_params_)

# Предсказания
y_pred_enc = best_model.predict(X_test_filt)
y_pred = le.inverse_transform(y_pred_enc)

# Оценка
print("XGBoost Random Search Classification Report:")
print(classification_report(y_test_filt, y_pred))

***
#### Random Forest:
***

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
from sklearn.utils import resample

# Подвыборка 30% тренировочных данных
X_train_sub, y_train_sub = resample(X_train, y_train, n_samples=int(0.3 * len(X_train)), random_state=42)

# Кодирование меток
le = LabelEncoder().fit(all_classes)
y_train_enc = le.transform(y_train_sub)
y_test_enc = le.transform(y_test)

# Модель
model = RandomForestClassifier(
    n_estimators=100,  # Уменьшено
    max_depth=8,  # Уменьшено
    random_state=42,
    n_jobs=-1
)

# Обучение
model.fit(X_train_sub, y_train_enc)

# Предсказания
y_pred_enc = model.predict(X_test)
y_pred = le.inverse_transform(y_pred_enc)

# Оценка
print("Random Forest Classification Report:")
print(classification_report(y_test, y_pred))

# Важность признаков
importances = model.feature_importances_
features = X_train.columns
plt.figure(figsize=(8, 5))
plt.barh(features, importances)
plt.xlabel("Importance Score")
plt.title("Feature Importances (Random Forest)")
plt.tight_layout()
plt.show()

***
#### Logistic Regression:
***

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder, RobustScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.utils import resample

# Подвыборка 30% тренировочных данных
X_train_sub, y_train_sub = resample(X_train, y_train, n_samples=int(0.3 * len(X_train)), random_state=42)

# Масштабирование
scaler = RobustScaler()
X_train_scaled = scaler.fit_transform(X_train_sub)
X_test_scaled = scaler.transform(X_test)

# Кодирование меток
le = LabelEncoder().fit(all_classes)
y_train_enc = le.transform(y_train_sub)
y_test_enc = le.transform(y_test)

# Модель
model = LogisticRegression(
    max_iter=200,  # Уменьшено
    random_state=42
)

# Обучение
model.fit(X_train_scaled, y_train_enc)

# Предсказания
y_pred_enc = model.predict(X_test_scaled)
y_pred = le.inverse_transform(y_pred_enc)

# Оценка
print("Logistic Regression Classification Report:")
print(classification_report(y_test, y_pred))

# Важность признаков
coefficients = np.abs(model.coef_).mean(axis=0)
features = X_train.columns
plt.figure(figsize=(8, 5))
plt.barh(features, coefficients)
plt.xlabel("Coefficient Magnitude")
plt.title("Feature Importances (Logistic Regression)")
plt.tight_layout()
plt.show()

***
#### CatBoost:
***

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from catboost import CatBoostClassifier
from sklearn.utils import resample

# Подвыборка 30% тренировочных данных
X_train_sub, y_train_sub = resample(X_train, y_train, n_samples=int(0.3 * len(X_train)), random_state=42)

# Кодирование меток
le = LabelEncoder().fit(all_classes)
y_train_enc = le.transform(y_train_sub)
y_test_enc = le.transform(y_test)

# Модель
model = CatBoostClassifier(
    iterations=100,  # Уменьшено
    depth=4,  # Уменьшено
    learning_rate=0.1,
    loss_function='MultiClass',
    verbose=False,
    random_seed=42
)

# Обучение
model.fit(X_train_sub, y_train_enc)

# Предсказания
y_pred_enc = model.predict(X_test).flatten().astype(int)
y_pred = le.inverse_transform(y_pred_enc)

# Оценка
print("CatBoost Classification Report:")
print(classification_report(y_test, y_pred))

# Важность признаков
importances = model.get_feature_importance()
features = X_train.columns
plt.figure(figsize=(8, 5))
plt.barh(features, importances)
plt.xlabel("Importance Score")
plt.title("Feature Importances (CatBoost)")
plt.tight_layout()
plt.show()

***
### XGBoost + HMM:
***

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, accuracy_score, f1_score
from xgboost import XGBClassifier
from sklearn.utils.class_weight import compute_class_weight

# Функция ordered_unique
def ordered_unique(x):
    """ Return unique elements, maintaining order of appearance """
    return x[np.sort(np.unique(x, return_index=True)[1])]

# Реализация HMM
class HMM:
    def __init__(self, startprob=None, emissionprob=None, transmat=None, n_iter=100, random_state=None):
        self.startprob = startprob
        self.emissionprob = emissionprob
        self.transmat = transmat
        self.n_iter = n_iter
        self.random_state = random_state
        self.labels = None

    def __str__(self):
        return (
            "HMM Model:\n"
            f"{ {'prior': self.startprob, 'emission': self.emissionprob, 'transition': self.transmat} }"
        )

    def fit(self, Y_pred, Y_true, groups=None):
        self.labels = np.unique(Y_true)
        self.startprob = self.compute_prior(Y_true, self.labels, uniform=False)  # Неравномерные вероятности
        self.emissionprob = self.compute_emission(Y_pred, Y_true, self.labels)
        self.transmat = self.compute_transition(Y_true, self.labels, groups)

    def predict(self, Y, groups=None):
        params = {
            'prior': self.startprob,
            'emission': self.emissionprob,
            'transition': self.transmat,
            'labels': self.labels,
        }

        if groups is None:
            Y_vit, _ = self._viterbi(Y, params)
        else:
            Y_vit = np.concatenate([
                self._viterbi(Y[groups == g], params)[0]
                for g in ordered_unique(groups)
            ])
        return Y_vit

    def predict_proba(self, Y, groups=None):
        params = {
            'prior': self.startprob,
            'emission': self.emissionprob,
            'transition': self.transmat,
            'labels': self.labels,
        }
        if groups is None:
            _, probs = self._viterbi(Y, params, True)
        else:
            probs = np.concatenate([
                self._viterbi(Y[groups == g], params, True)[1]
                for g in ordered_unique(groups)
            ])
        return probs

    def optimise(self, **kwargs):
        return

    @staticmethod
    def compute_transition(Y, labels=None, groups=None):
        if labels is None:
            labels = np.unique(Y)

        def _compute_transition(Y):
            transition = np.vstack([
                np.sum(Y[1:][(Y == label)[:-1]].reshape(-1, 1) == labels, axis=0)
                for label in labels
            ])
            return transition

        if groups is None:
            transition = _compute_transition(Y)
        else:
            transition = sum((
                _compute_transition(Y[groups == g])
                for g in ordered_unique(groups)
            ))

        # Laplace smoothing
        transition = transition + 1
        transition = transition / np.sum(transition, axis=1).reshape(-1, 1)
        return transition

    @staticmethod
    def compute_emission(Y_score, Y_true, labels=None):
        if labels is None:
            labels = np.unique(Y_true)

        if Y_score.ndim == 1:
            Y_pred = np.vstack([
                (Y_score == label).astype('float')[:, None]
                for label in labels
            ])
        else:
            Y_pred = Y_score

        emission = np.vstack(
            [np.mean(Y_pred[Y_true == label], axis=0) for label in labels]
        )
        return emission

    @staticmethod
    def compute_prior(Y_true, labels=None, uniform=True):
        if labels is None:
            labels = np.unique(Y_true)

        if uniform:
            prior = np.ones(len(labels)) / len(labels)
        else:
            prior = np.mean(Y_true.reshape(-1, 1) == labels, axis=0)
        return prior

    def _viterbi(self, Y, hmm_params, return_probs=False):
        if len(Y) == 0:
            return np.empty_like(Y), np.array([])

        def log(x):
            SMALL_NUMBER = 1e-16
            return np.log(x + SMALL_NUMBER)

        prior = hmm_params['prior']
        emission = hmm_params['emission']
        transition = hmm_params['transition']
        labels = hmm_params['labels']

        nobs = len(Y)
        nlabels = len(labels)

        if Y.ndim == 1:
            Y = np.where(Y.reshape(-1, 1) == labels)[1]
        else:
            Y = Y

        probs = np.zeros((nobs, nlabels))
        probs[0, :] = log(prior) + log(emission[:, Y[0] if Y.ndim == 1 else np.argmax(Y[0])])
        for j in range(1, nobs):
            for i in range(nlabels):
                probs[j, i] = np.max(
                    log(emission[i, Y[j] if Y.ndim == 1 else np.argmax(Y[j])]) +
                    log(transition[:, i]) +
                    probs[j - 1, :])
        viterbi_path = np.zeros_like(Y, dtype=int) if Y.ndim == 1 else np.zeros(len(Y), dtype=int)
        viterbi_path[-1] = np.argmax(probs[-1, :])
        for j in reversed(range(nobs - 1)):
            viterbi_path[j] = np.argmax(
                log(transition[:, viterbi_path[j + 1]]) +
                probs[j, :])

        viterbi_path = labels[viterbi_path]

        if return_probs:
            return viterbi_path, np.exp(probs)
        return viterbi_path, None

# Подвыборка 50% тренировочных данных с сохранением временного порядка
n_samples = int(0.5 * len(X_train))
X_train_sub = X_train.iloc[:n_samples].copy()
y_train_sub = y_train.iloc[:n_samples].copy()

# Проверка наличия всех классов и добавление отсутствующих
all_classes = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 16, 17, 18, 19, 20, 24]
missing_classes = set(all_classes) - set(np.unique(y_train_sub))
if missing_classes:
    print(f"Warning: Missing classes in training subset: {missing_classes}")
    for cls in missing_classes:
        cls_indices = np.where(y_train == cls)[0]
        if len(cls_indices) > 0:
            first_idx = cls_indices[0]
            X_train_sub = pd.concat([X_train_sub, X_train.iloc[[first_idx]]], ignore_index=True)
            y_train_sub = pd.concat([y_train_sub, y_train.iloc[[first_idx]]], ignore_index=True)

# Кодирование меток
train_classes = np.unique(y_train_sub)
le = LabelEncoder().fit(train_classes)
y_train_enc = le.transform(y_train_sub)
y_test_enc = le.transform(y_test)

# Вычисление весов классов
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_enc), y=y_train_enc)
sample_weights = np.array([class_weights[cls] for cls in y_train_enc])

# Пайплайн XGBoost
pipeline = Pipeline([
    ('scaler', RobustScaler()),
    ('classifier', XGBClassifier(
        objective='multi:softprob',
        n_estimators=200,  # Увеличено
        learning_rate=0.05,  # Уменьшено
        max_depth=10,
        random_state=42
    ))
])

# Обучение с весами классов
pipeline.fit(X_train_sub, y_train_enc, classifier__sample_weight=sample_weights)
y_pred_enc = pipeline.predict(X_test)
y_pred = le.inverse_transform(y_pred_enc)
# Сглаживание вероятностей
epsilon = 1e-5
y_train_proba = pipeline.predict_proba(X_train_sub) * (1 - 18 * epsilon) + epsilon
y_pred_proba = pipeline.predict_proba(X_test) * (1 - 18 * epsilon) + epsilon

# HMM для сглаживания предсказаний
hmm_model = HMM(n_iter=100, random_state=42)
hmm_model.fit(y_train_proba, y_train_enc)

# Корректировка предсказаний
y_pred_hmm = hmm_model.predict(y_pred_proba)
y_pred_hmm = le.inverse_transform(y_pred_hmm)

# Оценка
print("XGBoost Classification Report:")
print(classification_report(y_test, y_pred, zero_division=0))
print("XGBoost + HMM Classification Report:")
print(classification_report(y_test, y_pred_hmm, zero_division=0))

# Сохранение метрик
metrics = {
    'XGBoost': {
        'accuracy': accuracy_score(y_test, y_pred),
        'f1_macro': f1_score(y_test, y_pred, average='macro')
    },
    'XGBoost_HMM': {
        'accuracy': accuracy_score(y_test, y_pred_hmm),
        'f1_macro': f1_score(y_test, y_pred_hmm, average='macro')
    }
}

# Вывод матриц HMM для отладки
print(hmm_model)

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, accuracy_score, f1_score
from xgboost import XGBClassifier
from sklearn.utils.class_weight import compute_class_weight

# Функция ordered_unique
def ordered_unique(x):
    """Return unique elements, maintaining order of appearance"""
    return np.unique(x)[np.argsort(np.unique(x, return_index=True)[1])]

# Реализация HMM
class HMM:
    def __init__(self, startprob=None, emissionprob=None, transmat=None, n_iter=100, random_state=None):
        self.startprob = startprob
        self.emissionprob = emissionprob
        self.transmat = transmat
        self.n_iter = n_iter
        self.random_state = random_state
        self.labels = None

    def __str__(self):
        return (
            "HMM Model:\n"
            f"{ {'startprob': self.startprob, 'emission': self.emissionprob, 'transition': self.transmat} }"
        )

    def fit(self, Y_pred, Y_true, groups=None):
        self.labels = np.sort(np.unique(Y_true))
        self.startprob = self.compute_probability(Y_true, self.labels, uniform=False)
        self.emissionprob = self.compute_emission_probability(Y_pred, Y_true, self.labels)
        self.transmat = self.compute_transition_probability(Y_true, self.labels, groups)

    def compute_probability(self, Y_true, labels=None, uniform=False):
        if labels is None:
            labels = np.sort(np.unique(Y_true))

        if uniform:
            probability = np.ones(self.labels(Y_true)) / len(Y_true)
        else:
            probability = np.mean(Y_true.reshape(-1, 1) == labels, axis=0)
        return probability

    def compute_emission_probability(self, Y_score, Y_true, labels=None):
        if labels is None:
            labels = np.sort(np.unique(Y_true))

        if Y_score.ndim == 1:
            Y_pred = np.vstack([
                (Y_score == labels).astype('float')[:, None]
            for label in labels])
        else:
            Y_pred = Y_score

        emission = np.vstack([
            np.mean(Y_pred[Y_true == label], axis=0) for label in labels
        ])
        return emission

    def compute_transition_probability(self, Y, labels=None, groups=None):
        if labels is None:
            labels = np.sort(np.unique(Y))

        def _compute_transition(Y):
            transition = np.vstack([
                np.sum(Y[1:][(Y == label)[:-1]].reshape(-1, 1) == labels, axis=0)
                for label in labels
            ])
            return transition

        if groups is None:
            transition = _compute_transition(Y)
        else:
            transition = sum((
                _compute_transition(Y[groups == g])
                for g in ordered_unique(groups)
            ))

        # Laplace smoothing
        transition = transition + 1
        transition = transition / np.sum(transition, axis=1).reshape(-1, 1)
        return transition

    def predict(self, Y, groups=None):
        params = {
            'prior': self.startprob,
            'emission': self.emissionprob,
            'transition': self.transmat,
            'labels': self.labels,
        }

        if groups is None:
            Y_vit, _ = self._viterbi(Y, params)
        else:
            Y_vit = np.concatenate([
                self._viterbi(Y[groups == g], params)[0]
                for g in ordered_unique(groups)
            ])
        return Y_vit

    def predict_probability(self, Y, groups=None):
        params = {
            'prior': self.startprob,
            'emission': self.emissionprob,
            'transition': self.transmat,
            'labels': self.labels,
        }
        if groups is None:
            _, probs = self._viterbi(Y, params, True)
        else:
            probs = np.concatenate([
                self._viterbi(Y[groups == g], params, True)[1]
                for g in ordered_unique(groups)
            ])
        return probs

    def optimise(self, **kwargs):
        return

    def _viterbi(self, Y, hmm_params, return_probs=False):
        if len(Y) == 0:
            return np.empty_like(Y), np.array([])

        def log(x):
            SMALL_NUMBER = 1e-16
            return np.log(x + SMALL_NUMBER)

        prior = hmm_params['prior']
        emission = hmm_params['emission']
        transition = hmm_params['transition']
        labels = hmm_params['labels']

        nobs = len(Y)
        nlabels = len(labels)

        if Y.ndim == 1:
            Y = np.where(Y.reshape(-1, 1) == labels)[1]
        else:
            Y = Y

        probs = np.zeros((nobs, nlabels))
        probs[0, :] = log(prior) + log(emission[:, Y[0] if Y.ndim == 1 else np.argmax(Y[0])])
        for j in range(1, nobs):
            for i in range(nlabels):
                probs[j, i] = np.max(
                    log(emission[i, Y[j] if Y.ndim == 1 else np.argmax(Y[j])]) +
                    log(transition[:, i]) +
                    probs[j - 1, :])
        viterbi_path = np.zeros_like(Y, dtype=int) if Y.ndim == 1 else np.zeros(len(Y), dtype=int)
        viterbi_path[-1] = np.argmax(probs[-1, :])
        for j in reversed(range(nobs - 1)):
            viterbi_path[j] = np.argmax(
                log(transition[:, viterbi_path[j + 1]]) +
                probs[j, :])

        viterbi_path = labels[viterbi_path]

        if return_probs:
            return viterbi_path, np.exp(probs)
        return viterbi_path, None

# Кодирование меток
le = LabelEncoder().fit(np.unique(y_train))
y_train_enc = le.transform(y_train)
y_test_enc = le.transform(y_test)

# Вычисление весов классов
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_enc), y=y_train_enc)
sample_weights = np.array([class_weights[cls] for cls in y_train_enc])

# Пайплайн XGBoost
pipeline = Pipeline([
    ('scaler', RobustScaler()),
    ('classifier', XGBClassifier(
        objective='multi:softprob',
        n_estimators=300,  # Увеличено для полной выборки
        learning_rate=0.05,
        max_depth=10,
        random_state=42
    ))
])

# Обучение на полной выборке
pipeline.fit(X_train, y_train_enc, classifier__sample_weight=sample_weights)
y_pred_enc = pipeline.predict(X_test)
y_pred = le.inverse_transform(y_pred_enc)
# Сглаживание вероятностей
epsilon = 1e-5
y_train_proba = pipeline.predict_proba(X_train) * (1 - len(np.unique(y_train_enc)) * epsilon) + epsilon
y_pred_proba = pipeline.predict_proba(X_test) * (1 - len(np.unique(y_train_enc)) * epsilon) + epsilon

# HMM для сглаживания предсказаний
hmm_model = HMM(n_iter=100, random_state=42)
hmm_model.fit(y_train_proba, y_train_enc)

# Корректировка предсказаний
y_pred_hmm = hmm_model.predict(y_pred_proba)
y_pred_hmm = le.inverse_transform(y_pred_hmm)

# Оценка
print("XGBoost Classification Report:")
print(classification_report(y_test, y_pred, zero_division=0))
print("XGBoost + HMM Classification Report:")
print(classification_report(y_test, y_pred_hmm, zero_division=0))

# Сохранение метрик
metrics = {
    'XGBoost': {
        'accuracy': accuracy_score(y_test, y_pred),
        'f1_macro': f1_score(y_test, y_pred, average='macro')
    },
    'XGBoost_HMM': {
        'accuracy': accuracy_score(y_test, y_pred_hmm),
        'f1_macro': f1_score(y_test, y_pred_hmm, average='macro')
    }
}

# Вывод матриц HMM для отладки
print(hmm_model)

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder, RobustScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, accuracy_score, f1_score
from xgboost import XGBClassifier
from sklearn.utils.class_weight import compute_class_weight

def ordered_unique(x):
    """ Return unique elements, maintaining order of appearance """
    return x[np.sort(np.unique(x, return_index=True)[1])]

class HMM:
    def __init__(self, startprob=None, emissionprob=None, transmat=None, n_iter=100, random_state=None):
        self.startprob = startprob
        self.emissionprob = emissionprob
        self.transmat = transmat
        self.n_iter = n_iter
        self.random_state = random_state
        self.labels = None

    def __str__(self):
        return (
            "HMM Model:\n"
            f"{ {'prior': self.startprob, 'emission': self.emissionprob, 'transition': self.transmat} }"
        )

    def fit(self, Y_pred, Y_true, groups=None):
        self.labels = np.unique(Y_true)
        self.startprob = self.compute_prior(Y_true, self.labels, uniform=False)
        self.emissionprob = self.compute_emission(Y_pred, Y_true, self.labels)
        self.transmat = self.compute_transition(Y_true, self.labels, groups)

    def predict(self, Y, groups=None):
        params = {
            'prior': self.startprob,
            'emission': self.emissionprob,
            'transition': self.transmat,
            'labels': self.labels,
        }
        if groups is None:
            Y_vit, _ = self._viterbi(np.argmax(Y, axis=1), params)
        else:
            Y_vit = np.concatenate([
                self._viterbi(np.argmax(Y[groups == g], axis=1), params)[0]
                for g in ordered_unique(groups)
            ])
        return Y_vit

    def predict_proba(self, Y, groups=None):
        params = {
            'prior': self.startprob,
            'emission': self.emissionprob,
            'transition': self.transmat,
            'labels': self.labels,
        }
        if groups is None:
            _, probs = self._viterbi(np.argmax(Y, axis=1), params, True)
        else:
            probs = np.concatenate([
                self._viterbi(np.argmax(Y[groups == g], axis=1), params, True)[1]
                for g in ordered_unique(groups)
            ])
        return probs

    def optimise(self, **kwargs):
        return

    @staticmethod
    def compute_transition(Y, labels=None, groups=None):
        if labels is None:
            labels = np.unique(Y)
        def _compute_transition(Y):
            transition = np.vstack([
                np.sum(Y[1:][(Y == label)[:-1]].reshape(-1, 1) == labels, axis=0)
                for label in labels
            ])
            return transition
        if groups is None:
            transition = _compute_transition(Y)
        else:
            transition = sum((
                _compute_transition(Y[groups == g])
                for g in ordered_unique(groups)
            ))
        transition = transition + 1  # Laplace smoothing
        transition = transition / np.sum(transition, axis=1).reshape(-1, 1)
        return transition

    @staticmethod
    def compute_emission(Y_score, Y_true, labels=None):
        if labels is None:
            labels = np.unique(Y_true)
        Y_pred = np.argmax(Y_score, axis=1)  # Бинарные предсказания
        emission = np.zeros((len(labels), len(labels)))
        for i, true_label in enumerate(labels):
            mask = (Y_true == true_label)
            if np.sum(mask) > 0:
                emission[i] = np.bincount(Y_pred[mask], minlength=len(labels)) / np.sum(mask)
        return emission

    @staticmethod
    def compute_prior(Y_true, labels=None, uniform=True):
        if labels is None:
            labels = np.unique(Y_true)
        if uniform:
            prior = np.ones(len(labels)) / len(labels)
        else:
            prior = np.mean(Y_true.reshape(-1, 1) == labels, axis=0)
        return prior

    def _viterbi(self, Y, hmm_params, return_probs=False):
        if len(Y) == 0:
            return np.empty_like(Y), np.array([])
        def log(x):
            SMALL_NUMBER = 1e-16
            return np.log(x + SMALL_NUMBER)
        prior = hmm_params['prior']
        emission = hmm_params['emission']
        transition = hmm_params['transition']
        labels = hmm_params['labels']
        nobs = len(Y)
        nlabels = len(labels)
        probs = np.zeros((nobs, nlabels))
        probs[0, :] = log(prior) + log(emission[:, Y[0]])
        for j in range(1, nobs):
            for i in range(nlabels):
                probs[j, i] = np.max(
                    log(emission[i, Y[j]]) +
                    log(transition[:, i]) +
                    probs[j - 1, :])
        viterbi_path = np.zeros(nobs, dtype=int)
        viterbi_path[-1] = np.argmax(probs[-1, :])
        for j in reversed(range(nobs - 1)):
            viterbi_path[j] = np.argmax(
                log(transition[:, viterbi_path[j + 1]]) +
                probs[j, :])
        viterbi_path = labels[viterbi_path]
        if return_probs:
            return viterbi_path, np.exp(probs)
        return viterbi_path, None

# Используем полный набор данных
X_train_sub = X_train.copy()
y_train_sub = y_train.copy()

# Проверка наличия всех классов (опционально, так как используем полный набор)
all_classes = [1, 2, 3, 4, 5, 6, 7, 9, 10, 11, 12, 13, 16, 17, 18, 19, 20, 24]
missing_classes = set(all_classes) - set(np.unique(y_train_sub))
if missing_classes:
    print(f"Warning: Missing classes in training subset: {missing_classes}")
    for cls in missing_classes:
        cls_indices = np.where(y_train == cls)[0]
        if len(cls_indices) > 0:
            first_idx = cls_indices[0]
            X_train_sub = pd.concat([X_train_sub, X_train.iloc[[first_idx]]], ignore_index=True)
            y_train_sub = pd.concat([y_train_sub, y_train.iloc[[first_idx]]], ignore_index=True)

# Кодирование меток
train_classes = np.unique(y_train_sub)
le = LabelEncoder().fit(train_classes)
y_train_enc = le.transform(y_train_sub)
y_test_enc = le.transform(y_test)

# Вычисление весов классов
class_weights = compute_class_weight('balanced', classes=np.unique(y_train_enc), y=y_train_enc)
sample_weights = np.array([class_weights[cls] for cls in y_train_enc])

# Пайплайн XGBoost
pipeline = Pipeline([
    ('scaler', RobustScaler()),
    ('classifier', XGBClassifier(
        objective='multi:softprob',
        n_estimators=300,
        learning_rate=0.05,
        max_depth=10,
        random_state=42
    ))
])

# Обучение с весами классов
pipeline.fit(X_train_sub, y_train_enc, classifier__sample_weight=sample_weights)
y_pred_enc = pipeline.predict(X_test)
y_pred = le.inverse_transform(y_pred_enc)

# Сглаживание вероятностей
epsilon = 1e-5
y_train_proba = pipeline.predict_proba(X_train_sub) * (1 - len(np.unique(y_train_enc)) * epsilon) + epsilon
y_pred_proba = pipeline.predict_proba(X_test) * (1 - len(np.unique(y_train_enc)) * epsilon) + epsilon

# HMM для сглаживания предсказаний
hmm_model = HMM(n_iter=100, random_state=42)
hmm_model.fit(y_train_proba, y_train_enc)

# Корректировка предсказаний с учётом субъектов
groups_test = test_df['subject_id'].values  # Предполагаем, что есть subject_id
y_pred_hmm = hmm_model.predict(y_pred_proba, groups=groups_test)
y_pred_hmm = le.inverse_transform(y_pred_hmm)

# Оценка
print("XGBoost Classification Report:")
print(classification_report(y_test, y_pred, zero_division=0))
print("XGBoost + HMM Classification Report:")
print(classification_report(y_test, y_pred_hmm, zero_division=0))

# Сохранение метрик
metrics = {
    'XGBoost': {
        'accuracy': accuracy_score(y_test, y_pred),
        'f1_macro': f1_score(y_test, y_pred, average='macro')
    },
    'XGBoost_HMM': {
        'accuracy': accuracy_score(y_test, y_pred_hmm),
        'f1_macro': f1_score(y_test, y_pred_hmm, average='macro')
    }
}

# Вывод матриц HMM для отладки
print(hmm_model)