In [1]:
import pystache, plotly, json, random, sys, yaml, glob, os
import pandas as pd
import plotly.graph_objects as go

from google.cloud import bigquery
from google.oauth2 import service_account
from datetime import datetime

# Creates Interactive Diffusion Graphs

1. Reads YAML configuration files from `/home/jupyter/data/www/covid19-static-pages/configs`
2. Queries BigQuery for retweets.
3. Processes the retweets and writes a simplified JSON file.
4. Reads the simplified JSON into Plotly and writes the Plotly graph JSON configurations and HTML files.

In [2]:
# Build the shared BigQuery client from the service-account key file on this machine.
# `client` is used by the query functions defined further down.
project_id = 'crypto-eon-164220'
credentials = service_account.Credentials.from_service_account_file(
    '/home/jupyter/covid-19-data/.credentials/google-connector.json'
)
client = bigquery.Client(project=project_id, credentials=credentials)

## 1. Get top retweets from BigQuery

In [3]:
# SQL shared by both retweet queries below.
#
# The inner sub-select groups retweets by the tweet they retweet and counts the distinct
# retweet ids per original (`times_retweeted_`). The outer select joins every retweet row
# back to those originals and returns one row per retweet.
#
# {INNER_LIMIT} is either empty or a `LIMIT n` clause that keeps only the n most-retweeted
# originals; {OUTER_FILTER} is either empty or an extra `AND ...` predicate on the join.
#
# NOTE(review): the `retweeted_status.id >= MIN(id)` predicate presumably restricts the
# result to originals posted inside the collection window (tweet ids grow over time) --
# confirm against the table's contents.
_RETWEET_QUERY_TEMPLATE = """
SELECT
  id AS retweet_id,
  min(original_tweet_id) AS orig_id,
  min(tweet_text) AS orig_text,
  min(times_retweeted_) AS times_retweeted,
  min(original_author) AS orig_author,
  min(original_followers) as orig_followers_count,
  min(original_posted) AS orig_posted,
  min(user.screen_name) AS retweeter,
  min(user.followers_count) AS retweeter_followers_count,
  min(PARSE_TIMESTAMP('%a %b %d %T %z %Y', created_at)) AS retweet_timestamp
FROM
  `crypto-eon-164220.tweets.{TABLE}` tweets,
  (
  SELECT
    MIN(retweeted_status.id) AS original_tweet_id,
    MIN(retweeted_status.text) AS tweet_text,
    MIN(retweeted_status.user.screen_name) AS original_author,
    MIN(retweeted_status.user.followers_count) AS original_followers,
    MIN(PARSE_TIMESTAMP('%a %b %d %T %z %Y', retweeted_status.created_at)) AS original_posted,
    COUNT(DISTINCT(id)) AS times_retweeted_
  FROM
    `crypto-eon-164220.tweets.{TABLE}`
  WHERE
    retweeted_status IS NOT NULL
    AND retweeted_status.id >= (SELECT MIN(id) FROM `crypto-eon-164220.tweets.{TABLE}`)
  GROUP BY
    retweeted_status.id
  ORDER BY times_retweeted_ DESC
  {INNER_LIMIT}
  ) topRetweets
WHERE
  topRetweets.original_tweet_id = tweets.retweeted_status.id
  {OUTER_FILTER}
GROUP by id
order by orig_id
"""


def _run_retweet_query(table, inner_limit="", outer_filter=""):
    """Fill in the shared SQL template, run it on BigQuery and return the rows as a DataFrame.

    Uses the module-level `client`. `table` is spliced into the SQL text as-is (table names
    cannot be bound as query parameters), so it must come from a trusted configuration file.
    """
    query_job = client.query(_RETWEET_QUERY_TEMPLATE.format(
        TABLE=table,
        INNER_LIMIT=inner_limit,
        OUTER_FILTER=outer_filter))

    sys.stderr.write("done; creating dataframe\n")
    return query_job.result().to_dataframe()


