****Model to Detect Hate Speech****

In [37]:
# All the imports
import numpy as np
import pandas as pd
import re

from nltk.corpus import stopwords
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import SVC

from textblob import TextBlob
from collections import Counter
from typing import List, Dict, Union

*Loading Data*

In [22]:
df = pd.read_csv('Data/labeled_data.csv')
df.drop(columns=['Unnamed: 0'], inplace=True)
df.head()

Unnamed: 0,count,hate_speech,offensive_language,neither,class,tweet
0,3,0,0,3,2,!!! RT @mayasolovely: As a woman you shouldn't...
1,3,0,3,0,1,!!!!! RT @mleew17: boy dats cold...tyga dwn ba...
2,3,0,3,0,1,!!!!!!! RT @UrKindOfBrand Dawg!!!! RT @80sbaby...
3,3,0,2,1,1,!!!!!!!!! RT @C_G_Anderson: @viva_based she lo...
4,6,0,6,0,1,!!!!!!!!!!!!! RT @ShenikaRoberts: The shit you...


In [23]:
df1 = pd.read_csv("Data/hate-speech-prepared-spreadsheet.csv", delimiter="\t", encoding="utf-8")

df1 = df1.drop(columns=['Unnamed: 0', '###','Type', 'Target', 'Implicit', 'Metaphor/metonymy', 'Sarcasm/humor', 'Rhetorical question','Circumlocution'])
df1 = df1[df1['binary-hate-speech'] != 'discard']

df1['class'] = df1['binary-hate-speech'].map({
    'Hate Speech': 0,
    'Offensive Speech': 1,
    'Acceptable speech': 2
})

df1 = df1.dropna(subset=['class'])

df1.head()

Unnamed: 0,Comment,binary-hate-speech,class
0,Can we shut up about refugees already?,Acceptable speech,2.0
1,Why should we? It's the biggest humanitarian c...,Acceptable speech,2.0
6,You should Said that your parents and your Eur...,Acceptable speech,2.0
9,"Define, what demands? Where are these demands ...",Acceptable speech,2.0
11,Peter Hallam How does speaking the truth make ...,Acceptable speech,2.0


*Pre-Processing Data*

In [24]:
def clean_data(data):
    data = str(data).lower()
    data = re.sub(r"http\S+|https\S+", "", data)
    data = re.sub(r"[^a-z\s\']", "", data)
    data = re.sub(r"\@w+|\#", "", data)
    data = re.sub(r"\w*\d\w*", "", data)
    data = re.sub(r"\[.*?\]", "", data)
    data = re.sub(r"<.*?>+", "", data)
    data = re.sub(r'(?<=\s)[\'"]+(?=\s)|^[\'"]+|[\'"]+$', '', data)
    filtered_data = [word for word in data.split() if not word in stopwords.words("english")]
    return " ".join(filtered_data)

In [25]:
speech_class = []
speech = []

for i in range(df['count'].count()):
    hate_speech_percentage = df['hate_speech'].iloc[i] / df['count'].iloc[i]
    offensive_language_percentage = df['offensive_language'].iloc[i] / df['count'].iloc[i]
    neither_percentage = df['neither'].iloc[i] / df['count'].iloc[i]
    max_index = np.argmax(np.array([hate_speech_percentage, offensive_language_percentage, neither_percentage]))
    speech_class.append(max_index)
    speech.append(clean_data(df['tweet'][i]))

speech_class = np.array(speech_class)
speech = np.array(speech)

In [26]:
speech_train, speech_test, speech_class_train, speech_class_test = train_test_split(speech, speech_class, test_size=0.2, stratify=speech_class, random_state=42)

vectorizer = TfidfVectorizer(max_features=5000, ngram_range=(1,2))

speech_train_tfidf = vectorizer.fit_transform(speech_train)
speech_test_tfidf = vectorizer.transform(speech_test)

df1['cleaned_comment'] = df1['Comment'].apply(clean_data)

speech_class_df1 = df1['class'].to_numpy()
speech_df1 = df1['cleaned_comment'].to_numpy()

speech_df1_tfidf = vectorizer.transform(speech_df1)

