# Imports

In [1]:
%load_ext extensions
%cd_repo_root

'/mnt/batch/tasks/shared/LS_root/mounts/clusters/rubchume2/code/Users/rubchume/VoiceCloningFakeAudioDetection'

In [2]:
from contextlib import contextmanager
import importlib
import os
from pathlib import Path
import random
import re
import sys
from typing import Iterable, List

from azure.ai.ml import MLClient
from azureml.core import Workspace, Dataset as AzureCoreDataset, Datastore
from IPython.display import Audio
from azure.identity import DefaultAzureCredential
import mlflow
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from pydub import AudioSegment
import pytorch_lightning as pl
import torch
import torchaudio
from torch.utils.data import Dataset
from transformers import AutoModelForAudioClassification, TrainingArguments, Trainer

import directory_structure

  return torch._C._cuda_getDeviceCount() if nvml_count < 0 else nvml_count


# Utilities

In [3]:
def reproduce_audio_file_with_pydub(audio_file):
    """Decode ``audio_file`` with pydub and show it inline in the notebook.

    ``display`` is the IPython display hook available in notebook kernels.
    """
    display(AudioSegment.from_file(audio_file))
    

def reproduce_audio_from_pcm_samples(pcm_samples: np.array, sample_rate: int):
    """Show raw PCM samples as an auto-playing IPython audio player.

    Args:
        pcm_samples: Sample values to play.
        sample_rate: Sampling rate of ``pcm_samples`` in Hz.
    """
    display(Audio(data=pcm_samples, rate=sample_rate, autoplay=True))
    

def get_relative_path(origin, destination):
    """Return the path that leads from directory ``origin`` to ``destination``.

    Both inputs are first resolved to absolute paths (relative inputs are resolved
    against the current working directory). The result climbs from ``origin`` up to
    the deepest ancestor both paths share, then descends to ``destination``.

    Returns:
        pathlib.Path: e.g. ``../c/d`` for ``origin=/a/b`` and ``destination=/a/c/d``.
    """
    source = Path(origin).resolve()
    target = Path(destination).resolve()
    shared_ancestor = Path(os.path.commonpath([source, target]))

    levels_up = len(source.parts) - len(shared_ancestor.parts)
    upward = Path("../" * levels_up)
    downward = target.relative_to(shared_ancestor)
    return upward / downward


@contextmanager
def suppress_error_print():
    """Silence ``sys.stderr`` for the duration of a ``with`` block.

    Output is redirected to the null device. The previous stream is reinstated,
    and the null-device handle closed, on exit — even when the block raises.
    """
    saved_stream = sys.stderr
    sys.stderr = open(os.devnull, 'w')
    try:
        yield
    finally:
        silenced_stream, sys.stderr = sys.stderr, saved_stream
        silenced_stream.close()


def parse_datastore_uri(datastore_uri):
    """Split an ``azureml://`` datastore URI into datastore name and relative path.

    Expected form::

        azureml://subscriptions/<sub>/resourcegroups/<rg>/workspaces/<ws>/datastores/<datastore>/paths/<relative_path>

    Returns:
        tuple[str, str]: ``(datastore_name, relative_path)``.

    Raises:
        ValueError: if ``datastore_uri`` does not have the form above (instead of
            an opaque ``AttributeError`` on ``None``).
    """
    match_object = re.match(
        r"^azureml://subscriptions/[^/]+/resourcegroups/[^/]+/workspaces/[^/]+/datastores/(?P<datastore>[^/]+)/paths/(?P<relative_path>.+)$",
        datastore_uri
    )
    if match_object is None:
        raise ValueError(f"Not an azureml datastore URI: {datastore_uri!r}")

    return match_object.group("datastore"), match_object.group("relative_path")


def mount_data_asset(data_asset):
    """Create a mount context for an Azure ML (v2) data asset via the v1 SDK.

    The asset's ``azureml://`` path is split into datastore and relative path, and
    a v1 ``FileDataset`` over that location is created and mounted. The caller is
    responsible for calling ``start()`` / ``stop()`` on the returned context.
    """
    store_name, path_in_store = parse_datastore_uri(data_asset.path)
    datastore = Datastore.get(Workspace.from_config(), store_name)
    file_dataset = AzureCoreDataset.File.from_files(path=(datastore, path_in_store))
    return file_dataset.mount()
    

