## VOICEVOX on Colab
利用前にVOICEVOXの利用規約をお読みください。  
https://zunko.jp/con_ongen_kiyaku.html

### Google Driveとの連携
Google Driveと連携することによって、VOICEVOXエンジンのダウンロードを省略することができ、起動時間を短縮できるようになります。   
voicevox_on_colabフォルダ内を使用します。その他のフォルダは読み書きしません。  
現時点では約1.1GBの容量を消費します。（将来的に増える可能性があります）     

#### 実行方法
↓の再生ボタンをクリックします。  
URLと入力欄が表示されるので、URLのリンク先に飛んで認証を済ませ、発行されたコードをコピーして入力欄に貼り付けてください。  
`Mounted at /content/drive`と表示されればOKです。

In [None]:
#@title
from google.colab import drive
import os
# Mount Google Drive unless the mount point directory is already present;
# in that case just echo the same confirmation message.
if not os.path.isdir("/content/drive"):
  drive.mount("/content/drive")
else:
  print("Mounted at /content/drive")

### メインプログラム
↓の再生ボタンをクリックすると実行できます。

In [None]:
#@title
import copy
import random
import re
import sys
import os
import psutil
import time
import multiprocessing
import subprocess

import requests
import json
from tempfile import TemporaryFile

from ipywidgets import HBox, VBox, Button, Text, HTML, Layout, FloatSlider, IntSlider, Tab, Box, Dropdown, IntProgress
from IPython.display import display, Audio, update_display, clear_output
from google.colab import files

clear_output()
# Start-up progress UI: a status line plus a three-step progress bar that the
# later set-up functions advance.
progbar_desc = HTML(value="必要モジュールをインストールしています...")
init_progressbar = IntProgress(value=0,min=0,max=3)
display(VBox([progbar_desc, init_progressbar]))
try:
  import py7zr
except ImportError:
  import importlib
  # Install through the interpreter that is running this cell so the package
  # lands in the environment we are about to import from.
  subprocess.run([sys.executable, "-m", "pip", "install", "py7zr==0.16.3"])
  # The import system caches directory listings; refresh them so the package
  # installed a moment ago can be found by the retry below.
  importlib.invalidate_caches()
  try:
    import py7zr
  except ImportError:
    print("プログラムのダウンロードに失敗しました。再度実行してください。")
    sys.exit(1)
init_progressbar.value = 1

class IntSliderWithId(IntSlider):
  """IntSlider that carries an application-defined ``id`` attribute.

  The ``id`` keyword argument is required (``KeyError`` if missing); all other
  keyword arguments are passed to ``IntSlider``.
  """
  def __init__(self, **kwargs):
    # ``id`` is this file's own bookkeeping key, so take it out of kwargs
    # instead of forwarding it to the widget constructor.
    self.id = kwargs.pop("id")
    super().__init__(**kwargs)

class FloatSliderWithId(FloatSlider):
  """FloatSlider that carries an application-defined ``id`` attribute.

  The ``id`` keyword argument is required (``KeyError`` if missing); all other
  keyword arguments are passed to ``FloatSlider``.
  """
  def __init__(self, **kwargs):
    # ``id`` is this file's own bookkeeping key, so take it out of kwargs
    # instead of forwarding it to the widget constructor.
    self.id = kwargs.pop("id")
    super().__init__(**kwargs)

class ButtonWithId(Button):
  """Button that carries an application-defined ``id`` attribute.

  The ``id`` keyword argument is required (``KeyError`` if missing); all other
  keyword arguments are passed to ``Button``.
  """
  def __init__(self, **kwargs):
    # ``id`` is this file's own bookkeeping key, so take it out of kwargs
    # instead of forwarding it to the widget constructor.
    self.id = kwargs.pop("id")
    super().__init__(**kwargs)

class DropdownWithId(Dropdown):
  """Dropdown that carries an application-defined ``id`` attribute.

  The ``id`` keyword argument is required (``KeyError`` if missing); all other
  keyword arguments are passed to ``Dropdown``.
  """
  def __init__(self, **kwargs):
    # ``id`` is this file's own bookkeeping key, so take it out of kwargs
    # instead of forwarding it to the widget constructor.
    self.id = kwargs.pop("id")
    super().__init__(**kwargs)

