# Setup

## Install necessary uncommon packages

In [1]:
!pip install gradio
!pip install tweepy



## Constants

In [20]:
# Number of tweets requested per handle when get_primary_interest() runs
deployment_tweet_count = 200

# Settings for model 1 (the first model get_primary_interest() consults).
# The max_features / min_df values are passed to the TfidfVectorizer built at
# prediction time; the test_size value is not referenced in the code shown here.
model1_max_features = 1250
model1_min_df = 3
model1_test_size = 0.4

# Settings for model 2 (consulted only when model 1 predicts "Politics")
model2_max_features = 1250
model2_min_df = 3
model2_test_size = 0.4

# Empty placeholder; the real token is read from the private key file further down
bearer_key = ""

## Imports / settings

In [3]:
# General imports
import json
import pickle
import string

# Analysis imports
import pandas as pd
import numpy as np

# Twitter imports
import tweepy

# NLP imports
from nltk.stem import WordNetLemmatizer 
from nltk import pos_tag
from nltk.corpus import stopwords, wordnet
from nltk.tokenize import regexp_tokenize, word_tokenize, RegexpTokenizer

# SKlearn imports
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split, GridSearchCV, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import ComplementNB
from sklearn.preprocessing import LabelEncoder
from sklearn.dummy import DummyClassifier

# Visualization imports
import matplotlib.pyplot as plt
import seaborn as sns

# Pandas settings
pd.options.display.max_rows = 100
pd.options.display.max_colwidth = 90

# Downloads (for NLP)
import nltk
nltk.download('wordnet')
nltk.download('tagsets')
# stopwords.words('english') is called during preprocessing and needs this corpus
nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger');

# Deployment
import gradio as gr

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Nate\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package tagsets to
[nltk_data]     C:\Users\Nate\AppData\Roaming\nltk_data...
[nltk_data]   Package tagsets is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\Nate\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


## Functions

These are helper functions that assist in the manipulation of tweet strings for pre-processing purposes.

In [21]:
# This is the file we save the twitter bearer key in.  Should be in folder included in gitignore b/c it's private info.
twitter_key_file = "private/twitter_key.json"

# Gets the twitter key from the private folder (not uploaded to github, included in gitignore)
def get_twitter_key(key_file=None):
    """Read the Twitter bearer token from a JSON file.

    Parameters
    ----------
    key_file : str or path-like, optional
        JSON file that stores the token under the ``"bearer_key"`` entry.
        When omitted, the module-level ``twitter_key_file`` is used.

    Returns
    -------
    The value stored under ``"bearer_key"`` in the file.

    Raises
    ------
    FileNotFoundError
        If the key file does not exist.
    KeyError
        If the file has no ``"bearer_key"`` entry.
    """
    path = twitter_key_file if key_file is None else key_file
    # Explicit encoding so the read does not depend on the platform default
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    return data['bearer_key']

# Replace the empty placeholder from the Constants cell with the real token;
# get_tweets() reads this module-level name when it builds its tweepy client.
bearer_key = get_twitter_key()

In [22]:
def get_tweets(username, class_, number_of_tweets):
    """Download tweets for one handle and return them as a list of dicts.

    Parameters
    ----------
    username : Twitter handle to look up.
    class_ : label stored with every returned tweet under the 'class' key.
    number_of_tweets : upper limit on the number of tweets collected.

    Returns
    -------
    list of dict
        One dict per tweet with the keys 'user_name', 'class', 'id', 'text',
        'author_id' and 'created_at'; every value is converted to ``str``.
    """
    # Authenticate with the module-level bearer key and resolve the handle to a user id
    client = tweepy.Client(bearer_token=bearer_key)
    user_id = client.get_user(username=username).data.id

    # The paginator requests pages of up to 100 tweets, which makes it possible
    # to collect more than 100 tweets in total
    pages = tweepy.Paginator(
        client.get_users_tweets,
        user_id,
        tweet_fields=['created_at', 'author_id'],
        expansions=[''],
        max_results=100,
        exclude=['replies'],
    )

    def to_record(tweet):
        # Keep only printable characters, then turn newlines into spaces
        printable_only = "".join(ch for ch in tweet.text if ch in string.printable)
        single_line = printable_only.replace("\n", " ")
        return {
            "user_name"  : str(username),
            'class'      : str(class_),
            "id"         : str(tweet.id),
            "text"       : str(single_line),
            "author_id"  : str(tweet.author_id),
            "created_at" : str(tweet.created_at)
        }

    return [to_record(tweet) for tweet in pages.flatten(limit=number_of_tweets)]


