In [1]:
import pandas as pd
import numpy as np
import pymongo
from hmmlearn.hmm import CategoricalHMM

from sklearn.model_selection import train_test_split


In [2]:
import warnings

# Install a process-wide "ignore" filter with no category/message restriction,
# so every warning raised after this cell runs is suppressed.
# NOTE(review): this also hides deprecation and numerical warnings from the
# libraries used below; consider narrowing the filter to specific categories.
warnings.filterwarnings(action="ignore")

In [3]:
# Base directory for data files.
# NOTE(review): machine-specific absolute Windows path, and not referenced by
# any code visible in this notebook section -- confirm it is still needed and
# prefer a configurable/relative location.
ABSOLUTE_PATH = "C:\\Users\\rudnf\\vscode\\Graduation\\final\\data\\"
# Season names understood by season_match().
SEASONS = ["spring", "summer", "fall", "winter"]
# Cluster counts (K) to evaluate; each K is also used as the HMM's number of hidden states.
KS = [25, 50, 75, 100]
# MongoDB connection string used by get_trajectory() to read the trajectory collections.
MONGO_URI = "mongodb://localhost:27017/"

In [4]:
def season_match(season):
    """Return the zero-padded month strings that belong to ``season``.

    Args:
        season: One of "spring", "summer", "fall" or "winter".

    Returns:
        A new list of three two-character month strings, or an empty list
        when ``season`` is not one of the recognised names.
    """
    season_table = (
        ("spring", ("03", "04", "05")),
        ("summer", ("06", "07", "08")),
        ("fall", ("09", "10", "11")),
        ("winter", ("12", "01", "02")),
    )

    months = []
    for name, month_codes in season_table:
        if season == name:
            # Copy into a fresh list so callers can mutate the result freely.
            months = list(month_codes)
    return months

In [5]:
def label_sequence(path, time_period):
    """Encode each (POI, time period) pair of a trajectory as one integer label.

    The label is ``POI_id * 4 + slot`` where ``POI_id`` is the number left after
    stripping the "POI" prefix and ``slot`` is 0 for 'dawn', 1 for 'morning',
    2 for 'afternoon' and 3 for 'night'. Any other time period value falls back
    to slot 0, i.e. it is encoded the same way as 'dawn'.

    Args:
        path: Sequence of POI names such as "POI12".
        time_period: Sequence of time period names, indexed in parallel with
            ``path`` (it must be at least as long as ``path``).

    Returns:
        A list of integer labels, one per element of ``path``.
    """
    slot_names = ('dawn', 'morning', 'afternoon', 'night')

    labels = []
    for position, poi_name in enumerate(path):
        poi_id = int(poi_name.replace('POI', ''))
        period = time_period[position]
        # The tuple position doubles as the slot number; unknown names map to 0.
        slot = slot_names.index(period) if period in slot_names else 0
        labels.append(poi_id * 4 + slot)
    return labels

In [6]:
import ast


def df_str_to_list(df):
    """Parse the stringified list columns of a trajectory frame into Python objects.

    The columns 'trajectory_id', 'POI_sequence' and 'time_period_sequence' are
    converted with ``ast.literal_eval``. Values that are not strings (for
    example lists that were already parsed) are left untouched, so calling the
    function twice on the same frame is harmless.

    The frame is only modified once every column has been parsed successfully,
    so a failure never leaves it half-converted.

    Args:
        df: DataFrame containing the three columns listed above.

    Returns:
        The same DataFrame object, with the three columns replaced in place.

    Raises:
        KeyError: If one of the expected columns is missing.
        ValueError: If a string value is not a valid Python literal. The
            message names the offending column and the original parsing error
            is chained as the cause.
    """
    columns = ['trajectory_id', 'POI_sequence', 'time_period_sequence']

    def _parse(value):
        # literal_eval rejects non-string input, so pass already-parsed values through.
        return ast.literal_eval(value) if isinstance(value, str) else value

    parsed = {}
    for column in columns:
        try:
            parsed[column] = df[column].apply(_parse)
        except (ValueError, SyntaxError) as exc:
            # Fail loudly instead of printing and returning None, which only
            # postponed the crash to the caller with a less useful message.
            raise ValueError(
                f"Invalid input string format in column {column!r}."
            ) from exc

    for column, values in parsed.items():
        df[column] = values
    return df
        

In [7]:
#****************** initial probability **********************#
def calculate_initial_probability(X, n_components, smoothing_factor=1.0):
    """Estimate the HMM start distribution from trajectory start points.

    Args:
        X: 2-D array whose column 1 holds the start POI name of each
            trajectory (strings such as "POI7").
        n_components: Number of hidden states (POI clusters).
        smoothing_factor: Additive (Laplace) smoothing constant applied to
            every state count.

    Returns:
        1-D array of length ``n_components`` that sums to 1. When there are no
        observations and no smoothing, a uniform distribution is returned.
    """
    start_count = np.zeros(n_components)

    # Count how often each POI is the first point of a trajectory.
    for value in X[:, 1]:
        start_count[int(value.replace("POI", ""))] += 1

    # Additive smoothing: each of the n_components counts receives
    # smoothing_factor, so the normaliser must grow by smoothing_factor * n_components
    # (not by n_components alone) for the result to remain a distribution.
    denominator = np.sum(start_count) + smoothing_factor * n_components
    if denominator == 0:
        return np.full(n_components, 1.0 / max(n_components, 1))
    return (start_count + smoothing_factor) / denominator


