In [1]:
import gzip
import json
import os
import re
from urllib.parse import quote

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine

from dotenv import load_dotenv

from utils import config
from utils import utilities

In [2]:
# Raw psycopg2 connection and SQLAlchemy engine, both built from config.DATABASE.
conn = psycopg2.connect(
    dbname=config.DATABASE['DBNAME'],
    host=config.DATABASE['HOST'],
    user=config.DATABASE['USER'],
    password=config.DATABASE['PASSWORD'],
    port=config.DATABASE['PORT'],
)

# User name and password are percent-encoded (safe='' also encodes '/') so that
# characters such as '@', ':' or '/' inside them cannot break the connection URL.
engine = create_engine(
    f"postgresql://{quote(str(config.DATABASE['USER']), safe='')}:"
    f"{quote(str(config.DATABASE['PASSWORD']), safe='')}@"
    f"{config.DATABASE['HOST']}:"
    f"{config.DATABASE['PORT']}/"
    f"{config.DATABASE['DBNAME']}"
)

In [4]:
"""
all_ids = []
with gzip.open(config.TWEETS_PATH, 'rb') as f:
    for row_number, current_tweet in enumerate(f):
        all_ids.append(json.loads(current_tweet.decode(encoding='utf-8'))['id'])

print(f'Unique id amount: {len(set(all_ids))}, out of total id amount: {len(all_ids)}')
"""

"\nall_ids = []\nwith gzip.open(config.TWEETS_PATH, 'rb') as f:\n    for row_number, current_tweet in enumerate(f):\n        all_ids.append(json.loads(current_tweet.decode(encoding='utf-8'))['id'])\n\nprint(f'Unique id amount: {len(set(all_ids))}, out of total id amount: {len(all_ids)}')\n"

In [5]:
batch_size = 100000

data_rows = []
# Ids already present in the `conversations` table, as strings
# (not used further within this section).
conversations_existing_ids = (
    utilities.run_written_query('SELECT id FROM conversations', to_dataframe=True, option='from_string')
    .id
    .astype(str)
    .values
)

# Parse the gzipped dump line by line (each line is decoded and parsed as one JSON
# tweet) and stop as soon as `batch_size` tweets are collected: only the first
# batch is loaded.
with gzip.open(config.TWEETS_PATH, 'rb') as tweets_file:
    for raw_line in tweets_file:
        data_rows.append(json.loads(raw_line.decode(encoding='utf-8')))
        loaded = len(data_rows)
        if loaded % batch_size == 0:
            print(loaded)
            break

  return pd.read_sql(query, con=conn)


100000


In [6]:
# Build the tweets frame from the parsed JSON rows, flatten the nested
# `public_metrics` dicts into one column per metric, and rename the raw field
# names `text` / `lang` to `content` / `language`.
tweets = pd.DataFrame(data_rows)
metrics = pd.DataFrame(tweets['public_metrics'].to_list())
tweets = (
    tweets
    .assign(**{metric_name: metrics[metric_name] for metric_name in metrics.columns})
    .rename(columns={'text': 'content', 'lang': 'language'})
)

tweets.columns

Index(['author_id', 'conversation_id', 'created_at', 'entities', 'id',
       'language', 'possibly_sensitive', 'public_metrics', 'referenced_tweets',
       'reply_settings', 'source', 'content', 'context_annotations',
       'attachments', 'in_reply_to_user_id', 'geo', 'withheld', 'like_count',
       'quote_count', 'reply_count', 'retweet_count'],
      dtype='object')

In [7]:
# TODO authors uploading when uploading conversations_table

# Per-tweet columns for the `conversations` upload (presumably mirroring that
# table's schema -- confirm). Duplicate tweet ids are dropped, keeping the first
# occurrence, so the batch never carries the same id twice.
conversations_table = tweets[['id', 'author_id', 'content', 'possibly_sensitive', 'language',
                              'source', 'retweet_count', 'reply_count', 'like_count', 'quote_count',
                              'created_at']].drop_duplicates(subset='id')

In [577]:
# Find all referenced tweets - iterate over the array of references of each tweet.
# "Layer" k collects the k-th reference of every tweet that has more than k of them.
refs = tweets.dropna(subset='referenced_tweets')[['id', 'referenced_tweets']]
conversation_references_table = pd.DataFrame(columns=['conversation_id', 'parent_id', 'type'])

