In [None]:
!pip install datasets


In [None]:
from datasets import load_dataset
from transformers import BertTokenizer, BertForSequenceClassification, TrainingArguments, Trainer
import torch
from torch.utils.data import Dataset
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import os
import re
from transformers import TrainerCallback
from sklearn.metrics import accuracy_score, f1_score, precision_score
torch.manual_seed(24)
os.environ["WANDB_DISABLED"] = "true"

#!pip install datasets

def remove_wallets(text):
    # This is a basic implementation - modify if your original remove_wallets was different
    # Common crypto wallet patterns (like Bitcoin/Ethereum addresses)
    wallet_pattern = r'0x[a-fA-F0-9]{40}|[13][a-km-zA-HJ-NP-Z1-9]{25,34}'
    return re.sub(wallet_pattern, '', text)

# Apply cleaning operations specified to the paper, https://github.com/mikik1234/CryptoBERT-LUKE/blob/main/CODE_Data_Collection.ipynb
def clean_text(text):
    # Remove Asian characters
    text = re.sub(r'[\u4e00-\u9fff]+', '', text)
    # Remove URLs
    text = re.sub(r'http\S+|www\S+', '', text)
    # Remove mentions, hashtags, stock symbols, and forward slashes with content
    text = re.sub(r'[@][A-Za-z0-9_]+|#[A-Za-z0-9_]+|$[A-Za-z0-9_ ]+|/[A-Za-z0-9_ ]+', '', text)
    # Remove RT prefix
    text = re.sub(r'RT : ', '', text)
    # Replace & with 'and'
    text = re.sub(r'&', 'and', text)
    # Handle special characters and quotes
    text = re.sub(r'â€™', '\'', text)
    text = re.sub(r'["&;]', '', text)
    text = re.sub(r'', '', text)  # Zero-width space
    # Remove .X or .x
    text = re.sub(r'\.[Xx]', '', text)
    # Normalize multiple dots to ellipsis
    text = re.sub(r'\.\.+', '...', text)
    # Remove standalone @ and pipe symbols
    text = re.sub(r'@|\|', '', text)
    # Normalize spaces
    text = re.sub(r'\s+', ' ', text).strip()
    # Convert to lowercase
    text = text.lower()
    # Remove wallet addresses
    text = remove_wallets(text)
    text = re.sub(r'^\s*\S+(?:\s+\S+){0,2}\s*$', '', text) # Remove short texts (fewer than 4 words)
    return text

def sentiment_map(text):
  if 'Bullish' in text:
    return 0
  elif 'Neutral' in text:
    return 1
  else:
    return 2

In [None]:
import torch
import numpy as np
import random

# Set random seeds for reproducibility
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# Create a generator for the split
generator = torch.Generator().manual_seed(seed)

In [None]:
data = load_dataset("StephanAkkerman/financial-tweets-crypto")
train_dataset_ori = data['train']

In [5]:
print(f'No. of data: {len(train_dataset_ori)}')
train_dataset_ori = train_dataset_ori.filter(lambda data: data['sentiment'] is not None)
print(f'No. of data after remove sentiment equals to none: {len(train_dataset_ori)}')
train_dataset_ori = train_dataset_ori.filter(lambda data: data['tweet_type']!='quote tweet')
print(f'No. of data after remove quote tweet: {len(train_dataset_ori)}')
train_dataset_ori = train_dataset_ori.filter(lambda data: len(data['description'].split(' '))>1)
print(f'No. of data after remove short text: {len(train_dataset_ori)}')
train_dataset_ori = train_dataset_ori.to_pandas()
train_dataset_ori['description'] = train_dataset_ori['description'].apply(clean_text)
train_dataset_ori.drop_duplicates(inplace=True, ignore_index=True)
print(f'No. of data after remove duplicates: {len(train_dataset_ori)}') # Make sure the records here remains the same after remove duplicates, else the following train test split might be different
train_dataset_ori['sentiment_label'] = train_dataset_ori['sentiment'].apply(sentiment_map)


No. of data: 57935


Filter:   0%|          | 0/57935 [00:00<?, ? examples/s]

No. of data after remove sentiment equals to none: 48692


Filter:   0%|          | 0/48692 [00:00<?, ? examples/s]

No. of data after remove quote tweet: 46866


Filter:   0%|          | 0/46866 [00:00<?, ? examples/s]

No. of data after remove short text: 45567
No. of data after remove duplicates: 45567


In [6]:
num_samples = len(train_dataset_ori)
# Create an array of indices
indices = np.arange(num_samples)

