# Twitch Recommendation System

Group Members: Ritwika Das, Nideesh Bharath Kumar, Shriya Shrestha

## Import Statements

In [None]:
!pip install websocket-client requests pyngrok

In [None]:
!pip install transformers datasets accelerate

In [None]:
import os
import json
import random
import time
import re
import threading
import requests
from pyngrok import conf, ngrok
import websocket
from datetime import datetime
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM, Trainer, TrainingArguments, DataCollatorForLanguageModeling
from datasets import load_dataset
import torch
import glob
from difflib import get_close_matches
from sentence_transformers import SentenceTransformer, util
import numpy as np
import matplotlib.pyplot as plt

## Twitch API Integration

Client ID and Client Secret Keys to access Twitch API

In [None]:
# SECURITY: credentials belong in the environment, not in the notebook.
# TWITCH_CLIENT_ID / TWITCH_CLIENT_SECRET take precedence when set; the
# literals remain only as fallbacks so existing runs keep working. Rotate
# them and remove the fallbacks before sharing this file.
CLIENT_ID = os.environ.get('TWITCH_CLIENT_ID', 'gqvdji473lk1d7ka471jn9db93qjr9')
CLIENT_SECRET = os.environ.get('TWITCH_CLIENT_SECRET', 'd2365e4zr946878dxy3q49wp4r78vj')

Getting the OAuth token from the Twitch API to authorize requests.

In [None]:
def get_twitch_token():
    """Request an app access token using the client-credentials grant.

    Returns:
        The ``access_token`` value from the token endpoint's JSON response.

    Raises:
        requests.exceptions.HTTPError: If the endpoint answers with an error
            status.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    url = 'https://id.twitch.tv/oauth2/token'
    payload = {
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'grant_type': 'client_credentials'
    }
    # Send the credentials in the form-encoded body rather than the query
    # string so the client secret does not end up in request URLs, and bound
    # the wait so a stalled connection cannot hang the notebook.
    res = requests.post(url, data=payload, timeout=10)
    res.raise_for_status()
    return res.json()['access_token']

# A single app token is fetched up front; the headers built from it are
# reused by every Helix request made below.
access_token = get_twitch_token()
HEADERS = {'Client-ID': CLIENT_ID, 'Authorization': f'Bearer {access_token}'}

Fetches the streamers that are currently live on Twitch and returns their usernames as a lowercase Python list.

In [None]:
def get_live_streamers(limit):
    """Return the channel names of streams that are currently live.

    Args:
        limit: Number of streams to request (sent as the ``first`` query
            parameter of the Helix streams endpoint).

    Returns:
        A list of lowercase channel names, in the order the API returned them.

    Raises:
        requests.exceptions.HTTPError: If the API answers with an error status.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    url = 'https://api.twitch.tv/helix/streams'
    params = {'first': limit}
    res = requests.get(url, headers=HEADERS, params=params, timeout=10)
    res.raise_for_status()
    data = res.json()['data']
    # These names are later used as IRC channel names (JOIN #<name>).
    # `user_name` is the display name, which may not match the login, so
    # prefer `user_login` and fall back to the previous behaviour only when
    # the field is absent.
    return [
        (stream.get('user_login') or stream['user_name']).lower()
        for stream in data
    ]

Cleans each message received from Twitch by removing links and non-alphanumeric characters and lowercasing the text, which simplifies training on the data.

In [None]:
def clean_message(msg):
    """Normalize one raw chat line.

    Removes every run starting with ``http`` up to the next whitespace, drops
    all characters other than ASCII letters, digits and whitespace, then
    lowercases the result and trims surrounding whitespace.
    """
    without_links = re.sub(r"http\S+", "", msg)
    alnum_only = re.sub(r"[^A-Za-z0-9\s]+", "", without_links)
    lowered = alnum_only.lower()
    return lowered.strip()

A class that bundles everything needed for one streamer: receiving chat messages, cleaning them, and saving them to a per-streamer JSON file inside a target folder.