In [8]:
#****************** transition probability **********************#
def calculate_transition_probability(X, n_components, smoothing_factor=1.0):
    """Estimate the HMM transition matrix from observed POI sequences.

    Args:
        X: 2-D array whose column 3 holds the POI sequence of each trajectory
            (a sequence of strings such as "POI7").
        n_components: Number of hidden states (POI clusters).
        smoothing_factor: Additive (Laplace) smoothing constant applied to
            every transition count.

    Returns:
        Array of shape ``(n_components, n_components)`` in which every row sums
        to 1. Rows with no observed transitions and no smoothing are uniform.
    """
    transition_count = np.zeros((n_components, n_components))

    # Count consecutive POI pairs; X[:, 3] is the 'POI_sequence' column.
    for trajectory in X[:, 3]:
        poi_ids = [int(poi.replace('POI', '')) for poi in trajectory]
        for cur_POI_num, next_POI_num in zip(poi_ids, poi_ids[1:]):
            transition_count[cur_POI_num, next_POI_num] += 1

    # Additive smoothing: each of the n_components entries in a row receives
    # smoothing_factor, so the row normaliser grows by smoothing_factor * n_components.
    row_total = transition_count.sum(axis=1, keepdims=True)
    denominator = row_total + smoothing_factor * n_components

    # Start from a uniform matrix so rows with a zero normaliser stay valid.
    transition_proba = np.full_like(transition_count, 1.0 / max(n_components, 1))
    np.divide(
        transition_count + smoothing_factor,
        denominator,
        out=transition_proba,
        where=denominator != 0,
    )
    return transition_proba


In [9]:
#****************** emission probability **********************#
# Probability of observing each label given each POI (hidden state).
def calculate_emission_probability(X, n_components, n_features, smoothing_factor=1.0):
    """Estimate the HMM emission matrix from POI / label sequence pairs.

    Args:
        X: 2-D array whose column 3 holds the POI sequence (strings such as
            "POI7") and column 5 the integer label sequence of each trajectory.
        n_components: Number of hidden states (POI clusters).
        n_features: Number of distinct observation labels.
        smoothing_factor: Additive (Laplace) smoothing constant applied to
            every emission count.

    Returns:
        Array of shape ``(n_components, n_features)`` in which every row sums
        to 1. Rows with no observations and no smoothing are uniform.
    """
    emission_count = np.zeros(shape=(n_components, n_features))

    # X[:, [3, 5]] : ['POI_sequence', 'l_sequence']
    for p, l_seq in X[:, [3, 5]]:
        for cur_POI_num, cur_l_seq in zip(p, l_seq):
            cur_POI_num = int(cur_POI_num.replace('POI', ''))
            emission_count[cur_POI_num][cur_l_seq] += 1

    # Additive smoothing: each of the n_features entries in a row receives
    # smoothing_factor, so the row normaliser grows by smoothing_factor * n_features.
    row_total = emission_count.sum(axis=1, keepdims=True)
    denominator = row_total + smoothing_factor * n_features

    # Start from a uniform matrix so rows with a zero normaliser stay valid.
    emission_proba = np.full_like(emission_count, 1.0 / max(n_features, 1))
    np.divide(
        emission_count + smoothing_factor,
        denominator,
        out=emission_proba,
        where=denominator != 0,
    )
    return emission_proba


In [10]:
def _get_collection_data(mongo_uri, db_name, collection_name):
    """Fetch every document of a MongoDB collection as a list of dicts.

    Args:
        mongo_uri: MongoDB connection string.
        db_name: Name of the database to read from.
        collection_name: Name of the collection to read.

    Returns:
        A list with all documents of the collection, with the "_id" field
        excluded by the query projection.
    """
    client = pymongo.MongoClient(mongo_uri)
    try:
        collection = client[db_name][collection_name]
        # Materialise the cursor before the connection is closed.
        return list(collection.find({}, {"_id": 0}))
    finally:
        # Release the connection even when the query raises.
        client.close()


In [11]:
def get_trajectory(k):
    """Load the trajectory collection for cluster count ``k`` as a DataFrame.

    Reads the collection ``trajectory_total_cluster_<k>`` from the "total"
    database at ``MONGO_URI``.

    Args:
        k: Cluster count used in the collection name.

    Returns:
        A DataFrame built from the documents of that collection.
    """
    records = _get_collection_data(
        MONGO_URI,
        "total",
        f"trajectory_total_cluster_{k}",
    )
    return pd.DataFrame(records)
    

