This notebook converts the data from multiple data sources (CSV, JSON and other tabular files) into a relational model.\
The RDBMS used is PostgreSQL.

# Initialization

Imports.\
PySpark is used to manipulate dataframes.

In [1]:
import os

import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, regexp_replace, to_timestamp, unix_timestamp, when
from sqlalchemy import create_engine

PySpark session and link to access it.\
Do NOT forget to forward **port 4040** if using SSH.\
http://localhost:4040

Set `data_path` to the directory containing the Twitter datasets as downloaded from the Botometer bot repository.\
Set `jdbc_driver_path` to the PostgreSQL JDBC driver JAR.

In [None]:
# Paths to adapt to the local environment
jdbc_driver_path = "/data2/petit/code/drivers/postgresql-42.7.3.jar"
data_path = "/data2/petit/data/twitter"

# PostgreSQL target used by every read.jdbc / write.jdbc call in this notebook.
# Connection settings come from environment variables so credentials are never
# stored in the notebook.
# NOTE(review): the default URL (localhost:5432, database tep2022) is an
# assumption; set JDBC_URL to match your server.
jdbc_url = os.environ.get("JDBC_URL", "jdbc:postgresql://localhost:5432/tep2022")
connection_properties = {
    "user": os.environ.get("PGUSER", ""),
    "password": os.environ.get("PGPASSWORD", ""),
    "driver": "org.postgresql.Driver",
}

spark = SparkSession.builder \
    .appName("Spark SQL basic example") \
    .config("spark.driver.memory", "30g") \
    .config("spark.executor.memory", "190g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

print("Spark UI available at http://localhost:4040")

# Use the legacy time parser to avoid errors when parsing the
# "EEE MMM dd HH:mm:ss ..." timestamp patterns used below
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# List the dataset directories found directly under data_path
for entry in sorted(os.scandir(data_path), key=lambda e: e.name):
    if entry.is_dir():
        print(entry.path)




25/02/12 15:21:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark UI available at http://localhost:4040
/data2/petit/data/twitter/MGTAB
/data2/petit/data/twitter/cresci-2015
/data2/petit/data/twitter/cresci-rtbust
/data2/petit/data/twitter/gilani-2017
/data2/petit/data/twitter/midterm-2018
/data2/petit/data/twitter/Twibot-20
/data2/petit/data/twitter/other
/data2/petit/data/twitter/Twibot-22
/data2/petit/data/twitter/cresci-2017
/data2/petit/data/twitter/botometer-feedback-2019


# Gilani-2017

In [62]:
gilani_path = os.path.join(data_path, "gilani-2017")

# Users: keep only the nested `user` struct of each record, drop its `entities`
# sub-struct, parse created_at and expose the id as user_id for the label join.
# (No trailing line continuation: the statement ends on the last call.)
gilani_users = spark.read.json(os.path.join(gilani_path, "gilani-2017_tweets.json"))\
    .select("user.*")\
    .drop("entities")\
    .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))\
    .withColumnRenamed("id", "user_id")

# Labels: headerless TSV with (user id, label) columns
gilani_label = spark.read.csv(os.path.join(gilani_path, "gilani-2017.tsv"), sep="\t", header=False)\
    .withColumnRenamed("_c0", "user_id")\
    .withColumnRenamed("_c1", "label")

# Only users that have a label are written
gilani_users.join(gilani_label, on="user_id", how="inner")\
    .write.jdbc(url=jdbc_url, table="gilani_2017", mode="overwrite", properties=connection_properties)

# Midterm-2018

In [None]:
midterm_path = os.path.join(data_path, "midterm-2018")

# Unlike the tweet-based datasets, no `user.*` flattening is applied here; the
# creation-date column is user_created_at and its pattern has no zone offset.
midterm_users = spark.read.json(os.path.join(midterm_path, "midterm-2018_processed_user_objects.json"))\
    .withColumn("user_created_at", to_timestamp(col("user_created_at"), "EEE MMM dd HH:mm:ss yyyy"))

# Labels: headerless TSV with (user id, label) columns
midterm_label = spark.read.csv(os.path.join(midterm_path, "midterm-2018.tsv"), sep="\t", header=False)\
    .withColumnRenamed("_c0", "user_id")\
    .withColumnRenamed("_c1", "label")

