In [None]:
import os
from typing import Union, Tuple
from IPython.display import Audio, display, HTML
import torch

# Run on the GPU whenever torch can see one; otherwise stay on the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda") if use_cuda else torch.device("cpu")

# Example from MUSDB18 Dataset

In [None]:
import musdb

In [None]:
# Prefer a local copy of the dataset; musdb is asked to download only when
# the expanded path is missing.
path = '~/MUSDB18/MUSDB18-7'
expanded_path = os.path.expanduser(path)

if not os.path.exists(expanded_path):
    print(f"The path {expanded_path} does not exist, so need to download.")
    mus = musdb.DB(download=True, subsets='test')
else:
    print(f"The path {expanded_path} exists, so we will use examples in the path.")
    mus = musdb.DB(root=expanded_path, download=False, subsets='test')

In [None]:
# Select a single track from the loaded `mus` database by position.
# NOTE(review): 49 is a hard-coded index — confirm the subset holds at least 50 tracks.
track = mus[49]

In [None]:
# Print the track's name, then render an inline audio player for it.
print(track.name)
# `.T` transposes the array before playback — presumably (samples, channels) -> (channels, samples); TODO confirm.
display(Audio(track.audio.T, rate=track.rate))

### Source-Separate the selected track

In this section we use `openunmix` from [open-unmix-pytorch github](https://github.com/sigsep/open-unmix-pytorch).
More details can be found on this [page](https://sigsep.github.io/open-unmix/).
I have learned a lot from its [Colab Notebook](https://colab.research.google.com/drive/1mijF0zGWxN-KaxTnd0q6hayAlrID5fEQ#scrollTo=dvUDAibHbzA5).


In [None]:
from openunmix import predict

In [None]:
# Split the selected MUSDB track into stems, running on `device`.
estimates = predict.separate(
    torch.as_tensor(track.audio, dtype=torch.float32),
    rate=track.rate,
    device=device,
)

In [None]:
# Print the name of every separated stem and render an inline player for it.
for target, estimate in estimates.items():
    print(target)
    # Move the tensor to host memory as a NumPy array; `[0]` takes the first entry
    # along the leading axis — presumably the batch axis; TODO confirm.
    audio = estimate.detach().cpu().numpy()[0]
    display(Audio(audio, rate=track.rate))

# Download YouTube video

In [None]:
from pytube import YouTube

In [None]:
def construct_embed_url(vid_key: str, t_start: Union[int, None], t_stop: Union[int, None]) -> str:
    """Build the YouTube embed URL for a video, optionally limited to a time window.

    Args:
        vid_key: YouTube video id, e.g. 'ZmUENUZx2w0'.
        t_start: Playback start in seconds, or None for no start bound.
        t_stop: Playback end in seconds, or None for no end bound.

    Returns:
        The embed URL carrying `rel=0`, the optional `start` / `end`
        parameters, and `controls=0` / `showinfo=0`.

    Raises:
        ValueError: If `vid_key` is empty.
        AssertionError: If both bounds are given and `t_start >= t_stop`.
    """
    if not vid_key:
        # This used to fall through and fail with UnboundLocalError on `embed_url`.
        raise ValueError("`vid_key` must be a non-empty YouTube video id")

    # Compare against None (not truthiness) so a bound of 0 seconds is still
    # validated and emitted.
    if t_start is not None and t_stop is not None:
        assert t_start < t_stop, f"Wait, the start timestamp `t_start` ({t_start}) is supposed to be less than the end timestamp `t_stop` ({t_stop})"

    query = ["rel=0"]
    if t_start is not None:
        query.append(f"start={t_start}")
    if t_stop is not None:
        query.append(f"end={t_stop}")
    # Plain `&` separators: the former `&;controls=0&;showinfo=0` (a mangled
    # `&amp;`) produced parameters literally named `;controls` and `;showinfo`.
    query.extend(["controls=0", "showinfo=0"])

    return f"https://www.youtube.com/embed/{vid_key}?" + "&".join(query)

# Download a YouTube video (or only its audio) with pytube
def download_youtube_video_by_key(vid_key: str,
                                  output_dir: str='',
                                  out_filename: str='clip',
                                  file_ext: str='mp4',
                                  audio_only: bool=False
                                 ) -> Tuple[str, str]:
    """Download the video identified by `vid_key` into `output_dir`.

    Args:
        vid_key: YouTube video id; the URL used is `https://youtu.be/<vid_key>`.
        output_dir: Directory handed to pytube as the download target.
        out_filename: File name without extension.
        file_ext: Extension appended to `out_filename`. With `audio_only=True`
            it is also passed to pytube as the audio subtype to look for; for
            video downloads it only names the file.
        audio_only: When True fetch the audio-only stream, otherwise the
            highest-resolution stream containing both video and audio.

    Returns:
        `(output_dir, download_filename)` — join them to get the file path.

    Raises:
        ValueError: If pytube finds no stream matching the request.
    """
    yt = YouTube(f'https://youtu.be/{vid_key}')

    if audio_only:
        ## get audio only
        stream = yt.streams.get_audio_only(subtype=file_ext)
    else:
        ## get both video and audio
        stream = yt.streams.get_highest_resolution()

    # Both pytube queries return None when nothing matches; fail with a clear
    # message instead of an AttributeError on `None.download`.
    if stream is None:
        requested = f"audio-only '{file_ext}'" if audio_only else "video"
        raise ValueError(f"No {requested} stream available for video '{vid_key}'")

    download_filename = f'{out_filename}.{file_ext}'

    # Download the video
    stream.download(output_dir, filename=download_filename)

    return output_dir, download_filename

# Sentinel default meaning "use the notebook-level `t_start` / `t_stop`", which is
# what this function always did before the bounds became parameters.
_FROM_NOTEBOOK = object()


# Use ffmpeg to extract the desired section of the video and audio
def crop_clip(output_dir: str, input_filename: str, output_filename: str=None,
              t_start=_FROM_NOTEBOOK, t_stop=_FROM_NOTEBOOK) -> Tuple[str, str]:
    """Cut `[t_start, t_stop]` out of a media file with ffmpeg (stream copy).

    Args:
        output_dir: Directory holding the input file; the output is written there too.
        input_filename: Name of the file to crop, inside `output_dir`.
        output_filename: Name of the cropped file. Defaults to the input name
            with `_cropped` inserted before the extension.
        t_start: Start of the section in seconds, or None for no start bound.
            When omitted, the module-level `t_start` is used if it exists.
        t_stop: End of the section in seconds, or None for no end bound.
            When omitted, the module-level `t_stop` is used if it exists.

    Returns:
        `(output_dir, output_filename)`.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the `ffmpeg` executable cannot be found.
    """
    import subprocess

    if t_start is _FROM_NOTEBOOK:
        t_start = globals().get('t_start')
    if t_stop is _FROM_NOTEBOOK:
        t_stop = globals().get('t_stop')

    if output_filename is None:
        # splitext copes with names that contain no dot or several dots.
        stem, ext = os.path.splitext(input_filename)
        output_filename = f'{stem}_cropped{ext}'

    # Argument list instead of a shell string: paths with spaces or shell
    # metacharacters are passed through untouched. `-y` replaces `yes y |`.
    cmd = ['ffmpeg', '-y', '-i', os.path.join(output_dir, input_filename)]
    # Only emit the bounds that were actually given; `-ss None` / `-to None`
    # makes ffmpeg fail.
    if t_start is not None:
        cmd += ['-ss', str(t_start)]
    if t_stop is not None:
        cmd += ['-to', str(t_stop)]
    cmd += ['-c', 'copy', os.path.join(output_dir, output_filename)]

    subprocess.run(cmd, check=True)
    return output_dir, output_filename

In [None]:
# Ask for the video id; an empty answer is rejected immediately.
vid_key = input("YouTube video hash (e.g. 'ZmUENUZx2w0' for 'https://youtu.be/ZmUENUZx2w0'): ")
assert isinstance(vid_key, str) and len(vid_key) > 0, "We need the video hash here, please."

In [None]:
# Ask for both bounds first, then turn each answer into an int; a blank answer
# means "no bound" and becomes None.
t_start, t_stop = (
    int(answer) if answer else None
    for answer in (
        input("time starts (second), please leave it blank for None: "),
        input("time ends (second), please leave it blank for None: "),
    )
)

print(f'video start:{t_start} and end:{t_stop}')

In [None]:
# A blank answer falls back to 'temp_out'; `~` is expanded to the home directory.
output_dir = os.path.expanduser(
    input("output directory e.g. '~/Downloads/Love_On_Top': ") or 'temp_out'
)
print(f'output_dir: {output_dir}')

In [None]:
# Build the embed URL for the chosen video and time window, and print it for inspection.
embed_url = construct_embed_url(vid_key, t_start, t_stop)
print(embed_url)

In [None]:
# Display YouTube
from html import escape

print('Preview')
# The `src` value is quoted (and HTML-escaped) and followed by a space; without
# them `frameborder="0"` was glued onto the end of the URL.
HTML(
    f'<iframe width="560" height="315" src="{escape(embed_url, quote=True)}" '
    'frameborder="0" allowfullscreen></iframe>'
)

In [None]:
# Fetch the video into `output_dir` using the defaults of
# `download_youtube_video_by_key` (file name 'clip.mp4', video + audio).
output_dir, download_filename = download_youtube_video_by_key(vid_key, output_dir=output_dir)
# Show a success message
print(f"The video has been downloaded to {os.path.join(output_dir, download_filename)}.")

In [None]:
# crop the clip
# With no bound entered the download is used untouched; otherwise ffmpeg crops it.
if t_start is None and t_stop is None:
    output_filename = download_filename
else:
    output_dir, output_filename = crop_clip(output_dir, download_filename)

clip_path = os.path.join(output_dir, output_filename)
print(f'clip_path: {clip_path}')

### Extract mp3 out of video

In [None]:
from moviepy.video.io.VideoFileClip import VideoFileClip

In [None]:
def change_file_extension(input_path: str, new_ext: str) -> str:
    """Return `input_path` with its file extension replaced by `new_ext`.

    Only the extension of the last path component is touched, so dots in
    directory names are left alone, and a path without any extension simply
    gets `.new_ext` appended.

    Args:
        input_path: Path whose extension should change.
        new_ext: New extension, with or without a leading dot (e.g. 'mp3').

    Returns:
        The rewritten path.
    """
    # os.path.splitext only splits at a dot inside the final path component;
    # splitting the whole string on '.' mangled 'dir.v2/clip' and bare 'clip'.
    root, _old_ext = os.path.splitext(input_path)
    return f"{root}.{new_ext.lstrip('.')}"
    
def extract_mp3_from_video(vid_input_path: str):
    """Write the audio track of a video file to an mp3 next to it.

    Args:
        vid_input_path: Path of the video file to read.

    Returns:
        Path of the written mp3: `vid_input_path` with its extension
        changed to 'mp3'.

    Raises:
        ValueError: If the video has no audio track.
    """
    mp3_output_path = change_file_extension(vid_input_path, 'mp3')
    print(f"input_path: {vid_input_path} -> output_path: {mp3_output_path}")

    # Extract the audio from the video
    video = VideoFileClip(vid_input_path)
    try:
        audio = video.audio
        if audio is None:
            raise ValueError(f"No audio track found in {vid_input_path}")

        # Save the audio to a file
        print('converting to mp3...')
        audio.write_audiofile(mp3_output_path)
    finally:
        # Always release the clip's readers, even when the export fails;
        # the clip was previously left open.
        video.close()
    print(f'Writing to mp3 file {mp3_output_path} completed')

    return mp3_output_path

In [None]:
# Use the (possibly cropped) clip as the video input; `vid_input_path` is read again
# later when the remixed audio is merged back into the video.
vid_input_path = clip_path
mp3_output_path = extract_mp3_from_video(vid_input_path)

In [None]:
# ### output image sequences (Taking way too long)
# video_dir_out = os.path.join(output_dir,'video')

# if not os.path.exists(video_dir_out):
#     os.makedirs(video_dir_out)
#     print("Directory created:", video_dir_out)
# else:
#     print("Directory already exists:", video_dir_out)
    
# video.write_images_sequence(os.path.join(video_dir_out, 'frame_%04d.png'), fps=30, withmask=False)

### convert mp3 to numpy array

This project relies heavily on [stempeg](https://github.com/faroit/stempeg) for reading and writing multiple tracks.

In [None]:
import numpy as np
import stempeg
from pydub import AudioSegment

In [None]:
def mp3_to_wav(mp3_path: str, wav_path: str=None):
    """Convert an audio file to wav with pydub.

    Args:
        mp3_path: Path of the audio file to read.
        wav_path: Destination path. Defaults to `mp3_path` with its extension
            replaced by '.wav'.

    Returns:
        Path of the written wav file.
    """
    if wav_path is None:
        # Swap only the final extension. `str.replace` rewrote every '.mp3' in
        # the path and returned the input path itself for e.g. 'clip.MP3'.
        root, _old_ext = os.path.splitext(mp3_path)
        wav_path = root + '.wav'
    print(f'wav output: {wav_path}')

    # Load the mp3 file
    mp3_audio = AudioSegment.from_file(mp3_path)

    # Export the audio as a wav file
    mp3_audio.export(wav_path, format='wav')

    return wav_path

In [None]:
# Convert the extracted mp3 to wav; with no explicit target, `mp3_to_wav` derives
# the output path from the mp3 path.
wav_output_path = mp3_to_wav(mp3_output_path)

In [None]:
# convert from .wav to numpy array
# `read_stems` yields the decoded audio and its sample rate. Despite its name,
# `audio_tensor` is presumably a NumPy array (float32 requested here) — it is
# wrapped with `torch.as_tensor` before separation; TODO confirm against stempeg.
audio_tensor, samplerate = stempeg.read_stems(
    wav_output_path,
    dtype=np.float32
)
# Preview the loaded audio inline; `.T` transposes it for the player.
display(Audio(audio_tensor.T, rate=samplerate))

# Run Source-Separation Model

In [None]:
# source-separate audio
# Announce the step, then run the separator on the clip's audio on `device`.
print('Separating the sources...')
estimates = predict.separate(
    torch.as_tensor(audio_tensor, dtype=torch.float32),
    rate=samplerate,
    device=device,
)

In [None]:
# Display each separate source
for target, estimate in estimates.items():
    print(target)
    # `[0]` takes the first entry along the leading axis — presumably the batch axis; TODO confirm.
    display(Audio(estimate.detach().cpu().numpy()[0], rate=samplerate))

In [None]:
# prepare dictionary for each component
# Each stem is squeezed, moved to host memory as a NumPy array and transposed.
estimates_numpy = {
    target: torch.squeeze(estimate).detach().cpu().numpy().T
    for target, estimate in estimates.items()
}

In [None]:
# write each component to mp3
# Hands every entry of `estimates_numpy` to stempeg's FilesWriter, targeting
# `output_dir` with the '.mp3' extension — presumably one `<target>.mp3` per key; TODO confirm.
stempeg.write_stems(
    (output_dir, ".mp3"),
    estimates_numpy,
    sample_rate=samplerate,
    # Files are written at a fixed 44100 Hz output rate, using multiple processes.
    writer=stempeg.FilesWriter(multiprocess=True, output_sample_rate=44100),
)

## Remix

In [None]:
# Make Karaoke -- sum everything except vocals
acc = np.sum(
    [stem.detach().cpu().numpy()[0] for target, stem in estimates.items() if target != 'vocals'],
    axis=0
)
print('Karaoke')
# Play back at the clip's own rate, like the other cells of this section. The
# previous `track.rate` belonged to the MUSDB example and only existed if that
# unrelated section had been run.
# NOTE(review): if the separator resamples internally, confirm the estimates are at `samplerate`.
display(Audio(acc, rate=samplerate))

In [None]:
# Only vocals, drums and bass
# Vocals are attenuated to 5% so they stay faintly audible under drums and bass.
remix = np.sum(
    [
        0.05 * estimates['vocals'].detach().cpu().numpy()[0],
        estimates['drums'].detach().cpu().numpy()[0],
        estimates['bass'].detach().cpu().numpy()[0],
    ],
    axis=0
)
print('Vocals + Drums + Bass')
# Play back at the clip's own rate, like the other cells of this section. The
# previous `track.rate` belonged to the MUSDB example and only existed if that
# unrelated section had been run.
# NOTE(review): if the separator resamples internally, confirm the estimates are at `samplerate`.
display(Audio(remix, rate=samplerate))

In [None]:
# Write remix to mp3 on disk
# The dict key presumably becomes the file stem: the merge step afterwards reads
# `<output_dir>/remix_vocals_drums_bass.mp3`.
stempeg.write_stems(
    (output_dir, ".mp3"),
    # `.T` transposes the mix before writing — presumably to (samples, channels); TODO confirm.
    {"remix_vocals_drums_bass": remix.T},
    sample_rate=samplerate,
    writer=stempeg.FilesWriter(multiprocess=True, output_sample_rate=44100),
)

# Output the new video with new audio

In [None]:
# Merge audio to the video and write to disk
import subprocess

new_audio_path = os.path.join(output_dir, "remix_vocals_drums_bass.mp3")
output_video_path = os.path.join(output_dir, "output_video.mp4")
# Argument list instead of a shell string, so paths containing spaces or shell
# metacharacters survive; `-y` replaces the former `yes y |` overwrite prompt feed.
cmd = [
    "ffmpeg", "-y",
    "-i", vid_input_path,      # input 0: original video
    "-i", new_audio_path,      # input 1: remixed audio
    "-map", "0:v",             # keep the video stream of input 0
    "-map", "1:a",             # take the audio stream of input 1
    "-c:v", "copy",            # no video re-encode
    "-c:a", "aac",
    output_video_path,
]
# check=True surfaces ffmpeg failures instead of silently ignoring the exit status.
subprocess.run(cmd, check=True)
