## Text Cleaning

![Text Cleaning](https://zotyag.github.io/Uni/INLP/Images/tisztitas.webp)

Dataset
https://huggingface.co/datasets/zeroshot/twitter-financial-news-sentiment


### Importing dataset

In [None]:
import pandas as pd

# File names of the two CSV splits published in the Hugging Face dataset repo.
splits = {'train': 'sent_train.csv', 'validation': 'sent_valid.csv'}
df = pd.read_csv("hf://datasets/zeroshot/twitter-financial-news-sentiment/" + splits["train"])
vf = pd.read_csv("hf://datasets/zeroshot/twitter-financial-news-sentiment/" + splits["validation"])
# Merge both splits into one frame. ignore_index=True renumbers the rows so the
# index stays unique; without it both splits keep their own 0-based index and
# every label from the validation file collides with a training row.
df = pd.concat([df, vf], ignore_index=True)
df.head(10)

Unnamed: 0,text,label
0,$BYND - JPMorgan reels in expectations on Beyo...,0
1,$CCL $RCL - Nomura points to bookings weakness...,0
2,"$CX - Cemex cut at Credit Suisse, J.P. Morgan ...",0
3,$ESS: BTIG Research cuts to Neutral https://t....,0
4,$FNKO - Funko slides after Piper Jaffray PT cu...,0
5,$FTI - TechnipFMC downgraded at Berenberg but ...,0
6,$GM - GM loses a bull https://t.co/tdUfG5HbXy,0
7,$GM: Deutsche Bank cuts to Hold https://t.co/7...,0
8,$GTT: Cowen cuts to Market Perform,0
9,$HNHAF $HNHPD $AAPL - Trendforce cuts iPhone e...,0


### Removing URLs

In [None]:
# prompt: Remove any URLs from the data df["text"]

import re

def remove_urls(text):
    """Return ``text`` with every ``http(s)://...`` or ``www....`` URL deleted.

    A URL is matched up to the next whitespace character; surrounding
    whitespace is left untouched.
    """
    without_urls = re.sub(r'https?://\S+|www\.\S+', r'', text)
    return without_urls

# Overwrite the text column with its URL-free version, then preview the result.
df["text"] = df["text"].apply(lambda text: remove_urls(text))
df.head(10)

Unnamed: 0,text,label
0,$BYND - JPMorgan reels in expectations on Beyo...,0
1,$CCL $RCL - Nomura points to bookings weakness...,0
2,"$CX - Cemex cut at Credit Suisse, J.P. Morgan ...",0
3,$ESS: BTIG Research cuts to Neutral,0
4,$FNKO - Funko slides after Piper Jaffray PT cut,0
5,$FTI - TechnipFMC downgraded at Berenberg but ...,0
6,$GM - GM loses a bull,0
7,$GM: Deutsche Bank cuts to Hold,0
8,$GTT: Cowen cuts to Market Perform,0
9,$HNHAF $HNHPD $AAPL - Trendforce cuts iPhone e...,0


In [None]:
# prompt: Search the df["text"] for any URLs

import pandas as pd
import re

# Assuming df and remove_urls are defined as in the previous code

def find_urls(text):
    """Return a list of all ``http(s)://...`` / ``www....`` URLs in ``text``.

    Matches are returned in order of appearance; the list is empty when the
    text contains no URL.
    """
    return re.findall(r'https?://\S+|www\.\S+', text)

# Store the list of URLs still present in each (already cleaned) text.
df["urls"] = df["text"].apply(lambda text: find_urls(text))

# Display rows where URLs were found (an empty frame means the removal worked)
print(df[df["urls"].apply(lambda x: len(x) > 0)])

Empty DataFrame
Columns: [text, label, urls]
Index: []


### Lowercasing

In [None]:
# prompt: Apply the lower() function to df["text"]

# Lowercase every text with the vectorised pandas string accessor, then preview.
df["text"] = df["text"].str.lower()
df.head(10)

Unnamed: 0,text,label,urls
0,$bynd - jpmorgan reels in expectations on beyo...,0,[]
1,$ccl $rcl - nomura points to bookings weakness...,0,[]
2,"$cx - cemex cut at credit suisse, j.p. morgan ...",0,[]
3,$ess: btig research cuts to neutral,0,[]
4,$fnko - funko slides after piper jaffray pt cut,0,[]
5,$fti - technipfmc downgraded at berenberg but ...,0,[]
6,$gm - gm loses a bull,0,[]
7,$gm: deutsche bank cuts to hold,0,[]
8,$gtt: cowen cuts to market perform,0,[]
9,$hnhaf $hnhpd $aapl - trendforce cuts iphone e...,0,[]


In [None]:
# prompt: Test the df["text"] for uppercase letters

import pandas as pd
import re

# Assuming df is already loaded and processed as in the previous code

def has_uppercase(text):
    """Return True if at least one character of ``text`` is uppercase."""
    for char in text:
        if char.isupper():
            return True
    return False

# Flag texts that still contain an uppercase character and print them;
# an empty frame means lowercasing was applied everywhere.
df["has_uppercase"] = df["text"].apply(has_uppercase)
print(df[df["has_uppercase"]])

Empty DataFrame
Columns: [text, label, urls, has_uppercase]
Index: []


### Expanding Contractions

In [None]:
# Install the third-party `contractions` package imported by the next cell.
!pip install contractions



In [None]:
# prompt: Expand contractions in df["text"]


import pandas as pd
import re
import contractions

# ... (previous code)

def expand_contractions(text):
    """Return ``text`` after passing it through ``contractions.fix``."""
    expanded = contractions.fix(text)
    return expanded

# Expand contractions in every text in place, then preview the result.
df["text"] = df["text"].apply(expand_contractions)
df.head(10)

Unnamed: 0,text,label,urls,has_uppercase
0,$bynd - jpmorgan reels in expectations on beyo...,0,[],False
1,$ccl $rcl - nomura points to bookings weakness...,0,[],False
2,"$cx - cemex cut at credit suisse, j.p. morgan ...",0,[],False
3,$ess: btig research cuts to neutral,0,[],False
4,$fnko - funko slides after piper jaffray pt cut,0,[],False
5,$fti - technipfmc downgraded at berenberg but ...,0,[],False
6,$gm - gm loses a bull,0,[],False
7,$gm: deutsche bank cuts to hold,0,[],False
8,$gtt: cowen cuts to market perform,0,[],False
9,$hnhaf $hnhpd $aapl - trendforce cuts iphone e...,0,[],False


### Removing ' character

In [None]:
# prompt: Check df["text"] if it contains any ' characters

def contains_single_quotes(text):
    """Return True if ``text`` contains an ASCII apostrophe (')."""
    apostrophe = "'"
    return apostrophe in text

# Flag and print the texts that still contain an apostrophe.
# This cell only reports them; it does not modify df["text"].
df["contains_single_quotes"] = df["text"].apply(contains_single_quotes)
print(df[df["contains_single_quotes"]])

                                                   text  label urls  \
10             $hog - moody's warns on harley-davidson       0   []   
25    $vclt $splb $iglb - guggenheim's minerd sees m...      0   []   
33    analysts question silicon motion's margin outlook      0   []   
51    children's place downgraded as e-commerce puts...      0   []   
52    children's place price target cut to $80 from ...      0   []   
...                                                 ...    ...  ...   
2362  splunk sees its biggest surge in a year after ...      1   []   
2366  tesla's stock ticks up after deutsche bank lif...      1   []   
2369  unisys's stock soars 16% premarket after sale ...      1   []   
2371          $scanx: today's biggest % gainers/losers       2   []   
2385  stocks making the biggest moves premarket: hom...      2   []   

      has_uppercase  contains_single_quotes  
10            False                    True  
25            False                    True  
33       

### Removing Punctuation

In [None]:
# prompt: Remove all punctuation from df["text"]

import string

def remove_punctuation(text):
    """Return ``text`` without any character listed in ``string.punctuation``.

    Only ASCII punctuation is removed; all other characters are kept as-is.
    """
    kept_chars = [char for char in text if char not in string.punctuation]
    return "".join(kept_chars)

# Strip ASCII punctuation from every text in place.
df["text"] = df["text"].apply(remove_punctuation)

In [None]:
# prompt: test df["text"] for punctuation

def has_punctuation(text):
    """Return True if ``text`` contains any ``string.punctuation`` character."""
    for char in text:
        if char in string.punctuation:
            return True
    return False

# Flag and print texts that still contain ASCII punctuation;
# an empty frame means the removal step covered every row.
df["has_punctuation"] = df["text"].apply(has_punctuation)
print(df[df["has_punctuation"]])

Empty DataFrame
Columns: [text, label, urls, has_uppercase, contains_single_quotes, has_punctuation]
Index: []


### Removing Stopwords

In [None]:
# prompt: Remove all stopwords from df["text"]

# Install NLTK (shell command executed by the notebook).
!pip install nltk

import nltk
# Fetch the stopword corpus that stopwords.words() reads below.
nltk.download('stopwords')
from nltk.corpus import stopwords

# English stopwords as a set; used for membership tests by the stopword helpers.
stop_words = set(stopwords.words('english'))

def remove_stopwords(text):
    """Return ``text`` with every word found in ``stop_words`` dropped.

    The text is split on whitespace, each word is compared case-insensitively
    against ``stop_words``, and the surviving words are re-joined with single
    spaces.
    """
    kept = []
    for token in text.split():
        if token.lower() in stop_words:
            continue
        kept.append(token)
    return " ".join(kept)

# Drop stopwords from every text in place.
df["text"] = df["text"].apply(remove_stopwords)



[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# prompt: Test df["text"] for stopwords

def contains_stopwords(text):
    """Return True if any whitespace-separated word of ``text`` is in ``stop_words``.

    The comparison is case-insensitive (each word is lowercased first).
    """
    return any(token.lower() in stop_words for token in text.split())

# Flag and print texts that still contain a stopword;
# an empty frame means the removal step covered every row.
df["contains_stopwords"] = df["text"].apply(contains_stopwords)
print(df[df["contains_stopwords"]])

Empty DataFrame
Columns: [text, label, urls, has_uppercase, contains_single_quotes, has_punctuation, contains_stopwords]
Index: []


### Stemming

In [None]:
# prompt: Apply stemming on df["text"]

from nltk.stem import PorterStemmer

# Download the 'punkt_tab' resource before nltk.word_tokenize is used below.
nltk.download('punkt_tab') # Download punkt if not already downloaded

# Single shared Porter stemmer instance used by stem_text.
stemmer = PorterStemmer()

def stem_text(text):
    """Tokenize ``text`` with ``nltk.word_tokenize`` and stem every token.

    Each token is passed through ``stemmer.stem`` and the stems are re-joined
    with single spaces.
    """
    stems = map(stemmer.stem, nltk.word_tokenize(text))
    return " ".join(stems)

# Replace every text with its stemmed version. The unstemmed text is
# overwritten here and no copy is kept.
df["text"] = df["text"].apply(stem_text)

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


In [None]:
# prompt: write code to check if the stemming was succesful

# Check if stemming was successful by comparing original and stemmed text
# (This is a basic check; more sophisticated methods might be needed
# depending on your specific stemming goals.)

def compare_original_stemmed(original_text, stemmed_text):
    """Return True if the two texts tokenize to different word lists.

    Both arguments are tokenized with ``nltk.word_tokenize``; the result is
    True as soon as the token sequences differ in any position or in length.
    """
    tokens_before = nltk.word_tokenize(original_text)
    tokens_after = nltk.word_tokenize(stemmed_text)
    return not (tokens_before == tokens_after)

# The check needs the pre-stemming text, which has to be saved in
# df["original_text"] BEFORE df["text"] is overwritten by the stemmer.
# Comparing df["text"] with itself can never detect a change, so the
# comparison below deliberately reads the backup column.

if 'original_text' not in df.columns:
  print("Warning: 'original_text' column not found. Please create a backup of your original text before stemming")

else:
  # True for every row whose tokens differ between the backup and the stemmed text.
  df['stemming_applied'] = df.apply(lambda row: compare_original_stemmed(row['original_text'], row['text']), axis=1)
  print(df[df['stemming_applied']])

  #Example: count how many rows had any stemming applied
  print(f"Number of rows where stemming was applied: {df['stemming_applied'].sum()}")

#df.head(10)

Empty DataFrame
Columns: [text, label, urls, has_uppercase, contains_single_quotes, has_punctuation, contains_stopwords, stemming_applied]
Index: []
Number of rows where stemming was applied: 0


### Handling Whitespaces

In [None]:
# prompt: Remove all unnecessary whitespace characters from df["text"]

def remove_whitespace(text):
    """Collapse whitespace runs in ``text`` to single spaces and trim both ends."""
    words = text.split()
    normalized = " ".join(words)
    return normalized

# Normalise whitespace in every text in place.
df["text"] = df["text"].apply(remove_whitespace)

In [None]:
# prompt: test the df["text"] if it contains any extra whitespaces

def has_extra_whitespace(text):
    """Return True if ``text`` is not already whitespace-normalised.

    A text counts as normalised when its words are separated by exactly one
    space and it has no leading or trailing whitespace. Comparing against the
    split/join form catches double spaces as well as tabs, newlines and
    leading/trailing whitespace, which a plain search for two consecutive
    spaces would miss.
    """
    return text != " ".join(text.split())

# Flag and print texts reported by has_extra_whitespace;
# an empty frame means none were found.
df["has_extra_whitespace"] = df["text"].apply(has_extra_whitespace)
print(df[df["has_extra_whitespace"]])

Empty DataFrame
Columns: [text, label, urls, has_uppercase, contains_single_quotes, has_punctuation, contains_stopwords, stemming_applied, has_extra_whitespace]
Index: []


### Removing Emojis

In [None]:
# prompt: remove all emojis from a string

import re

def remove_emojis(text):
    """Return ``text`` with emoji and pictographic symbol characters removed.

    Only the explicit Unicode ranges listed below are stripped. A single range
    running from U+24C2 to U+1F251 must not be used here: it spans most of the
    Basic Multilingual Plane and would also delete ordinary CJK, Hangul and
    other non-Latin text.
    """
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (regional indicators)
                           u"\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
                           u"\U0001F000-\U0001F251"  # tiles, cards, enclosed alphanumerics/ideographs
                           u"\U00002600-\U000026FF"  # miscellaneous symbols
                           u"\U00002702-\U000027B0"  # dingbats
                           u"\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
                           u"\U000024C2"             # circled latin capital letter M
                           u"\U0000FE0F"             # variation selector-16 (emoji presentation)
                           "]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', text)

# Strip emoji characters from every text in place.
df["text"] = df["text"].apply(remove_emojis)

In [None]:
# prompt: test the a string if it contains any emojis

def contains_emojis(text):
    """Return True if ``text`` contains an emoji or pictographic symbol.

    Only the explicit Unicode ranges listed below count as emoji. A single
    range running from U+24C2 to U+1F251 must not be used here: it spans most
    of the Basic Multilingual Plane and would flag ordinary CJK, Hangul and
    other non-Latin text as emoji.
    """
    emoji_pattern = re.compile("["
                           u"\U0001F600-\U0001F64F"  # emoticons
                           u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                           u"\U0001F680-\U0001F6FF"  # transport & map symbols
                           u"\U0001F1E0-\U0001F1FF"  # flags (regional indicators)
                           u"\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
                           u"\U0001F000-\U0001F251"  # tiles, cards, enclosed alphanumerics/ideographs
                           u"\U00002600-\U000026FF"  # miscellaneous symbols
                           u"\U00002702-\U000027B0"  # dingbats
                           u"\U00002B00-\U00002BFF"  # miscellaneous symbols and arrows
                           u"\U000024C2"             # circled latin capital letter M
                           u"\U0000FE0F"             # variation selector-16 (emoji presentation)
                           "]+", flags=re.UNICODE)
    return bool(emoji_pattern.search(text))

# Flag and print texts in which contains_emojis still finds a match;
# an empty frame means none were found.
df["contains_emojis"] = df["text"].apply(contains_emojis)
print(df[df["contains_emojis"]])

Empty DataFrame
Columns: [text, label, urls, has_uppercase, contains_single_quotes, has_punctuation, contains_stopwords, stemming_applied, has_extra_whitespace, tokens, contains_emojis]
Index: []


## Tokenization

![Tokenization](https://zotyag.github.io/Uni/INLP/Images/tokenizalas.webp)

### BPE Tokenization

In [None]:
# prompt: Apply the BPE tokenization algorithm on the df dataset

# Install Hugging Face transformers (shell command executed by the notebook).
!pip install transformers

from transformers import AutoTokenizer

# Load the BPE tokenizer (the pretrained "gpt2" tokenizer)
tokenizer = AutoTokenizer.from_pretrained("gpt2") # Or any other BPE model

# Apply the tokenizer: store the list of sub-word tokens of each cleaned text.
# NOTE(review): the vectorization cells visible in this notebook read the
# 'text' column, not 'tokens' - confirm whether 'tokens' is used elsewhere.
df['tokens'] = df['text'].apply(lambda x: tokenizer.tokenize(x))

# Display some tokenized examples
print(df[['text', 'tokens']].head())

                                                text  \
0              bynd jpmorgan reel expect beyond meat   
1  ccl rcl nomura point book weak carniv royal ca...   
2  cx cemex cut credit suiss jp morgan weak build...   
3                      ess btig research cut neutral   
4              fnko funko slide piper jaffray pt cut   

                                              tokens  
0  [by, nd, Ġj, pm, organ, Ġreel, Ġexpect, Ġbeyon...  
1  [cc, l, Ġr, cl, Ġnom, ura, Ġpoint, Ġbook, Ġwea...  
2  [c, x, Ġc, em, ex, Ġcut, Ġcredit, Ġsu, iss, Ġj...  
3        [ess, Ġb, t, ig, Ġresearch, Ġcut, Ġneutral]  
4  [fn, ko, Ġfun, ko, Ġslide, Ġp, iper, Ġj, aff, ...  


## Vectorization

![Vectorization](https://zotyag.github.io/Uni/INLP/Images/vektorizalas.webp)

### Train, validation, and test set splitting

In [None]:
# Train and test set

from sklearn.model_selection import train_test_split

# Split the data into training and temporary sets (80% train, 20% temp).
# random_state is fixed so the split is reproducible; no stratify argument is
# passed, so label proportions are not explicitly balanced across the splits.
train_df, temp_df = train_test_split(df, test_size=0.2, random_state=42)

# Split the temporary set into validation and test sets (50% validation, 50% test)
val_df, test_df = train_test_split(temp_df, test_size=0.5, random_state=42)

# Report the size of each of the three resulting frames.
print(f"Train set size: {len(train_df)}")
print(f"Validation set size: {len(val_df)}")
print(f"Test set size: {len(test_df)}")

Train set size: 9544
Validation set size: 1193
Test set size: 1194


### Count Vectorization

In [None]:
# CountVectorizer

from sklearn.feature_extraction.text import CountVectorizer

# Initialize the CountVectorizer (bag-of-words counts), capped at 1000 features
vectorizer = CountVectorizer(max_features=1000)

# Fit the vocabulary on the training texts only and convert the sparse result
# to a dense array
train_df_cv = vectorizer.fit_transform(train_df['text']).toarray()

# Transform the validation and test data using the same fitted vectorizer
val_df_cv = vectorizer.transform(val_df['text']).toarray()
test_df_cv = vectorizer.transform(test_df['text']).toarray()

### TF-IDF

In [None]:
# TFIDF

from sklearn.feature_extraction.text import TfidfVectorizer

# Initialize the TF-IDF vectorizer, capped at 1000 features
tfidf_vectorizer = TfidfVectorizer(max_features=1000)

# Fit on the training texts only and transform them; the result is kept as
# returned by the vectorizer (no .toarray() call, unlike the count features)
train_df_tfidf = tfidf_vectorizer.fit_transform(train_df['text'])

# Transform the validation and test data using the same fitted vectorizer
val_df_tfidf = tfidf_vectorizer.transform(val_df['text'])
test_df_tfidf = tfidf_vectorizer.transform(test_df['text'])

### Labeling

In [None]:
# Labels

# Target arrays for each split, taken from the 'label' column in row order so
# they line up with the feature matrices built from the same frames.
train_df_labels = train_df['label'].values
val_df_labels = val_df['label'].values
test_df_labels = test_df['label'].values

## Modelling

![Modelling](https://zotyag.github.io/Uni/INLP/Images/modellezes.webp)

### Count Vectorization model creation

In [None]:
# prompt: generate a model for the df_cv dataset

from sklearn.linear_model import LogisticRegression

# Initialize the Logistic Regression model
model = LogisticRegression(max_iter=1000)  # Increased max_iter

# Train the model using the CountVectorizer data
model.fit(train_df_cv, train_df_labels)

# Make predictions on the validation set
val_predictions = model.predict(val_df_cv)

# Evaluate the model on the validation set (accuracy)
from sklearn.metrics import accuracy_score
accuracy = accuracy_score(val_df_labels, val_predictions)
print(f"Validation Accuracy: {accuracy}")

# Score the same trained 'model' on the held-out test set
test_predictions = model.predict(test_df_cv)
test_accuracy = accuracy_score(test_df_labels, test_predictions)

print(f"Test Accuracy: {test_accuracy}")

Validation Accuracy: 0.7912824811399832
Test Accuracy: 0.7805695142378559


### TF-IDF model creation

In [None]:
# prompt: generate a model for the df_tfidf dataset

# Initialize a second Logistic Regression model for the TF-IDF features
model_tfidf = LogisticRegression(max_iter=1000)  # Increased max_iter

# Train the model using the TF-IDF data
model_tfidf.fit(train_df_tfidf, train_df_labels)

# Make predictions on the validation set
val_predictions_tfidf = model_tfidf.predict(val_df_tfidf)

# Evaluate the model on the validation set (accuracy)
accuracy_tfidf = accuracy_score(val_df_labels, val_predictions_tfidf)
print(f"Validation Accuracy (TF-IDF): {accuracy_tfidf}")

# Score the same trained 'model_tfidf' on the held-out test set
test_predictions_tfidf = model_tfidf.predict(test_df_tfidf)
test_accuracy_tfidf = accuracy_score(test_df_labels, test_predictions_tfidf)

print(f"Test Accuracy (TF-IDF): {test_accuracy_tfidf}")

Validation Accuracy (TF-IDF): 0.7870913663034367
Test Accuracy (TF-IDF): 0.7747068676716918


### Model evaluation

In [None]:
# prompt: Evaluate the two models on the test sets

# ... (previous code)

# Evaluate CountVectorizer model on the test set
# (same model and test features as the test-accuracy step of the training cell)
test_predictions_cv = model.predict(test_df_cv)
test_accuracy_cv = accuracy_score(test_df_labels, test_predictions_cv)
print(f"Test Accuracy (CountVectorizer): {test_accuracy_cv}")

# Evaluate TF-IDF model on the test set; this re-assigns
# test_predictions_tfidf and test_accuracy_tfidf
test_predictions_tfidf = model_tfidf.predict(test_df_tfidf)
test_accuracy_tfidf = accuracy_score(test_df_labels, test_predictions_tfidf)
print(f"Test Accuracy (TF-IDF): {test_accuracy_tfidf}")

Test Accuracy (CountVectorizer): 0.7805695142378559
Test Accuracy (TF-IDF): 0.7747068676716918


## Practical use case

![Practical use case](https://zotyag.github.io/Uni/INLP/Images/practical_use.webp)

### Getting Tweet ID

In [None]:
# prompt: take a twitter link format(https://x.com/cinecitta2030/status/1863249312458686810) to a post as a input from the user and then get the tweet id which is the last part of the URL

import re

def get_tweet_id(twitter_url):
  """
  Pull the numeric tweet ID out of a tweet URL.

  The ID is the run of digits that directly follows "/status/" in the URL.

  Args:
    twitter_url: The URL of the tweet.

  Returns:
    The tweet ID as a string, or None when the URL has no "/status/<digits>" part.
  """
  found = re.search(r"/status/(\d+)", twitter_url)
  return found.group(1) if found else None

### Getting text from tweet

In [None]:
import requests
import os
import json
from google.colab import userdata


# The API bearer token is read from the Colab secret store
# (google.colab.userdata) under the key 'BEARER'; it is not read from an
# environment variable.
bearer_token = userdata.get('BEARER')


def create_url(tweet_url):
    """Build the Twitter API v2 tweet-lookup URL for a tweet link.

    Args:
        tweet_url: Link to a tweet, containing "/status/<digits>".

    Returns:
        The "https://api.twitter.com/2/tweets/<id>" endpoint URL as a string.

    Raises:
        ValueError: If no tweet ID can be extracted from ``tweet_url``. Failing
            here avoids sending a request for the literal ID "None".
    """
    tweet_id = get_tweet_id(tweet_url)
    if tweet_id is None:
        raise ValueError(f"Could not find a tweet ID in URL: {tweet_url!r}")
    return f"https://api.twitter.com/2/tweets/{tweet_id}"


def bearer_oauth(r):
    """
    Auth callback for ``requests``: adds the bearer-token headers to request ``r``.

    Sets "Authorization" from the module-level ``bearer_token`` and a fixed
    "User-Agent", then returns the same request object.
    """
    auth_headers = {
        "Authorization": f"Bearer {bearer_token}",
        "User-Agent": "v2TweetLookupPython",
    }
    for header_name, header_value in auth_headers.items():
        r.headers[header_name] = header_value
    return r


def connect_to_endpoint(url, timeout=10):
    """Send an authenticated GET request to ``url`` and return the parsed JSON.

    Args:
        url: Endpoint URL to query.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive host.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the response status code is not 200; the message carries
            the status code and the response text.
    """
    response = requests.request("GET", url, auth=bearer_oauth, timeout=timeout)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Request returned an error: {} {}".format(
                response.status_code, response.text
            )
        )
    return response.json()

def return_text_from_tweet(input_text):
  """Fetch the tweet behind the link ``input_text`` and return its text.

  Builds the lookup URL with ``create_url``, queries it with
  ``connect_to_endpoint`` and returns the ``['data']['text']`` field of the
  JSON response.
  """
  payload = connect_to_endpoint(create_url(input_text))
  tweet = payload['data']
  return tweet['text']



### Predictions

In [None]:
# prompt: take a X/twitter link as an input from the user and then get the text from the post and parse it into a string then use the previously used text cleaning methods for this string after that make a prediction with both (tfidf and cv) models and then print out the results

import pandas as pd
import re
import contractions
import string
import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from transformers import AutoTokenizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Assuming the previous code has been executed and the models (model, model_tfidf),
# vectorizers (vectorizer, tfidf_vectorizer), and other necessary objects are in memory.
# Test input: https://x.com/FluentInFinance/status/1862537796696592814


def preprocess_text(text):
    """Run a raw string through the cleaning steps, in a fixed order.

    Order: URL removal, lowercasing, contraction expansion, punctuation
    removal, stopword removal, stemming, whitespace normalisation, emoji
    removal. Returns the cleaned string.
    """
    cleaning_steps = (
        remove_urls,
        str.lower,
        expand_contractions,
        remove_punctuation,
        remove_stopwords,
        stem_text,
        remove_whitespace,
        remove_emojis,
    )
    for step in cleaning_steps:
        text = step(text)
    return text

def predict_sentiment(tweet_link):
    """Fetch a tweet, clean it, and print both models' sentiment predictions.

    The tweet text is downloaded via ``return_text_from_tweet``, cleaned with
    ``preprocess_text`` and classified by the CountVectorizer model and the
    TF-IDF model. Label 0 is printed as "Bearish", 1 as "Bullish" and any other
    value as "Neutral". Any exception is caught and reported with a printed
    message; nothing is returned.
    """
    # Labels not listed here fall back to "Neutral".
    label_names = {0: "Bearish", 1: "Bullish"}

    try:
        # Download the tweet text and run it through the cleaning pipeline.
        tweet_text = return_text_from_tweet(tweet_link)
        cleaned_tweet = preprocess_text(tweet_text)

        # Bag-of-words features -> CountVectorizer model.
        cv_tweet = vectorizer.transform([cleaned_tweet]).toarray()
        cv_label = model.predict(cv_tweet)[0]

        # TF-IDF features -> TF-IDF model.
        tfidf_tweet = tfidf_vectorizer.transform([cleaned_tweet])
        tfidf_label = model_tfidf.predict(tfidf_tweet)[0]

        cv_prediction = label_names.get(cv_label, "Neutral")
        tfidf_prediction = label_names.get(tfidf_label, "Neutral")

        print(f"{cleaned_tweet}")
        print(f"Predicted Sentiment (CountVectorizer): {cv_prediction}")
        print(f"Predicted Sentiment (TF-IDF): {tfidf_prediction}")

    except Exception as e:
      print(f"An error occurred: {e}")

# Get input from the user: prompt for a tweet link on stdin, then fetch,
# clean and classify that tweet (results are printed by predict_sentiment).
tweet_link = input("Enter the X/Twitter link: ")
predict_sentiment(tweet_link)

Enter the X/Twitter link: https://x.com/FluentInFinance/status/1862537796696592814
200
The S&amp;P 500 is up 57% since Michael Burry said, "Sell." https://t.co/D5OSW0bDBP
Predicted Sentiment (CountVectorizer): Bearish
Predicted Sentiment (TF-IDF): Bearish
