In [None]:
import socket
import re
import requests
import time
from datetime import datetime
from threading import Thread, Lock
from google.cloud import bigquery

# -------------------------------
# BigQuery configurations
# PROJECT_ID, DATASET_ID and TABLE_ID are combined into the fully qualified
# destination table name (see table_ref below).
PROJECT_ID = "twitch-realtime-chats"
DATASET_ID = "202503200809"
TABLE_ID = "twich_chat_log_partitioned"
# Path to the service-account key file used to authenticate the BigQuery client.
# NOTE(review): absolute, machine-specific path — presumably a notebook upload
# location; confirm before running elsewhere.
CREDENTIALS_PATH = "/content/twitch-realtime-chats-e6edd4d2d2ab.json"

# Initialise BigQuery client
# This runs at import time and reads CREDENTIALS_PATH.
bq_client = bigquery.Client.from_service_account_json(CREDENTIALS_PATH)
# "project.dataset.table" string passed to insert_rows_json() by
# insert_chat_data_batch().
table_ref = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

# -------------------------------
# Twitch API
# NOTE(review): CLIENT_ID and OAUTH_TOKEN are credentials committed in plain
# text; consider loading them from the environment and rotating these values.
CLIENT_ID = "gp762nuuoqcoxypju8c569th9wz7q5"
# Sent verbatim (with the "oauth:" prefix) as the IRC PASS in connect(); the
# part after the colon is used as the Bearer token for the Helix requests.
OAUTH_TOKEN = "oauth:mb053vdgfqc0u2m7folpig78vgxdke"
# Sent as the IRC NICK in connect().
USERNAME = "danieljia36"
# Endpoint queried by get_live_channels().
API_URL = "https://api.twitch.tv/helix/streams"
# Endpoint queried by get_game_ids().
GAMES_API_URL = "https://api.twitch.tv/helix/games"

# Twitch IRC
# connect() opens a plain (non-TLS) socket to this host and port.
IRC_SERVER = "irc.chat.twitch.tv"
IRC_PORT = 6667

# Game names whose live channels are monitored; passed to get_game_ids() by
# update_channels().
GAME_LIST = [
    "League of Legends",
    "Counter-Strike",
    "Grand Theft Auto V",
    "Assassin's Creed Shadows",
    "Fortnite",
    "VALORANT",
    "Marvel Rivals",
    "Apex Legends",
    "Rainbow Six Siege"
]

# Maps channel login -> game name. Written by connect_to_channel() and read by
# listen_chat() to label each chat row with its game.
joined_channels = {}

# Current IRC socket shared by the listener and updater threads; replaced by
# connect(). sock_lock guards reads and writes of the `sock` reference.
sock = None
sock_lock = Lock()

# -------------------------------
def connect():
    """Open an authenticated connection to the Twitch IRC server.

    Retries every 10 seconds until a connection succeeds. On success the
    module-level ``sock`` is replaced (under ``sock_lock``) and the new socket
    is also returned to the caller.

    Returns:
        socket.socket: a connected socket with a 60-second timeout on which
        the PASS and NICK commands have already been sent.
    """
    global sock
    while True:
        s = None
        try:
            s = socket.socket()
            s.settimeout(60)
            s.connect((IRC_SERVER, IRC_PORT))
            # sendall() keeps writing until the whole command is transmitted;
            # plain send() may stop after a partial write.
            s.sendall(f"PASS {OAUTH_TOKEN}\n".encode("utf-8"))
            s.sendall(f"NICK {USERNAME}\n".encode("utf-8"))
        except Exception as e:
            # Release the half-open socket so repeated retries do not leak
            # file descriptors.
            if s is not None:
                s.close()
            print(f"⚠ Connection failed，retry in 10s... error: {e}")
            time.sleep(10)
        else:
            print("✅ Connected to Twitch IRC")
            with sock_lock:
                sock = s
            return s

def insert_chat_data_batch(rows):
    """Stream a batch of chat rows into the BigQuery table.

    Args:
        rows: list of row dicts to insert. An empty or falsy value is a no-op.

    The outcome (row count on success, the error list otherwise) is printed;
    nothing is returned.
    """
    if rows:
        insert_errors = bq_client.insert_rows_json(table_ref, rows)
        if not insert_errors:
            print(f"✅ Write into BigQuery {len(rows)}  rows")
        else:
            print(f"❌ BigQuery write error: {insert_errors}")

