<a href="https://colab.research.google.com/github/stcoats/VoD_toolkit/blob/main/VoD_toolkit_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

If using WhisperX, make sure the runtime type is set to GPU (in the "Runtime" dropdown menu).

1. Install the necessary packages. If you see the message "WARNING: The following packages were previously imported in this runtime:
  [pydevd_plugins]", do not restart the runtime; just click "Cancel".

In [None]:
# Install the packages this notebook relies on: yt-dlp (used below to fetch
# subtitles and live chat), webvtt-py pinned to version 0.4.6, and WhisperX
# installed directly from its GitHub repository.
!pip install yt-dlp
!pip install webvtt-py==0.4.6
!pip install git+https://github.com/m-bain/whisperx.git

In [None]:
import pandas as pd
from bs4 import BeautifulSoup
from IPython.display import HTML
import yt_dlp
import glob
import re
from datetime import datetime, timedelta
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker


2. Use the code below if you are retrieving VoD from YouTube. If you are retrieving VoD from Twitch, go to 3. below.

In [None]:
# Watch-page URLs of the YouTube videos whose subtitles and live chat are fetched.
URLS = ['https://www.youtube.com/watch?v=cUUuRK3Rm4k']

# Options handed to yt-dlp. Only subtitle-type files are written (the media
# itself is skipped): uploaded subtitles, the YT-generated Automatic Speech
# Recognition transcript (not manual transcripts/captions), and the live chat.
# The output template uses yt-dlp syntax to capture the channel name, upload
# date, video id, video title, and file extension.
ydl_opts = dict(
    writesubtitles=True,
    writeautomaticsub=True,
    subtitleslangs=['en', 'live_chat'],
    outtmpl='/content/%(uploader)s/%(upload_date)s--%(id)s--%(title)s.%(ext)s',
    skip_download=True,
)

# Optional settings that are left disabled here (add them to ydl_opts to use them):
#   'format': 'ba[ext=m4a]/ba[ext=mp4]'
#   'overwrites': True
#   'extract-audio': False
#   'audio-format': 'wav'
#   'ignoreerrors': True
#   'postprocessors': [{'key': 'FFmpegExtractAudio', 'preferredcodec': 'm4a'}]  # Extract audio using ffmpeg
# ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments

with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    # download() takes the whole URL list; its return value is kept for inspection.
    error_code = ydl.download(URLS)

3. For Twitch, a patched version of yt-dlp must be installed

In [None]:
# Replace the stock yt-dlp with the mpeter50 fork and switch the clone to its
# twitchvod-livechat branch.
# %cd changes the notebook's working directory to the clone, so relative paths
# used by later cells resolve inside yt-dlp/.
# NOTE(review): the branch is checked out only after requirements.txt and the
# editable install have been run against the clone's default branch — confirm
# that this order is intended.
!pip uninstall yt-dlp -y
!git clone https://github.com/mpeter50/yt-dlp
%cd yt-dlp
!pip install -r requirements.txt
!pip install -e .
!git checkout twitchvod-livechat

This retrieves the audio from a Twitch VoD.

In [None]:
# Download the audio of the Twitch VoD into /content/<uploader>/.
# The extractor arguments are quoted because they contain ';': left unquoted,
# the shell treats ';' as a command separator, so client_integrity would never
# be passed to yt-dlp.
# NOTE(review): "did" and "cit1" look like placeholders for a real device id and
# client-integrity token — confirm and substitute real values before running.
!yt-dlp -S "+size,+wa" --extract-audio 'https://www.twitch.tv/videos/2231052156' -o "/content/%(uploader)s/%(title)s.f%(format_id)s.%(ext)s" --extractor-args "Twitch:device_id=did;client_integrity=cit1"

To get the Twitch chat transcript, we will use TwitchDownloaderCLI.

In [None]:
# Fetch release 1.55.0 of TwitchDownloaderCLI (Linux x64 build), unpack it,
# mark the binary as executable, and save the chat of VoD 2231052156 to
# /content/out.html.
!wget https://github.com/lay295/TwitchDownloader/releases/download/1.55.0/TwitchDownloaderCLI-1.55.0-Linux-x64.zip
!unzip TwitchDownloaderCLI-1.55.0-Linux-x64.zip
!sudo chmod +x TwitchDownloaderCLI
!./TwitchDownloaderCLI chatdownload --id 2231052156 -o /content/out.html

In [None]:

