<a href="https://colab.research.google.com/github/julwdo/NLP-project/blob/main/01_codes/02_nlp_project_feature_engineering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-17-jdk-headless -qq > /dev/null # OpenJDK 17
!wget --show-progress https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz # Apache Spark 3.5.6 with Hadoop 3 support
!tar xf spark-3.5.6-bin-hadoop3.tgz
!pip install -q findspark
!pip install -q spark-nlp==6.1.2
!pip install -q xgboost==3.0.4
!pip install -q --upgrade pyspark==3.5.6

--2025-10-05 10:04:09--  https://dlcdn.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400923510 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.6-bin-hadoop3.tgz.1’


2025-10-05 10:04:15 (70.2 MB/s) - ‘spark-3.5.6-bin-hadoop3.tgz.1’ saved [400923510/400923510]



In [None]:
import os

# Tell findspark/PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-17-openjdk-amd64',
    'SPARK_HOME': '/content/spark-3.5.6-bin-hadoop3',
})

In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using all cores, with the Spark NLP package on the classpath
# and 8g of driver memory.
session_builder = SparkSession.builder.appName("BotDetection").master("local[*]")
session_builder = session_builder.config(
    "spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:6.1.2"
)
session_builder = session_builder.config("spark.driver.memory", "8g")
spark = session_builder.getOrCreate()
spark

In [None]:
from google.colab import auth
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType
from nltk.sentiment import SentimentIntensityAnalyzer
import math
from collections import Counter
from pyspark.sql import Window
from pyspark.sql.types import StringType, NumericType, BooleanType, ArrayType, FloatType
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import XlmRoBertaSentenceEmbeddings
from pyspark.ml import Pipeline

In [None]:
import nltk
# Download the VADER lexicon; the SentimentIntensityAnalyzer imported above is
# instantiated later in this notebook.
nltk.download('vader_lexicon')

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


True

In [None]:
# Authenticate this Colab session before the gsutil copy from the GCS bucket below.
auth.authenticate_user()

In [None]:
#!gcloud init

In [None]:
bucket_name = "twibot-22"
file_names = ["user.jsonl", "label.csv", "tweet_0.jsonl", "edge.csv"]

for file_name in file_names:
    local_path = f"/content/{file_name}"
    if not os.path.exists(local_path):
        !gsutil cp gs://{bucket_name}/{file_name} {local_path}
    else:
        print(f"{file_name} already exists locally, skipping download.")

user.jsonl already exists locally, skipping download.
label.csv already exists locally, skipping download.
tweet_0.jsonl already exists locally, skipping download.
edge.csv already exists locally, skipping download.


In [None]:
# Load the raw user records (one JSON object per line); Spark infers the schema.
users = spark.read.json("/content/user.jsonl")
#users.printSchema()

In [None]:
#users.show(5, truncate=False)

In [None]:
# Fields to keep from the raw user records. Nested fields are addressed by dotted
# path; the resulting column is named after the last path segment (e.g. "cashtags").
user_field_paths = [
    "id",
    "name",
    "username",
    "created_at",
    "description",
    "url",
    "entities.description.cashtags",
    "entities.description.hashtags",
    "entities.description.mentions",
    "entities.description.urls",
    "location",
    "pinned_tweet_id",
    "profile_image_url",
    "protected",
    "public_metrics.followers_count",
    "public_metrics.following_count",
    "public_metrics.listed_count",
    "public_metrics.tweet_count",
    "verified",
]

users_selected = users.select(*[F.col(field_path) for field_path in user_field_paths])

In [None]:
#users_selected.printSchema()

In [None]:
#users_selected.show(5, truncate=False)

In [None]:
# Load the id -> label table; the first row is the header and column types are inferred.
labels = spark.read.csv("/content/label.csv", header=True, inferSchema=True)

In [None]:
# Preview the first five label rows without truncating cell contents.
labels.show(5, truncate=False)

+--------------------+-----+
|id                  |label|
+--------------------+-----+
|u1217628182611927040|human|
|u2664730894         |human|
|u1266703520205549568|human|
|u1089159225148882949|human|
|u36741729           |bot  |
+--------------------+-----+
only showing top 5 rows



In [None]:
# Left join keeps every selected user, including those without a label row;
# the duplicate id column coming from `labels` is dropped afterwards.
users_labeled = (
    users_selected
    .join(labels, users_selected.id == labels.id, how="left")
    .drop(labels.id)
)

In [None]:
#users_labeled.show(5, truncate=False)

In [None]:
#print('Summary of missing values:')
#users_labeled.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in users_labeled.columns]).show()

In [None]:
# Single VADER analyzer instance, referenced by vader_sentiment below.
sia = SentimentIntensityAnalyzer()

