# Parallel implementation of pulse_finder_species_set with torch Dataset
In this notebook we create a class whose parent is `torch.utils.data.Dataset`.

This class implements `__len__` and `__getitem__` so that we can analyze files in parallel


The analysis happens within the `__getitem__` implementation

After initializing the PulseFinder object, we create a DataLoader to run analysis in parallel

In [12]:
import torch
import numpy as np
import pandas as pd
from os.path import basename
import sys

from opensoundscape.audio import Audio
from opensoundscape.spectrogram import Spectrogram
from opensoundscape.pulse_finder import pulse_finder_species_set, summarize_top_scores

In [2]:
from librosa import get_duration
from itertools import chain

In [24]:
class PulseFinder(torch.utils.data.Dataset):
    """A torch Dataset that runs the pulse-finder analysis on one audio file per item.

    Each item corresponds to one entry of `audio_paths`. Requesting an item
    loads the audio, builds a spectrogram, scores it with
    `pulse_finder_species_set` against the species table, and saves the
    resulting table as a CSV file in `out_dir`. Because all of the work happens
    inside `__getitem__`, wrapping this object in a
    `torch.utils.data.DataLoader` lets the loader drive the analysis.

    Args:
        audio_paths: sequence of paths to the audio files to analyze
        species_table: path to a CSV file; it is read once with `pd.read_csv`
            and a copy of the table is passed to `pulse_finder_species_set`
            for every file
        out_dir: directory in which the per-file result tables are saved
    """

    def __init__(self, audio_paths, species_table, out_dir):
        self.audio_paths = audio_paths
        self.species_df = pd.read_csv(species_table)
        self.out_dir = out_dir
        # Placeholder list with one slot per audio file. __getitem__ does not
        # write to it; result paths are returned to the caller instead.
        self.results = [None] * len(self.audio_paths)

    def __len__(self):
        """Return the number of audio files to analyze."""
        return len(self.audio_paths)

    def __getitem__(self, item_idx):
        """Analyze one audio file and save its result table as a CSV file.

        Args:
            item_idx: index into `audio_paths` of the file to analyze

        Returns:
            `{'data': [output_location]}` where `output_location` is the path
            of the saved result table, or `{'data': []}` if the analysis or
            the save failed (a message is written to stderr in that case).

        Raises:
            Whatever `audio_paths[item_idx]` raises for an invalid index
            (`IndexError` for a list).
        """
        # Look the path up *outside* the try block: the except handler formats
        # `audio_path` into its message, so the name must already be bound
        # there. An invalid index is a caller error and is allowed to propagate.
        audio_path = self.audio_paths[item_idx]

        try:
            audio = Audio(audio_path, sample_rate=32000)
            spectrogram = Spectrogram.from_audio(audio, segment_length=256)

            # pass a copy so the shared species table is never handed out directly
            result_df = pulse_finder_species_set(spectrogram, self.species_df.copy())

            # save result dataframe to a file named after the audio file
            output_location = f'{self.out_dir}/{basename(audio_path)}_prdf.csv'
            result_df.to_csv(output_location)

            return {'data': [output_location]}

        except Exception as e:
            # Best effort: report the failing file and keep going, so that one
            # bad file does not abort the whole run.
            sys.stderr.write(f'exception on file {audio_path}. {e} \n')
            return {'data': []}
        

def collate_fn(batch):
    """Flatten the per-item results of one batch into a single list.

    Args:
        batch: iterable of dicts, each with a `"data"` key holding a list
            (possibly empty) of values produced for one item

    Returns:
        A list with the `"data"` entries of every item, in batch order.
        A list is returned rather than a lazy `chain` iterator so that the
        batch can be iterated more than once and supports `len()`.
    """
    return list(chain.from_iterable(item["data"] for item in batch))

