In [214]:
from collections import defaultdict
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier, _tree, plot_tree

In [215]:
dataset = Path('WEDA-FALL/dataset/50Hz')
WINDOW_SIZE = 20

In [216]:
walking_dataset = dataset / 'D01'
walking_accel = (f for f in walking_dataset.glob('*_accel.csv') if 'vertical' not in f.name)
walking_gyro = walking_dataset.glob('*_gyro.csv')

In [217]:
jogging_dataset = dataset / 'D02'
jogging_accel = (f for f in jogging_dataset.glob('*_accel.csv') if 'vertical' not in f.name)
jogging_gyro = jogging_dataset.glob('*_gyro.csv')

In [218]:
def preprocess_df(df):
	df['acceleration'] = np.sqrt(
		df['accel_x_list'] ** 2 + df['accel_y_list'] ** 2 + df['accel_z_list'] ** 2
	)
	df['rotationrate'] = np.sqrt(
		df['gyro_x_list'] ** 2 + df['gyro_y_list'] ** 2 + df['gyro_z_list'] ** 2
	)

	df['acceleration'] = df['acceleration'].rolling(window=WINDOW_SIZE).mean()
	df['rotationrate'] = df['rotationrate'].rolling(window=WINDOW_SIZE).mean()

	pitch = np.atan2(
		-df['accel_x_list'], np.sqrt(df['accel_y_list'] ** 2 + df['accel_z_list'] ** 2)
	) * (180 / np.pi)
	gyro_integration = df['gyro_y_list'] * (1 / 50)

	# Calculate the recursive component using cumsum
	# This handles: 0.98 * (previous_angle + gyro_term)
	recursive_term = (0.98 * gyro_integration).cumsum()

	# Add the accelerometer correction term
	accel_term = 0.02 * pitch

	# Combine terms (note: first value needs special handling)
	df['angle_pitch'] = recursive_term + accel_term.cumsum()

	df = df.filter(
		items=['accel_time_list', 'gyro_time_list', 'acceleration', 'rotationrate', 'angle_pitch']
	)
	return df

In [219]:
df_list = []
for acc, gyro in zip(walking_accel, walking_gyro):
	df1 = pd.read_csv(acc)
	df2 = pd.read_csv(gyro)
	df = pd.concat([df1, df2], axis=1)
	df = preprocess_df(df).dropna()
	df['label'] = np.full(len(df), 'walking')
	df_list.append(df)
df_walking = pd.concat(df_list)

In [220]:
df_list = []
for acc, gyro in zip(jogging_accel, jogging_gyro):
	df1 = pd.read_csv(acc)
	df2 = pd.read_csv(gyro)
	df = pd.concat([df1, df2], axis=1)
	df = preprocess_df(df).dropna()
	df['label'] = np.full(len(df), 'jogging')
	df_list.append(df)
df_jogging = pd.concat(df_list)

In [221]:
df_fall = pd.read_csv('fall_data.csv')

df = pd.concat([df_walking, df_jogging, df_fall]).reset_index(drop=True)
# df = pd.concat([df_walking, df_jogging]).reset_index(drop=True)

In [222]:
X = df[['acceleration', 'rotationrate', 'angle_pitch']]
y = df['label']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
smote = SMOTE(random_state=42)
X_train, y_train = smote.fit_resample(X_train, y_train)

In [223]:
clf = DecisionTreeClassifier(random_state=42, max_depth=3)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))

              precision    recall  f1-score   support

        fall       0.58      0.88      0.70      9772
     jogging       0.80      0.69      0.74      7363
     walking       0.97      0.66      0.78     12224

    accuracy                           0.74     29359
   macro avg       0.79      0.74      0.74     29359
weighted avg       0.80      0.74      0.74     29359



In [224]:
c_features = defaultdict(list)


def tree_to_rules(clf, feature_names):
	tree_ = clf.tree_
	feature_name = [
		feature_names[i] if i != _tree.TREE_UNDEFINED else 'undefined!' for i in tree_.feature
	]

	# The tuple is min, max
	d = defaultdict(lambda: (float('-inf'), float('inf')))

	def recurse(node, d):
		if tree_.feature[node] != _tree.TREE_UNDEFINED:
			name = feature_name[node]
			threshold = tree_.threshold[node]
			threshold = float(threshold)

			d_copy = d.copy()

			# Left child
			# The new upper bound for the feature is the threshold
			d[name] = (d_copy[name][0], threshold)
			recurse(tree_.children_left[node], d)

			# Right child
			# The new lower bound for the feature is the threshold
			d[name] = (threshold, d_copy[name][1])
			recurse(tree_.children_right[node], d)
		else:
			# Leaf node
			value = tree_.value[node]
			class_id = value.argmax()
			c_features[class_id].append(d.copy())

	recurse(0, d)


# Extract rules
tree_to_rules(clf, X.columns)

In [225]:
import math


def format_rules(class_rules):
	rule_strings = []
	for rule in class_rules:
		conditions = []
		for feature, (min_val, max_val) in rule.items():
			# Check for lower bound condition
			if min_val > -math.inf:
				conditions.append(f'{feature} > {min_val:.2f}')

			# Check for upper bound condition
			if max_val < math.inf:
				conditions.append(f'{feature} <= {max_val:.2f}')

		# Join all conditions for a single rule with " and "
		if conditions:
			rule_strings.append(' and '.join(conditions))
	return rule_strings

In [226]:
for class_id, rules in c_features.items():
	print(f'Rules for class {class_id}:')
	formatted = format_rules(rules)
	for rule_string in formatted:
		print(f'  - {rule_string}')
	print()

Rules for class 0:
  - angle_pitch <= 69.68 and acceleration <= 11.55
  - angle_pitch > 69.68 and angle_pitch <= 356.68 and acceleration <= 11.55
  - angle_pitch > 176.68 and angle_pitch <= 356.68 and acceleration > 11.55
  - angle_pitch > 494.96 and acceleration > 13.58 and acceleration <= 16.44

Rules for class 1:
  - angle_pitch > 69.68 and angle_pitch <= 176.68 and acceleration > 11.55
  - angle_pitch > 494.96 and acceleration > 16.44

Rules for class 2:
  - angle_pitch > 356.68 and angle_pitch <= 494.96 and acceleration > 11.55 and acceleration <= 13.58
  - angle_pitch > 494.96 and acceleration > 11.55 and acceleration <= 13.58