In [None]:
class TwitchChatCollector:
    """Collect cleaned chat messages from one Twitch channel.

    The collector connects to the Twitch IRC WebSocket endpoint on a
    background thread, stores every cleaned chat message in ``self.messages``
    and can dump them to a timestamped JSON file.
    """

    def __init__(self, streamer):
        self.streamer = streamer
        self.messages = []
        self.ws = None
        # Created by start(); None until then so stop() is always safe to call.
        self.thread = None

    def on_message(self, ws, message):
        """Handle one WebSocket frame received from the chat server.

        A frame can contain several newline-separated IRC lines, so each line
        is handled on its own. PING lines are answered with PONG to keep the
        connection alive; PRIVMSG lines are cleaned and stored. Messages that
        are empty after cleaning (e.g. links or emoji only) are skipped.
        """
        for line in message.splitlines():
            if line.startswith("PING"):
                ws.send(line.replace("PING", "PONG", 1))
                continue
            try:
                # Expected shape: ":<prefix> PRIVMSG #<channel> :<text>".
                # Partitioning on the first " PRIVMSG " keeps any colons
                # inside the chat text intact.
                _prefix, found, rest = line.partition(" PRIVMSG ")
                if not found:
                    continue
                _channel, found, raw_msg = rest.partition(" :")
                if not found:
                    continue
                cleaned = clean_message(raw_msg)
                if not cleaned:
                    continue
                self.messages.append(cleaned)
                print(f"[{self.streamer}] {cleaned}")
            except Exception as e:
                # Best effort: one malformed line must not stop collection.
                print(f"Error parsing message: {e}")

    # Authenticates with the chat server and joins the streamer's channel
    def on_open(self, ws):
        # SECURITY: prefer TWITCH_OAUTH_TOKEN from the environment. The
        # literal is kept only as a fallback so existing runs keep working;
        # rotate it and remove the fallback before sharing this file.
        token = os.environ.get("TWITCH_OAUTH_TOKEN", "tospw7iiv95sk1rqg5092rndixx70n")
        ws.send(f"PASS oauth:{token}")
        ws.send("NICK data_science_project")
        ws.send(f"JOIN #{self.streamer}")

    # Reports an error raised while collecting this streamer's chat
    def on_error(self, ws, error):
        print(f"[{self.streamer}] WebSocket error: {error}")

    # Reports that the chat connection was closed
    def on_close(self, ws, code, msg):
        print(f"[{self.streamer}] Closed connection")

    # Starts receiving chat data on a background thread
    def start(self):
        self.ws = websocket.WebSocketApp(
            "wss://irc-ws.chat.twitch.tv:443",
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )
        # Daemon thread: a socket that fails to close must not keep the
        # interpreter alive after the main program has finished.
        self.thread = threading.Thread(target=self.ws.run_forever, daemon=True)
        self.thread.start()

    # Stops receiving chat data; does nothing if start() was never called
    def stop(self):
        if self.ws:
            self.ws.close()
        if self.thread is not None:
            self.thread.join()

    # Saves the collected messages to "<streamer>_chat_<timestamp>.json" inside folder_path
    def save_messages(self, folder_path="twitch_chat_logs"):
        os.makedirs(folder_path, exist_ok=True)

        filename = f"{self.streamer}_chat_{datetime.now().strftime('%Y%m%d%H%M%S')}.json"
        filepath = os.path.join(folder_path, filename)

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.messages, f, indent=2)

        print(f"Saved {len(self.messages)} messages for {self.streamer} to {filepath}")

## Collection of Chat Data

Main function that prints the chat data as it is received from the Twitch API and, after collecting for about 60 seconds, reports the total number of messages saved for each streamer.

In [None]:
def main(limit=20, duration=60, folder_path="twitch_chat_logs"):
    """Collect chat from the currently live streamers and save it to disk.

    Args:
        limit: Number of live streamers to collect from.
        duration: How long to collect, in seconds.
        folder_path: Folder that receives one JSON log per streamer.
    """
    streamers = get_live_streamers(limit=limit)
    collectors = [TwitchChatCollector(s) for s in streamers]

    print(f"Starting chat collection for: {streamers}")

    try:
        for collector in collectors:
            collector.start()

        # Let the background threads gather chat for the requested time.
        time.sleep(duration)
    finally:
        # Runs on interruption too, so sockets are closed and whatever was
        # gathered so far is still written out.
        for collector in collectors:
            collector.stop()
            collector.save_messages(folder_path=folder_path)

if __name__ == "__main__":
    main()

## Converting JSON to DataFrame

These are example user based input templates made for the model to determine what streamer would be a best fit for them.

In [None]:
# Example user preference prompts. generate_dataset() picks one of these at
# random as the "prompt" field of every training sample.
preference_templates = [
    "I'm looking for a calm and cozy community.",
    "I want an energetic and funny chat with lots of memes.",
    "I prefer strategic talk and respectful discussion.",
    "I like hype moments and esports energy.",
    "I'm into chill vibes and friendly interactions.",
    "I want chaotic and spammy but hilarious chat.",
    "Looking for streamer with a welcoming and kind chat.",
    "I want a toxic but entertaining and argumentative chat."
]

