### 基本的にShift+Enterで順番に実行していく
* 感情分析をスキップしてチャット内容の閲覧とダウンロードをする場合は該当箇所を飛ばして実行
#### 感情分析を実行する場合はGPUを有効にしてセッションを再起動しておく
* 編集＞ノートブックの設定＞T4 GPU等にチェック

In [None]:
# 完了後にセッションの再起動を求められた場合は再起動する
!pip install dash yt_dlp datasets fugashi ipadic unidic-lite

### 関数の定義等(必ず実行する)

In [None]:
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

#### ↑ cudaと表示されていればGPUが有効となっている

In [None]:
import base64
import io
import json
import time
import csv
import requests
from tqdm import tqdm
import pandas as pd
from charset_normalizer import detect
from dash import Dash, callback, dcc, html, Input, Output, State, dash_table
import plotly.graph_objects as go
from urllib.parse import urlparse
from yt_dlp import YoutubeDL
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from torch.utils.data import DataLoader
from datasets import Dataset

In [None]:
# 関数の定義
def exec_download(url, path='/content'):
    parsed_url = urlparse(url)
    df = None
    metadata = None
    if 'youtube' in parsed_url.netloc:
        df, metadata = download_youtube_chats(url, path)
        print('ダウンロードが完了しました！')
    elif 'twitch' in parsed_url.netloc:
        video_id = parsed_url.path.split('/')[-1]
        if video_id == '':
            print('アーカイブページのURL:https://www.twitch.tv/videos/xxxxxxxxxx を入力してください。')
            return df, metadata
        print('チャットのダウンロード中...')
        df, metadata = download_twitch_chats(video_id)
        print('ダウンロードが完了しました！')
    else:
        print('youtubeかtwitchのurlを入力してください。')
    return df, metadata