def get_game_ids(game_names):
    """Look up Twitch game IDs for the given game names.

    Args:
        game_names: collection of game names to resolve.

    Returns:
        dict: game name -> game id for every game present in the response.
        An empty dict is returned when ``game_names`` is empty, when the
        request fails, or when the API answers with a non-200 status.
    """
    if not game_names:
        # Nothing to resolve; skip the request entirely.
        return {}
    headers = {
        "Client-ID": CLIENT_ID,
        # The API expects the bare token; drop the "oauth:" prefix if present.
        "Authorization": f"Bearer {OAUTH_TOKEN.split(':', 1)[-1]}"
    }
    params = [("name", game) for game in game_names]
    try:
        # Without a timeout a stalled connection would block the caller forever.
        response = requests.get(
            GAMES_API_URL, headers=headers, params=params, timeout=10
        )
    except requests.RequestException as e:
        print(f"Failed to get game id: {e}")
        return {}
    if response.status_code != 200:
        print(f"Failed to get game id: {response.text}")
        return {}
    data = response.json()
    return {game["name"]: game["id"] for game in data.get("data", [])}

def get_live_channels(game_ids):
    """Fetch up to 20 live channel logins for each game.

    Args:
        game_ids: dict mapping game name -> Twitch game id.

    Returns:
        dict: game name -> list of ``user_login`` values of live streams.
        Games whose request fails (network error or non-200 status) are
        reported on stdout and omitted from the result.
    """
    if not game_ids:
        # No games to query; avoid building headers and issuing requests.
        return {}
    headers = {
        "Client-ID": CLIENT_ID,
        # The API expects the bare token; drop the "oauth:" prefix if present.
        "Authorization": f"Bearer {OAUTH_TOKEN.split(':', 1)[-1]}"
    }
    live_channels = {}
    for game_name, game_id in game_ids.items():
        params = {"game_id": game_id, "first": 20}
        try:
            # Without a timeout a stalled connection would block the caller forever.
            response = requests.get(
                API_URL, headers=headers, params=params, timeout=10
            )
        except requests.RequestException as e:
            # One failing game must not abort the scan of the remaining games.
            print(f"Failed to get {game_name} channel: {e}")
            continue
        if response.status_code != 200:
            print(f"Failed to get {game_name} channel: {response.text}")
            continue
        data = response.json()
        live_channels[game_name] = [
            stream["user_login"] for stream in data.get("data", [])
        ]
    return live_channels

def connect_to_channel(s, channel, game_name):
    """Send an IRC JOIN for ``channel`` and record which game it belongs to.

    Args:
        s: socket to send the JOIN command on.
        channel: channel login to join (without the leading ``#``).
        game_name: game associated with the channel; stored in
            ``joined_channels`` for later lookup by the listener.

    If sending fails, a new connection is opened with ``connect()`` and the
    JOIN is retried on it. The retry is a loop rather than recursion so a long
    outage cannot exhaust the call stack.
    """
    while True:
        try:
            # sendall() guarantees the full command is written or an error raised.
            s.sendall(f"JOIN #{channel}\n".encode("utf-8"))
        except Exception as e:
            print(f"Joined {channel} fail: {e}")
            s = connect()
            continue
        joined_channels[channel] = game_name
        print(f"Joined: {channel} (Game: {game_name})")
        return

# Matches one complete PRIVMSG line: ":<user>!<host> PRIVMSG #<channel> :<text>"
_PRIVMSG_PATTERN = re.compile(r"^:(\w+)!\S* PRIVMSG #(\w+) :(.*)$")


def _irc_command(line):
    """Return the IRC command word (e.g. PING, JOIN, PRIVMSG) of one line."""
    parts = line.split(" ", 2)
    if line.startswith(":"):
        # A leading ':' marks a source prefix; the command is the next token.
        return parts[1] if len(parts) > 1 else ""
    return parts[0]


def _parse_privmsg(line):
    """Build a BigQuery row dict from one PRIVMSG line, or return None."""
    match = _PRIVMSG_PATTERN.match(line)
    if match is None:
        return None
    user_name, channel, chat_message = match.groups()
    return {
        "timestamp": datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
        "game_name": joined_channels.get(channel, "Unknown Game"),
        "channel": channel,
        "user_name": user_name,
        "message": chat_message
    }


