# separate.py 소개 (demucs.separate)

In [None]:
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.

import argparse
import sys
from pathlib import Path
import subprocess

from dora.log import fatal
import torch as th
import torchaudio as ta

from .apply import apply_model, BagOfModels
from .audio import AudioFile, convert_audio, save_audio
from .pretrained import get_model_from_args, add_model_flags, ModelLoadingError


def load_track(track, device, audio_channels, samplerate):
    """Read ``track`` into a tensor on ``device``, exiting the process on failure.

    Two backends are tried in order: the project's ``AudioFile`` reader and,
    only if that produced nothing, ``torchaudio.load`` followed by
    ``convert_audio``.

    Args:
        track: path of the audio file to read.
        device: target handed to ``.to(...)`` on the loaded audio.
        audio_channels: channel count requested from either backend.
        samplerate: sample rate requested from either backend.

    Returns:
        The loaded audio, already moved to ``device``.

    Raises:
        SystemExit: via ``sys.exit(1)``, after printing every collected
            backend error, when neither backend could read the file.
    """
    failures = {}
    wav = None

    # Backend 1: AudioFile. Both handled exceptions are reported under the
    # 'ffmpeg' key, so a later failure overwrites nothing from another backend.
    try:
        reader = AudioFile(track)
        decoded = reader.read(streams=0, samplerate=samplerate, channels=audio_channels)
        wav = decoded.to(device)
    except FileNotFoundError:
        failures['ffmpeg'] = 'Ffmpeg is not installed.'
    except subprocess.CalledProcessError:
        failures['ffmpeg'] = 'FFmpeg could not read the file.'

    # Backend 2: torchaudio, attempted only when backend 1 left nothing.
    if wav is None:
        try:
            loaded, native_sr = ta.load(str(track))
        except RuntimeError as err:
            failures['torchaudio'] = err.args[0]
        else:
            # torchaudio returns the file's own rate; convert to the requested format.
            wav = convert_audio(loaded.to(device), native_sr, samplerate, audio_channels)

    if wav is not None:
        return wav

    # Nothing worked: report what each backend said and stop the process.
    print(f"Could not load file {track}. "
          "Maybe it is not a supported file format? ")
    for backend, reason in failures.items():
        print(f"When trying to load using {backend}, got the following error: {reason}")
    sys.exit(1)
    return wav


def main():
    """Command-line entry point: separate every given track into its sources.

    Steps, in order:
      1. Parse the command line (model flags come from ``add_model_flags``).
      2. Load the model with ``get_model_from_args``; a ``ModelLoadingError``
         is handed to ``fatal``.
      3. For each existing track: load it, normalize it, run ``apply_model``
         and write one file per source to
         ``<out>/<name>/<track name without extension>/<source>.wav``
         (``.mp3`` when ``--mp3`` is given).

    Tracks whose path does not exist are reported on stderr and skipped.
    """
    parser = argparse.ArgumentParser("demucs.separate",
                                     description="Separate the sources for the given tracks")
    parser.add_argument("tracks", nargs='+', type=Path, default=[], help='Path to tracks')
    add_model_flags(parser)
    parser.add_argument("-v", "--verbose", action="store_true")
    parser.add_argument("-o",
                        "--out",
                        type=Path,
                        default=Path("separated"),
                        help="Folder where to put extracted tracks. A subfolder "
                        "with the model name will be created.")
    parser.add_argument("-d",
                        "--device",
                        default="cuda" if th.cuda.is_available() else "cpu",
                        help="Device to use, default is cuda if available else cpu")
    # The first fragment ends with a space so the two sentences do not run
    # together in the rendered help text.
    parser.add_argument("--shifts",
                        default=1,
                        type=int,
                        help="Number of random shifts for equivariant stabilization. "
                        "Increase separation time but improves quality for Demucs. 10 was used "
                        "in the original paper.")
    parser.add_argument("--overlap",
                        default=0.25,
                        type=float,
                        help="Overlap between the splits.")
    parser.add_argument("--no-split",
                        action="store_false",
                        dest="split",
                        default=True,
                        help="Doesn't split audio in chunks. This can use large amounts of memory.")
    parser.add_argument("--mp3", action="store_true",
                        help="Convert the output wavs to mp3.")
    parser.add_argument("--mp3-bitrate",
                        default=320,
                        type=int,
                        help="Bitrate of converted mp3.")
    # NOTE(review): `args.verbose` and `args.mp3_bitrate` are parsed but never
    # read in this function - confirm whether `save_audio` should receive them.

    args = parser.parse_args()

    try:
        model = get_model_from_args(args)
    except ModelLoadingError as error:
        fatal(error.args[0])

    if isinstance(model, BagOfModels):
        print(f"Selected model is a bag of {len(model.models)} models. "
              "You will see that many progress bars per track.")
    model.to(args.device)
    model.eval()

    out = args.out / args.name
    out.mkdir(parents=True, exist_ok=True)
    print(f"Separated tracks will be stored in {out.resolve()}")
    for track in args.tracks:
        if not track.exists():
            print(
                f"File {track} does not exist. If the path contains spaces, "
                "please try again after surrounding the entire path with quotes \"\".",
                file=sys.stderr)
            continue
        print(f"Separating track {track}")
        wav = load_track(track, args.device, model.audio_channels, model.samplerate)

        # Normalize with the statistics of the signal averaged over dim 0
        # (`wav[None]` below is what `apply_model` unpacks as
        # (batch, channels, length), so dim 0 here is the channel axis).
        # The epsilon keeps a silent / constant track (std == 0) from dividing
        # by zero and turning every output into NaN.
        ref = wav.mean(0)
        mean = ref.mean()
        std = ref.std() + 1e-8
        wav = (wav - mean) / std
        sources = apply_model(model, wav[None], shifts=args.shifts, split=args.split,
                              overlap=args.overlap, progress=True)[0]
        # Undo the normalization so the stems are back on the input's scale.
        sources = sources * std + mean

        track_folder = out / track.name.rsplit(".", 1)[0]
        track_folder.mkdir(exist_ok=True)
        extension = ".mp3" if args.mp3 else ".wav"
        for source, name in zip(sources, model.sources):
            stem = str(track_folder / name) + extension
            save_audio(source.cpu(), stem, model.samplerate)


# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()


# 해석

In [None]:
def main():
    """Annotated copy of the command-line entry point.

    Parses the options, loads the model, then for every existing track:
    loads the audio, normalizes it, runs ``apply_model`` and writes one file
    per source under ``<out>/<name>/<track name without extension>/``.
    Tracks whose path does not exist are reported on stderr and skipped.
    """
    # Create the parser, i.e. the container for all command-line options.
    parser = argparse.ArgumentParser("demucs.separate",
                                     description="Separate the sources for the given tracks")
    # Positional option: one or more track paths.
    parser.add_argument("tracks", nargs='+', type=Path, default=[], help='Path to tracks')

    # See the annotated `add_model_flags` cell further down.
    # The default model name is `mdx_extra_q` (per the original notes: the MDX
    # model trained with extra data, quantized).
    add_model_flags(parser)

    # Remaining options (only the main ones are explained here):
    #   -d    : device, "cuda" when available, otherwise "cpu"
    #   --mp3 : write mp3 files instead of the default wav
    parser.add_argument("-v", "--verbose", action="store_true")
    parser.add_argument("-o",
                        "--out",
                        type=Path,
                        default=Path("separated"),
                        help="Folder where to put extracted tracks. A subfolder "
                        "with the model name will be created.")
    parser.add_argument("-d",
                        "--device",
                        default="cuda" if th.cuda.is_available() else "cpu",
                        help="Device to use, default is cuda if available else cpu")
    # The first fragment ends with a space so the two sentences do not run
    # together in the rendered help text.
    parser.add_argument("--shifts",
                        default=1,
                        type=int,
                        help="Number of random shifts for equivariant stabilization. "
                        "Increase separation time but improves quality for Demucs. 10 was used "
                        "in the original paper.")
    parser.add_argument("--overlap",
                        default=0.25,
                        type=float,
                        help="Overlap between the splits.")
    parser.add_argument("--no-split",
                        action="store_false",
                        dest="split",
                        default=True,
                        help="Doesn't split audio in chunks. This can use large amounts of memory.")
    parser.add_argument("--mp3", action="store_true",
                        help="Convert the output wavs to mp3.")
    parser.add_argument("--mp3-bitrate",
                        default=320,
                        type=int,
                        help="Bitrate of converted mp3.")
    # NOTE(review): `args.verbose` and `args.mp3_bitrate` are parsed but never
    # read in this function - confirm whether `save_audio` should receive them.

    # Turn the command line that was actually typed into the `args` namespace.
    args = parser.parse_args()

    try:
        # See the annotated `get_model_from_args` cell further down.
        # Original note: the remote (AWS) default resolves to a bag of 4
        # models, a local repo to whatever it contains - not verifiable from
        # this cell alone.
        model = get_model_from_args(args)
    except ModelLoadingError as error:
        fatal(error.args[0])

    # Report how many models were loaded when the result is a bag.
    if isinstance(model, BagOfModels):
        print(f"Selected model is a bag of {len(model.models)} models. "
              "You will see that many progress bars per track.")
    # `args.device` is "cuda" when available, otherwise "cpu".
    model.to(args.device)
    model.eval()

    # Output folder: <out>/<model name>.
    out = args.out / args.name
    out.mkdir(parents=True, exist_ok=True)
    print(f"Separated tracks will be stored in {out.resolve()}")

    # Loop so that any number of input tracks can be processed in one run.
    for track in args.tracks:
        if not track.exists():
            print(
                f"File {track} does not exist. If the path contains spaces, "
                "please try again after surrounding the entire path with quotes \"\".",
                file=sys.stderr)
            continue
        print(f"Separating track {track}")
        #### The important part starts here ########################################
        # Load the waveform with the channel count and sample rate the model
        # declares (original note: 2 channels / 44100 Hz for the AWS pretrained
        # models - not verifiable from this cell).
        # See the annotated `load_track` cell further down.
        wav = load_track(track, args.device, model.audio_channels, model.samplerate)

        # Normalize the waveform with the statistics of the signal averaged
        # over dim 0. The epsilon keeps a silent / constant track (std == 0)
        # from dividing by zero and turning every output into NaN.
        ref = wav.mean(0)
        mean = ref.mean()
        std = ref.std() + 1e-8
        wav = (wav - mean) / std

        # See the annotated `apply_model` cell further down.
        # Summary: the waveform is cut into segments and the model is run on each.
        sources = apply_model(model, wav[None], shifts=args.shifts, split=args.split,
                              overlap=args.overlap, progress=True)[0]
        # Undo the normalization (restore the scale of the original data).
        sources = sources * std + mean

        # Build the destination folder: <out>/<model name>/<track name without extension>.
        track_folder = out / track.name.rsplit(".", 1)[0]
        track_folder.mkdir(exist_ok=True)
        extension = ".mp3" if args.mp3 else ".wav"
        for source, name in zip(sources, model.sources):
            stem = str(track_folder / name) + extension
            save_audio(source.cpu(), stem, model.samplerate)