def json_to_df(path):
    chats = []
    timestamps = []
    minutes = []

    with open(path, 'r', encoding='utf-8') as file:
        for line in file:
            if line.strip() == '':
                continue
            l_json = json.loads(line)
            j_action = l_json['replayChatItemAction']['actions'][0]
            if 'addChatItemAction' in j_action:
                item = j_action['addChatItemAction']['item']
                if 'liveChatTextMessageRenderer' in item:
                    message_runs = item['liveChatTextMessageRenderer']['message']['runs']
                    if message_runs and 'text' in message_runs[0]:
                        chat = message_runs[0]['text']
                        timestamp = int(l_json['replayChatItemAction']['videoOffsetTimeMsec']) // 1000
                        chats.append(chat)
                        timestamps.append(timestamp)
                        minutes.append(int(timestamp // 60))

    return pd.DataFrame({'chat': chats, 'second': timestamps, 'minute': minutes})


def get_json_data(video_id, cursor):
    loop_data = json.dumps([
        {
            "operationName": "VideoCommentsByOffsetOrCursor",
            "variables": {
                "videoID": video_id,
                "cursor": cursor
            },
            "extensions": {
                "persistedQuery": {
                    "version": 1,
                    "sha256Hash": "b70a3591ff0f4e0313d126c6a1502d79a1c02baebb288227c582044aa76adf6a"
                }
            }
        }
    ])
    return loop_data


def download_youtube_chats(url, path):
    res = exec_youtube_dl(url, path)
    title = res['title']
    video_id = res['id']
    timestamp = pd.to_datetime(res['timestamp'], unit='s', utc=True)

    json_path = f"{path}/{video_id}.live_chat.json"
    df = json_to_df(json_path)

    metadata = {
        'タイトル': title,
        '放送日時': timestamp.tz_convert('Asia/Tokyo').strftime("%Y/%m/%d/%H:%M"),
        'url': url,
    }

    return df, metadata


def exec_youtube_dl(url, path):
    output_path = f'{path}/%(id)s'

    with YoutubeDL({
        'format': 'best',
        'outtmpl': output_path,
        'writesubtitles': True,
        'skip_download': True
    }) as ydl:
        res = ydl.extract_info(url, download=False)
        ydl.download([url])

    return res


def download_twitch_chats(video_id):
    api_url = 'https://gql.twitch.tv/gql'
    first_data = json.dumps([
        {
            "operationName": "VideoCommentsByOffsetOrCursor",
            "variables": {
                "videoID": video_id,
                "contentOffsetSeconds": 0
            },
            "extensions": {
                "persistedQuery": {
                    "version": 1,
                    "sha256Hash": "b70a3591ff0f4e0313d126c6a1502d79a1c02baebb288227c582044aa76adf6a"
                }
            }
        }
    ])

    # 1回目のセッションスタート
    session = requests.Session()
    session.headers = {'Client-ID': 'kd1unb4b3q4t58fwlpcbzcbnm76a8fp', 'content-type': 'application/json'}

    response = session.post(
        api_url,
        first_data,
        timeout=10
    )

    response.raise_for_status()
    data = response.json()

    chats = []
    seconds = []
    minutes = []
    for comment in data[0]['data']['video']['comments']['edges']:
        chats.append(comment['node']['message']['fragments'][0]['text'])
        timestamp = int(comment['node']['contentOffsetSeconds'])
        seconds.append(timestamp)
        minutes.append(timestamp // 60)

    cursor = None
    if data[0]['data']['video']['comments']['pageInfo']['hasNextPage']:
        cursor = data[0]['data']['video']['comments']['edges'][-1]['cursor']
        time.sleep(0.1)

    # session loop
    while cursor:
        response = session.post(
            api_url,
            get_json_data(video_id, cursor),
            timeout=10
        )
        response.raise_for_status()
        data = response.json()

        for comment in data[0]['data']['video']['comments']['edges']:
            chats.append(comment['node']['message']['fragments'][0]['text'])
            timestamp = int(comment['node']['contentOffsetSeconds'])
            seconds.append(timestamp)
            minutes.append(timestamp // 60)

        if data[0]['data']['video']['comments']['pageInfo']['hasNextPage']:
            cursor = data[0]['data']['video']['comments']['edges'][-1]['cursor']
            time.sleep(0.1)
        else:
            cursor = None

    metadata = {'url': f"https://www.twitch.tv/videos/{video_id}"}
    df = pd.DataFrame({'chat': chats, 'second': seconds, 'minute': minutes})
    return df, metadata


def classify_emotions(model, tokenizer, texts, batch_size, tokens_max_len, device):
    model.to(device)
    dataset = Dataset.from_dict({'text': texts})
    dataset = dataset.map(
        lambda x: tokenizer(x['text'], truncation=True, padding='max_length', max_length=tokens_max_len),
        batched=True)
    dataset.set_format(type='torch', columns=['input_ids', 'attention_mask'])
    dataloader = DataLoader(dataset, batch_size=batch_size)

    results = []
    model.eval()
    with torch.no_grad():
        for batch in tqdm(dataloader, desc='感情分析の進捗', unit='batch'):
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            predictions = torch.argmax(outputs.logits, dim=-1)
            results.extend([model.config.id2label[pred.item()] for pred in predictions])
    return results

# Dash用の変数と関数を定義
EMOTION_COLORS = {
    '喜び': '#FFD700',  # ゴールド（黄金色）
    '期待': '#FFA07A',  # ライトサーモン
    '信頼': '#90EE90',  # ライトグリーン
    '驚き': '#00CED1',  # ダークターコイズ
    '悲しみ': '#6495ED',  # コーンフラワーブルー
    '恐れ': '#4682B4',  # スティールブルー
    '嫌悪': '#8B008B',  # ダークマゼンタ
    '怒り': '#CD5C5C',  # インディアンレッド
    '中立': 'grey',
    '未分類': 'grey'
}

def create_table(df):
    return html.Div([
            dash_table.DataTable(
                data=df.to_dict('records'),
                columns=[{'name': i, 'id': i} for i in df.columns],
                page_size=10
            ),
            html.Hr()
        ])

def parse_contents(contents):
    content_type, content_string = contents.split(',')
    decoded = base64.b64decode(content_string)

    try:
        detected = detect(decoded)
        encoding = detected['encoding']

        # ファイルの内容を文字列として読み込む
        file_content = io.StringIO(decoded.decode(encoding))

        # 最初の行を読み取り、メタデータかどうかを確認
        first_line = file_content.readline().strip()
        metadata = {}
        if first_line.startswith('# attrs:'):
            metadata = json.loads(first_line[8:])
            df = pd.read_csv(file_content)
        else:
            file_content.seek(0)  # ファイルポインタを先頭に戻す
            df = pd.read_csv(file_content)
        table = create_table(df)
        return table, df, metadata
    except Exception as e:
        print(e)
        return html.Div([
            'ファイルの処理中にエラーが発生しました。'
        ]), None, {}

def create_plot(df, bin_width):
    if df is None:
        return None
    # ビンの設定
    max_minutes = df['minute'].max()

    # ヒストグラムの作成
    fig = go.Figure()
    if 'emotion' not in df.columns:
        df['emotion'] = '未分類'
    for emotion in reversed(EMOTION_COLORS.keys()):
        emotion_data = df[df['emotion'] == emotion]
        fig.add_trace(go.Histogram(
            x=emotion_data['minute'],
            name=emotion,
            marker_color=EMOTION_COLORS.get(emotion, 'grey'),
            xbins=dict(start=0, end=max_minutes, size=bin_width),
            autobinx=False
        ))

    fig.update_layout(
        barmode='stack',
        title=None,
        xaxis_title='時間 (分)',
        yaxis_title='コメント数',
        margin=dict(
            l=50, r=50, t=30, b=50
        ),
        legend=dict(
            font=dict(
                size=16
            ),
            itemclick='toggleothers',
            itemdoubleclick='toggle'
        ),
        xaxis=dict(
            rangeslider=dict(
                visible=True
            ),
            rangemode='nonnegative',
        ),
        yaxis=dict(
            rangemode='nonnegative',
        ),
        modebar=dict(
            remove=['select', 'lasso']
        )
    )

    if bin_width == 1:
        fig.update_traces(hovertemplate='%{x} - %{x}59秒<br>%{y}')
        fig.update_xaxes(dtick=5, ticksuffix='分')
    else:
        tick_vals = list(range(0, int(max_minutes) + bin_width, bin_width))
        tick_text = [f'{i}分' for i in tick_vals]
        fig.update_xaxes(tickvals=tick_vals, ticktext=tick_text, ticksuffix='分59秒')

    return fig

app = Dash(__name__)

app.layout = html.Div([
    dcc.Upload(
        id='upload-data',
        children=html.Div([
            'ドラッグ&ドロップまたは ',
            html.A('ファイルを選択')
        ]),
        style={
            'width': '100%',
            'height': '60px',
            'lineHeight': '60px',
            'borderWidth': '1px',
            'borderStyle': 'dashed',
            'borderRadius': '5px',
            'textAlign': 'center',
            'margin': '10px'
        },
        multiple=False
    ),
    html.Div(id='output-data-upload'),
    html.Div([
        html.Div(id='metadata-output', style={'display': 'inline-block', 'verticalAlign': 'top', 'width': '80%'}),
        html.Div([
            html.Button('Download CSV', id='btn-download-csv', 
                        style={'fontSize': '0.8em', 'padding': '5px 10px'})
        ], style={'display': 'inline-block', 'verticalAlign': 'top', 'width': '20%', 'textAlign': 'right'})
    ], style={'marginBottom': '20px'}),
    dcc.Graph(id='histogram'),
    html.Div([
        html.Label('コメント数を集計する時間の長さ:'),
        dcc.Slider(
            id='bin-slider',
            min=1,
            max=10,
            step=1,
            value=1,
            marks={i: f'{i}分' for i in range(1, 11)},
        )
    ]),
    dcc.Download(id='download-dataframe-csv'),
])

@callback(
    [Output('output-data-upload', 'children'),
     Output('metadata-output', 'children'),
     Output('histogram', 'figure')],
    [Input('upload-data', 'contents'),
     Input('bin-slider', 'value')]
)
def update_output(contents, bin_width_minutes):
    global global_df
    global global_metadata
    if contents is None:
        df = global_df
        if df is None:
            return None, None, {}
        children = create_table(df)
        metadata = global_metadata
        metadata_output = format_metadata(metadata) if metadata else None
        fig = create_plot(df, bin_width_minutes)
        return children, metadata_output, fig

    children, df, metadata = parse_contents(contents)

    metadata_output = format_metadata(metadata) if metadata else None

    fig = create_plot(df, bin_width_minutes)

    global_df = df
    global_metadata = metadata

    return children, metadata_output, fig

def format_metadata(metadata):
    formatted_metadata = []
    for key, value in metadata.items():
        if isinstance(value, str) and value.startswith('http'):
            formatted_value = html.A(value, href=value, target='_blank')
        else:
            formatted_value = str(value)
        formatted_metadata.append(html.Div([
            html.Strong(f"{key}: "),
            formatted_value
        ]))
    return html.Div(formatted_metadata)

@callback(
    Output('download-dataframe-csv', 'data'),
    Input('btn-download-csv', 'n_clicks'),
    prevent_initial_call=True,
)
def download_csv(n_clicks):
    global global_df
    global global_metadata
    if global_df is not None:
        buffer = io.StringIO()
        if len(global_metadata) != 0:
            buffer.write(f"# attrs: {json.dumps(global_metadata, ensure_ascii=False)}\n")
        global_df.to_csv(buffer, index=False, quoting=csv.QUOTE_ALL, escapechar='\\', quotechar='"', encoding='utf-8')
        
        return dcc.send_string(
            buffer.getvalue(),
            'chat_data.csv',
            'text/csv',
        )
    return None

### URLを入力してチャットをダウンロード
* 次のセルをShift+Enterで実行するとURLを聞かれるので、URLを入力してEnter(URL入力中はShift+Enterだとエラーが出ます)

In [None]:
url = input('youtubeかtwitchのURLを入力してください。')
global_df, global_metadata = exec_download(url)

### 感情分析の実行(スキップしても時間帯毎のチャット数の確認やローカルPCへの保存は可能)

In [None]:
batch_size = 16
tokens_max_len = 64
model = AutoModelForSequenceClassification.from_pretrained('iton/YTLive-JaBERT-Emotion-v1')
tokenizer = AutoTokenizer.from_pretrained('tohoku-nlp/bert-base-japanese-v3', clean_up_tokenization_spaces=True)
global_df['emotion'] = classify_emotions(model, tokenizer, global_df['chat'].tolist(), batch_size, tokens_max_len, device)

### アプリの実行(グラフを描画、ローカルPCに保存)
* 右側のDownload_csvでローカルPCに分析結果をダウンロード
* ドラッグ＆ドロップで過去に保存したチャットのグラフを表示
* グラフ内の右上のカメラマークでグラフを画像として保存
* グラフ右側の「喜び」などをクリックすると単体表示
    * クリックやダブルクリックで切り替え
* グラフ下部のスライダーを調整して何分間のチャットをまとめて集計するか変更

In [None]:
if __name__ == '__main__':
    app.run(jupyter_mode='inline')