class TextWithId(Text):
  """Text input that carries an application-defined ``id`` attribute.

  The ``id`` keyword argument is required (``KeyError`` if missing); all other
  keyword arguments are passed to ``Text``.
  """
  def __init__(self, **kwargs):
    # ``id`` is this file's own bookkeeping key, so take it out of kwargs
    # instead of forwarding it to the widget constructor.
    self.id = kwargs.pop("id")
    super().__init__(**kwargs)

class EditingTexts:
  """Empty placeholder class; it defines no attributes or methods."""

class Defines:
  """Static lookup tables used by the GUI."""

  # Maps (speaker id, engine port) -> display label.
  # Insertion order matters: the GUI addresses speakers by their position in
  # this dict. Character 0 gets the even ids 0,2,4,6 and character 1 the odd
  # ids 1,3,5,7, each in the style order listed below.
  speakers = {
      (base + 2 * n, 50021): f"{chara}({style})"
      for base, chara in enumerate(("四国めたん", "ずんだもん"))
      for n, style in enumerate(("あまあま", "ノーマル", "セクシー", "ツンツン"))
  }


class MainGUI:
  def __init__(self):
    """Build the header bar, display the initial layout and show the sentence list."""
    # Header buttons.
    self.btn_texts = Button(description="文章一覧")
    self.btn_play = Button(description="再生")
    self.btn_write_out = Button(description="出力")
    self.btn_setting = Button(description="設定")
    for button, handler in (
      (self.btn_texts, self.synthesis_main),
      (self.btn_write_out, self.write_out),
      (self.btn_play, self.play),
      (self.btn_setting, self.setting),
    ):
      button.on_click(handler)
    # Style sheet appended to the header: hides <audio> elements.
    css = """
    <style>
    audio {
      display: none;
    }
    </style>
    """
    button_row = HBox(
      [self.btn_texts, self.btn_play, self.btn_write_out, self.btn_setting]
    )
    self.header = VBox([button_row, HTML(value="<hr>"+css)])
    clear_output()
    # Keep the display handle so update_layout() can swap the body in place.
    self.display_handle = display(VBox([self.header, VBox([])]), display_id=True)
    # Input row for adding a new sentence.
    self.input_new_text = Text(placeholder="音声合成したい文章を入力", layout=Layout(width="30em"))
    self.btn_confirm_new_text = Button(description="追加", layout=Layout(width="auto", border="solid 1px"))
    self.btn_confirm_new_text.on_click(self.add_text)
    # Per-sentence state: key -> (audio query, speaker, port, original text).
    self.synth_datas = {}
    self.last_datas_key = None
    self.default_speaker = 0
    self.output_count = 0
    # Render the (still empty) sentence list.
    self.synthesis_main(None)

  def lock_widget(s):
    def _lock_widget(func):
      def wrapper(*args, **kwargs):
        widgets_name = {
            "play": args[0].btn_play,
            "confirm_new_text": args[0].btn_confirm_new_text,
            "setting": args[0].btn_setting,
            "write_out": args[0].btn_write_out,
        }
        widgets_name[s].disabled = True
        func(*args, **kwargs)
        widgets_name[s].disabled = False
      return wrapper
    return _lock_widget

  def update_layout(self, body):
    """Clear the cell output and redraw the header followed by ``body``."""
    clear_output()
    page = VBox([self.header, body])
    self.display_handle.update(page)

  def update_default_speaker(self, change):
    self.default_speaker = change.new

  @lock_widget("write_out")
  def write_out(self, b):
    if self.last_datas_key in self.synth_datas:
      ret, wav = self.get_synth_result(
        self.synth_datas[self.last_datas_key][0],
        self.synth_datas[self.last_datas_key][1],
        self.synth_datas[self.last_datas_key][2]
      )
      if not ret:
        return
      with open("tmp_output.wav", "wb") as f:
        f.write(wav)
      self.output_count += 1
      fixed_filename = re.sub(r'[\\/:*?"<>|]+', '', self.synth_datas[self.last_datas_key][3])
      output_name = f"{str(self.output_count).zfill(3)}_{Defines.speakers[(self.synth_datas[self.last_datas_key][1], self.synth_datas[self.last_datas_key][2])]}_{fixed_filename}.wav"
      subprocess.run(["ffmpeg", "-y", "-i", "tmp_output.wav", "-ar", "44100", output_name])
      files.download(output_name)

  @lock_widget("play")
  def play(self, b):
    if self.last_datas_key in self.synth_datas:
      ret, wav = self.get_synth_result(
        self.synth_datas[self.last_datas_key][0],
        self.synth_datas[self.last_datas_key][1],
        self.synth_datas[self.last_datas_key][2]
      )
      if not ret:
        return
      with open("tmp.wav", "wb") as f:
        f.write(wav)
      subprocess.run(["ffmpeg", "-y", "-i", "tmp.wav", "-ar", "44100", "tmp2.wav"])
      display(Audio("tmp2.wav", autoplay=True))

  def synthesis_main(self, b):
    """Show the sentence list: one button per stored sentence plus the input row.

    Clicking a sentence button opens its editor via ``synthesis``.
    """
    rows = []
    for key, entry in self.synth_datas.items():
      speaker, port, text = entry[1], entry[2], entry[3]
      sentence_button = ButtonWithId(
        id=key,
        description=f'{Defines.speakers[(speaker,port)]} > {text}',
        layout=Layout(width="fit-content", margin="5px 0 5px 0")
      )
      sentence_button.on_click(self.synthesis)
      rows.append(sentence_button)
    rows.append(HBox([self.input_new_text, self.btn_confirm_new_text]))
    self.update_layout(VBox(rows))
  
  @lock_widget("setting")
  def setting(self, b):
    """Show the settings page with the default-speaker dropdown."""
    title = HTML(value="<h1>設定</h1>")
    # Dropdown options are (label, position in Defines.speakers).
    speaker_options = [
      (label, index) for index, label in enumerate(Defines.speakers.values())
    ]
    default_speaker_dropdown = Dropdown(
      options=speaker_options,
      value=self.default_speaker
    )
    default_speaker_dropdown.observe(self.update_default_speaker, names="value")
    description = HTML(value="文章追加時に選択されるキャラを設定します。")
    self.update_layout(VBox([title, description, default_speaker_dropdown]))

  def update_base_param(self, change):
    new_audio_query = copy.deepcopy(self.synth_datas[change.owner.id][0])
    if change.owner.description == "話速":
      new_audio_query["speedScale"] = change.new
    elif change.owner.description == "音高":
      new_audio_query["pitchScale"] = change.new
    elif change.owner.description == "抑揚":
      new_audio_query["intonationScale"] = change.new
    elif change.owner.description == "音量":
      new_audio_query["volumeScale"] = change.new
    elif change.owner.description == "開始無音":
      new_audio_query["prePhonemeLength"] = change.new
    elif change.owner.description == "終了無音":
      new_audio_query["postPhonemeLength"] = change.new
    self.synth_datas[change.owner.id] = (
      new_audio_query,
      self.synth_datas[change.owner.id][1],
      self.synth_datas[change.owner.id][2],
      self.synth_datas[change.owner.id][3]
    )

  def update_pitch(self, change):
    new_audio_query = copy.deepcopy(self.synth_datas[change.owner.id[0]][0])
    new_audio_query["accent_phrases"][change.owner.id[1]]["moras"][change.owner.id[2]]["pitch"] = change.new
    self.synth_datas[change.owner.id[0]] = (
      new_audio_query,
      self.synth_datas[change.owner.id[0]][1],
      self.synth_datas[change.owner.id[0]][2],
      self.synth_datas[change.owner.id[0]][3]
    )

  def update_acc(self, change):
    old_accs = copy.deepcopy(self.synth_datas[change.owner.id[0]][0]["accent_phrases"])
    old_accs[change.owner.id[1]]["accent"] = change.new
    ret, new_accs = self.get_mora_data(
      old_accs,
      self.synth_datas[change.owner.id[0]][1],
      self.synth_datas[change.owner.id[0]][2]
    )
    if not ret:
      return
    for vb in self.editors:
      try:
        if vb.children[0].children[0].id == change.owner.id:
          for p in vb.children[0].children[1].children[0].children:
            p.value = new_accs[p.id[1]]["moras"][p.id[2]]["pitch"]
      except AttributeError:
        pass
  
  def update_speaker(self, change):
    """Observer callback for the per-sentence speaker dropdown.

    ``change.new`` is a position in ``Defines.speakers``. A fresh audio query
    is requested for the stored text with the new speaker; on success the
    entry is replaced and the editor is rebuilt, on failure nothing changes.
    """
    key = change.owner.id
    text = self.synth_datas[key][3]
    new_speaker, new_port = list(Defines.speakers.keys())[change.new]
    ret, audio_query = self.get_audio_query(text, new_speaker, new_port)
    if not ret:
      return
    self.synth_datas[key] = (audio_query, new_speaker, new_port, text)
    # synthesis() only needs an object exposing ``id``.
    self.synthesis(ButtonWithId(id=key))

  def synthesis(self, b):
    """Show the two-tab editor for the sentence identified by ``b.id``.

    Tab 0 ("基本パラメータ") holds the text box, speaker dropdown and the six
    base-parameter sliders. Tab 1 ("アクセント・ピッチ") holds, per accent
    phrase, an accent slider above one vertical pitch slider per mora, plus a
    disabled placeholder column for each pause mora. The sentence also
    becomes the target of the play / export buttons.
    """
    key = b.id
    self.last_datas_key = key
    audio_query, speaker, port, text = self.synth_datas[key]

    # --- Tab 0: base parameters -------------------------------------------
    synthesis_text = TextWithId(id=key, value=text, continuous_update=False)
    synthesis_text.observe(self.renew_text, names="value")

    speaker_keys = list(Defines.speakers.keys())
    cdd = DropdownWithId(
      id=key,
      options=[(Defines.speakers[k], n) for n, k in enumerate(speaker_keys)],
      value=speaker_keys.index((speaker, port))
    )
    cdd.observe(self.update_speaker, names="value")

    # (audio-query field, slider min, slider max, label). The label is also
    # what update_base_param() uses to find the field to write back.
    base_specs = (
      ("speedScale", 0.5, 2.0, "話速"),
      ("pitchScale", -0.15, 0.15, "音高"),
      ("intonationScale", 0, 2, "抑揚"),
      ("volumeScale", 0, 2, "音量"),
      ("prePhonemeLength", 0, 1.5, "開始無音"),
      ("postPhonemeLength", 0, 1.5, "終了無音"),
    )
    base_sliders = []
    for field, low, high, label in base_specs:
      slider = FloatSliderWithId(
        id=key,
        value=audio_query[field],
        min=low,
        max=high,
        step=0.01,
        description=label
      )
      slider.observe(self.update_base_param, names="value")
      base_sliders.append(slider)
    p1 = VBox([synthesis_text, cdd, *base_sliders], layout=Layout(width="auto"))

    # --- Tab 1: accent and pitch ------------------------------------------
    self.editors = []
    editors_width = 0
    for i, phrase in enumerate(audio_query["accent_phrases"]):
      moras = phrase["moras"]
      pitch = []
      for j, mora in enumerate(moras):
        # A pitch of exactly 0 gets a disabled slider whose range starts at 0;
        # every other mora gets an editable slider starting at 3.
        zero_pitch = mora["pitch"] == 0
        p = FloatSliderWithId(
          id=(key, i, j),
          value=mora["pitch"],
          min=0 if zero_pitch else 3,
          max=6.5,
          step=0.01,
          description=mora["text"],
          orientation='vertical',
          disabled=zero_pitch
        )
        p.observe(self.update_pitch, names="value")
        pitch.append(p)

      slider_px = len(moras)*75-2
      acc = IntSliderWithId(
        id=(key, i),
        value=phrase["accent"],
        min=1,
        max=len(moras),
        step=1,
        orientation="horizontal",
        readout=False,
        continuous_update=False,
        layout=Layout(
          width=f'{slider_px}px',
          padding="0px 26px 0px 26px"
        )
      )
      acc.observe(self.update_acc, names="value")
      acc_width = slider_px+20
      editors_width += acc_width
      pitch_row = Box([HBox(pitch)], layout=Layout(width=f"{acc_width}px"))
      # NOTE(review): ``width`` is handed to Box directly rather than through
      # a Layout, exactly as before — confirm it has the intended effect.
      self.editors.append(
        Box(
          [VBox([acc, pitch_row])],
          width=f"{acc_width}px"
        )
      )

      pause_mora = phrase["pause_mora"]
      if pause_mora is not None:
        # Disabled placeholder column for the pause between phrases.
        spacer = FloatSlider(
          value=0,
          min=0,
          orientation="horizontal",
          disabled=True,
          readout=False,
          layout=Layout(
            width="0px",
            padding="0px 25px 0px 25px"
          )
        )
        pause_slider = FloatSlider(
          value=0,
          min=0,
          description=pause_mora["text"],
          orientation="vertical",
          disabled=True
        )
        self.editors.append(
          VBox([spacer, pause_slider], layout=Layout(overflow="visible"))
        )
        editors_width += 80

    p2 = Box(
      children=[HBox(self.editors)],
      layout=Layout(
        overflow="hidden",
        display="block",
        width=f"{editors_width}px",
      )
    )

    tab = Tab()
    tab.children = [p1,p2]
    tab.set_title(0, "基本パラメータ")
    tab.set_title(1, "アクセント・ピッチ")
    self.update_layout(tab)

  def renew_text(self, change):
    ret, audio_query = self.get_audio_query(
      change.new,
      self.synth_datas[change.owner.id][1],
      self.synth_datas[change.owner.id][2]
    )
    if not ret:
      return
    self.synth_datas[change.owner.id] = (
      audio_query,
      self.synth_datas[change.owner.id][1],
      self.synth_datas[change.owner.id][2],
      change.new
    )
    self.synthesis(ButtonWithId(id=change.owner.id))

  @lock_widget("confirm_new_text")
  def add_text(self, b):
    new_text = self.input_new_text.value
    if new_text == "":
      return
    self.input_new_text.value = ""
    while True:
      new_key = random.randrange(999999)
      if new_key not in self.synth_datas.keys():
        break
    speaker, port = list(Defines.speakers.keys())[self.default_speaker]
    ret, audio_query = self.get_audio_query(new_text, speaker, port)
    if not ret:
      return
    self.synth_datas[new_key] = (audio_query, speaker, port, new_text)
    self.synthesis_main(None)

  def get_audio_query(self, text, speaker, port):
    """POST to the local engine's ``/audio_query`` endpoint.

    Returns ``(True, audio_query)`` on success. On a request exception, a
    non-200 status or an empty ``accent_phrases`` list, an error message
    replaces the page body and ``(False, {})`` is returned.
    """
    failure = (False, {})
    try:
      response = requests.post(
        f"http://127.0.0.1:{port}/audio_query",
        params={"text":text, "speaker":speaker}
      )
    except Exception as err:
      self.update_layout(HTML(value=f"通信エラーが発生しました。<br>{err}"))
      return failure
    if response.status_code != 200:
      self.update_layout(HTML(value=f"通信エラーが発生しました。(status_code={response.status_code})"))
      return failure
    audio_query = response.json()
    phrase_count = len(audio_query["accent_phrases"])
    if phrase_count == 0:
      self.update_layout(HTML(value="読みが空です。再度入力しなおしてください。"))
      return failure
    return True, audio_query
  
  def get_mora_data(self, acc, speaker, port):
    """POST accent phrases to the local engine's ``/mora_data`` endpoint.

    ``acc`` is sent JSON-encoded as the request body. Returns
    ``(True, decoded JSON response)`` on success. On a request exception or
    a non-200 status, an error message replaces the page body and
    ``(False, [])`` is returned.
    """
    try:
      response = requests.post(
        f"http://127.0.0.1:{port}/mora_data",
        params={"speaker":speaker},
        data=json.dumps(acc)
      )
    except Exception as err:
      self.update_layout(HTML(value=f"通信エラーが発生しました。<br>{err}"))
      return False, []
    if response.status_code == 200:
      return True, response.json()
    self.update_layout(HTML(value=f"通信エラーが発生しました。(status_code={response.status_code})"))
    return False, []

  def get_synth_result(self, audio_query, speaker, port):
    """POST an audio query to the local engine's ``/synthesis`` endpoint.

    ``audio_query`` is sent JSON-encoded as the request body. Returns
    ``(True, response body bytes)`` on success. On a request exception or a
    non-200 status, an error message replaces the page body and
    ``(False, b"")`` is returned.
    """
    try:
      response = requests.post(
        f"http://127.0.0.1:{port}/synthesis",
        params={"speaker":speaker},
        data=json.dumps(audio_query)
      )
    except Exception as err:
      self.update_layout(HTML(value=f"通信エラーが発生しました。<br>{err}"))
      return False, b""
    if response.status_code == 200:
      return True, response.content
    self.update_layout(HTML(value=f"通信エラーが発生しました。(status_code={response.status_code})"))
    return False, b""