## 해석-1

In [None]:
def add_model_flags(parser):
    """Register the model-selection options on ``parser``.

    ``-s/--sig`` (locally trained model) and ``-n/--name`` (pretrained model)
    share one optional mutually exclusive group, so argparse rejects a command
    line that passes both. When neither is given, ``name`` falls back to
    ``"mdx_extra_q"``. ``--repo`` is registered directly on the parser and is
    parsed as a ``Path``.

    Reference: https://docs.python.org/3/library/argparse.html
    """
    exclusive = parser.add_mutually_exclusive_group(required=False)
    exclusive_specs = (
        (("-s", "--sig"),
         {"help": "Locally trained XP signature."}),
        (("-n", "--name"),
         {"default": "mdx_extra_q",
          "help": "Pretrained model name or signature. Default is mdx_extra_q."}),
    )
    for flags, options in exclusive_specs:
        exclusive.add_argument(*flags, **options)

    # Not part of the exclusive group: it can accompany either selector.
    parser.add_argument(
        "--repo",
        type=Path,
        help="Folder containing all pre-trained models for use with -n.",
    )


## 해석-2

In [None]:
# Loads a model according to the options registered by add_model_flags:
# either a local repo or the pretrained models hosted remotely (AWS).
# The repo information is forwarded alongside the name.
def get_model_from_args(args):
    """
    Resolve the parsed command-line namespace to a model.

    Only ``args.name`` and ``args.repo`` are read; both are forwarded to
    ``get_model`` as keyword arguments.
    """
    # NOTE(review): `args.sig` is not consulted here - confirm that is intended.
    selection = {"name": args.name, "repo": args.repo}
    return get_model(**selection)