def parse_twitch_comments(file_path):
    """
    Turn a Twitch chat log saved as HTML into a table of chat messages.

    Every <pre class="comment-root"> element becomes one row. The first badge
    image found in a comment (if any) is appended to the author name, and each
    emote image is appended to the end of the message; both are written as
    <img> tags with width 20.

    Parameters:
        file_path (str): The path to the input HTML file (read as UTF-8).

    Returns:
        pd.DataFrame: A DataFrame containing 'time', 'author', and 'message'.
    """
    with open(file_path, "r", encoding="utf-8") as handle:
        soup = BeautifulSoup(handle.read(), 'html.parser')

    rows = []
    for node in soup.find_all('pre', class_='comment-root'):
        # Timestamp: the text before the first ']', with surrounding '[' removed.
        stamp = node.text.split(']')[0].strip('[')

        # Badge image URL, when the comment carries a badge.
        badge_tag = node.find('img', class_='badge-image')
        badge_src = badge_tag['src'] if badge_tag else None

        # Author name, optionally followed by the badge image.
        author_tag = node.find('span', class_='comment-author')
        who = author_tag.text if author_tag else None
        if badge_src:
            who = f'{who} <img src="{badge_src}" width="20">'

        # Message text without the leading/trailing ':' and spaces.
        body_tag = node.find('span', class_='comment-message')
        text = body_tag.text.strip(': ') if body_tag else ""

        # Emote images are tacked on after the text, each preceded by a space.
        emote_tags = [
            f'<img src="{img["src"]}" width="20">'
            for img in node.find_all('img', class_='emote-image')
        ]
        text += "".join(f' {tag}' for tag in emote_tags)

        rows.append([stamp, who, text])

    # To preview the table with rendered images: HTML(df.to_html(escape=False))
    return pd.DataFrame(rows, columns=['time', 'author', 'message'])
# Example usage: parse the chat log saved at /content/out.html.
twitch_df = parse_twitch_comments("/content/out.html")
# display(twitch_df)

If ASR captions are not available, they can be created with WhisperX

In [None]:
import whisperx
import gc

device = "cuda"
batch_size = 16 # reduce if low on GPU mem
compute_type = "float16" # change to "int8" if low on GPU mem (may reduce accuracy)

# Look for the downloaded audio one folder below /content; the first match is
# the file that gets transcribed. Checking before the model is loaded gives an
# immediate, readable error instead of a bare IndexError later on.
audio_file = glob.glob("/content/*/*.m*")
if not audio_file:
    raise FileNotFoundError(
        "No audio file matching /content/*/*.m* was found; "
        "run the audio download cell before transcribing."
    )

# Transcribe with the "small" Whisper model.
model = whisperx.load_model("small", device, compute_type=compute_type)
audio = whisperx.load_audio(audio_file[0])
result = model.transcribe(audio, batch_size=batch_size)

# Align the segments using the alignment model for the detected language.
model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)

# Keep the segment start and text, with "start" renamed to "time" so the
# transcript can later be merged with the chat on a shared "time" column.
whisper_df = pd.DataFrame(result["segments"])
whisper_df1 = whisper_df[["start","text"]].rename(columns={"start": "time"})

In [None]:
# Display the WhisperX transcript (columns "time" and "text").
whisper_df1

In [None]:
#This cell is for YouTube VoD

# Collect the JSON and English .vtt files found one folder below /content.
# NOTE(review): the "P*" pattern only matches folders whose name starts with "P";
# the download cell names the folder after the uploader, so other channels
# presumably need a different pattern — confirm.
json_path = glob.glob("/content/P*/*.json")
vtt_path =  glob.glob("/content/P*/*.en.vtt")
# The first JSON match is read as line-delimited JSON (one record per line).
json_data = pd.read_json(json_path[0],
                    orient='records',
                    lines=True)

In [None]:
# Read the subtitle file. The context manager closes the handle, and the
# encoding is stated explicitly instead of being left to the platform default.
with open(vtt_path[0], "r", encoding="utf-8") as vtt_file:
    vtt_text = vtt_file.read()

# Each hit is a timing line (hh:mm:ss.mmm ...) plus the two lines after it.
# The look-behind skips timestamps that directly follow '<'.
hits = re.findall(r"(?<!<)\d\d\:\d\d\:\d\d\.\d\d\d.*\n.+?\n.+?\n", vtt_text, flags=re.M)

# Pair the timing line with the last captured line of the hit, after removing
# inline <hh:mm:ss.mmm> timestamps and <c>/</c> tags from it.
raw_text = [
    (x.split("\n")[0], re.sub(r"(?:<\s?\d\d\:\d\d\:\d\d\.\d\d\d>|</?c>)", "", x.split("\n")[-2]))
    for x in hits
]

# Drop hits whose text is a single space and strip the positioning suffix.
text1 = [re.sub(" align:start position:0%","",y[0]) for y in raw_text if not y[1] == " "]
text2 = [re.sub(" align:start position:0%","",y[1]) for y in raw_text if not y[1] == " "]
text3 = list(zip(text1, text2))

# Two unnamed columns: the timing string and the cleaned caption text.
transcript_df = pd.DataFrame(text3)