# Shuffle the indices randomly
np.random.seed(42)  # Set a seed for reproducibility
np.random.shuffle(indices)

# Split the indices into train, validation, and test sets
train_size = int(num_samples * 0.8)  # 80% for training
val_size = int(num_samples * 0.1)  # 10% for validation
test_size = num_samples - train_size - val_size  # 10% for testing

# Split the shuffled indices
train_idx = indices[:train_size]
val_idx = indices[train_size:train_size + val_size]
test_idx = indices[train_size + val_size:]

# Print the sizes of each split
print(f"Train size: {len(train_idx)}")
print(f"Validation size: {len(val_idx)}")
print(f"Test size: {len(test_idx)}")

Train size: 36453
Validation size: 4556
Test size: 4558


In [7]:
train_dataset = train_dataset_ori.loc[train_idx]
valid_dataset = train_dataset_ori.loc[val_idx]
test_dataset = train_dataset_ori.loc[test_idx]

In [8]:
# 2. Prepare the data
#sentiment_map = {'positive': 2, 'neutral': 1, 'negative': 0}  # Adjust based on your actual sentiment values

class TweetDataset(Dataset):
    def __init__(self, texts, encodings, labels):
        self.encodings = encodings
        self.labels = labels
        self.texts = texts

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        # print(self.texts[idx])
        item['text'] = self.texts[idx]
        # item['text'] = torch.tensor(self.texts[idx])
        return item

    def __len__(self):
        return len(self.labels)

model_name = "ElKulako/cryptobert"

# 3. Initialize tokenizer
#tokenizer = BertTokenizer.from_pretrained('ElKulako/cryptobert')
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)

# 6. Create dataset
encodings = tokenizer(train_dataset['description'].to_list(),
                      truncation=True, padding="max_length", max_length=128)
train_dataset = TweetDataset(train_dataset['description'].to_list(), encodings, train_dataset['sentiment_label'].to_list())
encodings = tokenizer(valid_dataset['description'].to_list(),
                      truncation=True, padding="max_length", max_length=128)
val_dataset = TweetDataset(valid_dataset['description'].to_list(), encodings, valid_dataset['sentiment_label'].to_list())
encodings = tokenizer(test_dataset['description'].to_list(),
                      truncation=True, padding="max_length", max_length=128)
test_dataset = TweetDataset(test_dataset['description'].to_list(), encodings, test_dataset['sentiment_label'].to_list())


tokenizer_config.json:   0%|          | 0.00/1.35k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/957 [00:00<?, ?B/s]

In [9]:
from torch.utils.data import DataLoader

# Set your desired batch size
batch_size = 128

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [10]:
import torch
import torch.nn as nn

class BiLSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, num_layers=1, dropout=0.5, bidirectional=True):
        super(BiLSTMClassifier, self).__init__()
        # Set padding_idx to ensure the embedding for padding tokens remains constant (if your tokenizer defines one)
        padding_idx = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else 0
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=padding_idx)

        self.lstm = nn.LSTM(
            embed_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=bidirectional
        )

        # If using bidirectional LSTM, the final hidden size will be doubled.
        lstm_output_dim = hidden_dim * 2 if bidirectional else hidden_dim

        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(lstm_output_dim, num_classes)

    def forward(self, input_ids):
        # input_ids shape: (batch_size, seq_length)
        embedded = self.embedding(input_ids)  # (batch_size, seq_length, embed_dim)

        # Pass through LSTM
        # You could also use pack_padded_sequence here if you wish to handle variable lengths.
        lstm_out, (h_n, c_n) = self.lstm(embedded)

        # Concatenate the final forward and backward hidden states
        if self.lstm.bidirectional:
            h_n_cat = torch.cat((h_n[-2], h_n[-1]), dim=1)  # shape: (batch_size, hidden_dim*2)
        else:
            h_n_cat = h_n[-1]  # shape: (batch_size, hidden_dim)

        dropped = self.dropout(h_n_cat)
        logits = self.fc(dropped)  # shape: (batch_size, num_classes)
        return logits


In [11]:
vocab_size = tokenizer.vocab_size
embed_dim = 128     # Dimension for word embeddings; experiment as necessary
hidden_dim = 256    # Size of LSTM hidden states
num_classes = 3     # According to your sentiment_map (0: Bullish, 1: Neutral, 2: Bearish)
num_layers = 2      # You may experiment with deeper networks
dropout = 0.3
bidirectional = True

model = BiLSTMClassifier(vocab_size, embed_dim, hidden_dim, num_classes, num_layers, dropout, bidirectional)


In [12]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)  # Adjust learning rate as needed


In [13]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)


