In [1]:
import bz2, os, sys, glob
import json, csv, re, datetime
import pickle
from joblib import Parallel, delayed
from tqdm import tqdm

### Dump each day's tweets into a pickle: vid -> tweetCounts

In [2]:
def bz2_csv_rows(fp, sep=',', encoding='utf-8'):
    """Lazily yield the fields of each line of a bz2-compressed, comma-separated file.

    Lines are split naively on ``sep``; CSV quoting is NOT honoured, which matches
    how the tweet-stats dumps are consumed downstream. The trailing line terminator
    is removed before splitting, so the last field does not end with a newline.

    Parameters
    ----------
    fp : str, path-like or binary file object
        The ``.bz2`` file, in any form accepted by ``bz2.open``.
    sep : str, default ','
        Field separator.
    encoding : str, default 'utf-8'
        Encoding used to decode the decompressed bytes. It is explicit so the
        result does not depend on the machine's locale (the text-mode default).

    Yields
    ------
    list of str
        The fields of one line.
    """
    with bz2.open(fp, mode='rt', encoding=encoding) as bzfp:
        for line in tqdm(bzfp, desc="bz2 csv file"):
            yield line.rstrip('\r\n').split(sep)
            
# Measured on one daily file: ~3.9M rows in 2m47s (~23.5k rows/s); output pickle ~17.4 MB.
def read_write_file(infile, outfile):
    """Count how many times each video id is mentioned in one day's tweet-stats file.

    Columns 7, 8 and 9 of every row hold ';'-separated video ids (original,
    retweeted and quoted); the literal ``'N'`` marks an empty column. Rows with
    fewer than 10 fields ("rate message" rows) are skipped. An id listed in
    several of those columns of the same row is counted once per occurrence.
    Empty ids produced by stray separators (``"a;"``, ``"a;;b"``) are ignored.

    Parameters
    ----------
    infile : str
        Path to the ``.bz2`` tweet-stats file, read with ``bz2_csv_rows``.
    outfile : str or None
        Path to pickle the resulting mapping to; ``None`` skips writing.

    Returns
    -------
    dict
        ``{vid: tweetCounts}`` -- the same mapping that is pickled.
    """
    vid_columns = (7, 8, 9)  # original, retweeted, quoted video ids
    map_vid_tweetCounts = {}

    for row in bz2_csv_rows(infile):
        # ignore rate message rows, which do not reach column 9
        if len(row) < 10:
            continue
        for col in vid_columns:
            field = row[col].strip()
            if field == 'N':
                continue
            for vid in field.split(';'):
                if vid:  # skip empties from trailing/doubled separators
                    map_vid_tweetCounts[vid] = map_vid_tweetCounts.get(vid, 0) + 1

    if outfile is not None:
        # Context manager guarantees the pickle is flushed and the handle closed.
        with open(outfile, "wb") as f:
            pickle.dump(map_vid_tweetCounts, f)
    return map_vid_tweetCounts

In [3]:
def dirtodir_read_write(indir, outdir, date_range, n_jobs=5, max_files=None):
    """Run ``read_write_file`` in parallel over every dated ``.bz2`` file in a date range.

    Parameters
    ----------
    indir : str
        Directory of daily ``<yyyy-mm-dd>.bz2`` files. Every ``*.bz2`` file there
        must be named this way, otherwise ``strptime`` raises ``ValueError``.
    outdir : str
        Directory receiving one ``<yyyy-mm-dd>.pik`` per selected input. It has
        the same date as the input file's name and is created if missing.
    date_range : (str, str)
        Inclusive ``("yyyy-mm-dd", "yyyy-mm-dd")`` bounds on the file-name date.
    n_jobs : int, default 5
        Number of joblib workers.
    max_files : int or None, default None
        Optional cap on how many of the earliest matching files to process.
        ``None`` processes all of them.

    Returns
    -------
    list of (str, str)
        The ``(infile, outfile)`` pairs handed to ``read_write_file``, in date order.
    """
    date_start = datetime.datetime.strptime(date_range[0], "%Y-%m-%d")
    date_end = datetime.datetime.strptime(date_range[1], "%Y-%m-%d")

    jobs = []
    for infile in glob.glob(os.path.join(indir, "*.bz2")):
        date_str = os.path.basename(infile).split(".")[0]
        date = datetime.datetime.strptime(date_str, "%Y-%m-%d")
        if date_start <= date <= date_end:
            jobs.append((date, infile, os.path.join(outdir, date_str + ".pik")))

    # glob order is arbitrary: sort by date so processing order, and any cap,
    # are deterministic instead of silently picking a random subset of days.
    jobs.sort()
    if max_files is not None:
        jobs = jobs[:max_files]
    pairs = [(infile, outfile) for _, infile, outfile in jobs]

    os.makedirs(outdir, exist_ok=True)
    Parallel(n_jobs=n_jobs)(delayed(read_write_file)(infile, outfile) for infile, outfile in pairs)
    return pairs

