In [None]:
import datetime
import itertools
import json
from pathlib import Path
from typing import Dict, List, Optional, Union

from matplotlib import pyplot as plt
import numpy as np
from tqdm import tqdm

In [None]:
%matplotlib inline

In [None]:
# 定数
ENCODE = 'utf-8'
THRESHOLD_DT_DEFAULT = datetime.datetime(1901, 1, 1)
Y_LABELS = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

In [None]:
def load_js_file(path: Path) -> Union[Dict, List]:
    with path.open('r', encoding=ENCODE) as fr:
        raw_data = fr.readlines()

    if raw_data[0][-2] in ('[', '{'):
        raw_data[0] = raw_data[0][-2]
    else:
        raise NotImplementedError('想定外の形式です。jsファイルの形式を確認してください')
    
    raw_data = ''.join(raw_data)
    py_data = json.loads(raw_data)
    # 整形する
    py_data = [tweet['tweet'] for tweet in py_data]
    return py_data


In [None]:
def aggregate_tweet(
    tweet_list: List[Dict], /, threshold_dt: Optional[datetime.datetime] = None,
    interval_minutes: int = 60,
) -> List[List[int]]:
    """時刻単位ごとのツイート数をカウントする

    週は月曜はじまり, 時刻単位は引数で指定

    Args:
        tweet_list: ツイート情報の辞書リスト
        interval_minutes: 集計する時間単位[分]
    
    Returns:
        List[List[int]]: data[x][y]: x曜日y時間帯のツイート数
    """
    if threshold_dt is None:
        threshold_dt = THRESHOLD_DT_DEFAULT
    dt_now = datetime.datetime.now()
    if (24 * 60) % interval_minutes:
        raise ValueError('interval_minutes には, 1440を割り切ることのできる値を指定してください')
    if not (0 < interval_minutes <= 60) or not isinstance(interval_minutes, int):
        raise ValueError('interval_minutes には1から60までの整数を指定してください')
    count_l = [[0 for _ in range(24 * 60 // interval_minutes)] for _ in range(7)]
    for tweet in tqdm(tweet_list):
        # "created_at" : "Wed May 01 10:52:48 +0000 2019"
        created_at_str = tweet['created_at']
        created_at_dt = datetime.datetime.strptime(
            created_at_str,
            '%a %b %d %H:%M:%S +0000 %Y'
        )
        # UTCになっているので, +9時間
        created_at_dt += datetime.timedelta(hours=9)
        if created_at_dt < threshold_dt:
            # 指定日時より前のツイートは集計しない
            continue
        # 曜日
        weekday_val = created_at_dt.weekday()
        # 時間帯
        hour_val = (created_at_dt.hour * 60 + created_at_dt.minute) // interval_minutes
        count_l[weekday_val][hour_val] += 1
    
    return count_l


In [None]:
def generate_graph(tweet_count_l: List[List[int]]) -> None:
    """グラフを描画する
    
    Args:
        tweet_count_l: aggregate_tweet 関数の返り値
    
    Returns:
        Image: 画像データ
    """
    assert isinstance(tweet_count_l, list) and isinstance(tweet_count_l[0], list)
    data_count = (len(tweet_count_l), len(tweet_count_l[0]))

    fig, ax = plt.subplots()
    heatmap = ax.pcolor(tweet_count_l, cmap='Reds')

    ax.tick_params(length=0)
    # xlabel: 時間帯
    ax.set_xlabel('Hour')
    ax.xaxis.tick_top()
    ax.xaxis.set_label_position('top')
    ax.set_xticks(np.arange(data_count[1]) + 1, minor=False)
    x_deg = data_count[1] // 24
    x_labels = [x // x_deg if x % x_deg == 0 else '' for x in range(data_count[1])]
    ax.set_xticklabels(x_labels, minor=False)
    # ylabel: 曜日
    ax.set_ylabel('Weekday')
    ax.invert_yaxis()
    ax.set_yticks(np.arange(data_count[0]) + 0.5, minor=False)
    ax.set_yticklabels(Y_LABELS, minor=False)
    # 完成
    plt.show()

    return heatmap

target_path (`Path`): `Path()` の引数に、解析する `tweets.js` の絶対パスを指定する

threshold_dt (`datetime.datetime`): 指定した日時以降のツイートのみを集計する

interval_minutes (`int`): 集計するツイート時刻の間隔(単位: 分)

In [None]:
# main
target_path = Path('/path/to/tweets.js')
threshold_dt = datetime.datetime(1901, 1, 1)
interval_minutes = 20


def main():
    tweet_data = load_js_file(target_path)
    aggred_data = aggregate_tweet(tweet_data, threshold_dt, interval_minutes)
    generate_graph(aggred_data)
    return

main()