# With no references in the batch `.max()` would be NaN and `range()` would raise.
max_refs = int(refs.referenced_tweets.apply(len).max()) if not refs.empty else 0

reference_layers = []
for layer in range(max_refs):
    # Select all references from the given layer and the tweets they refer to
    layer_references = refs.referenced_tweets.apply(lambda x: x[layer] if len(x) > layer else None).dropna()
    conv_refs = pd.DataFrame(layer_references.to_list(), index=layer_references.index)
    conv_refs = conv_refs.rename(columns={'id': 'parent_id'})
    conv_refs = conv_refs.join(refs.id).rename(columns={'id': 'conversation_id'})
    reference_layers.append(conv_refs)

# Concatenate once instead of growing the table inside the loop.
conversation_references_table = pd.concat([conversation_references_table, *reference_layers])

In [578]:
# One row per link (`urls` entity) of every tweet.
links_table = pd.DataFrame(columns=['conversation_id', 'expanded_url', 'title', 'description'])

# Longest expanded URL that is kept (presumably the width of the DB column -- confirm).
MAX_URL_LENGTH = 255

entities_raw = tweets.entities.dropna()
entities_index = entities_raw.index
links_raw = pd.DataFrame(entities_raw.to_list(), index=entities_index).dropna(subset='urls').urls

# With no links in the batch `.max()` would be NaN and `range()` would raise.
max_links = int(links_raw.apply(len).max()) if not links_raw.empty else 0

# Layer k holds the k-th link of every tweet that has more than k links.
link_layers = []
for layer in range(max_links):
    layer_links = links_raw.apply(lambda x: x[layer] if len(x) > layer else None).dropna()
    tmp_links = pd.DataFrame(layer_links.to_list(), index=layer_links.index)
    tmp_links = tmp_links.join(tweets.id).rename(columns={'id': 'conversation_id'})
    link_layers.append(tmp_links)

# Concatenate once instead of growing the table inside the loop.
links_table = pd.concat([links_table, *link_layers])[list(links_table.columns)]

# Delete links whose URL is longer than MAX_URL_LENGTH characters; non-string URLs
# (e.g. missing values) are dropped too instead of making len() raise.
url_fits = links_table.expanded_url.map(
    lambda url: isinstance(url, str) and len(url) <= MAX_URL_LENGTH
).astype(bool)
links_table = links_table[url_fits]

In [563]:
# One row per named-entity annotation of every tweet.
annotations_table = pd.DataFrame(columns=['conversation_id', 'value', 'type', 'probability'])

# Entities are extracted here directly so this cell does not rely on a variable
# left over from another cell.
entities_raw = tweets.entities.dropna()
annotations_raw = pd.DataFrame(entities_raw.to_list(), index=entities_raw.index).dropna(subset='annotations').annotations

# With no annotations in the batch `.max()` would be NaN and `range()` would raise.
max_annotations = int(annotations_raw.apply(len).max()) if not annotations_raw.empty else 0

# Layer k holds the k-th annotation of every tweet that has more than k of them.
annotation_layers = []
for layer in range(max_annotations):
    layer_annotations = annotations_raw.apply(lambda x: x[layer] if len(x) > layer else None).dropna()
    tmp_annotations = pd.DataFrame(layer_annotations.to_list(), index=layer_annotations.index)
    tmp_annotations = tmp_annotations.rename(columns={'normalized_text': 'value'})
    tmp_annotations = tmp_annotations.join(tweets.id).rename(columns={'id': 'conversation_id'})
    annotation_layers.append(tmp_annotations)

# Concatenate once instead of growing the table inside the loop.
annotations_table = pd.concat([annotations_table, *annotation_layers])[list(annotations_table.columns)]

In [594]:
# Flatten `context_annotations` (a list of {domain, entity} dicts per tweet) into
# distinct context entities, distinct context domains, and the
# (conversation, domain, entity) link rows.
contexts_raw = tweets.context_annotations.dropna()
annotations_index = contexts_raw.index
# Tweet ids named `conversation_id`, so joins add that column directly.
conversation_ids = tweets.id.rename('conversation_id')

context_annotations_table = pd.DataFrame(columns=['conversation_id', 'context_domain_id', 'context_entity_id'])
context_entities_table = pd.DataFrame(columns=['id', 'name', 'description'])
context_domains_table = pd.DataFrame(columns=['id', 'name', 'description'])
domain_entities_tmp = pd.DataFrame(columns=['domain', 'entity'])

