In [None]:
import os
from snorkel.labeling import labeling_function
import pandas as pd
from snorkel.labeling import PandasLFApplier
from snorkel.labeling import LFAnalysis
from snorkel.analysis import get_label_buckets
from snorkel.labeling.model import MajorityLabelVoter
from snorkel.labeling.model import LabelModel
from snorkel.labeling import filter_unlabeled_dataframe
from snorkel.utils import probs_to_preds
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import StringTensorType


%matplotlib inline

In [None]:
# Process-wide environment tweaks, applied in one go:
#   TF_CPP_MIN_LOG_LEVEL -> turn off TensorFlow logging messages
#   PYTHONHASHSEED       -> for reproducibility
# NOTE(review): Python reads PYTHONHASHSEED when the interpreter starts, so this
# assignment does not reseed the already-running kernel; it is only inherited by
# processes launched afterwards.
os.environ.update(
    {
        "TF_CPP_MIN_LOG_LEVEL": "3",
        "PYTHONHASHSEED": "0",
    }
)

In [None]:
# Location of the tweet spreadsheet. Set the TWEETS_XLSX environment variable to
# point at your own copy; the fallback is the path this notebook was written with.
TWEETS_XLSX = os.environ.get('TWEETS_XLSX', '/Users/marcusturewicz/Downloads/tweets.xlsx')
df = pd.read_excel(TWEETS_XLSX)

# Keep only the columns of interest.
df = df[['Label', 'Text', 'Language', 'Hashtags', 'URLs', 'Mentions', 'Media Type']]

# Rows whose Label is 0 or 1 become the test set; rows whose Label is -1 become
# the training pool that the labeling functions are applied to.
df_test = df[(df['Label'] == 1) | (df['Label'] == 0)]
df_train = df[(df['Label'] == -1)]

# Gold labels for the test set as a NumPy array.
Y_test = df_test['Label'].values

In [None]:
# Report the total number of rows loaded.
print(df.shape[0])

In [None]:
# Display the training pool (the rows whose Label is -1).
df_train

In [None]:
# Eyeball a reproducible random sample of 50 rows (text plus 'Hashtags' value).
df.loc[:, ['Text', 'Hashtags']].sample(n=50, random_state=2)

In [None]:
# Vote values returned by the labeling functions; ABSTAIN means "no vote".
ABSTAIN, HAM, SPAM = -1, 0, 1

