# Trading Signal Generation with KNN

In [1]:
# Core Libraries
import pandas as pd
import numpy as np
import re

# Text Processing
import spacy
from transformers import AutoTokenizer, AutoModel

# Machine Learning
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt
import tqdm as notebook_tqdm

# Model Saving
import joblib

  from .autonotebook import tqdm as notebook_tqdm


## Preprocessing

In [2]:
def _read_utf8_csv(csv_path):
    """Open ``csv_path`` as UTF-8 text and parse it into a DataFrame."""
    with open(csv_path, "r", encoding="utf-8") as handle:
        return pd.read_csv(handle)


# One labeled CSV per month; paths are relative to the current working directory.
file_path1 = "data/processed/labeled_january_data.csv"
df_jan = _read_utf8_csv(file_path1)

file_path2 = "data/processed/labeled_february_data.csv"
df_feb = _read_utf8_csv(file_path2)

file_path3 = "data/processed/labeled_march_data.csv"
df_march = _read_utf8_csv(file_path3)

FileNotFoundError: [Errno 2] No such file or directory: 'data/processed/labeled_january_data.csv'

In [8]:
import re

def normalize_numbers(text):
    """Normalize monetary amounts, percentage points and digit grouping in ``text``.

    Transformations, applied in this order:
      * "R$ <number> bilhões" -> "<number>B" and "R$ <number> milhões" -> "<number>M"
      * "<number> pp" -> "<number>%"
      * thousands-grouping commas are dropped ("15,000" -> "15000")

    Only commas that sit between a digit and a group of exactly three digits are
    removed. Decimal commas ("1,5") and ordinary punctuation commas are kept, so
    "R$ 1,5 bilhões" becomes "1,5B" rather than "15B".

    Args:
        text: The string to normalize.

    Returns:
        The normalized string.
    """
    # Monetary values (e.g., "R$ 15 bilhões" -> "15B")
    text = re.sub(r"R\$ ?([\d.,]+) bilhões", r"\1B", text)
    text = re.sub(r"R\$ ?([\d.,]+) milhões", r"\1M", text)
    # Percentage points (e.g., "0.5 pp" -> "0.5%"); the number must start with a
    # digit and "pp" must be a whole token, so ", pp" or "5 ppm" are left alone
    text = re.sub(r"(\d[\d.,]*) pp\b", r"\1%", text)
    # Thousands separators only (e.g., "15,000" -> "15000", "1,234,567" -> "1234567")
    text = re.sub(r"(?<=\d),(?=\d{3}(?!\d))", "", text)
    return text

In [9]:
def remove_noise(text):
    """Delete clock times, ordinal markers and fixed boilerplate phrases from ``text``.

    After the deletions, every run of whitespace is collapsed to a single space
    and the result is stripped at both ends.
    """
    # (pattern, flags) pairs, applied in order; every match is replaced by ""
    deletions = (
        (r"\b\d{1,2}[hH]\d{2}\b", 0),  # times like "17h20"
        (r"\b\d{1,2}ª[Ff]\b", 0),      # ordinals like "2ªF"
        (r"MAIS AGENDA|LÁ FORA|A BOLSA ESTÁ CARA", re.IGNORECASE),  # boilerplate phrases
    )
    for pattern, flags in deletions:
        text = re.sub(pattern, "", text, flags=flags)

    # Tidy up the gaps the deletions leave behind
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()