# Only users that have a label are written
midterm_users.join(midterm_label, on="user_id", how="inner")\
    .write.jdbc(url=jdbc_url, table="midterm_2018", mode="overwrite", properties=connection_properties)

/data2/petit/data/twitter/midterm-2018/midterm-2018_processed_user_objects.json
/data2/petit/data/twitter/midterm-2018/midterm-2018.tsv


                                                                                

# Cresci-rtbust

In [56]:
crescirtb_path = os.path.join(data_path, "cresci-rtbust")

# Users: flatten the nested `user` struct, drop `entities`, parse created_at
# and expose the id as user_id so it can be joined with the labels.
crescirtb_raw = spark.read.json(os.path.join(crescirtb_path, "cresci-rtbust-2019_tweets.json"))
crescirtb_users = (
    crescirtb_raw
    .select("user.*")
    .drop("entities")
    .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))
    .withColumnRenamed("id", "user_id")
)

# Labels: headerless TSV with (user id, label) columns
crescirtb_label = (
    spark.read.csv(os.path.join(crescirtb_path, "cresci-rtbust-2019.tsv"), sep="\t", header=False)
    .withColumnRenamed("_c0", "user_id")
    .withColumnRenamed("_c1", "label")
)

# Keep only labelled users and replace the table
crescirtb_joined = crescirtb_users.join(crescirtb_label, on="user_id", how="inner")
crescirtb_joined.write.jdbc(url=jdbc_url, table="cresci_rtbust", mode="overwrite", properties=connection_properties)


# Botometer-Feedback-2019

In [28]:
botometerfb_path = os.path.join(data_path, "botometer-feedback-2019")
botometerfb_tweets_file = os.path.join(botometerfb_path, "botometer-feedback-2019_tweets.json")
botometerfb_label_file = os.path.join(botometerfb_path, "botometer-feedback-2019.tsv")

# Users: nested `user` struct flattened, `entities` dropped, created_at parsed,
# id renamed to user_id for the label join.
botometerfb_users = (
    spark.read.json(botometerfb_tweets_file)
    .select("user.*")
    .drop("entities")
    .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))
    .withColumnRenamed("id", "user_id")
)

# Labels: headerless TSV with (user id, label) columns
botometerfb_label = (
    spark.read.csv(botometerfb_label_file, sep="\t", header=False)
    .withColumnRenamed("_c0", "user_id")
    .withColumnRenamed("_c1", "label")
)

(
    botometerfb_users
    .join(botometerfb_label, on="user_id", how="inner")
    .write.jdbc(url=jdbc_url, table="botometer_feedback", mode="overwrite", properties=connection_properties)
)

                                                                                

# Cresci-2015

For this one, the bots and humans are spread between multiple directories.

We need to read from each separate file (using *) before merging them.

Some text fields are badly encoded; the `regexp_replace` calls strip the offending characters (this applies to both cresci-2015 and cresci-2017).

In [4]:
cresci2015_path = os.path.join(data_path, "cresci-2015")

bot_dirs = ["FSF", "INT", "TWT"]
human_dirs = ["E13", "TFP"]

# Text columns from which every character outside printable ASCII (space..~) is removed
ascii_only_columns = ["name", "screen_name", "description", "location", "url"]

# Users from all sub-directories at once; the `dataset` column tells which
# sub-dataset a row comes from and therefore its label.
cresci2015_users = (
    spark.read.csv(os.path.join(cresci2015_path, "*/users.csv"), header=True)
    .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))
    .withColumn("label", when(col("dataset").isin(bot_dirs), "bot").otherwise("human"))
)
for column_name in ascii_only_columns:
    cresci2015_users = cresci2015_users.withColumn(column_name, regexp_replace(col(column_name), "[^ -~]", ""))

cresci2015_users.withColumnRenamed("id", "user_id")\
    .write.jdbc(url=jdbc_url, table="cresci_2015_users", mode="overwrite", properties=connection_properties)

