In [61]:
import os

import polars as pl
import pymongo
import tldextract


In [62]:
# Credentials must not live in the notebook: the connection string (which
# embeds the user name and password) is read from the MONGODB_URI environment
# variable instead.
# NOTE(review): the password that used to be written here should be rotated,
# since it remains readable in earlier revisions of this file.
mongodb_uri = os.environ.get("MONGODB_URI")
if not mongodb_uri:
    raise RuntimeError(
        "Set the MONGODB_URI environment variable to the MongoDB connection string"
    )
client = pymongo.MongoClient(mongodb_uri)
db = client["sns-fake-content"]


In [63]:
# Source tables, read from paths relative to the notebook's working directory.
tweets = pl.read_parquet("../analysis/master_fnn.parquet")
users = pl.read_csv("./master_users.csv")


In [64]:
# Prepended to the name of every MongoDB collection written by this notebook.
prefix = "dummy_"

# Tweets


In [65]:
# Show the tweet columns available to the aggregations below.
tweets.columns


['lang',
 'id',
 'entities',
 'public_metrics',
 'context_annotations',
 'possibly_sensitive',
 'created_at',
 'author_id',
 'text',
 'conversation_id',
 'edit_history_tweet_ids',
 'reply_settings',
 'in_reply_to_user_id',
 'referenced_tweets',
 'geo',
 'withheld',
 'label',
 'processed_text']

## Label By Time

In [66]:
# Count tweets per (label, possibly_sensitive, year). `created_at` is reduced
# to its calendar year before grouping, so the group key is the year.
data = (
    tweets.select(
        [
            pl.col("id").alias("tweet_id"),
            pl.col("label"),
            pl.col("possibly_sensitive"),
            pl.col("created_at").dt.year(),
        ]
    )
    .groupby(["label", "possibly_sensitive", "created_at"])
    .count()
    # DataFrame.sort returns a new frame instead of sorting in place, so it
    # has to be part of the chain for the ordering to take effect.
    .sort("created_at", descending=True)
)
data = data.to_dicts()

In [67]:
# Start from an empty collection so that re-running the cell does not append
# duplicate documents, then bulk-insert the freshly computed records.
collection = prefix + "tweets_label_time"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7eff04234190>

## Content Text


In [68]:
# Word frequencies: one output row per (token, label, sensitivity flag, year).
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("processed_text"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
    ]
)
# `processed_text` holds a list of tokens per tweet; exploding it yields one
# row per token occurrence.
data = data.explode("processed_text")
data = data.groupby(
    ["processed_text", "label", "possibly_sensitive", "created_at"]
).count()
data = data.to_dicts()

In [69]:
# Drop any earlier copy of the collection, then write the new records.
collection = prefix + "tweets_content_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7f0103e3d220>

## Content


In [70]:
# Per-tweet text-length features together with the engagement counters,
# restricted to tweets that received at least one interaction.
data = (
    tweets.select(
        [
            pl.col("id").alias("tweet_id"),
            pl.col("processed_text"),
            # Number of tokens in the pre-processed token list.
            pl.col("processed_text").arr.lengths().alias("processed_text_length"),
            pl.col("text"),
            # Number of pieces obtained by splitting the raw text on single spaces.
            pl.col("text").str.split(" ").arr.lengths().alias("text_length"),
            # Number of characters in the raw text.
            pl.col("text").str.n_chars().alias("text_character_length"),
            pl.col("public_metrics"),
            pl.col("label"),
            pl.col("possibly_sensitive"),
            pl.col("created_at").dt.year(),
        ]
    )
    # Flatten the public_metrics struct into one column per counter.
    .unnest("public_metrics")
    # Keep a tweet when any of its four counters is positive.
    .filter(
        (pl.col("like_count") > 0)
        | (pl.col("quote_count") > 0)
        | (pl.col("reply_count") > 0)
        | (pl.col("retweet_count") > 0)
    )
    # Fixed-size sample with a fixed seed.
    # NOTE(review): presumably this raises when fewer than 100000 rows survive
    # the filter (sampling without replacement) - confirm against the data size.
    .sample(100000, seed=2023, shuffle=True)
).to_dicts()
# data.describe()