# With no context annotations in the batch `.max()` would be NaN and `range()` would raise.
max_contexts = int(contexts_raw.apply(len).max()) if not contexts_raw.empty else 0

entity_layers, domain_layers, context_link_layers = [], [], []
for layer in range(max_contexts):
    # Layer k holds the k-th context annotation of every tweet that has more than k of them.
    context_annotations = contexts_raw.apply(lambda x: x[layer] if len(x) > layer else None).dropna()
    domain_entities_tmp = pd.DataFrame(context_annotations.to_list(), index=context_annotations.index)

    ents = pd.DataFrame(domain_entities_tmp.entity.to_list(), index=context_annotations.index).join(conversation_ids)
    doms = pd.DataFrame(domain_entities_tmp.domain.to_list(), index=context_annotations.index).join(conversation_ids)

    anns = (ents[['id', 'conversation_id']].rename(columns={'id': 'context_entity_id'})
            .join(doms[['id']].rename(columns={'id': 'context_domain_id'})))

    entity_layers.append(ents)
    domain_layers.append(doms)
    context_link_layers.append(anns)

# Concatenate once; keeping the first occurrence of each duplicate gives the same
# rows as de-duplicating after every layer.
context_entities_table = pd.concat([context_entities_table, *entity_layers])[context_entities_table.columns].drop_duplicates(subset='id')
context_domains_table = pd.concat([context_domains_table, *domain_layers])[context_domains_table.columns].drop_duplicates(subset='id')
context_annotations_table = pd.concat([context_annotations_table, *context_link_layers])[context_annotations_table.columns].drop_duplicates()

In [683]:
# Master list of hashtags seen so far: an empty frame with a single `tag` column.
# NOTE(review): it starts empty on every run, so hashtags already stored in the
# database are not known here -- presumably it should be loaded from the DB; confirm.
hashtags_full_list = pd.DataFrame(data=None, columns=['tag'])

In [684]:
# One row per (tweet, hashtag): columns `tag` and `conversation_id`.
entities_raw = tweets.entities.dropna()
hashtags_raw = pd.DataFrame(entities_raw.to_list(), index=entities_raw.index)
hashtags = hashtags_raw.hashtags.dropna()

# Column k holds the k-th hashtag dict of each tweet; tweets with fewer hashtags
# have a missing value there. Missing values must be dropped before building a
# frame from the dicts: the DataFrame constructor does not expand a list that mixes
# dicts with missing values into a `tag` column (it raises or produces a single
# object column), which would silently lose those hashtags.
hashtags_by_position = pd.DataFrame(hashtags.to_list(), index=hashtags.index)

hashtag_layers = []
for col in hashtags_by_position.columns:
    position_hashtags = hashtags_by_position[col].dropna()
    hashtag_layers.append(pd.DataFrame(position_hashtags.to_list(), index=position_hashtags.index))

# Concatenate once instead of growing the frame inside the loop.
all_hashtags = pd.concat([pd.DataFrame(columns=['tag']), *hashtag_layers])[['tag']]

all_hashtags = all_hashtags.dropna()
all_hashtags = all_hashtags.join(tweets.id).rename(columns={'id': 'conversation_id'})

In [685]:
# Distinct hashtags used in this batch (first occurrence of each tag), as a one-column frame.
unique_used_hashtags = all_hashtags.drop_duplicates(subset=['tag'])[['tag']]

In [686]:
# Hashtags used in this batch that are not in the master list yet. Compare with the
# `tag` column: `isin` given a whole DataFrame compares against its column labels,
# which would mark every tag except the literal string 'tag' as new.
new_hashtags = unique_used_hashtags[~unique_used_hashtags['tag'].isin(hashtags_full_list['tag'])]

In [692]:
# Hashtags used in this batch that are not in the master list yet. Compare with the
# `tag` column: `isin` given a whole DataFrame compares against its column labels,
# which would mark every tag except the literal string 'tag' as new (duplicating the
# master list and re-uploading known tags on every run).
new_hashtags = unique_used_hashtags[~unique_used_hashtags['tag'].isin(hashtags_full_list['tag'])]
hashtags_full_list = pd.concat([hashtags_full_list, new_hashtags], ignore_index=True)
# Rows of the master list that were just appended (with their new positional index).
upload_hashtags = hashtags_full_list[hashtags_full_list.tag.isin(new_hashtags.tag)]

tag    object
dtype: object