In [None]:
# pip install -r .\requirments.txt

In [1]:
import numpy as np
import pandas as pd

In [2]:
def get_data_from_file(filename: str):
    """Load per-user integer sequences from a ``key:v1;v2;...`` text file.

    The file is read as a header-less, ``:``-separated table. The first
    column is used as the row key and the second column is split on ``;``
    and parsed as integers.

    Returns:
        A DataFrame with one row per key (``orient='index'``); the columns
        are the positions within each sequence.
    """
    raw_table = pd.read_csv(filename, sep=':', header=None)
    # Later occurrences of the same key overwrite earlier ones, as with any dict.
    sequences_by_user = {
        user: np.array(encoded.split(';'), dtype=int)
        for user, encoded in zip(raw_table[0], raw_table[1])
    }
    return pd.DataFrame.from_dict(sequences_by_user, orient='index')

# Load the three datasets in a fixed order: the reference sequences, then the
# fake sequences, then the true sequences.
orig_data, fake_data, true_data = (
    get_data_from_file(source_file)
    for source_file in ("data.txt", "data_fake.txt", "data_true.txt")
)

In [4]:
def get_transition_matrix(sequence: np.ndarray) -> pd.DataFrame:
    """Estimate a first-order transition matrix from a sequence of states.

    Missing values (NaN) are removed before counting. Entry ``[a, b]`` of the
    result is the number of observed ``a -> b`` steps divided by the total
    number of steps that leave ``a``, so every row that has at least one
    outgoing step sums to 1. A state without any outgoing step (it occurs
    only as the final element) gets an all-zero row rather than NaN/inf.

    Args:
        sequence: 1-D array of integer-valued states, possibly containing NaN.

    Returns:
        Square DataFrame of float probabilities whose index and columns are
        the sorted unique states cast to ``int``.
    """
    observed = np.asarray(sequence)
    # Drop NaNs, then work with integer state labels.
    observed = observed[~pd.isna(observed)].astype(int)
    states = np.unique(observed)

    counts = np.zeros((states.size, states.size), dtype=float)
    if observed.size > 1:
        # `states` is sorted and contains every observed value, so
        # searchsorted returns the exact row/column position of each state.
        positions = np.searchsorted(states, observed)
        # ufunc.at accumulates correctly when the same (row, col) pair repeats.
        np.add.at(counts, (positions[:-1], positions[1:]), 1)

    # Normalise every row by its OWN total (steps leaving that state).
    # Dividing by column totals would mix incoming and outgoing counts.
    outgoing = counts.sum(axis=1, keepdims=True)
    probabilities = np.divide(
        counts, outgoing, out=np.zeros_like(counts), where=outgoing > 0)
    return pd.DataFrame(probabilities, index=states, columns=states)


def get_threshold(
        transition_matrix: pd.DataFrame,
        sequence: np.ndarray=None) -> float:
    """Return the smallest non-zero transition probability in the matrix.

    A zero entry means "no probability for this transition" and is excluded,
    so the threshold is the minimum over the remaining entries. NaN and
    infinite entries are ignored as well; otherwise a single NaN would make
    the minimum NaN and every later ``probability < threshold`` comparison
    would be False.

    Args:
        transition_matrix: Matrix of transition probabilities.
        sequence: Unused. Kept so callers that pass it keep working.

    Returns:
        The minimum finite, non-zero value of the matrix.

    Raises:
        ValueError: If the matrix has no finite non-zero entry (for example
            it is empty or all zeros), so no threshold can be derived.
    """
    values = transition_matrix.to_numpy(dtype=float).ravel()
    candidates = values[np.isfinite(values) & (values != 0)]
    if candidates.size == 0:
        raise ValueError(
            "transition matrix has no finite non-zero probability "
            "to derive a threshold from")
    return float(candidates.min())


