In [1]:
import numpy as np
import pandas as pd
import random
from scipy.io import arff
from itertools import combinations
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.model_selection import cross_validate,StratifiedKFold, RandomizedSearchCV, RepeatedStratifiedKFold

In [2]:
Person = dict[str,str|pd.DataFrame]

In [3]:
def compute_eer_from_paper(labels, prediction):
    from sklearn.metrics import roc_curve
    fprs, tprs, _ = roc_curve(labels, prediction,pos_label=True)
    eer = fprs[np.nanargmin(np.absolute((1 - tprs) - fprs))]
    return eer

In [4]:
def compute_eer_changjiang(labels, prediction):
    from scipy.optimize import brentq
    from scipy.interpolate import interp1d
    from sklearn.metrics import roc_curve
    fpr, tpr, _ = roc_curve(labels, prediction,pos_label=True)
    eer = brentq(lambda x: 1. - x - interp1d(fpr, tpr)(x), 0., 1.)
    return eer

In [5]:
def calculate_eer(y_true, y_score):
    from sklearn.metrics import roc_curve
    fpr, tpr, _ = roc_curve(y_true, y_score, pos_label=True)
    abs_diff = [abs(fp-tp) for fp, tp in zip(fpr, tpr)]
    min_idx = abs_diff.index(min(abs_diff))
    eer = (fpr[min_idx] + tpr[min_idx])/2
    return eer

In [6]:
def trim_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    df['ACTIVITY'].idxmin()

In [7]:
def processed_dataframe(raw: pd.DataFrame, cols_to_drop: list[str]) -> pd.DataFrame:
    result = raw.drop(columns=cols_to_drop)
    result.columns = [col.replace('"', "") for col in result]
    result['ACTIVITY'] = result['ACTIVITY'].str.decode('utf-8')
    result['class'] = result['class'].str.decode('utf-8')
    return result

In [8]:
def pick_impostors(guid: str, people: list[dict[str, pd.DataFrame]]) -> list[dict[str, pd.DataFrame]]:
    return random.choices([i for i in people if i['guid'] != guid], k=18)