Creates training samples from the .json files of chat data we created right before this. Therefore we can train the model on this created dataset.

In [None]:
def sample_messages(messages, num=20):
    """Join up to ``num`` randomly chosen messages, one per line.

    Sampling is without replacement. When fewer than ``num`` messages are
    available, all of them are used (in random order).
    """
    count = min(num, len(messages))
    picked = random.sample(messages, count)
    return "\n".join(picked)

def generate_dataset(chat_folder, samples_per_file=5):
    """Build training samples from every chat log in ``chat_folder``.

    Each ``.json`` log yields ``samples_per_file`` entries with three fields:
    ``prompt`` (a random preference template), ``chats`` (up to 20 randomly
    sampled messages, newline-joined) and ``response`` (the streamer name
    taken from the file name). Logs that contain no messages are skipped
    because they would only produce samples with empty chat text.

    Args:
        chat_folder: Folder containing the saved chat logs.
        samples_per_file: Number of samples to create per log file.

    Returns:
        A list of sample dictionaries.
    """
    dataset = []
    # Sorted so the sample order does not depend on the filesystem.
    for file in sorted(os.listdir(chat_folder)):
        if not file.endswith(".json"):
            continue
        with open(os.path.join(chat_folder, file), 'r', encoding='utf-8') as f:
            messages = json.load(f)
        if not messages:
            continue
        # Logs are named "<streamer>_chat_<timestamp>.json". Splitting from
        # the right keeps streamer names that themselves contain "_chat_".
        streamer = file[:-len(".json")].rsplit("_chat_", 1)[0]
        for _ in range(samples_per_file):
            dataset.append({
                "prompt": random.choice(preference_templates),
                "chats": sample_messages(messages, num=20),
                "response": streamer
            })
    return dataset

# Build the training set and persist it as JSON Lines (one object per line).
dataset = generate_dataset("twitch_chat_logs")
with open("chat_recommendation_dataset.json", "w") as out:
    out.writelines(json.dumps(item) + "\n" for item in dataset)

print(f"Created {len(dataset)} training samples.")

Load the dataset that was just created and display part of it to verify that it was built correctly.

In [None]:
# Read the JSON Lines file back, one record per line.
with open("chat_recommendation_dataset.json", "r") as f:
    data = list(map(json.loads, f))

df = pd.DataFrame(data)

# Sanity checks: first rows, per-column summary, samples per streamer.
print(df.head())
print(df.describe(include='all'))
print(df['response'].value_counts())

## Model Creation and Training

Originally, we were using DistilGPT2 as our LLM model.

In [None]:
# Presumably switches off Weights & Biases reporting for the Trainer used
# below — TODO confirm against the installed transformers version.
os.environ["WANDB_DISABLED"] = "true"
# Note: this rebinds `dataset` from the Python list built earlier to a
# Hugging Face dataset loaded from the same JSON Lines file.
dataset = load_dataset('json', data_files='chat_recommendation_dataset.json', split='train')

model_name = "distilgpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# tokenize() below pads to a fixed length, so make sure a pad token exists;
# reuse the end-of-sequence token when the tokenizer defines none.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

def format_for_lm(example):
    """Attach the full training string to ``example`` under ``'text'``.

    The string consists of the user prompt, the sampled chat and the
    ``Recommend:`` marker followed by a space and the target streamer name.
    The mapping is modified in place and returned.
    """
    pieces = (
        f"User: {example['prompt']}",
        "Chat:",
        f"{example['chats']}",
        f"Recommend: {example['response']}",
    )
    example['text'] = "\n".join(pieces)
    return example

dataset = dataset.map(format_for_lm)

# Every sample ends with "Recommend: <streamer>", which is the part the model
# has to learn. Truncating on the right would cut exactly that target off any
# sample longer than max_length, so drop tokens from the start instead.
tokenizer.truncation_side = "left"


def tokenize(example):
    """Tokenize the ``text`` column into fixed-length (512-token) inputs."""
    return tokenizer(example["text"], truncation=True, padding="max_length", max_length=512)


tokenized_dataset = dataset.map(tokenize, batched=True)
# mlm=False selects causal (next-token) language modelling rather than
# masked language modelling.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

# Training configuration: three epochs with a per-device batch size of 2.
# Output goes to ./llm_chat_recommender (overwriting earlier runs), with
# save_steps=500 and at most two saved checkpoints kept; logs go to ./logs
# with logging_steps=10.
training_args = TrainingArguments(
    output_dir="./llm_chat_recommender",
    overwrite_output_dir=True,
    per_device_train_batch_size=2,
    num_train_epochs=3,
    save_steps=500,
    save_total_limit=2,
    logging_dir="./logs",
    logging_steps=10,
)

