# Daily Tweet Processing
Iterate tweets for a given day and perform some calculations

In [None]:
# required imports to access api_db, misc, misc.CONFIG, ...
import sys
sys.path = ['.', '..', '../..'] + sys.path
from collection import *

<hr>
<h1 align="center">driver code</h1>

1. Define tasks to execute daily
2. Check if these have been executed for each day of the past month
3. Execute the ones that have not yet been executed (this ensures that newly added tasks are retroactively applied to past days)

---
Define tasks below - these should be replicated in the API (or the API could be made generally able to access any `task_%s` collection from any endpoint, assuming injection is not a concern at the moment).

In [None]:
# task 1 - count tweets by type
from collections import defaultdict
def count_tweets_by_type(day):
    """Count the tweets created on ``day`` per tweet type and store the result.

    Does nothing when the "count by type" task already has an entry for
    ``day``. Otherwise a single aggregation over ``api_db.col_tweets``
    produces the counters "retweet", "quote", "reply", "original" and
    "total", which are inserted through the task. Counters missing from the
    aggregation result are stored as 0.
    """
    task = Task(api_db.db, "count by type")
    if task.exists_day(day):
        return  # already processed

    # One $facet branch per counter; None means "count without extra filter".
    type_filters = {
        "retweet": {"retweeted_status": {"$exists": True}},
        "quote": {"quoted_status": {"$exists": True}},
        "reply": {"in_reply_to_status_id": {"$exists": True}},
        "original": {"original": True},
        "total": None,
    }

    facets = {}
    projection = {}
    for label, condition in type_filters.items():
        stages = [] if condition is None else [{"$match": condition}]
        stages.append({"$count": label})
        facets[label] = stages
        # Each facet yields a list with at most one {label: n} document;
        # pull the number out of it.
        projection[label] = {"$arrayElemAt": ["$%s.%s" % (label, label), 0]}

    pipeline = [
        {"$match": {"created_at": get_filter_by_day(day)}},
        {"$facet": facets},
        {"$project": projection},
    ]
    metrics = defaultdict(int, next(api_db.col_tweets.aggregate(pipeline)))

    # Make sure every counter is present, defaulting to 0.
    for label in type_filters:
        metrics.setdefault(label, 0)

    task.insert(day, metrics)

In [None]:
# task 2 - measure fake news
def measure_fakenews(day):
    """Count the links to known fake-news sites in the tweets of ``day``.

    Does nothing when the "measure fakenews" task already has an entry for
    ``day``. The list of sites is read from ``../fakenews.txt`` (one site per
    line). Every URL of every tweet created on ``day`` is checked:

    * a URL whose host is ``facebook.com`` counts for the first listed site
      whose name occurs anywhere in the expanded URL;
    * any other URL counts when its host (without a leading ``www.``) is a
      listed site.

    The stored metrics hold the per-site hit counts under "sites", their sum
    under "total", and the summed "favorite_count" / "retweet_count" of the
    matching tweets (added once per matching URL).
    """
    from urllib.parse import urlparse

    task = Task(api_db.db, "measure fakenews")
    if task.exists_day(day):
        return  # already processed

    def netloc(url):
        # Strip only a leading "www."; replacing it anywhere in the host
        # would mangle hosts such as "awww.example".
        host = urlparse(url.strip()).netloc
        return host[4:] if host.startswith("www.") else host

    def normalize_name(name):
        # presumably because "." is problematic in MongoDB field names -
        # TODO confirm
        return name.replace(".", "-")

    # Get the fake-news sites. Blank lines are dropped: an empty entry would
    # be a substring of every facebook URL and equal to every empty host.
    with open(abs_path("../fakenews.txt")) as inf:
        fakenews_sites = {line.strip() for line in inf if line.strip()}

    # search query
    tweets = api_db.col_tweets.find(
        {"urls": {"$exists": True}, "created_at": get_filter_by_day(day)},
        {"urls": True, "user": True, "favorite_count": True, "retweet_count": True},
    )

    # collect
    metrics = {
        "total": 0,
        "sites": {normalize_name(site): 0 for site in fakenews_sites},
        "favorite_count": 0,
        "retweet_count": 0,
    }

    def record_hit(site, tweet):
        # Account for one URL of `tweet` that points at `site`.
        metrics["sites"][normalize_name(site)] += 1
        if "favorite_count" in tweet:
            metrics["favorite_count"] += tweet["favorite_count"]
        if "retweet_count" in tweet:
            metrics["retweet_count"] += tweet["retweet_count"]

    for t in tweets:
        for url in t["urls"]:
            expanded = url["expanded_url"]
            loc = netloc(expanded)
            if loc == "facebook.com":
                # Facebook pages/posts: look for the site name inside the URL
                # and count at most one site per URL.
                site = next((s for s in fakenews_sites if s in expanded), None)
                if site is not None:
                    record_hit(site, t)
            elif loc in fakenews_sites:
                record_hit(loc, t)

    metrics["total"] = sum(metrics["sites"].values())
    task.insert(day, metrics)