24/11/22 09:56:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [None]:
# Tweets from all sub-directories at once (read a single time)
cresci2015_tweets_raw = spark.read.csv(os.path.join(cresci2015_path, "*/tweets.csv"), header=True)

# Strip NUL characters from every column first (PostgreSQL cannot store them in
# text columns). regexp_replace always returns a string, so created_at is parsed
# AFTER this step; parsing it before would turn the timestamp back into a string.
cresci2015_tweets_raw\
        .select([regexp_replace(col(c), '\x00', '').alias(c) for c in cresci2015_tweets_raw.columns])\
        .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))\
        .write.jdbc(url=jdbc_url, table="cresci_2015_tweets", mode="overwrite", properties=connection_properties)

                                                                                

# Cresci-2017

This dataset needs extra care: the tweets and users CSV files in the various directories do not all have the same columns (the differences are mostly `updated` and `crawled_at`).

So we first keep only the columns they all have in common, then merge them and push the result into the DB.

The first cell below handles users; the second handles tweets.

In [None]:
cresci2017_path = os.path.join(data_path, "cresci-2017")

bot_dirs = ["fake_followers", "social_spambots_1", "social_spambots_2", "social_spambots_3"]
human_dirs = ["genuine_accounts"]

# Load every subset's users.csv (human subset first, then the bot subsets)
cresci2017_users_by_dir = {
    subset: spark.read.csv(os.path.join(cresci2017_path, subset, "users.csv"), header=True)
    for subset in human_dirs + bot_dirs
}

# The subsets do not share exactly the same columns: keep the intersection.
# Sorted so the table's column order is stable across runs (set iteration
# order over strings depends on per-process hash randomization).
common_columns = sorted(set.intersection(*(set(subset_df.columns) for subset_df in cresci2017_users_by_dir.values())))

# Restrict each subset to the common columns and label it from its directory
cresci2017_users_labelled = [
    subset_df.select(*common_columns).withColumn("label", lit("bot" if subset in bot_dirs else "human"))
    for subset, subset_df in cresci2017_users_by_dir.items()
]

# Merge by column name (not position) into a single DataFrame
cresci2017_users = cresci2017_users_labelled[0]
for subset_df in cresci2017_users_labelled[1:]:
    cresci2017_users = cresci2017_users.unionByName(subset_df)

cresci2017_users.withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))\
        .withColumnRenamed("id", "user_id")\
        .write.jdbc(url=jdbc_url, table="cresci_2017_users", mode="overwrite", properties=connection_properties)

                                                                                

In [61]:
# Same five subsets as the users cell, in the same order
cresci2017_dirs = ["genuine_accounts", "fake_followers", "social_spambots_1", "social_spambots_2", "social_spambots_3"]

# Load every subset's tweets.csv
cresci2017_tweets_by_dir = [
    spark.read.csv(os.path.join(cresci2017_path, subset, "tweets.csv"), header=True)
    for subset in cresci2017_dirs
]

# Keep only the columns present in every subset (sorted for a stable table layout)
common_columns = sorted(set.intersection(*(set(subset_df.columns) for subset_df in cresci2017_tweets_by_dir)))

# Merge by column name into a single DataFrame
cresci2017_tweets = cresci2017_tweets_by_dir[0].select(*common_columns)
for subset_df in cresci2017_tweets_by_dir[1:]:
    cresci2017_tweets = cresci2017_tweets.unionByName(subset_df.select(*common_columns))

# Strip NUL characters from every column first (PostgreSQL cannot store them in
# text columns). regexp_replace always returns a string, so created_at is parsed
# AFTER this step; parsing it before would turn the timestamp back into a string.
cresci2017_tweets\
        .select([regexp_replace(col(c), '\x00', '').alias(c) for c in common_columns])\
        .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))\
        .write.jdbc(url=jdbc_url, table="cresci_2017_tweets", mode="overwrite", properties=connection_properties)


                                                                                

# TwiBot20

### Write in DB

Twibot20 dataset path

In [15]:
# Derived from data_path instead of repeating the absolute path. The trailing
# separator ("" component) is kept because file names are appended with `+`.
tb20_path = os.path.join(data_path, "Twibot-20", "")

