[`spikesorters_docker`](https://github.com/catalystneuro/spikesorters_docker/) is a test repo for trying to use dockerized environments for spike sorters within spikeinterface, using the [`hither`](https://github.com/flatironinstitute/hither) module. If this works, it would alleviate a headaches from installing different spikesorters coming from dependency issues or lacking Matlab licenses. 

### "Tutorial" script from github page

In [1]:
import spikeextractors as se
import spikesorters_docker as ss

# create a dumpable test example
rec, _ = se.example_datasets.toy_example(dumpable=True)

# run sorter in Docker container
ss.run_klusta(rec, output_folder="klusta_docker", use_docker=True) 

# by default, the following docker images are used
print(ss.default_docker_images)

09:52:25 [I] klustakwik KlustaKwik2 version git-7d8e9fa2-dirty
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  if isinstance(start_frame, (float, np.float)):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  if isinstance(end_frame, (float, np.float)) and np.isfinite(end_frame):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  elif isinstance(v, (np.int, np.int32, np.int64)):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  elif isinstance(v, (np.float, np.float32, np.float64)):


Running in docker image docker://spikeinterface/klusta:0.1.0
HITHER JOBS: 1 pending; 0 queued; 0 running; 0 finished; 0 errored; 0 cache hits
Elapsed time for manage-pending-jobs: 15.561959981918335 sec
HITHER JOBS: 0 pending; 0 queued; 0 running; 1 finished; 0 errored; 0 cache hits
{'klusta': 'docker://spikeinterface/klusta:0.1.0', 'mountainsort4': 'docker://spikeinterface/mountainsort4:0.1.0', 'herdingspikes': 'docker://spikeinterface/herdingspikes:0.1.0'}


In [1]:
from pathlib import Path
import time
import numpy as np
import spikesorters as ss
import spiketoolkit as st
import spikeextractors as se
from spikesorters_docker.default_docker_images import default_docker_images

21:11:46 [I] klustakwik KlustaKwik2 version git-8fdb7a38-dirty


### run_sorter function from spikesorters_docker

In [2]:
def run_sorter(sorter_name, recording, output_folder, delete_output_folder=False,
               grouping_property=None, parallel=False, verbose=False, raise_error=True, n_jobs=-1,
               joblib_backend='loky', use_docker=True, container=None,
               **params):
    output_folder = Path(output_folder)
    recording_json = output_folder / "recording_input.json"
    sorting_json = output_folder / "sorting_output.json"

    # dump recording
    recording.dump_to_json(output_folder / "recording_input.json")

    if use_docker:
        if container is None:
            assert sorter_name in default_docker_images, f"Default docker image for {sorter_name} not found"
            docker_image = default_docker_images[sorter_name]

        print(f"Running in docker image {docker_image}")
        
        # define hither function with container at run time
        @hi.function('run_sorter_docker_with_container', '0.1.0', image=docker_image)
        def run_sorter_docker_with_container(
                recording_json, sorter_name, **kwargs
        ):
            recording = se.load_extractor_from_json(recording_json)
            # run sorter
            t_start = time.time()
            sorting = ss.run_sorter(sorter_name, recording, **kwargs)
            t_stop = time.time()
            print(f'{sorter_name} run time {np.round(t_stop - t_start)}s')
            output_folder = Path(kwargs['output_folder'])
            sorting.dump_to_json(output_folder / 'sorting_output.json')

        sorting_job = run_sorter_docker_with_container.run(recording_json=recording_json, sorter_name=sorter_name,
                                                           output_folder=output_folder,
                                                           delete_output_folder=delete_output_folder,
                                                           grouping_property=grouping_property, parallel=parallel,
                                                           verbose=verbose, raise_error=raise_error, n_jobs=n_jobs,
                                                           joblib_backend=joblib_backend,
                                                           **params)
        sorting_job.wait()
        sorting = se.load_extractor_from_json(sorting_json)
    else:
        sorting = None
        print('Standard sorting is turned off, sorry!')

    return sorting

  and should_run_async(code)


Get chached data ready for spike sorting

In [2]:
import os

dir_name = r'/mnt/d/freelance-work/catalyst-neuro/hussaini-lab-to-nwb/example_data_raw'
base_filename = 'axona_raw_5s'
filename = os.path.join(dir_name, base_filename)
print(filename)

/mnt/d/freelance-work/catalyst-neuro/hussaini-lab-to-nwb/example_data_raw/axona_raw_5s


  and should_run_async(code)


In [3]:
recording = se.AxonaRecordingExtractor(filename=filename)

In [4]:
recording.set_channel_groups([i//4 for i in range(len(recording.get_channel_ids()))])

print(f'Updated channel groups with a 4 channel per tetrode scheme:\n{recording.get_channel_groups()}')

Updated channel groups with a 4 channel per tetrode scheme:
[0 0 0 0 1 1 1 1 2 2 2 2 3 3 3 3]


In [5]:
# We simply paste this code into a .prb file
prb_file_content = '''
channel_groups = {
    0: {
        'channels': [0, 1, 2, 3],
        'geometry': [[0, 0], [1, 0], [2, 0], [3, 0]],
    },
    1: {
        'channels': [4, 5, 6, 7],
        'geometry': [[6, 0], [7, 0], [8, 0], [9, 0]],
    },
    2: {
        'channels': [8, 9, 10, 11],
        'geometry': [[12, 0], [13, 0], [14, 0], [15, 0]],
    },
    3: {
        'channels': [12, 13, 14, 15],
        'geometry': [[18, 0], [19, 0], [20, 0], [21, 0]],
    }
}
'''
f = open("channel_groups.prb", "w")
f.write(prb_file_content)
f.close()

In [6]:
recording_prb = recording.load_probe_file(os.path.join(dir_name, 'channel_groups.prb'))
print('Channel ids:', recording_prb.get_channel_ids())
print('Loaded properties', recording_prb.get_shared_channel_property_names())

# 'group' and 'location' can be returned as lists:
print('Channel groups:', recording_prb.get_channel_groups())
print('Channel locations:\n', recording_prb.get_channel_locations())

Channel ids: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
Loaded properties ['gain', 'group', 'location', 'name', 'offset']
Channel groups: [0 0 0 1 1 1 1 2 2 2 2 3 3 3 3]
Channel locations:
 [[ 1.  0.]
 [ 2.  0.]
 [ 3.  0.]
 [ 6.  0.]
 [ 7.  0.]
 [ 8.  0.]
 [ 9.  0.]
 [12.  0.]
 [13.  0.]
 [14.  0.]
 [15.  0.]
 [18.  0.]
 [19.  0.]
 [20.  0.]
 [21.  0.]]


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  if isinstance(start_frame, (float, np.float)):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  if isinstance(end_frame, (float, np.float)) and np.isfinite(end_frame):


In [7]:
r_filt = st.preprocessing.bandpass_filter(recording_prb, freq_min=300, freq_max=6000)

In [8]:
r_cmr = st.preprocessing.common_reference(r_filt, reference='median')

In [9]:
#r_cache = se.load_extractor_from_pickle(os.path.join(dir_name, 'cached_data_preproc.pkl'))

In [10]:
r_cache = r_cmr

### Klusta

In [5]:
sorting_KL_all = run_sorter(
    sorter_name='klusta',
    recording=r_cache, 
    output_folder=os.path.join(dir_name, 'klusta'), 
    adjacency_radius=50, 
    verbose=True, 
    threshold_weak_std_factor=3,
    use_docker=True
)
print('Found', len(sorting_KL_all.get_unit_ids()), 'units')

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  elif isinstance(v, (np.int, np.int32, np.int64)):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  elif isinstance(v, (np.float, np.float32, np.float64)):


Running in docker image docker://spikeinterface/klusta:0.1.0
HITHER JOBS: 1 pending; 0 queued; 0 running; 0 finished; 0 errored; 0 cache hits
Elapsed time for manage-pending-jobs: 21.68818497657776 sec
HITHER JOBS: 0 pending; 0 queued; 0 running; 1 finished; 0 errored; 0 cache hits
Found 6 units


### Herdingspikes

In [14]:
!ls -l

total 2420
-rwxrwxrwx 1 steburg steburg   54974 Apr  4 15:33 SpikeInterface_Tutorial.ipynb
-rwxrwxrwx 1 steburg steburg     800 Apr 12 20:23 axona_tutorial_re.nwb
-rwxrwxrwx 1 steburg steburg  171124 Apr  5 17:18 axona_tutorial_se.nwb
-rwxrwxrwx 1 steburg steburg     451 Apr  3 11:28 channel_groups.prb
-rwxrwxrwx 1 steburg steburg    2208 Apr 12 20:27 export_from_spikeinterface_to_tint.ipynb
drwxrwxrwx 1 steburg steburg    4096 Apr  1 21:38 ironclust
drwxrwxrwx 1 steburg steburg    4096 Apr 13 09:53 klusta_docker
-rwxrwxrwx 1 steburg steburg  236408 Apr 11 21:26 m2_modular_pipeline_spikesorters_docker.ipynb
-rwxrwxrwx 1 steburg steburg 1659654 Apr 12 20:25 milestone2_modular_pipeline_colab.ipynb
drwxrwxrwx 1 steburg steburg    4096 Apr  4 15:16 phy_KL
drwxrwxrwx 1 steburg steburg    4096 Apr  1 19:29 python-neo
drwxrwxrwx 1 steburg steburg    4096 Apr 10 17:01 spikeextractors
-rwxrwxrwx 1 steburg steburg   98629 Apr 13 13:42 test_spikesorters_docker.ipynb
-rwxrwxrwx 1 ste

In [13]:
import hither2 as hi

In [14]:
jh = hi.ParallelJobHandler(num_workers=4)

In [17]:
@hi.function('integrate_bessel', '0.1.0',
             image='docker://jsoules/simplescipy:latest',
             kachery_support=False)
def integrate_bessel(v, a, b):
    # Definite integral of bessel function of first kind
    # of order v from a to b
    import scipy.integrate as integrate
    import scipy.special as special
    return integrate.quad(lambda x: special.jv(v, x), a, b)[0]

In [18]:
# call function directly
val1 = integrate_bessel(v=2.5, a=0, b=4.5)

# call using hither pipeline
job = integrate_bessel.run(v=2.5, a=0, b=4.5)
val2 = job.wait()

# run inside container
with hi.Config(use_container=True, job_handler=jh):
    job = integrate_bessel.run(v=2.5, a=0, b=4.5)
    val3 = job.wait()

print(val1, val2, val3)

HITHER JOBS: 2 pending; 0 queued; 0 running; 1 finished; 0 errored; 0 cache hits


TypeError: 'str' object is not callable

In [19]:
# From https://github.com/flatironinstitute/hither/blob/master/hither2/examples/example1.py

import os
import time
from typing import List, Union
from hither.job import Job
import hither2 as hi
import numpy as np


thisdir = dir_name

def test1():
    a = np.array([[1, 2, 3], [4, 5, 6 + 7j]])
    b, c = test_numpy_serialization(x=a)
    print(b)
    print(c)

    b, c = test_numpy_serialization(x=a)
    print(b)
    print(c)

@hi.function('test_id', '0.1.0')
def test_id(x):
    return x

def test3():
    jh = hi.ParallelJobHandler(num_workers=4)
    a = np.array([1, 2, 3, 4, 5])
    with hi.Config(use_container=True, job_handler=jh):
        jobs = [
            hi.Job(test_numpy_serialization, dict(x=a*i, delay=3))
            for i in range(4)
        ]
        j2 = hi.Job(test_id, {'x': jobs})
        print('*******************************************')
        cc = j2.wait().return_value
        print(cc)

def test4():
    a = np.array([1, 2, 3, 4, 5])
    jc = hi.JobCache(feed_name='default-job-cache')
    with hi.Config(use_container=True, job_cache=jc):
        j = hi.Job(test_numpy_serialization, dict(x=a))
        j2 = hi.Job(test_id, dict(x=j))
        print('*******************************************')
        r = j2.wait()
        b, c = r.return_value
        print(b)
        print(c)

@hi.function('multiply_arrays', '0.1.2')
def multiply_arrays(x: np.ndarray, y: np.ndarray, delay: float):
    if delay > 0: time.sleep(delay)
    return x * y

def test5():
    jc = hi.JobCache(feed_name='default-job-cache')
    jh = hi.ParallelJobHandler(num_workers=4)
    jobs: List[hi.Job] = []
    with hi.Config(job_cache=jc, job_handler=jh):
        for i in range(8):
            print(f'Creating job {i}')
            j = hi.Job(multiply_arrays, dict(x=np.array([i, i]), y=np.array([2, 2]), delay=4))
            jobs.append(j)
    print('Waiting for jobs to complete')
    hi.wait(None)
    for j in jobs:
        if j.status == 'finished':
            print('RESULT:', j.status, j.result.return_value)
        elif j.status == 'error':
            print('ERROR', j.result.error)

In [20]:
import os
import time
from typing import List, Union
from hither.job import Job
import hither2 as hi
import numpy as np

dir_name = r'/mnt/d/freelance-work/catalyst-neuro/hussaini-lab-to-nwb/example_data_raw'
base_filename = 'axona_raw_5s'
filename = os.path.join(dir_name, base_filename)
print(filename)

/mnt/d/freelance-work/catalyst-neuro/hussaini-lab-to-nwb/example_data_raw/axona_raw_5s


In [21]:
@hi.function('test_id', '0.1.0')
def test_id(x):
    return x

def test2():
    with hi.Config(use_container=True):
        j2 = hi.Job(test_id, dict(x='a'))
        print('*******************************************')
        r = j2.wait()
        print('*******************************************')
        b = r.return_value
        print(b)

In [22]:
test2()

*******************************************
HITHER JOBS: 3 pending; 0 queued; 0 running; 1 finished; 0 errored; 0 cache hits


TypeError: 'str' object is not callable

In [11]:
import hither as hi
import hither2 as hi2

In [12]:
output_folder = Path(os.path.join(dir_name, 'herdingspikes'))
recording_json = output_folder / "recording_input.json"
sorting_json = output_folder / "sorting_output.json"

# dump recording
r_cache.dump_to_json(output_folder / "recording_input.json")

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  elif isinstance(v, (np.int, np.int32, np.int64)):
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  elif isinstance(v, (np.float, np.float32, np.float64)):


In [13]:
# define hither function with container at run time
@hi2.function('run_sorter_docker_with_container', '0.1.0',
             image=hi2.DockerImageFromScript(
                 name='herdingspikes',
                 dockerfile='/mnt/d/spikeinterface/spikesorters_docker/containers/steffen_tests/Dockerfile_hither2'
             ), kachery_support=False)
def run_sorter_docker_with_container(recording_json, sorter_name, **kwargs):
    recording = se.load_extractor_from_json(recording_json)
    # run sorter
    t_start = time.time()
    sorting = ss.run_sorter(sorter_name, recording, **kwargs)
    t_stop = time.time()
    print(f'{sorter_name} run time {np.round(t_stop - t_start)}s')
    output_folder = Path(kwargs['output_folder'])
    sorting.dump_to_json(output_folder / 'sorting_output.json')

In [14]:
@hi2.function(
    'run_sorter_docker_with_container', '0.1.0',
    image=hi2.DockerImageFromScript(
        name='herdingspikes',
        dockerfile='/mnt/d/spikeinterface/spikesorters_docker/containers/steffen_tests/Dockerfile_hither2'),
    modules=[]
)
def run_sorter_docker_with_container(recording_json, sorter_name, **kwargs):
    recording = se.load_extractor_from_json(recording_json)
    # run sorter
    t_start = time.time()
    sorting = ss.run_sorter(sorter_name, recording, **kwargs)
    t_stop = time.time()
    print(f'{sorter_name} run time {np.round(t_stop - t_start)}s')
    output_folder = Path(kwargs['output_folder'])
    sorting.dump_to_json(output_folder / 'sorting_output.json')

In [27]:
with hi2.Config(use_container=True):
    j = hi2.Job(run_sorter_docker_with_container,
                dict(
                    recording_json=recording_json,
                    sorter_name='herdingspikes',
                    output_folder=output_folder,
                    delete_output_folder=False,
                    grouping_property='group',
                    parallel=True,
                    verbose=True,
                    clustering_bandwidth=20,
                    raise_error=True,
                    n_jobs=-1,
                    joblib_backend='loky'
                )
    )
    r = j.wait()

#sorting = se.load_extractor_from_json(sorting_json)

HITHER JOBS: 17 pending; 0 queued; 1 running; 16 finished; 0 errored; 0 cache hits
Elapsed time for manage-pending-jobs: 1258.7698583602905 sec


Exception: Not safe for pickling: (<class 'pathlib.PosixPath'>). Perhaps this type should be whitelisted.

In [22]:
@hi2.function('multiply_arrays', '0.1.2')
def multiply_arrays(x: np.ndarray, y: np.ndarray, delay: float):
    if delay > 0: time.sleep(delay)
    return x * y

def test5():
    jh = hi2.ParallelJobHandler(num_workers=4)
    jobs: List[hi2.Job] = []
    with hi2.Config(job_handler=jh):
        for i in range(8):
            print(f'Creating job {i}')
            j = hi2.Job(multiply_arrays, dict(x=np.array([i, i]), y=np.array([2, 2]), delay=4))
            jobs.append(j)
    print('Waiting for jobs to complete')
    for j in jobs:
        if j.status == 'finished':
            print('RESULT:', j.status, j.result.return_value)
        elif j.status == 'error':
            print('ERROR', j.result.error)
            
def test6():
    jh = hi2.ParallelJobHandler(num_workers=4)
    jobs: List[hi2.Job] = []
    with hi2.Config(use_container=True, job_handler=jh):
        for i in range(8):
            print(f'Creating job {i}')
            j = hi2.Job(multiply_arrays, dict(x=np.array([i, i]), y=np.array([2, 2]), delay=4))
            jobs.append(j)
    print('Waiting for jobs to complete')
    for j in jobs:
        if j.status == 'finished':
            print('RESULT:', j.status, j.result.return_value)
        elif j.status == 'error':
            print('ERROR', j.result.error)

In [23]:
test5()

Creating job 0
Creating job 1
Creating job 2
Creating job 3
Creating job 4
Creating job 5
Creating job 6
Creating job 7
Waiting for jobs to complete


In [24]:
test6()

Creating job 0
Creating job 1
Creating job 2
Creating job 3
Creating job 4
Creating job 5
Creating job 6
Creating job 7
Waiting for jobs to complete


In [10]:
run_sorter_docker_with_container.run?

In [24]:
output_folder = Path(os.path.join(dir_name, 'herdingspikes'))
recording_json = output_folder / "recording_input.json"
sorting_json = output_folder / "sorting_output.json"

# dump recording
r_cache.dump_to_json(output_folder / "recording_input.json")


# define hither function with container at run time
@hi.function('run_sorter_docker_with_container', '0.1.0',
             #image='docker://spikeinterface/herdingspikes:0.1.0',
             image=hi.DockerImageFromScript(name='herdingspikes',
                                            dockerfile='/mnt/d/spikeinterface/spikesorters_docker/containers/herdingspikes'),
             kachery_support=True)
def run_sorter_docker_with_container(recording_json, sorter_name, **kwargs):
    recording = se.load_extractor_from_json(recording_json)
    # run sorter
    t_start = time.time()
    sorting = ss.run_sorter(sorter_name, recording, **kwargs)
    t_stop = time.time()
    print(f'{sorter_name} run time {np.round(t_stop - t_start)}s')
    output_folder = Path(kwargs['output_folder'])
    sorting.dump_to_json(output_folder / 'sorting_output.json')
        
with hi.Config(use_container=True):
    sorting_job = run_sorter_docker_with_container.run(
        recording_json=recording_json,
        sorter_name='herdingspikes',
        output_folder=output_folder,
        delete_output_folder=False,
        grouping_property='group',
        parallel=True,
        clustering_bandwidth=20,
        verbose=True,
        raise_error=True,
        n_jobs=-1,
        joblib_backend='loky')
    sorting_job.wait()

sorting = se.load_extractor_from_json(sorting_json)

HITHER JOBS: 5 pending; 0 queued; 0 running; 1 finished; 0 errored; 0 cache hits


TypeError: 'str' object is not callable

In [None]:
sorting_HS = ss.run_sorter(
    sorter_name_or_class='herdingspikes',
    recording=r_cache,
    output_folder=os.path.join(dir_name, 'herdingspikes'),
    grouping_property='group',
    clustering_bandwidth=20,
    parallel=False,
    verbose=True,
    filter=False
)
print('Found', len(sorting_HS.get_unit_ids()), 'units')

In [6]:
sorting_HS = run_sorter(
    sorter_name='herdingspikes',
    recording=r_cache,
    output_folder=os.path.join(dir_name, 'herdingspikes'),
    grouping_property='group',
    clustering_bandwidth=20,
    verbose=True,
    use_docker=True
)
print('Found', len(sorting_HS.get_unit_ids()), 'units')

Running in docker image docker://spikeinterface/herdingspikes:0.1.0
Elapsed time for manage-pending-jobs: 2.0342297554016113 sec
HITHER JOBS: 0 pending; 0 queued; 0 running; 1 finished; 1 errored; 0 cache hits


Exception: Error in run_sorter_docker_with_container (0.1.0): Spike sorting failed: 'DataFrame' object has no attribute 'cl'. You can inspect the runtime trace in the herdingspikes.log of the output folder.'

### Mountainsort4

In [16]:
default_docker_images

{'klusta': 'docker://spikeinterface/klusta:0.1.0',
 'mountainsort4': 'docker://spikeinterface/mountainsort4:0.1.0',
 'herdingspikes': 'docker://spikeinterface/herdingspikes:0.1.0'}

In [26]:
def run_sorter(sorter_name, recording, output_folder, delete_output_folder=False,
               grouping_property=None, parallel=False, verbose=False, raise_error=True, n_jobs=-1,
               joblib_backend='loky', use_docker=True, container=None,
               **params):
    output_folder = Path(output_folder)
    recording_json = output_folder / "recording_input.json"
    sorting_json = output_folder / "sorting_output.json"

    # dump recording
    recording.dump_to_json(output_folder / "recording_input.json")

    if use_docker:
        if container is None:
            assert sorter_name in default_docker_images, f"Default docker image for {sorter_name} not found"
            docker_image = default_docker_images[sorter_name]
            
        print(f"Running in docker image {docker_image}")
        
        # define hither function with container at run time
        @hi.function('run_sorter_docker_with_container', '0.1.0',
                     image=docker_image)
        def run_sorter_docker_with_container(
                recording_json, sorter_name, **kwargs
        ):
            recording = se.load_extractor_from_json(recording_json)
            # run sorter
            t_start = time.time()
            sorting = ss.run_sorter(sorter_name, recording, **kwargs)
            t_stop = time.time()
            print(f'{sorter_name} run time {np.round(t_stop - t_start)}s')
            output_folder = Path(kwargs['output_folder'])
            sorting.dump_to_json(output_folder / 'sorting_output.json')

        sorting_job = run_sorter_docker_with_container.run(recording_json=recording_json, sorter_name=sorter_name,
                                                           output_folder=output_folder,
                                                           delete_output_folder=delete_output_folder,
                                                           grouping_property=grouping_property, parallel=parallel,
                                                           verbose=verbose, raise_error=raise_error, n_jobs=n_jobs,
                                                           joblib_backend=joblib_backend,
                                                           **params)
        sorting_job.wait()
        sorting = se.load_extractor_from_json(sorting_json)
    else:
        sorting = None
        print('Standard sorting is turned off, sorry!')

    return sorting

In [28]:
sorting_MS4 = run_sorter(
    recording=r_cache,
    sorter_name='mountainsort4',
    output_folder=os.path.join(dir_name, 'mountainsort4_group'),
    grouping_property='group',
    parallel=False,
    verbose=True,
    filter=False,
    use_docker=True
)
print(f'Mountainsort4 found {len(sorting_MS4.get_unit_ids())} units')

Running in docker image docker://spikeinterface/mountainsort4:0.1.0
HITHER JOBS: 1 pending; 0 queued; 2 running; 2 finished; 1 errored; 0 cache hits
Elapsed time for manage-pending-jobs: 130.25226593017578 sec


KeyboardInterrupt: 

__Running mountainsort with the following seems to work!!!__

Ah, but this works because I turned `parallel=False`, not because it runs in Docker!

In [31]:
output_folder = Path(os.path.join(dir_name, 'herdingspikes'))
recording_json = output_folder / "recording_input.json"
sorting_json = output_folder / "sorting_output.json"

# dump recording
r_cache.dump_to_json(output_folder / "recording_input.json")

In [32]:
# define hither function with container at run time
@hi.function('run_sorter_docker_with_container', '0.1.0',
             image='docker://spikeinterface/mountainsort4:0.1.0')
def run_sorter_docker_with_container(
        recording_json, sorter_name, **kwargs
):
    recording = se.load_extractor_from_json(recording_json)
    # run sorter
    t_start = time.time()
    sorting = ss.run_sorter(sorter_name, recording, **kwargs)
    t_stop = time.time()
    print(f'{sorter_name} run time {np.round(t_stop - t_start)}s')
    output_folder = Path(kwargs['output_folder'])
    sorting.dump_to_json(output_folder / 'sorting_output.json')
        
sorting_job = run_sorter_docker_with_container.run(recording_json=recording_json,
                                                   sorter_name='mountainsort4',
                                                   output_folder=output_folder,
                                                   delete_output_folder=False,
                                                   grouping_property='group',
                                                   parallel=True,
                                                   verbose=True,
                                                   raise_error=True,
                                                   n_jobs=-1,
                                                   joblib_backend='loky')
sorting_job.wait()
sorting = se.load_extractor_from_json(sorting_json)

HITHER JOBS: 1 pending; 0 queued; 3 running; 2 finished; 3 errored; 0 cache hits
Elapsed time for manage-pending-jobs: 44.76244854927063 sec


KeyboardInterrupt: 

In [25]:
sorting.sortings[0]

<spikeextractors.extractors.mdaextractors.mdaextractors.MdaSortingExtractor at 0x7ff4d71aaf40>

In [11]:
sorting_MS4 = run_sorter(
    recording=r_cache,
    sorter_name='mountainsort4',
    output_folder=os.path.join(dir_name, 'mountainsort4_group'),
    grouping_property='group',
    parallel=True,
    verbose=True,
    filter=False,
    use_docker=True
)
print(f'Mountainsort4 found {len(sorting_MS4.get_unit_ids())} units')

Running in docker image docker://spikeinterface/mountainsort4:0.1.0
HITHER JOBS: 1 pending; 0 queued; 0 running; 1 finished; 1 errored; 0 cache hits
Elapsed time for manage-pending-jobs: 491.8024661540985 sec


KeyboardInterrupt: 

In [17]:
hi.function?