def get_top_N_retweeted_tweets(table, N=25):
    """Return every retweet of the N most-retweeted original tweets in `table`.

    Parameters
    ----------
    table : str
        Table name inside the `crypto-eon-164220.tweets` dataset.
    N : int
        How many originals to keep, ranked by distinct retweet count.

    Returns
    -------
    pandas.DataFrame
        One row per retweet, with the columns named in the outer SELECT of the template
        (retweet_id, orig_id, orig_text, times_retweeted, orig_author, ...).

    Raises
    ------
    ValueError, TypeError
        If `N` cannot be converted to an integer.
    """
    N = int(N)  # refuse anything non-numeric before it is spliced into the SQL text
    sys.stderr.write("Querying for top {} retweets in {}...".format(N, table))
    return _run_retweet_query(table, inner_limit="LIMIT {}".format(N))


def get_tweets_retweeted_more_than_X_times(table, threshold=2000):
    """Return every retweet of the originals retweeted more than `threshold` times in `table`.

    Same result columns as `get_top_N_retweeted_tweets`; the selection is by retweet count
    rather than by rank.

    Raises
    ------
    ValueError, TypeError
        If `threshold` cannot be converted to an integer.
    """
    # The count being compared is an integer, so truncating a fractional threshold
    # selects the same tweets.
    threshold = int(threshold)
    sys.stderr.write("Querying for tweets retweeted more than {} times in {}...".format(threshold, table))
    return _run_retweet_query(
        table,
        outer_filter="AND topRetweets.times_retweeted_ > {}".format(threshold))

In [4]:
def create_dataframe_for_plotly(df, fileName):
    """Reshape a retweet query result into per-tweet cascades and write them to JSON.

    For every original tweet (`orig_id`) the output holds one record for the original
    followed by one record per retweet, ordered by retweet id, with a running sum of
    follower counts.

    Parameters
    ----------
    df : pandas.DataFrame
        Must provide the columns orig_id, orig_posted, orig_author, orig_followers_count,
        orig_text, retweet_id, retweet_timestamp, retweeter and retweeter_followers_count.
        The two timestamp columns must hold objects with an `isoformat()` method.
    fileName : str
        Output path prefix; the file written is `<fileName>_<YYYY-MM-DD>.json` using
        today's date.

    Returns
    -------
    None. The JSON file is a list of records with the keys id, created_at, username,
    followers_count, followers_count_cumsum, text and top_N. An empty `df` produces
    an empty list.
    """
    # Rank originals by how many retweet rows `df` holds for them (1 = most rows).
    # This is the row count in `df`, not the `times_retweeted` column.
    rank_by_id = {
        orig_id: rank
        for rank, orig_id in enumerate(
            df.orig_id.value_counts(ascending=False).keys(), start=1)
    }

    source_columns = ['retweet_id', 'retweet_timestamp', 'retweeter', 'retweeter_followers_count']
    output_columns = ['id', 'created_at', 'username', 'followers_count']

    cascades = []
    for count, (original_id, retweets) in enumerate(df.groupby('orig_id'), start=1):
        topN = rank_by_id[original_id]  # The Top N tweet...
        print("Processing Tweet# {} - TopN [{}] - ID: {})".format(count, topN, original_id))

        sorted_retweets = retweets.sort_values(by='retweet_id')
        first = sorted_retweets.iloc[0]

        # Every retweet row repeats the original tweet's metadata; take it from the first.
        original_tweet = pd.DataFrame([{
            'id': first.orig_id,
            'created_at': first.orig_posted,
            'username': first.orig_author,
            'followers_count': first.orig_followers_count
        }])

        # rename() returns a new frame, so nothing is assigned onto a slice of `df`.
        retweet_rows = sorted_retweets[source_columns].rename(
            columns=dict(zip(source_columns, output_columns)))

        # Put the original tweet first so the running sum starts from its audience.
        cascade = pd.concat([original_tweet, retweet_rows]).reset_index(drop=True)

        cascade['followers_count_cumsum'] = cascade.followers_count.cumsum()
        cascade['text'] = first.orig_text
        cascade['top_N'] = topN
        cascade['created_at'] = cascade.created_at.apply(lambda ts: ts.isoformat())

        cascades.append(cascade)

    if cascades:
        # One concat at the end instead of growing a frame inside the loop. The list is
        # reversed to keep the row order the incremental prepend used to produce.
        records = pd.concat(cascades[::-1]).sort_values(by='top_N').to_dict('records')
    else:
        records = []

    current_datestamp = datetime.today().strftime('%Y-%m-%d')
    with open(fileName +"_"+current_datestamp+'.json','w') as f:
        json.dump(records, f)

