# Finetuning Magenta RT

<a href="https://colab.research.google.com/github/magenta/magenta-realtime/blob/main/notebooks/Magenta_RT_Finetune.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook shows how [Magenta RealTime](https://g.co/magenta/rt) can be customized through finetuning. You can either (1) follow the steps to train on your own audio dataset or (2) explore an existing finetuned model.

### 1. Finetuning Magenta RT on your own data
For finetuning, follow the notebook step by step. We recommend roughly 30 minutes or more of audio in a consistent style, but you're free to experiment with something different. Finetuning lets you build a custom model that sounds uniquely yours, so curating and experimenting with your own training data is an important part of the process.

### 2. Exploring finetuned models
#### ⭐️ Holly Herndon x Magenta RT ⭐️
We're thrilled to have worked with [Holly Herndon](https://herndondryhurst.studio/) to finetune a model on her vocal style to share with all of you. To play with this model, run [Step 1](#scrollTo=RP-rwG3Uzz_1) and [Step 4](#scrollTo=94S5nEXipBAY) of the notebook, selecting `holly_finetune` as the checkpoint to load.

Compared to the [base Magenta RT demo](https://colab.research.google.com/github/magenta/magenta-realtime/blob/main/notebooks/Magenta_RT_Demo.ipynb), this demo includes additional prompt controls that allow you to steer the model towards styles that are more in-distribution with respect to the finetuning dataset.

We particularly recommend experimenting with the in-distribution controls with this model and noticing how mixing in the mean and centroids affects the generation. You might hear vocalizations that contour many of the generated sounds, or, in some cases, "color" the instruments with a timbre that resembles Holly's voice (e.g. trumpet/violin oscillate between what you'd expect and her voice).

#### Loading your own finetuned model
If you have already finetuned your own model in a previous session and would like to load it in the demo, run [Step 1](#scrollTo=RP-rwG3Uzz_1) and follow the instructions in [Step 4](#scrollTo=94S5nEXipBAY).
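
As a rough sketch of what Step 4 looks for, the helper below checks an experiment folder for the pieces the notebook loads: `checkpoint_<step>` subfolders and the two `.npy` files with the style statistics. The function name `describe_experiment` is hypothetical and not part of `magenta_rt`; the file and folder names mirror those used in the cells of this notebook.

```python
import os


def describe_experiment(folder: str) -> dict:
  """Summarize a finetuning experiment folder (hypothetical helper)."""
  entries = os.listdir(folder)
  # Checkpoint folders are named `checkpoint_<step>`; sort them by step number.
  steps = sorted(
      int(name.split("_")[1])
      for name in entries
      if name.startswith("checkpoint_") and name.split("_")[1].isdigit()
  )
  # Style statistics used for in-distribution steering.
  expected = ["mean_style_embed.npy", "cluster_centroids.npy"]
  missing = [name for name in expected if name not in entries]
  return {"checkpoint_steps": steps, "missing_files": missing}
```

If `missing_files` is not empty, the checkpoint selection cell in Step 4 will likely fail when it tries to load the style embeddings from that folder.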

# Step 1: 😴 One-time setup

In [None]:
# @title **Run this cell** to install dependencies (~5 minutes)
# @markdown Make sure you are running on **`v5e-1 TPU` runtime** via `Runtime > Change Runtime Type`

# @markdown Colab may prompt you to restart session. **Wait until the cell finishes running to restart**!

# Clone library
!git clone https://github.com/magenta/magenta-realtime.git

# Install library and dependencies
# If running on TPU (recommended, runs on free tier Colab TPUs):
!pip install -e magenta-realtime/[tpu] && pip install tf2jax==0.3.8
# Uncomment if running on GPU (requires A100 via Colab Pro):
# !pip install -e magenta-realtime/[gpu] && pip install tf2jax==0.3.8

!sed -i '/import tensorflow_text as tf_text/d' /usr/local/lib/python3.12/dist-packages/seqio/vocabularies.py
!sed -i "s|device_kind == 'TPU v4 lite'|device_kind == 'TPU v4 lite' or device_kind == 'TPU v5 lite'|g" /usr/local/lib/python3.12/dist-packages/t5x/partitioning.py

# Step 2: 📁 Prepare the training data

In [None]:
# @title Run this cell to load and process the training data

# @markdown Upload your training data to a Google Drive folder or directly to Colab using the file browser on the left, and specify the name of the directory below in `AUDIO_FOLDER_NAME`.
# @markdown For example, if you put your audio in a folder called "Guitar" in the root directory on Google Drive, select `AUDIO_SOURCE: "drive"` and set `AUDIO_FOLDER_NAME: "Guitar"`. You can also reference subdirectories on Google Drive, e.g., `AUDIO_FOLDER_NAME: "MyAudio/Guitar"`.

AUDIO_SOURCE = "drive" # @param ["colab","drive"]
AUDIO_FOLDER_NAME = ""  #@param {type:"string", "placeholder": "Name of the top-level folder containing your audio data"}
AUDIO_EXTENSIONS = "wav,mp3,flac,ogg"  #@param {type:"string"}
FILTER_QUIET_AUDIO = False # @param {type:"boolean"}

import os
import seqio
import pathlib
import numpy as np
import t5x
import clu.data
import tensorflow as tf
import tensorflow.data as tf_data
import tensorflow.io as tf_io
from matplotlib import pyplot as plt
from google.colab import drive
from tqdm.notebook import tqdm
from sklearn.cluster import KMeans
from IPython.display import display, Audio

from magenta_rt.finetune import data
from magenta_rt.finetune import tasks
from magenta_rt import audio as audio_lib


if AUDIO_SOURCE == "drive":
  drive.mount('/content/drive', force_remount=True)
  audio_dir = f'/content/drive/MyDrive/{AUDIO_FOLDER_NAME}'
else:
  audio_dir = f'/content/{AUDIO_FOLDER_NAME}'

AUDIO_DIR = pathlib.Path(audio_dir)
if not AUDIO_DIR.is_dir():
  raise FileNotFoundError(f"Audio directory {audio_dir} does not exist")

# Find audio paths
print(f"Searching {AUDIO_DIR} for audio files...")
audio_extensions = [e.strip() for e in AUDIO_EXTENSIONS.split(',') if len(e.strip()) > 0]
if len(audio_extensions) == 0:
  raise ValueError("No audio extensions specified")
AUDIO_PATHS = []
for e in audio_extensions:
  AUDIO_PATHS.extend(list(AUDIO_DIR.glob(f'**/*.{e}')))
AUDIO_PATHS = sorted(AUDIO_PATHS)

# Help user
if len(AUDIO_PATHS) == 0:
  raise FileNotFoundError(f"No audio files found in {audio_dir} with extensions {audio_extensions}")
else:
  print(f"Found {len(AUDIO_PATHS)} audio files in {audio_dir}. A few examples:")
  for p in AUDIO_PATHS[:5]:
    print('-' * 80)
    print(p)
    display(Audio(p))
  print('...')


TASK_NAME = str(AUDIO_DIR.stem)
OUTPUT_DIR = str(pathlib.Path(pathlib.Path.cwd() / 'mrt_finetune'))
if not os.path.exists(OUTPUT_DIR):
  os.makedirs(OUTPUT_DIR)
OUTPUT_PATTERN = f'{OUTPUT_DIR}/{TASK_NAME}_examples.recordio'


print("Tokenizing the training data...")
featurizer = data.Featurizer(
    filter_quiet=FILTER_QUIET_AUDIO,
    min_clip_seconds=2,
    include_style_embeddings=True,
)

records_count = 0
with tf_io.TFRecordWriter(OUTPUT_PATTERN) as file_writer:
  for audio_path in tqdm(AUDIO_PATHS):
    audio_input = audio_lib.Waveform.from_file(audio_path)
    tokenized_iter = featurizer.process(audio_input)
    for tokenized_example in tokenized_iter:
      records_count += 1
      file_writer.write(tokenized_example.SerializeToString())

print(f'{records_count} records written')
# Each record is counted as 30 seconds of audio.
featurized_audio_length = records_count * 30
print(f'Total duration of featurized audio: {featurized_audio_length:.0f} seconds ({(featurized_audio_length/60):.1f} minutes)')


print("Registering new SeqIO task...")
if TASK_NAME in seqio.TaskRegistry.names():
  seqio.TaskRegistry.remove(TASK_NAME)
  seqio.TaskRegistry.remove(TASK_NAME+"_eval")

tasks.register_task(
    name=TASK_NAME,
    split_to_filepattern={
        'train': OUTPUT_PATTERN,
        'validation': OUTPUT_PATTERN,
    },
    reader_cls=tf_data.TFRecordDataset,
    acoustic_key='acoustic_tokens',
    style_key='style_tokens',
    encoder_codec_rvq_depth=4,
    decoder_codec_rvq_depth=16,
    max_prompt_secs=10,
)

print(f"{TASK_NAME} SeqIO task registered")


print("Computing statistics on the finetuning style embeddings...")
def decode_fn(record_bytes):
  return tf.io.parse_single_example(
      record_bytes,
      {"style_embeddings": tf.io.FixedLenFeature([], dtype=tf.string)}
  )

audio_style_embeddings = []
for batch in tf.data.TFRecordDataset([OUTPUT_PATTERN]).map(decode_fn):
  style_embeds = tf.io.parse_tensor(batch['style_embeddings'], out_type=tf.float32).numpy()
  audio_style_embeddings.append(np.mean(style_embeds, axis=0))
audio_style_embeddings = np.array(audio_style_embeddings)
np.save(f'{OUTPUT_DIR}/{TASK_NAME}_style_embeddings.npy', audio_style_embeddings)
mean_style_embed = np.mean(audio_style_embeddings, axis=0)
kmeans = KMeans(n_clusters=5, random_state=0, n_init=10)
kmeans.fit(audio_style_embeddings)
cluster_centroids = kmeans.cluster_centers_

# Step 3: 🔧 Finetune

In [None]:
# @title Run this cell to start finetuning

# @markdown If you'd like to save the experiment outputs (checkpoints,
# @markdown preprocessed data, style embeddings for in-distribution steering) to Google Drive so you can load the
# @markdown finetuned model in a future session, tick the
# @markdown `COPY_EXPERIMENT_TO_DRIVE` box before running this cell.

from datetime import datetime

from magenta_rt.finetune import finetuner

EXPERIMENT_NAME = "" # @param {type: "string", "placeholder": "Name of your finetuning experiment (optional)"}
MODEL_SIZE = 'large' # @param ['base', 'large']
FINETUNING_STEPS = 6000 # @param
SAVE_CKP_PERIOD = 1000 # @param
COPY_EXPERIMENT_TO_DRIVE = False # @param {type:"boolean"}

if not EXPERIMENT_NAME:
  EXPERIMENT_NAME = datetime.now().strftime("%Y%m%d_%H%M")
MODEL_OUTPUT_DIR = f"{OUTPUT_DIR}/{EXPERIMENT_NAME}"

def get_ds_iterator(
    mixture_or_task_name: str,
    batch_size: int = 8,
    use_cached_tasks: bool = False,
    shuffle=True,
    ):
  train_dataset_cfg = t5x.utils.DatasetConfig(
      mixture_or_task_name = mixture_or_task_name,
      task_feature_lengths = {'inputs': 1006, 'targets': 800},
      split = 'train',
      batch_size = batch_size,
      shuffle = shuffle,
      use_cached = use_cached_tasks,
      pack = True,
      module = None,
      seed = 42,
  )

  train_ds = t5x.utils.get_dataset(
      cfg=train_dataset_cfg,
      shard_id=0,
      num_shards=1,
      feature_converter_cls=seqio.EncDecFeatureConverter,
  )
  train_iter = clu.data.dataset_iterator.TfDatasetIterator(train_ds, checkpoint=False)
  return train_iter

def plot_training_curves(training_summary, save=True):
  num_plots = len(training_summary.keys())
  fig, axs = plt.subplots(1, num_plots, figsize=(5*num_plots, 5))
  fig.suptitle(f"Training curves for {EXPERIMENT_NAME}")
  for i, (k, v) in enumerate(training_summary.items()):
    # Use a distinct loop name so the subplot index `i` is not shadowed.
    axs[i].plot([point.value for point in v])
    axs[i].set_xlabel('Step')
    axs[i].set_ylabel(k)
  if save:
    plt.savefig(f"{MODEL_OUTPUT_DIR}/training_curves.png")

print("Setting up the finetuner...")
MRTFinetuner = finetuner.MagentaRTFinetuner(
    tag=MODEL_SIZE,
    output_dir=MODEL_OUTPUT_DIR,
)

print("Training...")
train_iter = get_ds_iterator(TASK_NAME)

MRTFinetuner.train(
    train_iter=train_iter,
    num_steps=FINETUNING_STEPS,
    save_ckpt_period=SAVE_CKP_PERIOD,
  )

MRTFinetuner.train_summary

plot_training_curves(
    {
        'Loss': MRTFinetuner.loss,
        'Accuracy': MRTFinetuner.accuracy,
    }
)

if COPY_EXPERIMENT_TO_DRIVE:
  print("Copying experiment to Google Drive...")
  drive.mount('/content/drive', force_remount=True)
  np.save(f'{MODEL_OUTPUT_DIR}/mean_style_embed.npy', mean_style_embed)
  np.save(f'{MODEL_OUTPUT_DIR}/cluster_centroids.npy', cluster_centroids)
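  # `cp -r` places the experiment folder directly under MyDrive, so check for
  # the folder name there rather than for the full local path.
  if not os.path.exists(f'/content/drive/MyDrive/{os.path.basename(MODEL_OUTPUT_DIR)}'):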
    ! cp -r {MODEL_OUTPUT_DIR} /content/drive/MyDrive/
  else:
    print("Experiment already exists in Google Drive")

# Step 4: 🎚 Play with the finetuned model

In [None]:
# @markdown If you'd like to load a model from a previous experiment,
# @markdown specify path and model size below and run this cell.
# @markdown You can load an experiment from Google Drive
# @markdown (`EXPERIMENT_SOURCE = "drive"`) or directly from Colab
# @markdown (`EXPERIMENT_SOURCE = "colab"`). If you choose colab,
# @markdown you'll need to upload the experiment folder through the file browser
# @markdown on the left.

EXPERIMENT_SOURCE = "colab" # @param ["colab","drive"]
PREVIOUS_EXPERIMENT_FOLDER = "" # @param {type: "string", "placeholder": "Type the path to the experiment folder here, e.g. mrt_finetune/my_experiment_name"}
FINETUNED_MODEL_SIZE = 'large' # @param ['base', 'large']

In [None]:
# @title Select the checkpoint

import os
import numpy as np  # Needed below when Step 2 was skipped in this session.
from google.colab import drive
from ipywidgets import widgets
from IPython.display import display

# Fall back to empty values if the earlier cells were not run in this session.
try:
  PREVIOUS_EXPERIMENT_FOLDER
except NameError:
  PREVIOUS_EXPERIMENT_FOLDER = ""

try:
  MODEL_OUTPUT_DIR
except NameError:
  MODEL_OUTPUT_DIR = ""


if PREVIOUS_EXPERIMENT_FOLDER:
  if EXPERIMENT_SOURCE == "drive":
    drive.mount('/content/drive', force_remount=True)
    MODEL_OUTPUT_DIR = f'/content/drive/MyDrive/{PREVIOUS_EXPERIMENT_FOLDER}'
  else:
    MODEL_OUTPUT_DIR = f'/content/{PREVIOUS_EXPERIMENT_FOLDER}'
  MODEL_SIZE = FINETUNED_MODEL_SIZE
  mean_style_embed = np.load(f"{MODEL_OUTPUT_DIR}/mean_style_embed.npy")
  cluster_centroids = np.load(f"{MODEL_OUTPUT_DIR}/cluster_centroids.npy")

if MODEL_OUTPUT_DIR:
  # Collect the step numbers of all `checkpoint_<step>` folders.
  checkpoints = [int(name.split("_")[1]) for name in os.listdir(MODEL_OUTPUT_DIR) if name.startswith("checkpoint_")]
  checkpoints.sort()
else:
  checkpoints = []
checkpoints = ["holly_finetune"] + checkpoints

checkpoint_to_load = widgets.Dropdown(
    options=checkpoints,
    value=checkpoints[0],
    description='Checkpoint:',
    disabled=False,
)

display(checkpoint_to_load)

**Run the cell below and click the `start` button to begin streaming!**

**Instructions**. Type in text prompts or upload an audio file to control the overall style of the generated music in real time. The sliders change the influence of each prompt on the overall output. The other controls change various aspects of the system behavior (expand below for additional information).

⭐ **Finetuned model - extra features** ⭐ This demo includes additional prompt controls compared to the [main Magenta RT demo](https://github.com/magenta/magenta-realtime/blob/main/notebooks/Magenta_RT_Demo.ipynb) that allow you to steer the model towards styles that are more in-distribution with respect to the finetuning dataset. This can be beneficial when the finetuning data has a narrow coverage of sounds compared to pre-training (as is usually the case). Guiding the model towards in-distribution styles can turn the limited size and diversity of the finetuning data into a feature rather than a bug: out-of-domain prompts mixed with in-distribution embeddings often result in audio that retains some of the original prompt direction, while also being "infused" with sonic characteristics typical of the finetuning domain. Alongside the mean style embedding, we expose some cluster centroids as additional in-distribution prompts to achieve similar effects on more compact subspaces of the overall finetuning space.
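
To make the mixing concrete, here is a minimal sketch of combining prompt embeddings with in-distribution embeddings as a normalized weighted average. It mirrors the normalization used by `get_style_embedding` in the demo cell, but the function name `mix_embeddings` and the toy 4-dimensional vectors are illustrative assumptions (the notebook works with 768-dimensional embeddings).

```python
import numpy as np


def mix_embeddings(embeddings, weights):
  """Weighted average of style embeddings; zeros if all weights are zero."""
  embeddings = np.asarray(embeddings, dtype=np.float32)
  weights = np.asarray(weights, dtype=np.float32)
  total = weights.sum()
  mixed = (embeddings * weights[:, np.newaxis]).sum(axis=0)
  return mixed / total if total > 0 else mixed


# Toy stand-ins for a text prompt, the mean style embedding and one centroid.
text_prompt = np.array([1.0, 0.0, 0.0, 0.0])
mean_style = np.array([0.0, 1.0, 0.0, 0.0])
centroid = np.array([0.0, 0.0, 1.0, 0.0])

# Slider weights: the centroid counts twice as much as the other two.
mixed = mix_embeddings([text_prompt, mean_style, centroid], [1.0, 1.0, 2.0])
print(mixed)
```

Because the result is divided by the total weight, only the ratio between sliders matters: raising every slider by the same factor leaves the mixed embedding unchanged.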

**Disclaimer**. Magenta RT's training data primarily consists of Western
instrumental music. As a consequence, Magenta RT has incomplete coverage of both
vocal performance and the broader landscape of rich musical traditions
worldwide.

<details>
  <summary>Click to expand for additional information on the controls</summary>

*   **extra_buffering_seconds**: Increase this value if you experience audio
    drops during generation. This comes at the expense of greater latency,
    but might help with internet connection issues. *You need to relaunch the
    cell if you choose to modify this value*.

*   **sampling options**

    *   **temperature**: This controls how *chaotic* the model behaves. Low
        temperature values (e.g., 0.9) will make the model's choices more
        predictable and stable. High values (e.g., 1.5) will encourage more
        surprising and experimental musical ideas, but can also lead to
        instability.

    *   **topk**: This parameter filters the model's vocabulary at each step. It
        forces the model to choose its next prediction only from the *k* most
        likely options.

        *   A **low `topk`** value (e.g., 40) restricts the model to a smaller,
            safer palette of options. This leads to more coherent and
            predictable music that is less likely to have dissonant errors, but
            can sometimes feel repetitive.
        *   A **high `topk`** value gives the model a much wider range of
            choices, allowing for more variety and unexpected turns. This can
            make the output more creative, but also noisier.

    *   **guidance**: This controls how strictly the generated music should
        adhere to the **text prompts**.

        *   A **higher value** will push the model to produce a textbook example
            of the chosen style, emphasizing its key characteristics.
        *   A **lower value** will treat the text prompts more as a loose
            inspiration, allowing the model more creative freedom to deviate and
            blend other influences.

*   **Reset**: Stops the audio and resets the model.

*   **In-distribution steering**: This allows you to steer the model towards the finetuning style distribution.
    The first slider controls the weight of the embedding corresponding to the mean style prompt, the remaining five each correspond to cluster centroids.

*   **Text prompts**: Next to each text prompt is a slider that controls how
    strongly that prompt influences the model. This allows the creation of
    *mixed* embeddings (try mixing synthwave and flamenco guitar together!).
    You can also type your own prompt and modify existing ones.

*   **Audio prompts**: Instead of using text to define a musical style, you can
    also upload audio references! Click on the `Upload audio file` button to
    create a new audio-based prompt. Note that only **the first 10s** of audio
    will be used. Supported formats include `.wav`, `.mp3` and `.ogg`.

</details>
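
As an illustration of the **temperature** and **topk** controls described above, the sketch below samples one token from a vector of logits. It is a generic NumPy example of temperature scaling and top-k filtering, not Magenta RT's internal sampler; the function name `sample_top_k`, the greedy handling of zero temperature, and treating `topk=0` as "no filtering" are all assumptions.

```python
import numpy as np


def sample_top_k(logits, temperature=1.3, topk=40, rng=None):
  """Sample one token id from `logits` with temperature and top-k filtering."""
  if rng is None:
    rng = np.random.default_rng()
  logits = np.asarray(logits, dtype=np.float64)
  if temperature <= 0:
    return int(np.argmax(logits))  # Treat zero temperature as greedy decoding.
  # Lower temperature sharpens the distribution, higher temperature flattens it.
  scaled = logits / temperature
  if 0 < topk < scaled.size:
    # Keep the k largest logits (ties at the cutoff are kept) and mask the rest.
    cutoff = np.sort(scaled)[-topk]
    scaled = np.where(scaled >= cutoff, scaled, -np.inf)
  # Softmax over the remaining logits, shifted for numerical stability.
  probs = np.exp(scaled - scaled.max())
  probs /= probs.sum()
  return int(rng.choice(scaled.size, p=probs))
```

With `topk=1` the function always returns the most likely token regardless of temperature, which is one way to see why low `topk` values sound more predictable.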

In [None]:
# @title **Run this cell** to load the selected checkpoint and start the demo

import abc
import concurrent.futures
import functools
from typing import Sequence

import IPython.display as ipd
import ipywidgets as ipw
from magenta_rt import asset
from magenta_rt import audio as audio_lib
from magenta_rt import system
from magenta_rt.colab import prompt_types
from magenta_rt.colab import utils
from magenta_rt.colab import widgets
import numpy as np

extra_buffering_seconds = 0  # @param {"type":"slider","min":0,"max":4,"step":0.1}
BUFFERING_AMOUNT_SAMPLES = int(np.ceil(extra_buffering_seconds * 48000))

if checkpoint_to_load.value == "holly_finetune":
  checkpoint_path = "checkpoints/llm_large_holly_finetune.tar"
  checkpoint_dir = asset.fetch(checkpoint_path, is_dir=True, extract_archive=True, source="gcp")
  mean_style_embed_path = "finetune_features/holly_mean_style_embedding.npy"
  cluster_centroids_path = "finetune_features/holly_cluster_centroids.npy"
  mean_style_embed = np.load(asset.fetch(mean_style_embed_path, source="gcp"))
  cluster_centroids = np.load(asset.fetch(cluster_centroids_path, source="gcp"))
else:
  checkpoint_dir = f"{MODEL_OUTPUT_DIR}/checkpoint_{checkpoint_to_load.value}"
print("Loading checkpoint from", checkpoint_dir)

MRT = system.MagentaRT(
    tag="large" if checkpoint_to_load.value == "holly_finetune" else MODEL_SIZE,
    lazy=False,
    checkpoint_dir=checkpoint_dir,
)


class AudioFade:
  """Handles the cross fade between audio chunks.

  Args:
    chunk_size: Number of audio samples per predicted frame (current
      SpectroStream models produce 25Hz frames corresponding to 1920 audio
      samples at 48kHz).
    num_chunks: Number of audio chunks to fade between.
    stereo: Whether the predicted audio is stereo or mono.
  """

  def __init__(self, chunk_size: int, num_chunks: int, stereo: bool):
    fade_size = chunk_size * num_chunks
    self.fade_size = fade_size
    self.num_chunks = num_chunks

    self.previous_chunk = np.zeros(fade_size)
    self.ramp = np.sin(np.linspace(0, np.pi / 2, fade_size)) ** 2

    if stereo:
      self.previous_chunk = self.previous_chunk[:, np.newaxis]
      self.ramp = self.ramp[:, np.newaxis]

  def reset(self):
    self.previous_chunk = np.zeros_like(self.previous_chunk)

  def __call__(self, chunk: np.ndarray) -> np.ndarray:
    chunk[: self.fade_size] *= self.ramp
    chunk[: self.fade_size] += self.previous_chunk
    self.previous_chunk = chunk[-self.fade_size :] * np.flip(self.ramp)
    return chunk[: -self.fade_size]


class AudioStreamer(abc.ABC):
  """Audio streamer base class."""

  def __init__(
      self,
      sample_rate: int = 48000,
      num_channels: int = 2,
      buffer_size: int = 48000 * 2,
      extra_buffering: int = BUFFERING_AMOUNT_SAMPLES,
  ):
    self.audio_streamer = None
    self.sample_rate = sample_rate
    self.num_channels = num_channels
    self.buffer_size = buffer_size
    self.extra_buffering = extra_buffering

  def on_stream_start(self):
    """Called when the UI starts streaming."""
    if self.audio_streamer is not None:
      self.audio_streamer.reset_ring_buffer()

  def on_stream_stop(self):
    """Called when the UI stops streaming."""
    pass

  @property
  @abc.abstractmethod
  def warmup(self) -> bool:
    """Returns whether to warm up the audio streamer."""
    pass

  def reset(self):
    if self.audio_streamer is not None:
      self.audio_streamer.reset_ring_buffer()

  def start(self):
    self.audio_streamer = utils.AudioStreamer(
        self,
        rate=self.sample_rate,
        buffer_size=self.buffer_size,
        warmup=self.warmup,
        num_output_channels=self.num_channels,
        additional_buffered_samples=self.extra_buffering,
        start_streaming_callback=self.on_stream_start,
        stop_streaming_callback=self.on_stream_stop,
    )
    self.reset()

  def stop(self):
    if self.audio_streamer is not None:
      del self.audio_streamer
      self.audio_streamer = None

  def global_ui_params(self):
    return utils.Parameters.get_values()

  def get_prompts(self):
    params = self.global_ui_params()
    num_prompts = sum(map(lambda s: "prompt_value" in s, params.keys()))
    prompts = []
    for i in range(num_prompts):
      prompt_weight = params[f"prompt_weight_{i}"]
      prompt_value = params[f"prompt_value_{i}"]

      if prompt_value is None or not prompt_weight:
        continue

      match type(prompt_value):
        case prompt_types.TextPrompt:
          prompt_value = prompt_value.strip()
        case prompt_types.AudioPrompt:
          pass
        case prompt_types.EmbeddingPrompt:
          pass
        case _:
          raise ValueError(f"Unsupported prompt type: {type(prompt_value)}")

      prompts.append((prompt_value, prompt_weight))
    return prompts

  @abc.abstractmethod
  def generate(self, ui_params):
    pass

  def __call__(self, inputs):
    del inputs
    return self.generate(self.global_ui_params())


class MagentaRTStreamer(AudioStreamer):
  """Audio streamer class for our open weights Magenta RT model.

  This class holds a pretrained Magenta RT model, a cross fade state, a
  generation state and an asynchronous executor to handle the embedding of text
  prompt without interrupting the audio thread.

  Args:
    system: A MagentaRTBase instance.
    mean_style_embed: Mean embedding of the finetuning data for in-distribution
      steering (optional).
    cluster_centroids: Cluster centroids of the finetuning data embeddings for
      in-distribution steering (optional).
  """

  def __init__(
      self,
      system: system.MagentaRTBase,
      mean_style_embed: np.ndarray | None = None,
      cluster_centroids: Sequence[np.ndarray] | None = None,
  ):
    super().__init__()
    self.system = system
    self.fade = AudioFade(chunk_size=1920, num_chunks=1, stereo=True)
    self.state = None
    self.executor = concurrent.futures.ThreadPoolExecutor()
    self.mean_style_embed = mean_style_embed
    self.cluster_centroids = cluster_centroids

  @property
  def warmup(self):
    return True

  @functools.cache
  def embed_style(self, style: str):
    return self.executor.submit(self.system.embed_style, style)

  @functools.cache
  def embed_audio(self, audio: tuple[float, ...]):
    audio = audio_lib.Waveform(np.asarray(audio), 16000)
    return self.executor.submit(self.system.embed_style, audio)

  def get_style_embedding(self, force_wait: bool = False):
    prompts = self.get_prompts()
    weighted_embedding = np.zeros((768,), dtype=np.float32)
    total_weight = 0.0
    for prompt_value, prompt_weight in prompts:
      match type(prompt_value):
        case prompt_types.TextPrompt:
          if not prompt_value:
            continue
          embedding = self.embed_style(prompt_value)

        case prompt_types.AudioPrompt:
          embedding = self.embed_audio(tuple(prompt_value.value))

        case prompt_types.EmbeddingPrompt:
          embedding = prompt_value.value

        case _:
          raise ValueError(f"Unsupported prompt type: {type(prompt_value)}")

      if isinstance(embedding, concurrent.futures.Future):
        if force_wait:
          embedding.result()

        if not embedding.done():
          continue

        embedding = embedding.result()

      weighted_embedding += embedding * prompt_weight
      total_weight += prompt_weight

    if self.mean_style_embed is not None:
      params = self.global_ui_params()
      training_weight = params["mean"]
      weighted_embedding += self.mean_style_embed * training_weight
      total_weight += training_weight

    if self.cluster_centroids is not None:
      params = self.global_ui_params()
      for i, centroid in enumerate(self.cluster_centroids):
        centroid_weight = params[f"centroid_{i}"]
        weighted_embedding += centroid * centroid_weight
        total_weight += centroid_weight

    if total_weight > 0:
      weighted_embedding /= total_weight

    return weighted_embedding

  def on_stream_start(self):
    self.get_style_embedding(force_wait=False)
    self.get_style_embedding(force_wait=True)
    super().on_stream_start()

  def reset(self):
    self.state = None
    self.fade.reset()
    self.embed_style.cache_clear()
    super().reset()

  def generate(self, ui_params):
    chunk, self.state = self.system.generate_chunk(
        state=self.state,
        style=self.get_style_embedding(),
        seed=None,
        **ui_params,
    )
    chunk = self.fade(chunk.samples)
    return chunk

  def stop(self):
    self.executor.shutdown(wait=True)


# BUILD UI


def build_prompt_ui(default_prompts: Sequence[str], num_audio_prompt: int):
  """Add interactive prompt widgets and register them."""
  prompts = []

  for p in default_prompts:
    prompts.append(widgets.Prompt())
    prompts[-1].text.value = p

  prompts[0].slider.value = 1.0

  # add audio prompt
  for _ in range(num_audio_prompt):
    prompts.append(widgets.AudioPrompt())
    prompts[-1].slider.value = 0.0

  utils.Parameters.register_ui_elements(
      display=False,
      **{f"prompt_weight_{i}": p.slider for i, p in enumerate(prompts)},
      **{f"prompt_value_{i}": p.prompt_value for i, p in enumerate(prompts)},
  )
  return [p.get_widget() for p in prompts]


def build_steering_option_ui(num_centroids):
  """Add interactive steering option widgets and register them."""
  options = {
      "mean": ipw.FloatSlider(
          min=0.0,
          max=2.0,
          step=0.01,
          value=1.0,
          description="mean",
          layout=ipw.Layout(width="500px"),
      ),
  }

  for i in range(num_centroids):
    options[f"centroid_{i}"] = ipw.FloatSlider(
        min=0.0,
        max=2.0,
        step=0.01,
        value=0.0,
        description=f"centroid {i+1}",
        layout=ipw.Layout(width="500px"),
    )

  utils.Parameters.register_ui_elements(display=False, **options)

  return list(options.values())


def build_sampling_option_ui():
  """Add interactive sampling option widgets and register them."""
  options = {
      "temperature": ipw.FloatSlider(
          min=0.0,
          max=4.0,
          step=0.01,
          value=1.3,
          description="temperature",
      ),
      "topk": ipw.IntSlider(
          min=0,
          max=1024,
          step=1,
          value=40,
          description="topk",
      ),
      "guidance_weight": ipw.FloatSlider(
          min=0.0,
          max=10.0,
          step=0.01,
          value=5.0,
          description="guidance",
      ),
  }

  utils.Parameters.register_ui_elements(display=False, **options)

  return list(options.values())


utils.Parameters.reset()

# Initialize streamer
streamer = MagentaRTStreamer(
    MRT,
    mean_style_embed=mean_style_embed,
    cluster_centroids=cluster_centroids,
)


def _reset_state(*args, **kwargs):
  del args, kwargs
  streamer.reset()


reset_button = ipw.Button(description="reset")
reset_button.on_click(_reset_state)


# Building interactive UI
ipd.display(
    ipw.VBox([
        widgets.area(
            "sampling options",
            *build_sampling_option_ui(),
            reset_button,
        ),
        widgets.area(
            "in-distribution steering",
            *build_steering_option_ui(len(cluster_centroids)),
        ),
        widgets.area(
            "prompts",
            *build_prompt_ui(
                [
                    "synthwave",
                    "flamenco guitar",
                    "",
                    "",
                ],
                num_audio_prompt=2,
            ),
        ),
    ])
)

streamer.start()
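
The `AudioFade` class in the cell above blends consecutive chunks with a squared-sine ramp. The standalone sketch below (not part of the notebook's code, and using a constant signal purely for illustration) shows why that ramp is a convenient choice: the fade-in gain and the flipped fade-out gain sum to one at every sample, so a steady signal keeps its level across the chunk boundary.

```python
import numpy as np

fade_size = 1920  # One 25 Hz frame at 48 kHz, as in the demo cell.
ramp = np.sin(np.linspace(0, np.pi / 2, fade_size)) ** 2  # Fade-in gain.
fade_out = np.flip(ramp)  # Fade-out gain applied to the previous chunk's tail.

# sin^2 + cos^2 = 1, so the two gains sum to one at every sample.
gain_sum = ramp + fade_out
print(np.allclose(gain_sum, 1.0))

# Cross-fading two constant signals of amplitude 1 yields amplitude 1 throughout.
previous_tail = np.ones(fade_size) * fade_out
next_head = np.ones(fade_size) * ramp
crossfaded = previous_tail + next_head
```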


# License and disclaimer

Magenta RealTime is offered under a combination of licenses: the codebase is
licensed under
[Apache 2.0](https://github.com/magenta/magenta-realtime/blob/main/LICENSE),
and the model weights under
[Creative Commons Attribution 4.0 International](https://creativecommons.org/licenses/by/4.0/legalcode).

In addition, we specify the following usage terms:

Copyright 2025 Google LLC

Use these materials responsibly and do not generate content, including outputs,
that infringe or violate the rights of others, including rights in copyrighted
content.

Google claims no rights in outputs you generate using Magenta RealTime. You and
your users are solely responsible for outputs and their subsequent uses.

Unless required by applicable law or agreed to in writing, all software and
materials distributed here under the Apache 2.0 or CC-BY licenses are
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the licenses for the specific language governing
permissions and limitations under those licenses. You are solely responsible for
determining the appropriateness of using, reproducing, modifying, performing,
displaying or distributing the software and materials, and any outputs, and
assume any and all risks associated with your use or distribution of any of the
software and materials, and any outputs, and your exercise of rights and
permissions under the licenses.