<a href="https://colab.research.google.com/github/julwdo/NLP-project/blob/main/NLP_Project_JW.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# One-time Colab runtime setup: uncomment to install Java 17, Spark 3.5.6 (Hadoop 3 build) and findspark.
#!apt-get install openjdk-17-jdk-headless -qq > /dev/null # OpenJDK 17
#!wget --show-progress https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz # Apache Spark 3.5.6 with Hadoop 3 support
#!tar xf spark-3.5.6-bin-hadoop3.tgz
#!pip install findspark

In [None]:
import os
import findspark
from pyspark.sql import SparkSession
from google.colab import auth
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from nltk.sentiment import SentimentIntensityAnalyzer
import math
from collections import Counter
from pyspark.sql import Window
from pyspark.sql.types import NumericType
from sentence_transformers import SentenceTransformer
import torch

In [21]:
import nltk
# Fetch the VADER lexicon required by SentimentIntensityAnalyzer (instantiated further down).
nltk.download(info_or_id='vader_lexicon')

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...


True

In [6]:
# Set up Spark
# Point findspark at the JDK and at the Spark distribution unpacked under /content,
# then start (or reuse) the session used by every cell below.
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-17-openjdk-amd64',
    'SPARK_HOME': '/content/spark-3.5.6-bin-hadoop3',
})
findspark.init()

spark = SparkSession.builder.appName('TwiBot22').getOrCreate()

In [7]:
# Authenticate this Colab session so gsutil can read the private GCS bucket.
auth.authenticate_user()

In [8]:
#!gcloud init

In [9]:
bucket_name = "twibot-22"
file_names = ["user.jsonl", "label.csv", "tweet_0.jsonl"]

for file_name in file_names:
  !gsutil cp gs://{bucket_name}/{file_name} /content/{file_name}

Copying gs://twibot-22/user.jsonl...
/ [0 files][    0.0 B/745.4 MiB]                                                ==> NOTE: You are downloading one or more large file(s), which would
run significantly faster if you enabled sliced object downloads. This
feature is enabled by default but requires that compiled crcmod be
installed (see "gsutil help crcmod").