# Top 3 

In [12]:
def get_next_POI_top3(hmm_model, user_trajectory):
    """Recommend up to three next POIs for an observed label sequence.

    Every possible observation label (``0 .. hmm_model.n_features - 1``) is
    appended in turn to the observed sequence and the extended sequence is
    decoded with ``hmm_model.decode``. Candidates are ranked by the score
    returned by ``decode``, highest first, and the last hidden state of each
    decoded path is taken as the recommended POI.

    Args:
        hmm_model: Object exposing ``n_features`` and ``decode(X)``, where
            ``decode`` returns a ``(score, state_sequence)`` pair.
        user_trajectory: Observed label sequence (array-like of ints).

    Returns:
        A list of at most three distinct hidden states, best candidate first.
    """
    observed = np.array(user_trajectory)

    # Decode the observed sequence once per candidate next label.
    scored_paths = []
    for candidate in range(hmm_model.n_features):
        extended = np.append(observed, candidate)
        scored_paths.append(hmm_model.decode(extended.reshape(-1, 1)))

    # Stable ascending sort walked backwards: highest score first, and among
    # equal scores the later candidate comes first.
    ranked = sorted(scored_paths, key=lambda result: result[0])

    top_pois = []
    for _score, state_path in reversed(ranked):
        final_state = state_path[-1]
        if final_state not in top_pois:
            top_pois.append(final_state)

        if len(top_pois) == 3:
            break
    return top_pois


In [13]:
def top_n_accuracy(correct, predict, n):
    """Compute the fraction of cases whose true value is in the top ``n`` predictions.

    Args:
        correct: Sequence of true values.
        predict: Sequence of ranked prediction lists, indexed in parallel with
            ``correct``.
        n: Number of leading predictions to consider for each case.

    Returns:
        Hits divided by ``len(correct)``.
    """
    hits = 0
    for position, truth in enumerate(correct):
        if truth in predict[position][:n]:
            hits += 1
    return hits / len(correct)

In [15]:
# Evaluate the top-3 next-POI accuracy of a count-initialised HMM for every
# cluster count in KS and collect the scores in total_result_df.

# Number of time-of-day slots per POI. label_sequence encodes an observation
# as POI_id * 4 + slot, so dividing a label by this value recovers the POI id.
TIME_PERIOD = 4
# Fixed seed so the train/test split, and therefore the reported accuracy,
# is the same on every run.
RANDOM_STATE = 42

total_result_df = pd.DataFrame(columns=['K', 'Top N accuracy'])

for idx, k in enumerate(KS):
    print(f"K : {k}")

    trajectory = get_trajectory(k)

    trajectory = df_str_to_list(trajectory)
    trajectory['l_sequence'] = [
        np.array(label_sequence(POI_sequence, time_period_sequence))
        for POI_sequence, time_period_sequence
        in trajectory[['POI_sequence', 'time_period_sequence']].values
    ]

    # Column positions matter: the probability helpers index X by position
    # (1 = start_point, 3 = POI_sequence, 5 = l_sequence).
    select_columns = ['trajectory_id', 'start_point', 'end_point', 'POI_sequence', 'time_period_sequence', 'l_sequence']
    X = np.array(trajectory[select_columns])

    X_train, X_test = train_test_split(X, test_size=0.3, random_state=RANDOM_STATE)

    # Number of clusters == number of hidden states.
    n_components = k
    n_features = n_components * TIME_PERIOD

    # Build the model and set its parameters directly from training counts
    # instead of fitting it.
    hmm_model = CategoricalHMM(n_components=n_components, n_features=n_features)

    hmm_model.startprob_ = calculate_initial_probability(X_train, n_components)
    hmm_model.transmat_ = calculate_transition_probability(X_train, n_components)
    hmm_model.emissionprob_ = calculate_emission_probability(X_train, n_components, n_features)

    predict_next_point = []
    current_path_last_point = []

    for seq in X_test:
        # Hold out the last observation of each test trajectory as the target.
        removed_last_point_seq = seq[5][:-1]

        # Convert the held-out label back to its POI id.
        current_path_last_point.append(seq[5][-1] // TIME_PERIOD)

        next_point = get_next_POI_top3(hmm_model, removed_last_point_seq)

        predict_next_point.append(next_point)

    n_value = 3
    top_N = top_n_accuracy(current_path_last_point, predict_next_point, n_value)

    print("Top-N Accuracy:", top_N)

    # Store the row immediately so finished K values survive an interrupted run.
    total_result_df.loc[idx] = [k, top_N]



K : 25


KeyboardInterrupt: 

In [None]:
total_result_df

Unnamed: 0,K,Top N accuracy,Precision,Recall
0,25.0,0.765996,0.255332,0.765996
1,50.0,0.740501,0.246834,0.740501
2,75.0,0.757363,0.252454,0.757363
3,100.0,0.724608,0.241536,0.724608


In [None]:
# total_result_df.to_csv("total_top3.csv", index=False)