In [30]:
import pandas as pd
import numpy as np
import joblib
from math import log
import textcleaner
import string
import emoji
import re
import pickle
import collections
import scipy
import itertools
import json
import scipy.signal
import os
import warnings

warnings.filterwarnings("ignore", category=RuntimeWarning)

from sentence_transformers import SentenceTransformer, util

# Feature columns selected (in this order) from the transformed frame before it is
# passed to username_model.predict_proba in get_username_prob.
username_cols = ['entropy', 'num_uppercase', 'num_lowercase', 'num_digits', 'num_punctuations', 'num_emojis', 'num_hashtags']
# Serialized classifiers loaded from disk: username_model is used by get_username_prob,
# posts_model by get_posts_prob.
# SECURITY: joblib.load and pickle.load can execute arbitrary code contained in the
# file being loaded -- only point these paths at model files you trust.
username_model = joblib.load('../models_for_everyone/user_name.pkl')
posts_model = joblib.load('../models_for_everyone/posts.pkl')

# Classifier used by predict_tweet_or_news (its labels are compared against 'news').
news_model_filename = '../models_for_everyone/news_logregmodel.pkl'
with open(news_model_filename, 'rb') as f:
    news_model = pickle.load(f)
    
# Sentence-embedding model; check_if_repeater_bot encodes tweet texts with it.
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')



## Predict Bot or Not through Username

In [2]:
def check_known_expert(username):
    """Apply the hard-coded username rule.

    Returns the string 'bot' when the username contains the substring 'bot'
    (case-insensitive); returns None otherwise, including when the username
    is None or the empty string.
    """
    if username is None or username == '':
        return None

    return 'bot' if 'bot' in username.lower() else None

def log2(number):
    """Return the base-2 logarithm of ``number``, computed from natural logs."""
    natural_log = log(number)
    return natural_log / log(2)

# Character table read from CSV; the code below relies on the columns 'character'
# and 'probability'. (Presumably per-character frequencies from a names corpus --
# TODO confirm against how names_dict.csv was generated.)
df_entropy = pd.read_csv('../models_for_everyone/names_dict.csv')
# log2 of each character's probability, precomputed once for get_entropy_of_text.
df_entropy['log2'] = df_entropy['probability'].apply(log2)
# Nested mapping {column name: {character: value}}; get_entropy_of_text reads
# df_entropy_dict['log2'][char].
df_entropy_dict = df_entropy.set_index('character').to_dict()

def get_entropy_of_text(text):
    """Return the negated sum of per-character log2 probabilities of ``text``.

    The text is lower-cased first. Characters missing from
    ``df_entropy_dict['log2']`` contribute nothing to the sum. An empty
    string returns -1.
    """
    lowered = text.lower()
    if lowered == '':
        return -1

    log2_by_char = df_entropy_dict['log2']
    total = 0.0
    # Plain running addition on purpose: keeps the float accumulation order explicit.
    for ch in lowered:
        if ch in log2_by_char:
            total += log2_by_char[ch]

    return -total


def get_num_uppercase_letters(text):
    """Count the characters of ``text`` for which ``str.isupper()`` is true."""
    return len([ch for ch in text if ch.isupper()])

def get_num_lowercase_letters(text):
    """Count the characters of ``text`` for which ``str.islower()`` is true."""
    return len([ch for ch in text if ch.islower()])

def get_num_digits(text):
    """Count the characters of ``text`` for which ``str.isdigit()`` is true."""
    return len([ch for ch in text if ch.isdigit()])

def get_num_punctuations(text):
    """Count the characters of ``text`` that appear in ``string.punctuation``."""
    punctuation_chars = set(string.punctuation)
    total = 0
    for ch in text:
        if ch in punctuation_chars:
            total += 1
    return total

def get_num_hashtags(text):
    """Count the '#' characters in ``text``."""
    return len([ch for ch in text if ch == '#'])

def get_num_emojis(text):
    """Count the characters of ``text`` that are keys of ``emoji.UNICODE_EMOJI['en']``.

    Counting is per character, so an emoji made of several code points is
    only counted for those code points that are themselves keys of the table.

    NOTE(review): ``emoji.UNICODE_EMOJI`` is reported to be missing from newer
    releases of the ``emoji`` package -- confirm the installed version.
    """
    # The table is looked up inside the generator, so an empty text never touches it.
    return sum(1 for ch in text if ch in emoji.UNICODE_EMOJI['en'])

