In [1]:
import os
import datetime
import pickle
import numpy as np
from numpy import ndarray
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.base import ClassifierMixin
import pickle
from tool.random_forest import MMRandomForest

# --- Run configuration ------------------------------------------------------
# Directory that receives the markdown report written by this notebook.
result_dir = "Results"

# Classifiers handled by this run. Other keys the training cell recognizes:
# "svm", "logistic", "randomforest", "decisiontree".
models = ["mmrandomforest"]

# The report file name embeds the timestamp of the moment this cell ran, so a
# fresh run writes to its own file instead of overwriting an earlier report.
report_time = datetime.datetime.now()
report_name = "report_{}.md".format(report_time.timestamp())
report_file = os.path.join(result_dir, report_name)

# --- Create the report file and write its title line ------------------------
os.makedirs(result_dir, exist_ok=True)
report_title = "# Report generated at " + report_time.strftime("%Y-%m-%d %H:%M:%S") + "\n"
with open(report_file, "w") as f:
  f.write(report_title)

ImportError: cannot import name 'accuracy_score' from 'sklearn.base' (/home/mcmerdith/anaconda3/envs/cisc484/lib/python3.12/site-packages/sklearn/base.py)

In [None]:
# Date window to scan, one day at a time. `end_date` is exclusive: the scan
# stops as soon as the cursor reaches it, so that day is never read.
start_date = "20201117"
end_date = "20210520"
start_date_datetime = datetime.datetime.strptime(start_date, "%Y%m%d")
end_date_datetime = datetime.datetime.strptime(end_date, "%Y%m%d")
proc_date = start_date_datetime
duration = 300  # upper bound on the number of days scanned

# A day is processed only when Data/ contains an entry named after it
# (YYYY-MM-DD); only the entry names are used here, not their contents.
data_check_list = os.listdir("Data/")
data_check_dic = {i: 1 for i in data_check_list}

# Output locations used by the training/evaluation cells. They are created up
# front so they exist even when no day in the window has data.
output_data_folder_path = "Label/All/"
output_model_folder_path = "Model/"
os.makedirs(output_data_folder_path, exist_ok=True)
os.makedirs(output_model_folder_path, exist_ok=True)

# Accumulators: one feature row and one label per tweet that has a label.
dataset_X = []
dataset_y = []

for day_offset in range(duration):
    proc_date = start_date_datetime + datetime.timedelta(days=day_offset)
    if proc_date >= end_date_datetime:
        break

    proc_date_str = proc_date.strftime("%Y-%m-%d")
    if proc_date_str not in data_check_dic:
        # No Data/ entry for this day: nothing to load.
        continue

    input_data_tmp_path = "Tmp/" + proc_date_str + "/"
    input_data_label_path = "Label/" + proc_date_str + "/"
    input_data_feature = input_data_tmp_path + "tweet_feature"
    input_data_label = input_data_label_path + "labeled_tweets.txt"

    # Map tweet id (first tab-separated field) -> integer label (last field).
    label_dic = {}
    with open(
        input_data_label, "r", encoding="utf-8", errors="ignore"
    ) as file_label_in:
        for label_line in file_label_in:
            label_line_split = label_line.strip().split("\t")
            label_tweet_id = label_line_split[0]
            if not label_tweet_id:
                # Blank line: there is no id/label to parse, so skip it
                # instead of failing on int("").
                continue
            label_dic[label_tweet_id] = int(label_line_split[-1])

    # Keep only feature rows whose tweet id has a label for this day.
    with open(
        input_data_feature, "r", encoding="utf-8", errors="ignore"
    ) as file_feature_in:
        for feature_line in file_feature_in:
            feature_line_split = feature_line.strip().split("\t")
            feature_tweet_id = feature_line_split[0]
            if feature_tweet_id in label_dic:
                # Fields 2..second-to-last are parsed as floats; the second and
                # the last field are skipped.
                # NOTE(review): the meaning of the skipped fields is not
                # visible here - confirm against the feature-file writer.
                dataset_X.append([float(i) for i in feature_line_split[2:-1]])
                dataset_y.append(label_dic[feature_tweet_id])

In [None]:
# Split data
import random


# Materialize the collected rows/labels as arrays.
dataset_X = np.array(dataset_X)
dataset_y = np.array(dataset_y)

if dataset_X.size == 0:
    # Fail early with a readable message instead of an opaque shape error
    # raised from inside scikit-learn.
    raise ValueError("No labeled samples were loaded; nothing to split or scale")