In [5]:
def get_data(config):
    """Query the most-retweeted tweets for one page configuration and save them as JSON.

    Reads `config['table']` (table to query), `config['data']` (output path prefix) and the
    optional `config['topN']`, which falls back to 10 when missing or falsy.
    """
    # NOTE(review): full_run() falls back to 25 for the same `topN` key when plotting,
    # so without an explicit topN only 10 of the 25 possible traces have data.
    top_n = config.get("topN") or 10
    retweets = get_top_N_retweeted_tweets(config['table'], N=top_n)
    create_dataframe_for_plotly(retweets, config['data'])

## 2. Create Plots

In [6]:
# Colour palette for the graph traces; functions below index it with `rank % len(COLORS)`.
COLORS = plotly.colors.qualitative.Alphabet

In [7]:
def view_colors():
    """Display the COLORS palette as a row of swatches (one bar per colour).

    The previous implementation called seaborn through the name `sns`, which this
    notebook never imports, so it raised NameError on a fresh kernel. The swatches are
    now drawn with plotly, which the notebook already imports.
    """
    swatches = go.Figure(go.Bar(
        x=[str(position) for position in range(len(COLORS))],
        y=[1] * len(COLORS),
        marker_color=COLORS,
        hovertext=COLORS,
    ))
    swatches.update_layout(height=200, showlegend=False, yaxis_visible=False)
    swatches.show()

def normalize(values, desired_bounds):
    """Linearly rescale `values` so their minimum and maximum map onto `desired_bounds`.

    Parameters
    ----------
    values : iterable of numbers
    desired_bounds : (low, high)
        Target range; the smallest value maps to `low`, the largest to `high`.

    Returns
    -------
    list
        Rescaled values rounded to 2 decimals, in input order. Empty input gives an empty
        list. When every value is identical there is no spread to scale, so each value
        maps to `low` instead of dividing by zero.
    """
    values = list(values)
    if not values:
        return []

    low, high = desired_bounds[0], desired_bounds[1]
    smallest, largest = min(values), max(values)
    spread = largest - smallest

    if spread == 0:
        return [round(float(low), 2) for _ in values]

    return [round(low + (v - smallest) * (high - low) / spread, 2) for v in values]

In [8]:
# Read the latest diffusion data...
def read_dataframe_from_file(CONFIG):
    """Load the newest diffusion-data JSON file for a page configuration.

    Parameters
    ----------
    CONFIG : dict
        `CONFIG['data']` is the path prefix of the data files; every file matching
        `<prefix>*.json` is a candidate.

    Returns
    -------
    pandas.DataFrame
        The file's records plus a `timestamp` column parsed from `created_at`.

    Raises
    ------
    FileNotFoundError
        If no file matches the prefix.
    """
    # Files are named <prefix>_<YYYY-MM-DD>.json, so the last name in sorted order is the
    # newest one.
    # NOTE(review): the pattern also matches files of any other prefix that starts with
    # this one (e.g. "cdc" vs "cdc-keywords") -- confirm the prefixes in the configs
    # cannot collide.
    candidates = sorted(glob.glob(CONFIG['data']+"*.json"))
    if not candidates:
        raise FileNotFoundError(
            "No data file matches {}*.json".format(CONFIG['data']))
    latest_file = candidates[-1]

    sys.stderr.write("Loading "+latest_file+"...")
    with open(latest_file,'r') as data_file:
        to_plot = json.load(data_file)

    df = pd.DataFrame(to_plot)
    df['timestamp'] = df.created_at.apply(pd.Timestamp)
    sys.stderr.write(("Read {:,} retweets\n".format(len(to_plot))))

    return df