def get_num_spaced_words(text):
    """Return the number of pieces obtained by splitting ``text`` on single spaces.

    Consecutive spaces produce empty pieces that are counted too, and the
    empty string yields 1.
    """
    pieces = text.split(' ')
    return len(pieces)

def get_num_words(text):
    """Count the maximal runs of word characters (regex ``\\w+``) in ``text``."""
    return sum(1 for _ in re.finditer(r'\w+', text))

# col_name is user_name or screen_name
def transform_df(df, col_name):
    """Add the username feature columns to ``df`` and return it.

    Each feature is computed by applying one helper to ``df[col_name]``.
    The frame is modified in place; the returned object is the same ``df``.
    """
    feature_builders = [
        ('entropy', get_entropy_of_text),
        ('num_uppercase', get_num_uppercase_letters),
        ('num_lowercase', get_num_lowercase_letters),
        ('num_digits', get_num_digits),
        ('num_punctuations', get_num_punctuations),
        ('num_emojis', get_num_emojis),
        ('num_hashtags', get_num_hashtags),
        ('num_words', get_num_spaced_words),
    ]

    for feature_name, builder in feature_builders:
        df[feature_name] = df[col_name].apply(builder)

    return df

def get_username_prob(username):
    """Return ``username_model.predict_proba`` for a single username.

    Returns None when ``username`` is None or the empty string. Otherwise the
    lower-cased username is turned into a one-row frame, expanded by
    ``transform_df``, and the ``username_cols`` columns are scored.

    NOTE(review): features are computed on the lower-cased name, so
    'num_uppercase' is always 0 here -- confirm this matches how the model
    was trained.
    """
    if username is None or username == '':
        return None

    single_row = pd.DataFrame([{'user_name': username.lower()}])
    features = transform_df(single_row, 'user_name')
    return username_model.predict_proba(features[username_cols])

## Bot Sorter

In [3]:
def preprocess_text(text):
    """Normalize tweet text for the downstream models.

    Steps, in order: lower-case; delete every occurrence of the substring
    'rt' (this also removes it from inside words, e.g. 'party' -> 'pay');
    strip @mentions, #hashtags and http(s) URLs; collapse runs of spaces.
    Only the space character is treated as a separator. An empty input
    returns ''.
    """
    if text == '':
        return ''

    cleaned = text.lower().replace('rt', '')
    for pattern in (r'@[A-Za-z0-9_]+', r'#[A-Za-z0-9_]+', r'https?:\/\/\S*'):
        cleaned = re.sub(pattern, '', cleaned)

    return ' '.join(token for token in cleaned.split(' ') if token != '')

In [4]:
def check_if_self_declared_bot(data):
    """Return True when the account's profile mentions 'bot'.

    Looks at the user object of the first tweet in ``data`` and checks the
    screen name, display name and description (case-insensitive substring
    match). Every non-None field is checked independently, so a hit in the
    name or description is found even when the screen name is present but
    does not match.

    Parameters
    ----------
    data : list of dict
        Tweets of one user; ``data[0]['user']`` must exist.

    Returns
    -------
    bool
    """
    user = data[0]['user']

    for field in ('screen_name', 'name', 'description'):
        value = user.get(field)
        if value is not None and 'bot' in value.lower():
            return True

    return False

In [5]:
def predict_tweet_or_news(text):
    """Classify one text with ``news_model`` and return the predicted label.

    The text is run through ``preprocess_text`` and wrapped in a one-row
    DataFrame with a single 'text' column before prediction.
    """
    cleaned = preprocess_text(text)
    frame = pd.DataFrame([cleaned], columns=['text'])
    return news_model.predict(frame)[0]

