In [2]:
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(processes=False, n_workers=3)
client = Client(cluster)
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 60932 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: inproc://192.168.0.28/16140/1  Dashboard: http://192.168.0.28:60932/status,Cluster  Workers: 3  Cores: 6  Memory: 16.86 GB


In [3]:
import sys
sys.path.append("..")  # Adds higher directory to python modules path.
from ecosound.core.audiotools import Sound
from ecosound.core.spectrogram import Spectrogram
from ecosound.core.annotation import Annotation
from ecosound.detection.detector_builder import DetectorFactory
from ecosound.visualization.grapher_builder import GrapherFactory
from ecosound.measurements.measurer_builder import MeasurerFactory
import ecosound.core.tools
import time
import os

In [4]:
def run_detector(infile, outdir):
    ## Input paraneters ##########################################################   
    
    
    # Spectrogram parameters
    frame = 3000
    nfft = 4096
    step = 500
    fmin = 0
    fmax = 1000
    window_type = 'hann'

    # start and stop time of wavfile to analyze
    #t1 = 0 # 24
    #t2 = 60 # 40
    ## ###########################################################################
    outfile = os.path.join(outdir, os.path.split(file)[1]+'.nc')
    
    if os.path.exists(outfile) is False:
        # load audio data
        sound = Sound(infile)
        #sound.read(channel=0, chunk=[t1, t2], unit='sec')
        sound.read(channel=0, unit='sec')
        # Calculates  spectrogram
        spectro = Spectrogram(frame, window_type, nfft, step, sound.waveform_sampling_frequency, unit='samp')
        spectro.compute(sound, dB=True, use_dask=True, dask_chunks=40)
        # Crop unused frequencies
        spectro.crop(frequency_min=fmin, frequency_max=fmax, inplace=True)
        # Denoise
        spectro.denoise('median_equalizer',
                        window_duration=3,
                        use_dask=True,
                        dask_chunks=(2048,1000),
                        inplace=True)
        # Detector
        file_timestamp = ecosound.core.tools.filename_to_datetime(infile)[0]
        detector = DetectorFactory('BlobDetector',
                                   kernel_duration=0.1,
                                   kernel_bandwidth=300,
                                   threshold=10,
                                   duration_min=0.05,
                                   bandwidth_min=40)
        detections = detector.run(spectro,
                                  start_time=file_timestamp,
                                  use_dask=True,
                                  dask_chunks=(2048,1000),
                                  debug=False)
        # Maasurements
        spectro_features = MeasurerFactory('SpectrogramFeatures', resolution_time=0.001, resolution_freq=0.1, interp='linear')
        measurements = spectro_features.compute(spectro,
                                                detections,
                                                debug=False,
                                                verbose=False,
                                                use_dask=True)
        measurements.to_netcdf(outfile)
    else:
        print('Recording already processed.')
        

In [5]:
indir = r'C:\Users\xavier.mouy\Documents\PhD\Projects\Dectector\datasets'
outdir=r'C:\Users\xavier.mouy\Documents\PhD\Projects\Dectector\results\Full_dataset2'
ext='.wav'

files = ecosound.core.tools.list_files(indir,
                                        ext,
                                        recursive=True,
                                        case_sensitive=True)

for idx,  file in enumerate(files):
    print(str(idx)+r'/'+str(len(files))+': '+ file)
    try:
        tic = time.perf_counter()
        run_detector(file, outdir)
        toc = time.perf_counter()
    except:
        print('ERROR HERE --------------------------------------')
            
    print(f"Executed in {toc - tic:0.4f} seconds")

0/229: C:\Users\xavier.mouy\Documents\PhD\Projects\Dectector\datasets\DFO_snake-island_rca-in_20181017\audio_data\67674121.181017000806.wav


  (array([0.01400757, 0.0140686 , 0.01409912, ..., 0 ... 162500]), 4096)
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good
  % (format_bytes(len(b)), s)


ERROR HERE --------------------------------------

distributed.utils - ERROR - Timed out trying to connect to 'inproc://192.168.0.28/16140/4' after 10 s: Timed out trying to connect to 'inproc://192.168.0.28/16140/4' after 10 s: connect() didn't finish in time
Traceback (most recent call last):
  File "C:\Users\xavier.mouy\Anaconda3\envs\ecosound\lib\site-packages\distributed\comm\core.py", line 232, in connect
    _raise(error)
  File "C:\Users\xavier.mouy\Anaconda3\envs\ecosound\lib\site-packages\distributed\comm\core.py", line 213, in _raise
    raise IOError(msg)
OSError: Timed out trying to connect to 'inproc://192.168.0.28/16140/4' after 10 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\xavier.mouy\Anaconda3\envs\ecosound\lib\site-packages\distributed\utils.py", line 664, in log_errors
    yield
  File "C:\Users\xavier.mouy\Anaconda3\envs\ecosound\lib\site-packages\distributed\dashboard\components\shared.py", line 325, in

KeyboardInterrupt: 

In [4]:
cluster.close()


NameError: name 'cluster' is not defined

In [6]:
client.close()

NameError: name 'client' is not defined