In [71]:
# Recreate the collection from scratch and store the sampled tweets.
collection = prefix + "tweets_text_length"
col = db[collection]

if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe998529a0>

## Context Entity


In [72]:
# Tweet-level columns plus the raw context annotations.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("context_annotations"),
    ]
)
# One row per annotation, with the annotation's `entity` struct flattened and
# its fields renamed with an entity_ prefix; rows without an entity name go.
data = (
    data.explode("context_annotations")
    .unnest("context_annotations")
    .unnest("entity")
    .select(
        [
            pl.all().exclude(["entity", "domain", "description", "id", "name"]),
            pl.col("description").alias("entity_description"),
            pl.col("id").alias("entity_id"),
            pl.col("name").alias("entity_name"),
        ]
    )
    .filter(pl.col("entity_name").is_not_null())
)
# Grouping with tweet_id first collapses repeats inside a single tweet, so the
# second count is the number of distinct tweets per key.
data = data.groupby(
    ["entity_name", "label", "possibly_sensitive", "created_at", "tweet_id", "entity_description"]
).count()
data = (
    data.groupby(["entity_name", "label", "possibly_sensitive", "created_at", "entity_description"])
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()


In [73]:
# Replace the stored entity counts with the ones just computed.
collection = prefix + "tweets_entity_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7f0103cd0250>

## Context Domain

In [74]:
# Tweet-level columns plus the raw context annotations.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("context_annotations"),
    ]
)
# One row per annotation, with the annotation's `domain` struct flattened and
# its fields renamed with a domain_ prefix; rows without a domain name go.
data = (
    data.explode("context_annotations")
    .unnest("context_annotations")
    .unnest("domain")
    .select(
        [
            pl.all().exclude(["entity", "domain", "description", "id", "name"]),
            pl.col("description").alias("domain_description"),
            pl.col("id").alias("domain_id"),
            pl.col("name").alias("domain_name"),
        ]
    )
    .filter(pl.col("domain_name").is_not_null())
)
# First pass keeps one row per (domain, tweet); the second pass therefore
# counts distinct tweets per key.
data = data.groupby(
    ["domain_name", "label", "possibly_sensitive", "created_at", "tweet_id", "domain_description"]
).count()
data = (
    data.groupby(["domain_name", "label", "possibly_sensitive", "created_at", "domain_description"])
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()
# data

In [75]:
# Replace the stored domain counts with the ones just computed.
collection = prefix + "tweets_domain_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe815bad90>

## Annotation

In [76]:
# Tweet-level columns plus the `entities` struct.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("entities"),
    ]
)
# One row per annotation that has a normalized text; the other entity lists
# are dropped afterwards.
data = (
    data.unnest("entities")
    .explode("annotations")
    .unnest("annotations")
    .filter(pl.col("normalized_text").is_not_null())
    .select([pl.all().exclude(["urls", "mentions", "hashtags", "cashtags", "annotations"])])
)
# Collapse repeats within a tweet first, then count distinct tweets per
# (normalized_text, label, possibly_sensitive, year).
data = data.groupby(
    ["normalized_text", "label", "possibly_sensitive", "created_at", "tweet_id"]
).count()
data = (
    data.groupby(["normalized_text", "label", "possibly_sensitive", "created_at"])
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()
# data


In [77]:
# Replace the stored annotation-text counts with the ones just computed.
collection = prefix + "tweets_annotation_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe80665a60>

In [78]:
# Tweet-level columns plus the `entities` struct.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("entities"),
    ]
)
# One row per annotation that has a normalized text; the other entity lists
# are dropped afterwards.
data = (
    data.unnest("entities")
    .explode("annotations")
    .unnest("annotations")
    .filter(pl.col("normalized_text").is_not_null())
    .select([pl.all().exclude(["urls", "mentions", "hashtags", "cashtags", "annotations"])])
)
# Collapse repeats within a tweet first, then count distinct tweets per
# (annotation type, label, possibly_sensitive, year).
data = data.groupby(
    ["type", "label", "possibly_sensitive", "created_at", "tweet_id"]
).count()
data = (
    data.groupby(["type", "label", "possibly_sensitive", "created_at"])
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()
# data


In [79]:
# Replace the stored annotation-type counts with the ones just computed.
collection = prefix + "tweets_annotation_type"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe7af382b0>

## Hashtag / Cashtag

In [80]:
# Tweet-level columns plus the `entities` struct.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("entities"),
    ]
)
# One row per hashtag occurrence that carries a tag; the other entity lists
# are dropped afterwards.
data = (
    data.unnest("entities")
    .explode("hashtags")
    .unnest("hashtags")
    .filter(pl.col("tag").is_not_null())
    .select([pl.all().exclude(["urls", "mentions", "hashtags", "cashtags", "annotations"])])
)
# Collapse repeats within a tweet first, then count distinct tweets per
# (tag, label, possibly_sensitive, year).
data = data.groupby(
    ["tag", "label", "possibly_sensitive", "created_at", "tweet_id"]
).count()
data = (
    data.groupby(["tag", "label", "possibly_sensitive", "created_at"])
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()
# data


In [81]:
# Replace the stored hashtag counts with the ones just computed.
collection = prefix + "tweets_hashtag_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe80680d00>

In [82]:
# Tweet-level columns plus the `entities` struct.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("entities"),
    ]
)
# One row per cashtag occurrence that carries a tag; the other entity lists
# are dropped afterwards.
data = (
    data.unnest("entities")
    .explode("cashtags")
    .unnest("cashtags")
    .filter(pl.col("tag").is_not_null())
    .select([pl.all().exclude(["urls", "mentions", "hashtags", "cashtags", "annotations"])])
)
# Collapse repeats within a tweet first, then count distinct tweets per
# (tag, label, possibly_sensitive, year).
data = data.groupby(
    ["tag", "label", "possibly_sensitive", "created_at", "tweet_id"]
).count()
data = (
    data.groupby(["tag", "label", "possibly_sensitive", "created_at"])
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()
# data


In [83]:
# Replace the stored cashtag counts with the ones just computed.
collection = prefix + "tweets_cashtag_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe7bcd0700>

## URL

In [84]:
def extracted_url(x):
    """Run ``tldextract.extract`` on ``x`` and return selected parts as a dict.

    The returned dict has the keys ``domain``, ``subdomain``, ``suffix`` and
    ``registered_domain``, in that order, each taken from the attribute of the
    same name on the extraction result.
    """
    parts = tldextract.extract(x)
    wanted = ("domain", "subdomain", "suffix", "registered_domain")
    return {field: getattr(parts, field) for field in wanted}

# Tweet-level columns plus the `entities` struct.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("entities"),
    ]
)
# One row per URL that has an expanded form; each expanded URL is split into
# its domain parts, which are then flattened into their own columns.
data = (
    data.unnest("entities")
    .explode("urls")
    .unnest("urls")
    .filter(pl.col("expanded_url").is_not_null())
    .select(
        [
            pl.all().exclude(["urls", "mentions", "hashtags", "cashtags", "annotations"]),
            pl.col("expanded_url").apply(extracted_url).alias("extracted_url"),
        ]
    )
    .unnest("extracted_url")
)
# Collapse repeats within a tweet first, then count distinct tweets per
# (domain parts, label, possibly_sensitive, year).
data = data.groupby(
    [
        "domain",
        "subdomain",
        "suffix",
        "registered_domain",
        "label",
        "possibly_sensitive",
        "created_at",
        "tweet_id",
    ]
).count()
data = (
    data.groupby(
        [
            "domain",
            "subdomain",
            "suffix",
            "registered_domain",
            "label",
            "possibly_sensitive",
            "created_at",
        ]
    )
    .count()
    .sort("count", descending=True)
)
data = data.to_dicts()
# data