def vader_sentiment(text):
    """Return the VADER compound polarity score of ``text``.

    Args:
        text: The string to score, or ``None`` (a null value passed in by a UDF).

    Returns:
        The ``"compound"`` entry of ``sia.polarity_scores(text)``. ``0.0`` is
        returned for ``None`` so that a missing value does not fail the whole job.
    """
    if text is None:
        return 0.0
    return sia.polarity_scores(text)["compound"]

# Spark UDF wrapper around vader_sentiment; the result column is FloatType.
vader_udf = F.udf(vader_sentiment, FloatType())

In [None]:
def shannon_entropy(string):
    """Return the Shannon entropy (in bits) of the character distribution of ``string``.

    Args:
        string: The text to measure, or ``None`` (a null value passed in by a UDF).

    Returns:
        ``-sum(p * log2(p))`` over the relative frequency ``p`` of each distinct
        character. ``0.0`` for ``None``, the empty string, or a whitespace-only string.
    """
    # None is treated like an empty string instead of raising AttributeError on .strip().
    if string is None or string.strip() == "":
        return 0.0
    counts = Counter(string)
    length = len(string)
    return -sum((count/length) * math.log2(count/length) for count in counts.values())

# Spark UDF wrapper around shannon_entropy; the result column is FloatType.
entropy_udf = F.udf(shannon_entropy, FloatType())

In [None]:
# Column expression for the timestamp at query time. The account-age and rate
# features below are computed against it, so their values depend on when the
# notebook is run.
now = F.current_timestamp()

In [None]:
def _is_filled(col_name):
    """Boolean column: False when ``col_name`` is null or the empty string, else True."""
    value = F.col(col_name)
    return F.when(value.isNull() | (value == ""), False).otherwise(True)


def _char_ratio(col_name, strip_pattern):
    """Share of the characters of ``col_name`` that remain after deleting ``strip_pattern`` matches."""
    value = F.col(col_name)
    remaining = F.length(F.regexp_replace(value, strip_pattern, ""))
    # greatest(..., 1) keeps the denominator non-zero for empty strings.
    return remaining / F.greatest(F.length(value), F.lit(1))


def _upper_to_lower_ratio(col_name):
    """Count of A-Z characters divided by the count of a-z characters (at least 1)."""
    value = F.col(col_name)
    upper_count = F.length(F.regexp_replace(value, "[^A-Z]", ""))
    lower_count = F.length(F.regexp_replace(value, "[^a-z]", ""))
    return upper_count / F.greatest(lower_count, F.lit(1))


def _array_size_or_zero(col_name):
    """Number of elements in the array column ``col_name``; 0 when the array is null."""
    value = F.col(col_name)
    return F.when(value.isNotNull(), F.size(value)).otherwise(F.lit(0))


def _ratio(numerator, denominator):
    """``numerator / greatest(denominator, 1)`` for two column names."""
    return F.col(numerator) / F.greatest(F.col(denominator), F.lit(1))


# Seconds elapsed between account creation and the query-time timestamp `now`.
_account_age = F.unix_timestamp(now) - F.unix_timestamp(F.to_timestamp("created_at"))

# Description with t.co links, @mentions and e-mail addresses masked, in that order.
_description_masked = F.regexp_replace(F.col("description"), r"https?://t\.co/\S+", "<URL>")
_description_masked = F.regexp_replace(_description_masked, r"(?<=^|\s)@\w+", "<USER>")
_description_masked = F.regexp_replace(_description_masked, r"\b[\w\.-]+@[\w\.-]+\.\w+\b", "<EMAIL>")