BiLSTMClassifier(
  (embedding): Embedding(50265, 128, padding_idx=1)
  (lstm): LSTM(128, 256, num_layers=2, batch_first=True, dropout=0.3, bidirectional=True)
  (dropout): Dropout(p=0.3, inplace=False)
  (fc): Linear(in_features=512, out_features=3, bias=True)
)

In [14]:
def count_trainable_params(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Assuming your model is defined as `model`
print(f"Number of trainable parameters: {count_trainable_params(model)}")


Number of trainable parameters: 8802947


In [16]:
from sklearn.metrics import accuracy_score, f1_score, precision_score
import numpy as np

def evaluate(model, data_loader, criterion):
    model.eval()
    total_loss = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in data_loader:
            # Move inputs and labels to the device
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids)
            loss = criterion(outputs, labels)
            total_loss += loss.item()

            preds = torch.argmax(outputs, dim=1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    avg_loss = total_loss / len(data_loader)
    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='macro')
    precision = precision_score(all_labels, all_preds, average='weighted')

    return avg_loss, accuracy, f1, precision

num_epochs = 5  # Adjust as needed

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch in train_dataloader:
        optimizer.zero_grad()

        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_dataloader)
    val_loss, val_acc, val_f1, val_precision = evaluate(model, val_dataloader, criterion)

    print(f"Epoch {epoch+1}/{num_epochs}:")
    print(f"  Train Loss: {avg_train_loss:.4f}")
    print(f"  Val Loss:   {val_loss:.4f} | Accuracy: {val_acc:.4f} | F1: {val_f1:.4f} | Precision: {val_precision:.4f}")


Epoch 1/5:
  Train Loss: 0.7261
  Val Loss:   0.7148 | Accuracy: 0.6811 | F1: 0.5598 | Precision: 0.6658
Epoch 2/5:
  Train Loss: 0.6343
  Val Loss:   0.6952 | Accuracy: 0.6978 | F1: 0.5678 | Precision: 0.6909
Epoch 3/5:
  Train Loss: 0.5552
  Val Loss:   0.7214 | Accuracy: 0.7098 | F1: 0.6117 | Precision: 0.7030
Epoch 4/5:
  Train Loss: 0.4728
  Val Loss:   0.7430 | Accuracy: 0.7133 | F1: 0.6338 | Precision: 0.7037
Epoch 5/5:
  Train Loss: 0.3860
  Val Loss:   0.7824 | Accuracy: 0.7094 | F1: 0.6360 | Precision: 0.7052


In [17]:
test_loss, test_acc, test_f1, test_precision = evaluate(model, test_dataloader, criterion)
print("Test Metrics:")
print(f"  Loss: {test_loss:.4f}")
print(f"  Accuracy: {test_acc:.4f}")
print(f"  F1 Score: {test_f1:.4f}")
print(f"  Precision: {test_precision:.4f}")


Test Metrics:
  Loss: 0.7618
  Accuracy: 0.7065
  F1 Score: 0.6288
  Precision: 0.7035


In [18]:
import torch
from sklearn.metrics import accuracy_score, precision_score, f1_score

def evaluate_model(model, dataloader, device):
    model.eval()  # Set the model to evaluation mode
    all_preds = []
    all_labels = []

    # Disable gradient calculation for efficiency
    with torch.no_grad():
        for batch in dataloader:
            # Move inputs and labels to the selected device
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].to(device)

            # Forward pass through the model
            outputs = model(input_ids)
            # Get the predicted class (index) for each batch sample
            preds = torch.argmax(outputs, dim=1)

            # Collect results for the entire test set
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Calculate the overall accuracy
    acc = accuracy_score(all_labels, all_preds)
    # Calculate the precision for each class (order: 0: Bullish, 1: Neutral, 2: Bearish)
    precisions = precision_score(all_labels, all_preds, average=None, zero_division=0)
    # Calculate the weighted F1-score
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)

    print("Evaluation Metrics:")
    print(f"Precision (Bullish): {precisions[0]:.4f}")
    print(f"Precision (Neutral): {precisions[1]:.4f}")
    print(f"Precision (Bearish): {precisions[2]:.4f}")
    print(f"Accuracy:          {acc:.4f}")
    print(f"F1-Score:          {f1:.4f}")

# Assuming `model` is your trained BiLSTM model, `test_dataloader` is defined, and `device` is set:
evaluate_model(model, test_dataloader, device)


Evaluation Metrics:
Precision (Bullish): 0.7800
Precision (Neutral): 0.5589
Precision (Bearish): 0.5983
Accuracy:          0.7065
F1-Score:          0.6288