In [85]:
# Replace the stored URL-domain counts with the ones just computed.
collection = prefix + "tweets_url_domain"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe7f6087f0>

In [86]:
# Tweet-level columns plus the `entities` struct.
data = tweets.select(
    [
        pl.col("id").alias("tweet_id"),
        pl.col("label"),
        pl.col("possibly_sensitive"),
        pl.col("created_at").dt.year(),
        pl.col("entities"),
    ]
)
# One row per URL entity that carries a title; the other entity lists are
# dropped afterwards.
data = (
    data.unnest("entities")
    .explode("urls")
    .unnest("urls")
    .filter(pl.col("title").is_not_null())
    .select([pl.all().exclude(["urls", "mentions", "hashtags", "cashtags", "annotations"])])
)

# Titles as a plain Python list, in row order, for the tokenizer below.
title = list(data['title'])

In [87]:
import spacy
from tqdm import tqdm

# spaCy pipeline used to tokenize and lemmatize free text in this notebook.
nlp = spacy.load("en_core_web_sm")

def get_tokenized_text(doc) -> "list[str]":
    """Return the lower-cased lemmas of the content words in ``doc``.

    ``doc`` is iterated token by token. A token contributes its
    ``lemma_.lower()`` only when it is not a stop word (``is_stop`` false) and
    is purely alphabetic (``is_alpha`` true); empty lemmas are skipped.
    """
    lemmas = []
    for token in doc:
        if token.is_stop or not token.is_alpha:
            continue
        lemma = token.lemma_.lower()
        if str(lemma) != "":
            lemmas.append(lemma)
    return lemmas

