# Magenta RT: Streaming music generation!

<a href="https://colab.research.google.com/github/magenta/magenta-realtime/blob/main/notebooks/Magenta_RT_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Magenta RealTime is a Python library for streaming music audio generation on
your local device. It is the open weights / on device companion to
[MusicFX DJ Mode](https://labs.google/fx/tools/music-fx-dj) and the
[Lyria RealTime API](https://ai.google.dev/gemini-api/docs/music-generation).

-   [Blog Post](https://g.co/magenta/rt)
-   [Repository](https://github.com/magenta/magenta-realtime)
-   [HuggingFace](https://huggingface.co/google/magenta-realtime)

### Generating audio with Magenta RT

Magenta RT generates audio in short chunks (2s) given a finite amount of past
context (10s). We use crossfading to mitigate boundary artifacts between chunks.
More details on our model are coming soon in a technical report!

![Animation of chunk-by-chunk generation in Magenta RT](https://raw.githubusercontent.com/magenta/magenta-realtime/refs/heads/main/notebooks/diagram.gif)
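
To make these durations concrete, the sketch below works out the corresponding sizes. It assumes the 48 kHz sample rate and 25 Hz frame rate mentioned in the demo code later in this notebook; the constant names are ours and are not part of the `magenta_rt` API.

```python
SAMPLE_RATE = 48_000   # audio samples per second (value used by the demo code)
FRAME_RATE = 25        # codec frames per second (assumption, from the demo docstrings)
CHUNK_SECONDS = 2      # length of one generated chunk
CONTEXT_SECONDS = 10   # amount of past audio the model can see

samples_per_frame = SAMPLE_RATE // FRAME_RATE
chunk_samples = CHUNK_SECONDS * SAMPLE_RATE
chunk_frames = CHUNK_SECONDS * FRAME_RATE
context_frames = CONTEXT_SECONDS * FRAME_RATE
context_chunks = CONTEXT_SECONDS // CHUNK_SECONDS

print(f"{samples_per_frame} samples per frame")
print(f"{chunk_samples} samples ({chunk_frames} frames) per chunk")
print(f"{context_frames} frames ({context_chunks} chunks) of context")
```

Under these assumptions, each new chunk is conditioned on roughly the five chunks that precede it.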

# Step 1: 😴 One-time setup

In [1]:
# @title **Run this cell** to install dependencies (~5 minutes)
# @markdown Make sure you are running on **`v2-8 TPU` runtime** via `Runtime > Change Runtime Type`

# @markdown Colab may prompt you to restart session. **Wait until the cell finishes running to restart**!

# Clone library
!git clone https://github.com/magenta/magenta-realtime.git

# Magenta RT requires nightly TF builds, but a stable build may already be installed.
# Force nightly to take precedence by uninstalling and reinstalling.
# Temporary workaround until MusicCoCa is supported by TF stable.
_all_tf = 'tensorflow tensorflow-cpu tensorflow-text'
_nightly_tf = 'tf-nightly==2.20.0.dev20250619 tensorflow-text-nightly==2.20.0.dev20250316'

# Install library and dependencies
# If running on TPU (recommended, runs on free tier Colab TPUs):
!pip install -e magenta-realtime/[tpu] && pip install tf2jax==0.3.8 && pip uninstall -y {_all_tf} && pip install {_nightly_tf}
# Uncomment if running on GPU (requires A100 via Colab Pro):
# !pip install -e magenta-realtime/[gpu] && pip install tf2jax==0.3.8 && pip uninstall -y {_all_tf} && pip install {_nightly_tf}

Cloning into 'magenta-realtime'...
remote: Enumerating objects: 226, done.
remote: Counting objects: 100% (156/156), done.
remote: Compressing objects: 100% (98/98), done.
remote: Total 226 (delta 88), reused 100 (delta 57), pack-reused 70 (from 1)
Receiving objects: 100% (226/226), 1.21 MiB | 8.83 MiB/s, done.
Resolving deltas: 100% (100/100), done.
Obtaining file:///content/magenta-realtime
  Installing build dependencies ... done
  Checking if build backend supports build_editable ... done
  Getting requirements to build editable ... done
  Preparing editable metadata (pyproject.toml) ... done
Collecting flaxformer@ git+https://github.com/google/flaxformer@399ea3a (from magenta_rt==0.2.0)
  Cloning https://github.com/google/flaxformer (to revision 399ea3a) to /tmp/pip-install-rhes44bh/flaxformer_aa27fb9be5b941cfb413b01dbd207a68
  Running command git clone --filter=blob:none --quiet https://github.com/google/flaxformer /tmp/

In [1]:
# @title Run this cell to select backend and initialize model (~5 minutes)
# @markdown For the open weights model, select **Magenta RT** as your backend and **leave your API key blank**.

# @markdown For improved prompt coverage, we suggest using the [Lyria RealTime API](https://ai.google.dev/gemini-api/docs/music-generation); select **LyriaRT (API)** and paste your [Gemini API Key](https://ai.google.dev/gemini-api/docs/api-key).

BACKEND = "Magenta RT (Open weights)" # @param ["Magenta RT (Open weights)", "LyriaRT (API)"]
GEMINI_API_KEY = "" # @param {"type":"string", "placeholder": "By default, you may leave this blank for the open weights model"}

if BACKEND.startswith("LyriaRT"):
  if len(GEMINI_API_KEY.strip()) == 0:
    raise ValueError("You must input your Gemini API key")
  !pip install google-genai
  from google import genai
  from google.genai import types as genai_types
  LRT = genai.Client(api_key=GEMINI_API_KEY, http_options={'api_version': 'v1alpha'})
else:
  from magenta_rt import system

  # Fetch all assets from HuggingFace and initialize model (~5 minutes).
  MRT = system.MagentaRT(tag="large", device="tpu:v2-8", lazy=False)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Downloading from hf: savedmodels/ssv2_48k_stereo/encoder: 100%|██████████| 5/5 [00:02<00:00,  1.68it/s]
Downloading from hf: savedmodels/ssv2_48k_stereo/decoder: 100%|██████████| 5/5 [00:02<00:00,  2.04it/s]
Downloading from hf: savedmodels/ssv2_48k_stereo/quantizer: 100%|██████████| 5/5 [00:02<00:00,  2.07it/s]
Downloading from hf: vocabularies/musiccoca_mv212f_vocab.model: 100%|██████████| 1/1 [00:00<00:00,  1.88it/s]
Downloading from hf: savedmodels/musiccoca_mv212f_cpu_novocab: 100%|██████████| 11/11 [00:04<00:00,  2.52it/s]
Downloading from hf: savedmodels/musiccoca_mv212_quan

# Step 2: 🤘 Streaming music generation! 🎵

**Run the cell below and click the `start` button to begin streaming!**

**Instructions**. Type in text prompts to control the overall style of the
generated music in real time. The sliders by the prompts change the influence of
each text prompt on the overall output. The other controls change various
aspects of the system behavior (expand below for additional information).
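
One way to picture the sliders: each prompt is mapped to an embedding vector, and the slider values act as the weights of a weighted average over those vectors. The sketch below illustrates that idea with random stand-in vectors rather than real style embeddings. `mix_embeddings` is a hypothetical helper, not a `magenta_rt` function, and the 768-dimensional size is taken from the demo code further down.

```python
import numpy as np


def mix_embeddings(embeddings, weights):
  """Returns the weighted average of `embeddings` (zeros if all weights are 0)."""
  embeddings = np.asarray(embeddings, dtype=np.float32)
  weights = np.asarray(weights, dtype=np.float32)
  total = weights.sum()
  if total <= 0:
    return np.zeros(embeddings.shape[1:], dtype=np.float32)
  # Scale each embedding by its weight, sum them, then normalize.
  return (weights[:, None] * embeddings).sum(axis=0) / total


# Random stand-ins for the embeddings of two prompts (illustration only).
rng = np.random.default_rng(0)
synthwave = rng.normal(size=768).astype(np.float32)
flamenco = rng.normal(size=768).astype(np.float32)

# Slider values 1.0 and 0.5: the first prompt has twice the influence.
mixed = mix_embeddings([synthwave, flamenco], [1.0, 0.5])
print(mixed.shape)
```

Because the weights are normalized, only their ratio matters: sliders at (1.0, 0.5) and (0.5, 0.25) describe the same mix in this sketch.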

**Disclaimer**. Magenta RT's training data primarily consists of Western
instrumental music. As a consequence, Magenta RT has incomplete coverage of both
vocal performance and the broader landscape of rich musical traditions
worldwide. For real-time generation with broader style coverage, we suggest
users select the **LyriaRT (API)** option in Step 1. See our
[model card](https://huggingface.co/google/magenta-realtime) for more
information.

<details>
  <summary>Click to expand for additional information on the controls</summary>

*   **extra_buffering_seconds**: Increase this value if you experience audio
    drops during generation. This comes at the expense of greater latency,
    but might help with internet connection issues. *You need to relaunch the
    cell if you modify this value*.

*   **sampling options**

    *   **temperature**: This controls how *chaotically* the model behaves. Low
        temperature values (e.g., 0.9) will make the model's choices more
        predictable and stable. High values (e.g., 1.5) will encourage more
        surprising and experimental musical ideas, but can also lead to
        instability.

    *   **topk**: This parameter filters the model's vocabulary at each step. It
        forces the model to choose its next prediction only from the *k* most
        likely options.

        *   A **low `topk`** value (e.g., 40) restricts the model to a smaller,
            safer palette of options. This leads to more coherent and
            predictable music that is less likely to have dissonant errors, but
            can sometimes feel repetitive.
        *   A **high `topk`** value gives the model a much wider range of
            choices, allowing for more variety and unexpected turns. This can
            make the output more creative, but also noisier.

    *   **guidance**: This controls how strictly the generated music should
        adhere to the **text prompts**.

        *   A **higher value** will push the model to produce a textbook example
            of the chosen style, emphasizing its key characteristics.
        *   A **lower value** will treat the text prompts more as a loose
            inspiration, allowing the model more creative freedom to deviate and
            blend other influences.

*   **Reset**: stops the audio and resets the model.

*   **Text prompts**: Next to each text prompt is a slider that controls how
    much that prompt influences the model. This allows the creation of
    *mixed* embeddings (try mixing synthwave and flamenco guitar together!).
    You can also type your own prompts and modify existing ones.

*   **Audio prompts**: Instead of using text to define a musical style, you can
    also upload audio references! Click on the `Upload audio file` button to
    create a new audio-based prompt. Note that only **the first 10s** of audio
    will be used. Supported formats include `.wav`, `.mp3` and `.ogg`.

</details>
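
To illustrate what the **temperature** and **topk** controls do, here is a generic sketch of temperature scaling followed by top-k filtering over a vector of logits. It is not the Magenta RT implementation: `sample_token` is a hypothetical helper, and treating `topk=0` as "no filtering" is our assumption.

```python
import numpy as np


def sample_token(logits, temperature=1.3, topk=40, rng=None):
  """Samples one index from `logits` after temperature scaling and top-k filtering."""
  rng = rng or np.random.default_rng()
  # Lower temperatures sharpen the distribution, higher ones flatten it.
  logits = np.asarray(logits, dtype=np.float64) / max(temperature, 1e-6)
  if 0 < topk < logits.size:
    # Keep the k largest logits and mask out everything else
    # (ties with the k-th largest value are kept as well).
    kth_largest = np.partition(logits, -topk)[-topk]
    logits = np.where(logits >= kth_largest, logits, -np.inf)
  # Numerically stable softmax over the remaining candidates.
  probs = np.exp(logits - logits.max())
  probs /= probs.sum()
  return int(rng.choice(logits.size, p=probs))


logits = np.array([2.0, 1.0, 0.5, -1.0, -3.0])
rng = np.random.default_rng(0)

# With topk=2, only the two most likely indices (0 and 1) can be drawn.
samples = [sample_token(logits, temperature=1.0, topk=2, rng=rng) for _ in range(100)]
print(sorted(set(samples)))
```

A very low temperature makes the draw nearly deterministic, while a large `topk` together with a high temperature lets unlikely options through.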

In [4]:
# @title Run this cell to start demo

import abc
import asyncio
import concurrent.futures
import functools
import io
import queue
import threading
import traceback
from typing import Sequence

import IPython.display as ipd
import ipywidgets as ipw
import numpy as np
import soundfile as sf

from magenta_rt import audio as audio_lib
from magenta_rt import system
from magenta_rt.colab import prompt_types
from magenta_rt.colab import utils
from magenta_rt.colab import widgets


extra_buffering_seconds = 0  # @param {"type":"slider","min":0,"max":4,"step":0.1}
BUFFERING_AMOUNT_SAMPLES = int(np.ceil(extra_buffering_seconds * 48000))


class AudioFade:
  """Handles the cross fade between audio chunks.

  Args:
    chunk_size: Number of audio samples per predicted frame (current
      SpectroStream models produce 25Hz frames corresponding to 1920 audio
      samples at 48kHz)
    num_chunks: Number of audio chunks to fade between.
    stereo: Whether the predicted audio is stereo or mono.
  """

  def __init__(self, chunk_size: int, num_chunks: int, stereo: bool):
    fade_size = chunk_size * num_chunks
    self.fade_size = fade_size
    self.num_chunks = num_chunks

    self.previous_chunk = np.zeros(fade_size)
    self.ramp = np.sin(np.linspace(0, np.pi / 2, fade_size)) ** 2

    if stereo:
      self.previous_chunk = self.previous_chunk[:, np.newaxis]
      self.ramp = self.ramp[:, np.newaxis]

  def reset(self):
    self.previous_chunk = np.zeros_like(self.previous_chunk)

  def __call__(self, chunk: np.ndarray) -> np.ndarray:
    chunk[: self.fade_size] *= self.ramp
    chunk[: self.fade_size] += self.previous_chunk
    self.previous_chunk = chunk[-self.fade_size :] * np.flip(self.ramp)
    return chunk[: -self.fade_size]


class AudioStreamer(abc.ABC):
  """Audio streamer base class."""

  def __init__(
      self,
      sample_rate: int = 48000,
      num_channels: int = 2,
      buffer_size: int = 48000 * 2,
      extra_buffering: int = BUFFERING_AMOUNT_SAMPLES,
  ):
    self.audio_streamer = None
    self.sample_rate = sample_rate
    self.num_channels = num_channels
    self.buffer_size = buffer_size
    self.extra_buffering = extra_buffering

  def on_stream_start(self):
    """Called when the UI starts streaming."""
    if self.audio_streamer is not None:
      self.audio_streamer.reset_ring_buffer()

  def on_stream_stop(self):
    """Called when the UI stops streaming."""
    pass

  @property
  @abc.abstractmethod
  def warmup(self) -> bool:
    """Returns whether to warm up the audio streamer."""
    pass

  def reset(self):
    if self.audio_streamer is not None:
      self.audio_streamer.reset_ring_buffer()

  def start(self):
    self.audio_streamer = utils.AudioStreamer(
        self,
        rate=self.sample_rate,
        buffer_size=self.buffer_size,
        warmup=self.warmup,
        num_output_channels=self.num_channels,
        additional_buffered_samples=self.extra_buffering,
        start_streaming_callback=self.on_stream_start,
        stop_streaming_callback=self.on_stream_stop,
    )
    self.reset()

  def stop(self):
    if self.audio_streamer is not None:
      del self.audio_streamer
      self.audio_streamer = None

  def global_ui_params(self):
    return utils.Parameters.get_values()

  def get_prompts(self):
    params = self.global_ui_params()
    num_prompts = sum(map(lambda s: "prompt_value" in s, params.keys()))
    prompts = []
    for i in range(num_prompts):
      prompt_weight = params[f"prompt_weight_{i}"]
      prompt_value = params[f"prompt_value_{i}"]

      if prompt_value is None or not prompt_weight:
        continue

      match type(prompt_value):
        case prompt_types.TextPrompt:
          prompt_value = prompt_value.strip()
        case prompt_types.AudioPrompt:
          pass
        case prompt_types.EmbeddingPrompt:
          pass
        case _:
          raise ValueError(f"Unsupported prompt type: {type(prompt_value)}")

      prompts.append((prompt_value, prompt_weight))
    return prompts

  @abc.abstractmethod
  def generate(self, ui_params):
    pass

  def __call__(self, inputs):
    del inputs
    return self.generate(self.global_ui_params())


class MagentaRTStreamer(AudioStreamer):
  """Audio streamer class for our open weights Magenta RT model.

  This class holds a pretrained Magenta RT model, a cross fade state, a
  generation state and an asynchronous executor to handle the embedding of text
  prompts without interrupting the audio thread.

  Args:
    system: A MagentaRTBase instance.
  """

  def __init__(self, system: system.MagentaRTBase):
    super().__init__()
    self.system = system
    self.fade = AudioFade(chunk_size=1920, num_chunks=1, stereo=True)
    self.state = None
    self.executor = concurrent.futures.ThreadPoolExecutor()

  @property
  def warmup(self):
    return True

  @functools.cache
  def embed_style(self, style: str):
    return self.executor.submit(self.system.embed_style, style)

  @functools.cache
  def embed_audio(self, audio: tuple[float, ...]):
    # `audio` is passed as a tuple so that it is hashable for functools.cache.
    audio = audio_lib.Waveform(np.asarray(audio), 16000)
    return self.executor.submit(self.system.embed_style, audio)

  def get_style_embedding(self, force_wait: bool = False):
    prompts = self.get_prompts()
    weighted_embedding = np.zeros((768,), dtype=np.float32)
    total_weight = 0.0
    for prompt_value, prompt_weight in prompts:
      match type(prompt_value):
        case prompt_types.TextPrompt:
          if not prompt_value:
            continue
          embedding = self.embed_style(prompt_value)

        case prompt_types.AudioPrompt:
          embedding = self.embed_audio(tuple(prompt_value.value))

        case prompt_types.EmbeddingPrompt:
          embedding = prompt_value.value

        case _:
          raise ValueError(f"Unsupported prompt type: {type(prompt_value)}")

      if isinstance(embedding, concurrent.futures.Future):
        if force_wait:
          embedding.result()

        if not embedding.done():
          continue

        embedding = embedding.result()

      weighted_embedding += embedding * prompt_weight
      total_weight += prompt_weight

    if total_weight > 0:
      weighted_embedding /= total_weight

    return weighted_embedding

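  def on_stream_start(self):
    # The first pass submits every pending embedding job; the second pass blocks
    # until all of them are done, so the first chunk uses the requested style.
    self.get_style_embedding(force_wait=False)
    self.get_style_embedding(force_wait=True)
    super().on_stream_start()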

  def reset(self):
    self.state = None
    self.fade.reset()
    self.embed_style.cache_clear()
    super().reset()

  def generate(self, ui_params):
    chunk, self.state = self.system.generate_chunk(
        state=self.state,
        style=self.get_style_embedding(),
        seed=None,
        **ui_params,
    )
    chunk = self.fade(chunk.samples)
    return chunk

  def stop(self):
    # Wait for pending embedding jobs, then release the audio streamer.
    self.executor.shutdown(wait=True)
    super().stop()


class LyriaRTStreamer(AudioStreamer):
  """Audio streamer for the asynchronous Lyria RealTime API.

  This class bridges the synchronous `AudioStreamer` with the async `genai`
  library by running the `asyncio` event loop in a dedicated background thread.
  It uses thread-safe queues for communication.
  """

  def __init__(self, client):  # Assuming client is genai.Client
    super().__init__()
    self.client = client
    self.prompts = {}
    self.params = {}
    self.session = None
    self.playback_state = "stopped"
    # Queues for thread-safe communication
    self.audio_queue = queue.Queue(maxsize=10)  # Buffer a few chunks
    self.update_queue = queue.Queue(maxsize=1)

    # Background thread management
    self._thread: threading.Thread | None = None
    self._stop_event = threading.Event()

  def start(self):
    """Starts the background thread for the asyncio event loop."""
    if self._thread is None:
      self._stop_event.clear()
      self._thread = threading.Thread(target=self._run_async_loop, daemon=True)
      self._thread.start()
    super().start()

  def stop(self):
    """Signals the background thread to stop and waits for it to exit."""
    if self._thread and self._thread.is_alive():
      self._stop_event.set()
      self._thread.join(timeout=5)
    self._thread = None
    super().stop()

  @property
  def warmup(self):
    # Wait until the user starts the stream before requesting the first audio chunk.
    return False

  def on_stream_start(self):
    if self.session is not None:
      asyncio.run(self.session.play())
      self.playback_state = "playing"
      super().on_stream_start()

  def on_stream_stop(self):
    if self.session is not None:
      asyncio.run(self.session.stop())
      self.playback_state = "stopped"
      self.reset()  # Drain the audio queue
      super().on_stream_stop()

  def _run_async_loop(self):
    """The target method for the background thread."""
    try:
      asyncio.run(self._manage_session())
    except Exception as e:
      print(f"Error in async loop: {e}")

  async def _manage_session(self):
    """Main async task to connect to Lyria RealTime API and handle communication."""
    from google.genai import types as genai_types  # Late import for clarity

    while not self._stop_event.is_set():
      try:
        # Establish a connection
        async with self.client.aio.live.music.connect(
            model="models/lyria-realtime-exp"
        ) as session:
          self.session = session
          # Start a background task to continuously receive audio
          receive_task = asyncio.create_task(self._receive_audio(session))

          # Wait for the first set of parameters to arrive
          initial_params = await self._get_next_update(None)
          if initial_params:
            await self._apply_params(session, initial_params, genai_types)

          if self.playback_state == "playing":
            # Resume playback on automatic reconnection.
            await session.play()

          # Main loop to process parameter updates
          while not self._stop_event.is_set() and not receive_task.done():
            latest_params = await self._get_next_update(timeout=0.1)
            if latest_params:
              await self._apply_params(session, latest_params, genai_types)

          # Reset session parameters so they are set on the next connection.
          self.prompts = {}
          self.params = {}
          receive_task.cancel()
          await asyncio.gather(receive_task, return_exceptions=True)

      except Exception as e:
        traceback.print_exc()
        print(f"Lyria RealTime session failed, will retry in 5s: {e}")
        if self._stop_event.is_set():
          break
        await asyncio.sleep(5)

      self.session = None  # The session has been closed at this point.

  async def _get_next_update(self, timeout=None):
    """Asynchronously waits for the next item in the update queue."""
    loop = asyncio.get_running_loop()
    try:
      # Use a future to bridge the sync queue with the async loop
      return await loop.run_in_executor(
          None, lambda: self.update_queue.get(timeout=timeout)
      )
    except queue.Empty:
      return None

  async def _apply_params(self, session, ui_params, genai_types):
    """Applies UI parameters to the live session."""
    prompts = self.get_prompts()
    params = {
        "temperature": ui_params.get("temperature"),
        "top_k": ui_params.get("topk"),
        "guidance": ui_params.get("guidance_weight"),
    }
    if prompts != self.prompts:
      self.prompts = prompts
      norm = sum(w for _, w in prompts) or 1.0
      await session.set_weighted_prompts(
          prompts=[
              genai_types.WeightedPrompt(text=p, weight=w / norm)
              for p, w in prompts
          ]
      )
    if params != self.params:
      self.params = params
      await session.set_music_generation_config(
          config=genai_types.LiveMusicGenerationConfig(
              # Read back the keys defined in `params` above.
              temperature=params.get("temperature"),
              top_k=params.get("top_k"),
              guidance=params.get("guidance"),
          )
      )

  async def _receive_audio(self, session):
    """Task to receive audio chunks from the server and queue them."""
    async for message in session.receive():
      if message.server_content and message.server_content.audio_chunks:
        audio_data = message.server_content.audio_chunks[0].data
        try:
          # Extract interleaved stereo channels.
          stereo_samples = np.frombuffer(audio_data, dtype=np.int16).reshape(
              -1, 2
          )
          # Write samples to an in memory wav file to read them back as bytes.
          buffer = io.BytesIO()
          sf.write(buffer, stereo_samples, self.sample_rate, format="WAV")
          buffer.seek(0)
          samples, _ = sf.read(buffer)
          self.audio_queue.put(samples)
        except Exception as e:
          print(f"Error reading audio data: {e}")
          traceback.print_exc()
          raise e

  def reset(self):
    """Clears queues and primes the update queue with current params."""
    # Clear any stale data from queues
    while not self.audio_queue.empty():
      try:
        self.audio_queue.get_nowait()
      except queue.Empty:
        break
    while not self.update_queue.empty():
      try:
        self.update_queue.get_nowait()
      except queue.Empty:
        break

    # Prime the queue with the initial parameters to break the deadlock.
    self.update_queue.put_nowait(self.global_ui_params())

    # Call the base class reset.
    super().reset()

  def generate(self, ui_params):
    """(Synchronous) Provides audio to the streamer.

    Sends new params to the async loop and gets the next available audio chunk.
    """
    # Try to send the latest params to the async loop.
    # If the queue is full, the last update is just replaced.
    if self.update_queue.full():
      try:
        self.update_queue.get_nowait()  # Discard old update
      except queue.Empty:
        pass  # Race condition, another thread got it. Fine.
    self.update_queue.put_nowait(ui_params)

    # Block and wait for the next audio chunk from the async loop.
    try:
      return self.audio_queue.get(timeout=5)
    except queue.Empty:
      print("Audio queue timeout. Returning silence.")
      # Return silence with the same array layout as a regular chunk
      # (samples x channels) if no audio is available.
      return np.zeros(
          (self.sample_rate * 2, self.num_channels), dtype=np.float32
      )

  def __del__(self):
    self.stop()


# BUILD UI


def build_prompt_ui(default_prompts: Sequence[str], num_audio_prompt: int):
  """Add interactive prompt widgets and register them."""
  prompts = []

  for p in default_prompts:
    prompts.append(widgets.Prompt())
    prompts[-1].text.value = p

  prompts[0].slider.value = 1.0

  # add audio prompt
  for _ in range(num_audio_prompt):
    prompts.append(widgets.AudioPrompt())
    prompts[-1].slider.value = 0.0

  utils.Parameters.register_ui_elements(
      display=False,
      **{f"prompt_weight_{i}": p.slider for i, p in enumerate(prompts)},
      **{f"prompt_value_{i}": p.prompt_value for i, p in enumerate(prompts)},
  )
  return [p.get_widget() for p in prompts]


def build_sampling_option_ui():
  """Add interactive sampling option widgets and register them."""
  options = {
      "temperature": ipw.FloatSlider(
          min=0.0,
          max=4.0,
          step=0.01,
          value=1.3,
          description="temperature",
      ),
      "topk": ipw.IntSlider(
          min=0,
          max=1024,
          step=1,
          value=40,
          description="topk",
      ),
      "guidance_weight": ipw.FloatSlider(
          min=0.0,
          max=10.0,
          step=0.01,
          value=5.0,
          description="guidance",
      ),
  }

  utils.Parameters.register_ui_elements(display=False, **options)

  return list(options.values())


utils.Parameters.reset()


# Make sure setup cell was run
try:
  BACKEND
except NameError:
  raise RuntimeError("Please run the cell above.")


# Initialize streamer
if BACKEND.startswith("LyriaRT"):
  try:
    LRT
  except NameError:
    raise RuntimeError("Please run the cell above.")
  streamer = LyriaRTStreamer(LRT)
else:
  try:
    MRT
  except NameError:
    raise RuntimeError("Please run the cell above.")
  streamer = MagentaRTStreamer(MRT)


def _reset_state(*args, **kwargs):
  del args, kwargs
  streamer.reset()


reset_button = ipw.Button(description="reset")
reset_button.on_click(_reset_state)


# Building interactive UI
ipd.display(
    ipw.VBox([
        widgets.area(
            "sampling options",
            *build_sampling_option_ui(),
            reset_button,
        ),
        widgets.area(
            "prompts",
            *build_prompt_ui(
                [
                    "synthwave",
                    "flamenco guitar",
                    "",
                    "",
                ],
                num_audio_prompt=0 if BACKEND.startswith("LyriaRT") else 2,
            ),
        ),
    ])
)

streamer.start()
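
The `LyriaRTStreamer` class in the cell above bridges a synchronous consumer with an asynchronous producer by running an `asyncio` event loop in a background thread and exchanging data through a thread-safe queue. The standalone sketch below isolates that pattern using only the standard library. `AsyncProducerBridge` and its methods are hypothetical names for illustration, and a short `asyncio.sleep` stands in for awaiting a network message.

```python
import asyncio
import queue
import threading


class AsyncProducerBridge:
  """Runs an async producer in a background thread and exposes a sync `get`."""

  def __init__(self, maxsize=10):
    self.items = queue.Queue(maxsize=maxsize)
    self._stop_event = threading.Event()
    self._thread = threading.Thread(target=self._run, daemon=True)

  def start(self):
    self._thread.start()

  def stop(self):
    self._stop_event.set()
    self._thread.join(timeout=5)

  def _run(self):
    # The event loop lives entirely inside the background thread.
    asyncio.run(self._produce())

  async def _produce(self):
    counter = 0
    while not self._stop_event.is_set():
      await asyncio.sleep(0.01)  # Stand-in for awaiting a network message.
      try:
        # Never block the event loop: use the non-blocking put.
        self.items.put_nowait(counter)
        counter += 1
      except queue.Full:
        pass  # Consumer is behind; retry the same item on the next iteration.

  def get(self, timeout=5):
    """Blocks the calling (synchronous) thread until an item is available."""
    return self.items.get(timeout=timeout)


bridge = AsyncProducerBridge()
bridge.start()
received = [bridge.get() for _ in range(3)]
bridge.stop()
print(received)
```

The bounded queue keeps the producer from running arbitrarily far ahead of playback, which is the same role `audio_queue` plays in the streamer.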

# Step 3: Understand what is happening under the hood

Let's start by generating a short (2s) chunk of synthwave.

In [None]:
import IPython.display as ipd

from magenta_rt import system

try:
  model = MRT
except NameError:
  model = system.MagentaRT(tag="large", device="tpu:v2-8", lazy=False)

prompt = "synthwave"
embedding = model.embed_style(prompt)

audio, state = model.generate_chunk(
    state=None,
    style=embedding,
    seed=0,
)

ipd.display(ipd.Audio(audio.samples.T, rate=audio.sample_rate))

We can generate longer sequences by concatenating generations while keeping
track of the internal state of the model. We use a crossfade time of 40ms to
concatenate audio chunks as this is the frame length used by SpectroStream when
encoding audio.
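
As a rough picture of what a 40ms crossfade does, the sketch below joins two mono signals by overlapping the end of the first with the start of the second. It reuses the sin² ramp from the `AudioFade` class earlier in this notebook; whether `audio.concatenate` uses the same ramp internally is an assumption, and `crossfade_concat` is a hypothetical helper rather than a `magenta_rt` function.

```python
import numpy as np


def crossfade_concat(a, b, fade_samples):
  """Joins `a` and `b`, overlapping the last/first `fade_samples` samples."""
  # Fade-in gain; its reverse is the fade-out gain, and the two sum to 1.
  ramp = np.sin(np.linspace(0, np.pi / 2, fade_samples)) ** 2
  overlap = a[-fade_samples:] * ramp[::-1] + b[:fade_samples] * ramp
  return np.concatenate([a[:-fade_samples], overlap, b[fade_samples:]])


sample_rate = 48_000
fade_samples = sample_rate * 40 // 1000  # 40 ms at 48 kHz

# Two one-second constant signals: a clean crossfade leaves the level unchanged.
a = np.ones(sample_rate)
b = np.ones(sample_rate)
joined = crossfade_concat(a, b, fade_samples)
print(fade_samples, len(joined))
```

Each join shortens the result by the fade length, since the overlapping region is shared by both chunks.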

In [None]:
from magenta_rt import audio

num_chunks = 4
state = None
chunks = []

for i in range(num_chunks):
  chunk, state = model.generate_chunk(
      state=state,
      style=embedding,
      seed=i,
  )
  chunks.append(chunk)

concatenated_audio = audio.concatenate(
    chunks,
    crossfade_time=model.config.crossfade_length,
)
ipd.display(
    ipd.Audio(concatenated_audio.samples.T, rate=concatenated_audio.sample_rate)
)

At the core of Magenta RT lies the idea of changing the style embedding *during
generation* to enable smooth transitions between musical concepts. What about
transitioning from "synthwave" to "disco funk"?

In [None]:
state = None
chunks = []

styles = [
    "synthwave",
    "disco synthwave",
    "disco",
    "disco funk",
]

for i, style in enumerate(styles):
  chunk, state = model.generate_chunk(
      state=state,
      style=model.embed_style(style),
      seed=i,
      guidance_weight=5.0,
      temperature=1.3,
  )
  chunks.append(chunk)

concatenated_audio = audio.concatenate(
    chunks,
    crossfade_time=model.config.crossfade_length,
)
ipd.display(
    ipd.Audio(concatenated_audio.samples.T, rate=concatenated_audio.sample_rate)
)

A simpler approach is to interpolate between two musical genres directly in
*the embedding space*.

In [None]:
import numpy as np

state = None
chunks = []

embed_a = model.embed_style("synthwave")
embed_b = model.embed_style("disco funk")

weight = np.linspace(0, 1, 8, endpoint=True)

embeddings = embed_a[None] + weight[:, None] * (embed_b - embed_a)
embeddings = embeddings.astype(np.float32)


for i, embedding in enumerate(embeddings):
  chunk, state = model.generate_chunk(
      state=state,
      style=embedding,
      seed=i,
      guidance_weight=5.0,
      temperature=1.3,
  )
  chunks.append(chunk)

concatenated_audio = audio.concatenate(
    chunks,
    crossfade_time=model.config.crossfade_length,
)
ipd.display(
    ipd.Audio(concatenated_audio.samples.T, rate=concatenated_audio.sample_rate)
)

# License and terms

Magenta RealTime is offered under a combination of licenses: the codebase is
licensed under
[Apache 2.0](https://github.com/magenta/magenta-realtime/blob/main/LICENSE), and
the model weights under
[Creative Commons Attribution 4.0 International](https://creativecommons.org/licenses/by/4.0/legalcode).

In addition, we specify the following usage terms:

Copyright 2025 Google LLC

Use these materials responsibly and do not generate content, including outputs,
that infringe or violate the rights of others, including rights in copyrighted
content.

Google claims no rights in outputs you generate using Magenta RealTime. You and
your users are solely responsible for outputs and their subsequent uses.

Unless required by applicable law or agreed to in writing, all software and
materials distributed here under the Apache 2.0 or CC-BY licenses are
distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
either express or implied. See the licenses for the specific language governing
permissions and limitations under those licenses. You are solely responsible for
determining the appropriateness of using, reproducing, modifying, performing,
displaying or distributing the software and materials, and any outputs, and
assume any and all risks associated with your use or distribution of any of the
software and materials, and any outputs, and your exercise of rights and
permissions under the licenses.