def biometric_train_test_split(person: Person, impostors: list[Person], activity: str,sensors: list[str])->[pd.DataFrame,pd.DataFrame]:
    full_person: pd.DataFrame = None
    for sensor in sensors:
        full_person = person[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True) if full_person is None else full_person.merge(
                person[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True), left_index=True, right_index=True)
    full_person['class']=True

    full_impostors: list[pd.DataFrame]|pd.DataFrame = []
    for impostor in impostors:
        full_impostor: pd.DataFrame = None
        for sensor in sensors:
            full_impostor = impostor[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True) if full_impostor is None else full_impostor.merge(
                impostor[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True), left_index=True, right_index=True)
        full_impostor = full_impostor.dropna()
        if len(full_impostor)>=3:
            full_impostor = full_impostor.sample(n=3)
        full_impostor['class']=False
        full_impostors.append(full_impostor)
    full_person = full_person.sample(frac=1)
    full_impostors = pd.concat(full_impostors).sample(frac=1)
    return pd.concat([full_person[:len(full_person)//2],full_impostors[:len(full_impostors)//2]]),pd.concat([full_person[len(full_person)//2:],full_impostors[len(full_impostors)//2:]])

In [9]:
def person_classification_df_builder(people: list[dict[str, int | pd.DataFrame]], sensors: list[str], activity: str) -> pd.DataFrame:
    result_df: pd.DataFrame = None
    for person in people:
        df: pd.DataFrame = None
        for sensor in sensors:
            df = person[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True) if df is None else df.merge(
                person[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True), left_index=True, right_index=True)
            # df = pd.get_dummies(df)
        df['class']=person['guid']
        result_df = df if result_df is None else pd.concat([result_df, df])
    return result_df.dropna()

def activity_classification_df_builder(people: list[dict[str, int | pd.DataFrame]], sensors: list[str], activities: list[str]) -> pd.DataFrame:
    people_df: list[pd.DataFrame] = []
    for person in people:
        for activity in activities:
            curr_df = None
            for sensor in sensors:
                curr_df = person[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True) if curr_df is None else curr_df.merge(
                    person[sensor].query(f"`ACTIVITY`=='{activity}'").drop(columns=['class','ACTIVITY']).add_suffix(sensor).reset_index(drop=True), left_index=True, right_index=True)
            curr_df['ACTIVITY'] = activity
            people_df.append(curr_df)
    return pd.concat(people_df).dropna()

In [10]:
columns_to_be_dropped = []
for n in ['X', 'Y', 'Z']:
    columns_to_be_dropped.append(f'"{n}VAR"')
    for i in range(13):
        columns_to_be_dropped.append(f'"{n}MFCC{i}"')
for n in combinations(['X', 'Y', 'Z'], 2):
    columns_to_be_dropped.append(f'"{"".join(n)}COS"')
    columns_to_be_dropped.append(f'"{"".join(n)}COR"')

sensors = ["phone_accel", "watch_accel", "phone_gyro", "watch_gyro"]
activities = {'A': 'walking',
              'B': 'jogging',
              'C': 'stairs',
              'D': 'sitting',
              'E': 'standing',
              'F': 'typing',
              'G': 'teeth',
              'H': 'soup',
              'I': 'chips',
              'J': 'pasta',
              'K': 'drinking',
              'L': 'sandwich',
              'M': 'kicking',
              'O': 'catch',
              'P': 'dribbling',
              'Q': 'writing',
              'R': 'clapping',
              'S': 'folding'}

In [11]:
people: list[dict[str, pd.DataFrame]] = [{
    'guid': i,
    'phone_accel': processed_dataframe(pd.DataFrame(arff.loadarff(f"arff_files/phone/accel/data_{i}_accel_phone.arff")[0]), columns_to_be_dropped),
    'phone_gyro': processed_dataframe(pd.DataFrame(arff.loadarff(f"arff_files/phone/gyro/data_{i}_gyro_phone.arff")[0]), columns_to_be_dropped),
    'watch_accel': processed_dataframe(pd.DataFrame(arff.loadarff(f"arff_files/watch/accel/data_{i}_accel_watch.arff")[0]), columns_to_be_dropped),
    'watch_gyro': processed_dataframe(pd.DataFrame(arff.loadarff(f"arff_files/watch/gyro/data_{i}_gyro_watch.arff")[0]), columns_to_be_dropped)
} for i in range(1600, 1651) if i != 1614]  # person 1614 missing in files

In [12]:
every_combination = [[i]for i in sensors] + \
    [list(i) for i in list(combinations(sensors, 2))]+[sensors]

### Authentication

In [13]:
biometry_models = {activity: [] for activity in activities}
for person in people:
    impostors = pick_impostors(person['guid'], people)
    for activity in activities:
        train, test = biometric_train_test_split(
            person, impostors, activity, sensors)
        if len(train) == 0 or len(test)==0:
            continue
        train_target = train['class']
        # print(train_target.unique())
        train = train.drop(columns=['class'])
        test_target = test['class']
        test = test.drop(columns=['class'])
        biometry_classifier = RandomForestClassifier(
            10, max_features='sqrt')
        biometry_classifier.fit(train, train_target)
        y_true = test_target
        if len(y_true.unique())!=2:
            # print(person['guid'],combination,activity)
            continue
        eer = compute_eer_changjiang(test_target,biometry_classifier.predict(test))
        biometry_models[activity].append(eer)

In [14]:
for activity in biometry_models:
    biometry_models[activity] = list(filter(lambda x: x!=0,biometry_models[activity]))
    print(activities[activity], np.average(biometry_models[activity]*100))

walking 0.12244160623842794
jogging 0.13106548407771806
stairs 0.18339670716198295
sitting 0.1657020531780547
standing 0.23001185626514298
typing 0.15048178626732367
teeth 0.18161459820042322
soup 0.17670430912922583
chips 0.16118067718137524
pasta 0.20546018099912405
drinking 0.17555747405992694
sandwich 0.1766961822210309
kicking 0.22811693257414256
catch 0.16451797455898662
dribbling 0.1878989312509228
writing 0.1643320248292602
clapping 0.15199607373286309
folding 0.18330402713449842


### People classification

In [15]:
kf = StratifiedKFold(n_splits=10,shuffle=True,random_state=1)
scores: dict[str,float] = {}
for activity in activities:
    people_df = person_classification_df_builder(people,sensors,activity)
    people_target = people_df['class']
    people_df = people_df.drop(columns=['class'])
    people_classifier = RandomForestClassifier(10, max_features='sqrt')
    score = cross_validate(people_classifier,people_df,people_target,cv=kf,scoring='accuracy')
    scores[activities[activity]]= np.average(score['test_score'])
scores

{'walking': 0.9814621758887997,
 'jogging': 0.9446939321037157,
 'stairs': 0.9270588235294118,
 'sitting': 0.925921855921856,
 'standing': 0.8939479060265578,
 'typing': 0.9733360064153969,
 'teeth': 0.9532915360501567,
 'soup': 0.9528735632183908,
 'chips': 0.9546220633299285,
 'pasta': 0.9592202462380301,
 'drinking': 0.9368253968253969,
 'sandwich': 0.9620820271682341,
 'kicking': 0.9079775280898877,
 'catch': 0.9430232558139535,
 'dribbling': 0.947114402451481,
 'writing': 0.9418364681295716,
 'clapping': 0.9564794007490637,
 'folding': 0.933283395755306}

## Activity classification

In [16]:
kf = StratifiedKFold(n_splits=10,random_state=1, shuffle=True)
activities_df = activity_classification_df_builder(people, sensors,activities)
activities_target = activities_df['ACTIVITY']
activities_df = activities_df.drop(columns=['ACTIVITY'])

In [17]:
# classifier = RandomForestClassifier()
# optimize_rf_params = {'criterion':['gini','entropy','log_loss'], 'max_features':['sqrt','log2',None], 'n_estimators':(1,50)}
# rkf = RepeatedStratifiedKFold(n_splits=10,n_repeats=3,random_state=1)
# random_search = RandomizedSearchCV(estimator=classifier,param_distributions=optimize_rf_params,n_jobs=-1,scoring='accuracy',cv=rkf)
# random_search.fit(activities_df,activities_target)
# best_model = random_search.best_estimator_
# print(random_search.best_score_)
# print(best_model.get_params())

In [18]:
activity_rfc = RandomForestClassifier(50,max_features='log2')
score_rfc = cross_validate(activity_rfc, activities_df,activities_target,scoring='accuracy',cv=kf)
activity_accuracy = np.average(score_rfc['test_score'])
activity_accuracy

0.8950853889943076