@contextmanager
def mounted_data_asset(name, version=None, label="latest"):
    """Mount an Azure ML data asset for the duration of a ``with`` block.

    Args:
        name: Data asset name.
        version: Explicit asset version. When given, ``label`` is not sent, because
            ``MLClient.data.get`` rejects requests that specify both.
        label: Asset label used when no version is given (default ``"latest"``).

    Yields:
        The local mount point of the asset.
    """
    ml_client = MLClient.from_config(credential=DefaultAzureCredential())
    data_asset = ml_client.data.get(
        name=name,
        version=version,
        label=None if version else label,
    )

    # stderr output produced while mounting is suppressed.
    with suppress_error_print():
        mounted_path = mount_data_asset(data_asset)
        mounted_path.start()

    try:
        yield mounted_path.mount_point
    finally:
        # Always unmount, even when the body of the ``with`` block raises.
        mounted_path.stop()

    
import itertools
import runpy


@contextmanager
def cli_arguments(**arguments):
    """Temporarily replace ``sys.argv`` with ``--key value`` pairs built from ``arguments``.

    Lets a script that parses its command line be run in-process (e.g. through
    ``runpy.run_path``). The previous ``sys.argv`` object is put back on exit,
    even if the block raises.
    """
    previous_argv, sys.argv = sys.argv, kwargs_to_command_line_arguments(**arguments)
    try:
        yield
    finally:
        sys.argv = previous_argv


def kwargs_to_command_line_arguments(**kwargs):
    """Build an argv-style list ``[None, "--k1", "v1", "--k2", "v2", ...]``.

    Values are converted with ``str``; keyword order is preserved. The leading
    ``None`` stands in for the program name (``argv[0]``).
    NOTE(review): argparse derives its default ``prog`` from ``argv[0]``; the
    ``None`` placeholder relies on something like ``runpy.run_path`` overriding it.
    """
    argv = [None]
    for option, value in kwargs.items():
        argv.extend((f"--{option}", str(value)))
    return argv


class WorkingDirectoryOn:
    """Context manager that runs a ``with`` block inside ``working_directory``.

    The directory that is current when the block is *entered* is restored on exit,
    even if the block raises. Capturing it at entry (not at construction) means an
    instance created earlier still restores the right directory.

    Usage::

        with WorkingDirectoryOn("src"):
            ...
    """

    def __init__(self, working_directory):
        self.working_directory = working_directory
        # Kept for backward compatibility; refreshed in ``__enter__``.
        self.original_working_directory = os.getcwd()

    def __enter__(self):
        self.original_working_directory = os.getcwd()
        os.chdir(self.working_directory)
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        os.chdir(self.original_working_directory)
        # Do not swallow exceptions raised inside the block.
        return False
        
        
@contextmanager
def relative_paths_from(origin, paths):
    """Yield each of ``paths`` expressed relative to ``origin``.

    Intended for tuple-unpacking in a ``with`` statement::

        with relative_paths_from(src, [a, b]) as (a_rel, b_rel):
            ...

    The relative paths are computed eagerly on entry and yielded as a tuple.
    ``get_relative_path`` resolves relative inputs against the current working
    directory, so lazy evaluation could give different results if the working
    directory changes before the values are consumed.
    """
    yield tuple(get_relative_path(origin, path) for path in paths)
    
    
def import_module(module_file):
    """Import (and then reload) a module given its source file path, e.g. ``"pkg/mod.py"``.

    The dotted module name is derived by dropping the file suffix and turning each
    ``/`` into ``.``. The reload picks up edits made to the file since it was first
    imported. Returns the module object obtained from the import.
    """
    dotted_name = ".".join(str(Path(module_file).with_suffix("")).split("/"))
    module = importlib.import_module(dotted_name)
    importlib.reload(module)
    return module

# Setup

In [4]:
experiment_name = "classification_wav2vec"

