# Network metrics

This notebook computes several network centrality measures (degree, PageRank, etc.) for candidate nodes. The network was built from retweet information: if Twitter user A retweets user B, an edge A -> B is added (A points to B), yielding a directed graph.

In [None]:
from types import new_class
import pandas as pd
import pyathena
import pymongo
from pymongo.errors import BulkWriteError
import os
import dotenv
import datetime
import networkx as nx
import tqdm
import logging
import gc
import plotly.io as pio
from functools import partial


In [None]:
# Pull credentials and connection settings from the local .env file into
# os.environ before anything below reads them.
dotenv.load_dotenv(".env")

# Notebook-wide logging at INFO level; `log` is the logger used by later cells.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] - %(name)s - %(funcName)s - %(levelname)s : %(message)s',
)
log = logging.getLogger(__name__)

# MongoDB handle: metric documents are written to this database.
mongo_client = pymongo.MongoClient(os.environ["MONGODB_URL"])
twitter_db = mongo_client["TwitterConstituyenteDB"]

# Athena connection used for every SQL read in the notebook.
conn = pyathena.connect(
    s3_staging_dir=os.environ["AWS_ATHENA_S3_STAGING_DIR"],
    region_name=os.environ["AWS_REGION"],
)

# Candidate table and the list of candidate ids (rows without an id dropped).
candidates_df = pd.read_sql("""SELECT * FROM "twitter-constituyente"."constituyentes";""", conn)
candidates_ids = candidates_df["user__id_str"].dropna().tolist()

In [None]:

# Compute network metrics over a sliding 7-day window and save them to MongoDB.
def compute_graph_metrics(graph):
    """Compute centrality metrics and store them as node attributes on ``graph``.

    Every metric is evaluated on the whole graph and written in place with
    ``nx.set_node_attributes`` under the attribute name paired with it below.

    Args:
        graph: A networkx directed graph (the ``in_degree``/``out_degree``
            metrics require one). It is modified in place.

    Returns:
        None. An empty graph is left untouched.

    Raises:
        Whatever the underlying networkx routines raise, e.g. when the
        iterative eigenvector/PageRank computations fail to converge.
    """
    # Nothing to annotate on an empty graph; nx.eigenvector_centrality would
    # also raise NetworkXPointlessConcept on it and abort the caller's loop.
    if len(graph) == 0:
        return

    metrics = [
        (lambda g: dict(g.degree()), "degree"),
        (lambda g: dict(g.in_degree()), "in_degree"),
        (lambda g: dict(g.out_degree()), "out_degree"),
        (nx.degree_centrality, "degree_centrality"),
        # Raised iteration cap for the power iteration.
        (partial(nx.eigenvector_centrality, max_iter=1000), "eigenvector_centrality"),
        (nx.in_degree_centrality, "in_degree_centrality"),
        (nx.out_degree_centrality, "out_degree_centrality"),
        # Disabled metrics, kept for reference (presumably too slow on the
        # full graph -- TODO confirm):
        # (nx.closeness_centrality, "closeness"),
        # (nx.betweenness_centrality, "betweenness"),
        (nx.pagerank, "pagerank"),
        # (nx.katz_centrality_numpy, "katz"),
    ]
    for func, col in tqdm.tqdm(metrics):
        nx.set_node_attributes(graph, func(graph), col)

def bulk_write_to_mongo(collection, data):
    """Insert ``data`` into ``collection`` in one unordered bulk write.

    Args:
        collection: Object exposing ``insert_many(documents, ordered=...)``.
        data: Sized collection of documents to insert.

    Returns:
        A ``(inserted, not_inserted)`` tuple of counts. When a
        ``BulkWriteError`` is raised, ``inserted`` is taken from the error's
        ``details["nInserted"]`` and the remainder is reported as not inserted.
    """
    total = len(data)

    # Skip the round-trip entirely when there is nothing to write.
    if total == 0:
        return total, 0

    try:
        collection.insert_many(data, ordered=False)
    except BulkWriteError as err:
        log.error("BulkWriteError")
        written = err.details["nInserted"]
        return written, total - written

    return total, 0


# Sliding-window configuration: one window (start day + 6 days, i.e. 7 days
# inclusive) is processed for every start day from start_date to end_date.
start_date = datetime.date(2021, 1, 1)
end_date = datetime.date(2021, 5, 14)
delta = datetime.timedelta(days=1)

# Prime the rolling frame with every day of the first window except its last
# one (plus the day before the window, which the first loop iteration filters
# out). Each iteration then only has to fetch the single newest day.
start_window = start_date
end_window = start_window + 6 * delta
window_df = pd.read_sql(f"""
        SELECT * FROM "twitter-constituyente"."daily_graph"
        WHERE (date >= DATE('{(start_window-delta).strftime("%Y-%m-%d")}')) AND (date <= DATE('{(end_window-delta).strftime("%Y-%m-%d")}'));""", conn)

while start_date <= end_date:
    start_window = start_date
    end_window = start_window + 6 * delta

    # Fetch only the day that just entered the window.
    missing_day_df = pd.read_sql(f"""
        SELECT * FROM "twitter-constituyente"."daily_graph"
        WHERE date = DATE('{end_window.strftime("%Y-%m-%d")}');""", conn)

    # Drop the day that fell out of the window and add the new one.
    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported
    # equivalent and yields the same re-indexed frame.
    # NOTE(review): assumes the "date" column holds datetime.date values
    # comparable with start_window -- confirm against the Athena schema.
    window_df = pd.concat(
        [window_df[window_df["date"] >= start_window], missing_day_df],
        ignore_index=True,
    )

    log.info(f"Window: {start_window} - {end_window}, Length: {len(window_df)}")

    # Collapse the daily rows into one weighted edge per (source, target)
    # pair, the weight being the summed "count" over the window.
    edges = (
        window_df
        .groupby(["source", "target"])
        .agg({"count": "sum"})
        .reset_index()
        .itertuples(index=False, name=None)
    )
    graph = nx.DiGraph()
    graph.add_weighted_edges_from(edges)
    compute_graph_metrics(graph)

    # One row per node, one column per node attribute, stamped with the
    # window bounds as datetimes at midnight.
    graph_data = pd.DataFrame.from_dict(dict(graph.nodes(data=True)), orient='index')
    graph_data["window_start"] = datetime.datetime.combine(start_window, datetime.datetime.min.time())
    graph_data["window_end"] = datetime.datetime.combine(end_window, datetime.datetime.min.time())

    # The node id (frame index) becomes the "user__id_str" field of each
    # stored document.
    inserted, not_inserted = bulk_write_to_mongo(
        twitter_db.network_timeseries_7dayswindow_fullbase,
        graph_data.reset_index().rename(columns={"index": "user__id_str"}).to_dict('records')
    )
    log.info(f"Inserted: {inserted}, Not inserted: {not_inserted}")
    start_date += delta