Read the users and tweets from `node.json` into a DataFrame.

In [16]:
# All nodes (users and tweets) from node.json, read with multiLine=True because
# the file is not in JSON-lines form. Persisted since it is filtered twice below.
df_nodes = spark.read.json(tb20_path + "node.json", multiLine=True).persist()

                                                                                

In [5]:
# Keep only user nodes, i.e. the nodes whose id starts with the letter "u"
df_users = df_nodes.filter(df_nodes.id.startswith("u")).persist()

In [8]:
# Flatten public_metrics into top-level columns, drop nested / unused fields,
# parse created_at and replace the tb20_users table.
tb20_users = (
    df_users
    .select('*', "public_metrics.*")
    .drop("public_metrics", "entities", "pinned_tweet_id", "profile_image_url", "url", "withheld", "text")
    .withColumn("created_at", to_timestamp(col("created_at"), "EEE MMM dd HH:mm:ss Z yyyy"))
)
tb20_users.write.jdbc(url=jdbc_url, table="petit.tb20_users", mode="overwrite", properties=connection_properties)

                                                                                

In [23]:
# Tweet nodes have ids starting with "t". A tweet is flagged as a retweet when
# its text starts with "RT"; is_retweet is stored as the strings "true"/"false"
# (the retweet cells below query it as is_retweet = 'true').
# The DataFrame is kept in df_tweets; previously df_tweets was bound to the
# None returned by .write.jdbc().
df_tweets = df_nodes.filter(df_nodes.id.startswith("t")) \
    .select("id", "text") \
    .withColumn("is_retweet", when(col("text").startswith("RT"), "true").otherwise("false"))

df_tweets.write.jdbc(url=jdbc_url, table="petit.tb20_tweets", mode="overwrite", properties=connection_properties)

                                                                                

Edges

In [9]:
# edge.csv has a header row; no schema inference, so every column is a string
tb20_edges = spark.read.csv(tb20_path + "edge.csv", header="true", multiLine=True)
tb20_edges.write.jdbc(url=jdbc_url, table="petit.tb20_edges", mode="overwrite", properties=connection_properties)

                                                                                

Retweets

In [28]:
# Read the edge and tweet tables back from PostgreSQL ...
df_edge = spark.read.jdbc(jdbc_url, "petit.tb20_edges", properties=connection_properties)
df_tweet = spark.read.jdbc(jdbc_url, "petit.tb20_tweets", properties=connection_properties)

# ... and register them as the SQL views `edge` and `tweet`
df_edge.createOrReplaceTempView('edge')
df_tweet.createOrReplaceTempView('tweet')

In [29]:
# user (source_id) -> tweet (target_id) authorship edges
df_post = spark.sql(
    "SELECT source_id, target_id FROM edge WHERE relation = 'post'"
)
# tweets flagged as retweets when tb20_tweets was written
df_retweet = spark.sql(
    "SELECT * FROM tweet WHERE is_retweet = 'true'"
)

In [31]:
# Number of retweets posted by each user: match authorship edges with retweet
# tweets, then count per author.
retweets_per_user = (
    df_post
    .join(df_retweet, df_post.target_id == df_retweet.id)
    .groupBy("source_id")
    .count()
    .withColumnRenamed("source_id", "user_id")
    .withColumnRenamed("count", "retweet_count")
)
retweets_per_user.write.jdbc(url=jdbc_url, table="petit.tb20_user_retweet", mode="overwrite", properties=connection_properties)
    

                                                                                

# TwiBot22

## Original dataset

The original dataset is split across multiple files in different formats: JSON and CSV.\
This part aims at writing them in a relational database.

The **database name** is *tep2022*, the **schema** is *petit*, and those **twibot22 tables** start with *tb22_*

In [41]:
# No trailing slash: every file below is appended as tb22_path + "/<file>",
# which previously produced paths containing "//".
tb22_path = "/data2/petit/data/twibot22"

Load the 9 tweet JSON files into the database.\
Careful, this is very time-consuming: about 8-9 minutes per file, roughly 75 minutes in total.\
Columns are listed below.\
*id, author_id, created_at, text, lang, in_reply_to_user_id, like_count, quote_count, reply_count, retweet_count*