# Scripts for the pipeline steps live under the source tree; job/component YAMLs live
# under the job definitions tree and point back to the scripts via a relative path.
project_source_path = directory_structure.classification_source_path / experiment_name
pipeline_path = directory_structure.job_definitions_path / experiment_name
for required_directory in (project_source_path, pipeline_path):
    Path(required_directory).mkdir(exist_ok=True, parents=True)

project_source_path_relative = get_relative_path(pipeline_path, project_source_path)

In [5]:
real_voices_path = directory_structure.data_path / "Common Voice Full/cv-corpus-15.0-2023-09-08/en"
real_voices_info_file = real_voices_path / "selected.csv"
real_clips_path = real_voices_path / "clips"
cloned_voices_path = directory_structure.audio_output_path / "OOTB-YourTTS/TIMITexamples"

# Paths of the selected genuine Common Voice clips (file names come from the first CSV column).
real_info = pd.read_csv(real_voices_info_file).iloc[:, 0].map(
    lambda file_name: str(real_clips_path / file_name)
)

# Paths of every cloned-voice .wav sample.
cloned_info = pd.Series(list(map(str, cloned_voices_path.glob("*.wav"))), name="path")

# Step: Prepare data

## Create script

In [6]:
prepare_common_voice_audio_list_script_name = "prepare_common_voice_audio_list.py"

In [7]:
%%writefile {project_source_path}/{prepare_common_voice_audio_list_script_name}
from pathlib import Path

import pandas as pd

from utils import make_command


@make_command
def main(common_voice_dataset, files_info_tsv, audio_files_csv):
    """Write the ``path`` column of a Common Voice TSV as a header-less one-column CSV.

    Args:
        common_voice_dataset: Root folder of the (mounted) Common Voice dataset.
        files_info_tsv: TSV file relative to that root, e.g. ``en/validated.tsv``.
        audio_files_csv: Destination CSV; one clip file name per line, no header row.
    """
    import csv

    validated_tsv_path = Path(common_voice_dataset) / files_info_tsv
    # NOTE(review): Common Voice TSVs are plain tab-separated files whose sentence
    # column can contain unescaped quote characters; with pandas' default quoting a
    # stray quote can swallow delimiters/newlines and merge rows, so quoting is disabled.
    files_info = pd.read_csv(validated_tsv_path, delimiter="\t", quoting=csv.QUOTE_NONE)
    files_info.path.to_csv(audio_files_csv, header=False, index=False)
    
    
if __name__ == "__main__":
    main()

Overwriting src/classification/classification_wav2vec/prepare_common_voice_audio_list.py


## Test script locally

In [8]:
# sys.modules.pop("prepare_common_voice_audio_list", None)

# ml_client = MLClient.from_config(credential=DefaultAzureCredential())
# common_voice_data_asset = ml_client.data.get(name="CommonVoiceDeltaSegment15", label="latest")

# with mounted_data_asset(common_voice_data_asset) as common_voice_mount_path:
#     with WorkingDirectoryOn(project_source_path):
#         with relative_paths_from(
#             project_source_path,
#             ["temp/selected.csv"]
#         ) as (audio_files_csv,):
#             with cli_arguments(
#                 common_voice_dataset=common_voice_mount_path,
#                 files_info_tsv="en/validated.tsv",
#                 audio_files_csv=audio_files_csv
#             ):
#                 runpy.run_path(prepare_common_voice_audio_list_script_name, run_name='__main__')

## Create component definition

### Environment

In [9]:
prepare_data_environment_name = "prepare-common-voice-audio-list-environment"

In [10]:
%%rendertemplate {directory_structure.environments_path}/{prepare_data_environment_name}.yaml
$schema: https://azuremlschemas.azureedge.net/latest/environment.schema.json
name: [[prepare_data_environment_name]]
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: conda.yaml
description: Environment created for data preparation in voice classification

'job_definitions/environments/prepare-common-voice-audio-list-environment.yaml'

In [11]:
# !az ml environment create --file {directory_structure.environments_path}/{prepare_data_environment_name}.yaml

### Component

In [12]:
prepare_data_component_name = "prepare_common_voice_audio_list.yaml"

In [13]:
%%rendertemplate {pipeline_path}/{prepare_data_component_name}
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: prepare_common_voice_data_for_prediction
display_name: Prepare data for prediction