In [9]:
def calculate_self_retweets(input_df):
    """Discount authors' retweets of their own tweets from the cumulative follower counts.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Needs the columns top_N, id, username, followers_count, followers_count_cumsum,
        text and timestamp. Within each `top_N` group the row with the smallest `id` is
        treated as the original tweet. It is not modified; a deep copy is adjusted.

    Returns
    -------
    (pandas.DataFrame, dict)
        The adjusted copy, and a dict keyed by `str(top_N)` describing each original tweet
        (text, user, rank, color, id, time) plus a `self-rt` list holding one
        `{'x': iso_timestamp, 'subValue': amount}` entry per self-retweet.
    """
    sys.stderr.write("Discounting self-retweets: [")
    df = input_df.copy(deep=True)

    tweet_data = {}

    for topN in df.top_N.unique():
        # Amount currently being subtracted from the running sum. Once a self-retweet has
        # set it, it also applies to every later row of this tweet.
        # NOTE(review): a second self-retweet replaces this amount rather than adding to
        # it -- confirm whether discounts are meant to accumulate.
        subValue = 0
        cascade = df[df.top_N == topN].sort_values(by='id')

        for position, (idx, row) in enumerate(cascade.iterrows()):
            if position == 0:
                # First row: the original tweet itself.
                thisUser = row.username
                origFollowerCount = row.followers_count
                tweet_data[str(topN)] = {
                    'text' : row.text,
                    'user' : thisUser,
                    'rank' : int(topN),
                    'color': COLORS[topN%len(COLORS)],
                    'id'   : str(row.id),
                    'self-rt' : [],
                    'time' : row.timestamp.isoformat()
                }
            elif row.username == thisUser:
                # Self-retweet: discount the smaller of the author's follower count at
                # posting time and at retweet time.
                if row.followers_count > origFollowerCount:
                    subValue = origFollowerCount
                else:
                    subValue = row.followers_count
                tweet_data[str(topN)]['self-rt'].append(
                    {'x':row.timestamp.isoformat(), 'subValue':subValue}
                )

            if subValue > 0:
                df.loc[idx,'followers_count_cumsum'] = row.followers_count_cumsum - subValue

        sys.stderr.write("{},".format(topN))

    # No .format(topN) here: the message has no placeholder, and `topN` is unbound when
    # the frame has no rows.
    sys.stderr.write("] done\n")

    return (df, tweet_data)

In [10]:
def custom_label(row):
    """Build the hover label for one tweet row: "<username>: <followers_count> followers"."""
    name, followers = row.username, row.followers_count
    return "{}: {} followers".format(name, followers)