def get_model(name: str,
              repo: tp.Optional[Path] = None):
    """Look up `name` as either a single model signature or a bag-of-models name.

    With `repo` left as None the lookup goes through the remote repository
    (its file listing is read from `REMOTE_ROOT / 'files.txt'`); otherwise
    `repo` must be an existing directory and is used as a local repository.
    The special name 'demucs_unittest' bypasses both.
    """
    if name == 'demucs_unittest':
        return demucs_unittest()

    model_repo: ModelOnlyRepo
    if repo is not None:
        # A local repo was given: models and bag definitions both come from that folder.
        if not repo.is_dir():
            fatal(f"{repo} must exist and be a directory.")
        model_repo = LocalRepo(repo)
        bag_root = repo
    else:
        # No repo given: fetch pretrained models from the remote (AWS) location.
        listing = (REMOTE_ROOT / 'files.txt').read_text()
        remote_files = []
        for raw_line in listing.split('\n'):
            entry = raw_line.strip()
            if entry:  # skip blank lines
                remote_files.append(entry)
        model_repo = RemoteRepo(ROOT_URL, remote_files)
        bag_root = REMOTE_ROOT

    # BagOnlyRepo (see the annotated BagOnlyRepo cell) resolves bag names from
    # the YAML files under `bag_root` and delegates the actual model loading
    # to `model_repo`.
    bag_repo = BagOnlyRepo(bag_root, model_repo)
    return AnyModelRepo(model_repo, bag_repo).get_model(name)

# 해석-3

In [None]:
class BagOnlyRepo:
    """Handles only YAML files containing bag of models, leaving the actual
    model loading to some Repo.
    """
    def __init__(self, root: Path, model_repo: ModelOnlyRepo):
        """Remember `root` and `model_repo`, then index the YAML files under `root`."""
        self.root = root
        self.model_repo = model_repo
        self.scan()

    def scan(self):
        """(Re)build the mapping from bag name to YAML file path.

        The next notebook cell demonstrates what `Path.iterdir()` yields.
        """
        self._bags = {}
        # `iterdir()` yields the directory entries lazily (a generator), unlike
        # `os.listdir` which returns a list.
        for file in self.root.iterdir():
            if file.suffix == '.yaml':
                # The key is the file name without its `.yaml` suffix;
                # the value is the path of that YAML file.
                self._bags[file.stem] = file

    def has_model(self, name: str) -> bool:
        """Return True when a YAML bag definition named `name` was found by `scan`."""
        return name in self._bags

    def get_model(self, name: str) -> BagOfModels:
        """Load the bag called `name` and every model it lists.

        Raises:
            ModelLoadingError: when no YAML file with that stem was indexed.
        """
        try:
            yaml_file = self._bags[name]
        except KeyError:
            # The KeyError carries no extra information for the caller.
            raise ModelLoadingError(f'{name} is neither a single pre-trained model or '
                                    'a bag of models.') from None
        # Context manager so the file handle is closed even if parsing raises.
        with open(yaml_file) as stream:
            bag = yaml.safe_load(stream)
        signatures = bag['models']
        # One sub-model per listed signature (original note: the remote default
        # bag lists 4 - not verifiable from this file).
        models = [self.model_repo.get_model(sig) for sig in signatures]
        # Both keys are optional; `BagOfModels` receives None when they are absent.
        weights = bag.get('weights')
        segment = bag.get('segment')
        # See the annotated `BagOfModels` cell further down.
        return BagOfModels(models, weights, segment)

In [9]:
import collections
from pathlib import Path

# Demo: `Path.iterdir()` is lazy, so it is unpacked into a list to display
# the entries of the current working directory.
path = Path('.')
[*path.iterdir()]

[PosixPath('.DS_Store'),
 PosixPath('Frequency Positional Encoding - Test.ipynb'),
 PosixPath('corrupted excel file.ipynb'),
 PosixPath('Positional encoding.ipynb'),
 PosixPath('Separated 코드 분석.ipynb'),
 PosixPath('2021년 공문접수(맟춤).xlsx'),
 PosixPath('.ipynb_checkpoints'),
 PosixPath('aliasing test.ipynb')]

# 해석-4