def summarize(list_of_df_paths, audio_paths=None):
    """Build a file vs. species-top-score table from saved result tables.

    Args:
        list_of_df_paths: paths to the result CSV files to summarize
        audio_paths: optional labels (e.g. the audio file paths), one per
            result table and in the same order. When omitted, each label is
            the result file's name with a trailing '_prdf.csv' removed.

    Returns:
        The DataFrame returned by `summarize_top_scores`, or an empty
        DataFrame when `list_of_df_paths` is empty.

    Raises:
        ValueError: if `audio_paths` is given and its length differs from
            that of `list_of_df_paths`.
    """
    result_suffix = '_prdf.csv'
    table_paths = list(list_of_df_paths)

    if audio_paths is None:
        # Derive one label per table from the result file name.
        labels = []
        for table_path in table_paths:
            file_name = basename(table_path)
            if file_name.endswith(result_suffix):
                file_name = file_name[:-len(result_suffix)]
            labels.append(file_name)
    else:
        labels = list(audio_paths)
        if len(labels) != len(table_paths):
            raise ValueError(
                f'got {len(labels)} audio paths for {len(table_paths)} result tables'
            )

    # Nothing to summarize (e.g. every file failed): return an empty table
    # instead of handing empty lists to summarize_top_scores.
    if not table_paths:
        return pd.DataFrame()

    results_dfs = [pd.read_csv(table_path) for table_path in table_paths]
    return summarize_top_scores(labels, results_dfs)



In [25]:
# Quick check that a single test recording loads with sample_rate=32000.
a = Audio(
    '/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/tests/silence_10s.mp3',
    sample_rate=32000,
)



In [26]:
# Enable autoreload so that edits to imported modules are picked up live.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [27]:
# Audio files to analyze. The last entry is named 'idontexist.mp3'
# (presumably included on purpose to exercise the error handling).
audio_paths = [
    '/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/tests/great_plains_toad.wav',
    '/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/tests/silence_10s.mp3',
    '/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/tests/idontexist.mp3',
]

# Directory that receives one result table per analyzed file.
out_dir = '/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/output'

# CSV table of species passed to PulseFinder.
species_table = '/Volumes/lacie/projects1/Sam-Lapp/DAAN9/tables/ampr_frog_pulse_rates_20200406.csv'

In [18]:
# Run the first two analysis steps by hand on the first file:
# load the audio, then build its spectrogram.
audio = Audio(audio_paths[0], sample_rate=32000)
spectrogram = Spectrogram.from_audio(audio, segment_length=256)


  spectrogram = 10 * np.log10(spectrogram)


We initialize the `PulseFinder` object with all of the audio files we want to analyze.

In [19]:
# One dataset item per audio file; nothing is analyzed until items are requested.
pulse_finder_obj = PulseFinder(
    audio_paths=audio_paths,
    species_table=species_table,
    out_dir=out_dir,
)

We then create a `DataLoader` to manage parallelization.

In [20]:
batch_size = 1   # number of files grouped into each batch
num_workers = 1  # number of worker subprocesses the DataLoader uses

# Each batch yielded by the dataloader is whatever collate_fn builds from the
# outputs of PulseFinder.__getitem__ for the files in that batch.
dataloader = torch.utils.data.DataLoader(
    pulse_finder_obj,
    batch_size=batch_size,
    shuffle=False,
    num_workers=num_workers,
    collate_fn=collate_fn,
)

We retrieve the results by iterating over the dataloader (`for ... in dataloader`); iterating is what triggers the processing of each file.

In [22]:
# Iterating over the dataloader runs the analysis; gather every result path
# from every batch into one flat list and display it.
returns = [out for outputs in dataloader for out in outputs]
returns

exception on file /Volumes/lacie/projects1/Sam-Lapp/opensoundscape/tests/idontexist.mp3. Error: The file /Volumes/lacie/projects1/Sam-Lapp/opensoundscape/tests/idontexist.mp3 doesn't exist? 


['/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/output/great_plains_toad.wav_prdf.csv',
 '/Volumes/lacie/projects1/Sam-Lapp/opensoundscape/output/silence_10s.mp3_prdf.csv']

In [70]:
# `summarize` is a module-level function that takes the list of saved result
# paths; PulseFinder has no `summarize` method.
df = summarize(returns)