In [4]:
# Input: directory of daily "<yyyy-mm-dd>.bz2" tweet-stats files.
tweets_proc_dir = "/data4/u5941758/yt_tweets_2015_2019/tweet_stats"
# Output: one "<yyyy-mm-dd>.pik" ({vid: tweetCounts}) per input file.
output_dir      = "/localdata/u6314203/daily_tweets"

# Single-file example. dirtodir_read_write names each output after its input's own
# date; the one-day shift is applied later, when build_dataset reads the pickles.
# example_in_path  = os.path.join(tweets_proc_dir, "2016-07-01.bz2")
# example_out_path = os.path.join(output_dir, "2016-07-01.pik")
# read_write_file(example_in_path, example_out_path)

In [5]:
# Dump the inclusive 10-day window of raw files to per-day pickles and report wall-clock time.
t1 = datetime.datetime.now()
dirtodir_read_write(
    tweets_proc_dir,
    output_dir,
    ("2016-07-02", "2016-07-11"),
)
t2 = datetime.datetime.now()
print("Total time:{} seconds".format(
    (t2 - t1).total_seconds()
))

Total time:203.97799 seconds


### Read the dumped daily data and merge it into the final output
#### Final output
```
# <category>.json  (one file per category; "days" are offsets from day_zero)
{
    vid_1: {
        day_zero: 2016-06-30,
        days: [],
        tweets: []
    },
    vid_2: {
        day_zero: 2016-06-30,
        days: [],
        tweets: []
    }...
}
```

In [6]:
def read_vids(inpath):
    """Load a category -> video-id mapping from a CSV file.

    Each non-empty row is ``category, vid_1, vid_2, ...``. Empty rows are skipped.
    If a category appears on several rows, the last row wins and earlier ids for
    that category are discarded.

    Parameters
    ----------
    inpath : str
        Path to the UTF-8 encoded CSV file.

    Returns
    -------
    dict[str, set[str]]
        ``{category: {vid, ...}}``.
    """
    res = dict()
    # The csv module requires newline='' so quoted fields containing line breaks
    # are parsed verbatim instead of being newline-translated.
    with open(inpath, 'r', encoding="utf-8", newline='') as file:
        for row in csv.reader(file):
            if not row:
                continue
            category, *vids = row
            res[category] = set(vids)
    return res

In [7]:
def find_category(vid, map_category_vids):
    """Return the first category, in dict iteration order, whose id set contains ``vid``.

    Returns ``None`` when no category contains it.
    """
    return next(
        (category for category, members in map_category_vids.items() if vid in members),
        None,
    )

def build_dataset(indir, vids_path):
    """Merge per-day ``{vid: tweetCounts}`` pickles into per-category time series.

    Only videos listed in ``vids_path`` (see ``read_vids``) are kept. A video listed
    under several categories is assigned to the first one in the mapping's
    iteration order. Counts read from the file named ``D.pik`` are assigned to day
    ``D - 1``.

    Parameters
    ----------
    indir : str
        Directory of ``<yyyy-mm-dd>.pik`` files, each a pickled ``{vid: count}`` dict.
        The pickles are loaded with ``pickle.load``, so only use trusted, locally
        produced files.
    vids_path : str
        CSV of ``category, vid_1, vid_2, ...`` rows.

    Returns
    -------
    dict
        ``{category: {vid: {"day_zero": "yyyy-mm-dd", "days": [...], "tweets": [...]}}}``.
        ``days`` are integer offsets from ``day_zero`` in ascending order, and
        ``tweets`` holds the matching counts. Only categories with at least one
        matched video appear.
    """
    def date_of(path):
        # file names are "<yyyy-mm-dd>.pik"
        return datetime.datetime.strptime(os.path.basename(path).split(".")[0], "%Y-%m-%d")

    dataset = dict()

    # Invert the engagement mapping once (vid -> category). setdefault keeps the
    # first category in iteration order, matching a linear scan over categories.
    map_category_vids = read_vids(vids_path)
    vid_to_cat = {}
    for cat, vid_set in map_category_vids.items():
        for vid in vid_set:
            vid_to_cat.setdefault(vid, cat)

    num_not_in_engage = 0
    total = 0
    day_zero_of = {}  # vid -> day_zero as datetime, to avoid re-parsing the string

    # files sorted by date ascending, so "days" offsets come out ascending
    daily_tweets_files = sorted(glob.glob(os.path.join(indir, "*.pik")), key=date_of)

    for infile in tqdm(daily_tweets_files, desc="daily tweets files"):
        # NOTE(review): presumably the file for date D holds tweets of D-1 -- confirm.
        date_aligned = date_of(infile) - datetime.timedelta(days=1)

        with open(infile, "rb") as f:
            daily_tweets = pickle.load(f)
        for vid, tweetCounts in daily_tweets.items():
            total += 1
            cat = vid_to_cat.get(vid)
            if cat is None:
                num_not_in_engage += 1
                continue
            videos = dataset.setdefault(cat, dict())
            if vid not in videos:
                day_zero_of[vid] = date_aligned
                videos[vid] = {
                    "day_zero": date_aligned.strftime("%Y-%m-%d"),
                    "days": [0],
                    "tweets": [tweetCounts],
                }
            else:
                videos[vid]["days"].append((date_aligned - day_zero_of[vid]).days)
                videos[vid]["tweets"].append(tweetCounts)
    # total counts (video, day) entries across files, not distinct videos.
    print("Num of (video, day) entries not in filtered engagement dataset:{}/{}".format(num_not_in_engage, total))
    return dataset

