## Panacea Lab COVID-19 dataset to Postgres
[Raw data source](https://github.com/thepanacealab/covid19_twitter)

In [1]:
# last_stop = 20000000
# Auth is expected to expose `api` and `access` mappings, each with 'key' and
# 'secret' entries (that is how the Twarc client is constructed from it).
from twitter_auth_info import Auth
# Debugging cap on the row index: the import loop stops reading a chunk once a
# row's index exceeds this value. Values <= 0 disable the cap.
max_count = -1 # For debugging
# Number of tweet ids collected before one hydrate (lookup) call is issued.
LOOKUP_TWEETS_LIMIT = 100 # https://developer.twitter.com/en/docs/tweets/post-and-engage/api-reference/get-statuses-lookup
# Both logs are opened in "w" mode, so re-running this cell truncates the files
# left by a previous run.
runtime_log = open("tsv_to_postgres_log", "w")
error_log = open("tsv_to_postgres_errors_log", "w")


In [2]:
import os
import pandas as pd
from IPython.display import clear_output
from datetime import datetime

def get_time_now():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

from twarc import Twarc
# Twitter API client used to hydrate tweet ids; the keys are read from the
# Auth object imported from twitter_auth_info.
twarc = Twarc(Auth.api['key'], Auth.api['secret'], Auth.access['key'], Auth.access['secret'])
from postgres import Postgres
from psycopg2.errors import UniqueViolation
# The connection string (which embeds the database password) can be supplied
# through the TWATCH_DATABASE_URL environment variable so it does not have to
# live in the notebook. Without the variable the original local development
# database is used, so existing setups keep working unchanged.
db = Postgres(url=os.environ.get('TWATCH_DATABASE_URL', 'postgresql://postgres:postgres@localhost:5432/twatch'))

In [3]:
from enum import Enum
class ChunkStatus(Enum):
    """Processing state of one chunk of the input file."""
    # Members receive the values 0, 1 and 2 in declaration order.
    NOT_STARTED, INCOMPLETE, COMPLETE = range(3)

    
class ChunkManager():
    """Tracks which chunks of the input file are finished or partially processed.

    Progress is persisted to a text file holding one chunk id per line; ids of
    partially processed chunks are prefixed with ``~``. Every mutating method
    rewrites the file immediately.
    """

    def __init__(self, path='processed_chunks'):
        """Load previously recorded progress.

        Args:
            path: Location of the progress file. A missing file is treated as
                "nothing has been processed yet".

        Raises:
            ValueError: If the progress file contains a malformed line.
        """
        self.path = path
        self.chunks = []             # ids of fully processed chunks
        self.incomplete_chunks = []  # ids of chunks that were interrupted
        # Only set once loading succeeded, so that __del__ can never overwrite
        # a progress file that could not be read in full.
        self._loaded = False
        self.load_chunks()
        self._loaded = True


    def load_chunks(self):
        """Append the ids stored in the progress file to the in-memory lists.

        Blank lines are skipped. If the file does not exist the lists are left
        untouched. Nothing is added when any line fails to parse.

        Raises:
            ValueError: If a line is not an integer, optionally ``~``-prefixed.
        """
        completed, incomplete = [], []
        try:
            with open(self.path, 'r') as f:
                for line in f:
                    entry = line.strip()
                    if not entry:
                        continue
                    if entry[0] != '~':
                        completed.append(int(entry))
                    else:
                        incomplete.append(int(entry[1:]))
        except FileNotFoundError:
            # First run: no progress has been recorded yet.
            return
        self.chunks.extend(completed)
        self.incomplete_chunks.extend(incomplete)


    def save_chunks(self):
        """Rewrite the progress file from the in-memory lists."""
        with open(self.path, 'w') as chunk_file:
            for c in self.chunks:
                chunk_file.write("{}\n".format(c))
            for c in self.incomplete_chunks:
                chunk_file.write("~{}\n".format(c))


    def add_completed_chunk(self, chunk_id):
        """Record ``chunk_id`` as fully processed and persist the change.

        The id is removed from the incomplete list if it was there.
        """
        if chunk_id not in self.chunks:
            self.chunks.append(chunk_id)
        if chunk_id in self.incomplete_chunks:
            self.incomplete_chunks.remove(chunk_id)
        self.save_chunks()


    def add_incomplete_chunk(self, chunk_id):
        """Record ``chunk_id`` as partially processed and persist the change.

        Raises:
            Exception: If the chunk has already been recorded as completed.
        """
        if chunk_id in self.chunks:
            raise Exception("Chunck already completed")
        if chunk_id not in self.incomplete_chunks:
            self.incomplete_chunks.append(chunk_id)
        self.save_chunks()


    def check_chunk(self, chunk_id):
        """Return the ChunkStatus recorded for ``chunk_id``."""
        if chunk_id in self.chunks:
            return ChunkStatus.COMPLETE
        elif chunk_id in self.incomplete_chunks:
            return ChunkStatus.INCOMPLETE
        else:
            return ChunkStatus.NOT_STARTED


    def print_chunks(self):
        """Print the completed and incomplete chunk ids."""
        print("Completed chunks: {}".format(str(self.chunks)))
        print("Incomplete chunks: {}".format(str(self.incomplete_chunks)))


    def __del__(self):
        """Persist progress one last time when the manager is garbage collected.

        Skipped when construction did not finish loading: saving a partially
        read list would silently drop the entries that were never parsed.
        """
        if getattr(self, '_loaded', False):
            self.save_chunks()
           

In [None]:
buffer = []            # tweet ids waiting to be hydrated in one lookup call
num_tweets_saved = 0   # rows written to tweets_incr during this run
total = 0
chunksize = 10 ** 6
chunk_manager = ChunkManager()


def _already_stored(tweet_id):
    """Return True when `tweet_id` already has a row in `tweets` or `tweets_incr`."""
    key = [str(tweet_id)]
    if db.one(''' SELECT * FROM tweets WHERE tweet_id = %s''', key) is not None:
        return True
    return db.one(''' SELECT * FROM tweets_incr WHERE tweet_id = %s''', key) is not None


def _find_resume_label(chunk_df):
    """Return the index label from which an interrupted chunk should resume.

    Up to LOOKUP_TWEETS_LIMIT evenly spaced rows of the unfiltered chunk are
    probed; the furthest probed row whose tweet is already stored marks the
    resume point (the first row when no probe is stored).
    """
    step = chunksize // LOOKUP_TWEETS_LIMIT
    resume_pos = 0
    for probe in range(LOOKUP_TWEETS_LIMIT):
        pos = probe * step
        if pos >= len(chunk_df):
            # The final chunk of the file can be shorter than `chunksize`.
            break
        if _already_stored(chunk_df.iloc[pos].tweet_id):
            resume_pos = pos
    print("Resuming from {}".format(resume_pos))
    # The row loop compares against `row.Index`, which keeps counting across
    # chunks, so the position inside the chunk is translated to its label.
    return chunk_df.index[resume_pos]


def _flush_buffer(chunk_id, row_index):
    """Hydrate every buffered id, store qualifying tweets and authors, then empty the buffer.

    Tweets with fewer than 10 retweets + favourites and retweets are skipped.
    Errors on individual tweets are written to `error_log` and do not stop the
    batch; an error raised by the hydrate call itself propagates to the caller
    and leaves the buffer untouched.
    """
    global num_tweets_saved
    # Hydrate a snapshot so that clearing `buffer` can never disturb the
    # iteration twarc performs over the ids.
    for tweet in twarc.hydrate(list(buffer)):
        try:
            if tweet['retweet_count'] + tweet['favorite_count'] < 10:
                continue
            user = tweet['user']
            unix = int(datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y').timestamp())
            if tweet.get('retweeted_status') is not None:
                # Retweets are intentionally not stored.
                continue
            db.run(''' INSERT INTO tweets_incr(tweet_id,unix,user_id,num_retweets,num_favourites,
                       reply_parent_id,text,location)
                      VALUES (%s,%s,%s,%s,%s,%s,%s,%s)''',
                   (tweet["id_str"],unix,user["id_str"],tweet["retweet_count"],tweet["favorite_count"],
                    tweet["in_reply_to_status_id"] if tweet["in_reply_to_status_id"] is not None else -1,
                    tweet["full_text"],str(tweet["coordinates"])
                   ))
            # Counted as soon as the tweet row is written, whether or not the
            # author upsert below succeeds.
            num_tweets_saved += 1
            db.run(''' INSERT INTO users(user_id,user_name,user_screen_name,user_location,user_followers_count,user_friends_count,user_is_verified,user_profile_image_url)
                        VALUES (%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (user_id) DO UPDATE
                        SET user_name = excluded.user_name,
                            user_screen_name = excluded.user_screen_name,
                            user_followers_count = excluded.user_followers_count,
                            user_friends_count = excluded.user_friends_count,
                            user_is_verified = excluded.user_is_verified,
                            user_profile_image_url = excluded.user_profile_image_url;
                    ''', (user['id_str'],user['name'],user['screen_name'],user['location'],user['followers_count'],user['friends_count'],user['verified'],user['profile_image_url_https']))
        except UniqueViolation:
            pass
        except Exception as e:
            print("Encountered error at line {}".format(row_index))
            error_log.write("====={}=====\n".format(get_time_now()))
            error_log.write("Encountered error when processing line {}\n".format(row_index))
            error_log.write("Number of tweets saved: {}\n".format(num_tweets_saved))
            error_log.write(str(e) + "\n")
            error_log.write("\n=============================\n")
            error_log.flush()

    # Progress is logged once per hydrated batch.
    runtime_log.write("===== {} =====\n".format(get_time_now()))
    runtime_log.write("Stored tweets until line {} to DB, {} tweets saved so far.\n".format(row_index, num_tweets_saved))
    runtime_log.write("=============================\n")
    runtime_log.flush()
    print("Chunk #{}: {}/{} tweets processed, {} new tweets saved".format(chunk_id ,row_index % chunksize, chunksize, num_tweets_saved), end="\r", flush=True)
    buffer.clear()


df = pd.read_csv('data/full_dataset.tsv', sep="\t", iterator=True, chunksize=chunksize)

for chunk_id, in_df in enumerate(df):
    last_stop = 0
    status = chunk_manager.check_chunk(chunk_id)
    if status == ChunkStatus.COMPLETE:
        print("Chunk #{} is completed.".format(chunk_id), end='\r', flush=True)
        continue
    if status == ChunkStatus.INCOMPLETE:
        print("Chunk #{} is partially completed.".format(chunk_id))
        # Probing happens before the language filter so that positions refer
        # to the chunk exactly as it was read from the file.
        last_stop = _find_resume_label(in_df)

    try:
        print("Reading chunk {}.".format(chunk_id), end="\r", flush=True)
        in_df = in_df[in_df['lang']=='en']
        last_index = None
        for row in in_df.itertuples():
            if row.Index < last_stop:
                continue
            # NOTE(review): when max_count cuts a chunk short, the chunk is
            # still recorded as completed below - confirm this is intended
            # for debugging runs.
            if max_count > 0 and row.Index > max_count:
                break

            if _already_stored(row.tweet_id):
                continue

            buffer.append(row.tweet_id)
            last_index = row.Index
            if len(buffer) >= LOOKUP_TWEETS_LIMIT:
                _flush_buffer(chunk_id, row.Index)

        # Hydrate the ids left over (fewer than LOOKUP_TWEETS_LIMIT) before
        # the chunk is marked completed, so none are lost or carried over.
        if buffer:
            _flush_buffer(chunk_id, last_index)
    except:
        # Deliberately bare: a KeyboardInterrupt must also mark the chunk as
        # incomplete before the exception is re-raised.
        print("Chunk {} interrupted.".format(chunk_id))
        chunk_manager.add_incomplete_chunk(chunk_id)
        raise
    chunk_manager.add_completed_chunk(chunk_id)


Chunk #3: 426419/1000000 tweets processed, 39268 new tweets saved



Chunk #6: 803143/1000000 tweets processed, 79583 new tweets saved

ERROR:twarc:caught connection error HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/statuses/lookup.json (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f965c271ca0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution')) on 1 try
ERROR:twarc:caught connection error HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/statuses/lookup.json (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f965c271670>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution')) on 2 try
ERROR:twarc:caught connection error HTTPSConnectionPool(host='api.twitter.com', port=443): Max retries exceeded with url: /1.1/statuses/lookup.json (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f965c14aac0>: Failed to establish a new connection: [Errno -3] Temporary failure in name res