# Tokenize every URL title; the result is aligned one-to-one with `title`.
texts: "list[list[str]]" = [
    get_tokenized_text(doc)
    for doc in tqdm(nlp.pipe(title, n_process=-1), total=len(title))
]

100%|██████████| 9809/9809 [00:04<00:00, 2423.79it/s]


In [88]:
# Attach the token lists to their rows, emit one row per token and count the
# rows per (token, label, possibly_sensitive, year), most frequent first.
data = (
    data.with_columns(pl.Series("processed_title", texts))
    .explode("processed_title")
    .groupby(["processed_title", "label", "possibly_sensitive", "created_at"])
    .count()
    .sort("count", descending=True)
    .to_dicts()
)
# data

In [89]:
# Replace the stored title-word counts with the ones just computed.
collection = prefix + "tweets_url_title_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe7b3c2f10>

In [90]:
def _metric_summary(metric: str) -> pl.DataFrame:
    """Return a one-row frame of summary statistics for one public metric.

    Only tweets whose ``metric`` value is greater than 1 are considered. The
    output columns are ``{metric}_q0`` .. ``{metric}_q4`` (quantiles 0, 0.25,
    0.5, 0.75 and 1) followed by ``{metric}_std``, ``{metric}_mean`` and
    ``{metric}_var``.

    NOTE(review): the threshold here is ``> 1`` while the content-sample cell
    keeps tweets with counters ``> 0`` - confirm the difference is intended.
    """
    values = pl.col(metric)
    return (
        tweets.select([pl.col("public_metrics")])
        .unnest("public_metrics")
        .filter(values > 1)
        .select(
            [
                values.quantile(0).alias(f"{metric}_q0"),
                values.quantile(0.25).alias(f"{metric}_q1"),
                values.quantile(0.5).alias(f"{metric}_q2"),
                values.quantile(0.75).alias(f"{metric}_q3"),
                values.quantile(1).alias(f"{metric}_q4"),
                values.std().alias(f"{metric}_std"),
                values.mean().alias(f"{metric}_mean"),
                values.var().alias(f"{metric}_var"),
            ]
        )
    )