In [10]:
# Acronym -> expansion table. Insertion order is preserved, and it is the order
# in which expand_acronyms() walks the entries.
# NOTE(review): several keys are very short ("IR", "OI", "DI", "MP") and a few
# expansions look context-dependent ("CPI", "CV") — confirm they fit the corpus.
acronyms = dict([
    ("Selic", "Sistema Especial de Liquidação e de Custódia"),
    ("PIB", "Produto Interno Bruto"),
    ("CDI", "Certificado de Depósito Interbancário"),
    ("LPRs", "Loan Prime Rates"),
    ("Ibovespa", "Índice Bovespa"),
    ("BB", "Banco do Brasil"),
    ("BC", "Banco Central"),
    ("FGTS", "Fundo de Garantia do Tempo de Serviço"),
    ("STF", "Supremo Tribunal Federal"),
    ("CPI", "Índice de Preços ao Consumidor"),
    ("MP", "Medida Provisória"),
    ("EUA", "Estados Unidos"),
    ("ONU", "Organização das Nações Unidas"),
    ("FGV", "Fundação Getúlio Vargas"),
    ("IBGE", "Instituto Brasileiro de Geografia e Estatística"),
    ("BNDES", "Banco Nacional de Desenvolvimento Econômico e Social"),
    ("IPCA", "Índice Nacional de Preços ao Consumidor Amplo"),
    ("DI", "Depósito Interfinanceiro"),
    ("IR", "Imposto de Renda"),
    ("OI", "Operadora Oi"),
    ("CV", "Câmara de Vereadores"),
])

# Tokens that are deleted outright instead of being expanded.
noisy_acronyms = set(["ROMI", "ENEVA", "LIGHT", "DA"])

def expand_acronyms(text, acronym_dict):
    """Replace whole-word acronyms in ``text`` with their expansions.

    Entries are applied one after another in the dictionary's iteration order.

    Matching rules:
      * A key written entirely in upper case (e.g. "IR", "BC") is matched
        case-sensitively, so ordinary lower-case words that happen to share the
        letters ("ir", "oi") are left untouched.
      * Any other key (e.g. "Selic", "LPRs") is matched case-insensitively.
      * Empty keys are skipped; they would otherwise match at every word boundary.

    The expansion is inserted literally: backslashes or group references inside
    it are not interpreted as regex replacement syntax.

    Args:
        text: The string to process.
        acronym_dict: Mapping of acronym -> expansion.

    Returns:
        The text with every matching acronym expanded.
    """
    for acronym, full_form in acronym_dict.items():
        if not acronym:
            continue
        # All-caps acronyms must appear in caps; mixed-case names match any casing
        flags = 0 if acronym.isupper() else re.IGNORECASE
        text = re.sub(
            rf'\b{re.escape(acronym)}\b',
            # A callable replacement bypasses template processing of the expansion
            lambda _match, replacement=full_form: replacement,
            text,
            flags=flags,
        )
    return text

def remove_noisy_acronyms(text, noisy_set):
    """Delete every whole-word, case-sensitive occurrence of the given terms.

    Each term is escaped before being placed in the pattern, so characters such
    as "." or "+" inside a term are matched literally. Terms are ordered
    longest-first (ties alphabetical) to make the generated pattern independent
    of set iteration order. Surrounding whitespace is not touched, so a removed
    term can leave a double space behind.

    Args:
        text: The string to clean.
        noisy_set: Iterable of terms to remove.

    Returns:
        ``text`` with the terms removed; unchanged when there are no terms.
    """
    terms = sorted((term for term in noisy_set if term), key=lambda term: (-len(term), term))
    if not terms:
        return text
    alternatives = '|'.join(re.escape(term) for term in terms)
    return re.sub(r'\b(?:' + alternatives + r')\b', '', text)

In [11]:
import spacy

# Shared spaCy pipeline ("pt_core_news_sm"), loaded once and reused by lemmatize_text
nlp = spacy.load("pt_core_news_sm")

def lemmatize_text(text):
    """Return the space-joined lemmas of the alphabetic, non-stop-word tokens in ``text``.

    Tokens flagged by spaCy as stop words, and tokens that are not purely
    alphabetic, are dropped.
    """
    kept_lemmas = []
    for tok in nlp(text):
        # Skip stop words and anything that is not purely alphabetic
        if tok.is_stop or not tok.is_alpha:
            continue
        kept_lemmas.append(tok.lemma_)
    return " ".join(kept_lemmas)

In [12]:
def preprocess_text(text):
    """Run one article through the cleaning pipeline and return a lower-cased string.

    Stages, in order: number normalization, acronym expansion (using the
    module-level ``acronyms``), noisy-acronym removal (using ``noisy_acronyms``),
    lemmatization, whitespace collapsing, lower-casing.

    NOTE(review): ``remove_noise`` is defined in this notebook but is not part of
    this pipeline — confirm whether that is intentional.
    """
    expanded = expand_acronyms(normalize_numbers(text), acronyms)
    without_noise = remove_noisy_acronyms(expanded, noisy_acronyms)
    lemmas = lemmatize_text(without_noise)
    # Collapse whitespace runs before lower-casing the final string
    return re.sub(r'\s+', ' ', lemmas).strip().lower()