# Wire the model, tokenized samples and collator together.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator
)

# Fine-tune the model on the generated chat recommendation samples.
trainer.train()

Saves the trained model and tokenizer, then reloads them, so that we do not have to retrain the model every time.

In [None]:
# Persist the fine-tuned weights and the tokenizer side by side so both can
# be restored from a single folder.
trainer.save_model("./llm_chat_recommender_final")
tokenizer.save_pretrained("./llm_chat_recommender_final")

# Repeated import so this cell can be run on its own.
from transformers import AutoTokenizer, AutoModelForCausalLM

# Reload from disk; `model` and `tokenizer` now refer to the saved copies.
model = AutoModelForCausalLM.from_pretrained("./llm_chat_recommender_final")
tokenizer = AutoTokenizer.from_pretrained("./llm_chat_recommender_final")

In [None]:
def load_chat_log(file_path, max_lines=20):
    """Return the last ``max_lines`` chat messages of a log, one per line.

    The logs written by ``TwitchChatCollector.save_messages`` are JSON arrays
    of strings, so the file is parsed as JSON and the messages themselves are
    returned. Reading such a file as raw text would leak quotes, commas and
    brackets into the model prompt. Files that do not contain a JSON array
    are treated as plain text with one message per line.

    Args:
        file_path: Path of the log file to read.
        max_lines: Maximum number of trailing messages to return.

    Returns:
        The selected messages, stripped and joined with newlines.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    try:
        parsed = json.loads("".join(lines))
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, list):
        # Keep only real chat strings from the JSON array.
        lines = [entry for entry in parsed if isinstance(entry, str)]
    return "\n".join(line.strip() for line in lines[-max_lines:])

log_files = glob.glob("twitch_chat_logs/*.json")
if not log_files:
    # max() on an empty list fails with an opaque ValueError; say what is
    # actually missing instead.
    raise FileNotFoundError(
        "No chat logs found in twitch_chat_logs/; run the chat collection step first."
    )
# Use the log with the most recent ctime as the chat context for testing.
latest_log = max(log_files, key=os.path.getctime)
chat_text = load_chat_log(latest_log)

## Testing

Generating a recommendation based on the trained model.

In [None]:
def generate_recommendation(user_input, chat_logs, model, tokenizer, valid_streamers,
                            fuzzy_cutoff=0.2):
    """Ask the fine-tuned model for a streamer and map its answer to a valid name.

    The prompt mirrors the training format (``User:`` / ``Chat:`` /
    ``Recommend:``). The generated text is first searched for any valid
    streamer name; if none is contained in it, the closest name by fuzzy
    matching is used.

    Args:
        user_input: The user's preference sentence.
        chat_logs: Chat text placed in the ``Chat:`` section of the prompt.
        model: Causal language model used for generation.
        tokenizer: Tokenizer belonging to ``model``.
        valid_streamers: Streamer names that may be returned.
        fuzzy_cutoff: Minimum similarity (0-1) for the fuzzy fallback. The
            default is very permissive, so almost any output maps to a name.

    Returns:
        A name from ``valid_streamers``, or an ``"Unknown — model output: ..."``
        string when nothing matches.
    """
    prompt = f"User: {user_input}\nChat:\n{chat_logs}\nRecommend:"

    # Tokenize the prompt and move it to the model's device
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Sample a short continuation; only a streamer name is expected
    output = model.generate(
        **inputs,
        max_new_tokens=10,
        do_sample=True,
        top_k=50,
        top_p=0.95,
        temperature=0.6,
        pad_token_id=tokenizer.eos_token_id
    )

    decoded = tokenizer.decode(output[0], skip_special_tokens=True).strip()

    # The decoded text repeats the prompt; keep only what follows the last
    # "Recommend:" marker.
    predicted = decoded.split("Recommend:")[-1].strip() if "Recommend:" in decoded else decoded

    # Debug output (the raw output was previously printed twice)
    print("RAW MODEL OUTPUT:", decoded)
    print("PREDICTED PART ONLY:", predicted)

    predicted_lower = predicted.lower()

    # First, try direct containment to match a streamer name
    for streamer in valid_streamers:
        if streamer.lower() in predicted_lower:
            return streamer

    # Otherwise fall back to fuzzy matching. Compare lowercase against
    # lowercase, then map the hit back to the caller's original spelling.
    by_lower = {streamer.lower(): streamer for streamer in valid_streamers}
    matches = get_close_matches(predicted_lower, list(by_lower), n=1, cutoff=fuzzy_cutoff)
    if matches:
        return by_lower[matches[0]]

    # If no match is found, return the "Unknown" message with the model's output
    return f"Unknown — model output: {predicted}"

In [None]:
# Fetch a few live streamers to serve as the set of valid answers; fall back
# to an empty list when the API call fails.
try:
    valid_streamers = get_live_streamers(limit=5)
except requests.exceptions.RequestException as e:
    print(f"Error fetching live streamers: {e}")
    valid_streamers = []
else:
    print("Valid streamers:", valid_streamers)

user_input = "I want an energetic and funny chat with lots of memes."
streamer_key = generate_recommendation(
    user_input,
    chat_text,
    model,
    tokenizer,
    valid_streamers,
)

print("Recommended Streamer:", streamer_key)

## Updated Model

After observing that our previous model did not always output a valid streamer, we realized that patching its free-form output with fuzzy matching was not a reliable way to produce recommendations from this data. We therefore switched to a different model, all-MiniLM-L6-v2, and now rank each streamer's chat log by its cosine similarity to the user prompt. This approach always selects one of the collected streamers and did a better job of picking a suitable streamer for a given prompt.

In [None]:
prompt = "I want an energetic and funny chat with lots of memes."
llm = SentenceTransformer("all-MiniLM-L6-v2")


def tidy(chat: str) -> str:
    """Trim, collapse whitespace runs to single spaces, and lowercase."""
    return re.sub(r"\s+", " ", chat.strip()).lower()


def streamer_from_log(filename: str) -> str:
    """Extract the streamer name from "<streamer>_chat_<timestamp>.json".

    Splits on the last "_chat_" so that streamer names containing
    underscores are returned whole.
    """
    stem = filename[:-len(".json")] if filename.endswith(".json") else filename
    return stem.rsplit("_chat_", 1)[0]


# body[0] is the user prompt; body[1:] holds one document per chat log, in
# the same order as `files`.
body = [prompt]
files = []

for path in glob.glob("twitch_chat_logs/*.json"):
    with open(path, encoding="utf-8") as f:
        doc = " ".join(tidy(s) for s in json.load(f) if isinstance(s, str))
    if not doc:
        # An empty log has no chat to compare against the prompt.
        continue
    body.append(doc)
    files.append(os.path.basename(path))

if not files:
    raise FileNotFoundError(
        "No non-empty chat logs found in twitch_chat_logs/; run the chat collection step first."
    )

# Embed the prompt and all logs together, then score every log against the
# prompt by cosine similarity.
chat_embed = llm.encode(body, convert_to_tensor=True)
scores = util.cos_sim(chat_embed[0], chat_embed[1:]).cpu().numpy().flatten()

rec = files[scores.argmax()]
print(f"Streamer Rec: {streamer_from_log(rec)}")

print("\nTop 10 Rec:")
for i in (-scores).argsort()[:10]:
    print(f"{files[i]}  {scores[i]}")

## Plots

This plot shows all the top similarity scores for streamers

In [None]:
top_idx = np.argsort(-scores)[:10]
# Log files are named "<streamer>_chat_<timestamp>.json"; split on the last
# "_chat_" so streamer names containing underscores are not cut short.
top_files = [files[i].rsplit('_chat_', 1)[0] for i in top_idx]
top_scores = scores[top_idx]

print("Top 10 Recommendations:")
for name, sc in zip(top_files, top_scores):
    print(f"{name}: {sc:.4f}")

# Bar chart of the ten best-matching logs
plt.figure()
plt.bar(top_files, top_scores)
plt.xlabel("Streamer")
plt.ylabel("Cosine Similarity Score")
plt.xticks(rotation=45, ha="right")
plt.title("Top 10 Streamer Recommendations")
plt.tight_layout()
plt.show()

This plot shows the spread of all the similarity scores

In [None]:
# Histogram of the similarity score of every chat log, in 20 bins.
plt.figure()
plt.hist(scores, bins=20)
plt.xlabel("Cosine Similarity")
plt.ylabel("Count of Logs")
plt.title("Histogram of All Similarity Scores")
plt.show()

This plot shows the falloff in similarity scores related to the prompt

In [None]:
# Scores from best to worst, plotted against their 1-based rank.
sorted_scores = np.sort(scores)[::-1]
ranks = np.arange(1, len(scores) + 1)

plt.figure()
plt.scatter(ranks, sorted_scores)
plt.title("Ranked Similarity Scores")
plt.xlabel("Rank")
plt.ylabel("Cosine Similarity")
plt.show()