def check_if_news_bot(data):
    """Return True when the account looks like a news account.

    First the profile of ``data[0]`` is checked: if the screen name, display
    name or description contains 'news' (case-insensitive) the answer is
    True. Otherwise every tweet is labelled -- empty texts, and texts that
    are empty after ``preprocess_text``, count as 'tweet'; the rest are
    labelled by ``predict_tweet_or_news`` -- and the account is a news bot
    when at least 80% of the labels are 'news'.
    """
    profile = data[0]['user']
    screenname = profile['screen_name']
    description = profile['description']
    username = profile['name']

    for field in (screenname, username, description):
        if field is not None and 'news' in field.lower():
            return True

    # Label each tweet as 'news' or 'tweet'.
    labels = []
    for tweet in data:
        try:
            text = tweet['text']
        except:
            # Tweets without 'text' are read from 'full_text' instead.
            text = tweet['full_text']

        label = 'tweet'
        if text != '':
            processed_text = preprocess_text(text)
            if processed_text != '':
                label = predict_tweet_or_news(processed_text)

        labels.append(label)

    # News bot when the share of 'news' labels reaches 80%.
    news_share = collections.Counter(labels)['news'] / len(labels)
    return news_share >= 0.80

In [6]:
def check_if_bridging_bot(data):
    """Return True when at least 80% of the tweets mention two or more accounts.

    A mention is a space-separated word starting with '@'. All tweets are
    inspected before the threshold is applied; tweets with empty text simply
    do not count as multi-mention tweets (they still count in the total).

    Parameters
    ----------
    data : list of dict
        Tweets carrying their text under 'text' or, failing that, 'full_text'.

    Returns
    -------
    bool
        False for an empty ``data``.
    """
    if not data:
        return False

    multi_mention_tweets = 0

    for tweet_dict in data:
        text = tweet_dict['text'] if 'text' in tweet_dict else tweet_dict['full_text']
        if not text:
            continue

        num_mentions = sum(1 for word in text.split(' ') if word.startswith('@'))
        if num_mentions >= 2:
            multi_mention_tweets += 1

    # Decide only after every tweet has been counted.
    return multi_mention_tweets >= 0.80 * len(data)

In [7]:
def check_if_amplifier_bot(data):
    """Return True when the account mostly retweets and/or mentions others.

    Two counters are kept over the non-empty tweets: tweets whose text starts
    with 'RT', and tweets containing at least one space-separated word that
    starts with '@'. The account is flagged when the retweet count, the
    mention count, or their sum reaches 80% of ``len(data)``.

    NOTE(review): a tweet such as 'RT @user ...' increments both counters, so
    it counts twice in the combined total -- confirm this is intended.

    Parameters
    ----------
    data : list of dict
        Tweets carrying their text under 'text' or, failing that, 'full_text'.

    Returns
    -------
    bool
        False for an empty ``data`` (there is nothing to amplify).
    """
    if not data:
        return False

    retweets = 0
    tweets_with_mention = 0

    for tweet in data:
        text = tweet['text'] if 'text' in tweet else tweet['full_text']
        if not text:
            continue

        if text.startswith('RT'):
            retweets += 1

        if any(word.startswith('@') for word in text.split(' ')):
            tweets_with_mention += 1

    threshold = 0.80 * len(data)

    # Both counters are non-negative, so "retweets >= threshold" and
    # "mentions >= threshold" are each implied by the combined test.
    return (retweets + tweets_with_mention) >= threshold

In [84]:
def get_posts_prob(text):
    """Score one post with ``posts_model`` and return the first probability.

    The raw ``text`` (no ``preprocess_text`` step) is placed in a one-row
    frame under the column 'text_cleaned'. The value returned is column 0 of
    ``predict_proba`` for that row.

    NOTE(review): column 0 is treated as the bot class and column 1 as the
    human class -- confirm against ``posts_model.classes_``.
    """
    frame = pd.DataFrame([{'text_cleaned': text}])

    probabilities = posts_model.predict_proba(frame)
    first_row = probabilities[0]
    bot_prob, human_prob = first_row[0], first_row[1]

    return bot_prob
    