# Add a 'cleaned_article' column to every monthly frame.
# preprocess_text returns a plain string, so Series.apply already yields a Series
# of strings. Wrapping each result in pd.Series only built a throwaway
# one-column DataFrame, one Series object per row.
for monthly_df in (df_jan, df_feb, df_march):
    monthly_df['cleaned_article'] = monthly_df['article'].apply(preprocess_text)

In [None]:
# Class balance for January: printed counts plus a saved bar chart.
print(df_jan['label'].value_counts())
# Use a dedicated figure/axes so the bars never land on axes left open elsewhere
fig_jan, ax_jan = plt.subplots()
sns.countplot(x='label', data=df_jan, ax=ax_jan)
ax_jan.set_title('Label Distribution in January Dataset')
fig_jan.savefig("results/bert_embeddings_experiment_v1/figures/label_distribution_january.png")

In [None]:
# Class balance for February: printed counts plus a saved bar chart.
print(df_feb['label'].value_counts())
# Use a dedicated figure/axes so the bars never land on axes left open elsewhere
fig_feb, ax_feb = plt.subplots()
sns.countplot(x='label', data=df_feb, ax=ax_feb)
ax_feb.set_title('Label Distribution in February Dataset')
fig_feb.savefig("results/bert_embeddings_experiment_v1/figures/label_distribution_february.png")

In [None]:
# Class balance for March: printed counts plus a saved bar chart.
print(df_march['label'].value_counts())
# Use a dedicated figure/axes so the bars never land on axes left open elsewhere
fig_march, ax_march = plt.subplots()
sns.countplot(x='label', data=df_march, ax=ax_march)
ax_march.set_title('Label Distribution in March Dataset')
fig_march.savefig("results/bert_embeddings_experiment_v1/figures/label_distribution_march.png")

In [13]:
# BERTimbau checkpoint used for both the tokenizer and the encoder
bertimbau_checkpoint = "neuralmind/bert-base-portuguese-cased"
tokenizer = AutoTokenizer.from_pretrained(bertimbau_checkpoint)
model = AutoModel.from_pretrained(bertimbau_checkpoint)

In [14]:
from transformers import AutoTokenizer, AutoModel
import torch

def get_bert_embedding(text, tokenizer, model, max_length=128):
    """Encode ``text`` and return the hidden state of its first token as a NumPy array.

    Args:
        text: Input passed straight to ``tokenizer``.
        tokenizer: Callable that accepts ``return_tensors="pt"``, ``truncation``,
            ``padding`` and ``max_length`` and returns a mapping of tensors.
        model: Callable taking the tokenizer output as keyword arguments and
            returning an object with a ``last_hidden_state`` tensor indexed as
            [batch, token, hidden].
        max_length: Token limit used for truncation (default 128, the value
            previously hard-coded).

    Returns:
        ``last_hidden_state[:, 0, :]`` with a leading batch dimension of size 1
        squeezed away, converted to a NumPy array on the CPU.
    """
    # Tokenize the input text
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=max_length)

    # Move the encoded tensors to the device holding the model weights, so a
    # model that is not on the CPU receives inputs it can consume
    parameters = getattr(model, "parameters", None)
    first_param = next(iter(parameters()), None) if callable(parameters) else None
    if first_param is not None:
        inputs = {
            name: value.to(first_param.device) if hasattr(value, "to") else value
            for name, value in inputs.items()
        }

    # Forward pass without gradient tracking
    with torch.no_grad():
        outputs = model(**inputs)

    # First token of every sequence ([CLS] for BERT-style tokenizers)
    cls_embedding = outputs.last_hidden_state[:, 0, :]
    # .cpu() is a no-op for CPU tensors and required before .numpy() otherwise
    return cls_embedding.squeeze(0).cpu().numpy()

