# PLRデータ分析例

PLRに保存したデータの取得例です。

本環境は [Jupyter Notebook](https://jupyter.org/) の形で構成されており、[Pythonスクリプト](https://www.python.jp/)を用いて記述しています。

ライブラリは公式イメージ [jupyter/scipy-notebook](https://jupyter-docker-stacks.readthedocs.io/en/latest/using/selecting.html#jupyter-scipy-notebook) のものがインストールされています。

**このサンプルは古いplr-notebook形式に基づいて作成されています。00_PLR初期設定に記載した、`plrget`コマンドを利用してください。**

# チャンネルの取得

処理したいデータが格納されたチャンネルを取得します。cogtask.meにより格納されたデータを処理したい場合は、 `cog-pds-log` となります。


> Jupyter Notebook Serverの起動直後など、実行がなかなか終わらない状態になる可能性があります。
> 実行状態が `[*]` のまま先に進まない場合は 中断(■) を押して一旦中断し、再度実行してみてください。

In [None]:
import asyncio
from plrfs.rpc_client import PLRFSClient

# クライアントライブラリを初期化
loop = asyncio.get_event_loop()

client = await PLRFSClient(loop).connect()

# チャンネル一覧を取得
channels = await client.get_files([])
for ch in channels:
    print('Channel', repr(ch['name']))

参照したいチャンネル名を以下に記述します。

In [None]:
channel_name = 'cog-pds-log'
channel_name

In [None]:
channels = await client.get_files([])
cogtask_me_chs = [c for c in channels if c['name'] == channel_name]
assert len(cogtask_me_chs) > 0, 'cogtask.meのチャンネルが見つかりません。'
cogtask_me_ch = cogtask_me_chs[0]
cogtask_me_ch

データの読み込みは `PLRFSClient`から実施できます。これは以下のような関数を持っています。

In [None]:
help(client)

ファイルシステムを模した構造になっていて、 `client.get_files` で指定したチャンネル、アイテムの配下のアイテム一覧を取得することができます。また、`client.get_file` で指定したチャンネル、アイテムの配下のデータを取得することができます。


# 動画視聴データの取得

試しに、チャンネルから動画視聴データを取り出してみましょう。

In [None]:
# https://cogtask.me/t/Z1WcnPbeOCmMtYIhXyOG ならば、 Z1WcnPbeOCmMtYIhXyOG
# 第1回
task_id = 'Z1WcnPbeOCmMtYIhXyOG'
# 第2回
#task_id = 'ntisRb7k6uIGASaTcsUw'
# 第3回
#task_id = 'FYEQso54EmZ3L3mNQkyy'
task_id

まず、分析対象としたいタスクのIDに合致するログデータを抽出してみます。

In [None]:
import json
import pandas as pd

timeline_item_data = []
# チャンネル内のアイテムを取得する
timeline_items = await client.get_files([cogtask_me_ch['id']])

for item in timeline_items:
    print('Processing...', item['id'])
    # アイテムのプロパティを取得する
    timeline_properties = await client.get_files([cogtask_me_ch['id'], item['id']])
    if 'cogPDSJSON' not in [p['name'] for p in timeline_properties]:
        continue
    
    # プロパティごとのデータを取得する
    cogPDSJSON = await client.get_file([cogtask_me_ch['id'], item['id'], [p['id'] for p in timeline_properties if p['name'] == 'cogPDSJSON'][0]])
    cogPDSJSONDict = json.loads(cogPDSJSON['content'])
    if 'meta' in cogPDSJSONDict and 'task' in cogPDSJSONDict['meta'] and cogPDSJSONDict['meta']['task']['id'] == task_id:
        assert 'cnt' in [p['name'] for p in timeline_properties]
        assert 'begin' in [p['name'] for p in timeline_properties]
        summary = await client.get_file([cogtask_me_ch['id'], item['id'], [p['id'] for p in timeline_properties if p['name'] == 'cnt'][0]])
        begin = await client.get_file([cogtask_me_ch['id'], item['id'], [p['id'] for p in timeline_properties if p['name'] == 'begin'][0]])
        if 'cogPDSUser' in [p['name'] for p in timeline_properties]:
            cogPDSUser = await client.get_file([cogtask_me_ch['id'], item['id'], [p['id'] for p in timeline_properties if p['name'] == 'cogPDSUser'][0]])
        else:
            cogPDSUser = None
        timeline_item_data.append((summary['content'].decode('utf8'), begin['content'].decode('utf8'),
                                   cogPDSUser['content'].decode('utf8') if cogPDSUser is not None else None, json.loads(cogPDSJSON['content'])))

df = pd.DataFrame(timeline_item_data, columns=['Summary', 'Time', 'User', 'Detail'])
df

このようにして、Personary上のデータをメモリに読み込むことができます。これらの内容を**暗号化等をかけていないファイル等に出力する際は十分に取り扱いに注意**してください。

各データの `Detail` にはログが記録されています。

In [None]:
df['Detail'].values[-1]

# 分析例1: 動画再生時間(実時間)

動画の再生時間（実時間）がどの程度なのか抽出してみます。

PLAYING - PAUSED または PLAYING - ENDEDの間のtimeの変化を集計します。

In [None]:
# PC時計上での再生時間を計算する ... 結果はミリ秒で返す
def compute_pc_time_duration(log):
    playing = False # 状態の記憶用
    start = None
    duration = 0
    for action in log['data']['history']:
        if action['event']['type'] != 'StateChange':
            # 再生状態変化のみに着目
            continue
        if not playing:
            # チェック: 停止状態と思われる際に停止系イベントが来ることはない(はず) - 警告扱い
            if action['event']['state'] == 'PAUSED' or action['event']['state'] == 'ENDED':
                print('WARNING: Unexpected event', action)
            if action['event']['state'] == 'PLAYING':
                playing = True
                start = action['time']
        else:
            # チェック: 再生状態と思われる際に再生イベントが来ることはない
            if action['event']['state'] == 'PLAYING':
                print('WARNING: Unexpected event', action)
            if action['event']['state'] == 'BUFFERING' or action['event']['state'] == 'PAUSED' or action['event']['state'] == 'ENDED':
                playing = False
                # 再生開始時からの差分
                duration += action['time'] - start
                start = None
    return duration

compute_pc_time_duration(df['Detail'].values[-1])

In [None]:
# 動画上での再生時間を計算する ... 結果は秒で返す
def compute_movie_time_duration(log):
    playing = False # 状態の記憶用
    start = None
    duration = 0
    for action in log['data']['history']:
        if action['event']['type'] != 'StateChange':
            # 再生状態変化のみに着目
            continue
        if not playing:
            # チェック: 停止状態と思われる際に停止系イベントが来ることはない(はず) - 警告扱い
            if action['event']['state'] == 'PAUSED' or action['event']['state'] == 'ENDED':
                print('WARNING: Unexpected event', action)
            if action['event']['state'] == 'PLAYING':
                playing = True
                start = action['state']['currentTime']
        else:
            # チェック: 再生状態と思われる際に再生イベントが来ることはない(はず)
            if action['event']['state'] == 'PLAYING':
                print('WARNING: Unexpected event', action)
            if action['event']['state'] == 'BUFFERING' or action['event']['state'] == 'PAUSED' or action['event']['state'] == 'ENDED':
                playing = False
                # 再生開始時からの差分
                duration += action['state']['currentTime'] - start
                start = None
    return duration

compute_movie_time_duration(df['Detail'].values[-1])

In [None]:
# 最大の再生速度を計算する
def max_playback_rate(log):
    return max([action['state']['playbackRate'] for action in log['data']['history']])

max_playback_rate(df['Detail'].values[-1])

In [None]:
df['PCDuration'] = [compute_pc_time_duration(detail) / 1000.0 for detail in df['Detail'].values]
df['MovieDuration'] = [compute_movie_time_duration(detail) for detail in df['Detail'].values]
df['PlaybackRate'] = [max_playback_rate(detail) for detail in df['Detail'].values]
# ログの記録時間(PCの時計): 重複排除が必要かも...
df['Recorded'] = [detail['meta']['recorded'] for detail in df['Detail'].values]
df

動画視聴アプリでは、ログの取得漏れを減らすため、一時停止のたびにPLRへとログを送信しています。そのため、人ごとにデータを結合する必要があります。

この例では、 `User` カラムにユーザ識別子を入れてあります。この値はcogtask.meによって振られるもので、タスク内での識別に利用できます。（セキュリティの観点で、PLRのデータだけでは、タスク間の名寄せはできないようにしています。）

> 個人のデータはUser = Noneになります。

In [None]:
df.groupby('User').sum()

実視聴時間と動画内視聴時間の分布(単位は秒)... 実視聴時間に対して動画内視聴時間が長いということは、再生速度を上げている or 同じところを何度も繰り返してみている...  ことを示唆する。

In [None]:
df.groupby('User').sum().plot(kind='scatter', x='PCDuration', y='MovieDuration')

被験者ごとのPlayback Rate(の最大値)の頻度。2倍速で見ている人もそれなりに。

In [None]:
df[['User', 'PlaybackRate']].groupby('User').max().hist()