In [None]:
class BagOfModels(nn.Module):
    def __init__(self, models: tp.List[Model],
                 weights: tp.Optional[tp.List[tp.List[float]]] = None,
                 segment: tp.Optional[float] = None):
        """
        Container for several models that are meant to be evaluated together,
        each with its own per-source weights. Use `apply_model` on the bag;
        `forward` is deliberately not implemented.

        Args:
            models (list[nn.Module]): non-empty list of models. They must all
                agree on `sources`, `samplerate` and `audio_channels`.
            weights (list[list[float]]): one list per model, each holding one
                float per source. None means every weight is 1.
            segment (None or float): when given, written to the `segment`
                attribute of every model (in place, so the models passed in
                are modified).
        """
        super().__init__()
        assert len(models) > 0
        reference = models[0]

        # The models are not adapted to each other: they are only checked
        # against the first one, and `segment` is overridden when requested.
        for member in models:
            assert member.sources == reference.sources
            assert member.samplerate == reference.samplerate
            assert member.audio_channels == reference.audio_channels
            if segment is not None:
                member.segment = segment

        # The bag exposes the shared properties of its members.
        self.audio_channels = reference.audio_channels
        self.samplerate = reference.samplerate
        self.sources = reference.sources
        self.models = nn.ModuleList(models)

        if weights is not None:
            # Expect one row per model and one entry per source.
            assert len(weights) == len(models)
            for per_model in weights:
                assert len(per_model) == len(reference.sources)
        else:
            weights = [[1.] * len(reference.sources) for _ in models]
        self.weights = weights

    def forward(self, x):
        raise NotImplementedError("Call `apply_model` on this.")

# 해석-5

In [None]:
def load_track(track, device, audio_channels, samplerate):
    """Annotated copy: read ``track`` into a tensor on ``device`` or exit.

    Two backends are tried in order: the project's ``AudioFile`` reader and,
    only if that produced nothing, ``torchaudio.load`` followed by
    ``convert_audio``.

    Args:
        track: path of the audio file to read.
        device: target handed to ``.to(...)`` on the loaded audio.
        audio_channels: channel count requested from either backend.
        samplerate: sample rate requested from either backend.

    Returns:
        The loaded audio, already moved to ``device``.

    Raises:
        SystemExit: via ``sys.exit(1)``, after printing every collected
            backend error, when neither backend could read the file.
    """
    failures = {}
    wav = None

    # First attempt: AudioFile. Both handled exceptions are recorded under
    # the 'ffmpeg' key.
    try:
        reader = AudioFile(track)
        decoded = reader.read(streams=0, samplerate=samplerate, channels=audio_channels)
        wav = decoded.to(device)
    except FileNotFoundError:
        failures['ffmpeg'] = 'Ffmpeg is not installed.'
    except subprocess.CalledProcessError:
        failures['ffmpeg'] = 'FFmpeg could not read the file.'

    if wav is None:
        try:
            # Give the file a second chance even though the first attempt
            # failed: this is a different backend (torchaudio), and it is
            # handed the path as a plain `str`.
            loaded, native_sr = ta.load(str(track))
        except RuntimeError as err:
            failures['torchaudio'] = err.args[0]
        else:
            wav = convert_audio(loaded.to(device), native_sr, samplerate, audio_channels)

    if wav is not None:
        return wav

    # Every backend has been tried, so this really is a loading error:
    # report what each one said and stop the process.
    print(f"Could not load file {track}. "
          "Maybe it is not a supported file format? ")
    for backend, reason in failures.items():
        print(f"When trying to load using {backend}, got the following error: {reason}")
    sys.exit(1)
    return wav

# 해석-6