In [15]:
def _embed_article(article_text):
    """Embed one cleaned article with the notebook-level tokenizer and model."""
    return get_bert_embedding(article_text, tokenizer, model)


# Add an 'embedding' column to each monthly frame: January, February, then March
for monthly_df in (df_jan, df_feb, df_march):
    monthly_df['embedding'] = monthly_df['cleaned_article'].apply(_embed_article)

## KNN Classification

In [21]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

def plot_confusion_matrix(cm, labels, title):
    """Show a confusion matrix as an annotated heatmap on a new 8x6 figure.

    Args:
        cm: Matrix of integer counts (cells are formatted with 'd').
        labels: Tick labels used on both axes.
        title: Figure title.
    """
    plt.figure(figsize=(8, 6))
    # heatmap draws on the figure just created and hands back its Axes
    heatmap_ax = sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap="Blues",
        xticklabels=labels,
        yticklabels=labels,
    )
    heatmap_ax.set(title=title, xlabel='Predicted', ylabel='Actual')
    plt.show()

def knn_binary_classification(df_train, df_test, k=5):
    """Fit a Euclidean k-NN on the rows whose label is not 0 and report test results.

    Rows with ``label == 0`` are excluded from both frames. Prints a
    classification report and the accuracy, then plots a confusion matrix over
    the labels -1 and 1.

    Args:
        df_train: Frame with 'label' and 'embedding' columns used for fitting.
        df_test: Frame with the same columns used for evaluation.
        k: Number of neighbours.
    """
    class_labels = [-1, 1]

    # Discard the rows labelled 0
    train_rows = df_train.loc[df_train['label'] != 0]
    test_rows = df_test.loc[df_test['label'] != 0]

    # Stack the per-row embeddings into feature matrices
    # (assumes every 'embedding' cell is an array of the same length — TODO confirm)
    X_train, y_train = np.vstack(train_rows['embedding'].values), train_rows['label']
    X_test, y_test = np.vstack(test_rows['embedding'].values), test_rows['label']

    classifier = KNeighborsClassifier(n_neighbors=k, metric='euclidean')
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)

    print("Binary Classification Report:")
    print(classification_report(y_test, y_pred))
    print("Accuracy:", accuracy_score(y_test, y_pred))

    plot_confusion_matrix(
        confusion_matrix(y_test, y_pred, labels=class_labels),
        labels=class_labels,
        title="Confusion Matrix: Binary Classification",
    )

def knn_multi_classification(df_train, df_test, k=5):
    """Fit a Euclidean k-NN on all rows and report test results.

    Prints a classification report and the accuracy, then plots a confusion
    matrix over the labels -1, 0 and 1.

    Args:
        df_train: Frame with 'label' and 'embedding' columns used for fitting.
        df_test: Frame with the same columns used for evaluation.
        k: Number of neighbours.
    """
    class_labels = [-1, 0, 1]

    # Stack the per-row embeddings into feature matrices
    # (assumes every 'embedding' cell is an array of the same length — TODO confirm)
    X_train, y_train = np.vstack(df_train['embedding'].values), df_train['label']
    X_test, y_test = np.vstack(df_test['embedding'].values), df_test['label']

    classifier = KNeighborsClassifier(n_neighbors=k, metric='euclidean')
    classifier.fit(X_train, y_train)
    y_pred = classifier.predict(X_test)

    print("Multi-class Classification Report:")
    print(classification_report(y_test, y_pred))
    print("Accuracy:", accuracy_score(y_test, y_pred))

    plot_confusion_matrix(
        confusion_matrix(y_test, y_pred, labels=class_labels),
        labels=class_labels,
        title="Confusion Matrix: Multi-class Classification",
    )

# Training frame: January rows followed by February rows, stacked row-wise.
# Each frame's own index labels are kept as they are.
df_train = pd.concat(objs=[df_jan, df_feb], axis=0)

In [None]:
# Fit on df_train and evaluate on the March frame, binary task first
evaluations = (
    ("Binary Classification (KNN):", knn_binary_classification),
    ("\nMulti-class Classification (KNN):", knn_multi_classification),
)
for heading, evaluate in evaluations:
    print(heading)
    evaluate(df_train, df_march, k=5)