In [24]:
def strip_rt_user(text):
    """Return the lower-cased tweet text without its retweet prefix.

    When the text starts with "RT", everything up to and including the first
    colon is removed (nothing is removed if there is no colon).  Any other
    text is only lower-cased.
    """
    start = text.find(":") + 1 if text.startswith("RT") else 0
    return text[start:].lower()

def get_rt_user(text):
    """Return the lower-cased handle of the retweeted user, or "" for non-retweets.

    For text starting with "RT", the part before the first colon is taken and
    everything after its first "@" is returned in lower case.
    """
    if not text.startswith("RT"):
        return ""
    prefix = text[:text.find(":")]
    handle_start = prefix.find("@") + 1
    return prefix[handle_start:].lower()

def addHashTags(text):
    """Wrap ``text`` in a leading and a trailing '#'."""
    return "".join(("#", text, "#"))

# Translate nltk POS to wordnet tags
def get_wordnet_pos(treebank_tag):
    '''
    Map an nltk POS tag to the wordnet POS constant chosen by its first letter.

    J -> wordnet.ADJ, V -> wordnet.VERB, R -> wordnet.ADV;
    every other tag (including those starting with N) -> wordnet.NOUN.
    '''
    by_first_letter = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'R': wordnet.ADV,
    }
    return by_first_letter.get(treebank_tag[:1], wordnet.NOUN)

def remove_characters(text, char_to_remove):
    """Return ``text`` with every character found in ``char_to_remove`` left out."""
    kept = [ch for ch in text if ch not in char_to_remove]
    return ''.join(kept)

def remove_punctuation(text):
    """Return ``text`` without any character listed in ``string.punctuation``."""
    return remove_characters(text, string.punctuation)

def tag_and_lemmatize(text):
    """POS-tag a list of tokens and return the list of their lemmas.

    Each token is tagged with ``pos_tag``, the tag is translated with
    ``get_wordnet_pos`` and the token is lemmatized using that POS.
    """
    tagged_tokens = pos_tag(text)
    lemmatizer = nltk.stem.WordNetLemmatizer()
    lemmas = []
    for token, treebank_tag in tagged_tokens:
        lemmas.append(lemmatizer.lemmatize(token, get_wordnet_pos(treebank_tag)))
    return lemmas

def dummy_fun(doc):
    """Identity function: hand back ``doc`` unchanged.

    Passed as both ``tokenizer`` and ``preprocessor`` to TfidfVectorizer so
    that already-tokenized documents go through without further processing.
    """
    return doc

# perform all pre-processing on a df
def preprocessing(df):
    """Run the three preprocessing stages, in order, on the same DataFrame.

    Nothing is returned; the outcome is whatever the stages write into ``df``.
    """
    stages = (
        preprocessing_01_model_specific,
        preprocessing_02_general,
        preprocessing_03_tag_and_lemmatize,
    )
    for stage in stages:
        stage(df)
    
    