# One summary frame per engagement counter, computed by the shared helper so
# the four pipelines cannot drift apart.
like_count = _metric_summary("like_count")
quote_count = _metric_summary("quote_count")
reply_count = _metric_summary("reply_count")
retweet_count = _metric_summary("retweet_count")

# Place the four one-row summaries side by side to form a single record.
data = pl.concat([like_count, quote_count, reply_count, retweet_count], how="horizontal")
data = data.to_dicts()

In [91]:
# Replace the stored summary statistics with the ones just computed.
collection = prefix + "tweets_statistic"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe7e4980d0>

## User

In [92]:
# Every column of the users table except the first one.
# NOTE(review): presumably the first CSV column is a saved row index - confirm.
users_columns = users.columns[1:]
users_columns

# Hand-written list of user column names.
# NOTE(review): not referenced by any other cell visible in this notebook.
users_columns_tmp = ['protected',
 'username',
 'description',
 'name',
 'created_at',
 'entities.description.urls',
 'entities.description.hashtags',
 'public_metrics.followers_count',
 'public_metrics.following_count',
 'public_metrics.tweet_count',
 'public_metrics.listed_count',
 'url',
 'entities.url.urls',
 'pinned_tweet_id',
 'entities.description.mentions',
 'entities.description.cashtags',
 'withheld.country_codes',
 'withheld.scope',
 'source']

In [93]:
# Show the tweet columns again before joining tweets with users.
tweets.columns

['lang',
 'id',
 'entities',
 'public_metrics',
 'context_annotations',
 'possibly_sensitive',
 'created_at',
 'author_id',
 'text',
 'conversation_id',
 'edit_history_tweet_ids',
 'reply_settings',
 'in_reply_to_user_id',
 'referenced_tweets',
 'geo',
 'withheld',
 'label',
 'processed_text']

In [94]:
# Narrow view of the tweets used for the user-level joins; `id` is exposed as
# `tweet_id` so it cannot be confused with the users table's `id`.
users_tweets = tweets.select(
    [
        "processed_text",
        "text",
        "author_id",
        "label",
        pl.col("id").alias("tweet_id"),
        "possibly_sensitive",
        "created_at",
    ]
)
users_tweets