In [16]:
# Load tweet_0.json .. tweet_8.json into petit.tb22_tweet.
# The first file replaces any existing table so that re-running the cell does
# not duplicate every row; the following files are appended to it.
for i in range(9):
    tweet_file = tb22_path + "/tweet_" + str(i) + ".json"
    write_mode = "overwrite" if i == 0 else "append"
    spark.read.option("multiLine", True).json(tweet_file) \
        .select("id", "author_id", "created_at", "text", "lang", "in_reply_to_user_id", "public_metrics.*") \
        .withColumn("id", col("id").cast("string")) \
        .withColumn("author_id", col("author_id").cast("string")) \
        .withColumn("created_at", col("created_at").cast("timestamp")) \
        .withColumn("text", col("text").cast("string")) \
        .withColumn("lang", col("lang").cast("string")) \
        .withColumn("in_reply_to_user_id", col("in_reply_to_user_id").cast("string")) \
        .withColumn("quote_count", col("quote_count").cast("integer")) \
        .withColumn("reply_count", col("reply_count").cast("integer")) \
        .withColumn("retweet_count", col("retweet_count").cast("integer")) \
        .withColumn("like_count", col("like_count").cast("integer")) \
        .write.jdbc(url=jdbc_url, table="petit.tb22_tweet", mode=write_mode, properties=connection_properties)

                                                                                

Load the users into the database. Columns are listed below.\
*created_at, description, id, location, name, protected, username, verified, followers_count, following_count, listed_count, tweet_count*

In [24]:
# Flatten public_metrics, drop nested / unused fields, parse created_at and
# strip NUL characters (PostgreSQL cannot store them in text) from descriptions.
# Only the NUL characters are removed; previously the whole description was
# blanked whenever it contained one.
spark.read.option("multiLine", True).json(tb22_path + "/user.json")\
    .select('*', "public_metrics.*") \
    .drop("public_metrics", "entities", "pinned_tweet_id", "profile_image_url", "url", "withheld") \
    .withColumn("created_at", col("created_at").cast("timestamp")) \
    .withColumn("description", regexp_replace(col("description"), '\x00', ''))\
    .write.jdbc(url=jdbc_url, table="petit.tb22_user", mode="overwrite", properties=connection_properties)

Load the graph edges into the database. Columns are listed below.\
*source_id, relation, target_id*

In [25]:
# Header row gives the column names; no schema inference, so all columns are strings
tb22_edges = spark.read.csv(tb22_path + "/edge.csv", header=True)
tb22_edges.write.mode("overwrite").jdbc(url=jdbc_url, table="petit.tb22_edge", properties=connection_properties)

Load the labels and the train/test/validation splits.

In [44]:
# label.csv copied as-is into petit.tb22_label (header row gives column names)
tb22_labels = spark.read.csv(tb22_path + "/label.csv", header=True)
tb22_labels.write.mode("overwrite").jdbc(url=jdbc_url, table="petit.tb22_label", properties=connection_properties)

In [45]:
# split.csv copied as-is into petit.tb22_split (header row gives column names)
tb22_splits = spark.read.csv(tb22_path + "/split.csv", header=True)
tb22_splits.write.mode("overwrite").jdbc(url=jdbc_url, table="petit.tb22_split", properties=connection_properties)

## Interaction graph

Get edge table with all relations

In [20]:
# All TwiBot-22 edges, read back from PostgreSQL and exposed as the SQL view `edge`
df_edge = spark.read.jdbc(jdbc_url, "petit.tb22_edge", properties=connection_properties)
df_edge.createOrReplaceTempView('edge')

Split all the different edge categories that will be used into different dataframes

In [31]:
# One (source_id, target_id) DataFrame per relation type used below
df_post = spark.sql("SELECT source_id, target_id FROM edge WHERE relation = 'post'")
df_rt = spark.sql("SELECT source_id, target_id FROM edge WHERE relation = 'retweeted'")
df_reply = spark.sql("SELECT source_id, target_id FROM edge WHERE relation = 'replied_to'")
df_mention = spark.sql("SELECT source_id, target_id FROM edge WHERE relation = 'mentioned'")