def preprocessing_01_model_specific(df):
    """Split retweet information out of the 'text' column (modifies ``df``).

    Adds an 'RT_user' column holding the retweeted handle wrapped in '#'
    (or "" when the tweet is not a retweet) and replaces 'text' with the
    lower-cased tweet text stripped of its retweet prefix.
    """
    raw_text = df['text']

    # Retweeted handle per tweet, wrapped in '#' unless it is empty
    rt_handles = raw_text.apply(get_rt_user)
    df['RT_user'] = rt_handles.apply(lambda user: addHashTags(user) if user != "" else "")

    # Tweet text without the retweet prefix
    df['text'] = raw_text.apply(strip_rt_user)
    
def preprocessing_02_general(df):
    """Clean the tweet text and derive token / label columns, modifying ``df`` in place.

    Expects the columns 'text', 'RT_user' (created by
    preprocessing_01_model_specific), 'class', 'id', 'author_id' and
    'created_at'.

    Steps, in order: lower-case the text, drop tokens starting with "http",
    strip whitespace, remove English stop words (plus 'amp'), remove
    punctuation, drop purely numeric tokens, append the RT user tag, tokenize
    into 'text_tokenized', drop the id/author/date columns, drop rows whose
    text is blank, and add an integer 'class_label' encoding of 'class'.

    Every change is applied to the frame that was passed in; nothing is
    returned.  (Rebinding the local name, e.g. ``df = df.drop(...)``, would
    make the later steps act on a copy the caller never sees.)
    """
    # Lower case the text tweets
    df['text'] = df['text'].str.lower()

    # Strip out the meaningless links
    df['text'] = df['text'].apply(lambda x: " ".join([n for n in x.split() if n[0:4] != "http"]))

    # Strip any excess white space
    df['text'] = df['text'].apply(lambda x: x.strip())

    # Take out stop words
    sw = set(stopwords.words('english'))
    sw.update(['amp'])
    df['text'] = df['text'].apply(lambda x: " ".join([n for n in x.split() if n not in sw]))

    # Remove punctuation
    df['text'] = df['text'].apply(remove_punctuation)

    # Make sure we don't have any random numbers
    df['text'] = df['text'].apply(lambda x: " ".join([n for n in x.split() if not n.isnumeric()]))

    # Put together the RT user and the tweet text
    df['text'] = df['text'] + " " + df['RT_user']

    # Make a new column, tokenize the words
    df['text_tokenized'] = df['text'].str.split()

    # Drop the bookkeeping columns on the caller's frame (in place on purpose:
    # this function's contract is to modify `df`, not to return a new frame)
    df.drop(columns=['id', 'author_id', 'created_at'], inplace=True)

    # Mark blank texts as missing, then remove those rows and renumber the index
    df['text'] = df['text'].apply(lambda x: np.nan if len(x.strip()) == 0 else x)
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)

    # Integer-encode the class names
    le = LabelEncoder()
    df['class_label'] = le.fit_transform(df['class'])
    
def preprocessing_03_tag_and_lemmatize(df):
    """Replace every token list in 'text_tokenized' with its lemmatized version (modifies ``df``)."""
    lemmatized = df['text_tokenized'].apply(tag_and_lemmatize)
    df['text_tokenized'] = lemmatized

In [25]:
def _load_pickle(path):
    """Unpickle and return the object stored at ``path``, closing the file afterwards."""
    # NOTE: unpickling can execute arbitrary code -- only load model files you produced yourself.
    with open(path, "rb") as f:
        return pickle.load(f)


def _predict_category(token_docs, model_name, min_df, max_features):
    """Vectorize ``token_docs`` with the saved vocabulary of ``model_name`` and return the first prediction."""
    tfidf = _load_pickle(f"models/{model_name}_tfidf.pkl")
    model = _load_pickle(f"models/{model_name}_model.pkl")

    # NOTE(review): fit_transform on a fresh vectorizer re-estimates the IDF weights
    # from the new tweets alone; only the vocabulary of the saved vectorizer is reused.
    # `tfidf.transform(token_docs)` would reuse the saved weights -- confirm which is
    # intended before changing it, because it alters the predictions.
    tf_new = TfidfVectorizer(analyzer='word', tokenizer=dummy_fun,
                             preprocessor=dummy_fun, token_pattern=None,
                             ngram_range=(1,3), min_df=min_df, max_features=max_features,
                             vocabulary=tfidf.vocabulary_)
    vectorized = tf_new.fit_transform(token_docs)
    return model.predict(vectorized)[0]