def get_voicevox_engine(save_path):
  """Download the VOICEVOX engine 0.8.0 (linux-cpu) and unpack it.

  Args:
    save_path: directory prefix, expected to end with "/". The archive is
      extracted there, ``linux-cpu`` is renamed to ``voicevox`` and
      ``voicevox/run`` is made executable.

  Raises:
    Exception: re-raised after the progress bar is marked as failed when the
      download fails (request error or non-200 status).
  """
  progbar_desc.value = "VOICEVOXエンジンをダウンロードしています..."
  url = "https://github.com/Hiroshiba/voicevox_engine/releases/download/0.8.0/linux-cpu.7z.001"
  with TemporaryFile() as archive:
    try:
      # Stream the archive straight into the temp file instead of holding
      # the whole download in memory first.
      with requests.get(url, stream=True) as r:
        if r.status_code != 200:
          raise Exception(f"エンジンのダウンロードに失敗しました({r.status_code})。")
        for chunk in r.iter_content(chunk_size=1024 * 1024):
          archive.write(chunk)
    except Exception:
      init_progressbar.bar_style = "danger"
      print("VOICEVOXエンジンのダウンロードに失敗しました。ランタイムを出荷時状態にリセットし、再度実行してください。")
      raise
    progbar_desc.value = "VOICEVOXエンジンを展開しています..."
    archive.seek(0)
    with py7zr.SevenZipFile(archive, "r") as extractor:
      extractor.extractall(path=save_path)
  os.rename(save_path+"linux-cpu", save_path+"voicevox")
  subprocess.run(["chmod", "755", save_path+"voicevox/run"])