- [1 files][745.4 MiB/745.4 MiB]   36.7 MiB/s                                   
Operation completed over 1 objects/745.4 MiB.                                    
Copying gs://twibot-22/label.csv...
\ [1 files][ 20.6 MiB/ 20.6 MiB]                                                
Operation completed over 1 objects/20.6 MiB.                                     
Copying gs://twibot-22/tweet_0.jsonl...
==> NOTE: You are downloading one or more large file(s), which would
run significantly faster if you enabled sliced object downloads. This
feature is enabled by default but requires that compiled crcmod be
installed (see "gsutil help c

In [10]:
# Load the user profiles (one JSON object per line) and inspect the inferred nested schema.
users = spark.read.json("/content/user.jsonl")
users.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- description: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- description: struct (nullable = true)
 |    |    |-- cashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- tag: string (nullable = true)
 |    |    |-- hashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- tag: string (nullable = true)
 |    |    |-- mentions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- end: long (nullable = true)
 |    |    |    |    |-- start: long (nullable = true)
 |    |    |    |    |-- username: string (nullable = true)
 |    |

In [11]:
users.show(n=5, truncate=False)

+-------------------------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------+-------------------+-------------------------+---------------+----------------------------------------------------------------------------+---------+---------------------+-----------------------+-------------+--------+--------+
|created_at               |description                                                                                                                    |entities                                                                                                                                                                               

In [12]:
# Flatten the nested profile JSON: the bio's entity arrays and the public_metrics counters
# become top-level columns (each named after the last path segment).
user_column_paths = [
    "id", "name", "username", "created_at", "description", "url",
    "entities.description.cashtags",
    "entities.description.hashtags",
    "entities.description.mentions",
    "entities.description.urls",
    "location", "pinned_tweet_id", "profile_image_url", "protected",
    "public_metrics.followers_count",
    "public_metrics.following_count",
    "public_metrics.listed_count",
    "public_metrics.tweet_count",
    "verified",
]
users_selected = users.select(*[F.col(path) for path in user_column_paths])

In [13]:
# Confirm the flattened schema.
users_selected.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- description: string (nullable = true)
 |-- url: string (nullable = true)
 |-- cashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- tag: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- tag: string (nullable = true)
 |-- mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- end: long (nullable = true)
 |    |    |-- start: long (nullable = true)
 |    |    |-- username: string (nullable = true)
 |-- urls: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- di

In [14]:
users_selected.show(n=5, truncate=False)

+--------------------+-------------------------+-------------+-------------------------+-------------------------------------------------------------------------------------------------------------------------------+-----------------------+--------+--------+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+---------------+----------------------------------------------------------------------------+---------+---------------+---------------+------------+-----------+--------+
|id                  |name                     |username     |created_at               |description                                                                                                                    |url                    |cashtags|hashtags|mentions          |urls                                                                                                 

In [15]:
# Bot/human ground truth, keyed by the same "u…" user id as the profiles.
labels = spark.read.csv("/content/label.csv", header=True, inferSchema=True)

In [16]:
labels.show(n=5, truncate=False)

+--------------------+-----+
|id                  |label|
+--------------------+-----+
|u1217628182611927040|human|
|u2664730894         |human|
|u1266703520205549568|human|
|u1089159225148882949|human|
|u36741729           |bot  |
+--------------------+-----+
only showing top 5 rows



In [17]:
# Left join keeps every profile; the USING-join on "id" yields a single id column followed by the label.
users_labeled = users_selected.join(labels, on="id", how="left")

In [18]:
users_labeled.show(n=20, truncate=False)

+--------------------+--------------------------------------+---------------+-------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------+--------+----------------------+-------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------+------------------------------+-------------------+--------------------------------------------------------------------------------------------+---------+---------------+---------------+------------+-----------+--------+-----+
|id                  |name                                  |username       |created_at               |description                                                                                                                                                    |url          

In [19]:
print('Summary of missing values:')
# Per column: count the rows where the column is NULL (count() ignores the NULLs from when()).
null_counts = [F.count(F.when(F.col(c).isNull(), F.lit(1))).alias(c) for c in users_labeled.columns]
users_labeled.select(null_counts).show()

Summary of missing values:
+---+----+--------+----------+-----------+---+--------+--------+--------+------+--------+---------------+-----------------+---------+---------------+---------------+------------+-----------+--------+-----+
| id|name|username|created_at|description|url|cashtags|hashtags|mentions|  urls|location|pinned_tweet_id|profile_image_url|protected|followers_count|following_count|listed_count|tweet_count|verified|label|
+---+----+--------+----------+-----------+---+--------+--------+--------+------+--------+---------------+-----------------+---------+---------------+---------------+------------+-----------+--------+-----+
|  0|   0|       0|         0|          0|  0|  997126|  832135|  786552|912917|  291542|         610135|                0|        0|              0|              0|           0|          0|       0|    0|
+---+----+--------+----------+-----------+---+--------+--------+--------+------+--------+---------------+-----------------+---------+---------------+

In [22]:
sia = SentimentIntensityAnalyzer()

def vader_sentiment(text):
  """Return VADER's compound polarity score for ``text``.

  A Spark NULL reaches this UDF as ``None``; it is scored as the empty string
  instead of raising inside the UDF (which would fail the whole Spark job).
  """
  if text is None:
    text = ""
  return sia.polarity_scores(text)["compound"]

vader_udf = F.udf(vader_sentiment, FloatType())

In [23]:
def shannon_entropy(string):
    """Shannon entropy, in bits per character, of the character distribution of ``string``.

    Returns 0.0 for ``None`` (how a Spark NULL reaches a UDF), for the empty string and
    for whitespace-only strings. Non-blank strings are measured over all their
    characters, spaces included.
    """
    if string is None or not string.strip():
        return 0.0
    length = len(string)
    entropy = -sum((count / length) * math.log2(count / length) for count in Counter(string).values())
    # A string made of one repeated character gives -0.0; normalise it to +0.0.
    return entropy if entropy > 0 else 0.0

# Spark wrapper around shannon_entropy; results are stored as 32-bit floats.
entropy_udf = F.udf(shannon_entropy, returnType=FloatType())

In [24]:
# Reference instant for the account-age features below.
# NOTE(review): this is the run time, not the dataset's collection time, so age-based
# features shift from run to run — consider a fixed reference timestamp.
now = F.current_timestamp()

In [25]:
def _mask_text(col_name):
    """Replace t.co links, @mentions and e-mail addresses in ``col_name`` with <URL>, <USER>, <EMAIL>."""
    masked = F.regexp_replace(F.col(col_name), r"https?://t\.co/\S+", "<URL>")
    masked = F.regexp_replace(masked, r"(?<=^|\s)@\w+", "<USER>")
    return F.regexp_replace(masked, r"\b[\w\.-]+@[\w\.-]+\.\w+\b", "<EMAIL>")


def _len_floor1(col):
    """Length of a string column, floored at 1 so it is safe to divide by."""
    return F.greatest(F.length(col), F.lit(1))


def _kept_chars_ratio(col_name, removed_pattern):
    """Share of ``col_name``'s characters that survive deleting every match of ``removed_pattern``."""
    return F.length(F.regexp_replace(F.col(col_name), removed_pattern, "")) / _len_floor1(F.col(col_name))


def _upper_to_lower_ratio(col_name):
    """ASCII upper-case letter count over ASCII lower-case letter count (denominator floored at 1)."""
    uppers = F.length(F.regexp_replace(F.col(col_name), "[^A-Z]", ""))
    lowers = F.regexp_replace(F.col(col_name), "[^a-z]", "")
    return uppers / _len_floor1(lowers)


def _is_present(col_name):
    """False when ``col_name`` is NULL or the empty string, True otherwise (never NULL)."""
    return F.when(F.col(col_name).isNull() | (F.col(col_name) == ""), False).otherwise(True)


def _array_size_or_zero(col_name):
    """Number of elements in an array column, 0 when the array is NULL."""
    return F.when(F.col(col_name).isNotNull(), F.size(F.col(col_name))).otherwise(F.lit(0))


# Account age in seconds relative to `now`; also the denominator of the *_rate features.
# Unparseable created_at values give NULL (and thus NULL rates).
account_age_seconds_expr = F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at"))

user_features = users_labeled.select(
    F.col("id"),
    F.length(F.col("name")).alias("name_length"),
    F.length(F.col("username")).alias("username_length"),
    (F.length(F.col("username")) / _len_floor1(F.col("name"))).alias("username_name_length_ratio"),
    # Masked copy of the bio. NOTE(review): the later F.col("description") references in this
    # select presumably resolve to the raw input column rather than this alias — confirm.
    _mask_text("description").alias("description"),
    F.length(F.col("description")).alias("description_length"),
    # NULL and "" both count as absent (previously only location treated NULL as absent).
    _is_present("name").alias("has_name"),
    _is_present("username").alias("has_username"),
    _is_present("description").alias("has_description"),
    _is_present("url").alias("has_url"),
    _is_present("location").alias("has_location"),
    F.col("pinned_tweet_id").isNotNull().alias("has_pinned_tweet"),
    F.col("name").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_name"),
    F.col("description").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_description"),
    _kept_chars_ratio("name", "[^\\d]").alias("ratio_digits_in_name"),
    _kept_chars_ratio("username", "[^\\d]").alias("ratio_digits_in_username"),
    _kept_chars_ratio("description", "[^\\d]").alias("ratio_digits_in_description"),
    # "Special" = anything other than ASCII letters, digits and spaces.
    _kept_chars_ratio("name", "[A-Za-z0-9 ]").alias("ratio_special_chars_in_name"),
    _kept_chars_ratio("username", "[A-Za-z0-9 ]").alias("ratio_special_chars_in_username"),
    _kept_chars_ratio("description", "[A-Za-z0-9 ]").alias("ratio_special_chars_in_description"),
    _upper_to_lower_ratio("name").alias("name_upper_to_lower_ratio"),
    _upper_to_lower_ratio("username").alias("username_upper_to_lower_ratio"),
    entropy_udf(F.col("name")).alias("name_entropy"),
    entropy_udf(F.col("username")).alias("username_entropy"),
    (F.levenshtein(F.col("username"), F.col("name")) / F.greatest(F.length(F.col("username")), F.length(F.col("name")), F.lit(1))).alias("username_name_levenshtein"),
    vader_udf(F.col("description")).alias("description_sentiment"),
    _array_size_or_zero("cashtags").alias("cashtag_in_description_count"),
    _array_size_or_zero("hashtags").alias("hashtag_in_description_count"),
    _array_size_or_zero("mentions").alias("mention_in_description_count"),
    _array_size_or_zero("urls").alias("url_in_description_count"),
    F.col("protected").alias("is_protected"),
    F.col("verified").alias("is_verified"),
    account_age_seconds_expr.alias("account_age_seconds"),
    F.col("followers_count"),
    F.col("following_count"),
    F.col("listed_count"),
    F.col("tweet_count"),
    (F.col("followers_count") / F.greatest(F.col("following_count"), F.lit(1))).alias("followers_over_following"),
    (2 * F.col("followers_count") / F.greatest(F.col("following_count"), F.lit(1))).alias("double_followers_over_following"),
    (F.col("following_count") / F.greatest(F.col("followers_count"), F.lit(1))).alias("following_over_followers"),
    (F.col("following_count") / F.greatest(F.col("followers_count") ** 2, F.lit(1))).alias("following_over_followers_squared"),
    (F.col("following_count") / F.greatest(F.col("followers_count") + F.col("following_count"), F.lit(1))).alias("following_over_total_connections"),
    (F.col("listed_count") / F.greatest(F.col("followers_count"), F.lit(1))).alias("listed_over_followers"),
    (F.col("tweet_count") / F.greatest(F.col("followers_count"), F.lit(1))).alias("tweets_over_followers"),
    (F.col("listed_count") / F.greatest(F.col("tweet_count"), F.lit(1))).alias("listed_over_tweets"),
    (F.col("followers_count") / account_age_seconds_expr).alias("follower_rate"),
    (F.col("following_count") / account_age_seconds_expr).alias("following_rate"),
    (F.col("listed_count") / account_age_seconds_expr).alias("listed_rate"),
    (F.col("tweet_count") / account_age_seconds_expr).alias("tweet_rate"),
    F.col("label"),
)

In [26]:
#user_features.show(5, truncate=False)

In [27]:
# Load one shard of the tweet dump and inspect its nested schema.
tweets = spark.read.json("/content/tweet_0.jsonl")
tweets.printSchema()

root
 |-- attachments: struct (nullable = true)
 |    |-- media_keys: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- poll_ids: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |-- author_id: long (nullable = true)
 |-- context_annotations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- domain: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- entity: struct (nullable = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |-- conversation_id: long (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- annotations: array (nullable = true)
 |    |    |-- element: struct (c

In [28]:
#tweets.show(5, truncate=False)

In [29]:
# Mask links, @mentions and e-mail addresses in the tweet text (same rules as the user bio).
masked_tweet_text = F.regexp_replace(F.col("text"), r"https?://t\.co/\S+", "<URL>")
masked_tweet_text = F.regexp_replace(masked_tweet_text, r"(?<=^|\s)@\w+", "<USER>")
masked_tweet_text = F.regexp_replace(masked_tweet_text, r"\b[\w\.-]+@[\w\.-]+\.\w+\b", "<EMAIL>")

tweets_selected = tweets.select(
    "id",
    masked_tweet_text.alias("text"),
    # Numeric author ids get the "u" prefix used by the user/label tables.
    F.concat(F.lit("u"), F.col("author_id").cast("string")).alias("author_id"),
    "created_at",
    F.col("in_reply_to_user_id").isNotNull().cast("int").alias("is_reply"),
    "lang",
    F.col("possibly_sensitive").cast("int").alias("is_sensitive"),
    *[F.col(f"public_metrics.{metric}") for metric in ("like_count", "quote_count", "reply_count", "retweet_count")],
)

In [30]:
# Confirm the flattened tweet schema.
tweets_selected.printSchema()

root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- author_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- is_reply: integer (nullable = false)
 |-- lang: string (nullable = true)
 |-- is_sensitive: integer (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)



In [31]:
#tweets_selected.show(5, truncate=False)

In [32]:
#print('Summary of missing values:')
#tweets_selected.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in tweets_selected.columns]).show()

In [60]:
# Filter top 20 tweets per author
window = Window.partitionBy("author_id").orderBy(F.col("created_at").desc())
tweets_selected = tweets_selected.withColumn("rank", F.row_number().over(window))
tweets_filtered = tweets_selected.filter(F.col("rank") <= 20).drop("rank")

In [62]:
# Per-author tweets sorted newest-first. collect_list alone gives no ordering guarantee
# after a shuffle, which made the concatenated text, and any embedding of it, run-dependent.
# NOTE(review): the sentence encoder presumably truncates long inputs, so this order also
# decides which tweets actually get embedded — confirm against the model's max_seq_length.
tweets_newest_first = F.sort_array(
    F.collect_list(F.struct(
        F.to_timestamp("created_at").alias("ts"),
        F.col("created_at"),
        F.col("id"),
        F.col("text"),
    )),
    asc=False,
)

tweet_features = tweets_filtered.groupBy("author_id").agg(
    # concat_ws skips NULL texts, matching collect_list("text"), which dropped them.
    F.concat_ws(" ", F.transform(tweets_newest_first, lambda t: t["text"])).alias("tweets_last20_concatenated_text"),
    F.avg(F.col("is_reply")).alias("tweets_last20_reply_fraction"),
    F.countDistinct("lang").alias("tweets_last20_num_distinct_langs"),
    F.avg(F.col("is_sensitive")).alias("tweets_last20_sensitive_fraction"),
    F.avg(F.col("like_count")).alias("tweets_last20_avg_likes"),
    F.avg(F.col("quote_count")).alias("tweets_last20_avg_quotes"),
    F.avg(F.col("reply_count")).alias("tweets_last20_avg_replies"),
    F.avg(F.col("retweet_count")).alias("tweets_last20_avg_retweets"),
)

In [63]:
#tweet_features.show(5, truncate=True)

In [64]:
# Attach each user's aggregated tweet features (USING-join on a shared "id" column).
# NOTE(review): the inner join drops users with no tweets in the loaded tweet file
# (only tweet_0.jsonl is downloaded) — confirm that is intended.
enriched_user_features = user_features.join(
    tweet_features.withColumnRenamed("author_id", "id"),
    on="id",
    how="inner",
)

In [65]:
# Confirm the combined user + tweet feature schema.
enriched_user_features.printSchema()

root
 |-- id: string (nullable = true)
 |-- name_length: integer (nullable = true)
 |-- username_length: integer (nullable = true)
 |-- username_name_length_ratio: double (nullable = true)
 |-- description: string (nullable = true)
 |-- description_length: integer (nullable = true)
 |-- has_name: boolean (nullable = false)
 |-- has_username: boolean (nullable = false)
 |-- has_description: boolean (nullable = false)
 |-- has_url: boolean (nullable = false)
 |-- has_location: boolean (nullable = false)
 |-- has_pinned_tweet: boolean (nullable = false)
 |-- has_bot_word_in_name: boolean (nullable = true)
 |-- has_bot_word_in_description: boolean (nullable = true)
 |-- ratio_digits_in_name: double (nullable = true)
 |-- ratio_digits_in_username: double (nullable = true)
 |-- ratio_digits_in_description: double (nullable = true)
 |-- ratio_special_chars_in_name: double (nullable = true)
 |-- ratio_special_chars_in_username: double (nullable = true)
 |-- ratio_special_chars_in_description: 

In [66]:
#enriched_user_features.show(5, truncate=False)

In [None]:
# Numeric feature columns (booleans and strings are excluded by the NumericType check).
numerical_cols = [
    field.name
    for field in enriched_user_features.schema.fields
    if isinstance(field.dataType, NumericType)
]

# One global aggregate: all means first, then all standard deviations, collected as a single Row.
mean_exprs = [F.mean(c).alias(f"{c}_mean") for c in numerical_cols]
std_exprs = [F.stddev(c).alias(f"{c}_std") for c in numerical_cols]
mean_std = enriched_user_features.select(mean_exprs + std_exprs).collect()[0]

In [37]:
# Text inputs for the sentence encoder: the masked bio plus the concatenated recent tweets.
# Built from enriched_user_features (the frame the previous cells actually define), and the
# tweet text is exposed as "all_text", the field embed_text reads.
text_features_only = enriched_user_features.select(
    "id",
    "description",
    F.col("tweets_last20_concatenated_text").alias("all_text"),
)

In [42]:
# Pretrained sentence encoder used to embed bios and tweet text.
model = SentenceTransformer(model_name_or_path='all-mpnet-base-v2')

modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

In [45]:
def embed_text(row):
    """Embed a row's bio and its tweet text separately and concatenate the two vectors.

    Returns ``(id, embedding)``, where ``embedding`` is a plain Python list so that Spark
    can store it in a DataFrame column.
    """
    encoded = [model.encode(row[field], convert_to_tensor=True) for field in ('description', 'all_text')]
    combined = torch.cat(encoded, dim=-1)
    return row['id'], combined.tolist()

# NOTE(review): `model` is captured in this closure and presumably serialised to the workers;
# encoding one row per call is slow. Batching per partition would be faster.
embeddings_rdd = text_features_only.rdd.map(embed_text)
embeddings_df = embeddings_rdd.toDF(["id", "text_embedding"])

In [49]:
# Final modelling table: numeric/boolean features plus label, with the raw text columns
# replaced by their sentence embedding. Built from enriched_user_features, because
# user_features_with_tweets / all_text were never defined in this notebook.
final_features = (
    enriched_user_features
    .drop("description", "tweets_last20_concatenated_text")
    .join(embeddings_df, on="id", how="inner")
)