def get_primary_interest(username):
    """Predict the primary interest category of a Twitter handle.

    Downloads up to ``deployment_tweet_count`` tweets, preprocesses them,
    merges all token lists into one document and classifies it with model 1.
    If model 1 answers "Politics", the prediction of model 2 is returned
    instead.

    Parameters
    ----------
    username : Twitter handle to classify.

    Returns
    -------
    The predicted category, or ``None`` when the tweets could not be
    downloaded or there was nothing to classify (a message is printed).
    """
    print("Attempting to get ", deployment_tweet_count, " tweets from handle '", username, "'", sep="")
    try:
        tweets = get_tweets(username, 'blank', deployment_tweet_count)
    except Exception as exc:
        # `Exception` (not a bare except) so Ctrl-C / SystemExit still propagate,
        # and the cause is reported instead of being silently discarded.
        print("No tweets were returned due to an API error.")
        print("Details:", repr(exc))
        return None

    if len(tweets) == 0:
        print("No tweets were found for handle '", username, "'.", sep="")
        return None

    df_new = pd.DataFrame.from_dict(tweets)
    preprocessing(df_new)
    # Concatenate the token lists of all tweets into one document per user
    df_new = df_new.groupby(['user_name', 'class']).agg({'text_tokenized': 'sum'}).reset_index()

    # Nothing to classify if no rows are left (indexing [0] on an empty prediction would raise)
    if df_new.empty:
        print("No usable tweet text was found for handle '", username, "'.", sep="")
        return None

    category1 = _predict_category(df_new['text_tokenized'], "model1", model1_min_df, model1_max_features)
    if category1 != "Politics":
        return category1

    # Model 1 said "Politics": ask model 2 for its prediction instead
    return _predict_category(df_new['text_tokenized'], "model2", model2_min_df, model2_max_features)

# Deploy model

Test with code

In [26]:
# Smoke test: classify one well-known handle end to end.
# This downloads tweets through the Twitter API, so it needs a valid bearer key.
handle = "barackobama"
print("Primary interest:", get_primary_interest(handle))

Attempting to get 200 tweets from handle 'barackobama'
Primary interest: Politics - Liberal


Create web app

In [69]:
def get_interest_(name):
    """Gradio callback: return the primary interest for a handle as display text.

    Surrounding whitespace and a leading "@" are removed from ``name`` before
    the lookup.  A readable message is returned when the handle is blank or
    when ``get_primary_interest`` yields ``None``, so the web page shows why
    there is no result instead of an empty box.
    """
    cleaned = (name or "").strip().lstrip("@")
    if not cleaned:
        return "Please enter a Twitter handle."

    interest = get_primary_interest(cleaned)
    if interest is None:
        return "Could not determine a primary interest for '" + cleaned + "'."
    return interest

# Minimal UI: one text box for the handle, one for the result, and a button
# whose click sends the handle through get_interest_ and shows the return value.
# Note that `handle` is rebound here from the test string above to a Textbox widget.
with gr.Blocks() as demo:
    handle = gr.Textbox(label="Twitter handle")
    output = gr.Textbox(label="Primary interest")
    get_interest = gr.Button("Get user's primary interest")
    get_interest.click(fn=get_interest_, inputs=handle, outputs=output)

# NOTE(review): share=True also serves the app on a public URL (see the
# "Running on public URL" output below), so anyone with the link can trigger
# Twitter API calls made with this notebook's bearer key -- confirm that is intended.
demo.launch(share=True)

Running on local URL:  http://127.0.0.1:7862
Running on public URL: https://8c089b51656e1ab36a.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades (NEW!), check out Spaces: https://huggingface.co/spaces


