In [1]:
# Scrape Twitter tweets with snscrape
import snscrape.modules.twitter as sntwitter

query = "#bitcoin"
tweets = []
limit = 100

for tweet in sntwitter.TwitterSearchScraper(query).get_items():

    if len(tweets) == limit:
        break
    else:
        if tweet.lang=='en':
            tweets.append(str(tweet.rawContent))

In [2]:
# Store the tweets in a dataframe
import pandas as pd

df = pd.DataFrame(tweets, columns=['Tweet'])

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 1 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   Tweet   100 non-null    object
dtypes: object(1)
memory usage: 928.0+ bytes


In [3]:
df.head()

Unnamed: 0,Tweet
0,🥳 FBBank Giveaway ！💰\n\n🏆 Prize Pool : $200 wo...
1,Join Mega Airdrop by @Coinstages x @TokenBrics...
2,Orangepill fail today by me. \n\nI got a colle...
3,"- \nBTC price: $24,672 / £20,616 \n\n40.53 Nak..."
4,San Francisco federal bank eyes CBDC system de...


In [4]:
# Set up the model environment
import torch, torchtext, torchdata
from torch import nn
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

SEED = 1234
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

from torchtext.datasets import SST2
train = SST2(split='train')

tokenizer = get_tokenizer('spacy', language='en_core_web_sm')

def yield_tokens(data_iter):
    for text, _ in data_iter:
        yield tokenizer(text)
        
vocab = build_vocab_from_iterator(yield_tokens(train),
                                  specials=['<unk>','<pad>','<bos>','<eos>'])

vocab.set_default_index(vocab['<unk>'])

text_pipeline  = lambda x: vocab(tokenizer(x))
label_pipeline = lambda x: int(x) - 1

pad_idx = vocab['<pad>']

class LSTM(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, output_dim, num_layers, bidirectional, dropout):
        super().__init__()
        #put padding_idx so asking the embedding layer to ignore padding
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=pad_idx)
        self.lstm = nn.LSTM(emb_dim, 
                           hid_dim, 
                           num_layers=num_layers, 
                           bidirectional=bidirectional, 
                           dropout=dropout,
                           batch_first=True)
        self.fc = nn.Linear(hid_dim * 2, output_dim)
        
    def forward(self, text, text_lengths):
        #text = [batch size, seq len]
        embedded = self.embedding(text)
        
        #++ pack sequence ++
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, text_lengths.to('cpu'), enforce_sorted=False, batch_first=True)
        
        #embedded = [batch size, seq len, embed dim]
        packed_output, (hn, cn) = self.lstm(packed_embedded)  #if no h0, all zeroes
        
        #++ unpack in case we need to use it ++
        output, output_lengths = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        
        #output = [batch size, seq len, hidden dim * num directions]
        #output over padding tokens are zero tensors
        
        #hidden = [num layers * num directions, batch size, hid dim]
        #cell = [num layers * num directions, batch size, hid dim]
        
        #concat the final forward (hidden[-2,:,:]) and backward (hidden[-1,:,:]) hidden layers
        hn = torch.cat((hn[-2,:,:], hn[-1,:,:]), dim = 1)
        #hn = [batch size, hidden dim * num directions]
        
        return self.fc(hn)

input_dim  = len(vocab)
hid_dim    = 256
emb_dim    = 300
output_dim = 2
num_layers = 2
bidirectional = True
dropout = 0.5

model = LSTM(input_dim, emb_dim, hid_dim, output_dim, num_layers, bidirectional, dropout).to(device)

In [5]:
# Load the pretrained LSTM model
save_path = f'models/{model.__class__.__name__}_SST2.pt'

model.load_state_dict(torch.load(save_path, map_location=torch.device(device)))

<All keys matched successfully>

In [6]:
# Classify the sentiments of the tweets
def predict(text, text_length):
    with torch.no_grad():
        output = model(text, text_length).squeeze(1)
        predicted = torch.max(output.data, 1)[1]
        return predicted

sentiment = []

for tweet in tweets:
    tweet = torch.tensor(text_pipeline(tweet))
    tweet = tweet.reshape(1, -1)
    tweet_length = torch.tensor([tweet.size(1)]).to(dtype=torch.int64)
    prediction = str(int(predict(tweet, tweet_length))).strip()
    sentiment.append(prediction)

In [7]:
# Store the sentiments in the dataframe
df['Sentiment'] = sentiment

In [8]:
df.head()

Unnamed: 0,Tweet,Sentiment
0,🥳 FBBank Giveaway ！💰\n\n🏆 Prize Pool : $200 wo...,0
1,Join Mega Airdrop by @Coinstages x @TokenBrics...,0
2,Orangepill fail today by me. \n\nI got a colle...,0
3,"- \nBTC price: $24,672 / £20,616 \n\n40.53 Nak...",0
4,San Francisco federal bank eyes CBDC system de...,0


In [9]:
df.Sentiment.value_counts

<bound method IndexOpsMixin.value_counts of 0     0
1     0
2     0
3     0
4     0
     ..
95    0
96    0
97    0
98    0
99    0
Name: Sentiment, Length: 100, dtype: object>