In [None]:
# task 3 - measure suspensions
def measure_suspensions(day):
    """Store the users whose suspension time falls on ``day``.

    Queries ``api_db.col_users`` for documents flagged ``suspended`` whose
    ``time_suspended`` matches the filter for ``day`` and inserts them,
    together with their number ("total"), through the
    "measure suspensions" task.
    """
    task = Task(api_db.db, "measure suspensions")
    # force recalculation each day
    # if task.exists_day(day): return # already processed

    wanted_fields = (
        "screen_name",
        "friends_count",
        "followers_count",
        "statuses_count",
        "description",
        "favourites_count",
        "created_at",
    )
    projection = {field: True for field in wanted_fields}
    query = {"suspended": True, "time_suspended": get_filter_by_day(day)}

    suspended = list(api_db.col_users.find(query, projection))
    task.insert(day, {"total": len(suspended), "users": suspended})

In [None]:

# Candidates as (name, Twitter user id) pairs. An id of 0 marks a candidate
# without a Twitter account; the functions using this list skip ids <= 0.
candidates = [
    ("AnaMartinsGomes", 771383605),
    ("AndreCVentura", 1097962618596327424),
    ("BrunoARFialho", 1221188948996739072),
    ("joao_ferreira33", 951055588330475520),
    ("mmatias_", 948552829),
    ("Marcelo Rebelo de Sousa", 0),
    ("LiberalMayan", 1286335166881964032),
    ("_tinoderans_", 4644839074),
]

In [None]:
def refresh_candidates_tweets():
    """Re-fetch and store the timeline of every candidate with a Twitter id.

    For each entry of ``candidates`` with a positive id, the tweets are
    requested again through ``get_tweets`` (using the configured
    ``oldest_tweet`` as the "since_id" bound) and handed to
    ``insert_tweets``, so that stored values such as likes get updated.
    """
    since = misc.CONFIG["collection"]["oldest_tweet"]
    # ignore marcelo (id 0, no account to query)
    with_account = [(name, uid) for name, uid in candidates if uid > 0]

    for screen_name, user_id in with_account:
        with DoneMessage("Refreshing %s" % screen_name):
            # refresh all tweets to get updated values for likes, ... since `since`
            refreshed = get_tweets(
                {"_id": user_id},
                api_db.api.GetUserTimeline,
                "since_id",
                since,
                {"trim_user": True},
            )
            insert_tweets(refreshed)
            print("refreshed %d tweets" % len(refreshed), end="")
# Refresh the candidates' tweets as soon as this cell runs.
refresh_candidates_tweets()

