<a href="https://colab.research.google.com/github/julwdo/Algorithms-project/blob/main/NLP_Project_JW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#!apt-get install openjdk-17-jdk-headless -qq > /dev/null # OpenJDK 17
#!wget --show-progress https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz # Apache Spark 3.5.5 with Hadoop 3 support
#!tar xf spark-3.5.6-bin-hadoop3.tgz
#!pip install findspark

In [2]:
import os
import findspark
from pyspark.sql import SparkSession
from google.colab import auth
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from nltk.sentiment import SentimentIntensityAnalyzer
import math
from collections import Counter

In [3]:
#import nltk
#nltk.download('vader_lexicon')

In [4]:
# Set up Spark
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-17-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.5.6-bin-hadoop3'

findspark.init()
#indspark.find()

spark = SparkSession.builder.appName('TwiBot22').getOrCreate()

In [5]:
auth.authenticate_user()

In [6]:
#!gcloud init

In [7]:
#bucket_name = "twibot-22"
#file_names = ["user.jsonl", "label.csv", "tweet_0.jsonl"]

#for file_name in file_names:
#  !gsutil cp gs://{bucket_name}/{file_name} /content/{file_name}

In [8]:
users = spark.read.json(f"/content/user.jsonl")

users.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- description: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- description: struct (nullable = true)
 |    |    |-- cashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- tag: string (nullable = true)
 |    |    |-- hashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- tag: string (nullable = true)
 |    |    |-- mentions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- username: string (nullable = true)
 |    |

In [9]:
users.show(5, truncate=False)

+-------------------------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-------------------+-------------------------+---------------+----------------------------------------------------------------------------+---------+---------------------+-----------------------+-------------+--------+--------+
|created_at               |description                                                                                                                    |entities                                                                                                                                                                               

In [12]:
users_selected = users.select(
    F.col("id"),
    F.col("name"),
    F.col("username"),
    F.col("created_at"),
    F.col("description"),
    F.col("entities.description.cashtags"),
    F.col("entities.description.hashtags"),
    F.col("entities.description.mentions"),
    F.col("entities.description.urls"),
    F.col("location"),
    F.col("pinned_tweet_id"),
    F.col("profile_image_url"),
    F.col("protected"),
    F.col("public_metrics.followers_count"),
    F.col("public_metrics.following_count"),
    F.col("public_metrics.listed_count"),
    F.col("public_metrics.tweet_count"),
    F.col("verified")
    )

In [13]:
users_selected.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- description: string (nullable = true)
 |-- cashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- tag: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- tag: string (nullable = true)
 |-- mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- username: string (nullable = true)
 |-- urls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- display_url: string (nullable = true)

In [15]:
users_selected.show(5, truncate=False)

+--------------------+-------------------------+-------------+-------------------------+-------------------------------------------------------------------------------------------------------------------------------+--------+--------+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+----------------------------------------------------------------------------+---------+---------------+---------------+------------+-----------+--------+
|id                  |name                     |username     |created_at               |description                                                                                                                    |cashtags|hashtags|mentions          |urls                                                                                                                                                 

In [16]:
labels = spark.read.csv(f"/content/label.csv", header=True, inferSchema=True)

In [17]:
labels.show(5, truncate=False)

+--------------------+-----+
|id                  |label|
+--------------------+-----+
|u1217628182611927040|human|
|u2664730894         |human|
|u1266703520205549568|human|
|u1089159225148882949|human|
|u36741729           |bot  |
+--------------------+-----+
only showing top 5 rows



In [18]:
users_labeled = users_selected.join(labels, users_selected.id == labels.id, "left").drop(labels.id)

In [20]:
users_labeled.show(5, truncate=False)

+--------------------+-------------------------+--------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+-----------------------------+----+--------------------------+-------------------+--------------------------------------------------------------------------------------------+---------+---------------+---------------+------------+-----------+--------+-----+
|id                  |name                     |username      |created_at               |description                                                                                                                                             |cashtags|hashtags|mentions                     |urls|location                  |pinned_tweet_id    |profile_image_url                                                                           |protected|followers_count|following_count|liste