# Split BEFORE scaling so the test rows cannot influence the scaler.
# The fixed random_state keeps the train/test partition reproducible.
raw_train, raw_test, labels_train, labels_test = train_test_split(
    dataset_X, dataset_y, test_size=0.20, random_state=42
)

# Fit the min/max on the training rows only, then apply that same mapping to
# the test rows. Test values outside the training range may therefore fall
# outside [0, 1]; that is expected and avoids leaking test-set statistics.
scaler = MinMaxScaler()
data_train = scaler.fit_transform(raw_train)
data_test = scaler.transform(raw_test)

# `dataset_X` stays a scaled array after this cell, as it was before this
# change, but it is now scaled with the training-fitted mapping.
dataset_X = scaler.transform(dataset_X)

In [None]:
# Output locations. They are set here as well (the evaluation cell does the
# same) so this cell does not rely on the data-loading cell having defined
# them, and the directories are guaranteed to exist before anything is saved.
output_data_folder_path = "Label/All/"
output_model_folder_path = "Model/"
os.makedirs(output_data_folder_path, exist_ok=True)
os.makedirs(output_model_folder_path, exist_ok=True)

# Model key -> classifier class. Every class is instantiated with its default
# arguments.
# NOTE(review): no random seed is passed to any estimator, so stochastic
# models may produce different results from run to run - confirm whether that
# is intended.
model_factories = {
  "svm": SVC,
  "logistic": LogisticRegression,
  "randomforest": RandomForestClassifier,
  "decisiontree": DecisionTreeClassifier,
  "mmrandomforest": MMRandomForest,
}

for model_type in models:
  # Initialize the classifier
  if model_type not in model_factories:
    raise ValueError("Invalid model type")
  model_inst = model_factories[model_type]()
  model = model_inst

  # Fit the classifier on the training split
  model.fit(data_train, labels_train)

  # Score the fitted model on both splits
  train_accuracy = model.score(data_train, labels_train)
  test_accuracy = model.score(data_test, labels_test)

  # Append this model's section to the run report
  with open(report_file, "a") as f:
    f.writelines([
      f"## {model_type}\n\n",
      f"Training accuracy: {train_accuracy}\n",
      f"Test accuracy: {test_accuracy}\n",
      "\n"
    ])

  # Save the trained model; the context manager closes (and flushes) the file
  # even if pickling fails part-way.
  save_model_path = output_model_folder_path + model_type + "_model.p"
  with open(save_model_path, "wb") as model_file:
    pickle.dump(model, model_file)

  # Save the training split that the model was fitted on
  save_data_path = output_data_folder_path + model_type + "dataset.p"
  with open(save_data_path, "wb") as data_file:
    pickle.dump([data_train, labels_train], data_file)

In [None]:
# Locations the training cell wrote its pickles to.
output_data_folder_path = "Label/All/"
output_model_folder_path = "Model/"

for model_type in models:
  # Reload the persisted model.
  # NOTE: pickle.load can execute arbitrary code while unpickling; only load
  # files that this notebook itself produced.
  load_model_path = output_model_folder_path + model_type + "_model.p"
  with open(load_model_path, "rb") as model_file:
    loaded_model: ClassifierMixin = pickle.load(model_file)

  # Reload the training split saved next to the model. This rebinds the
  # notebook-level `dataset_X` / `dataset_y` names to that training split; the
  # values are not used further in this cell.
  load_data_path = output_data_folder_path + model_type + "dataset.p"
  dataset_X: ndarray
  dataset_y: ndarray
  with open(load_data_path, "rb") as data_file:
    dataset_X, dataset_y = pickle.load(data_file)

  # Evaluate on the held-out split produced by the split cell.
  result = loaded_model.score(data_test, labels_test)

  predicted_labels = loaded_model.predict(data_test)

  accuracy = accuracy_score(labels_test, predicted_labels)
  # NOTE(review): precision/recall are called with their default averaging,
  # which scikit-learn documents as binary - confirm the labels are binary.
  precision = precision_score(labels_test, predicted_labels)
  recall = recall_score(labels_test, predicted_labels)

  # Append the evaluation section for this model to the run report
  with open(report_file, "a") as f:
    f.writelines([
      f"## {model_type.capitalize()}\n\n",
      f"Accuracy: {accuracy}\n",
      f"Precision: {precision}\n",
      f"Recall: {recall}\n",
      "\n",
    ])