# Per-user profile features. Every F.col("description") below refers to the raw
# description of users_labeled; only the output column "description" is masked.
user_features = users_labeled.select(
    F.col("id"),
    F.length(F.col("name")).alias("name_length"),
    F.length(F.col("username")).alias("username_length"),
    (F.length(F.col("username")) / F.greatest(F.length(F.col("name")), F.lit(1))).alias("username_name_length_ratio"),
    _description_masked.alias("description"),
    F.length(F.col("description")).alias("description_length"),
    # Presence flags: null and "" both count as missing. Previously only
    # has_location checked for null, so a null name/username/description/url
    # was reported as present.
    _is_filled("name").alias("has_name"),
    _is_filled("username").alias("has_username"),
    _is_filled("description").alias("has_description"),
    _is_filled("url").alias("has_url"),
    _is_filled("location").alias("has_location"),
    F.when(F.col("pinned_tweet_id").isNull(), False).otherwise(True).alias("has_pinned_tweet"),
    F.col("name").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_name"),
    F.col("description").rlike("(?i)\\bbot\\b").alias("has_bot_word_in_description"),
    # "[^\\d]" deletes everything except digits, so the remaining length is the digit count.
    _char_ratio("name", "[^\\d]").alias("ratio_digits_in_name"),
    _char_ratio("username", "[^\\d]").alias("ratio_digits_in_username"),
    _char_ratio("description", "[^\\d]").alias("ratio_digits_in_description"),
    # "[A-Za-z0-9 ]" deletes ASCII letters, digits and spaces; what remains counts as "special".
    _char_ratio("name", "[A-Za-z0-9 ]").alias("ratio_special_chars_in_name"),
    _char_ratio("username", "[A-Za-z0-9 ]").alias("ratio_special_chars_in_username"),
    _char_ratio("description", "[A-Za-z0-9 ]").alias("ratio_special_chars_in_description"),
    _upper_to_lower_ratio("name").alias("name_upper_to_lower_ratio"),
    _upper_to_lower_ratio("username").alias("username_upper_to_lower_ratio"),
    entropy_udf(F.col("name")).alias("name_entropy"),
    entropy_udf(F.col("username")).alias("username_entropy"),
    # Edit distance normalised by the longer of the two strings.
    (F.levenshtein(F.col("username"), F.col("name")) / F.greatest(F.length(F.col("username")), F.length(F.col("name")), F.lit(1))).alias("username_name_levenshtein"),
    vader_udf(F.col("description")).alias("description_sentiment"),
    _array_size_or_zero("cashtags").alias("cashtag_in_description_count"),
    _array_size_or_zero("hashtags").alias("hashtag_in_description_count"),
    _array_size_or_zero("mentions").alias("mention_in_description_count"),
    _array_size_or_zero("urls").alias("url_in_description_count"),
    F.col("protected").alias("is_protected"),
    F.col("verified").alias("is_verified"),
    _account_age.alias("account_age_seconds"),
    F.col("followers_count"),
    F.col("following_count"),
    F.col("listed_count"),
    F.col("tweet_count"),
    _ratio("followers_count", "following_count").alias("followers_over_following"),
    (2 * F.col("followers_count") / F.greatest(F.col("following_count"), F.lit(1))).alias("double_followers_over_following"),
    _ratio("following_count", "followers_count").alias("following_over_followers"),
    (F.col("following_count") / F.greatest(F.col("followers_count") ** 2, F.lit(1))).alias("following_over_followers_squared"),
    (F.col("following_count") / F.greatest(F.col("followers_count") + F.col("following_count"), F.lit(1))).alias("following_over_total_connections"),
    _ratio("listed_count", "followers_count").alias("listed_over_followers"),
    _ratio("tweet_count", "followers_count").alias("tweets_over_followers"),
    _ratio("listed_count", "tweet_count").alias("listed_over_tweets"),
    # Counts per second of account age.
    (F.col("followers_count") / _account_age).alias("follower_rate"),
    (F.col("following_count") / _account_age).alias("following_rate"),
    (F.col("listed_count") / _account_age).alias("listed_rate"),
    (F.col("tweet_count") / _account_age).alias("tweet_rate"),
    F.col("label")
    )

In [None]:
# Truncate each description to 256 tokens
def truncate_text(text, max_tokens=256):
    """Keep at most ``max_tokens`` whitespace-separated tokens of ``text``.

    Args:
        text: The string to shorten, or ``None`` (a null value passed in by a UDF).
        max_tokens: Maximum number of tokens to keep.

    Returns:
        The first ``max_tokens`` tokens joined by single spaces (so runs of
        whitespace are collapsed), or ``None`` when ``text`` is ``None``.
    """
    # Propagate missing values instead of raising AttributeError on None.split().
    if text is None:
        return None
    tokens = text.split()
    return " ".join(tokens[:max_tokens])

# UDF with the 256-token cap bound in; it returns a string column.
truncate_udf = F.udf(
    lambda description: truncate_text(description, 256),
    StringType(),
)

# Overwrite "description" with its truncated form.
user_truncated = user_features.withColumn("description", truncate_udf("description"))

In [None]:
#user_truncated.show(5, truncate=False)

In [None]:
# Load the raw tweet records (one JSON object per line); Spark infers the schema.
tweets = spark.read.json("/content/tweet_0.jsonl")
#tweets.printSchema()

In [None]:
#tweets.show(5, truncate=False)

In [None]:
# Tweet text with t.co links, @mentions and e-mail addresses masked, in that order.
tweet_text_masked = F.regexp_replace(F.col("text"), r"https?://t\.co/\S+", "<URL>")
tweet_text_masked = F.regexp_replace(tweet_text_masked, r"(?<=^|\s)@\w+", "<USER>")
tweet_text_masked = F.regexp_replace(tweet_text_masked, r"\b[\w\.-]+@[\w\.-]+\.\w+\b", "<EMAIL>")