inputs:
    common_voice_dataset:
        type: uri_folder
        mode: ro_mount
    files_info_tsv:
        type: string
    
outputs:
    audio_files_csv:
        type: uri_file
        mode: rw_mount

code: [[project_source_path_relative]]
command: >-
    python [[prepare_common_voice_audio_list_script_name]]
    --common_voice_dataset ${{inputs.common_voice_dataset}}
    --files_info_tsv ${{inputs.files_info_tsv}}
    --audio_files_csv ${{outputs.audio_files_csv}}

environment: azureml:[[prepare_data_environment_name]]@latest

'job_definitions/classification_wav2vec/prepare_common_voice_audio_list.yaml'

# Step: Predictions

## Create script

In [24]:
%%writefile {project_source_path}/predict.py
import argparse
import inspect
import itertools
from pathlib import Path
import re
import shutil
import tempfile

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential
import pandas as pd
from tqdm.notebook import tqdm
import torch
from torch.utils.data import DataLoader

from audio_binary_dataset import AudioDumbDataset
from cloned_audio_detector import ClonedAudioDetector
from utils import get_workspace, mounted_datastore, upload_files_to_datastore


# Run on the GPU when one is visible to PyTorch, otherwise on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


def make_command(function):
    """Turn ``function`` into a zero-argument command-line entry point.

    One ``--<name>`` option is declared per parameter of ``function``. A parameter's
    annotation, if present, is used as the argparse ``type`` converter; otherwise the
    value stays a string. Options absent from the command line are left out of the
    call, so ``function``'s own defaults apply. Unrecognised arguments are ignored.

    Returns:
        A wrapper that parses ``sys.argv`` and returns ``function(**parsed_options)``.
    """
    parser = argparse.ArgumentParser()
    for parameter_name, parameter in inspect.signature(function).parameters.items():
        # ``inspect.Parameter.empty`` is the public "no annotation" sentinel.
        has_annotation = parameter.annotation is not inspect.Parameter.empty
        parser.add_argument(
            f"--{parameter_name}",
            type=parameter.annotation if has_annotation else None,
        )

    def wrapper():
        args, _unknown = parser.parse_known_args()
        non_null_args = {key: value for key, value in vars(args).items() if value is not None}
        return function(**non_null_args)

    return wrapper


def get_model(job_name, download_path):
    """Download the ``checkpoint`` output of Azure ML job ``job_name`` and load the detector.

    Args:
        job_name: Name of the training job whose ``checkpoint`` output holds the model.
        download_path: Local folder the job output is downloaded into.

    Returns:
        The ``ClonedAudioDetector`` restored from the checkpoint, mapped onto
        ``device`` and switched to evaluation mode.
    """
    workspace = get_workspace()
    client = MLClient(
        credential=DefaultAzureCredential(),
        subscription_id=workspace.subscription_id,
        resource_group_name=workspace.resource_group,
        workspace_name=workspace.name,
    )
    client.jobs.download(name=job_name, output_name='checkpoint', download_path=download_path)

    # Downloaded layout: <download_path>/named-outputs/checkpoint/checkpoint
    checkpoint_file = Path(download_path).joinpath("named-outputs", "checkpoint", "checkpoint")
    detector = ClonedAudioDetector.load_from_checkpoint(checkpoint_path=checkpoint_file, map_location=device)
    detector.eval()
    return detector


def get_file_batch_indices(file):
    """Parse file names of the form ``logits_batch_<batch_size>_<batch_index>.<ext>``.

    Returns:
        ``(batch_size, batch_index)`` as strings, or ``None`` when the file stem
        does not follow that pattern.
    """
    parsed = re.match(r"^logits_batch_(\d+)_(\d+)$", Path(file).stem)
    return parsed.groups() if parsed else None


def _get_next_batch_index(predictions_directory, batch_size):
    """Return one past the highest stored batch index for ``batch_size`` (0 if none stored).

    Scans ``predictions_directory`` in the ``workspaceblobstore`` datastore for
    ``logits_batch_<batch_size>_<index>.csv`` files.
    NOTE(review): gaps below the highest stored index are neither detected nor refilled.
    """
    with mounted_datastore(
        datastore_name="workspaceblobstore",
        relative_path=predictions_directory
    ) as predictions_path:
        predictions_folder = Path(predictions_path)
        # A missing folder (e.g. on the very first run) means nothing is stored yet.
        stored_files = list(predictions_folder.iterdir()) if predictions_folder.is_dir() else []

    stored_indices = [
        int(index)
        for size, index in filter(None, map(get_file_batch_indices, stored_files))
        if int(size) == batch_size
    ]
    return max(stored_indices, default=-1) + 1