In [8]:
# CSV of "category, vid_1, vid_2, ..." rows (parsed by read_vids) from the filtered engagement dataset.
engage_vids_path = "../data/engage16/filtered/vids_filter(all).csv"
# Read back the per-day pickles written by dirtodir_read_write above.
daily_tweets_dir = output_dir # "/localdata/u6314203/daily_tweets"

In [9]:
# x: {category: {vid: {"day_zero": ..., "days": [...], "tweets": [...]}}}
x = build_dataset(daily_tweets_dir, engage_vids_path)

daily tweets files: 100%|██████████| 10/10 [00:29<00:00,  2.97s/it]

Num of videos not in filtered engagement dataset:10495262/10949243





In [16]:
def write_dataset(outdir, dataset):
    """Write each category of ``dataset`` to ``<outdir>/<category>.json``.

    Parameters
    ----------
    outdir : str
        Destination directory; created (with parents) if it does not exist.
    dataset : dict
        ``{category: data}``. Each ``data`` must be JSON-serializable and is
        written with ``json.dump`` as the whole content of its category file.
    """
    os.makedirs(outdir, exist_ok=True)
    for category, data in tqdm(dataset.items(), desc="Write dataset"):
        outfile = os.path.join(outdir, "{}.json".format(category))
        # Explicit encoding keeps the output independent of the platform locale.
        with open(outfile, "w", encoding="utf-8") as f:
            json.dump(data, f)

In [17]:
# Destination for the per-category "<category>.json" files written by write_dataset.
dataset_out_dir = "/localdata/u6314203/dataset_tweets"

In [18]:
# One JSON file per category in x.
write_dataset(dataset_out_dir, x)

Write dataset: 100%|██████████| 13/13 [00:02<00:00,  5.12it/s]


In [19]:
# Preview the first 10 comedy videos rather than rendering the whole category.
dict(list(x["comedy"].items())[:10])

{'O55H4fG1WCg': {'day_zero': '2016-07-01',
  'days': [0, 1, 2],
  'tweets': [47, 9, 2]},
 'e9Z_8qT1mUs': {'day_zero': '2016-07-01', 'days': [0], 'tweets': [1]},
 'NpjBsjecA9g': {'day_zero': '2016-07-01', 'days': [0], 'tweets': [1]},
 'WA06tstaL_E': {'day_zero': '2016-07-01',
  'days': [0, 1, 2, 3, 4, 5, 6, 7, 8],
  'tweets': [97, 28, 12, 10, 4, 5, 1, 1, 1]},
 '5chQq80OdZQ': {'day_zero': '2016-07-01',
  'days': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  'tweets': [420, 132, 51, 28, 19, 16, 4, 10, 7, 5]},
 'Xry_axsKT40': {'day_zero': '2016-07-01',
  'days': [0, 1, 2, 3, 4, 5, 6, 7],
  'tweets': [129, 12, 6, 6, 1, 1, 6, 2]},
 'PxD_KEuDCHo': {'day_zero': '2016-07-01', 'days': [0], 'tweets': [1]},
 'bcmbKN1aIOU': {'day_zero': '2016-07-01',
  'days': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  'tweets': [1807, 1055, 523, 262, 127, 98, 72, 33, 36, 39]},
 '_8p1XF30-uA': {'day_zero': '2016-07-01',
  'days': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  'tweets': [84, 25, 11, 7, 7, 2, 3, 1, 2, 1]},
 'jU_hcRK3m0g': {'day_zero'