tweets_selected = tweets.select(
    F.col("id"),
    tweet_text_masked.alias("text"),
    # Prefix with "u" so author ids have the same form as the user ids in label.csv.
    F.concat(F.lit("u"), F.col("author_id")).alias("author_id"),
    F.col("created_at"),
    # 1 when the tweet replies to some user, 0 otherwise.
    F.col("in_reply_to_user_id").isNotNull().cast("int").alias("is_reply"),
    F.col("lang"),
    F.col("possibly_sensitive").cast("int").alias("is_sensitive"),
    F.col("public_metrics.like_count"),
    F.col("public_metrics.quote_count"),
    F.col("public_metrics.reply_count"),
    F.col("public_metrics.retweet_count"),
)

In [None]:
#tweets_selected.printSchema()

In [None]:
#tweets_selected.show(5, truncate=False)

In [None]:
#print('Summary of missing values:')
#tweets_selected.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in tweets_selected.columns]).show()

In [None]:
# Optional English-only filter, currently disabled: tweets in every language are kept.
#tweets_en = tweets_selected.filter(F.col("lang") == "en").drop("lang")

# NOTE: despite its name, tweets_en holds tweets in all languages, and the "lang"
# column is retained.
tweets_en = tweets_selected

In [None]:
# Keep the 10 most recent tweets per author (ordered by created_at, newest first).
# The tweet id is a secondary sort key: row_number() assigns an arbitrary order to
# rows that tie on the ordering columns, so without it the selected tweets could
# differ between runs when an author has several tweets with the same created_at.
window = Window.partitionBy("author_id").orderBy(F.col("created_at").desc(), F.col("id").desc())
tweets_en = tweets_en.withColumn("rank", F.row_number().over(window))
tweets_filtered = tweets_en.filter(F.col("rank") <= 10).drop("rank")

In [None]:
# Cap each tweet at 12 whitespace-separated tokens. The full text stays in "text";
# the shortened copy goes to "text_truncated".
truncate_udf = F.udf(
    lambda tweet_text: truncate_text(tweet_text, 12),
    StringType(),
)

tweets_truncated = tweets_filtered.withColumn("text_truncated", truncate_udf("text"))

In [None]:
#tweets_truncated.show(1, truncate=False)

In [None]:
# One row per author, aggregated over that author's retained tweets.
per_author_aggregates = [
    # All truncated texts joined into a single space-separated string.
    F.concat_ws(" ", F.collect_list("text_truncated")).alias("top_tweets_truncated_texts_concat"),
    F.collect_list("text").alias("top_tweets_texts"),
    # is_reply / is_sensitive are 0/1 ints, so their mean is a fraction.
    F.avg(F.col("is_reply")).alias("top_tweets_reply_fraction"),
    F.countDistinct("lang").alias("top_tweets_num_distinct_langs"),
    F.avg(F.col("is_sensitive")).alias("top_tweets_sensitive_fraction"),
    F.avg(F.col("like_count")).alias("top_tweets_avg_likes"),
    F.avg(F.col("quote_count")).alias("top_tweets_avg_quotes"),
    F.avg(F.col("reply_count")).alias("top_tweets_avg_replies"),
    F.avg(F.col("retweet_count")).alias("top_tweets_avg_retweets"),
]

tweet_features = tweets_truncated.groupBy("author_id").agg(*per_author_aggregates)

In [None]:
# Attach the per-author tweet aggregates to the user features. The inner join keeps
# only users that have a row in tweet_features; the result is cached for reuse.
enriched_user_features = (
    user_truncated
    .join(tweet_features, on=user_truncated.id == tweet_features.author_id, how="inner")
    .drop("author_id")
    .cache()
)

In [None]:
#enriched_user_features.printSchema()

In [None]:
#enriched_user_features.show(5, truncate=False)

In [None]:
#edges = spark.read.csv(f"/content/edge.csv", header=True, inferSchema=True)

In [None]:
#edges.show(5, truncate=False)

In [None]:
#subset_users = enriched_user_features.select("id").distinct()

In [None]:
#edges_filtered = (
#    edges.filter(edges.relation == "following")
#    .join(subset_users.withColumnRenamed("id", "source_id"), on="source_id", how="inner")
#    .join(subset_users.withColumnRenamed("id", "target_id"), on="target_id", how="inner")
#).cache()

In [None]:
# The output path is on Google Drive, but nothing earlier in this notebook mounts it.
# Mount it here so that on a fresh runtime the Parquet files land on Drive rather
# than in a plain directory on the runtime's local disk. If Drive is already
# mounted, drive.mount reports that and returns.
from google.colab import drive
drive.mount("/content/drive")

path = "/content/drive/MyDrive/TwiBot-22_Processed"

# Overwrites any previous export at this location.
enriched_user_features.write.mode("overwrite").parquet(f"{path}/all_lang/enriched_user_features.parquet")
#edges_filtered.write.mode("overwrite").parquet(f"{path}/edges_filtered.parquet")