def check_if_cyborg(data):
    """Return True when per-tweet bot scores fluctuate within one account.

    Each non-empty tweet is scored with ``get_posts_prob`` and labelled bot
    (score >= 0.5) or not. Two quantities are tracked:

    * ``num_flip`` -- how often the label differs from the previous label;
    * ``change_in_score`` -- the summed absolute difference between each
      score and the score of the first scored tweet.

    The account is flagged when ``num_flip >= 3`` or
    ``change_in_score >= 0.02``.

    Parameters
    ----------
    data : list of dict
        Tweets carrying their text under 'text' or, failing that, 'full_text'.

    Returns
    -------
    bool
        False when there are three or fewer tweets, or when no tweet has any
        text to score (previously that case raised ZeroDivisionError).
    """
    if len(data) <= 3:
        return False

    previous_is_bot = None
    first_botscore = None

    num_flip = 0
    change_in_score = 0.0

    for tweet in data:
        text = tweet['text'] if 'text' in tweet else tweet['full_text']
        if not text:
            continue

        botscore = get_posts_prob(text)
        is_bot = botscore >= 0.5

        if previous_is_bot is None:
            # First scored tweet: it is the reference for the score differences.
            previous_is_bot = is_bot
            first_botscore = botscore
            continue

        if is_bot != previous_is_bot:
            num_flip += 1
            previous_is_bot = is_bot

        change_in_score += abs(first_botscore - botscore)

    return bool(num_flip >= 3 or change_in_score >= 0.02)

In [23]:
def check_if_content_generation_bot(data):
    """Return True when at least 80% of the tweets start with 'RT'.

    Empty texts are never counted as retweets but still count towards
    ``len(data)``.

    NOTE(review): despite the name, this flags accounts that mostly retweet
    -- confirm that this is the intended definition.
    """
    retweet_count = 0

    for tweet in data:
        try:
            text = tweet['text']
        except:
            # Tweets without 'text' are read from 'full_text' instead.
            text = tweet['full_text']

        if text != '' and text.startswith('RT'):
            retweet_count += 1

    return retweet_count >= (0.80 * len(data))

In [79]:
def get_periodicity(signal):
    """Decide whether consecutive differences of ``signal`` are dominated by one value.

    Returns a pair ``(is_periodic, value)``:

    * ``(False, None)`` when the signal has 10 or fewer entries, when no
      single difference makes up at least half of all differences, or when
      any error occurs while computing them;
    * ``(True, count)`` otherwise, where ``count`` is the number of
      occurrences of the most common difference (not the difference itself).
    """
    if len(signal) <= 10:
        return False, None

    try:
        # Difference of every element to its predecessor; the first entry
        # (element 0 against the element at index -1) is discarded.
        deltas = []
        for position, current in enumerate(signal):
            deltas.append(current - signal[position - 1])
        deltas = deltas[1:]

        if len(deltas) == 1:
            return True, deltas[0]

        top_count = max(collections.Counter(deltas).values())

        if top_count >= (0.5 * len(deltas)):
            return True, top_count
        return False, None
    except:
        return False, None

def check_if_announcer_bot(data):
    """Return True when the account's hourly tweet counts look periodic.

    The tweets' 'created_at' values are parsed as datetimes, floored to the
    hour and counted per hour; the list of counts is handed to
    ``get_periodicity`` and its boolean verdict is returned.

    NOTE(review): ``value_counts()`` orders its result by count (descending),
    not chronologically, so the list given to ``get_periodicity`` is sorted
    by frequency -- confirm this is the intended signal.
    """
    tweets = pd.DataFrame.from_dict(data)
    tweets['created_at'] = pd.to_datetime(tweets['created_at'])

    hourly = tweets['created_at'].dt.floor('H').value_counts()
    hourly_counts = hourly.rename_axis('date').reset_index(name='count')

    is_periodic, _ = get_periodicity(hourly_counts['count'].tolist())

    return is_periodic != False

