<a href="https://colab.research.google.com/github/miaortizma/jap2srs/blob/main/V3_JAP_SONG2SRS_2024.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


Transcription approach:

- Use Demucs to split the vocal track from drums, bass, and other (yes, 'other' is a class of its own)


10-nov-2024:

Notes for improvement:

- integrate git

Add pitch accent to notes:
- Update many decks at the same time (scale up)
- If we have kana readings, use them to avoid mismatch between reading and OJAD
- Remove the reading from the raw material for rare readings (e.g., in hyouri ittai)

Lyrics syncing
- Only apply furigana to kanji! Not to hiragana or katakana, nor to kana endings (i.e., put furigana on the ki of kita but not on the ta)
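
A minimal sketch of the kanji-only furigana idea, assuming each token arrives as an (original, hiragana reading) pair like the ones pykakasi produces. The helper names `is_kana`, `to_hiragana`, and `furigana_for_token` are hypothetical, and only trailing kana are handled (a leading kana prefix such as the お in お茶 is left inside the brackets). The leading space follows the notebook's convention of putting whitespace before annotated kanji.

```python
def is_kana(ch: str) -> bool:
    """True for characters in the hiragana and katakana Unicode blocks."""
    return "\u3040" <= ch <= "\u30ff"


def to_hiragana(text: str) -> str:
    """Shift katakana code points down to hiragana so readings compare equal."""
    return "".join(
        chr(ord(ch) - 0x60) if "\u30a1" <= ch <= "\u30f6" else ch
        for ch in text
    )


def furigana_for_token(orig: str, reading: str) -> str:
    """Attach the reading to the kanji stem only, leaving kana endings bare."""
    if all(is_kana(ch) for ch in orig):
        return orig  # pure hiragana/katakana: nothing to annotate

    stem, stem_reading, tail = orig, reading, ""
    # Peel off trailing kana shared by the word and its reading (the okurigana).
    while (
        stem
        and stem_reading
        and is_kana(stem[-1])
        and to_hiragana(stem[-1]) == stem_reading[-1]
    ):
        tail = stem[-1] + tail
        stem, stem_reading = stem[:-1], stem_reading[:-1]

    return f" {stem}[{stem_reading}]{tail}"


print(furigana_for_token("来た", "きた"))
print(furigana_for_token("食べる", "たべる"))
```

For 来た the reading is attached to 来 only, and the た stays outside the brackets.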

**Lyrics Scraping Approach (31-Oct):**

**Quick Summary:**


1. **Initial Search:** Take the transcribed lyrics and combine the first few verses into a single string, `S`.
2. **Google Search:** Search for `S` on Google to locate the lyrics page.
3. **Extract Lyrics:** Use BeautifulSoup to fetch the entire lyrics text.
4. **Verse Splitting:** Split verses using `<br>` as the delimiter.
5. **Fuzzy Matching:** Apply fuzzy matching (e.g., Levenshtein distance) to align verses with Whisper segments.
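
The alignment in step 5 could be sketched roughly as follows. This is an illustration only: it uses `difflib.SequenceMatcher` from the standard library as a stand-in for a Levenshtein-based ratio, the function names `similarity` and `align_verses` are hypothetical, and the sample strings are made up rather than taken from a real song. It assumes `segments` is non-empty.

```python
from difflib import SequenceMatcher


def similarity(a: str, b: str) -> float:
    """Similarity in [0, 1]; a stand-in for a Levenshtein-based ratio."""
    return SequenceMatcher(None, a, b).ratio()


def align_verses(verses, segments):
    """For each scraped verse, return (verse, best segment index, score)."""
    aligned = []
    for verse in verses:
        scores = [similarity(verse, segment) for segment in segments]
        best_index = max(range(len(segments)), key=scores.__getitem__)
        aligned.append((verse, best_index, scores[best_index]))
    return aligned


# Made-up example: scraped verses vs. imperfect Whisper output.
verses = ["夏色の風が吹く", "君と歩いた道"]
segments = ["なつ色の風がふく", "きみと歩いた道"]

for verse, index, score in align_verses(verses, segments):
    print(f"{verse} -> segment {index} (score {score:.2f})")
```

Matching each verse against every segment is quadratic, which should be fine for a single song's worth of lines.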

**Details & Notes:**

- `get_text()` returns the text of every paragraph as one string, and since lyrics sites usually display all verses in a single block, the verses end up merged. By default `get_text()` drops `<br>` tags without inserting anything in their place, so replacing each `<br>` with a unique separator first keeps the verse boundaries recoverable.
- After identifying the best match, fuzzy search can be further used for word- or character-level matching within segments in the final implementation.
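
To illustrate the separator trick without any third-party dependency, here is a small sketch built on the standard library's `html.parser` instead of BeautifulSoup. The class name `LyricsTextExtractor`, the function `split_verses`, and the `###` separator are assumptions made for the example; the separator just has to be a string that never occurs in the lyrics.

```python
from html.parser import HTMLParser

SEPARATOR = "###"  # assumed never to appear inside real lyrics


class LyricsTextExtractor(HTMLParser):
    """Collects text nodes and emits SEPARATOR wherever a <br> appears."""

    def __init__(self):
        super().__init__()
        self.parts = []

    def handle_starttag(self, tag, attrs):
        # Also reached for self-closing <br/> via the default handle_startendtag.
        if tag == "br":
            self.parts.append(SEPARATOR)

    def handle_data(self, data):
        self.parts.append(data)


def split_verses(html: str) -> list[str]:
    """Return the non-empty lines of text that were separated by <br> tags."""
    parser = LyricsTextExtractor()
    parser.feed(html)
    parser.close()
    text = "".join(parser.parts)
    return [verse.strip() for verse in text.split(SEPARATOR) if verse.strip()]


print(split_verses("<p>一行目<br>二行目<br/>三行目</p>"))
```

With BeautifulSoup the same effect is obtained by replacing every `<br>` element with the separator string before calling `get_text()`.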


In [None]:
%%writefile requirements.txt
pykakasi
genanki
openai-whisper
whisper-timestamped
yt-dlp==2024.10.22
fuzzywuzzy
fuzzysearch
pydub
google-colab-selenium
selenium


In [None]:
%%capture
!apt-get install fonts-noto
!pip install uv
!uv pip install --system -r requirements.txt

In [None]:
#%%capture
#!pip install "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
#!pip install -U xformers --index-url https://download.pytorch.org/whl/cu121
#!pip install -U bitsandbytes triton trl peft

# 2024.4.9


In [None]:
import yt_dlp
import whisper
import pandas as pd
import matplotlib.pyplot as plt

import re


# Download a video from YouTube
def download_youtube_video(url):

    youtube_info = get_youtube_info(url)
    song_title = youtube_info["song_title"]

    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': f'{song_title}.%(ext)s',  # output file name template
    }

    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(url, download=True)
        output_filename = ydl.prepare_filename(info_dict)
    print(f"downloaded {output_filename}")

    return output_filename

def get_song_title(input_string):
    # Regex pattern: Japanese song titles are enclosed in 「」 brackets
    pattern = r'「(.*?)」'
    match = re.search(pattern, input_string)

    if match:
        title = match.group(1)
    else:
        title = input_string

    title = title.replace(' ', '_')

    return title

# Fetch the title and description from YouTube
def get_youtube_info(url):
    ydl_opts = {
        'quiet': True,
        'format': 'bestaudio/best',
        'extract_flat': True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        response = {
            'title': info.get('title', None),
            'description': info.get('description', None)

        }
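        # 'title' is always a key of `response`, so use its value instead;
        # 'untitled' is a placeholder fallback so callers can still build file names.
        response['song_title'] = get_song_title(response['title'] or 'untitled')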

        return response

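# Transcribe the audio with Whisper and return the timestamped lyric segments
def transcribe_audio(audio_path, initial_prompt=None):
    model = whisper.load_model("base")
    # Forward the optional prompt; it was previously accepted but ignored.
    result = model.transcribe(audio_path, initial_prompt=initial_prompt)
    return result["segments"]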

# Save the timestamped lyrics to a CSV file
def save_to_csv(segments, csv_path):
    data = []
    for segment in segments:
        start = segment["start"]
        end = segment["end"]
        text = segment["text"]
        data.append([start, end, text])

    df = pd.DataFrame(data, columns=["start", "end", "text"])
    df.to_csv(csv_path, index=False, encoding='utf-8')

## scraping.py
import requests
from bs4 import BeautifulSoup
from googlesearch import search
import random

# List of proxies (placeholder addresses shown as an example)
proxies = [
    'http://123.123.123.123:8080',
    'http://124.124.124.124:8080',
    # add more proxies here
]

# List of user agents
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:70.0) Gecko/20100101 Firefox/70.0',
    # add more user agents here
]

# Get candidate lyrics links via a Google search
def get_lyrics_links(query):
    query = f"{query} site:azlyrics.biz"
    results = search(query, stop=1)
    return list(results)

def get_soup(url):
    proxy = {'http': random.choice(proxies)}
    user_agent = {'User-Agent': random.choice(user_agents)}

    response = requests.get(url, headers=user_agent, proxies=proxy)
    soup = BeautifulSoup(response.text, 'html.parser')

    return soup

def parse_lyrics(line):
    line = line.replace("\u3000", " ")
    return line

# Scrape the lyrics from a URL
def scrape_lyrics(url):

    soup = get_soup(url)
    lyrics = []
    konten_divs = soup.find_all('div', id='konten')

    if len(konten_divs) > 1:
        next_element = konten_divs[0].find_next_sibling()
        while next_element and next_element != konten_divs[1]:
            if next_element.name == 'p':
                verses = next_element.get_text().split("\n")

                verses = [parse_lyrics(verse) for verse in verses]
                lyrics += verses
            next_element = next_element.find_next_sibling()
        return lyrics
    else:
        return None

from IPython.display import Video
import pykakasi
from moviepy.editor import VideoFileClip
import os


def kks_to_passport(text):
    kks = pykakasi.kakasi()
    kakasi_result = kks.convert(text)

    text = ""
    for item in kakasi_result:
        text += item["passport"]

    return text

def parse_pykakasi_result(kakasi_result):
    """
    Add whitespace before kanji so that furigana parses correctly on phones
    """
    furigana_text = ""
    for item in kakasi_result:
        if item["orig"] == item["hira"] or item["orig"] == item["kana"]:
            furigana_text += item["orig"]
        else:
            furigana = item["hira"]
            furigana_text +=  " " + item["orig"] + f"[{furigana}]"

    # str.strip() returns a new string, so the result has to be assigned back.
    furigana_text = furigana_text.strip()

    return furigana_text

def create_assets(df_transcription, youtube_url):
    """
    genanki (or Anki) does not handle media files whose paths are inside subfolders,
    so every asset has to be written to the current directory (./).
    """

    kks = pykakasi.kakasi()

    video_info = get_youtube_info(youtube_url)
    # Romanize the title once; it prefixes every asset file name.
    song_title = kks_to_passport(video_info["song_title"])

    assets_metadata = []

    # Open the source video once instead of re-opening it for every segment.
    video = VideoFileClip(RAW_FILE_NAME)

    for i, row in df_transcription.iterrows():
        start_time = row["start"]
        end_time = row["end"]
        text = row["text"]

        kakasi_result = kks.convert(text)

        furigana_text = parse_pykakasi_result(kakasi_result)

        #os.makedirs(song_title, exist_ok=True)

        segment_audio_path = f"{song_title}_segment_{i}_audio.mp3"
        segment_video_path = f"{song_title}_segment_{i}.mp4"

        clip = video.subclip(start_time, end_time)

        #if save_video:
        #    clip.write_videofile(segment_video_path, codec="libx264")
        # Skip segments whose audio was already exported in a previous run.
        if not os.path.exists(segment_audio_path):
            clip.audio.write_audiofile(segment_audio_path)

        pitch_graph_path = f"{song_title}_segment_{i}_pitch" # get_pitched_text adds .png

        pitch_graph_path = get_pitched_text(text, filename=pitch_graph_path)

        assets_metadata.append({
            "expression": text,
            "furigana": furigana_text,
            "audio": segment_audio_path,
            "pitch_graph": pitch_graph_path
        })

    # Release the file handle and ffmpeg reader held by moviepy.
    video.close()

    return assets_metadata


from time import sleep
import os
from bs4 import BeautifulSoup
import google_colab_selenium as gs
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
custom_options = Options()
# Add your custom options here
custom_options.add_argument('--lang=ja-JP')  # set the browser language to Japanese
custom_options.add_argument("--enable-javascript")


driver = gs.Chrome(options=custom_options)

# Downloads the pitch accent graph for `text` from OJAD and saves it as
# `{filename}.png` in the working directory (Anki media cannot live in subfolders).
def get_pitched_text(text,filename=None):
# https://github.com/Tarikhoza/anki_add_ojad_pitch_plugin

    #remove html tags and &nbsp;
    text = BeautifulSoup(f"<div>{text}</div>", "html.parser" ).get_text().replace("\xa0","")

    if filename is None:
        filename = text
    # Reuse a graph that was already downloaded in a previous run.
    if os.path.exists(f"{filename}.png"):
        return f"{filename}.png"
    try:
        print("Getting graph from OJAD", text)
        driver.get("https://www.gavo.t.u-tokyo.ac.jp/ojad/phrasing")

        #putting text into the input field
        input_element = driver.find_element(By.ID,"PhrasingText")
        input_element.send_keys(text)

        #pressing the submit button and waiting for 5 seconds
        submit_button = driver.find_element(By.ID, "phrasing_submit_wrapper").find_element(By.TAG_NAME,"input")
        submit_button.click()
        sleep(5)

        #change css to remove unnecessary elements from the page before taking the screenshot
        driver.execute_script("""
            var styleElement = document.createElement('style');
            styleElement.innerText = `input{display:none;} font{display:none} select{display:none} *{padding:0; margin:0;} #phrasing_main{width:fit-content} .ds_t{display:none}`
            document.body.appendChild(styleElement);
            const elements = document.querySelectorAll('*');
            elements.forEach(element => {
              element.style.cssText = 'background-color:white';
            });
        """)

        #making a screenshot of the generated pitch accent graph
        driver.save_screenshot("test.png")
        driver.find_element(By.ID,"phrasing_main").screenshot(f"{filename}.png")
        return f"{filename}.png"

    except Exception as e:
        print("Error: ", e)
        return None  # no graph available; callers must handle a missing image


import hashlib

def generate_deck_id(song_title):
  """Generates a deck ID based on a hash of the song title."""
  hash_object = hashlib.md5(song_title.encode())
  hex_dig = hash_object.hexdigest()
  deck_id = (1 << 30) + int(hex_dig, 16) % (1 << 30)  # Keep it in [1 << 30, 1 << 31), the range genanki suggests for IDs
  return deck_id

## anki_utils.py
import genanki
#import random
#print(random.randrange(1 << 30, 1 << 31))

ANKI_MODEL_NO = 2133834403

class Subs2SRSNote(genanki.Note):
  @property
  def guid(self):
    return genanki.guid_for(self.fields[0], self.fields[1])

SONG_MODEL = genanki.Model(
  ANKI_MODEL_NO,
  'Song2SRS',
  fields=[
    {'name': 'Expression'},
    {'name': 'Furigana'},
    {'name': 'Audio'},
    {'name': 'Pitch'}
  ],
  templates=[
    {
      'name': 'Card',
      'qfmt': '{{Expression}}<br>',
      'afmt': '{{FrontSide}}<hr id="answer">{{furigana:Furigana}}<br>{{Audio}}<br>{{Pitch}}',
    },
  ]
)

def create_anki_deck(song_title, assets):

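    # Use a per-song deck ID; reusing the model ID here would make every song
    # import into the same Anki deck.
    song_deck = genanki.Deck(
        generate_deck_id(song_title),
        song_title
    )

    media_files = []

    for i, asset in enumerate(assets):

        audio_path = asset["audio"]
        pitch_graph_path = asset["pitch_graph"]  # None when the OJAD lookup failed

        # Subs2SRSNote derives a stable GUID from the expression and furigana, so
        # re-importing a regenerated deck updates notes instead of duplicating them.
        song_note = Subs2SRSNote(
            model=SONG_MODEL,
            fields=[
                asset["expression"],
                asset["furigana"],
                f"[sound:{audio_path}]",
                f'<img src="{pitch_graph_path}">' if pitch_graph_path else ""
            ]
        )

        song_deck.add_note(song_note)

        media_files.append(audio_path)
        if pitch_graph_path:
            media_files.append(pitch_graph_path)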

    anki_package = genanki.Package(song_deck)
    anki_package.media_files = media_files

    return anki_package

def create_anki_deck_from_assets(assets, youtube_url):

    video_info = get_youtube_info(youtube_url)
    song_title = video_info["song_title"]

    song_deck_package = create_anki_deck(song_title, assets)


    package_path = f'{song_title}.apkg'
    song_deck_package.write_to_file(package_path)

    print(f"Deck saved to {package_path}")

    return package_path

# prompt: using torchaudio, convert a mp4 to a .wav, and load the .wav and embed it in the notebook to validate the conversion

import torchaudio
import os

def convert_mp4_to_wav(mp4_path, wav_path):
  """Converts an MP4 file to WAV using torchaudio."""
  waveform, sample_rate = torchaudio.load(mp4_path)
  torchaudio.save(wav_path, waveform, sample_rate)


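def load_and_embed_wav(wav_path):
  """Loads a WAV file and returns an audio player to embed in the notebook for validation."""
  from IPython.display import Audio

  waveform, sample_rate = torchaudio.load(wav_path)
  print(f"Loaded WAV file: {wav_path}")
  print(f"Sample rate: {sample_rate}")
  # Returning the widget lets the notebook render it as the cell output.
  return Audio(waveform.numpy(), rate=sample_rate)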


In [None]:
# @title Download and transcribe video
youtube_url = "https://www.youtube.com/watch?v=eKoD2CRr_KA&ab_channel=yuzuofficial" # @param {type:"string"}

# Run section
video_path = 'video.mp4'
csv_path = 'lyrics.csv'

In [None]:
from IPython.display import Video

RAW_FILE_NAME = download_youtube_video(youtube_url)

video_info = get_youtube_info(youtube_url)
song_title = video_info["song_title"]

print(f"{song_title=} {RAW_FILE_NAME=}")

In [None]:
from IPython.display import Audio

from torchaudio.pipelines import HDEMUCS_HIGH_MUSDB_PLUS
from torchaudio.utils import download_asset
import torch
import pydub
import whisper
import whisper_timestamped
from torchaudio.transforms import Fade
import os
import logging
from tqdm import tqdm
import json
import pandas as pd

# splitext only strips the extension, so titles that contain dots stay intact.
CONVERTED_FILE_NAME = os.path.splitext(RAW_FILE_NAME)[0] + '.wav'
VOCALS_FILE_NAME = "vocals.wav"

SPLITS_DIR_NAME = "splits"
SPLITS_TIMESTAMPS_FILE_NAME = "timestamps.txt"
TRANSCRIPTION_FILE_NAME = "transcription.json"

SPLITS_PADDING = 2000  # ms


logger = logging.getLogger(__name__)


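def separate_sources(
    model,
    mix,
    segment=10.0,
    overlap=0.1,
    device=None,
    sample_rate=44100,
):
    """
    Apply the model to a given mixture chunk by chunk, fading each chunk in and out
    and summing the overlapping regions to rebuild the full-length sources.

    Args:
        segment (float): segment length in seconds
        overlap (float): fraction of a segment shared between consecutive chunks
        sample_rate (int): sample rate of `mix` in Hz. It was previously read from an
            undefined global; the default of 44100 matches the rate that
            `extract_voice` resamples to.
        device (torch.device, str, or None): if provided, device on which to
            execute the computation, otherwise `mix.device` is assumed.
            When `device` is different from `mix.device`, only local computations will
            be on `device`, while the entire tracks will be stored on `mix.device`.
    """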
    if device is None:
        device = mix.device
    else:
        device = torch.device(device)

    batch, channels, length = mix.shape

    chunk_len = int(sample_rate * segment * (1 + overlap))
    start = 0
    end = chunk_len
    overlap_frames = overlap * sample_rate
    fade = Fade(fade_in_len=0, fade_out_len=int(overlap_frames), fade_shape="linear")

    final = torch.zeros(batch, len(model.sources), channels, length, device=device)

    while start < length - overlap_frames:
        logger.debug(f"Demucs source separation: {start=}")
        chunk = mix[:, :, start:end]
        with torch.no_grad():
            out = model.forward(chunk)
        out = fade(out)
        final[:, :, :, start:end] += out
        if start == 0:
            fade.fade_in_len = int(overlap_frames)
            start += int(chunk_len - overlap_frames)
        else:
            start += chunk_len
        end += chunk_len
        if end >= length:
            fade.fade_out_len = 0
    return final

def extract_voice(target_location: str = './') -> str:
    """
    https://pytorch.org/audio/main/tutorials/hybrid_demucs_tutorial.html#spectrograms-and-audio
    """
    logger.info("Extracting vocals")
    bundle = HDEMUCS_HIGH_MUSDB_PLUS
    model = bundle.get_model()
    device = torch.device("cpu")
    model.to(device)

    # Load the WAV file produced by convert_mp4_to_wav.
    song_file = os.path.join(target_location, CONVERTED_FILE_NAME)
    waveform, sample_rate = torchaudio.load(song_file)
    waveform = waveform.to(device)

    if sample_rate != 44100:
        # The stdlib logger rejects arbitrary keyword arguments, so pass the value as a format argument.
        logger.warning("Resampling from %d Hz to 44100 Hz", sample_rate)
        waveform = torchaudio.functional.resample(waveform, sample_rate, 44100)
        sample_rate = 44100

    # parameters
    segment: int = 10
    overlap = 0.1

    ref = waveform.mean(0)
    waveform = (waveform - ref.mean()) / ref.std()  # normalization

    sources = separate_sources(
        model,
        waveform[None],
        device=device,
        segment=segment,
        overlap=overlap,
    )[0]
    sources = sources * ref.std() + ref.mean()

    sources_list = model.sources
    sources = list(sources)

    audios = dict(zip(sources_list, sources))

    output_file_name = os.path.join(target_location, VOCALS_FILE_NAME)
    torchaudio.save(output_file_name, audios["vocals"], sample_rate)

    return output_file_name

def split(target_location: str = './') -> str:
    logger.info("Splitting vocals")
    vocals_file = os.path.join(target_location, VOCALS_FILE_NAME)
    sound = pydub.AudioSegment.from_file(vocals_file, format="wav")
    chunk_timestamps = pydub.silence.detect_nonsilent(
        sound, min_silence_len=5000, silence_thresh=-32
    )

    chunk_timestamps = [
        (
            max(chunk_timestamps[i][0] - SPLITS_PADDING, 0),
            min(chunk_timestamps[i][1] + SPLITS_PADDING, len(sound)),
        )
        for i in range(len(chunk_timestamps))
    ]
    chunks = [
        sound[chunk_timestamps[i][0] : chunk_timestamps[i][1]]
        for i in range(len(chunk_timestamps))
    ]

    splits_dir = os.path.join(target_location, SPLITS_DIR_NAME)
    if not os.path.exists(splits_dir):
        os.makedirs(splits_dir)

    logger.info("Exporting %d vocal chunks", len(chunks))
    for i in range(len(chunks)):
        chunk = chunks[i]

        chunk.export(
            os.path.join(splits_dir, f"{i}.wav"),
            format="wav",
        )

    with open(os.path.join(splits_dir, SPLITS_TIMESTAMPS_FILE_NAME), "w") as f:
        for timestamp in chunk_timestamps:
            f.write(f"{timestamp[0]} {timestamp[1]}\n")

    return os.path.join(target_location, SPLITS_DIR_NAME)


def transcribe_segments(target_location: str = './', language: str = 'japanese'):
    logger.info("Transcribing audio")
    splits_dir = os.path.join(target_location, SPLITS_DIR_NAME)

    # read timestamp delays
    with open(os.path.join(splits_dir, SPLITS_TIMESTAMPS_FILE_NAME), "r") as f:
        chunk_timestamps = [
            [float(i) for i in line.split(" ")]
            for line in f.readlines()
            if line.strip()
        ]
    logger.debug("Timestamps loaded: %s", chunk_timestamps)

    whisper_model = whisper.load_model("small")

    transcription_total = []

    # list files in splits_dir
    for i in tqdm(range(len(chunk_timestamps)), desc="Transcribing"):
        logger.info(f"transcribing split {i}")
        file_name = f"{i}.wav"

        transcription = whisper_model.transcribe(
            audio=os.path.join(splits_dir, file_name),
            temperature=0,
            condition_on_previous_text=False,
            verbose=True,
            language=language
        )

        timestamp_adjustment = chunk_timestamps[i][0] / 1000

        data = []
        for segment in transcription["segments"]:
            start = segment["start"] + timestamp_adjustment
            end = segment["end"] + timestamp_adjustment
            text = segment["text"]
            data.append([start, end, text])

        transcription_total += data


    csv_path = 'transcription.csv'


    df = pd.DataFrame(transcription_total, columns=["start", "end", "text"])
    df.to_csv(csv_path, index=False, encoding='utf-8')



def transcribe(target_location: str = './', language: str = 'japanese'):
    logger.info("Transcribing audio")
    splits_dir = os.path.join(target_location, SPLITS_DIR_NAME)
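    # Fall back to the CPU when no GPU runtime is attached.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = whisper_timestamped.load_model("openai/whisper-small", device=device)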
    logger.debug("Model loaded")

    # read timestamp delays
    with open(os.path.join(splits_dir, SPLITS_TIMESTAMPS_FILE_NAME), "r") as f:
        chunk_timestamps = [
            [float(i) for i in line.split(" ")]
            for line in f.readlines()
            if line.strip()
        ]
    logger.debug("Timestamps loaded: %s", chunk_timestamps)

    full_transcription = {"segments": [], "text": ""}

    # list files in splits_dir
    for i in tqdm(range(len(chunk_timestamps)), desc="Transcribing"):
        file_name = f"{i}.wav"

        result = whisper_timestamped.transcribe(
            model,
            temperature=0,
            audio=os.path.join(splits_dir, file_name),
            task="transcribe",
            condition_on_previous_text=False,
            language=language,
        )
        timestamp_adjustment = chunk_timestamps[i][0] / 1000

        result_adjusted = {
            "text": result["text"],
            "segments": [
                {
                    **s,
                    "end": s["end"] + timestamp_adjustment,
                    "start": s["start"] + timestamp_adjustment,
                    "words": [
                        {
                            **w,
                            "end": w["end"] + timestamp_adjustment,
                            "start": w["start"] + timestamp_adjustment,
                        }
                        for w in s["words"]
                    ],
                }
                for s in result["segments"]
            ],
        }

        full_transcription = {
            "text": full_transcription["text"] + result_adjusted["text"],
            "segments": full_transcription["segments"] + result_adjusted["segments"],
        }

    with open(os.path.join(target_location, TRANSCRIPTION_FILE_NAME), "w") as f:
        json.dump(full_transcription, f)

"""
DONT DELETE
extract_voice()
transcribe_segments()
pd.read_csv("transcription.csv")
transcribe(language="japanese")
with open(
    os.path.join('./', TRANSCRIPTION_FILE_NAME), "r"
) as f:
    data = json.load(f)
segments = [d["text"] for d in data["segments"]]

segments"""

In [None]:
convert_mp4_to_wav(RAW_FILE_NAME, CONVERTED_FILE_NAME)

# Apply whisper

In [None]:
from moviepy.editor import AudioFileClip
from IPython.display import Audio
import torchaudio

def show_audio_segment(audio_file_path, start_time, end_time):
    """
    Loads an audio file, extracts a segment, and plays it.

    Args:
        audio_file_path: Path to the audio file.
        start_time: Start time of the segment in seconds.
        end_time: End time of the segment in seconds.
    """

    video = AudioFileClip(audio_file_path)
    clip = video.subclip(start_time, end_time)
    clip.write_audiofile("test.wav")
    waveform, sample_rate = torchaudio.load("test.wav")
    return Audio(waveform, rate=sample_rate)


show_audio_segment(CONVERTED_FILE_NAME, 5, 13)

In [None]:
"""import torch, gc

del whisper_model
gc.collect()
torch.cuda.empty_cache()"""

In [None]:

if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

In [None]:
model = whisper_timestamped.load_model("openai/whisper-medium", device=device)

transcription = whisper_timestamped.transcribe(
    model,
    temperature=0,
    audio=CONVERTED_FILE_NAME,
    task="transcribe",
    condition_on_previous_text=False,
    language="japanese",
    verbose=True,
)

In [None]:
"""import whisper

whisper_model = whisper.load_model("small", device="cuda")

transcription = whisper_model.transcribe(
    audio=CONVERTED_FILE_NAME,
    temperature=0,
    condition_on_previous_text=False,
    verbose=True,
    language="japanese"
)"""

In [None]:

from typing import List, Dict, Optional
from pydantic import BaseModel


class Word(BaseModel):
    text: str
    start: float
    end: float
    confidence: float


class TranscriptionSegment(BaseModel):
    id: int
    seek: int
    start: float
    end: float
    text: str
    tokens: List[int]
    temperature: float
    avg_logprob: float
    compression_ratio: float
    no_speech_prob: float
    confidence: float
    words: List[Word]

import unicodedata
import pykakasi
from fuzzywuzzy import fuzz


def fuzzy_match_text(all_text, text_to_match):
    best_match = None
    best_score = 0

    for line in all_text:
        line = line.strip()
        if line:
            score = fuzz.ratio(line, text_to_match)
            if score > best_score:
                best_score = score
                best_match = line

    return best_match, best_score

def translate_to_kana(text):
    kks = pykakasi.kakasi()
    kks_result = kks.convert(text)
    result = ""
    for item in kks_result:
        # Keep katakana tokens as written; convert everything else to hiragana.
        if item["kana"] == item["orig"]:
            result += item["orig"]
        else:
            result += item["hira"]
    return result


def translate_to_romaji(text):
    return ' '.join(translate_to_romaji_list(text))

def translate_to_romaji_list(text):
    kks = pykakasi.kakasi()
    result = kks.convert(text)
    romaji = []
    for _ in result:
        # Break english sentences into different words
        if _["hepburn"] == _["orig"]:
            romaji += _["orig"].split(" ")
        else:
            romaji += [_["hepburn"]]
    return romaji

def get_kakasi_results(text):
    kks = pykakasi.kakasi()
    result = kks.convert(text)
    romaji = []
    original = []
    for _ in result:
        # Break english sentences into different words
        if _["hepburn"] == _["orig"]:
            original += _["orig"].split(" ")
            romaji += _["orig"].split(" ")
        else:
            original += [_["orig"]]
            romaji += [_["hepburn"]]


    return list(zip(original, romaji))

# Iterative approach:
# - After finding the best-matching transcription segment for a verse,
# - also consider the previous and the next transcription segment, if they exist (handle edge cases).
# - Convert everything to romaji with pykakasi to ease the search, then try adding
#   0 to 10 tokens from the end of the previous segment and from the start of the next one.
# - This is combinatorial: with 0 to 10 tokens on either side there are up to
#   11 * 11 = 121 variations to pick the best fuzzy score from.

def beam_search(verse, transcription_texts, current_index):

    current_match = transcription_texts[current_index]
    best_match_romaji = translate_to_romaji(current_match)
    # Initialize tracking for the best combination

    verse_romaji = translate_to_romaji(verse)
    best_combination = best_match_romaji
    best_prev_tokens = 0
    best_next_tokens = 0
    best_score = fuzz.ratio(best_match_romaji, verse_romaji)

    combinations = [best_match_romaji]

    previous_segment = translate_to_romaji_list(transcription_texts[current_index - 1]) if current_index > 0 else []
    next_segment = translate_to_romaji_list(transcription_texts[current_index + 1]) if current_index < len(transcription_texts) - 1 else []

    max_prev_tokens = min(len(previous_segment), 10)
    max_next_tokens = min(len(next_segment), 10)

    for i in range(max_prev_tokens + 1):  # Up to max_prev_tokens from previous segment
        for j in range(max_next_tokens + 1):  # Up to max_next_tokens from next segment
            # Start with the original match
            combination = best_match_romaji

            # Add tokens from the previous segment if available
            if len(previous_segment) > 0 and i > 0:
                combination = ' '.join(previous_segment[-i:]) + ' ' + combination

            # Add tokens from the next segment if available
            if len(next_segment) > 0 and j > 0:
                combination = combination + ' ' + ' '.join(next_segment[:j])

            # Calculate the fuzzy score for this combination
            score = fuzz.ratio(combination, verse_romaji)

            # Update if this combination is the best so far
            if score > best_score:
                best_combination = combination
                best_score = score
                best_prev_tokens = i
                best_next_tokens = j

    return best_combination, current_index, best_prev_tokens, best_next_tokens, best_score


In [None]:
# todo: save as json
transcription_segments = [TranscriptionSegment(**segment) for segment in transcription["segments"]]

In [None]:
transcription_segments

In [None]:
csv_path = "test.csv"

In [None]:
data = []
for segment in transcription_segments:
    start = segment.start
    end = segment.end
    text = segment.text
    data.append([start, end, text])



df = pd.DataFrame(data, columns=["start", "end", "text"])
df.to_csv(csv_path, index=False, encoding='utf-8')

In [None]:
df_transcription = pd.read_csv(csv_path)

In [None]:
df_transcription

In [None]:
query = ' '.join(df_transcription.iloc[2:5]["text"].tolist())

result = list(search(f"{query} site:.jp", stop=3))
result

print(result)

proxy = {'http': random.choice(proxies)}
user_agent = {'User-Agent': random.choice(user_agents)}

url = result[0]


In [None]:

driver = gs.Chrome(options=custom_options)

driver.get(url)
html_content = driver.page_source


In [None]:
#response = requests.get(url, headers=user_agent, proxies=proxy)
soup = BeautifulSoup(html_content, 'html.parser')

# https://stackoverflow.com/questions/61421079/beautifulsoup-get-text-ignoring-line-breaks-br/61423104
delimiter = '###'                           # unambiguous string
for line_break in soup.find_all('br'):      # loop through line break tags
    line_break.replace_with(delimiter)

all_soup_text = soup.get_text(delimiter).splitlines()

In [None]:
all_soup_text

In [None]:
all_lyrics = ' '.join(df_transcription["text"].tolist())
best_match_whole_lyrics, best_score = fuzzy_match_text(all_soup_text, all_lyrics)

best_match_whole_lyrics = unicodedata.normalize('NFKC', best_match_whole_lyrics)
verses = best_match_whole_lyrics.split("###")

verses = [verse for verse in verses if len(verse) > 0]

seen = set()
verses = [verse for verse in verses if verse not in seen and not seen.add(verse)]


In [None]:
verses

In [None]:
transcription_texts = df_transcription["text"].tolist()

romaji_transcriptions = [translate_to_romaji(_) for _ in transcription_texts]

best_combinations = []

for i_verse, verse in enumerate(verses):

    verse_romaji = translate_to_romaji(verse)

    scores = [(i, fuzz.ratio(_, verse_romaji)) for i, _ in enumerate(romaji_transcriptions)]

    scores = sorted(scores, key=lambda x : x[1], reverse=True)

    # Track best match across all beam search results
    final_index = -1
    final_best_combination = ""
    final_prev_tokens = 0
    final_next_tokens = 0
    final_initial_score = 0
    final_best_score = -1

    # Apply beam search to top candidates
    for index, initial_score in scores[:5]:
        combination, _, prev_tokens, next_tokens, score = beam_search(verse, transcription_texts, index)

        # Strict comparison: on ties, keep the candidate with the higher initial score.
        if score > final_best_score:
            final_index = index
            final_best_combination = combination
            final_prev_tokens = prev_tokens
            final_next_tokens = next_tokens
            final_initial_score = initial_score
            final_best_score = score

    print(f"Verse: {verse} {i_verse}")
    print(f"Transcription Index: {final_index}")
    print(f"Verse romaji: {verse_romaji}")
    print(f"Best combination: '{final_best_combination}'")
    print(f"Initial score (segment alone): {final_initial_score}")
    print(f"Tokens from previous: {final_prev_tokens}, Tokens from next: {final_next_tokens}")
    print(f"Final Best Score: {final_best_score}\n")

    print("-" * 50)

    best_combinations.append((verse, final_index, final_prev_tokens, final_next_tokens))

    # Output the best combination with scores and token counts
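
The shortlist step in the loop above (score every segment, keep the top five, then refine) can be sketched in isolation. This is a minimal illustration that swaps in the standard library's `difflib.SequenceMatcher` for `fuzz.ratio`, so the scores will not be identical to fuzzywuzzy's; `similarity` and `shortlist` are hypothetical helper names.

```python
from difflib import SequenceMatcher


def similarity(a: str, b: str) -> int:
    """0-100 similarity score; a stand-in for fuzz.ratio, not identical to it."""
    return round(100 * SequenceMatcher(None, a, b).ratio())


def shortlist(target: str, candidates: list, k: int = 5) -> list:
    """Return (index, score) pairs for the k candidates most similar to target."""
    scored = [(i, similarity(text, target)) for i, text in enumerate(candidates)]
    # sorted() is stable, so equal scores keep their original segment order.
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]
```

Only the shortlisted indices would then be handed to the more expensive refinement step, which is what keeps the per-verse cost bounded.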






In [None]:
def compile_best_combination(
    transcription_segments: List[TranscriptionSegment],
    verse: str,
    index: int,
    prev_tokens: int,
    next_tokens: int
) -> tuple[str, str, float, float]:
    """
    Compiles the best transcription segment combination for a given verse by
    borrowing tokens from the adjacent transcription segments and adjusting
    the start and end times accordingly.

    Args:
        transcription_segments (List[TranscriptionSegment]): List of transcription segments.
        verse (str): The target verse to match.
        index (int): Current segment index.
        prev_tokens (int): Number of tokens to borrow from the end of the previous segment.
        next_tokens (int): Number of tokens to borrow from the start of the next segment.

    Returns:
        tuple: (verse, transcription_text, transcription_start, transcription_end).
        `transcription_segments` is not modified.
    """

    segment = transcription_segments[index]
    transcription_text = segment.text
    transcription_start = segment.start
    transcription_end = segment.end

    words_prepended = ""
    words_added = ""

    # Handling the previous segment case if prev_tokens is specified
    if prev_tokens and index > 0:
        bef_segment = transcription_segments[index - 1]

        kks_previous = get_kakasi_results(bef_segment.text)

        words_to_prepend = ''.join(word[0] for word in kks_previous[-prev_tokens:])
        print("Words to prepend from previous segment:", words_to_prepend)

        # Walk backwards over the timed words until the borrowed text is covered
        for word in reversed(bef_segment.words):
            words_prepended = word.text + words_prepended
            transcription_start = word.start
            if words_to_prepend in words_prepended:
                break

    # Handling the next segment case if next_tokens is specified
    if next_tokens and index < len(transcription_segments) - 1:
        next_segment = transcription_segments[index + 1]

        kks_transcription = get_kakasi_results(next_segment.text)
        originals = [word[0] for word in kks_transcription]

        words_to_add = ''.join(originals[:next_tokens])

        # Walk forwards over the timed words until the borrowed text is covered
        for word in next_segment.words:
            words_added += word.text
            transcription_end = word.end
            if words_to_add in words_added:
                break


    # Pad the clip by 0.25 s on each side, without going below zero
    transcription_start = max(0.0, transcription_start - 0.25)
    transcription_end += 0.25

    transcription_text = words_prepended + transcription_text + words_added

    print(f"{verse=}")

    print(f"{translate_to_romaji(verse)=}")

    print(f"{translate_to_romaji(transcription_text)=} {transcription_start=} {transcription_end=}")

    return (verse, transcription_text, transcription_start, transcription_end)


#verse, transcription_text, transcription_start, transcription_end = compile_best_combination(transcription_segments, *best_combinations[5])
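
The borrowing step for the next segment can be sketched on its own: consume timed words until the wanted text is covered, and take the end time of the last word consumed. This is a simplified illustration under assumed data shapes; `Word` and `borrow_from_next` are hypothetical names, not the notebook's classes.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Word:
    text: str
    start: float
    end: float


def borrow_from_next(words: list, wanted: str) -> Tuple[str, Optional[float]]:
    """Consume words from the start of the next segment until `wanted` is covered.

    Returns the borrowed text and the end time of the last borrowed word
    (None if nothing was borrowed).
    """
    borrowed = ""
    end = None
    if not wanted:
        return borrowed, end
    for word in words:
        borrowed += word.text
        end = word.end
        if wanted in borrowed:
            break
    return borrowed, end
```

Because timestamps exist only per word, the borrowed text can overshoot the wanted text by part of a word; with words `ひか`, `りが`, `ほら` and a wanted text of `ひかり`, the result is `ひかりが`.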

In [None]:

class Transcription(BaseModel):
    text: str
    start: float
    end: float

final_transcription = []
errors = []

for _ in best_combinations:
    verse, transcription_text, transcription_start, transcription_end = compile_best_combination(transcription_segments, *_)

    _data = {
        "text": verse,
        "whisper_text": transcription_text,
        "start": transcription_start,
        "end": transcription_end,
        "fuzz_ratio": fuzz.ratio(translate_to_romaji(verse), translate_to_romaji(transcription_text))
    }

    if _data["fuzz_ratio"] < 60:
        errors.append(_data)
    else:
        final_transcription.append(_data)


In [None]:

transcriptions_for_cards = pd.DataFrame(final_transcription).groupby("text").first().reset_index().sort_values(by="start").reset_index(drop=True)

transcriptions_for_cards

In [None]:
errors_df = pd.DataFrame(errors).groupby("text").first().reset_index().sort_values(by="start").reset_index(drop=True)
errors_df

In [None]:
for i, transcription in transcriptions_for_cards.iterrows():

    verse = transcription["text"]

    print(verse)
    print(translate_to_kana(verse))
    print(translate_to_romaji(verse))

    display(show_audio_segment(CONVERTED_FILE_NAME, transcription["start"], transcription["end"]))

    break

In [None]:
test = transcriptions_for_cards.iloc[0]
test

In [None]:
import cv2
import numpy as np
from moviepy.editor import VideoFileClip
from PIL import Image as PILImage  # `import PIL` alone does not load the Image submodule

def get_keyframe(video_path, start_time, end_time):
    video = VideoFileClip(video_path).subclip(start_time, end_time)
    sift = cv2.SIFT_create()
    keyframes = []

    for t in np.linspace(0, video.duration, num=10):  # Sample 10 frames across the segment
        frame = video.get_frame(t)  # RGB array
        gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
        kp = sift.detect(gray, None)
        keyframes.append((t, len(kp), frame))

    # Choose the frame with the most features (highest keypoint count)
    keyframe = max(keyframes, key=lambda x: x[1])[2]
    return keyframe

def extract_segment_thumbnails(df_transcription, youtube_url):

    video_info = get_youtube_info(youtube_url)

    song_title = video_info["song_title"]

    for i, row in df_transcription.iterrows():
        start_time = row["start"]
        end_time = row["end"]

        # extract an interesting thumbnail
        keyframe = get_keyframe(RAW_FILE_NAME, start_time, end_time)

        thumbnail_filename = f"{song_title}_segment_{i}_thumbnail.png"

        # moviepy frames are already RGB, so no channel swap is needed before saving
        thumbnail_image = PILImage.fromarray(keyframe)
        thumbnail_image.save(thumbnail_filename)

        # For now, only the first segment's thumbnail is generated and returned
        return thumbnail_filename

thumbnail = extract_segment_thumbnails(transcriptions_for_cards, youtube_url)

In [None]:
from IPython.display import display, Image

display(Image(filename=thumbnail))


In [None]:
assets = create_assets(transcriptions_for_cards, youtube_url)

In [None]:
assets

In [None]:
package_path = create_anki_deck_from_assets(assets, youtube_url)

In [None]:
from google.colab import files
files.download(package_path)

In [None]:
raise ValueError("stop here")

In [None]:
# Great Days_segment_33_audio.mp3

segment = 'Great Days_segment_33_audio.mp3'



# prompt: load video.wav into a spectrogram, identify the f_0 pitch accent of japanese audio

import os

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import torchaudio

# Load WAV and plot fundamental frequency on spectrogram
def plot_f0_on_spectrogram(wav_path):
    y, sr = librosa.load(wav_path)

    # Compute the spectrogram
    D = librosa.stft(y)
    S_db = librosa.amplitude_to_db(np.abs(D), ref=np.max)

    # Plot spectrogram (drawing is disabled for now; uncomment to show it under the f₀ curve)
    plt.figure(figsize=(14, 5))
    # librosa.display.specshow(S_db, sr=sr, x_axis='time', y_axis='log')
    # plt.colorbar(format='%+2.0f dB')
    # plt.title('Spectrogram with Fundamental Frequency (f₀)')

    # Compute f₀ using librosa's pyin
    f0, voiced_flag, voiced_probs = librosa.pyin(
        y, fmin=librosa.note_to_hz('C2'), fmax=librosa.note_to_hz('C7'), sr=sr
    )

    # Plot f₀ over time (pass sr so the time axis matches the loaded audio)
    times = librosa.times_like(f0, sr=sr)
    plt.plot(times, f0, color='black', label="Fundamental Frequency (f₀)")
    plt.legend(loc="upper right")
    plt.show()

# File paths
segment = 'Great Days_segment_33_audio.mp3'
wav_path = segment.replace('.mp3', '.wav')

# Convert and plot
convert_mp4_to_wav(segment, wav_path)
#plot_f0_on_spectrogram(wav_path)

# Whisper base model experimentation

- Temperature 0 behaved strangely: after 8 sentences, it repeated the same transcription 149 times
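
A simple way to flag this kind of looping output is to measure the longest run of identical consecutive segment texts. The sketch below is only an illustration; `longest_repeat` is a hypothetical helper, and what counts as "too many" repeats is left to the caller.

```python
from itertools import groupby


def longest_repeat(texts: list) -> tuple:
    """Return the text with the longest run of consecutive repeats and the run length."""
    best_text, best_len = "", 0
    for text, run in groupby(texts):
        length = sum(1 for _ in run)
        if length > best_len:
            best_text, best_len = text, length
    return best_text, best_len
```

Applied to `df_transcription["text"].tolist()`, a very long run would suggest that the decoder got stuck and the transcription should be retried with different settings.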

# Experiment with pykakasi

In [None]:
import pykakasi

text = "雲のひれ間に射す 光がほら降り注ぎ"


kks = pykakasi.kakasi()

kakasi_result = kks.convert(text)

furigana_text = ""
for item in kakasi_result:
    if item["orig"] == item["hira"] or item["orig"] == item["kana"]:
        furigana_text += item["orig"]
    else:
        furigana = item["hira"]
        furigana_text +=  " " + item["orig"] + f"[{furigana}]"

furigana_text.strip()
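
The cell above annotates a whole token, so trailing kana (okurigana) ends up inside the brackets too. A possible refinement, in line with the goal of putting furigana only on kanji, is to strip the kana suffix that the surface form and the reading share. This is a minimal sketch: `has_kanji` and `annotate` are hypothetical helpers, and kana in the middle of a token is not handled.

```python
def has_kanji(text: str) -> bool:
    """True if any character is in the main CJK ideograph block (or is 々)."""
    return any("\u4e00" <= ch <= "\u9fff" or ch == "々" for ch in text)


def annotate(orig: str, hira: str) -> str:
    """Anki-style furigana for one token, leaving trailing kana unannotated."""
    if not has_kanji(orig):
        return orig  # pure hiragana/katakana tokens need no reading
    # Count how many trailing characters the surface form and the reading share.
    shared = 0
    while (
        shared < len(orig)
        and shared < len(hira)
        and orig[-1 - shared] == hira[-1 - shared]
    ):
        shared += 1
    if shared == 0:
        return f" {orig}[{hira}]"
    stem, reading, tail = orig[:-shared], hira[:-shared], orig[-shared:]
    if not stem or not reading:
        return orig  # nothing sensible left to annotate
    return f" {stem}[{reading}]{tail}"
```

With this, `annotate("来た", "きた")` yields ` 来[き]た`, while a token such as `降り注ぎ` still gets one bracket over everything before its final kana because of the interior `り`.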

In [None]:
kakasi_result

# Demucs mix separation


https://pytorch.org/audio/main/tutorials/hybrid_demucs_tutorial.html#run-model

In [None]:
%pip install mir_eval

In [None]:
import torch
from torchaudio.pipelines import HDEMUCS_HIGH_MUSDB_PLUS

bundle = HDEMUCS_HIGH_MUSDB_PLUS

model = bundle.get_model()

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model.to(device)

sample_rate = bundle.sample_rate

print(f"Sample rate: {sample_rate}")

In [None]:




def plot_spectrogram(stft, title="Spectrogram"):
    magnitude = stft.abs()
    spectrogram = 20 * torch.log10(magnitude + 1e-8).numpy()
    _, axis = plt.subplots(1, 1)
    axis.imshow(spectrogram, cmap="viridis", vmin=-60, vmax=0, origin="lower", aspect="auto")
    axis.set_title(title)
    plt.tight_layout()

In [None]:
waveform, sample_rate = torchaudio.load(wav_path)  # change wav_path to separate a different song
waveform = waveform.to(device)
mixture = waveform

# parameters
segment: int = 10
overlap = 0.1

print("Separating track")

ref = waveform.mean(0)
waveform = (waveform - ref.mean()) / ref.std()  # normalization

sources = separate_sources(
    model,
    waveform[None],
    device=device,
    segment=segment,
    overlap=overlap,
)[0]
sources = sources * ref.std() + ref.mean()

sources_list = model.sources
sources = list(sources)

audios = dict(zip(sources_list, sources))

In [None]:
N_FFT = 4096
N_HOP = 4
stft = torchaudio.transforms.Spectrogram(
    n_fft=N_FFT,
    hop_length=N_HOP,
    power=None,
)

In [None]:

segment_start = 10
segment_end = 15

frame_start = segment_start * sample_rate
frame_end = segment_end * sample_rate

drums_spec = audios["drums"][:, frame_start:frame_end].cpu()

bass_spec = audios["bass"][:, frame_start:frame_end].cpu()

vocals_spec = audios["vocals"][:, frame_start:frame_end].cpu()

other_spec = audios["other"][:, frame_start:frame_end].cpu()

mix_spec = mixture[:, frame_start:frame_end].cpu()

In [None]:
# Mixture Clip
plot_spectrogram(stft(mix_spec)[0], "Spectrogram - Mixture")
Audio(mix_spec, rate=sample_rate)

In [None]:
def output_results(predicted_source: torch.Tensor, source: str):
    plot_spectrogram(stft(predicted_source)[0], f"Spectrogram - {source}")
    return Audio(predicted_source, rate=sample_rate)


output_results(vocals_spec, "vocals")

In [None]:
%pip install anki

In [None]:
#!rm -rf /content/japanese
!rm /content/japanese

In [None]:
import anki
from anki.collection import ImportAnkiPackageOptions, ImportAnkiPackageRequest
import os


col_name = "japanese"
if os.path.exists(col_name):
    os.remove(col_name)
    #os.remove(f"{col_name}-wal")

col = anki.collection.Collection(col_name)
os.remove(col_name)
os.remove(f"{col_name}-wal")


col.import_anki_package(
    ImportAnkiPackageRequest(
        package_path="./Japanese__1.Kanji Study Words.apkg",
        options=ImportAnkiPackageOptions(
            with_scheduling=True, with_deck_configs=True
        ),
    )
)

In [None]:
# TODO: deck id changes on loads
# Get deck_id
col.decks.all_names_and_ids()

In [None]:
deck_id = 1730494531756
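
Since the deck id changes between loads, looking it up by name may be more robust than hard-coding it. The sketch below assumes that the entries returned by `col.decks.all_names_and_ids()` expose `.name` and `.id` attributes; `find_deck_id` is a hypothetical helper and the deck name is a made-up placeholder.

```python
from types import SimpleNamespace  # only used to fake deck entries in this example


def find_deck_id(decks, name: str) -> int:
    """Return the id of the deck called `name`.

    `decks` is any iterable of objects with `.name` and `.id` attributes
    (assumed to match the entries of col.decks.all_names_and_ids()).
    """
    matches = [deck.id for deck in decks if deck.name == name]
    if len(matches) != 1:
        raise LookupError(f"expected exactly one deck named {name!r}, found {len(matches)}")
    return matches[0]


# Stand-in data; in the notebook this would be col.decks.all_names_and_ids().
fake_decks = [
    SimpleNamespace(id=1, name="Default"),
    SimpleNamespace(id=1730494531756, name="Example deck"),  # placeholder name
]
deck_id_by_name = find_deck_id(fake_decks, "Example deck")
```

Raising on zero or multiple matches keeps a typo in the deck name from silently selecting the wrong notes.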

In [None]:
# Get note type ids
card_ids = col.decks.cids(deck_id)
note_types = set()
for card_id in card_ids:
    note_type = col.get_card(card_id).note_type()
    note_types.add((note_type["id"], note_type["name"]))


list(note_types)

In [None]:
# Select a note type
note_type_id = 1713569808917

note_model = col.models.get(note_type_id)

pitch = col.models.new_field("Pitch")

pitch["ord"] = len(note_model['flds'])
col.models.add_field(note_model, pitch)

In [None]:
note_ids = []
deck_card_ids = col.decks.cids(deck_id)
for cid in deck_card_ids:
    c = col.get_card(cid)
    if c.note_type()['id'] == note_type_id and c.nid not in note_ids:
        note_ids.append(c.nid)


choices = [nt['name'] for nt in col.models.get(note_type_id)['flds']]

choices

In [None]:
col.models.update_dict(note_model)

In [None]:
expr_fld = "Sentence"
reading_fld = "SentenceFurigana"
output_fld = "Pitch"

In [None]:
import google_colab_selenium as gs

from selenium.webdriver.chrome.options import Options
custom_options = Options()
# Add your custom options here
custom_options.add_argument('--lang=ja-JP')  # set the browser language to Japanese
custom_options.add_argument("--enable-javascript")

driver = gs.Chrome(options=custom_options)

In [None]:
import re
import shutil

not_found_list = []
num_updated = 0
num_already_done = 0
num_svg_fail = 0

media_files = []

for nid in note_ids:
    # set up note access
    note = col.get_note(nid)

    filename = f"{nid}_pitch"

    if len(note.fields) == note._field_index(output_fld):
        note.fields.append('')


    # check for existing illustrations
    has_auto_accent = '<!-- accent_start -->' in note[output_fld]
    has_manual_accent = '<!-- user_accent_start -->' in note[output_fld]
    if has_auto_accent or has_manual_accent:
        # already has a pitch accent illustration
        num_already_done += 1
        media_files.append(f'{filename}.png')
        continue
    # determine accent pattern
    expr = note[expr_fld].strip()
    reading = note[reading_fld].strip()
    # drop the bracketed furigana annotations, keeping only the text outside the brackets
    reading = "".join(re.split(r"\[|\]", reading)[::2])

    # generate png on OJAD website

    img = get_pitched_text(reading, filename = filename)
    if not img:
        num_svg_fail += 1
        continue
    if len(note[output_fld]) > 0:
        separator = '<br><hr><br>'
    else:
        separator = ''

    media_files.append(f'{filename}.png')

    # extend and save note
    note[output_fld] = (
        '{}<!-- accent_start -->{}{}<!-- accent_end -->'
        ).format(note[output_fld], separator, f"<img src='{img}'/>")  # add img
    col.update_note(note)
    num_updated += 1


media_folder = col.media.dir()

print(media_folder)
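
The reading clean-up in the loop above keeps the base text and discards the bracketed readings. If the kana reading is wanted instead (for example, to send kana rather than kanji to OJAD), both views can be derived from the same annotated string. This is a sketch that assumes well-formed Anki furigana, where each annotated base is either at the start of the string or preceded by a space; `base_text` and `kana_reading` are hypothetical helpers.

```python
import re

# One Anki furigana unit: optional leading space, base text, then [reading].
FURIGANA = re.compile(r" ?([^ \[\]]+)\[([^\]]*)\]")


def base_text(annotated: str) -> str:
    """Drop the bracketed readings, keeping kanji and plain kana."""
    return FURIGANA.sub(r"\1", annotated).strip()


def kana_reading(annotated: str) -> str:
    """Replace each annotated base with its bracketed reading."""
    return FURIGANA.sub(r"\2", annotated).strip()
```

For the input `雲[くも]の 切[き]れ 間[ま]に`, `base_text` gives `雲の切れ間に` and `kana_reading` gives `くものきれまに`. Without the separating spaces the base would swallow the preceding kana, which is why the format assumption matters.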


In [None]:

media_files = []

for nid in note_ids:
    filename = f"{nid}_pitch"
    media_files.append(f'{filename}.png')

In [None]:
media_files

In [None]:
for file in media_files:
    shutil.copyfile(f"./{file}", f"{media_folder}/{file}")

In [None]:
col.decks.get(deck_id)["name"]

In [None]:
col.decks.rename(deck_id, 'Japanese::___Kanji Study Words')

In [None]:
print("hello")

In [None]:
from anki.collection import DeckIdLimit, ExportAnkiPackageOptions, NoteIdsLimit
import os

os.makedirs("test", exist_ok=True)  # make sure the output folder exists

export_options = ExportAnkiPackageOptions(
    with_scheduling=True, with_deck_configs=True, with_media=True
)

# DeckIdLimit only seems to work with a single deck,
# so you could put the decks that you want to export into a subdeck of a bigger one.
# Or concatenate the lists of note ids for the few decks that you want to update,
# to avoid deck layout changes.
# export_limit = DeckIdLimit(deck_id=deck_id)

export_limit = NoteIdsLimit(note_ids=note_ids)

col.export_anki_package(
    out_path="test/japanese_pitch4.apkg",
    options=export_options,
    limit=export_limit
)