In [21]:
print('Summary of missing values:')
users_labeled.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in users_labeled.columns]).show()

Summary of missing values:
+---+----+--------+----------+-----------+--------+--------+--------+------+--------+---------------+-----------------+---------+---------------+---------------+------------+-----------+--------+-----+
| id|name|username|created_at|description|cashtags|hashtags|mentions|  urls|location|pinned_tweet_id|profile_image_url|protected|followers_count|following_count|listed_count|tweet_count|verified|label|
+---+----+--------+----------+-----------+--------+--------+--------+------+--------+---------------+-----------------+---------+---------------+---------------+------------+-----------+--------+-----+
|  0|   0|       0|         0|          0|  997126|  832135|  786552|912917|  291542|         610135|                0|        0|              0|              0|           0|          0|       0|    0|
+---+----+--------+----------+-----------+--------+--------+--------+------+--------+---------------+-----------------+---------+---------------+---------------+----

In [22]:
sia = SentimentIntensityAnalyzer()

def vader_sentiment(text):
    if text.strip() == "":
        return 0.0
    return float(sia.polarity_scores(text)["compound"])

vader_udf = F.udf(vader_sentiment, FloatType())

In [23]:
def shannon_entropy(s):
    if s.strip() == "":
        return 0.0
    counts = Counter(s)
    length = len(s)
    entropy = -sum((count/length) * math.log2(count/length) for count in counts.values())
    return float(entropy)

entropy_udf = F.udf(shannon_entropy, FloatType())

In [34]:
now = F.current_timestamp()