def listen_chat():
    """Read chat from the shared IRC socket and batch rows into BigQuery.

    Runs forever. Incoming bytes are split into individual IRC lines before
    being interpreted, because a single ``recv()`` may carry several messages
    or only part of one. Each line is then handled by command:

    * ``PING``    -> answered with ``PONG`` to keep the connection alive.
    * ``JOIN``    -> logged as a join confirmation.
    * ``PRIVMSG`` -> parsed into a row and buffered; every 1000 rows the
      buffer is written with ``insert_chat_data_batch`` and cleared.

    A closed connection, a 60-second read timeout or any socket error triggers
    a reconnect via ``connect()``.
    """
    buffer = []
    pending = b""  # bytes of a trailing, not yet terminated line
    while True:
        # Pick up the current socket each round: another thread may have
        # replaced it through connect().
        with sock_lock:
            s = sock
        try:
            raw_data = s.recv(2048)
            if not raw_data:
                # recv() returning b"" means the server closed the connection.
                print("⚠ empty message, reconnecting...")
                pending = b""
                connect()
                continue

            pending += raw_data
            # Everything before the last newline is complete; the remainder is
            # kept until the next recv() finishes it.
            *raw_lines, pending = pending.split(b"\n")

            for raw_line in raw_lines:
                # Decode per complete line so a multi-byte character split
                # across two recv() calls is not corrupted.
                line = raw_line.decode("utf-8", errors="replace").strip()
                if not line:
                    continue

                command = _irc_command(line)
                if command == "PING":
                    s.sendall("PONG :tmi.twitch.tv\n".encode("utf-8"))
                elif command == "JOIN":
                    print(f"✅ Join channel confirmed: {line}")
                elif command == "PRIVMSG":
                    row = _parse_privmsg(line)
                    if row is None:
                        print(f"Parse message failure: {line}")
                        continue
                    buffer.append(row)
                    # Write data into table per 1000 rows
                    if len(buffer) >= 1000:
                        try:
                            insert_chat_data_batch(buffer)
                        except Exception as flush_e:
                            # Keep the rows so the next message retries the
                            # write instead of silently dropping the batch.
                            print(f"❌ BigQuery flush failed: {flush_e}")
                        else:
                            buffer.clear()
        except socket.timeout:
            print("⚠ No new message in 60s, reconnecting...")
            pending = b""
            connect()
        except OSError as e:
            print(f"⚠ Connection error: {e}，reconneting...")
            pending = b""
            connect()

def update_channels():
    """Periodically join the live channels of every game in ``GAME_LIST``.

    Runs forever. Each cycle fetches the live channels per game, sends a JOIN
    for every channel (2 seconds apart) on the current shared socket, then
    sleeps 60 seconds.

    Game IDs are resolved once and reused; if the lookup yields nothing (for
    example because the API was unreachable at startup) it is retried on the
    next cycle instead of leaving the loop idle forever.
    """
    game_ids = {}
    while True:
        try:
            if not game_ids:
                game_ids = get_game_ids(GAME_LIST)
            live_channels = get_live_channels(game_ids)
        except requests.RequestException as e:
            # A transient API failure must not terminate the updater thread.
            print(f"⚠ Twitch API request failed, retry in 60s... error: {e}")
            live_channels = {}
        for game_name, channels in live_channels.items():
            for channel in channels:
                # Re-read the socket each time: it may have been replaced by a
                # reconnect in another thread.
                with sock_lock:
                    s = sock
                connect_to_channel(s, channel, game_name)
                time.sleep(2)
        time.sleep(60)

if __name__ == "__main__":
    # Establish the IRC connection first so both workers start with a live socket.
    sock = connect()
    # One daemon thread reads chat, the other keeps the joined channels current.
    workers = [
        Thread(target=entry_point, daemon=True)
        for entry_point in (listen_chat, update_channels)
    ]
    for worker in workers:
        worker.start()
    # Both workers loop forever, so joining keeps the main thread alive.
    for worker in workers:
        worker.join()


[1;30;43m流式输出内容被截断，只能显示最后 5000 行内容。[0m
:theazertio!theazertio@theazertio.tmi.twitch.tv PRIVMSG #midbeast :Deep
Joined: martinmartin_m2x (Game: League of Legends)
✅ 加入频道确认: :dmeney!dmeney@dmeney.tmi.twitch.tv PRIVMSG #locking :cara ta em sp e n dá um salve pra nada
:danieljia36!danieljia36@danieljia36.tmi.twitch.tv JOIN #martinmartin_m2x
:danieljia36.tmi.twitch.tv 353 danieljia36 = #martinmartin_m2x :danieljia36
:danieljia36.tmi.twitch.tv 366 danieljia36 #martinmartin_m2x :End of /NAMES list
:zxsayonara!zxsayonara@zxsayonara.tmi.twitch.tv PRIVMSG #get_right :ahahaha :)))
:elyogaming!elyogaming@elyogaming.tmi.twitch.tv PRIVMSG #strey_lol :Combien de games avant les ranked?
Joined: clovaobengala (Game: League of Legends)
✅ 加入频道确认: :danieljia36!danieljia36@danieljia36.tmi.twitch.tv JOIN #clovaobengala
Joined: chap_gg (Game: League of Legends)
✅ 加入频道确认: :pakapakapackers!pakapakapackers@pakapakapackers.tmi.twitch.tv PRIVMSG #darkzelk :www
:danieljia36!danieljia36@danieljia36.tmi.twitch.tv 