# MusicGen Remixer
### Remix music into other styles with MusicGen Chord
[MusicGen Remixer](https://replicate.com/sakemin/musicgen-remixer) is an app based on MusicGen Chord. Users can upload a music track with vocals, type in the text description prompt, and the app will create a new background track based on the input and then make a remixed music output.
This Jupyter notebook breaks down the MusicGen Remixer pipeline step by step, making a separate Replicate API call for each stage and processing the output of each call.

## Setup

### Install Replicate client & python packages with pip

In [None]:
!pip install replicate numpy pydub requests scipy

### Authenticate Replicate API token
Run the cell below and type in your Replicate token. You can check your token [here](https://replicate.com/account).

In [None]:
from getpass import getpass
import os

# Read the token from a hidden prompt and store it in the
# REPLICATE_API_TOKEN environment variable.
REPLICATE_API_TOKEN = getpass()
os.environ["REPLICATE_API_TOKEN"] = REPLICATE_API_TOKEN

### Import packages

In [None]:
import replicate
from pathlib import Path
import urllib.request, json
import numpy as np
import requests
from pydub import AudioSegment
from io import BytesIO
from scipy.signal import resample
import IPython.display as ipd

### Declare functions

In [None]:
def download_audio_and_load_as_numpy(url):
    """
    Downloads an audio file (MP3 or WAV) from a given URL and loads it into a NumPy array.

    Parameters:
    url (str): The URL of the audio file to download. Objects whose string
        form is the URL are accepted too; the value is converted with `str()`.

    Returns:
    np.ndarray: The audio data as a NumPy array of shape (frames, channels).
    int: The sample rate of the audio file.

    Raises:
    requests.HTTPError: If the server answers with an error status.
    ValueError: If the URL path does not end in `.mp3` or `.wav`.
    """
    from urllib.parse import urlparse

    url = str(url)

    # Download the audio file
    response = requests.get(url)
    response.raise_for_status()

    # Read the audio file as a byte stream
    audio_file = BytesIO(response.content)

    # Determine the file format from the URL path only, so that a query
    # string or fragment (e.g. "...track.mp3?token=abc") does not hide it
    file_format = urlparse(url).path.rsplit(".", 1)[-1].lower()

    # Load the audio file using pydub
    if file_format == "mp3":
        audio = AudioSegment.from_mp3(audio_file)
    elif file_format == "wav":
        audio = AudioSegment.from_wav(audio_file)
    else:
        raise ValueError("Unsupported file format: only MP3 and WAV are supported.")

    # The samples arrive as one flat array; give every channel its own
    # column, whatever the channel count is.
    # NOTE(review): assumes pydub interleaves the channels frame by frame — confirm.
    audio_data = np.array(audio.get_array_of_samples()).reshape(-1, audio.channels)

    return audio_data, audio.frame_rate


def save_numpy_as_audio(audio_data, sample_rate, output_filename):
    """
    Writes a NumPy array to disk as an audio file (MP3 or WAV).

    The data is passed through `normalize_audio` before it is exported.

    Parameters:
    audio_data (np.ndarray): The audio data to save.
    sample_rate (int): The sample rate of the audio data.
    output_filename (str): The name of the output file; the text after its
        last dot selects the export format.
    """
    # Whatever follows the last dot of the file name is the export format
    export_format = output_filename.rsplit(".", 1)[-1].lower()

    pcm = normalize_audio(audio_data)

    # A one-dimensional array is treated as a single channel
    channel_count = 1 if pcm.ndim <= 1 else pcm.shape[1]

    segment = AudioSegment(
        pcm.tobytes(),
        frame_rate=sample_rate,
        sample_width=pcm.dtype.itemsize,
        channels=channel_count,
    )
    segment.export(output_filename, format=export_format)


def resample_audio(audio_data, original_sample_rate, new_sample_rate):
    """
    Converts audio data from one sample rate to another.

    Parameters:
    audio_data (np.ndarray): The audio data to resample; its first axis is
        the one that gets resampled.
    original_sample_rate (int): The sample rate the data currently has.
    new_sample_rate (int): The sample rate to convert to.

    Returns:
    np.ndarray: The resampled audio data.
    """
    rate_ratio = new_sample_rate / original_sample_rate

    # The target length is the current length scaled by the rate ratio,
    # truncated to a whole number of samples
    target_length = int(audio_data.shape[0] * rate_ratio)

    return resample(audio_data, target_length)


def normalize_audio(audio_data):
    """
    Peak-normalizes the audio data and converts it to 16-bit integer samples.

    The data is scaled so that its largest absolute value maps to the int16
    maximum (32767). Silent (all-zero) or empty input is returned as zeros
    instead of dividing by zero.

    Parameters:
    audio_data (np.ndarray): The audio data to normalize.

    Returns:
    np.ndarray: The normalized audio data as int16, same shape as the input.
    """
    max_int16 = np.iinfo(np.int16).max

    # Work in floating point: np.abs() on an int16 array wraps around for
    # the value -32768 and would report the wrong peak.
    samples = np.asarray(audio_data, dtype=np.float64)

    # Find the maximum absolute value in the audio data
    max_val = np.max(np.abs(samples)) if samples.size else 0.0
    if max_val == 0:
        # Nothing to scale; avoids the 0/0 that would turn into NaN samples
        return np.zeros(samples.shape, dtype=np.int16)

    # Normalize the audio data to the range [-1, 1]
    normalized_audio = samples / max_val

    # Scale to int16 range and convert
    normalized_audio_scaled = np.clip(
        normalized_audio * max_int16, -max_int16, max_int16
    ).astype(np.int16)

    return normalized_audio_scaled


def mix_audio_volumes(audio1, audio2, weight1=0.5, weight2=0.5):
    """
    Mixes two audio numpy arrays with given weights to ensure even volume.

    Each input is peak-normalized first, the weighted sum is peak-normalized
    again, and the result is converted to int16. A silent (all-zero) input
    or mix is left as zeros instead of being divided by zero.

    Parameters:
    audio1 (np.ndarray): The first audio data to mix.
    audio2 (np.ndarray): The second audio data to mix.
    weight1 (float): The weight for the first audio data.
    weight2 (float): The weight for the second audio data.

    Returns:
    np.ndarray: The mixed audio data as int16.

    Raises:
    ValueError: If the two arrays do not have the same shape.
    """
    if audio1.shape != audio2.shape:
        raise ValueError("Both audio arrays must have the same shape")

    def _peak_normalize(samples):
        # Float conversion first: np.abs() on int16 wraps around for -32768
        samples = np.asarray(samples, dtype=np.float64)
        peak = np.max(np.abs(samples)) if samples.size else 0.0
        return samples / peak if peak > 0 else samples

    # Normalize each audio array
    audio1_normalized = _peak_normalize(audio1)
    audio2_normalized = _peak_normalize(audio2)

    # Apply weights and mix, then bring the mix itself back to full scale
    mixed_audio = (audio1_normalized * weight1) + (audio2_normalized * weight2)
    mixed_audio_normalized = _peak_normalize(mixed_audio)

    # Scale to int16 range and convert
    max_int16 = np.iinfo(np.int16).max
    mixed_audio_scaled = np.clip(
        mixed_audio_normalized * max_int16, -max_int16, max_int16
    ).astype(np.int16)

    return mixed_audio_scaled


def int16_scale(audio):
    """
    Multiplies the audio by the int16 maximum, clips the result to the
    symmetric range [-32767, 32767] and converts it to int16.

    Parameters:
    audio (np.ndarray): The audio data to scale.

    Returns:
    np.ndarray: The scaled audio data as int16.
    """
    limit = np.iinfo(np.int16).max
    scaled = audio * limit
    return np.clip(scaled, -limit, limit).astype(np.int16)

## Set your inputs

In [None]:
prompt = "<your prompt>"  # text description sent to the music generation model
audio_path = "/your/audio/input.mp3"  # mp3 or wav
model_version = "chord"  # chord, chord-large, stereo-chord, stereo-chord-large
beat_sync_threshold = (
    None  # 0.75 is a good value; when `None`, it is set automatically to `1.1/(bpm/60)`
)
output_path = "output"  # directory for the remixed track and the `inter_process` files
upscale = False  # Whether to upscale the audio to 48kHz with AudioSR
mix_weight = 0.7  # Weight of the generated instrumental track when mixing with the vocal (0~1); the vocal gets `1 - mix_weight`

In [None]:
# Create the output directory and the sub-directory for intermediate files;
# existing directories are left untouched.
Path(output_path).mkdir(parents=True, exist_ok=True)
(Path(output_path) / "inter_process").mkdir(parents=True, exist_ok=True)

In [None]:
# Stereo model versions have "stereo" in their name; every other version is mono.
channel = 2 if "stereo" in model_version else 1

## Get BPM and downbeat analysis of input audio, using [All-In-One Music Structure Analyzer](https://replicate.com/sakemin/all-in-one-music-structure-analyzer)

In [None]:
# Run the structure analyzer on the input file. The result is indexed with
# [0] in the next cell to obtain the URL of the analysis JSON.
time_analysis_url = replicate.run(
    "sakemin/all-in-one-music-structure-analyzer:001b4137be6ac67bdc28cb5cffacf128b874f530258d033de23121e785cb7290",
    input={"music_input": Path(audio_path)},
)

time_analysis_url

### Download the output JSON and get BPM and downbeat values

In [None]:
# Fetch the analysis JSON from the first output URL and parse it.
with urllib.request.urlopen(time_analysis_url[0]) as url:
    data = json.load(url)

time_analysis = data

# Tempo and downbeat positions of the input track. The downbeats are later
# multiplied by the sample rate, so they are presumably in seconds — TODO confirm.
bpm = time_analysis["bpm"]
input_downbeats = time_analysis["downbeats"]

print("BPM:", bpm)
print("Downbeats:", input_downbeats)

### Set `beat_sync_threshold` when it is `None`

In [None]:
# Pick the threshold automatically when none was chosen (`None`, 0 or -1).
if not beat_sync_threshold or beat_sync_threshold == -1:
    # A missing or non-positive BPM cannot be used in the formula (it would
    # divide by zero), so fall back to the fixed default in that case.
    if bpm is not None and int(bpm) > 0:
        beat_sync_threshold = 1.1 / (int(bpm) / 60)
    else:
        beat_sync_threshold = 0.75

## Separate vocal track out of instrumental track, using [Demucs](https://replicate.com/cjwbw/demucs)

In [None]:
# Separate the input file with Demucs. The result is read below through the
# keys "vocals" and "other".
track_urls = replicate.run(
    "cjwbw/demucs:25a173108cff36ef9f80f854c162d01df9e6528be175794b81158fa03836d953",
    input={
        "audio": Path(audio_path),
        "stem": "vocals",
        "shifts": 2,  # higher values for better quality, but takes more time
        "float32": True,
        "output_format": "mp3",
    },
)

track_urls

### Download the separated vocal track

In [None]:
# Load the separated vocals as a (frames, channels) array plus sample rate,
# then display the sample rate and shape as the cell output.
vocal_track, vocal_sr = download_audio_and_load_as_numpy(track_urls["vocals"])
vocal_sr, vocal_track.shape

### Save the vocal track in mp3 format

In [None]:
# <output_path>/inter_process/<input file name without extension>_vocals.mp3
vocal_path = (
    str(Path(output_path) / "inter_process" / Path(audio_path).name.rsplit(".", 1)[0])
    + "_vocals.mp3"
)
save_numpy_as_audio(vocal_track, vocal_sr, vocal_path)

### Download the separated instrumental track

In [None]:
# Load the "other" output of the separation (everything that is not the
# vocal stem) and display its sample rate and shape.
instrumental_track, instrumental_sr = download_audio_and_load_as_numpy(
    track_urls["other"]
)
instrumental_sr, instrumental_track.shape

### Save the instrumental track in mp3 format

In [None]:
# <output_path>/inter_process/<input file name without extension>_inst.mp3
instrumental_path = (
    str(Path(output_path) / "inter_process" / Path(audio_path).name.rsplit(".", 1)[0])
    + "_inst.mp3"
)
save_numpy_as_audio(instrumental_track, instrumental_sr, instrumental_path)

## Generate a new instrumental track with the prompt and the original instrumental track as input, using [MusicGen-Stereo-Chord](https://replicate.com/sakemin/musicgen-stereo-chord)
### This might take a while

In [None]:
# Generate the new backing track. The analyzed BPM is appended to the prompt,
# the saved instrumental file is passed as `audio_chords`, and the requested
# duration is the instrumental length in whole seconds (frames / sample rate,
# truncated).
generated_instrumental_url = replicate.run(
    "sakemin/musicgen-stereo-chord:fbdc5ef7200220ed300015d9b4fd3f8e620f84547e970b23aa2be7f2ff366a5b",
    input={
        "model_version": model_version,
        "prompt": prompt + ", bpm: " + str(bpm),
        "audio_chords": Path(instrumental_path),
        "duration": int(instrumental_track.shape[0] / instrumental_sr),
    },
)

print(generated_instrumental_url)

### Download the generated instrumental track

In [None]:
# Load the generated track as a (frames, channels) array plus sample rate,
# then display the sample rate and shape as the cell output.
(
    generated_instrumental_track,
    generated_instrumental_sr,
) = download_audio_and_load_as_numpy(generated_instrumental_url)
generated_instrumental_sr, generated_instrumental_track.shape

### Save the generated instrumental track in mp3 format

In [None]:
# <output_path>/inter_process/<input name>_<prompt>_generated_inst.mp3
# The prompt text becomes part of the file name as-is.
generated_instrumental_path = (
    str(Path(output_path) / "inter_process" / Path(audio_path).name.rsplit(".", 1)[0])
    + f"_{prompt}"
    + "_generated_inst.mp3"
)
save_numpy_as_audio(
    generated_instrumental_track, generated_instrumental_sr, generated_instrumental_path
)

## Sample rate matching (choose one of the two options below)
- The `upscale` flag set in the inputs cell selects the option: `False` runs option 1, `True` runs option 2.

### 1. Force upsample the generated track to the input sample rate

In [None]:
if not upscale:
    # Bring the generated track to the vocal track's sample rate.
    resampled_instrumental_track = resample_audio(
        generated_instrumental_track, generated_instrumental_sr, vocal_sr
    )
    # normalize_audio() already returns full-scale int16 samples. Passing its
    # result through int16_scale() as well would multiply those integers by
    # 32767 a second time and wrap around in int16 arithmetic.
    resampled_instrumental_track = normalize_audio(resampled_instrumental_track)
    print(resampled_instrumental_track.shape, vocal_track.shape)

### 2. Upscale the tracks to 48 kHz with [Audio-Super-Resolution](https://replicate.com/sakemin/audiosr-long-audio)

In [None]:
if upscale:
    # Send the saved generated-instrumental file to the super-resolution model.
    resampled_instrumental_url = replicate.run(
        "sakemin/audiosr-long-audio:44b37256d8d2ade24655f05a0d35128642ca90cbad0f5fa0e9bfa2d345124c8c",
        input={"input_file": Path(generated_instrumental_path)},
    )
    print(resampled_instrumental_url)

In [None]:
if upscale:
    # Load the upscaled instrumental track together with its sample rate.
    (
        resampled_instrumental_track,
        resampled_instrumental_sr,
    ) = download_audio_and_load_as_numpy(resampled_instrumental_url)
    print(resampled_instrumental_sr, resampled_instrumental_track.shape)

In [None]:
if upscale:
    # Send the saved vocal file to the same super-resolution model.
    resampled_vocal_url = replicate.run(
        "sakemin/audiosr-long-audio:44b37256d8d2ade24655f05a0d35128642ca90cbad0f5fa0e9bfa2d345124c8c",
        input={"input_file": Path(vocal_path)},
    )
    print(resampled_vocal_url)

In [None]:
# `resampled_vocal_url` only exists when the upscaling cell above has run, so
# this download belongs to the `upscale` path. It replaces the vocal track
# and its sample rate with the upscaled version.
if upscale:
    vocal_track, vocal_sr = download_audio_and_load_as_numpy(resampled_vocal_url)
    print(vocal_sr, vocal_track.shape)

### Save the resampled instrumental track in mp3 format

In [None]:
# <output_path>/inter_process/<input name>_<prompt>_resampled_inst.mp3
# The file is written with the vocal track's sample rate.
resampled_instrumental_path = (
    str(Path(output_path) / "inter_process" / Path(audio_path).name.rsplit(".", 1)[0])
    + f"_{prompt}"
    + "_resampled_inst.mp3"
)
save_numpy_as_audio(resampled_instrumental_track, vocal_sr, resampled_instrumental_path)

## Beat synchronization

### Get BPM and downbeat analysis of generated audio, using [All-In-One Music Structure Analyzer](https://replicate.com/sakemin/all-in-one-music-structure-analyzer)

In [None]:
# Run the structure analyzer on the saved resampled instrumental file. The
# result is indexed with [0] in the next cell to obtain the analysis JSON URL.
output_time_analysis_url = replicate.run(
    "sakemin/all-in-one-music-structure-analyzer:001b4137be6ac67bdc28cb5cffacf128b874f530258d033de23121e785cb7290",
    input={"music_input": Path(resampled_instrumental_path)},
)

output_time_analysis_url

### Download the output JSON and get downbeat value

In [None]:
# Fetch the analysis JSON of the generated track and parse it.
with urllib.request.urlopen(output_time_analysis_url[0]) as url:
    data = json.load(url)

output_time_analysis = data

# Only the downbeat positions of the generated track are needed here.
generated_downbeats = output_time_analysis["downbeats"]

print("Downbeats:", generated_downbeats)

### Align the downbeats pair-wise

In [None]:
# Paired downbeat positions, filled by the alignment loop in the next cell.
aligned_generated_downbeats = []
aligned_input_downbeats = []

In [None]:
# Pair every generated downbeat with the closest input downbeat. Both values
# are converted to frame positions by multiplying with `vocal_sr`.
for generated_downbeat in generated_downbeats:
    # Closest input downbeat; `None` when there are no input downbeats at all.
    input_beat = min(
        input_downbeats, key=lambda x: abs(generated_downbeat - x), default=None
    )
    if input_beat is None:
        continue
    print(generated_downbeat, input_beat)
    # Skip this pair when the same input downbeat was already used by the
    # previously stored pair.
    if (
        len(aligned_input_downbeats) != 0
        and int(input_beat * vocal_sr) == aligned_input_downbeats[-1]
    ):
        print("Dropped")
        continue
    # Too far apart: pair the generated downbeat with its own position
    # instead of the input downbeat.
    if abs(generated_downbeat - input_beat) > beat_sync_threshold:
        input_beat = generated_downbeat
        print("Replaced")
    aligned_generated_downbeats.append(int(generated_downbeat * vocal_sr))
    aligned_input_downbeats.append(int(input_beat * vocal_sr))

In [None]:
# Length of the generated track in frames (first axis of the
# (frames, channels) array), taken before any padding. It is used as the
# closing anchor for both tracks.
wav_length = resampled_instrumental_track.shape[0]

# Without any aligned pair there is nothing to shift.
if aligned_input_downbeats and aligned_generated_downbeats:
    downbeat_offset = aligned_input_downbeats[0] - aligned_generated_downbeats[0]
else:
    downbeat_offset = 0

if downbeat_offset > 0:
    # The first input downbeat comes later than the first generated one:
    # prepend silence along the time axis so the first pair lines up.
    leading_silence = np.zeros(
        (int(downbeat_offset), resampled_instrumental_track.shape[1]),
        dtype=resampled_instrumental_track.dtype,
    )
    resampled_instrumental_track = np.concatenate(
        [leading_silence, resampled_instrumental_track], axis=0
    )
    aligned_generated_downbeats = [
        downbeat + downbeat_offset for downbeat in aligned_generated_downbeats
    ]
    # The time-stretching step reads the file at `resampled_instrumental_path`,
    # not this array, so write the padded audio back to that file.
    # NOTE(review): the closing anchor below still uses the unpadded length —
    # confirm that this is what the time-stretching model expects.
    save_numpy_as_audio(
        resampled_instrumental_track, vocal_sr, resampled_instrumental_path
    )

# Anchor both tracks at frame 0 and at the end of the track.
aligned_generated_downbeats = [0] + aligned_generated_downbeats + [wav_length]
aligned_input_downbeats = [0] + aligned_input_downbeats + [wav_length]

In [None]:
# Serialize the anchor pairs as "generated:input, generated:input, ...".
s_ap = ", ".join(
    f"{generated_frame}:{input_frame}"
    for generated_frame, input_frame in zip(
        aligned_generated_downbeats, aligned_input_downbeats
    )
)
s_ap

### Apply dynamic time-stretching on the generated instrumental track, using [PyTSMod](https://replicate.com/sakemin/pytsmod)

In [None]:
# Time-stretch the saved resampled instrumental file with the anchor string
# built above.
time_stretched_instrumental_track_url = replicate.run(
    "sakemin/pytsmod:41b355721c8a7ed501be7fd89e73631e7c07d75e1c94b1372c1c119b0774cdae",
    input={
        "audio_input": Path(resampled_instrumental_path),
        "s_ap": s_ap,
        "absolute_frame": True,  # presumably marks the anchors as frame positions — TODO confirm
    },
)
time_stretched_instrumental_track_url

### Download the time-stretched instrumental track

In [None]:
# Load the time-stretched track as a (frames, channels) array plus sample
# rate, then display the sample rate and shape as the cell output.
(
    time_stretched_instrumental_track,
    time_stretched_instrumental_sr,
) = download_audio_and_load_as_numpy(time_stretched_instrumental_track_url)
time_stretched_instrumental_sr, time_stretched_instrumental_track.shape

### Save the time-stretched instrumental track in mp3 format

In [None]:
# <output_path>/inter_process/<input name>_<prompt>_time_stretched_inst.mp3
time_stretched_instrumental_path = (
    str(Path(output_path) / "inter_process" / Path(audio_path).name.rsplit(".", 1)[0])
    + f"_{prompt}"
    + "_time_stretched_inst.mp3"
)
save_numpy_as_audio(
    time_stretched_instrumental_track,
    time_stretched_instrumental_sr,
    time_stretched_instrumental_path,
)

## Combine the generated instrumental track and the original vocal track

### Pad the generated track's length

In [None]:
# Frame difference between the two tracks: positive when the instrumental
# track is shorter than the vocal track, negative when it is longer.
pad = vocal_track.shape[0] - time_stretched_instrumental_track.shape[0]
pad

In [None]:
if pad <= 0:
    # Instrumental is at least as long as the vocal: cut it to the vocal length.
    padded_instrumental_track = time_stretched_instrumental_track[
        : vocal_track.shape[0]
    ]
else:
    # Instrumental is shorter: append `pad` frames of zeros at the end of the
    # time axis and leave the channel axis untouched.
    padded_instrumental_track = np.pad(
        time_stretched_instrumental_track,
        pad_width=((0, pad), (0, 0)),
        mode="constant",
    )

### Make the number of channels consistent

In [None]:
# Compare the channel counts of the decoded arrays themselves. The downloaded
# instrumental file does not necessarily have the channel count implied by
# the selected model version.
instrumental_channels = padded_instrumental_track.shape[1]
vocal_channels = vocal_track.shape[1]

# Duplicate the mono side so that both tracks end up with two channels.
if instrumental_channels == 1 and vocal_channels == 2:
    padded_instrumental_track = np.repeat(padded_instrumental_track, 2, axis=1)
elif instrumental_channels == 2 and vocal_channels == 1:
    vocal_track = np.repeat(vocal_track, 2, axis=1)

### Mix and normalize two tracks

In [None]:
# The instrumental track is weighted with `mix_weight` and the vocal track
# with the remainder, so the two weights add up to 1.
mixed_track = mix_audio_volumes(
    padded_instrumental_track, vocal_track, weight1=mix_weight, weight2=1 - mix_weight
)

## Play the remixed track

In [None]:
# Transpose the (frames, channels) array to (channels, frames) for the player.
ipd.Audio(mixed_track.T, rate=time_stretched_instrumental_sr)

## Save the remixed track

In [None]:
# <output_path>/<input name>_<prompt>_remixed.mp3 — the final result goes to
# the output directory itself, not to `inter_process`.
remixed_path = (
    str(Path(output_path) / Path(audio_path).name.rsplit(".", 1)[0])
    + f"_{prompt}"
    + "_remixed.mp3"
)
save_numpy_as_audio(mixed_track, time_stretched_instrumental_sr, remixed_path)