In [1]:
import pandas as pd
import orjson as json 
import gzip
import os
from datetime import datetime

In [5]:
# Directory that is scanned for the raw '.json.gz' files and from which they are read.
input_path = '/data_volume/streaming-data-UTC/'
# Directory where the per-file '.parquet' outputs are written; it is also scanned
# to find out which dates have already been processed.
output_path = '/data_volume/pub/antivax/raw_tweets_parquets/' 

In [6]:
# Date window of raw files to convert, as datetime objects parsed from
# 'YYYY-MM-DD' strings. Both bounds are compared inclusively when the file
# list is filtered.
start_date = datetime.strptime('2021-02-01', '%Y-%m-%d')
end_date = datetime.strptime('2021-11-01', '%Y-%m-%d')

In [7]:
# Get the list of raw files to process, excluding dates already processed.
# Raw files are named 'streaming_data--YYYY-MM-DD.json.gz' and outputs are named
# 'tweetsYYYY-MM-DD.parquet', so in both cases the date is the 10 characters
# that precede the 8-character extension.
def _in_date_window(date):
    """Return True if `date` is a 'YYYY-MM-DD' string within [start_date, end_date].

    Strings that do not parse as a date return False instead of raising, so a
    stray file in the input directory cannot abort the whole cell.
    """
    try:
        return start_date <= datetime.strptime(date, '%Y-%m-%d') <= end_date
    except ValueError:
        return False

# Dates that already have a parquet output.
skip_dates = {file[-18:-8] for file in os.listdir(output_path) if file.endswith('.parquet')}

# Only accept names that follow the raw-file naming scheme exactly; anything else
# (backups, partial downloads, unrelated files) is ignored rather than being
# sliced into a bogus date.
dates = [file[16:-8] for file in os.listdir(input_path)
         if file.startswith('streaming_data--') and file.endswith('.json.gz')
         and len(file) == 34]
dates = [date for date in dates if date not in skip_dates]

# Filter by date window and sort (ISO dates sort chronologically as strings).
files = sorted('streaming_data--'+date+'.json.gz' for date in dates if _in_date_window(date))
files

['streaming_data--2021-02-01.json.gz',
 'streaming_data--2021-02-02.json.gz',
 'streaming_data--2021-02-03.json.gz',
 'streaming_data--2021-02-04.json.gz',
 'streaming_data--2021-02-05.json.gz',
 'streaming_data--2021-02-06.json.gz',
 'streaming_data--2021-02-07.json.gz',
 'streaming_data--2021-02-08.json.gz',
 'streaming_data--2021-02-09.json.gz',
 'streaming_data--2021-02-10.json.gz',
 'streaming_data--2021-02-11.json.gz',
 'streaming_data--2021-02-12.json.gz',
 'streaming_data--2021-02-13.json.gz',
 'streaming_data--2021-02-14.json.gz',
 'streaming_data--2021-02-15.json.gz',
 'streaming_data--2021-02-16.json.gz',
 'streaming_data--2021-02-17.json.gz',
 'streaming_data--2021-02-18.json.gz',
 'streaming_data--2021-02-19.json.gz',
 'streaming_data--2021-02-20.json.gz',
 'streaming_data--2021-02-21.json.gz',
 'streaming_data--2021-02-22.json.gz',
 'streaming_data--2021-02-23.json.gz',
 'streaming_data--2021-02-24.json.gz',
 'streaming_data--2021-02-25.json.gz',
 'streaming_data--2021-02

In [8]:
def extract_full_text(tweet):
    """Return the most complete text available for a tweet dict.

    If the tweet has a 'retweeted_status', the text is taken from that nested
    object; otherwise it is taken from the tweet itself. In either case the
    'extended_tweet' -> 'full_text' value is preferred over the plain 'text'
    field when 'extended_tweet' is present.

    Raises KeyError if the selected object lacks the expected text field.
    """
    # Choose which payload holds the text: the retweeted original, or the tweet.
    payload = tweet['retweeted_status'] if 'retweeted_status' in tweet else tweet
    if 'extended_tweet' in payload:
        return payload['extended_tweet']['full_text']
    return payload['text']

# Column names for the tuples built by get_tweet_tuple, in the same order as
# the tuple's fields.
tweets_cols = ['tweet_id', 'text', 'tweet_time', 'user_id', 'lang', 'retweeted_tweet_id']
def get_tweet_tuple(tweet):
    """Flatten one tweet dict into a tuple ordered like `tweets_cols`.

    Returns (id, full text, created_at, user id, lang, retweeted tweet id).
    The last field is the integer id of the 'retweeted_status' object, or -1
    when the tweet has no 'retweeted_status'.

    Raises KeyError if any of the required fields is missing.
    """
    full_text = extract_full_text(tweet)
    if 'retweeted_status' in tweet:
        source_id = int(tweet['retweeted_status']['id'])
    else:
        source_id = -1  # sentinel: not a retweet
    return (
        tweet['id'],
        full_text,
        tweet['created_at'],
        tweet['user']['id'],
        tweet['lang'],
        source_id,
    )

In [9]:
%%time

# Extract data from json: each raw file becomes one parquet file.
for file in files:
    date = file[-18:-8]  # 'YYYY-MM-DD' part of 'streaming_data--YYYY-MM-DD.json.gz'
    rows = []
    with gzip.open(input_path+file, 'rb') as zipfile: #open raw data file
        for line in zipfile:
            if not line.strip(): continue #skips blank lines, which are not valid json
            tweet = json.loads(line) #read one tweet object from json
            if 'limit' in tweet: continue #skips rate limit error objects
            rows.append(get_tweet_tuple(tweet))
    tweets = pd.DataFrame(rows, columns = tweets_cols)
    # The format includes %z, so parsed values are timezone-aware; tz_localize(None)
    # then drops the timezone and keeps the wall-clock time.
    # NOTE(review): presumably all timestamps are UTC (+0000) -- confirm.
    tweets['tweet_time'] = pd.to_datetime(tweets['tweet_time'], format = '%a %b %d %H:%M:%S %z %Y'
                                         ).dt.tz_localize(None)
    # Write to a temporary name and rename once complete. Files ending in
    # '.parquet' in output_path are treated as "already processed", so an
    # interrupted run must never leave a truncated file under the final name.
    final_file = output_path+'tweets'+date+'.parquet'
    partial_file = output_path+'tweets'+date+'.tmp'
    tweets.to_parquet(partial_file)
    os.replace(partial_file, final_file)

CPU times: user 3h 32min 45s, sys: 7min 21s, total: 3h 40min 6s
Wall time: 3h 40min 10s