def buildPlotlyGraph(df, topN=25):
    """Build the diffusion figure: one trace per ranked tweet, cumulative followers over time.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs the columns top_N, id, text, timestamp, followers_count,
        followers_count_cumsum and username. A `globalScaledMarker` column is added to it
        in place.
    topN : int
        Ranks 1..topN are plotted; ranks with no rows in `df` are skipped.

    Returns
    -------
    plotly.graph_objects.Figure

    Raises
    ------
    ValueError
        If the first row (lowest id) of a ranked tweet has no text.
    """
    sys.stderr.write("Plotting top {} [".format(topN))

    # Marker sizes are scaled over the whole frame, not per trace, so sizes are
    # comparable between tweets.
    df['globalScaledMarker'] = normalize(list(df.followers_count), (10,100))

    fig = go.Figure()

    for topNidx in range(1,topN+1):

        # Sort by id so the row with the lowest id comes first in the trace.
        plot_df = df[df.top_N==topNidx].sort_values(by='id')

        if len(plot_df) == 0:
            continue

        tweetText = plot_df.head(1).text.values[0]

        if pd.isna(tweetText):
            # Raising a bare string is a TypeError in Python 3; raise a real exception.
            raise ValueError("No Tweet Text on First Entry")

        color = COLORS[topNidx%len(COLORS)]

        fig.add_trace(go.Scattergl(
            name = str(topNidx), #rank
            x    = plot_df.timestamp,
            y    = plot_df.followers_count_cumsum,
            mode = 'markers+lines',
            marker = dict(
                size  = plot_df.globalScaledMarker,
                color = color,
                opacity = 0.5,
                line=dict(
                    color='white',
                    width=0.4
                ),
            ),
            line=dict(
                color=color,
                width=0.75,
            ),
            hovertemplate ='%{x} - %{text}',
            text = list(plot_df.apply(custom_label, axis=1)),
            meta={'u':plot_df.username,
                  'f':plot_df.followers_count },
            showlegend = True
        ))
        sys.stderr.write(".")

    fig.update_layout(
        autosize=False,
        width=1400,
        height=600,
        margin=dict(
            t=1,r=50,l=1,b=1
        ),
        legend=dict(
            x=1,
            y=1,
            traceorder="normal",
            font=dict(
                family="sans-serif",
                size=12,
                color="black"
            ),
        ),
        yaxis_title="Potential Audience Exposure",)

    sys.stderr.write("] Done\n")
    return fig

In [11]:
# # For testing (initialize all below functions first (ah, jupyter)): 
# # c = full_run('/home/jupyter/data/www/covid19-static-pages/configs/cdc.yaml', query=False)

# # Then change the function as much as needed and access with:
# fig = buildPlotlyGraph(c['no_self_retweets'], (c.get('TopN') or 25))
# # fig.show()

# #Or write the JSON
# figJSON = json.loads(plotly.io.to_json(fig))
# figJSON['tweets'] = c['tweets']
# with open(STATIC_PAGES_ROOT +"/docs/"+ c['JSON'],'w') as outFile: 
#     json.dump(figJSON, outFile)
# buildSingleStaticPlotlyPage(c)

In [12]:
def buildSingleStaticPlotlyPage(CONFIG):
    """Render the page template with `CONFIG` and write the resulting HTML file.

    Reads `<STATIC_PAGES_ROOT>/templates/plotly_js_template.html`, renders it with
    pystache using `CONFIG` as the context, and writes the result to
    `<STATIC_PAGES_ROOT>/<CONFIG['output']>`.
    """
    sys.stderr.write("Writing HTML... ")

    # Read the template inside a `with` block so the file handle is closed.
    with open(STATIC_PAGES_ROOT + '/templates/plotly_js_template.html') as templateFile:
        main_template = templateFile.read()

    with open(STATIC_PAGES_ROOT + "/" + CONFIG['output'],'w') as outFile:
        outFile.write(pystache.render(main_template, CONFIG))
    sys.stderr.write(" view at: http://epic.tweetsonamap.com/covid19-static-pages/"+CONFIG['output']+"\n")

<br><br><hr><br><br>

# Runtime


In [13]:
# Root directory of the static-pages site. The functions above and the cells below build
# their template, output, docs and configs paths from it.
STATIC_PAGES_ROOT = '/home/jupyter/data/www/covid19-static-pages'