In [None]:
# task 4 - measure presidential candidates
import re
def measure_candidates(day):
    """Store account, tweet and mention metrics of each candidate for ``day``.

    Does nothing when the "measure candidates" task already has an entry for
    ``day``. For every entry of ``candidates`` with a positive Twitter id the
    stored document contains, keyed by the id as a string:

    * "name", "screen_name", "followers_count" of the account (freshly
      fetched and upserted when available, otherwise read from
      ``api_db.col_users``);
    * "tweets": the candidate's tweets created on ``day`` with their
      retweet/favorite counts and type ("full_text" only for original and
      quote tweets);
    * "tweet_impact": sum of retweets and favorites over those tweets;
    * "name_mentions": tweets of ``day`` whose text matches the candidate's
      name (case-insensitive, at most one arbitrary character between words);
    * "mentions": tweets of ``day`` listing the id in ``user_mentions``.

    A candidate for which no account data can be found is skipped with a
    printed notice.
    """
    task = Task(api_db.db, "measure candidates")
    if task.exists_day(day):
        return  # already processed

    def get_tweet_type(tweet):
        if "original" in tweet: return "original"
        if "retweeted_status" in tweet: return "retweet"
        if "quoted_status" in tweet: return "quote"
        if "in_reply_to_user_id" in tweet or "in_reply_to_status_id" in tweet: return "reply"
        return "original"

    # Loop invariants: the day filter and the tweet projection.
    day_filter = get_filter_by_day(day)
    tweet_fields = {
        "retweet_count": True,
        "favorite_count": True,
        "retweeted_status": True,
        "quoted_status": True,
        "in_reply_to_status_id": True,
        "in_reply_to_user_id": True,
        "original": True,
        "full_text": True,
    }

    metrics = {"candidates": {}}  # daily metrics
    for user, _id in candidates:
        if _id <= 0:
            continue  # ignore candidates without twitter account

        # account metrics for this day
        account = get_account_details(user_id=_id)
        if account:
            account = user_to_db_format(account)
            upsert_user(account)
        else:
            # find() returns a cursor, which cannot be indexed by field name;
            # find_one() returns the stored document (or None).
            account = api_db.col_users.find_one({"_id": _id})
        if not account:
            print("no account data for %s, skipping" % user)
            continue

        entry = {
            "name": account["name"],
            "screen_name": account["screen_name"],
            "followers_count": account["followers_count"],
            "tweets": [],
        }

        # tweet metrics for this day
        for t in api_db.col_tweets.find({"user": _id, "created_at": day_filter}, tweet_fields):
            _tweet = {
                "_id": t["_id"],
                "retweet_count": t.get("retweet_count", 0),
                "favorite_count": t.get("favorite_count", 0),
                "type": get_tweet_type(t),
            }
            if "original" in t or "quoted_status" in t:
                _tweet["full_text"] = t["full_text"]
            entry["tweets"].append(_tweet)
        entry["tweet_impact"] = sum(
            t["retweet_count"] + t["favorite_count"] for t in entry["tweets"]
        )

        # count name mentions: escape each word so regex metacharacters in a
        # name are matched literally, and allow at most one character
        # between consecutive words.
        name_pattern = ".{0,1}".join(re.escape(part) for part in account["name"].split(" "))
        regex_query = re.compile(name_pattern, re.IGNORECASE)
        entry["name_mentions"] = api_db.col_tweets.count_documents(
            {"created_at": day_filter, "full_text": regex_query}
        )
        # count mentions
        entry["mentions"] = api_db.col_tweets.count_documents(
            {"created_at": day_filter, "user_mentions": _id}
        )

        metrics["candidates"][str(_id)] = entry

    task.insert(day, metrics)

# Main function that calls declared tasks
Each task must be registered manually in `main_caller`.

In [None]:
from datetime import datetime, timedelta

In [None]:
def main_caller(day):
    """Run every registered daily task for ``day``, one after the other.

    New tasks have to be added to ``registered_tasks`` by hand. Each task
    runs inside its own ``DoneMessage`` context labelled with its name.
    """
    print("")
    registered_tasks = (
        ("   count_tweets_by_type", count_tweets_by_type),
        ("   measure_fakenews", measure_fakenews),
        ("   measure_suspensions", measure_suspensions),
        ("   measure_candidates", measure_candidates),
    )
    for label, run_task in registered_tasks:
        with DoneMessage(label):
            run_task(day)

In [None]:
# Process every day from the configured start up to yesterday (today is only
# processed once the whole day has gone by).
# NOTE(review): `day` is advanced before main_caller runs, so the date of
# `oldest_tweet` itself is never processed - confirm this is intended.
day = misc.CONFIG["collection"]["oldest_tweet"]
_ONE_DAY = timedelta(days=1)
while True:
    # "today" is re-read on every pass, in the timezone of `day`.
    if day.date() + _ONE_DAY >= datetime.now(day.tzinfo).date():
        break
    day += _ONE_DAY
    with DoneMessage("Processing day %s" % day):
        main_caller(day)

In [None]:
# print(get_account_details(20509689))

In [None]:
# from datetime import timezone
# find_params = find_exclude_invalid({
#     "depth": 0
# })
# oldest_t = misc.CONFIG["collection"]["oldest_tweet"]
# # users = api_db.col_users.find(find_params, no_cursor_timeout=True)
# users = [{"_id": 8665852}]
# for u in users:
#     print("getting tweets for: %s..." % u["_id"], end="", flush=True)
#     tweets = get_tweets(u, api_db.api.GetUserTimeline, "since_id",  datetime(2020, 7, 18, tzinfo=timezone.utc), {"trim_user":True})
#     # insert_tweets(tweets)
#     # update_most_common_language(u, tweets) # removed because not useful here
#     print("got %d new tweets, done." % len(tweets))
#     if len(tweets) > 0: break

In [None]:
# # [print(str(t) + "\n") for t in tweets[-10:-1]]
# for t in tweets[0:10000]:
#     if "in_reply_to_user_id" in t:
#         print(t)

In [None]:
# misc.CONFIG["collection"]["oldest_tweet"]

In [None]:

# for tweet in api_db.col_tweets.find({"created_at": {"$gte": _from, "$lt": _to}}, no_cursor_timeout=True):
#     print(tweet)
#     break

In [None]:
# Printed when execution reaches the last cell of the notebook.
print("DONE")