def setup_engine():
  """Locate the engine, downloading it if needed, and return its run path.

  Order of preference: an engine already in Google Drive; a fresh download
  into Drive when ``drive/MyDrive`` exists and has more than 1.2 GiB free;
  an engine already in the local ``engines/`` directory; a fresh local
  download. Advances the start-up progress bar by one step.

  Returns:
    dict mapping "VOICEVOX" to the path of the engine's ``run`` file.
  """
  progbar_desc.value = "VOICEVOXエンジンをセットアップしています..."
  drive_root = "drive/MyDrive/voicevox_on_colab/engines/"
  local_root = "engines/"
  if os.path.isdir(drive_root + "voicevox"):
    engine_root = drive_root
  elif os.path.isdir("drive/MyDrive") and psutil.disk_usage('./drive/MyDrive/').free > 1.2 * 1024 ** 3:
    get_voicevox_engine(drive_root)
    engine_root = drive_root
  elif os.path.isdir(local_root + "voicevox"):
    engine_root = local_root
  else:
    get_voicevox_engine(local_root)
    engine_root = local_root
  ret_path = {"VOICEVOX": engine_root + "voicevox/run"}
  init_progressbar.value += 1
  return ret_path

def wait_engine():
  """Block until the engine on port 50021 answers ``GET /speakers``.

  Polls every 0.1 s with a 0.2 s request timeout and retries for as long as
  the request raises. Advances the start-up progress bar by one step.
  """
  for port in (50021,):
    progbar_desc.value = f"エンジンの起動を待っています({port})..."
    ready = False
    while not ready:
      time.sleep(0.1)
      try:
        requests.get(f"http://127.0.0.1:{port}/speakers", timeout=0.2)
      except Exception:
        continue
      ready = True
  init_progressbar.value += 1

# Start every configured engine in a daemon process, wait until it responds,
# then build the GUI.
subprocs = {}
engine_paths = setup_engine()
for k, run_path in engine_paths.items():
  _p = multiprocessing.Process(
    target=subprocess.run,
    args=([run_path],),
    daemon=True,
  )
  _p.start()
  subprocs[k] = _p
wait_engine()
_=MainGUI()