def predict_macro_batch(model, dataset, predictions_directory, batch_size=100, macro_batch_size=10):
    """Predict the next ``macro_batch_size`` batches of ``dataset`` and upload their logits.

    The run resumes after the last batch already stored in ``predictions_directory``.
    Each batch's logits are written to ``temp/logits_batch_<batch_size>_<index>.csv``.
    The files are then uploaded to ``predictions_directory`` in ``workspaceblobstore``.
    The local ``temp`` folder is removed afterwards, even on failure.

    Args:
        model: Model whose ``forward`` returns a logits tensor for a batch.
        dataset: Dataset iterated in order (no shuffling) in batches of ``batch_size``.
        predictions_directory: Datastore folder holding the logits CSVs.
        batch_size: Number of samples per batch.
        macro_batch_size: Maximum number of batches processed in this call.
    """
    # tqdm.auto falls back to a console bar outside notebooks (this runs as a job script).
    from tqdm.auto import tqdm

    batch_start_index = _get_next_batch_index(predictions_directory, batch_size)
    batch_end_index = batch_start_index + macro_batch_size

    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
    pending_batches = itertools.islice(dataloader, batch_start_index, batch_end_index)

    output_folder = Path("temp")
    output_folder.mkdir(exist_ok=True)
    try:
        # Inference only: skip autograd bookkeeping to save memory and time.
        with torch.no_grad():
            for batch_index, batch in enumerate(
                tqdm(pending_batches, total=macro_batch_size), start=batch_start_index
            ):
                batch_logits = model.forward(batch.to(device))
                pd.DataFrame(batch_logits.cpu().detach().numpy()).to_csv(
                    output_folder / f"logits_batch_{batch_size}_{batch_index}.csv",
                    index=False,
                    header=False
                )
        upload_files_to_datastore("workspaceblobstore", predictions_directory, "temp", "*.csv")
    finally:
        shutil.rmtree("temp", ignore_errors=True)
        

@make_command
def main(job_name, model_download_path, data_path, audio_files_csv, audio_files_folder, predictions_path,
         batch_size: int = 10, macro_batch_size: int = 10):
    """Predict logits for the next macro batch of audio files and upload them.

    Args:
        job_name: Training job whose ``checkpoint`` output holds the model.
        model_download_path: Local folder the checkpoint is downloaded into.
        data_path: Root of the (mounted) audio dataset.
        audio_files_csv: Header-less one-column CSV of audio file names, as written
            by ``prepare_common_voice_audio_list.py``.
        audio_files_folder: Folder containing the audio files, relative to ``data_path``.
        predictions_path: Datastore folder where logits CSVs are stored.
        batch_size: Files per batch (optional ``--batch_size``, default 10).
        macro_batch_size: Batches processed per run (optional ``--macro_batch_size``, default 10).
    """
    audio_folder = Path(data_path) / audio_files_folder

    # The list is written with ``header=False``, so read it without a header. Otherwise
    # pandas uses the first file name as the column header and silently skips it.
    # NOTE(review): logits stored by earlier runs were computed without the first file
    # and are therefore offset by one; clear them before resuming.
    audio_files = pd.read_csv(str(audio_files_csv), header=None).iloc[:, 0].map(
        lambda file_name: str(audio_folder / file_name)
    )

    detector_loaded = get_model(job_name, model_download_path)
    dumb_dataset = AudioDumbDataset(audio_files, 16000, 64000)
    predict_macro_batch(
        detector_loaded, dumb_dataset, predictions_path,
        batch_size=batch_size, macro_batch_size=macro_batch_size
    )


if __name__=="__main__":
    main()

Overwriting src/classification/classification_wav2vec/predict.py


## Test script locally