In [None]:
# Runs the model on a mixture; can show a progress bar on the command line.
def apply_model(model, mix, shifts=1, split=True,
                overlap=0.25, transition_power=1., progress=False):
    """
    Apply model to a given mixture.

    Args:
        model: a single model or a `BagOfModels`. A single model must expose
            `sources` and `samplerate`, plus `segment` when `split` is True.
        mix: tensor unpacked as `(batch, channels, length)`.
        shifts (int): if > 0, will shift in time `mix` by a random amount between 0 and 0.5 sec
            and apply the opposite shift to the output. This is repeated `shifts` time and
            all predictions are averaged. This effectively makes the model time equivariant
            and improves SDR by up to 0.2 points.
        split (bool): if True, the input will be broken down in extracts of
            `model.segment` seconds and predictions will be performed individually
            on each and combined with a weighted overlap-add.
            Useful for model with large memory footprint like Tasnet.
        overlap (float): fraction of a segment shared by two consecutive extracts;
            the stride between extracts is `(1 - overlap) * segment`.
        transition_power (float): exponent (>= 1) applied to the triangular
            blending weight; larger values give sharper transitions.
        progress (bool): if True, show a progress bar (requires split=True)

    Returns:
        Tensor of estimates. In the split path it is allocated as
        `(batch, len(model.sources), channels, length)`; for a bag it is the
        weighted average of the sub-models' estimates.
    """
    # With several models, run each one separately and average the results.
    if isinstance(model, BagOfModels):
        # Special treatment for bag of model.
        # We explicitly apply multiple times `apply_model` so that the random shifts
        # are different for each model.
        estimates = 0
        totals = [0] * len(model.sources)
        for sub_model, weight in zip(model.models, model.weights):
            out = apply_model(
                sub_model, mix,
                shifts=shifts, split=split, overlap=overlap,
                transition_power=transition_power, progress=progress)
            # `weight` holds one float per source and `out` is
            # (batch, sources, channels, length), so the weights must be applied
            # along dim 1. Indexing dim 0 would weight batch items instead and
            # fail as soon as batch > number of sources.
            for k, source_weight in enumerate(weight):
                out[:, k] *= source_weight
                totals[k] += source_weight
            estimates += out
        # Turn the weighted sum into a weighted average, source by source.
        for k in range(estimates.shape[1]):
            estimates[:, k] /= totals[k]
        return estimates

    assert transition_power >= 1, "transition_power < 1 leads to weird behavior."
    device = mix.device
    batch, channels, length = mix.shape

    # Process the waveform in windows of `model.segment` seconds.
    if split:
        out = th.zeros(batch, len(model.sources), channels, length, device=device)
        sum_weight = th.zeros(length, device=device)
        segment = int(model.samplerate * model.segment)
        stride = int((1 - overlap) * segment)
        offsets = range(0, length, stride)
        scale = stride / model.samplerate
        if progress:
            offsets = tqdm.tqdm(offsets, unit_scale=scale, ncols=120, unit='seconds')
        # We start from a triangle shaped weight, with maximal weight in the middle
        # of the segment. Then we normalize and take to the power `transition_power`.
        # Large values of transition power will lead to sharper transitions.

        # Rising ramp followed by falling ramp: together they span one segment.
        weight = th.cat([th.arange(1, segment // 2 + 1),
                         th.arange(segment - segment // 2, 0, -1)]).to(device)
        assert len(weight) == segment
        # If the overlap < 50%, this will translate to linear transition when
        # transition_power is 1.
        weight = (weight / weight.max())**transition_power
        for offset in offsets:
            chunk = TensorChunk(mix, offset, segment)
            chunk_out = apply_model(model, chunk, shifts=shifts, split=False)
            # The last chunk may be shorter than a full segment.
            chunk_length = chunk_out.shape[-1]
            out[..., offset:offset + segment] += weight[:chunk_length] * chunk_out
            sum_weight[offset:offset + segment] += weight[:chunk_length]
        assert sum_weight.min() > 0
        # Normalize by the accumulated blending weight of every sample.
        out /= sum_weight
        return out

    # Random time shifts (the "shift trick"); `shifts` defaults to 1.
    elif shifts:
        max_shift = int(0.5 * model.samplerate)
        mix = tensor_chunk(mix)
        padded_mix = mix.padded(length + 2 * max_shift)
        out = 0
        for _ in range(shifts):
            offset = random.randint(0, max_shift)
            shifted = TensorChunk(padded_mix, offset, length + max_shift - offset)
            shifted_out = apply_model(model, shifted, shifts=0, split=False)
            # Drop the leading samples so the outputs of every shift line up.
            out += shifted_out[..., max_shift - offset:]
        out /= shifts
        return out
    else:
        # Plain forward pass, padded to a length the model accepts when it
        # declares one through `valid_length`.
        if hasattr(model, 'valid_length'):
            valid_length = model.valid_length(length)
        else:
            valid_length = length
        mix = tensor_chunk(mix)
        padded_mix = mix.padded(valid_length)
        with th.no_grad():
            out = model(padded_mix)
        return center_trim(out, length)