*Actual Model Training and Evaluation*

In [29]:
param_grid = {'C': [0.01, 0.1, 1, 10, 100]}
model_grid = GridSearchCV(SVC(kernel='linear', probability=True), param_grid, cv=5)
model_grid.fit(speech_train_tfidf, speech_class_train)

model = model_grid.best_estimator_

model.fit(speech_train_tfidf, speech_class_train)

In [30]:
speech_class_pred = model.predict(speech_test_tfidf)
predictions_df1 = model.predict(speech_df1_tfidf)
predictions_df = model.predict(vectorizer.transform(speech))

In [31]:
report = classification_report(speech_class_test, speech_class_pred)
accuracy = accuracy_score(speech_class_test, speech_class_pred)

print(report)
print("Accuracy on df:", accuracy)

if 'class' in df1:
    print(classification_report(speech_class_df1, predictions_df1))
    print("Accuracy on df1:", accuracy_score(speech_class_df1, predictions_df1))

              precision    recall  f1-score   support

           0       0.61      0.16      0.26       286
           1       0.93      0.96      0.94      3838
           2       0.85      0.90      0.87       833

    accuracy                           0.91      4957
   macro avg       0.79      0.68      0.69      4957
weighted avg       0.89      0.91      0.89      4957

Accuracy on df: 0.9065967318942909
              precision    recall  f1-score   support

         0.0       0.00      0.00      0.00         0
         1.0       0.00      0.00      0.00         0
         2.0       1.00      0.67      0.80      2821

    accuracy                           0.67      2821
   macro avg       0.33      0.22      0.27      2821
weighted avg       1.00      0.67      0.80      2821

Accuracy on df1: 0.6717476072314782


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


**Behavioral Analysis**

There would also be factors like like to dislike ratio, the code of which would be different dependent on platform. However, we can perform behavioral anaylsis, extending on our model, which would work well our redesigned engagement algorithm!

*Calculating Nuanced Language*

In [39]:
inflammatory_words = {'horrible', 'hate', 'stupid', 'idiot', 'worst', 'useless'}

def calculate_nuanced_language(self, text: str) -> Dict[str, Union[bool, float]]:
    qualifying_words = ['however', 'although', 'nevertheless', 'while', 'despite', 'though', 'alternatively', 'possibly']
    
    text_lower = text.lower()
    qualifier_count = sum(text_lower.count(word) for word in qualifying_words)
    
    has_contrast = any(word in text_lower for word in ['but', 'however', 'although'])
    
    word_count = len(text.split())
    nuance_score = (qualifier_count / max(1, word_count * 0.1)) + (1 if has_contrast else 0)
    
    return {
        'nuanced_language': nuance_score > 1.5,
        'nuance_score': min(1.0, nuance_score / 3)
    }

*Calculating Differing Perspective*

In [38]:
def calculate_perspective_metrics(self, text: str, comments: List[str]) -> Dict[str, float]:
    comment_sentiments = [TextBlob(comment).sentiment.polarity for comment in comments]
    
    sentiment_range = max(0.1, max(comment_sentiments) - min(comment_sentiments))
    
    positive_views = sum(1 for s in comment_sentiments if s > 0.2)
    negative_views = sum(1 for s in comment_sentiments if s < -0.2)
    total_views = len(comment_sentiments)
    
    if total_views == 0:
        balance_score = 0.5
    else:
        balance_score = 1 - abs(positive_views - negative_views) / total_views
        
    return {
        'perspective_balance': balance_score,
        'perspective_range': min(1.0, sentiment_range)
    }

*And conflict metrics - see redesigning-engagement-algorithms.ipynb for how it can be applied!* 