In [15]:
# sys.modules.pop("audio_binary_dataset", None)
# sys.modules.pop("cloned_audio_detector", None)
# sys.modules.pop("utils", None)
# sys.modules.pop("predict", None)

# # predictions_path = directory_structure.training_artifacts_path / "predictions"
# # predictions_path.mkdir(exist_ok=True)
# audio_files_csv = "temp/selected.csv"

# with mounted_data_asset(name="CommonVoiceDeltaSegment15", label="latest") as common_voice_mount_path:
#     with WorkingDirectoryOn(project_source_path):
#         with relative_paths_from(
#             project_source_path,
#             [
#                 # predictions_path,
#                 directory_structure.models_path,
#                 audio_files_csv,
#             ]
#         ) as (
#             # predictions_path_relative,
#             model_download_path,
#             audio_files_csv_relative,
#         ):
#             with cli_arguments(
#                 job_name="dynamic_yacht_mgxs2hytb1",
#                 model_download_path=model_download_path,
#                 data=common_voice_mount_path,
#                 audio_files_csv=audio_files_csv_relative,
#                 audio_files_folder="en/clips/",
#                 predictions_path="Predictions/CommonVoiceDelta15"
#             ):
#                 runpy.run_path(f"predict.py", run_name='__main__')

## Create component definition

### Environment

In [16]:
predict_environment_name = "predict-environment"

In [17]:
%%rendertemplate {directory_structure.environments_path}/{predict_environment_name}.yaml
$schema: https://azuremlschemas.azureedge.net/latest/environment.schema.json
name: [[predict_environment_name]]
image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
conda_file: conda.yaml
description: Environment created for prediction in voice classification

'job_definitions/environments/predict-environment.yaml'

In [18]:
# !az ml environment create --file {directory_structure.environments_path}/{predict_environment_name}.yaml

### Component

In [19]:
%%rendertemplate {pipeline_path}/prediction_job.yaml
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
name: predict
display_name: Predict audio file type

code: [[project_source_path_relative]]
command: >-
    python predict.py 
    --job_name=${{inputs.job_name}}
    --model_download_path="model"
    --data_path=${{inputs.audio_dataset}}
    --audio_files_csv=${{inputs.files_csv}}
    --audio_files_folder=${{inputs.audio_files_folder}}
    --predictions_path=${{inputs.predictions_path}}

inputs:
    job_name:
        type: string
    audio_dataset:
        type: uri_folder
        mode: ro_mount
    files_csv:
        type: uri_file
        mode: ro_mount
    audio_files_folder:
        type: string
    predictions_path:
        type: string
        
environment: azureml:[[predict_environment_name]]@latest

'job_definitions/classification_wav2vec/prediction_job.yaml'

# Pipeline

In [20]:
%%rendertemplate {pipeline_path}/prediction_pipeline.yaml
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

experiment_name: ClonedVoiceDetectorPrediction
display_name: ClonedVoiceDetectorPrediction
description: Pipeline for cloned voice detection prediction

settings:
    # default_compute: azureml:[[cheap_compute_name]]
    default_compute: azureml:compute-cluster
    
inputs:
    common_voice_dataset:
        type: uri_folder
        path: azureml:CommonVoiceDeltaSegment15@latest
        mode: ro_mount
    training_job_name: "dynamic_yacht_mgxs2hytb1"

jobs:
    prepare_audio_files:
        type: command
        component: [[prepare_data_component_name]]
        inputs:
            common_voice_dataset: ${{parent.inputs.common_voice_dataset}}
            files_info_tsv: "en/validated.tsv"
        outputs:
            audio_files_csv:
                mode: upload
    predict:
        type: command
        component: prediction_job.yaml
        inputs:
            job_name: ${{parent.inputs.training_job_name}}
            audio_dataset: ${{parent.inputs.common_voice_dataset}}
            files_csv: ${{parent.jobs.prepare_audio_files.outputs.audio_files_csv}}
            audio_files_folder: "en/clips/"
            predictions_path: "Predictions/CommonVoiceDelta15"

'job_definitions/classification_wav2vec/prediction_pipeline.yaml'

# Execute pipeline

In [23]:
!az ml job create --file {pipeline_path}/prediction_pipeline.yaml

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
[32mUploading classification_wav2vec 