In [1]:
# Run findspark before any `pyspark` import so the next cell can resolve pyspark
# from the local Spark installation (see the findspark docs for how it is located).
import findspark

findspark.init()

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import ChiSqSelector
import json

In [3]:
# A single local-mode Spark session shared by the whole notebook. `sc` is the
# underlying SparkContext, used later to parallelize MongoDB documents.
spark = (
    SparkSession.builder
    .appName("twitter_bot_identification")
    .master("local")
    .getOrCreate()
)
sc = spark.sparkContext
spark

# Reading and Cleaning Training Data
Source: Kaggle (https://www.kaggle.com/charvijain27/detecting-twitter-bot-data?select=training_data_2_csv_UTF.csv)

In [4]:
# Load the Kaggle training set. Quote and escape are both a double quote, so a
# doubled "" inside a quoted field is read as a literal quote character.
df = spark.read.csv(
    "train_data.csv",
    sep=",",
    header=True,
    inferSchema=True,
    quote="\"",
    escape="\"",
)
df

DataFrame[id: decimal(18,0), id_str: string, screen_name: string, location: string, description: string, followers_count: int, friends_count: int, listedcount: int, favourites_count: int, verified: boolean, statuses_count: int, lang: string, default_profile: boolean, default_profile_image: boolean, has_extended_profile: boolean, bot: int]

### Features used to detect Twitter bots
`followers_count`, `friends_count`, `listedcount`, `favourites_count`, `statuses_count`, `default_profile`, and `bot_binary`: whether the word **bot** appears in the screen name **or** in the description.

In [5]:
# Register the training data as the SQL view 'df', then keep only unverified
# accounts and the columns needed for feature engineering and the `bot` label.
df.createOrReplaceTempView('df')
raw_df = (
    spark.table('df')
    .where(F.col('verified') == F.lit(False))
    .select('screen_name', 'description', 'location', 'followers_count', 'friends_count',
            'listedcount', 'favourites_count', 'statuses_count', 'default_profile', 'bot')
)
raw_df

DataFrame[screen_name: string, description: string, location: string, followers_count: int, friends_count: int, listedcount: int, favourites_count: int, statuses_count: int, default_profile: boolean, bot: int]

In [6]:
# Engineered feature `bot_binary`: does the substring "bot" (case-sensitive) appear in
# the screen name or the description? contains() yields null for a null text value,
# and null OR false is null, so any remaining nulls are filled with False.
mentions_bot = (
    F.col("screen_name").cast("string").contains("bot")
    | F.col("description").cast("string").contains("bot")
)
train_df = (
    raw_df
    .withColumn('bot_binary', mentions_bot)
    .fillna({'bot_binary': False})
    .select('bot_binary', 'followers_count', 'friends_count', 'listedcount',
            'favourites_count', 'statuses_count', 'default_profile', 'bot')
)

In [7]:
# Preview the first 20 rows of the engineered training table (long values truncated).
train_df.show(n=20, truncate=True)

+----------+---------------+-------------+-----------+----------------+--------------+---------------+---+
|bot_binary|followers_count|friends_count|listedcount|favourites_count|statuses_count|default_profile|bot|
+----------+---------------+-------------+-----------+----------------+--------------+---------------+---+
|     false|           1291|            0|         10|               0|         78554|           true|  1|
|     false|              1|          349|          0|              38|            31|           true|  1|
|      true|           1086|            0|         14|               0|           713|           true|  1|
|     false|             33|            0|          8|               0|           676|           true|  1|
|     false|             11|          745|          0|             146|           185|          false|  1|
|     false|              1|          186|          0|               0|            11|           true|  1|
|      true|            193|         

# Assembling the features into a single vector

In [8]:
# Assemble every feature column (everything except the `bot` label) into a single
# `features` vector. No scaling is applied here; StandardScaler is imported but unused.
# Drop any `features` column left over from a previous run first. Otherwise re-running
# this cell fails because the output column already exists, and the old vector would
# also be fed back in as an input. drop() is a no-op when the column is absent.
train_df = train_df.drop("features")
cols = [c for c in train_df.columns if c != "bot"]
assembler = VectorAssembler(inputCols=cols, outputCol="features")
train_df = assembler.transform(train_df)
train_df.select("features").show(5, truncate=False)

+-------------------------------------+
|features                             |
+-------------------------------------+
|[0.0,1291.0,0.0,10.0,0.0,78554.0,1.0]|
|[0.0,1.0,349.0,0.0,38.0,31.0,1.0]    |
|[1.0,1086.0,0.0,14.0,0.0,713.0,1.0]  |
|[0.0,33.0,0.0,8.0,0.0,676.0,1.0]     |
|[0.0,11.0,745.0,0.0,146.0,185.0,0.0] |
+-------------------------------------+
only showing top 5 rows



In [9]:
# 80/20 train/test split. The fixed seed keeps the split stable for a given data partitioning.
SPLIT_SEED = 12345
train, test = train_df.randomSplit(weights=[0.8, 0.2], seed=SPLIT_SEED)

## Logistic Regression Model

In [47]:
# Baseline model: logistic regression on the assembled features, capped at 10 iterations.
# Both splits are scored so the AUC cell can compare train vs. test.
lr = LogisticRegression(featuresCol="features", labelCol="bot", maxIter=10)
lr_model = lr.fit(train)
predict_train, predict_test = (lr_model.transform(split) for split in (train, test))
predict_test.select("bot", "prediction").show(10)

+---+----------+
|bot|prediction|
+---+----------+
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  0|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
+---+----------+
only showing top 10 rows



In [48]:
# Tally the test-set confusion matrix in one Spark job instead of four separate
# filter().count() passes over the predictions.
# Use row["count"], not row.count: Row is a tuple, so .count is tuple.count.
confusion_counts = {
    (row["bot"], row["prediction"]): row["count"]
    for row in predict_test.groupBy("bot", "prediction").count().collect()
}
# Labels are ints and predictions are floats (0.0/1.0). Python treats (1, 1.0) and
# (1, 1) as the same dict key, so the integer lookups below match. A missing cell
# counts as 0, and rows with a null label/prediction are ignored, as the filters were.
tn = confusion_counts.get((0, 0), 0)
fp = confusion_counts.get((0, 1), 0)
fn = confusion_counts.get((1, 0), 0)
tp = confusion_counts.get((1, 1), 0)

In [49]:
# F1 = harmonic mean of precision and recall for the positive (bot) class.
# Guard every ratio: a model that never predicts "bot" (tp + fp == 0), or a test set
# with no bots (tp + fn == 0), would otherwise raise ZeroDivisionError instead of
# reporting an F1 of 0.
recall = tp / (fn + tp) if (fn + tp) else 0.0
precision = tp / (fp + tp) if (fp + tp) else 0.0
f1 = (2 * recall * precision) / (recall + precision) if (recall + precision) else 0.0
print("F1 Score is: ",f1)

F1 Score is:  0.7853403141361258


In [11]:
# Area under the ROC curve (the evaluator's default metric, stated explicitly) from raw scores.
# This scores whatever is currently bound to predict_train / predict_test. Both model
# cells rebind those names, so run this directly after the model cell above it.
evaluator = BinaryClassificationEvaluator(
    labelCol='bot',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
train_auc = evaluator.evaluate(predict_train)
print("The area under ROC for train set is {}".format(train_auc))
test_auc = evaluator.evaluate(predict_test)
print("The area under ROC for test set is {}".format(test_auc))

The area under ROC for train set is 0.8299374078420743
The area under ROC for test set is 0.8224646000765415


## Random Forest Model

In [43]:
# Random forest with 10 trees on the same assembled features.
# Pin the seed so the bootstrap samples and per-node feature subsets, and therefore
# the F1/AUC reported below, are reproducible across kernel restarts.
# NOTE(review): without an explicit seed, PySpark derives the default from Python's
# hash() of the class name, which is randomized per interpreter process. Confirm
# this for your PySpark version.
rf = RandomForestClassifier(labelCol="bot", featuresCol="features", numTrees=10, seed=12345)
rf_model = rf.fit(train)
predict_train = rf_model.transform(train)
predict_test = rf_model.transform(test)
predict_test.select("bot","prediction").show(10)

+---+----------+
|bot|prediction|
+---+----------+
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  0|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
|  1|       1.0|
+---+----------+
only showing top 10 rows



In [44]:
# Tally the test-set confusion matrix in one Spark job instead of four separate
# filter().count() passes over the predictions.
# Use row["count"], not row.count: Row is a tuple, so .count is tuple.count.
confusion_counts = {
    (row["bot"], row["prediction"]): row["count"]
    for row in predict_test.groupBy("bot", "prediction").count().collect()
}
# Labels are ints and predictions are floats (0.0/1.0). Python treats (1, 1.0) and
# (1, 1) as the same dict key, so the integer lookups below match. A missing cell
# counts as 0, and rows with a null label/prediction are ignored, as the filters were.
tn = confusion_counts.get((0, 0), 0)
fp = confusion_counts.get((0, 1), 0)
fn = confusion_counts.get((1, 0), 0)
tp = confusion_counts.get((1, 1), 0)

In [46]:
# F1 = harmonic mean of precision and recall for the positive (bot) class.
# Guard every ratio: a model that never predicts "bot" (tp + fp == 0), or a test set
# with no bots (tp + fn == 0), would otherwise raise ZeroDivisionError instead of
# reporting an F1 of 0.
recall = tp / (fn + tp) if (fn + tp) else 0.0
precision = tp / (fp + tp) if (fp + tp) else 0.0
f1 = (2 * recall * precision) / (recall + precision) if (recall + precision) else 0.0
print("F1 Score is: ",f1)

F1 Score is:  0.8823529411764706


In [13]:
# Area under the ROC curve (the evaluator's default metric, stated explicitly) from raw scores.
# This scores whatever is currently bound to predict_train / predict_test. Both model
# cells rebind those names, so run this directly after the model cell above it.
evaluator = BinaryClassificationEvaluator(
    labelCol='bot',
    rawPredictionCol='rawPrediction',
    metricName='areaUnderROC',
)
train_auc = evaluator.evaluate(predict_train)
print("The area under ROC for train set is {}".format(train_auc))
test_auc = evaluator.evaluate(predict_test)
print("The area under ROC for test set is {}".format(test_auc))

The area under ROC for train set is 0.9471908759892869
The area under ROC for test set is 0.9270091848450059


# Reading the real-time tweet data (ingested from Kafka) from MongoDB

In [14]:
import json
from pymongo import MongoClient

In [15]:
import os

# MongoDB connection string, overridable via the MONGO_URI environment variable.
# It defaults to the local instance this notebook was written against.
MONGO_URI = os.environ.get("MONGO_URI", "localhost:27017")
client = MongoClient(MONGO_URI)
# Documents live in database `bots`, collection `bots`.
collection = client.bots.bots

In [28]:
# Collection.count() is deprecated (hence the DeprecationWarning in the output) and was
# removed in PyMongo 4.0. estimated_document_count() is PyMongo's replacement for
# counting a whole collection without a filter.
collection.estimated_document_count()

  """Entry point for launching an IPython kernel.


1048573

## Get bot predictions for the Twitter users stored in the database

In [17]:
def getDF(df):
    """Build the model's input columns from raw tweet-author records.

    Mirrors the training-time feature engineering. The five count columns are cast to
    int, ``default_profile`` is passed through, and ``bot_binary`` flags whether the
    substring "bot" (case-sensitive, as in training) appears in the screen name or the
    description. Nulls from a missing name/description become False.

    Uses the DataFrame API rather than a temp view. The previous
    ``createOrReplaceTempView('df')`` silently replaced the training data's 'df' view
    every time this function ran.

    Args:
        df: DataFrame with at least screen_name, description, followers_count,
            friends_count, listedcount, favourites_count, statuses_count and
            default_profile columns.

    Returns:
        DataFrame with columns bot_binary, followers_count, friends_count, listedcount,
        favourites_count, statuses_count, default_profile, in the same order the
        training VectorAssembler consumed them.
    """
    count_cols = ['followers_count', 'friends_count', 'listedcount',
                  'favourites_count', 'statuses_count']
    mentions_bot = (
        F.col("screen_name").cast("string").contains("bot")
        | F.col("description").cast("string").contains("bot")
    )
    return (
        df.select(
            mentions_bot.alias('bot_binary'),
            *[F.col(c).cast("int").alias(c) for c in count_cols],
            F.col('default_profile'),
        )
        # contains() is null when the text is null; treat that as "no mention".
        .fillna({'bot_binary': False})
    )

In [18]:
def getFeatures(train_df):
    """Append a ``features`` vector column assembled from every column of ``train_df``.

    Args:
        train_df: DataFrame whose columns are all model inputs, in model order
            (e.g. the output of ``getDF``). Any label column must already be removed.

    Returns:
        ``train_df`` with an extra ``features`` column.
    """
    feature_assembler = VectorAssembler(inputCols=train_df.columns, outputCol="features")
    return feature_assembler.transform(train_df)

In [29]:
# Score each distinct handle once, using the first stored document seen for it.
# Result: user[screen_name] = [location, is_bot].
# NOTE(review): each distinct handle gets its own one-row DataFrame (schema inference
# plus a first() job), so this launches several Spark jobs per handle. Batching would be
# much faster on ~1M documents.
# NOTE(review): the random forest above scored a higher F1/AUC than `lr_model`.
# Confirm which model is intended for these predictions.
user = {}
for element in collection.find({}, {"_id": 0}):
    if element['screen_name'] not in user:
        handle = element['screen_name']
        location = element['location']
        # Use a dedicated name: assigning to `df` here overwrote the training DataFrame
        # loaded at the top of the notebook, which broke any re-run of the training cells.
        tweet_df = getDF(spark.read.json(sc.parallelize([json.dumps(element)])))
        featuresDF = getFeatures(tweet_df)
        predict = lr_model.transform(featuresDF)
        isBot = int(predict.select("prediction").first().prediction) == 1
        user[handle] = [location, isBot]

In [30]:
# Persist the per-handle predictions ({screen_name: [location, is_bot]}) as JSON.
with open('user.json', 'w') as out_file:
    json.dump(user, out_file)

In [34]:
# Share of scored handles classified as bots. `user` maps screen_name -> [location, is_bot].
# Count with len(), not sum(): `from pyspark.sql.functions import *` shadows the builtin sum().
bots = len([info for info in user.values() if info[1]])
nonBots = len(user) - bots
if user:
    print("Total Bot percentage: ",bots*100/(bots+nonBots),"%")
else:
    # Avoid ZeroDivisionError when nothing was scored (e.g. an empty collection).
    print("No users were scored, so the bot percentage is undefined.")

Total Bot percentage:  8.175638434686753 %


## Delete the MongoDB collection (optional)

In [None]:
# Optional cleanup: uncomment to drop the `bots` collection (client.bots.bots), e.g.
# after the predictions above have been saved to user.json. Dropping it is irreversible.
# collection.drop()