In [11]:
def check_if_repeater_bot(data):
    """Return True when the account's tweets are, on average, near-duplicates.

    Every non-empty tweet is cleaned with ``preprocess_text``; texts that are
    empty after cleaning are dropped. Each remaining text is embedded once
    with the sentence-transformer ``model``, the cosine similarity of every
    pair of embeddings is computed, and the account is flagged when the mean
    pairwise similarity is at least 0.50.

    Parameters
    ----------
    data : list of dict
        Tweets carrying their text under 'text' or, failing that, 'full_text'.

    Returns
    -------
    bool
        False when fewer than two usable texts remain (no pair to compare).
    """
    text_list = []

    for tweet in data:
        text = tweet['text'] if 'text' in tweet else tweet['full_text']
        if not text:
            continue

        processed_text = preprocess_text(text)
        if processed_text:
            text_list.append(processed_text)

    if len(text_list) < 2:
        return False

    # Encode each text exactly once; the pair loop then only does the cheap
    # similarity computation instead of two model calls per pair.
    embeddings = [model.encode(text, convert_to_tensor=True) for text in text_list]

    similarities = [
        util.pytorch_cos_sim(first, second)[0][0].item()
        for first, second in itertools.combinations(embeddings, 2)
    ]

    return bool(np.mean(similarities) >= 0.50)

## Now run it

Note: this currently processes the data line by line. TODO: aggregate the tweets by user.

To redo:
- Find number of lines in file 
- Read 1000 lines 
- Group by userid
- Parse

In [80]:
# data_file = 'test.json'
# out_datafile = 'test_bots.csv'

# out_fh = open(out_datafile, 'w', encoding='utf-8')
# out_fh.write('username,userid,botornot,self_declared_bot,news_bot,bridging_bot,amplifier_bot,cyborg,content_generation_bot,announcer_bot,repeater_bot\n')

136

In [26]:
def check_is_bot(username, verified):
    """Decide bot / not-bot for one account from its username.

    Returns a triple ``(bot_prob, human_prob, is_bot)``:

    * verified accounts are always ``(0, 1, False)``;
    * usernames flagged by ``check_known_expert`` get ``(1, 0, True)``;
    * otherwise the two values come from ``get_username_prob`` (column 0 is
      used as the bot probability, column 1 as the human probability) and
      ``is_bot`` is True when ``bot_prob >= human_prob``.

    NOTE(review): ``get_username_prob`` returns None for a None/empty
    username, which would make the indexing below fail -- confirm callers
    never pass such a username for unverified accounts.
    """
    if verified == True:
        return 0, 1, False

    if check_known_expert(username) == 'bot':
        bot_prob, human_prob = 1, 0
    else:
        pred = get_username_prob(username)
        bot_prob, human_prob = pred[0][0], pred[0][1]

    is_bot = True if bot_prob >= human_prob else False
    return bot_prob, human_prob, is_bot

In [57]:
def check_type_of_bot(data, is_bot, out_fh):
    """Run every bot-type check on one user's tweets and write a CSV row.

    The row has the columns
    username, userid, num_posts, botornot, self_declared_bot, news_bot,
    bridging_bot, amplifier_bot, cyborg, content_generation_bot,
    announcer_bot, repeater_bot.

    Parameters
    ----------
    data : list of dict
        All tweets of a single user; must be non-empty. The screen name and
        the user id are taken from ``data[0]['user']``.
    is_bot : bool
        Result of the username-based bot check, written as the 'botornot' column.
    out_fh : file-like
        Open text handle the row is written to.
    """
    user = data[0]['user']
    username = user['screen_name']
    # Read the id from the data itself instead of depending on a notebook
    # global named ``userid`` happening to be set by the calling cell.
    userid = user['id']

    self_declared_bot = check_if_self_declared_bot(data)
    news_bot = check_if_news_bot(data)
    bridging_bot = check_if_bridging_bot(data)
    amplifier_bot = check_if_amplifier_bot(data)
    cyborg = check_if_cyborg(data)
    content_generation_bot = check_if_content_generation_bot(data)
    announcer_bot = check_if_announcer_bot(data)
    repeater_bot = check_if_repeater_bot(data)

    num_posts = len(data)

    out_fh.write(f'{username},{userid},{num_posts},{is_bot},{self_declared_bot},{news_bot},{bridging_bot},{amplifier_bot},{cyborg},{content_generation_bot},{announcer_bot},{repeater_bot}\n')
    

In [None]:
# If you ran 0_splitfiles before this
#
# Every .json file in in_dir is treated as the tweets of ONE user (one JSON
# object per line): the user id, verified flag and screen name are taken from
# the first parseable line, and the whole file is classified as a unit.