processed_text,text,author_id,label,tweet_id,possibly_sensitive,created_at
list[str],str,i64,str,i64,bool,datetime[ns]
"[""air"", ""ryan"", ... ""sweepstake""]","""On Air with Ry...",1012203358512443392,"""false""",1029123395739414528,false,2018-08-13 21:51:52
"[""american"", ""idol"", ... ""gabby""]","""‘American Idol...",829904857305927680,"""false""",998353516434518016,false,2018-05-21 00:03:21
"[""latest"", ""art"", ... ""kardashian""]","""@ScottDisick @...",31259532,"""false""",1051158211208736768,false,2018-10-13 17:10:21
"[""youngblood"", ""seconds"", ... ""zayn""]","""@foquinha Youn...",194346085,"""false""",1011368336804937728,false,2018-06-25 21:59:36
"[""kylie"", ""jenner"", ... ""toll""]","""Kylie Jenner ‘...",31469390,"""false""",954584822474838016,true,2018-01-20 05:22:11
"[""yes"", ""studio"", ... ""embarrassed""]","""@Khalais1 @ibp...",901672053383757824,"""false""",1033841638408372224,false,2018-08-26 22:20:29
"[""say"", ""jesuit"", ... ""evangelical""]","""@realDonaldTru...",1456041367,"""false""",1037124307007225856,false,2018-09-04 23:44:38
"[""kim"", ""kardashian"", ... ""scandal""]","""Kim Kardashian...",1082727457,"""false""",1025125815430860800,false,2018-08-02 21:06:54
"[""rt"", ""rt"", ... ""nelm""]","""RT @rihanna: R...",99669829,"""false""",23424586641051648,false,2011-01-07 17:03:51
"[""portia"", ""de"", ... ""news""]","""Portia de Ross...",2992451738,"""false""",1012372432978575360,false,2018-06-28 16:29:31


In [95]:
# Labels of all tweets per author, limited to authors present in the users table.
data = (
    users_tweets.join(
        users.select(users_columns), left_on="author_id", right_on="id", how="semi"
    )
    .groupby(["author_id"])
    .agg([(pl.col("label")).alias('label')])
)

# Per account-creation year, count users by whether any of their tweets is
# labelled "true".
data = (
    users.join(data, left_on="id", right_on="author_id", how="left")
    .select(
        [
            pl.col("created_at")
            .str.strptime(pl.Date, fmt='%Y-%m-%dT%H:%M:%S.%fZ')
            .cast(pl.Datetime)
            .dt.year()
            .alias("year"),
            pl.col("label").apply(lambda x: "true" in x).alias("label"),
        ]
    )
    .groupby(["year", "label"])
    .count()
    .sort("year", "label")
)
data = data.to_dicts()

In [96]:
# Replace the stored per-year user label counts with the ones just computed.
collection = prefix + "users_label"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


<pymongo.results.InsertManyResult at 0x7efe8ae9fca0>

In [97]:
# Per-author flag: True when at least one of the author's tweets is labelled
# "true". Only authors present in the users table are kept.
data = (
    users_tweets.join(
        users.select(users_columns), left_on="author_id", right_on="id", how="semi"
    )
    .groupby(["author_id"])
    .agg([(pl.col("label")).alias("label")])
)
data = data.select(
    [
        pl.all().exclude(["label"]),
        pl.col("label").apply(lambda x: "true" in x).alias("label"),
    ]
)

# Users that have a description, with their creation year and the flag above.
data = users.join(data, left_on="id", right_on="author_id", how="left")
data = data.filter(pl.col("description").is_not_null())
data = data.select(
    [
        pl.col("created_at")
        .str.strptime(pl.Date, fmt="%Y-%m-%dT%H:%M:%S.%fZ")
        .cast(pl.Datetime)
        .dt.year()
        .alias("year"),
        pl.col("label"),
        pl.col("id"),
        pl.col("description"),
    ]
)

# Descriptions as a plain Python list, in row order, for the tokenizer below.
description = list(data["description"])


In [98]:
import spacy
from tqdm import tqdm

# The spaCy pipeline (`nlp`) and `get_tokenized_text` are created once in the
# URL-title section earlier in this notebook; they are reused here instead of
# reloading the model and defining the same function a second time.
#
# Tokenize every user description; the result is aligned one-to-one with
# `description`.
texts: "list[list[str]]" = []
for doc in tqdm(nlp.pipe(description, n_process=-1), total=len(description)):
    texts.append(get_tokenized_text(doc))

100%|██████████| 514629/514629 [02:22<00:00, 3618.24it/s]


In [99]:
# Attach the description tokens to (year, label), emit one row per token and
# count the rows per (year, label, token), most frequent first.
data_user_word = data.select(["year", "label"]).with_columns(
    pl.Series("processed_description", texts)
)
data_user_word = (
    data_user_word.explode("processed_description")
    .groupby(["year", "label", "processed_description"])
    .count()
    .sort("count", descending=True)
    .to_dicts()
)

In [100]:
# Replace the stored description-word counts with the ones just computed.
collection = prefix + "users_description_word"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data_user_word)


<pymongo.results.InsertManyResult at 0x7efe880a60a0>

In [101]:
# Per-author flag: True when at least one of the author's tweets is labelled
# "true". Only authors present in the users table are kept.
data = (
    users_tweets.join(
        users.select(users_columns), left_on="author_id", right_on="id", how="semi"
    )
    .groupby(["author_id"])
    .agg([(pl.col("label")).alias("label")])
    .select(
        [
            pl.all().exclude(["label"]),
            pl.col("label")
            .apply(lambda x: True if "true" in x else False)
            .alias("label"),
        ]
    )
)

data = (
    users.join(data, left_on="id", right_on="author_id", how="left")
    # Same null filter as the cell that produced `description` (and from it
    # `texts`), so `texts` lines up row-for-row with this frame.
    .filter(pl.col("description").is_not_null())
    .select(
        [
            pl.col("created_at")
            .str.strptime(pl.Date, fmt="%Y-%m-%dT%H:%M:%S.%fZ")
            .cast(pl.Datetime)
            .dt.year()
            .alias("year"),
            pl.col("description"),
            pl.col("label"),
            pl.col("id"),
            pl.col("public_metrics.followers_count").alias("followers_count"),
            pl.col("public_metrics.following_count").alias("following_count"),
            pl.col("public_metrics.tweet_count").alias("tweet_count"),
            pl.col("public_metrics.listed_count").alias("listed_count"),
        ]
    )
    .with_columns(pl.Series("processed_description", texts))
    .select([
        pl.all(),
        # Number of tokens kept by the tokenizer.
        pl.col('processed_description').arr.lengths().alias('processed_description_length'),
        # Number of characters in the raw description.
        pl.col('description').str.n_chars().alias('description_character_length'),
        # Number of space-separated pieces, computed the same way as the
        # tweets' `text_length`. `str.lengths()` would give the byte length,
        # which only duplicates the character count for ASCII text.
        pl.col('description').str.split(" ").arr.lengths().alias('description_length'),
    ])
)

data = data.to_dicts()

In [103]:
# Replace the stored per-user description features with the ones just computed.
# NOTE(review): the recorded run of this cell ended with a pymongo
# AutoReconnect ("connection closed") error, so the collection may be missing
# or incomplete - re-run and confirm the insert finishes.
collection = prefix + "users_description_length"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)


AutoReconnect: ac-ovtzfw5-shard-00-02.v2ock79.mongodb.net:27017: connection closed

In [None]:
# For every tweet, how long after the author's account creation it was posted.
data = (
    # Outer join of tweets with the users' creation timestamp; the users'
    # `created_at` is referred to below as `created_at_right`.
    users_tweets.join(
        users.select(
            [
                "created_at",
                'id'
            ]
        ),
        left_on="author_id",
        right_on="id",
        how="outer",
    )
    # Keep a single row per tweet_id.
    .groupby(["tweet_id"])
    .first()
    # Drop rows that lack either timestamp (unmatched sides of the outer join).
    .filter(pl.col("created_at_right").is_not_null())
    .filter(pl.col("created_at").is_not_null())
    .select(
        [
            pl.all(),
            pl.col("created_at").dt.year().alias("tweets_created_at"),
            pl.col("created_at_right").str.strptime(pl.Date, fmt="%Y-%m-%dT%H:%M:%S.%fZ").cast(pl.Datetime).dt.year().alias("users_created_at"),
            # Tweet timestamp minus account creation, as a raw integer.
            # The account timestamp is parsed as a Date first, so its
            # time-of-day is discarded before the subtraction.
            (
                pl.col("created_at")
                - pl.col("created_at_right")
                .str.strptime(pl.Date, fmt="%Y-%m-%dT%H:%M:%S.%fZ")
                .cast(pl.Datetime)
            )
            .cast(pl.Int64)
            .alias("time_difference"),
        ]
    )
    .select(
        [
            pl.all(),
            # NOTE(review): this divisor chain yields whole hours only if
            # `time_difference` is in microseconds - confirm the duration unit
            # produced by the subtraction above.
            (pl.col("time_difference") // 1000 // 1000 // 60 // 60).alias(
                "time_difference_processed"
            ),
        ]
    )
    .sort("time_difference", descending=True)
)

# data
data = data.to_dicts()


In [None]:
# Replace the stored tweet/account time differences with the ones just computed.
collection = prefix + "tweets_users_time_difference"
col = db[collection]
if collection in db.list_collection_names():
    db.drop_collection(collection)

col.insert_many(data)
