In [1]:
# Install the third-party packages that the import cell below relies on.
# The openai/whisper install is kept commented out; faster-whisper is installed instead.
# NOTE(review): none of these installs pins a version, so a fresh runtime may pull newer releases.
#!pip install git+https://github.com/openai/whisper.git
!pip install git+https://github.com/guillaumekln/faster-whisper.git
!pip install pytube
!pip install niconico.py

Collecting git+https://github.com/guillaumekln/faster-whisper.git
  Cloning https://github.com/guillaumekln/faster-whisper.git to /tmp/pip-req-build-x4c054vd
  Running command git clone --filter=blob:none --quiet https://github.com/guillaumekln/faster-whisper.git /tmp/pip-req-build-x4c054vd
  Resolved https://github.com/guillaumekln/faster-whisper.git to commit 5c17de17713f65929c7c33add3a9735ff75a945c
  Preparing metadata (setup.py) ... [?25l[?25hdone


In [2]:
#import whisper
from faster_whisper import WhisperModel
from pytube import YouTube, Channel
from niconico import NicoNico
import pandas as pd
from random import shuffle, randint
import concurrent.futures as cf
import os
from tqdm import tqdm
import gc
import logging
from joblib import Parallel, delayed

In [3]:
# Mount Google Drive at /content/drive so files stored there can be accessed.
from google.colab import drive
drive.mount("/content/drive")

# Base directory on the mounted Drive. Later cells read video_links.csv and
# nico_pass.txt from it and write the per-video transcription CSV files into it.
db_dir = "/content/drive/MyDrive/kato-db/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import requests
import re
from niconico import Cookies
from datetime import datetime, timedelta

def _make_cookie_tuple(name: str, cookie: str, domain = ".nicovideo.jp"):
    cookies = Cookies()
    cookies[name] = cookie
    for key, value in (
        ("domain", domain), ("path", "/"),
        ("expires", (datetime.now() + timedelta(days=365)).strftime("%a, %d-%b-%Y %X"))
    ):
        cookies[name][key] = value
    return cookies

def from_string(user_session: str):
    """Create the cookie object from the value of an authenticated session cookie.

    Parameters
    ----------
    user_session : str
        Value of the ``user_session`` cookie of a logged-in niconico session.

    Returns
    -------
    The object returned by ``_make_cookie_tuple`` for a cookie named
    ``user_session`` carrying the given value."""
    cookie_name = "user_session"
    session_cookies = _make_cookie_tuple(cookie_name, user_session)
    return session_cookies

def login(self, mail: str, password: str) -> NicoNico:
    """Log in to niconico with a mail address and password.

    Posts the credentials to the niconico login endpoint and, when the
    response sets a ``user_session`` cookie, stores it on ``self.cookies``
    via ``from_string``. Accounts with two-step verification cannot log in
    this way; set the cookie value directly for those instead.

    Parameters
    ----------
    self : NicoNico
        Client object whose ``cookies`` attribute receives the session cookie.
    mail : str
        Mail address or phone number used to log in.
    password : str
        Password used to log in.

    Returns
    -------
    NicoNico
        ``self``. When no ``user_session`` cookie was issued, a warning is
        logged and ``self.cookies`` is left untouched.
    """
    # The context manager closes the session's pooled connections; this function
    # runs once per niconico download, so an unclosed session would leak each time.
    with requests.Session() as session:
        # NOTE(review): ``params=`` places the credentials in the URL query string.
        # Sending them as form data (``data=``) would keep them out of URLs/logs,
        # but the request is left as-is until that is verified against the endpoint.
        session.post(
            "https://secure.nicovideo.jp/secure/login?site=niconico",
            params={
                "mail": mail,
                "password": password
            })
        user_session = session.cookies.get("user_session")

    # The former ``x-niconico-authflag`` header check is disabled, so a missing
    # cookie is the only failure signal. Without this guard the literal string
    # "None" would be stored as the session cookie.
    if user_session is None:
        logging.warning(
            "niconico login did not return a user_session cookie; "
            "continuing without authentication")
        return self

    self.cookies = from_string(user_session)
    return self

In [5]:
def time_print(text):
  """Print ``text`` prefixed with the current local time as ``[HH:MM:SS]: ``."""
  stamp = datetime.now().strftime('%H:%M:%S')
  print("[{}]: {}".format(stamp, text))

In [6]:
class Transcriber:
  """Transcribes the videos listed in ``video_links.csv``.

  The CSV is read from ``db_dir`` and must provide a ``link`` column (video
  URL) and a ``transcribed`` column (numeric model level already used for the
  video, ``-1`` when it has not been transcribed yet).
  """

  # Numeric model level -> Whisper model name. -1 ("none") marks a video that
  # has not been transcribed and is not a loadable model.
  model_num_to_name = {
      -1: "none",
      0: "tiny",
      1: "base",
      2: "small",
      3: "medium",
      4: "large-v2"
  }

  def __init__(self, model = 4):
    """Load the video list and the Whisper model for the given level.

    Parameters
    ----------
    model : int
        Key of ``model_num_to_name`` selecting the Whisper model (0-4).

    Raises
    ------
    ValueError
        If ``model`` is not a loadable level.
    """
    # Validate first so a bad level fails before any file or GPU work.
    model_name = Transcriber.model_num_to_name.get(model)
    if model_name is None or model_name == "none":
      raise ValueError(f"model must be one of 0-4, got {model!r}")

    self.df_videos = pd.read_csv(db_dir + "video_links.csv")
    self.model = model   # model level, also used in the output file names
    # Load the model that matches ``model`` so the level recorded in the output
    # file names always describes the model that produced the transcription.
    self.whisper_model = WhisperModel(model_name, device="cuda", compute_type="float16")

  def run(self, mode = 0):
    """Transcribe every targeted video using two worker threads.

    Each finished video gets its own transcription CSV, so completed files
    survive an interruption.
    NOTE(review): the ``transcribed`` column of ``video_links.csv`` is not
    updated here, so a later run selects the same videos again.

    Parameters
    ----------
    mode : int
        0 targets videos that are not transcribed yet; 1 targets videos whose
        recorded model level is lower than ``self.model``.

    Raises
    ------
    ValueError
        If ``mode`` is neither 0 nor 1.
    """
    if mode == 0:
      index_target = self.get_untranscribeds()
    elif mode == 1:
      index_target = self.get_improvables()
    else:
      raise ValueError(f"mode must be 0 or 1, got {mode!r}")

    with cf.ThreadPoolExecutor(max_workers=2) as executor:
      future_to_index = {}
      for index in index_target:
        # The targets are index labels, so look them up by label, not position.
        link = self.df_videos.at[index, "link"]
        future = executor.submit(self.transcribe, index, link, self.model)
        future_to_index[future] = index

      for future in cf.as_completed(future_to_index):
        try:
          # Retrieving the result surfaces exceptions raised in the worker.
          future.result()
        except Exception:
          # One failed video must not stop the remaining ones; report and go on.
          logging.exception("transcription failed for index %s", future_to_index[future])
        gc.collect()

  def get_untranscribeds(self):
    """Return the index labels of videos not transcribed yet, in shuffled order."""
    index_untranscribed = list(self.df_videos[self.df_videos["transcribed"] == -1].index)
    shuffle(index_untranscribed)
    return index_untranscribed

  def get_improvables(self):
    """Return, shuffled, the index labels of videos whose recorded model level is below ``self.model``."""
    index_improvable = list(self.df_videos[self.df_videos["transcribed"] < self.model].index)
    shuffle(index_improvable)
    return index_improvable

  def transcribe(self, index, link, model):
    """Download one video, transcribe it, and save the transcription CSV.

    The result is written to ``db_dir`` as ``<index>-<model>.csv`` with the
    columns ``start``, ``end`` and ``text``. ``video_links.csv`` is not
    modified by this method.

    Parameters
    ----------
    index
        Index label of the video; used for the temporary and output file names.
    link : str
        URL of the video to download.
    model
        Model level written into the output file name.

    Returns
    -------
    The ``index`` that was passed in.
    """
    time_print("start: " + str(index))

    audio_file_name = str(index) + ".mp4"
    try:
      # Download the media file.
      self.download_video(link, audio_file_name)

      # Transcribe. The segments are consumed inside the ``try`` because
      # faster-whisper documents them as a lazy generator, so the file must
      # still exist while they are iterated.
      segments, _ = self.whisper_model.transcribe(audio_file_name, language="ja")
      transcription = [[segment.start, segment.end, segment.text] for segment in segments]
    finally:
      # Always delete the download (disk space), including when a step failed.
      if os.path.exists(audio_file_name):
        os.remove(audio_file_name)

    # Save the transcription.
    transcription_file_name = db_dir + str(index) + "-" + str(model) + ".csv"
    df_transcription = pd.DataFrame(data=transcription, columns=['start', 'end', 'text'])
    df_transcription.to_csv(transcription_file_name, index = False)

    time_print("transcribed: " + str(index))

    return index

  def download_video(self, link, file_name):
    """Download the media behind ``link`` to ``file_name``.

    YouTube links are fetched as the first audio-only stream; niconico links
    are fetched after logging in with the credentials stored in
    ``nico_pass.txt`` under ``db_dir``.

    Raises
    ------
    ValueError
        If the link belongs to neither service, or a YouTube video offers no
        audio-only stream.
    """
    if "youtube" in link or "youtu.be" in link:
      stream = YouTube(link).streams.filter(only_audio=True).first()
      if stream is None:
        raise ValueError(f"no audio-only stream available for: {link}")
      stream.download(filename=file_name)
    elif "nicovideo" in link:
      client = NicoNico()
      # The file holds the mail address and the password separated by whitespace.
      with open(db_dir + "nico_pass.txt", "r") as f:
        email, password = f.read().split()
      login(client, email, password)
      with client.video.get_video(link) as video:
        video.download(file_name)
    else:
      # Fail here rather than later with a confusing missing-file error.
      raise ValueError(f"unsupported video link: {link}")



In [None]:
# Model level passed to Transcriber; 4 is the "large-v2" entry of Transcriber.model_num_to_name.
model = 4

# Build the transcriber and run it with the default mode (0), which targets
# the videos whose "transcribed" value is -1.
if __name__ == "__main__":
  transcriber =  Transcriber(model = model)
  transcriber.run()

[08:56:15]: start: 489
[08:56:15]: start: 746