### Retweet graph

Build the user-level retweet graph.\
An edge (source, target) means the source user posted a tweet that retweets a tweet posted by the target user.

In [111]:
# Chain of edges used to lift retweets to the user level:
#   df_post_a : user A  --post-->      tweet X
#   df_rt_a   : tweet X --retweeted--> tweet Y
#   df_post_b : user B  --post-->      tweet Y
# giving one (A, B) row per matching chain.
df_post_a = df_post.alias("df_post_a")
df_post_b = df_post.alias("df_post_b")
df_rt_a = df_rt.alias("df_rt_a")

retweet_by_a = col('df_post_a.target_id') == col('df_rt_a.source_id')
retweeted_tweet_of_b = col('df_rt_a.target_id') == col('df_post_b.target_id')

df_user_retweet = (
    df_post_a
    .join(df_rt_a, retweet_by_a)
    .join(df_post_b, retweeted_tweet_of_b)
    .select(
        col('df_post_a.source_id').alias('source_id'),
        col('df_post_b.source_id').alias('target_id'),
    )
    .persist()
)

Count duplicate rows (users that retweeted from the same user multiple times)

In [116]:
# Collapse repeated (source, target) pairs into one row with a "count" column.
# NOTE: this rebinds df_user_retweet, so re-run the previous cell before
# re-running this one (otherwise every count becomes 1).
df_user_retweet = df_user_retweet.groupBy(['source_id', 'target_id']).count()

In [117]:
# Replace petit.tb22_retweet with the weighted user retweet edges
df_user_retweet.write.mode("overwrite").jdbc(url=jdbc_url, table="petit.tb22_retweet", properties=connection_properties)

### Reply graph

In [37]:
# Same construction as the retweet graph, through 'replied_to' edges:
#   user A --post--> tweet X --replied_to--> tweet Y <--post-- user B
df_post_a = df_post.alias("df_post_a")
df_post_b = df_post.alias("df_post_b")
df_reply_a = df_reply.alias("df_reply_a")

reply_by_a = col('df_post_a.target_id') == col('df_reply_a.source_id')
replied_tweet_of_b = col('df_reply_a.target_id') == col('df_post_b.target_id')

df_user_reply = (
    df_post_a
    .join(df_reply_a, reply_by_a)
    .join(df_post_b, replied_tweet_of_b)
    .select(
        col('df_post_a.source_id').alias('source_id'),
        col('df_post_b.source_id').alias('target_id'),
    )
    .persist()
)

In [38]:
# One row per (source, target) pair with its multiplicity in "count".
# NOTE: rebinds df_user_reply; re-run the previous cell before re-running this one.
df_user_reply = df_user_reply.groupBy(['source_id', 'target_id']).count()

In [39]:
# Replace petit.tb22_reply with the weighted user reply edges
df_user_reply.write.mode("overwrite").jdbc(url=jdbc_url, table="petit.tb22_reply", properties=connection_properties)

### Mention graph

In [32]:
# user A --post--> tweet X --mentioned--> target; the mention edge's target_id
# is used directly as the target of the user-level edge.
df_post_a = df_post.alias("df_post_a")
df_mention_a = df_mention.alias("df_mention_a")

mention_in_a_tweet = col('df_post_a.target_id') == col('df_mention_a.source_id')

df_user_mention = (
    df_post_a
    .join(df_mention_a, mention_in_a_tweet)
    .select(
        col('df_post_a.source_id').alias('source_id'),
        col('df_mention_a.target_id').alias('target_id'),
    )
    .persist()
)

In [34]:
# One row per (source, target) pair with its multiplicity in "count".
# NOTE: rebinds df_user_mention; re-run the previous cell before re-running this one.
df_user_mention = df_user_mention.groupBy(['source_id', 'target_id']).count()

In [36]:
# Replace petit.tb22_mention with the weighted user mention edges
df_user_mention.write.mode("overwrite").jdbc(url=jdbc_url, table="petit.tb22_mention", properties=connection_properties)

# Stop

Run the **stop** function if needed to force the Spark session to shut down.

In [46]:
# Stop the Spark session created in the initialization cell
spark.stop()