In [None]:
@labeling_function()
def hiring(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'hiring'; otherwise abstain."""
    if "hiring" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def hire(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'hire'; otherwise abstain."""
    if "hire" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def job(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'job'; otherwise abstain."""
    if "job" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def career(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'career'; otherwise abstain."""
    if "career" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def offer(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'offer'; otherwise abstain."""
    if "offer" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def candidate(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'candidate'; otherwise abstain."""
    if "candidate" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def money(x):
    """Vote SPAM when the 'Text' field contains a dollar sign; otherwise abstain."""
    lowered = x['Text'].lower()
    if "$" in lowered:
        return SPAM
    return ABSTAIN

@labeling_function()
def percent(x):
    """Vote SPAM when the 'Text' field contains a percent sign; otherwise abstain."""
    lowered = x['Text'].lower()
    if "%" in lowered:
        return SPAM
    return ABSTAIN

@labeling_function()
def exclaim(x):
    """Vote SPAM when the 'Text' field contains an exclamation mark; otherwise abstain."""
    lowered = x['Text'].lower()
    if "!" in lowered:
        return SPAM
    return ABSTAIN

@labeling_function()
def links(x):
    """Vote SPAM when 'http' occurs more than twice in the 'Text' field; otherwise abstain.

    The count is case-sensitive and runs on the text as-is.
    """
    http_occurrences = x['Text'].count('http')
    if http_occurrences > 2:
        return SPAM
    return ABSTAIN

@labeling_function()
def visit(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'visit'; otherwise abstain."""
    if "visit" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def subscribe(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'subscribe'; otherwise abstain."""
    if "subscribe" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def firewall(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'firewall'; otherwise abstain."""
    if "firewall" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def blockchain(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'blockchain'; otherwise abstain."""
    if "blockchain" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def crypto(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'crypto'; otherwise abstain."""
    if "crypto" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def buy(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'buy'; otherwise abstain."""
    if "buy" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def free(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'free'; otherwise abstain."""
    if "free" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def sale(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'sale'; otherwise abstain."""
    if 'sale' in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def today(x):
    """Vote SPAM when the lower-cased 'Text' field contains 'today'; otherwise abstain."""
    if "today" in x['Text'].lower():
        return SPAM
    return ABSTAIN

@labeling_function()
def hashtags(x):
    """Vote SPAM when the row's 'Hashtags' value is greater than 8; otherwise abstain.

    Presumably 'Hashtags' holds the number of hashtags in the tweet; verify against the data.
    """
    if x['Hashtags'] > 8:
        return SPAM
    return ABSTAIN

@labeling_function()
def minimal_hashtags(x):
    """Vote HAM when the row's 'Hashtags' value is below 5; otherwise abstain.

    Presumably 'Hashtags' holds the number of hashtags in the tweet; verify against the data.
    """
    if x['Hashtags'] < 5:
        return HAM
    return ABSTAIN

@labeling_function()
def mentions(x):
    """Vote SPAM when the row's 'Mentions' value is greater than 2; otherwise abstain.

    Presumably 'Mentions' holds the number of @-mentions in the tweet; verify against the data.
    """
    if x['Mentions'] > 2:
        return SPAM
    return ABSTAIN

@labeling_function()
def lang_und(x):
    """Vote SPAM when the row's 'Language' value is exactly 'und'; otherwise abstain.

    Presumably 'und' marks an undetermined language; verify against the data source.
    """
    if x['Language'] == 'und':
        return SPAM
    return ABSTAIN

@labeling_function()
def net(x):
    """Vote HAM when the lower-cased 'Text' field contains ' .net ' (with both spaces); otherwise abstain."""
    if ' .net ' in x['Text'].lower():
        return HAM
    return ABSTAIN

@labeling_function()
def nuget(x):
    """Vote HAM when the lower-cased 'Text' field contains 'nuget'; otherwise abstain."""
    if 'nuget' in x['Text'].lower():
        return HAM
    return ABSTAIN

@labeling_function()
def at_dotnet(x):
    """Vote HAM when the 'Text' field starts with 'RT @dotnet' (case-sensitive); otherwise abstain."""
    if x['Text'].startswith('RT @dotnet'):
        return HAM
    return ABSTAIN

In [None]:
# Labeling functions handed to the applier. The label matrix is indexed by
# position in this list further down (columns 0 and 1 are `hiring` and `hire`),
# so keep the order stable.
# NOTE(review): `links` is defined above but is not part of this list; confirm
# that the omission is intentional.
lfs = [
    hiring,
    hire,
    job,
    career,
    offer,
    candidate,
    money,
    percent,
    exclaim,
    visit,
    subscribe,
    firewall,
    buy,
    free,
    today,
    blockchain,
    crypto,
    hashtags,
    sale,
    mentions,
    lang_und,
    minimal_hashtags,
    net,
    nuget,
    at_dotnet,
]

# Run every labeling function over the training pool to build its label matrix.
applier = PandasLFApplier(lfs=lfs)
L_train = applier.apply(df=df_train)

In [None]:
# Display the label matrix the applier produced for the training pool.
L_train

In [None]:
# Summary statistics for each labeling function on the training label matrix.
train_lf_analysis = LFAnalysis(L=L_train, lfs=lfs)
train_lf_analysis.lf_summary()

In [None]:
# Column 1 of the label matrix holds the votes of `hire` (second entry of `lfs`).
hire_votes = L_train[:, 1]
hire_spam_rows = df_train.iloc[hire_votes == SPAM]
# Show a reproducible sample of ten rows that `hire` voted SPAM on.
hire_spam_rows.sample(10, random_state=1)[['Text', 'Hashtags', 'Label']]

In [None]:
# Group row positions by the pair of votes in label-matrix columns 0 and 1
# (`hiring` and `hire`, per the order of `lfs`).
buckets = get_label_buckets(L_train[:, 0], L_train[:, 1])

# Rows where column 0 abstained while column 1 voted SPAM.
hire_only_positions = buckets[(ABSTAIN, SPAM)]
df_train.iloc[hire_only_positions].sample(10, random_state=1)

In [None]:
# Apply the same labeling functions to the test rows to get their label matrix.
L_test = applier.apply(df=df_test)

In [None]:
# Summarize the labeling functions on the test rows. This cell used to repeat
# the training summary verbatim; analysing L_test with the gold labels instead
# adds empirical-accuracy columns for each labeling function.
LFAnalysis(L=L_test, lfs=lfs).lf_summary(Y=Y_test)

In [None]:
def plot_label_frequency(L):
    """Plot how many labeling functions voted (did not abstain) on each data point.

    Args:
        L: Label matrix with one row per data point and one column per labeling
            function; entries equal to ABSTAIN count as "no vote".
    """
    votes_per_point = (L != ABSTAIN).sum(axis=1)
    # A row can carry anywhere from 0 to L.shape[1] votes. One unit-wide bin per
    # possible count needs L.shape[1] + 2 edges; with range(L.shape[1]) the top
    # count fell outside the histogram and the two counts below it shared a bin.
    plt.hist(votes_per_point, density=True, bins=range(L.shape[1] + 2))
    plt.xlabel("Number of labels")
    plt.ylabel("Fraction of dataset")
    plt.show()


# Show the votes-per-row distribution for the training label matrix.
plot_label_frequency(L_train)

In [None]:
# Baseline: predict each training row's label with Snorkel's MajorityLabelVoter.
majority_model = MajorityLabelVoter()
preds_train = majority_model.predict(L_train)

In [None]:
# Fit Snorkel's LabelModel (two classes) on the training label matrix.
# The seed is fixed so repeated runs give the same fit.
label_model = LabelModel(verbose=True, cardinality=2)
label_model.fit(
    L_train,
    n_epochs=500,
    log_freq=100,
    seed=123,
)

In [None]:
# Both models are scored on the test label matrix with identical settings.
score_kwargs = dict(L=L_test, Y=Y_test, tie_break_policy="random")

majority_acc = majority_model.score(**score_kwargs)["accuracy"]
print(f"{'Majority Vote Accuracy:':<25} {majority_acc * 100:.1f}%")

label_model_acc = label_model.score(**score_kwargs)["accuracy"]
print(f"{'Label Model Accuracy:':<25} {label_model_acc * 100:.1f}%")

In [None]:
def plot_probabilities_histogram(Y):
    """Draw a 10-bin histogram of the values in ``Y`` and show it.

    Args:
        Y: Values to histogram; the x-axis is labelled "Probability of SPAM".
    """
    plt.hist(Y, bins=10)
    plt.ylabel("Number of data points")
    plt.xlabel("Probability of SPAM")
    plt.show()


# Per-class probabilities from the label model for every training row;
# the column indexed by SPAM is the one plotted.
probs_train = label_model.predict_proba(L_train)
spam_probs_train = probs_train[:, SPAM]
plot_probabilities_histogram(spam_probs_train)

In [None]:
# Filter the training rows and their probabilities with Snorkel's
# filter_unlabeled_dataframe, driven by the training label matrix.
filtered_pair = filter_unlabeled_dataframe(X=df_train, y=probs_train, L=L_train)
df_train_filtered, probs_train_filtered = filtered_pair

In [None]:
# Count-based text features over n-grams of length 1 through 5.
vectorizer = CountVectorizer(ngram_range=(1, 5))

In [None]:
# Raw tweet text as plain Python lists: filtered training rows, then test rows.
X_train, X_test = (
    frame['Text'].tolist() for frame in (df_train_filtered, df_test)
)

In [None]:
# Turn the label model's probabilities into hard class labels for the classifier.
preds_train_filtered = probs_to_preds(probs_train_filtered)

In [None]:
# Gradient-boosted depth-1 trees; random_state is fixed for repeatable fits.
sklearn_model = GradientBoostingClassifier(
    n_estimators=100,
    learning_rate=1.0,
    max_depth=1,
    random_state=0,
)

In [None]:
# Chain vectorizer and classifier so raw text goes in and a class label comes out,
# then train on the filtered tweets with the label model's hard labels.
pipe = Pipeline(steps=[("vectorizer", vectorizer), ("model", sklearn_model)])
pipe.fit(X_train, preds_train_filtered)

In [None]:
# Accuracy of the trained pipeline on the test rows.
test_accuracy = pipe.score(X_test, Y_test)
print(f"Test Accuracy: {test_accuracy * 100:.1f}%")

In [None]:
# Per-class precision / recall / F1 on the test rows.
Y_preds = pipe.predict(X_test)
report_text = classification_report(Y_test, Y_preds)

print(report_text)


In [None]:
# An ad-hoc example tweet used to smoke-test the trained pipeline.
test_text = """#include<stdio.h>
#include<conio.h>
void main()
{
printf{"#GaneshChaturthi #DBoss};
getch()
#forex #bot #AI #Website #CodeNewbie #women #hacker #flutter #python #100DaysOfCode #ad #WomenWhoCode #tech #css  #cybersecurity #Blockchain #DataScience #infosec #dotnet #ClimateAction"""

# Classify the example itself. Previously this cell predicted on X_test and
# showed the first test row's label, leaving test_text unused. The pipeline
# takes a list of documents, so the single string is wrapped in a list.
pipe.predict(X=[test_text])[0]

In [None]:
# Convert the fitted pipeline to ONNX, declaring one string tensor input named "input".
onnx_input_spec = [("input", StringTensorType([None, 1]))]
model_onnx = convert_sklearn(pipe, initial_types=onnx_input_spec)

In [None]:
# Write the serialized ONNX model to the path below (relative to the working directory).
with open("../DotNetTwitterBot/spam_filter.onnx", "wb") as onnx_file:
    serialized_model = model_onnx.SerializeToString()
    onnx_file.write(serialized_model)