In [40]:
def calculate_conflict_metrics(self, text: str, comments: List[str]) -> Dict[str, float]:
    text_lower = text.lower()
    
    inflammatory_count = sum(text_lower.count(word) for word in self.inflammatory_words)
    word_count = len(text_lower.split())
    inflammatory_score = min(1.0, inflammatory_count / max(1, word_count * 0.05))
    
    reply_pairs = [(comments[i], comments[i+1]) for i in range(len(comments)-1)]
    conflict_indicators = 0
    
    for comment1, comment2 in reply_pairs:
        if any(phrase in comment2.lower() for phrase in [
            'wrong', 'disagree', 'incorrect', 'false', 'actually', 
            'not true', "you're wrong", "you don't understand"
        ]):
            conflict_indicators += 1
            
    conflict_score = min(1.0, conflict_indicators / max(1, len(reply_pairs)))
    
    return {
        'conflict_potential': conflict_score,
        'inflammatory_language': inflammatory_score
    }

In [42]:
def calculate_polarization_index(self, comments: List[str]) -> float:
    if not comments:
        return 0.0
        
    sentiments = [TextBlob(comment).sentiment.polarity for comment in comments]
    
    sentiment_std = np.std(sentiments) if len(comments) > 1 else 0
    
    positive = sum(1 for s in sentiments if s > 0.3)
    negative = sum(1 for s in sentiments if s < -0.3)
    neutral = len(sentiments) - positive - negative
    
    if len(sentiments) == 0:
        return 0.0
        
    cluster_balance = abs(positive - negative) / len(sentiments)
    spread_factor = min(1.0, sentiment_std * 2)
    
    return min(1.0, (cluster_balance + spread_factor) / 2)

In [43]:
# As I noted earlier, this one varies from platform to platform but this is an example for this notebook
def calculate_user_metrics(self, user_interactions: List[Dict]) -> Dict[str, float]:
    if not user_interactions:
        return {
            'user_rating': 0.0,
            'constructive_dialogue': 0.0,
            'diversity_exposure': 0.0
        }
    
    ratings = [interaction.get('rating', 0) for interaction in user_interactions]
    user_rating = np.mean(ratings) if ratings else 0.0
    
    constructive_count = sum(1 for interaction in user_interactions if interaction.get('is_constructive', False))
    constructive_ratio = constructive_count / len(user_interactions)
    
    unique_interactions = len(set(str(interaction) for interaction in user_interactions))
    diversity_score = unique_interactions / len(user_interactions)
    
    return {
        'user_rating': min(1.0, user_rating / 5),  
        'constructive_dialogue': constructive_ratio,
        'diversity_exposure': diversity_score
    }

In [44]:
# This would be really cool as a dashboard!!
def calculate_community_health(self, user_metrics: Dict[str, float], content_metrics: Dict[str, float]) -> float:
    factors = {
        'constructive_dialogue': user_metrics.get('constructive_dialogue', 0) * 0.3,
        'diversity_exposure': user_metrics.get('diversity_exposure', 0) * 0.2,
        'low_conflict': (1 - content_metrics.get('conflict_potential', 0)) * 0.25,
        'user_satisfaction': user_metrics.get('user_rating', 0) * 0.25
    }
    
    return min(1.0, sum(factors.values()))

*All the metrics all together now*

In [45]:
def calculate_all_metrics(self, content: str, comments: List[str], user_interactions: List[Dict]) -> Dict[str, Union[float, bool]]:
    nuanced = self.calculate_nuanced_language(content)
    perspective = self.calculate_perspective_metrics(content, comments)
    conflict = self.calculate_conflict_metrics(content, comments)
    
    polarization = self.calculate_polarization_index(comments)
    
    user_metrics = self.calculate_user_metrics(user_interactions)
    
    content_metrics = {
        'conflict_potential': conflict['conflict_potential'],
        'inflammatory_language': conflict['inflammatory_language']
    }
    
    community_health = self.calculate_community_health(user_metrics, content_metrics)
    
    return {
        'nuanced_language': nuanced['nuanced_language'],
        'perspective_balance': perspective['perspective_balance'],
        'perspective_range': perspective['perspective_range'],
        'conflict_potential': conflict['conflict_potential'],
        'inflammatory_language': conflict['inflammatory_language'],
        'polarization_index': polarization,
        'user_rating': user_metrics['user_rating'],
        'constructive_dialogue': user_metrics['constructive_dialogue'],
        'diversity_exposure': user_metrics['diversity_exposure'],
        'community_health': community_health
    }