In [None]:
def _badge_url(badges):
    """Return the URL of a 32-pixel custom badge thumbnail, or '' if none exists.

    When several badges have a 32-pixel thumbnail, the last badge wins.
    """
    badge_url = ''
    for badge in badges:
        thumbnails = (
            badge.get('liveChatAuthorBadgeRenderer', {})
            .get('customThumbnail', {})
            .get('thumbnails', [])
        )
        for thumbnail in thumbnails:
            if thumbnail.get('width') == 32:  # Only the 32-pixel image is used
                # A thumbnail without a URL counts as "no badge".
                badge_url = thumbnail.get('url', '')
                break
    return badge_url


def _render_run(run):
    """Render one message run: plain text, a custom-emoji <img> tag, or an emoji label."""
    if "text" in run:
        return run["text"]
    if "emoji" not in run:
        return ""

    emoji_info = run["emoji"]
    image = emoji_info.get("image", {})
    emoji_label = image.get("accessibility", {}).get("accessibilityData", {}).get("label", "")

    if emoji_info.get("isCustomEmoji"):
        thumbnails = image.get("thumbnails", [])
        if thumbnails:
            # Use the second thumbnail when there is more than one, else the only one.
            chosen = thumbnails[0] if len(thumbnails) == 1 else thumbnails[1]
            return f'<img src="{chosen.get("url", "")}">'
    # Standard emoji (or a custom emoji without any thumbnail): use the label.
    return f"{emoji_label}"


def extract_content(input_df):
    """
    Flatten a YouTube live-chat replay into one record per chat action.

    Parameters:
        input_df: Object whose 'replayChatItemAction' column (or key) yields one
            dict per replay entry, each holding a list under "actions".
            Entries that are not dicts (for example NaN for a missing value)
            are skipped.

    Returns:
        list[dict]: One dict per action with the keys
            "timestamp_usec": the renderer's timestampText string (despite the
                key name, this is the displayed time text, not microseconds);
            "author": the author name, followed by an <img> tag when a
                32-pixel custom badge thumbnail exists;
            "message": the message text, with custom emoji rendered as <img>
                tags and other emoji replaced by their accessibility label.
        Actions without a liveChatTextMessageRenderer still produce a record,
        with all three values empty.
    """
    extracted_messages = []
    for entry in input_df['replayChatItemAction']:
        if not isinstance(entry, dict):
            continue
        for action in entry.get("actions", []):
            item = action.get("addChatItemAction", {}).get("item", {})
            renderer = item.get("liveChatTextMessageRenderer", {})
            author_name = renderer.get("authorName", {}).get("simpleText", "")
            timestamp_text = renderer.get("timestampText", {}).get("simpleText", "")

            badge_url = _badge_url(renderer.get("authorBadges", []))
            if badge_url != '':
                # Quoted attribute, matching the emoji <img> tags below.
                author_name = f'{author_name} <img src="{badge_url}">'

            # Join all parts of the message (text and emojis)
            message_runs = renderer.get("message", {}).get("runs", [])
            full_message = "".join(_render_run(run) for run in message_runs)

            extracted_messages.append({
                "timestamp_usec": timestamp_text,
                "author": author_name,
                "message": full_message
            })
    return extracted_messages


In [None]:
# Build the YouTube chat table from the records returned by extract_content
# and display it.
chat_df = pd.DataFrame(extract_content(json_data))
chat_df

In [None]:
# Display chat_df again (same frame as in the previous cell).
chat_df

In [None]:
# Give both frames a "time" column so they can later be merged on it.
chat_df.columns = ["time","author","message"]
transcript_df.columns = ["time","text"]
# Keep only the part of each timing string before " --> " (the start time).
transcript_df["time"] = [x.split(" --> ")[0] for x in transcript_df.time]

In [None]:
def string_to_timedelta(time_str):
    """
    Convert a clock-style time string to a number of seconds.

    Despite its name, the function returns a float (total seconds), not a
    ``timedelta`` object.

    Parameters:
        time_str: A string of the form "H:MM:SS(.fff)" or "M:SS(.fff)",
            optionally starting with "-" for a negative offset. Null values
            (None/NaN) and plain numbers are accepted as well.

    Returns:
        float | None: The seconds for a parsable string; the value itself (as
        a float) when it is already a number, so applying the conversion twice
        is harmless; None for null input or for a string that does not consist
        of two or three colon-separated parts.

    Raises:
        ValueError: If one of the colon-separated parts is not numeric.
    """
    if pd.isnull(time_str):
        return None
    # Already converted (e.g. the conversion cell was run a second time).
    if isinstance(time_str, (int, float)):
        return float(time_str)

    negative = time_str.startswith("-")
    if negative:
        time_str = time_str.replace("-", "")

    # Split the time string into hours, minutes, and seconds
    parts = time_str.split(':')
    if len(parts) == 3:
        hours = int(parts[0])
        minutes = int(parts[1])
        seconds = float(parts[2])
    elif len(parts) == 2:
        hours = 0
        minutes = int(parts[0])
        seconds = float(parts[1])
    else:
        return None

    total_seconds = timedelta(hours=hours, minutes=minutes, seconds=seconds).total_seconds()
    return -total_seconds if negative else total_seconds