In [14]:
def full_run(yaml_config, query=True, plot=True, write_html=True):
    """Run the pipeline for one YAML page configuration.

    Parameters
    ----------
    yaml_config : str
        Path to the page's YAML configuration file.
    query : bool
        When true, query BigQuery and write a fresh data file (see `get_data`).
    plot : bool
        When true, load the newest data file, discount self-retweets, build the figure and
        write its JSON to `<STATIC_PAGES_ROOT>/docs/<config['JSON']>`.
    write_html : bool
        When true, render the page's HTML file.

    Returns
    -------
    dict
        The loaded configuration. When `plot` is true it also holds the keys `df`,
        `no_self_retweets`, `tweets` and `fig`.
    """
    print("Building page for: {}".format(yaml_config))

    # Open the file in a `with` block so the handle is closed after parsing.
    # NOTE(review): FullLoader can build more than plain data types; yaml.safe_load is
    # the stricter choice if configs could ever come from an untrusted source.
    with open(yaml_config,'r') as config_file:
        config = yaml.load(config_file,
                           Loader=yaml.FullLoader)

    if query:
        get_data(config)

    if plot:
        config['df'] = read_dataframe_from_file(config)

        topN = config.get('topN') or 25

        config['no_self_retweets'], config['tweets'] = calculate_self_retweets(config['df'] )

        config['fig'] = buildPlotlyGraph(config['no_self_retweets'], topN=topN)

        # Round-trip through plotly's own serializer to get a plain dict, then attach the
        # per-tweet metadata under the `tweets` key.
        figJSON = json.loads(plotly.io.to_json(config['fig']))
        figJSON['tweets'] = config['tweets']

        with open(STATIC_PAGES_ROOT +"/docs/"+ config['JSON'],'w') as outFile:
            json.dump(figJSON, outFile)

    if write_html:
        buildSingleStaticPlotlyPage(config)

    return config

In [15]:
# Do some testing?
# full_run('/home/jupyter/data/www/covid19-static-pages/configs/covid-maps.yaml')

# x = full_run('/home/jupyter/data/www/covid19-static-pages/configs/who.yaml', query=False, plot=False)


### Load each configuration and do a full run

Loading from `/home/jupyter/data/www/covid19-static-pages/configs/`

Set `query=True` only when the data actually needs to be refreshed; because of the query cost, do that at most once a week.

In [16]:
# Collect every page configuration under <STATIC_PAGES_ROOT>/configs.
# glob.glob returns the paths in arbitrary (filesystem) order.
pages = glob.glob(STATIC_PAGES_ROOT + "/configs/*.yaml")
print("Found {} configurations".format(len(pages)))

Found 8 configurations


In [21]:
# HTML-only rebuild: with query=False and plot=False, full_run just loads each config and
# re-renders its HTML page (no BigQuery query, no new figure JSON).
for configuration_file in pages:
    x = full_run(configuration_file, query=False, plot=False)

Building page for: /home/jupyter/data/www/covid19-static-pages/configs/cdc-keywords.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/covid-maps.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/covid-charts.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/cdc.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/trump.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/us_governors.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/covid-data.yaml
Building page for: /home/jupyter/data/www/covid19-static-pages/configs/who.yaml


Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/cdc-keyword.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/covid-maps.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/covid-charts.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/cdc.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/trump.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/us-governors.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/covid-data.html
Writing HTML...  view at: http://epic.tweetsonamap.com/covid19-static-pages/docs/who.html


<br><br><br><hr><br><br><br>

In [None]:
# Full refresh: query BigQuery for every configuration, then rebuild the figure JSON and
# the HTML page (plot and write_html keep their default of True).
for configuration_file in pages:
    x = full_run(configuration_file, query=True)

In [84]:
# Reminder only: this cell prints the gsutil command but does not run it.
print("Be sure to run the following command to copy the data files to Google buckets:\n\n")
print("gsutil -m cp -r /home/jupyter/data/www/covid19-static-pages/docs/data gs://epic-covid19/diffusion-graphs/")

Be sure to run the following command to copy the data files to Google buckets:


gsutil -m cp -r /home/jupyter/data/www/covid19-static-pages/docs/data gs://epic-covid19/diffusion-graphs/