In [29]:
user_features = users_labeled.select(
    F.col("id"),
    F.length(F.col("name")).alias("name_length"),
    F.length(F.col("username")).alias("username_length"),
    (F.length(F.col("username")) / F.greatest(F.length(F.col("name")), F.lit(1))).alias("username_name_length_ratio"),
    F.length(F.col("description")).alias("description_length"),
    F.when(F.col("name") == "", False).otherwise(True).alias("has_name"),
    F.when(F.col("username") == "", False).otherwise(True).alias("has_username"),
    F.when(F.col("description") == "", False).otherwise(True).alias("has_description"),
    F.when(F.col("location").isNull() | (F.col("location") == ""), False).otherwise(True).alias("has_location"),
    F.when(F.col("pinned_tweet_id").isNull(), False).otherwise(True).alias("has_pinned_tweet"),
    F.col("name").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_name"),
    F.col("username").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_username"),
    F.col("description").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_description"),
    F.length(F.regexp_replace(F.col("name"), "[^\\d]", "")).alias("num_digits_in_name"),
    F.length(F.regexp_replace(F.col("username"), "[^\\d]", "")).alias("num_digits_in_username"),
    F.length(F.regexp_replace(F.col("description"), "[^\\d]", "")).alias("num_digits_in_description"),
    (F.length(F.regexp_replace(F.col("name"), "[^A-Z]", "")) / F.greatest(F.length(F.regexp_replace(F.col("name"), "[^a-z]", "")), F.lit(1))).alias("name_upper_to_lower_ratio"),
    (F.length(F.regexp_replace(F.col("username"), "[^A-Z]", "")) / F.greatest(F.length(F.regexp_replace(F.col("username"), "[^a-z]", "")), F.lit(1))).alias("username_upper_to_lower_ratio"),
    entropy_udf(F.col("name")).alias("name_entropy"),
    entropy_udf(F.col("username")).alias("username_entropy"),
    (F.levenshtein(F.col("username"), F.col("name")) / F.greatest(F.length(F.col("username")), F.length(F.col("name")), F.lit(1))).alias("username_name_levenshtein"),
    vader_udf(F.col("description")).alias("description_sentiment"),
    F.when(F.col("cashtags").isNotNull(), F.size(F.col("cashtags"))).otherwise(F.lit(0)).alias("cashtag_in_description_count"),
    F.when(F.col("hashtags").isNotNull(), F.size(F.col("hashtags"))).otherwise(F.lit(0)).alias("hashtag_in_description_count"),
    F.when(F.col("mentions").isNotNull(), F.size(F.col("mentions"))).otherwise(F.lit(0)).alias("mention_in_description_count"),
    F.when(F.col("urls").isNotNull(), F.size(F.col("urls"))).otherwise(F.lit(0)).alias("url_in_description_count"),
    F.col("protected").alias("is_protected"),
    F.col("verified").alias("is_verified"),
    (F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at"))).alias("account_age_seconds"),
    F.col("followers_count"),
    F.col("following_count"),
    F.col("listed_count"),
    F.col("tweet_count"),
    (F.col("followers_count") / F.greatest(F.col("following_count"), F.lit(1))).alias("followers_over_following"),
    (2 * F.col("followers_count") / F.greatest(F.col("following_count"), F.lit(1))).alias("double_followers_over_following"),
    (F.col("following_count") / F.greatest(F.col("followers_count"), F.lit(1))).alias("following_over_followers"),
    (F.col("following_count") / F.greatest(F.col("followers_count") ** 2, F.lit(1))).alias("following_over_followers_squared"),
    (F.col("following_count") / F.greatest(F.col("followers_count") + F.col("following_count"), F.lit(1))).alias("following_over_total_connections")
    (F.col("listed_count") / F.greatest(F.col("followers_count"), F.lit(1))).alias("listed_over_followers"),
    (F.col("tweet_count") / F.greatest(F.col("followers_count"), F.lit(1))).alias("tweets_over_followers"),
    (F.col("listed_count") / F.greatest(F.col("tweet_count"), F.lit(1))).alias("listed_over_tweets"),
    (F.col("followers_count") / (F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at")))).alias("follower_rate"),
    (F.col("following_count") / (F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at")))).alias("following_rate"),
    (F.col("listed_count") / (F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at")))).alias("listed_rate"),
    (F.col("tweet_count") / (F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at")))).alias("tweet_rate")
    )

In [30]:
user_features.show(20, truncate=False)

+--------------------+-----------+---------------+------------------+--------+------------+---------------+------------+----------------+--------------------+------------------+-------------------------+------------+---------------+---------------+-----------+-------------------+------------+-----------+------------+----------------------------+----------------------------+----------------------------+------------------------+---------------------------+------------------------+------------------+-------------------------+---------------------+---------------------+---------------------+---------------------+-------------------------+---------------------------------+----------------------+-----------------------------+-------------------------+--------------------------------+--------------------------------------+-------------------------------+---------------------------+--------------------------+------------------------+---------------------+----------------+
|id                  

In [51]:
user_features.filter(F.col("name") == "").show()

+--------------------+---------------+---------------+-----------+---------------+-----------+-------------------+------------+-----------+------------+---------------+------------+----------------------------+----------------------------+----------------------------+------------------------+---------------------------+------------------------+--------------------+------------------+-------------------------+--------------------+--------------------+--------------------+--------------------+-------------------------+---------------------------------+
|                  id|following_count|followers_count|tweet_count|username_length|name_length|account_age_seconds|is_protected|is_verified|listed_count|has_description|has_location|cashtag_in_description_count|hashtag_in_description_count|mention_in_description_count|url_in_description_count|has_bot_word_in_description|has_bot_word_in_username|has_bot_word_in_name|description_length|followers_following_ratio|          tweet_rate|       fol

In [7]:
tweets = spark.read.json(f"/content/{file_name}")

tweets.printSchema()

root
 |-- attachments: struct (nullable = true)
 |    |-- media_keys: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- poll_ids: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- author_id: long (nullable = true)
 |-- context_annotations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- domain: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- entity: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |-- conversation_id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: struct (c

In [None]:
tweets.show(5, truncate=False)

+-----------+-------------------+-------------------+-------------------+-------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+--------------------+-------------------+----+------------------+--------------------+-----------------+--------------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
tweets_selected = tweets.select(
    col("author_id"),
    col("created_at"),
    explode("entities.media").alias("media_item"),
    col("id"),
    col("lang"),
    col("public_metrics.like_count"),
    col("public_metrics.retweet_count"),
    col("text")
    ).select(
        col("author_id"),
        col("created_at"),
        col("media_item.media_url_https").alias("media_url"),
        col("id"),
        col("lang"),
        col("like_count"),
        col("retweet_count"),
        col("text")
        )

In [None]:
tweets_selected.printSchema()

root
 |-- author_id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- media_url: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- text: string (nullable = true)



In [None]:
tweets_selected.show(5, truncate=False)

+-------------------+-------------------------+-----------------------------------------------+--------------------+----+----------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author_id          |created_at               |media_url                                      |id                  |lang|like_count|retweet_count|text                                                                                                                                                                                                                                                                                                    |
+-------------------+-------------------------+-----------------------------------------------+-----------------

In [None]:
tweets_selected.select("lang").distinct().show()

+----+
|lang|
+----+
|  en|
|  vi|
|  ne|
|  ro|
|  sl|
| und|
|  ur|
|  lv|
|  pl|
|  pt|
|  tl|
|  in|
|  ko|
|  uk|
|  cs|
|  mr|
|  sr|
|  tr|
|  de|
|  is|
+----+
only showing top 20 rows



In [None]:
# Check for missing values
print('Summary of missing values:')
tweets_selected.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in tweets_selected.columns]).show()

Summary of missing values:
+---------+----------+---+---------+---+----+----------+-------------+----+
|author_id|created_at|url|media_url| id|lang|like_count|retweet_count|text|
+---------+----------+---+---------+---+----+----------+-------------+----+
|        0|         0|  0|        0|  0|   0|         0|            0|   0|
+---------+----------+---+---------+---+----+----------+-------------+----+



In [None]:
n_rows = tweets_selected.count()
print(f'The dataset contains {n_rows} tweets.')

The dataset contains 2027740 tweets.


In [None]:
tweets_selected.groupBy('author_id').agg(F.countDistinct('lang')).filter(F.col('count(DISTINCT lang)') > 1).show(5)

+----------+--------------------+
| author_id|count(DISTINCT lang)|
+----------+--------------------+
| 280547194|                   2|
| 548871400|                   2|
|  28784085|                   2|
| 163172885|                   2|
|2251819037|                   3|
+----------+--------------------+
only showing top 5 rows



In [None]:
tweets_clean = tweets_selected.withColumn("text_clean",
                                          F.regexp_replace(col("text"), r"https?://t\.co/\S+", "<URL>")
                                          ).withColumn("text_clean",
                                                       F.regexp_replace(col("text_clean"), r"@\w+", "<USER>")
                                                       )
tweets_clean.show(5, truncate=False)

+-------------------+-------------------------+-----------------------------------------------+--------------------+----+----------+-------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|author_id          |created_at               |media_url                                      |id                  |lang|like_count|retweet_count|text                                                                                                                                       

In [None]:
n_rows = users_labeled.count()

n_human = users_labeled.filter(col("label") == "human").count()

human_perc = n_human / n_rows * 100
bot_perc = 100 - human_perc

print(f"Percentage of human accounts: {human_perc:.2f}")
print(f"Percentage of bot accounts: {bot_perc:.2f}")

Percentage of human accounts: 86.01
Percentage of bot accounts: 13.99


In [None]:
# Check for missing values
print('Summary of missing values:')
users_labeled.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in users_labeled.columns]).show()

Summary of missing values:
+-----------+---+----+---------------+---------------+------------+--------+-----+
|description| id|name|followers_count|following_count|listed_count|username|label|
+-----------+---+----+---------------+---------------+------------+--------+-----+
|          0|  0|   0|              0|              0|           0|       0|    0|
+-----------+---+----+---------------+---------------+------------+--------+-----+