Merge the chat and transcript data for YouTube VoDs

In [None]:
# Overwrite the "time" column of both frames with seconds (floats), then
# interleave transcript and chat rows in time order.
chat_df["time"] = chat_df["time"].apply(lambda x: string_to_timedelta(x))
transcript_df["time"] = transcript_df["time"].apply(lambda x: string_to_timedelta(x))
merged_df = pd.merge_ordered(transcript_df, chat_df, on='time')
#If you have the WhisperX ASR transcript, rather than the one provided by YouTube, comment out the line above and use the line below
#merged_df = pd.merge_ordered(whisper_df1, chat_df, on='time')


Merge the chat and transcript data for Twitch VoDs

In [None]:
# Overwrite the Twitch chat "time" column with seconds (floats), then
# interleave it with the WhisperX transcript in time order.
twitch_df["time"] = twitch_df["time"].apply(lambda x: string_to_timedelta(x))
merged_df = pd.merge_ordered(whisper_df1, twitch_df, on='time')
# Drop rows whose time is null and sort the remainder by time.
merged_df = merged_df[merged_df["time"].notnull()].sort_values(by="time")

In [None]:
# Export the merged table as a standalone HTML page.
merged_df1 = merged_df.fillna("")

# Render the table once, already carrying the Bootstrap classes and a
# left-aligned header row, then mark the header as dark. Rendering to a string
# avoids writing the file, reading it back and patching pandas' markup with a
# brittle multi-line string replace. escape=False keeps the <img> tags that
# the chat columns contain.
table_html = merged_df1.to_html(
    index=False,
    render_links=True,
    escape=False,
    classes="table table-striped",
    justify="left",
).replace("<thead>", '<thead class="thead-dark">', 1)

# Page header (Bootstrap + custom styles + the sticky-table-headers plugin).
# It stops right after opening the wrapper <div>; the table supplies its own
# <table>/<thead>/<tr> tags, so none are opened here.
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Document</title>
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.5.2/css/bootstrap.min.css">
<style>
  .table-wrapper {
    overflow: auto;
    margin: 0 40px;
  }
  table {
    width: 100% !important;
  }
</style>
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/sticky-table-headers/js/jquery.stickytableheaders.min.js"></script>
</head>
<body>
<div class="table-wrapper">
"""

# Page footer: closes the wrapper and keeps the table header visible while scrolling.
html_footer = """
</div>
<script>
  $(document).ready(function() {
    $("table").stickyTableHeaders();
  });
</script>
</body>
</html>
"""

updated_html = html_content + table_html + html_footer

# Write the page in the encoding declared by its <meta charset> tag.
with open("/content/output.html", "w", encoding="utf-8") as file:
    file.write(updated_html)

In [None]:
# For every distinct timestamp, count the non-missing chat messages and
# transcript segments.
mer2 = (
    merged_df
    .groupby(['time'])
    .agg(chat=('message', 'count'), speech=('text', 'count'))
    .reset_index()
)

# Convert seconds to whole minutes (the cast to int drops the fractional part).
mer2 = mer2.assign(time=(mer2['time'] / 60).astype(int))

# Total the counts per minute, then reshape to long form: one row per
# (minute, mode) pair, where mode is "chat" or "speech".
result_df = mer2.groupby('time').sum().reset_index()
result_df2 = result_df.melt(
    id_vars='time',
    value_vars=['chat', 'speech'],
    var_name="mode",
    value_name="value",
)

In [None]:
# Plot chat and speech counts per minute as two lines on one axis.
sns.set_theme(style="darkgrid")
sns.set_context("talk")

fig, ax = plt.subplots(figsize=(12, 6))
sns.lineplot(data=result_df2, x='time', y='value', hue='mode', ax=ax)

# One major tick every five minutes, labelled with plain numbers.
ax.xaxis.set_major_locator(ticker.MultipleLocator(5))
ax.xaxis.set_major_formatter(ticker.ScalarFormatter())

ax.set_xlabel('Time in minutes')
ax.set_ylabel('Message Density')
ax.set_title('Message Density Over Time')
plt.xticks(rotation=45)
fig.tight_layout()

# Save a copy of the figure (relative to the current working directory) and show it.
fig.savefig("message_density.png",dpi=600)
plt.show()