# slackのメッセージをbigqueryに格納

1. Retrieve all channel ids
2. Update mongo atlas channel collection
3. Retrieve conversation history
    1. Pick up one channel id
    2. Retrieve conversation history and ts of it
    3. Insert them to bigquery
    4. Update the latest ts in Mongo Atlas
    5. Back to No.1 and pick another channel id
4. Retrieve conversation reply
    1. Pick up one channel id and one conversasion history ts
    2. Retrieve conversation reply and ts of it
    3. Insert them to bigquery
    4. Update the latest ts in Mongo Atlas
    5. Back to No.1 and pick another conversasion history ts

In [None]:
from enum import Enum
import os

import requests
import json

from tqdm import tqdm
import pandas as pd
from google.cloud import bigquery
import pymongo

In [None]:
TOKEN = os.environ['SLACK_MESSAGE_SLACK_TOKEN']

MONGO_URI_SLACK_MESSAGE = os.environ['MONGO_URI_SLACK_MESSAGE']
mongo_client = pymongo.MongoClient(MONGO_URI_SLACK_MESSAGE).myslack

In [None]:
def parse_json(dic, prefixes=[]):
    new_dic = {}
    for key, value in dic.items():
        if type(value) == list:
            value = {str(i): v for i, v in enumerate(value)}
            
        if type(value) == dict:
            new_dic.update(parse_json(value, prefixes + [key]))
        else:
            new_dic[f"{'.'.join(prefixes + [key])}"] = value
    return new_dic


def write_to_jsonl(file_path, lis: list):
    with open(file_path, 'w', encoding='utf8') as f:
        for dic in lis:
            json.dump(dic, f, ensure_ascii=False)
            f.write('\n')


def insert_to_bigquery(json_list, table_name):
    bq_client = bigquery.Client()
    table_id = f"disco-name-210809.myslackchannel.{table_name}"

    errors = bq_client.insert_rows_json(table_id, json_list)
    if errors != []:
        print("Encountered errors while inserting rows: {}".format(errors))


class RequestType(Enum):
    CHANNELS = 1
    HISTORY = 2
    JOIN = 3
    

def request_slack_api(request_type: RequestType, 
                      payload: dict = {},
                     ):
    if request_type == RequestType.CHANNELS:
        url = "https://slack.com/api/conversations.list"
        payload["limit"] = "1000"
    elif request_type == RequestType.HISTORY:
        url = "https://slack.com/api/conversations.history"
    elif request_type == RequestType.JOIN:
        url = "https://slack.com/api/conversations.join"
        
    header = {
        "Authorization": "Bearer {}".format(TOKEN)
    }

    res = requests.get(url, headers=header, params=payload)
    return res.json()


def retrieve_slack_api(request_type: RequestType, 
                       payload: dict = {},
                      ):
    res = []
    res.append(request_slack_api(request_type, payload))
    if not res[-1]['ok']:
        print(res)
        return []

    while res[-1].get('has_more'):
        payload['cursor'] = res[-1]['response_metadata']['next_cursor']
        res.append(request_slack_api(request_type, payload))

    return res


def retrieve_channel_history(channel_id, latest_ts=0):
    messages = []
    payload = {
        "channel" : channel_id,
        "limit": 200,
        "oldest": latest_ts
    }
    
    res_list = retrieve_slack_api(RequestType.HISTORY, payload)
    [messages.extend(res['messages']) for res in res_list if res]
    return messages


def parse_channels(res: dict):
    flatten_df = pd.concat([pd.Series(parse_json(dic)) for dic in res['channels']], axis=1).T
    return flatten_df


def parse_messages(messages: list):        
    return pd.concat([pd.Series(parse_json(m)) for m in messages], axis=1).T

## Retrieve all channel ids

In [None]:
res_list = retrieve_slack_api(RequestType.CHANNELS)
channels = []
[channels.extend(res['channels']) for res in res_list]
channels

## Join all channels

In [None]:
for channel in tqdm(channels):
    channel_id = channel['id']
    payload = {
        'channel': channel_id
    }
    request_slack_api(RequestType.JOIN, payload)

## Retrieve conversation history

1. Pick up one channel id
2. Find the latest ts from Mongo Atlas, 0 if no collection inserted.
2. Retrieve conversation history and ts of it
3. Insert them to bigquery
4. Update the latest ts in Mongo Atlas
5. Back to No.1 and pick another channel id

In [None]:
for channel in tqdm(channels):
    channel_id = channel['id']
    
    latest_ts_channel = mongo_client.history.find_one({"channel_id": channel_id}, sort=[('ts', -1)])
    latest_ts = latest_ts_channel.get('ts', 0) if latest_ts_channel else 0
    messages = retrieve_channel_history(channel_id, latest_ts)

    if messages:
        for message in messages:
            message['channel_id'] = channel_id
        insert_to_bigquery(messages, table_name='history')
        messages.sort(key=lambda message: message['ts'])
        latest_ts = messages[-1]['ts']
        mongo_client.history.update_one({'channel_id': channel_id}, {'$set': {'ts': latest_ts}}, upsert=True)

## Retrieve conversation reply

1. Pick up one channel id and one conversasion history ts
1. Retrieve conversation reply and ts of it
1. Insert them to bigquery
1. Update the latest ts in Mongo Atlas
1. Back to No.1 and pick another conversasion history ts

In [None]:
def retrieve_replies(channel_id, thread_ts, latest_reply_ts=0):
    messages = []
    payload = {
        "channel" : channel_id,
        "ts": thread_ts,
        "oldest": latest_reply_ts,
        "limit": 200,
    }

    res_list = retrieve_slack_api(RequestType.HISTORY, payload)
    [messages.extend(res['messages']) for res in res_list if res]
    return messages

In [None]:
for channel in tqdm(channels):
    channel_id = channel['id']
    messages = retrieve_channel_history(channel_id)
        
    for message in messages:
        thread_ts = message['ts']
        latest_ts_history = mongo_client.reply.find_one({"channel_id": channel_id, 'thread_ts': thread_ts}, sort=[('ts', -1)])
        latest_reply_ts = latest_ts_history.get('ts', 0) if latest_ts_history else 0
        
        thread_messsages = retrieve_replies(channel_id, thread_ts, latest_reply_ts)
        if thread_messsages:
            for thread_messsage in thread_messsages:
                thread_messsage['channel_id'] = channel_id
            insert_to_bigquery(thread_messsages, table_name='reply')
            thread_messsages.sort(key=lambda m: m['ts'])
            latest_ts = thread_messsages[-1]['ts']
            mongo_client.reply.update_one({'channel_id': channel_id, 'thread_ts': thread_ts}, {'$set': {'ts': latest_ts}}, upsert=True)

#### TODO
- [ ] channelの情報をMongoのchannelコレクションからbqに移す
- [ ] threadがないmessageとか、チャンネルがarchiveされているときとかに無駄な処理が発生している