def anomaly_detection(
        transition_matrix: pd.DataFrame,
        verified_data: np.ndarray,
        treshold: float) -> bool:
    """Decide whether a sequence is anomalous with respect to a transition matrix.

    The sequence is reported as anomalous when either
      * it contains a state that is not among the matrix columns, or
      * at least one consecutive pair of states has a transition probability
        strictly below ``treshold``.

    Missing values (NaN) in ``verified_data`` are removed first, so they are
    not mistaken for an unknown state.

    Args:
        transition_matrix: Transition probabilities, looked up with
            ``.loc[current_state, next_state]``.
        verified_data: 1-D sequence of states to check.
        treshold: Minimum acceptable transition probability. The misspelled
            name is kept because it is part of the call interface.

    Returns:
        True if the sequence is anomalous, False otherwise.
    """
    observed = np.asarray(verified_data)
    observed = observed[~pd.isna(observed)]

    # State check: every observed state must be present in the matrix.
    known_states = np.asarray(transition_matrix.columns)
    if not np.isin(observed, known_states).all():
        return True

    # Transition check: walk over consecutive pairs and stop at the first
    # transition whose probability falls below the threshold.
    for current_state, subsequent_state in zip(observed[:-1], observed[1:]):
        if transition_matrix.loc[current_state, subsequent_state] < treshold:
            return True
    return False

In [5]:
# alpha_error counts users whose true data was flagged as an anomaly;
# betta_error counts users whose fake data was NOT flagged.
alpha_error = betta_error = 0

for user in orig_data.index:
    # Build the per-user model from that user's row of orig_data.
    transition_matrix = get_transition_matrix(orig_data.loc[user].values)
    threshold = get_threshold(transition_matrix)

    # Run the detector on the user's true row first, then on the fake row.
    detection_result_for_true_data = anomaly_detection(
        transition_matrix, true_data.loc[user].values, threshold)
    detection_result_for_fake_data = anomaly_detection(
        transition_matrix, fake_data.loc[user].values, threshold)

    alpha_error += 1 if detection_result_for_true_data else 0
    betta_error += 0 if detection_result_for_fake_data else 1

    # NOTE(review): the first letter of the "anomaly" label looks like a
    # Cyrillic character rather than a Latin "A" - confirm before matching
    # on this text. The label strings are kept exactly as they were.
    anomaly_or_not = lambda x: "Аnomaly" if x else "Net anomaly"
    known_states = ", ".join(np.array(transition_matrix.columns, dtype=str))

    print(user)
    print("\tOrig data: ", orig_data.loc[user].values, "\t States: ", known_states)
    print("\tTrue data: ", true_data.loc[user].values, f"\t Detection result: {anomaly_or_not(detection_result_for_true_data)}")
    print("\tFake data: ", fake_data.loc[user].values, f"\t Detection result: {anomaly_or_not(detection_result_for_fake_data)}")


user1
	Orig data:  [41. 41. 41. ... nan nan nan] 	 States:  41, 65
	True data:  [41 41 41 41 41 41 41 41 41 41] 	 Detection result: Net anomaly
	Fake data:  [34 34 34 34 34 33 33 34 34 34] 	 Detection result: Аnomaly
user2
	Orig data:  [55. 55. 55. ... nan nan nan] 	 States:  12, 28, 42, 54, 55, 56, 65
	True data:  [28 28 28 28 28 28 28 28 28 28] 	 Detection result: Net anomaly
	Fake data:  [45 45 45 45 45 45 43 45 45 45] 	 Detection result: Аnomaly
user3
	Orig data:  [41. 41. 41. ... nan nan nan] 	 States:  41
	True data:  [41 41 41 41 41 41 41 41 41 41] 	 Detection result: Net anomaly
	Fake data:  [10 10 10 10 10 10 10 10 10 10] 	 Detection result: Аnomaly
user4
	Orig data:  [15. 15. 15. ... nan nan nan] 	 States:  10, 15, 42, 62
	True data:  [15 15 15 15 15 15 15 15 15 15] 	 Detection result: Net anomaly
	Fake data:  [39 39 39 39 39 39 39 39 39 39] 	 Detection result: Аnomaly
user5
	Orig data:  [35. 35. 35. ... nan nan nan] 	 States:  35, 65
	True data:  [35 35 35 35 35 35 35 35 35 

In [6]:
# Fraction of true_data rows that were flagged as anomalous.
print(alpha_error / true_data.shape[0])
# Fraction of fake_data rows that were not flagged as anomalous.
print(betta_error / fake_data.shape[0])

0.1
0.175