in_dir = './covid_2020_june_in_full_separated'
out_dir = './covid_2020_june_out_full'

file_list = os.listdir(in_dir)

# Make sure the output directory exists before opening the CSV for writing.
os.makedirs(out_dir, exist_ok=True)
new_filename = os.path.join(out_dir, 'bot_sorter.csv')

# The context manager closes the CSV even if a file fails midway.
with open(new_filename, 'w', encoding='utf-8') as out_fh:
    out_fh.write('username,userid,num_posts,botornot,self_declared_bot,news_bot,bridging_bot,amplifier_bot,cyborg,content_generation_bot,announcer_bot,repeater_bot\n')

    for file in file_list:
        if not file.endswith('.json'):
            continue

        full_filename = os.path.join(in_dir, file)

        user_data = []
        userid = None
        verified = None
        username = None
        skipped_lines = 0

        with open(full_filename, encoding='utf-8') as f:
            for line in f:
                try:
                    data = json.loads(line)

                    if userid is None:
                        user = data['user']
                        userid, verified, username = user['id'], user['verified'], user['screen_name']
                except (ValueError, KeyError, TypeError):
                    # Malformed JSON or a record without the expected user
                    # fields: skip the line, but keep count so it is visible.
                    skipped_lines += 1
                    continue

                user_data.append(data)

        if skipped_lines:
            print(f'{file}: skipped {skipped_lines} unparseable line(s)')

        # Nothing usable in this file: there is no user to classify.
        if not user_data:
            continue

        bot_prob, human_prob, is_bot = check_is_bot(username, verified)
        check_type_of_bot(user_data, is_bot, out_fh)

In [None]:
# For unsorted files
#
# Each .json file may contain tweets of many users (one JSON object per
# line). Tweets are grouped by user id within a file, and each user group is
# then classified and written as one CSV row. Users are grouped per file, so
# a user whose tweets span several files produces one row per file.

in_dir = './covid_2020_june_in_full_separated'
out_dir = './covid_2020_june_out_full'

file_list = os.listdir(in_dir)

# Make sure the output directory exists before opening the CSV for writing.
os.makedirs(out_dir, exist_ok=True)
new_filename = os.path.join(out_dir, 'bot_sorter.csv')

# The context manager closes the CSV even if a file fails midway.
with open(new_filename, 'w', encoding='utf-8') as out_fh:
    out_fh.write('username,userid,num_posts,botornot,self_declared_bot,news_bot,bridging_bot,amplifier_bot,cyborg,content_generation_bot,announcer_bot,repeater_bot\n')

    for file in file_list:
        if not file.endswith('.json'):
            continue

        full_filename = os.path.join(in_dir, file)

        bot_users = {}
        non_bot_users = {}
        # Username-based verdict per user id, so the model runs once per
        # user instead of once per tweet.
        is_bot_by_user = {}
        skipped_lines = 0

        with open(full_filename, encoding='utf-8') as f:
            for line in f:
                try:
                    data = json.loads(line)

                    userid = data['user']['id']
                    verified = data['user']['verified']
                    username = data['user']['screen_name']

                    if userid not in is_bot_by_user:
                        bot_prob, human_prob, is_bot = check_is_bot(username, verified)
                        is_bot_by_user[userid] = is_bot
                    is_bot = is_bot_by_user[userid]
                except (ValueError, KeyError, TypeError):
                    # Malformed JSON, missing user fields, or a username that
                    # cannot be scored: skip the line, but keep count.
                    skipped_lines += 1
                    continue

                target = bot_users if is_bot else non_bot_users
                target.setdefault(userid, []).append(data)

        if skipped_lines:
            print(f'{file}: skipped {skipped_lines} line(s) that could not be processed')

        # Classify every user of this file. check_type_of_bot expects the
        # list of one user's tweets, not the whole {userid: tweets} mapping.
        # The loop variable keeps the module-level name `userid`, which
        # check_type_of_bot may read as a global.
        for userid, tweets in bot_users.items():
            check_type_of_bot(tweets, True, out_fh)

        for userid, tweets in non_bot_users.items():
            check_type_of_bot(tweets, False, out_fh)