-
Notifications
You must be signed in to change notification settings - Fork 5
/
_labelmap.py
1805 lines (1386 loc) · 65.1 KB
/
_labelmap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import re
import gzip
import logging
from io import BytesIO
from functools import partial
from multiprocessing.pool import ThreadPool
from collections import defaultdict
import numpy as np
import pandas as pd
from requests import HTTPError
from libdvid import DVIDNodeService, encode_label_block
from ...util import Timer, round_box, extract_subvol, DEFAULT_TIMESTAMP, tqdm_proxy, ndrange, box_to_slicing, compute_parallel
from .. import dvid_api_wrapper, fetch_generic_json, fetch_repo_info
from ..repo import create_voxel_instance, fetch_repo_dag
from ..kafka import read_kafka_messages, kafka_msgs_to_df
from ..rle import parse_rle_response
from ._split import SplitEvent, fetch_supervoxel_splits_from_kafka
from .labelops_pb2 import MappingOps, MappingOp
from neuclease.dvid.server import fetch_server_info
logger = logging.getLogger(__name__)
@dvid_api_wrapper
def create_labelmap_instance(server, uuid, instance, versioned=True, tags=None, block_size=64, voxel_size=8.0,
                             voxel_units='nanometers', enable_index=True, max_scale=0, *, session=None):
    """
    Create a labelmap instance.

    Args:
        enable_index:
            Whether or not to support indexing on this label instance
            Should usually be True, except for benchmarking purposes.

        max_scale:
            The maximum downres level of this labelmap instance.

        Other args passed directly to create_voxel_instance().
    """
    # BUGFIX: The default was a mutable list ([]), which is shared across calls.
    # Use None as the sentinel and normalize here (behavior is unchanged).
    if tags is None:
        tags = []

    type_specific_settings = {
        "IndexedLabels": str(enable_index).lower(),
        "CountLabels": str(enable_index).lower(),
        "MaxDownresLevel": str(max_scale)
    }
    create_voxel_instance( server, uuid, instance, 'labelmap', versioned, tags=tags, block_size=block_size, voxel_size=voxel_size,
                           voxel_units=voxel_units, type_specific_settings=type_specific_settings, session=session )
@dvid_api_wrapper
def fetch_maxlabel(server, uuid, instance, *, session=None, dag=None):
    """
    Read the MaxLabel for the given segmentation instance at the given node.

    This implementation includes a workaround for issue 284:
    - https://github.com/janelia-flyem/dvid/issues/284

    If the ``/maxlabel`` endpoint returns an error stating "No maximum label",
    we recursively check the parent node(s) for a valid maxlabel until we find one.

    Args:
        server, uuid, instance:
            Segmentation instance to read from.
        dag:
            Optional pre-fetched repo DAG for this server/uuid, used to avoid
            re-fetching it on every recursive call.  Fetched on demand if None.

    Returns:
        The instance's maxlabel (int), taken from this node if possible,
        otherwise from the nearest ancestor that has one.

    Raises:
        HTTPError: For any error other than "No maximum label", or if neither
        this node nor any ancestor has a valid maxlabel.
    """
    url = f'http://{server}/api/node/{uuid}/{instance}/maxlabel'
    try:
        return fetch_generic_json(url, session=session)["maxlabel"]
    except HTTPError as ex:
        # Only the specific "No maximum label" failure gets the workaround below.
        if ex.response is None or 'No maximum label' not in ex.response.content.decode('utf-8'):
            raise
        # Oops, Issue 284
        # Search upwards in the DAG for a uuid with a valid max label
        if dag is None:
            dag = fetch_repo_dag(server, uuid)
        # Check the parent nodes
        parents = list(dag.predecessors(uuid))
        if not parents:
            # No parents to check
            raise
        # Find the maxlabel of all parents
        # (In theory, there can be more than one.)
        parent_maxes = []
        for parent_uuid in parents:
            try:
                parent_maxes.append( fetch_maxlabel(server, parent_uuid, instance, dag=dag) )
            except HTTPError as ex:
                # A parent without a maxlabel contributes 0, which max() below ignores.
                parent_maxes.append(0)
        parent_max = max(parent_maxes)
        if parent_max == 0:
            # Could not find a max label for any parent.
            raise
        return parent_max
@dvid_api_wrapper
def post_maxlabel(server, uuid, instance, maxlabel, *, session=None):
    """
    Set the MaxLabel of a segmentation instance at a particular node.

    DVID normally maintains MaxLabel automatically during cleave/split/split-supervoxel
    operations, so this function is rarely needed.  Its main use case is when you are
    writing raw supervoxel data "under DVID's feet".

    Note:
        Posting a maxlabel that is smaller than the instance's current
        MaxLabel value is an error.

    See ``fetch_maxlabel()``
    """
    endpoint = f'http://{server}/api/node/{uuid}/{instance}/maxlabel/{maxlabel}'
    resp = session.post(endpoint)
    resp.raise_for_status()
@dvid_api_wrapper
def fetch_nextlabel(server, uuid, instance, *, session=None):
    """
    Ask DVID which label ID it would assign next for this instance
    (via the ``/nextlabel`` endpoint), without reserving it.
    """
    endpoint = f'http://{server}/api/node/{uuid}/{instance}/nextlabel'
    response = fetch_generic_json(endpoint, session=session)
    return response['nextlabel']
@dvid_api_wrapper
def post_nextlabel(server, uuid, instance, num_labels, *, session=None):
    """
    Ask DVID to reserve a number of new label IDs that are not yet
    present in the given labelmap instance.

    For typical merge/cleave/split operations, this is not necessary --
    DVID handles the generation of label IDs as needed.  But in the (very rare)
    case where you want to overwrite segmentation supervoxels directly, use this
    function to reserve a block of unique IDs that won't conflict with existing
    objects in the volume.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        num_labels:
            How many labels to reserve

    Returns:
        (start, end) as np.uint64 -- the first and last labels of a
        contiguous block of reserved label IDs.

    Note:
        The returned range is INCLUSIVE, meaning 'end' is reserved
        (unlike python's range() function).
    """
    endpoint = f'http://{server}/api/node/{uuid}/{instance}/nextlabel/{num_labels}'
    resp = session.post(endpoint)
    resp.raise_for_status()
    body = resp.json()
    start, end = body["start"], body["end"]

    # Sanity-check that DVID reserved exactly as many labels as requested.
    num_reserved = end-start+1
    assert num_reserved == num_labels, \
        "Unexpected response from DVID. "\
        f"Requested {num_labels}, but received {num_reserved} labels ({start}, {end})"

    return (np.uint64(start), np.uint64(end))
@dvid_api_wrapper
def fetch_supervoxels(server, uuid, instance, body_id, user=None, *, session=None):
    """
    Return the sorted array of supervoxel IDs (np.uint64)
    that are associated with the given body.
    """
    # FIXME: Remove 'user' in favor of session arg
    params = {'u': user} if user else {}

    endpoint = f'http://{server}/api/node/{uuid}/{instance}/supervoxels/{body_id}'
    resp = session.get(endpoint, params=params)
    resp.raise_for_status()

    svs = np.asarray(resp.json(), np.uint64)
    svs.sort()
    return svs
# Deprecated name
fetch_supervoxels_for_body = fetch_supervoxels
@dvid_api_wrapper
def fetch_size(server, uuid, instance, label_id, supervoxels=False, *, session=None):
    """
    Wrapper for DVID's /size endpoint.
    Returns the size (voxel count) of a single body (or supervoxel)
    which DVID obtains by reading the body's label indexes.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        label_id:
            A single label ID to fetch size of.
            Should be a body ID, unless supervoxels=True,
            in which case it should be a supervoxel ID.
        supervoxels:
            If True, interpret label_id as supervoxel ID,
            and return a supervoxel size, not a body size.

    Returns:
        The voxel count of the given body/supervoxel, as an integer.
    """
    sv_param = str(bool(supervoxels)).lower()
    endpoint = f'http://{server}/api/node/{uuid}/{instance}/size/{label_id}?supervoxels={sv_param}'
    return fetch_generic_json(endpoint, session=session)['voxels']
# FIXME: Deprecated name
fetch_body_size = fetch_size
@dvid_api_wrapper
def fetch_sizes(server, uuid, instance, label_ids, supervoxels=False, *, session=None):
    """
    Wrapper for DVID's /sizes endpoint.
    Returns the size (voxel count) of the given bodies (or supervoxels),
    which DVID obtains by reading the bodies' label indexes.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        label_ids:
            List of label IDs to fetch sizes for.
            Should be a list of body IDs, unless supervoxels=True,
            in which case it should be a list of supervoxel IDs.
        supervoxels:
            If True, interpret label_ids as a list of supervoxel IDs,
            and return supervoxel sizes, not body sizes.

    Returns:
        pd.Series of the size results, in the same order as the labels
        passed in.  Indexed by label ID (named 'sv' or 'body').
    """
    label_ids = np.asarray(label_ids, np.uint64)
    sv_param = str(bool(supervoxels)).lower()

    endpoint = f'http://{server}/api/node/{uuid}/{instance}/sizes?supervoxels={sv_param}'
    raw_sizes = fetch_generic_json(endpoint, label_ids.tolist(), session=session)

    result = pd.Series(raw_sizes, index=label_ids, name='size')
    result.index.name = 'sv' if supervoxels else 'body'
    return result
# FIXME: Deprecated name
fetch_body_sizes = fetch_sizes
@dvid_api_wrapper
def fetch_supervoxel_sizes_for_body(server, uuid, instance, body_id, user=None, *, session=None):
    """
    Return the sizes of all supervoxels in a body.
    Convenience function to call fetch_supervoxels() followed by fetch_sizes()

    Returns:
        pd.Series of supervoxel sizes (voxel counts), named 'size',
        indexed by supervoxel ID (index named 'sv').
    """
    # FIXME: Remove 'user' param in favor of 'session' param.
    supervoxels = fetch_supervoxels_for_body(server, uuid, instance, body_id, user, session=session)
    query_params = {}
    if user:
        query_params['u'] = user
    # FIXME: Call fetch_sizes() with a custom session instead of rolling our own request here.
    url = f'http://{server}/api/node/{uuid}/{instance}/sizes?supervoxels=true'
    r = session.get(url, params=query_params, json=supervoxels.tolist())
    r.raise_for_status()
    # NOTE(review): sizes are coerced to uint32 here, unlike fetch_sizes(), which
    # doesn't force a 32-bit dtype.  A supervoxel exceeding 2**32 voxels would
    # overflow -- presumably never expected in practice; confirm before relying on it.
    sizes = np.array(r.json(), np.uint32)
    series = pd.Series(data=sizes, index=supervoxels)
    series.index.name = 'sv'
    series.name = 'size'
    return series
@dvid_api_wrapper
def fetch_label(server, uuid, instance, coordinate_zyx, supervoxels=False, scale=0, *, session=None):
    """
    Fetch the label at a single coordinate.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        coordinate_zyx:
            A single coordinate ``[z,y,x]``
        supervoxels:
            If True, read supervoxel IDs from DVID, not body IDs.
        scale:
            Which scale of the data to read from.
            (Your coordinates must be correspondingly scaled.)

    Returns:
        ``np.uint64``

    See also:
        ``fetch_labels()``, ``fetch_labels_batched()``
    """
    # DVID's endpoint expects X_Y_Z order in the URL.
    xyz = np.asarray(coordinate_zyx)[::-1]
    coord_str = '_'.join(map(str, xyz))

    params = {}
    if supervoxels:
        params['supervoxels'] = str(bool(supervoxels)).lower()
    if scale != 0:
        params['scale'] = str(scale)

    resp = session.get(f'http://{server}/api/node/{uuid}/{instance}/label/{coord_str}', params=params)
    resp.raise_for_status()
    return np.uint64(resp.json()["Label"])
# Old name (FIXME: remove)
fetch_label_for_coordinate = fetch_label
@dvid_api_wrapper
def fetch_labels(server, uuid, instance, coordinates_zyx, supervoxels=False, scale=0, *, session=None):
    """
    Fetch the labels at a list of coordinates.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        coordinates_zyx:
            array of shape (N,3) with coordinates to sample.
            Rows must be ``[[z,y,x], [z,y,x], ...``
        supervoxels:
            If True, read supervoxel IDs from DVID, not body IDs.
        scale:
            Which scale of the data to read from.
            (Your coordinates must be correspondingly scaled.)

    Returns:
        ndarray of N labels (np.uint64)

    See also:
        ``fetch_label()``, ``fetch_labels_batched()``
    """
    coordinates_zyx = np.asarray(coordinates_zyx, np.int32)
    assert coordinates_zyx.ndim == 2 and coordinates_zyx.shape[1] == 3

    params = {}
    if supervoxels:
        params['supervoxels'] = str(bool(supervoxels)).lower()
    if scale != 0:
        params['scale'] = str(scale)

    # DVID expects the points in X,Y,Z order.
    coords_xyz = coordinates_zyx[:, ::-1].tolist()
    resp = session.get(f'http://{server}/api/node/{uuid}/{instance}/labels', json=coords_xyz, params=params)
    resp.raise_for_status()
    return np.asarray(resp.json(), np.uint64)
def fetch_labels_batched(server, uuid, instance, coordinates_zyx, supervoxels=False, scale=0, batch_size=10_000, threads=0, processes=0, presort=True):
    """
    Like fetch_labels, but fetches in batches, optionally multithreaded or multiprocessed.

    Args:
        server, uuid, instance:
            Segmentation instance to read from.
        coordinates_zyx:
            array of shape (N,3), rows are [z,y,x]
        supervoxels, scale:
            Passed to fetch_labels() for each batch.
        batch_size:
            How many coordinates to fetch per request.
        threads, processes:
            If either is > 1, fetch batches in parallel
            (choose one or the other, not both).
        presort:
            If True, sort coordinates by 64px block before batching so DVID
            can service each request faster.  The result order is unaffected.

    Returns:
        ndarray of N labels, in the same order as the input coordinates.

    See also: ``fetch_label()``, ``fetch_labels()``
    """
    assert not threads or not processes, "Choose either threads or processes (not both)"
    coords_df = pd.DataFrame(coordinates_zyx, columns=['z', 'y', 'x'], dtype=np.int32)
    coords_df['label'] = np.uint64(0)

    if presort:
        with Timer("Pre-sorting coordinates by block index.", logger):
            # Sort coordinates by their block index,
            # so DVID will be able to service the requests faster.
            coords_df['bz'] = coords_df['z'] // 64
            coords_df['by'] = coords_df['y'] // 64
            coords_df['bx'] = coords_df['x'] // 64
            coords_df.sort_values(['bz', 'by', 'bx'], inplace=True)
            # drop() replaces the previous triple 'del' of the temporary columns.
            coords_df.drop(columns=['bz', 'by', 'bx'], inplace=True)

    fetch_batch = partial(_fetch_labels_batch, server, uuid, instance, supervoxels, scale)

    # Copies (not views), so batches can be pickled for multiprocessing.
    # (The previous implementation also computed a redundant 'batch_starts'
    # list solely for its length; len(batch_dfs) serves the same purpose.)
    batch_dfs = [coords_df.iloc[start:start+batch_size].copy()
                 for start in range(0, len(coords_df), batch_size)]

    if threads <= 1 and processes <= 1:
        batch_result_dfs = map(fetch_batch, batch_dfs)
        batch_result_dfs = tqdm_proxy(batch_result_dfs, total=len(batch_dfs), leave=False, logger=logger)
        batch_result_dfs = list(batch_result_dfs)
    else:
        batch_result_dfs = compute_parallel(fetch_batch, batch_dfs, 1, threads, processes, ordered=False, leave_progress=False)

    # sort_index() restores the caller's original coordinate order.
    return pd.concat(batch_result_dfs).sort_index()['label'].values
def _fetch_labels_batch(server, uuid, instance, supervoxels, scale, batch_df):
    """
    Helper for fetch_labels_batched(), defined at top-level so it can be pickled.
    """
    coords_zyx = batch_df[['z', 'y', 'x']].values
    # don't pass session: We want a unique session per thread
    batch_df.loc[:, 'label'] = fetch_labels(server, uuid, instance, coords_zyx, supervoxels, scale)
    return batch_df
@dvid_api_wrapper
def fetch_sparsevol_rles(server, uuid, instance, label, supervoxels=False, scale=0, *, session=None):
    """
    Fetch the raw sparsevol RLE response (bytes) for a given label.

    See also: neuclease.dvid.rle.parse_rle_response()
    """
    sv_str = str(bool(supervoxels)).lower() # to lowercase string
    endpoint = (f'http://{server}/api/node/{uuid}/{instance}'
                f'/sparsevol/{label}?supervoxels={sv_str}&scale={scale}')
    resp = session.get(endpoint)
    resp.raise_for_status()
    return resp.content
@dvid_api_wrapper
def post_split_supervoxel(server, uuid, instance, supervoxel, rle_payload_bytes, downres=True, *, split_id=None, remain_id=None, session=None):
    """
    Split the given supervoxel according to the provided RLE payload,
    as specified in DVID's split-supervoxel docs.

    Args:
        server, uuid, instance:
            Segmentation instance
        supervoxel:
            ID of the supervoxel to split
        rle_payload_bytes:
            RLE binary payload, in the format specified by the DVID docs.
        downres:
            If False, tell DVID not to downres the results of this split.
            (Not supported by older versions of DVID.)
        split_id, remain_id:
            DANGEROUS.  Instead of letting DVID choose the ID of the new 'split' and
            'remain' supervoxels, these parameters allow you to specify them yourself.

    Returns:
        The two new IDs resulting from the split: (split_sv_id, remaining_sv_id)
    """
    url = f'http://{server}/api/node/{uuid}/{instance}/split-supervoxel/{supervoxel}'

    # BUGFIX: Compare against None rather than truthiness, for consistency with
    # the 'is not None' checks below.  (Previously an explicit ID of 0 would
    # slip past this sanity check.)
    if (split_id is None) != (remain_id is None):
        msg = ("I'm not sure if DVID allows you to specify the split_id "
               "without specifying remain_id (or vice-versa). "
               "Please specify both (or neither).")
        raise RuntimeError(msg)

    params = {}
    if not downres:
        # true by default; not supported in older versions of dvid.
        params['downres'] = 'false'
    if split_id is not None:
        params['split'] = str(split_id)
    if remain_id is not None:
        params['remain'] = str(remain_id)

    r = session.post(url, data=rle_payload_bytes, params=params)
    r.raise_for_status()

    results = r.json()
    return (results["SplitSupervoxel"], results["RemainSupervoxel"] )
# Legacy name
split_supervoxel = post_split_supervoxel
@dvid_api_wrapper
def fetch_mapping(server, uuid, instance, supervoxel_ids, *, session=None):
    """
    For each of the given supervoxels, ask DVID what body they belong to.
    If the supervoxel no longer exists, it will map to label 0.

    Returns:
        pd.Series, with index named 'sv' and values named 'body'
    """
    sv_list = [int(sv) for sv in supervoxel_ids]
    body_ids = fetch_generic_json(f'http://{server}/api/node/{uuid}/{instance}/mapping', json=sv_list, session=session)
    sv_index = np.asarray(sv_list, np.uint64)
    mapping = pd.Series(body_ids, index=sv_index, dtype=np.uint64, name='body')
    mapping.index.name = 'sv'
    return mapping
@dvid_api_wrapper
def fetch_mappings(server, uuid, instance, as_array=False, *, format=None, session=None): # @ReservedAssignment
    """
    Fetch the complete sv-to-label in-memory mapping table
    from DVID and return it as a numpy array or a pandas Series (indexed by sv).
    (This takes 30-60 seconds for a hemibrain-sized volume.)

    NOTE: This returns the 'raw' mapping from DVID, which is usually not useful on its own.
          DVID does not store entries for 'identity' mappings, and it sometimes includes
          entries for supervoxels that have already been 'retired' due to splits.
          See fetch_complete_mappings(), which compensates for these issues.

    Args:
        as_array:
            If True, return the mapping as an array with shape (N,2),
            where supervoxel is the first column and body is the second.
            Otherwise, return a pd.Series

        format:
            The format in which dvid should return the data, either 'csv' or 'binary'.
            By default, the DVID server version is checked to see if 'binary' is supported,
            and 'binary' is used if possible. (The 'binary' format saves some time,
            since there is no need to parse CSV.)

    Returns:
        pd.Series(index=sv, data=body), unless as_array is True
    """
    if format is None:
        # The first DVID version to support the 'binary' format is v0.8.24-15-g40b8b77
        dvid_version = fetch_server_info(server)["DVID Version"]
        assert dvid_version.startswith('v')
        # Parse e.g. 'v0.8.24-15-g40b8b77' into the comparable tuple (0, 8, 24, 15).
        # (A version string with fewer than 4 numeric components yields a shorter
        # tuple, which compares less-than, so 'csv' is selected.)
        parts = re.split(r'[.-]', dvid_version[1:])
        parts = parts[:4]
        parts = tuple(int(p) for p in parts)
        if parts >= (0, 8, 24, 15):
            format = 'binary' # @ReservedAssignment
        else:
            format = 'csv' # @ReservedAssignment
    if format == 'binary':
        # This takes ~30 seconds so it's nice to log it.
        uri = f"http://{server}/api/node/{uuid}/{instance}/mappings?format=binary"
        with Timer(f"Fetching {uri}", logger):
            r = session.get(uri)
            r.raise_for_status()
        # Response is a flat buffer of uint64 pairs: [sv, body, sv, body, ...]
        a = np.frombuffer(r.content, np.uint64).reshape(-1,2)
        if as_array:
            return a
        df = pd.DataFrame(a, columns=['sv', 'body'])
    else:
        # This takes ~30 seconds so it's nice to log it.
        uri = f"http://{server}/api/node/{uuid}/{instance}/mappings"
        with Timer(f"Fetching {uri}", logger):
            r = session.get(uri)
            r.raise_for_status()
        # CSV response: space-separated 'sv body' rows, no header.
        with Timer(f"Parsing mapping", logger), BytesIO(r.content) as f:
            df = pd.read_csv(f, sep=' ', header=None, names=['sv', 'body'], engine='c', dtype=np.uint64)
            if as_array:
                return df.values
    df.set_index('sv', inplace=True)
    assert df.index.dtype == np.uint64
    assert df['body'].dtype == np.uint64
    return df['body']
@dvid_api_wrapper
def fetch_complete_mappings(server, uuid, instance, include_retired=True, kafka_msgs=None, sort=None, *, session=None):
    """
    Fetch the complete mapping from DVID for all agglomerated bodies,
    including 'identity' mappings (for agglomerated bodies only)
    and taking split supervoxels into account (discard them, or map them to 0).

    This is similar to fetch_mappings() above, but compensates for the incomplete
    mapping from DVID due to identity rows, and filters out retired supervoxels.
    (This function takes ~2 minutes to run on the hemibrain volume.)

    Note: Single-supervoxel bodies are not necessarily included in this mapping.
          Any supervoxel IDs missing from the results of this function should be
          considered as implicitly mapped to themselves.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        include_retired:
            If True, include rows for 'retired' supervoxels, which all map to 0.
        kafka_msgs:
            Optional pre-fetched kafka message list for this instance,
            to avoid re-reading the kafka log.
        sort:
            Optionally sort the result by index ('sv') or by value ('body').

    Returns:
        pd.Series(index=sv, data=body)
    """
    assert sort in (None, 'sv', 'body')
    # Read complete kafka log; we need both split and cleave info
    if kafka_msgs is None:
        kafka_msgs = read_kafka_messages(server, uuid, instance)
    split_events = fetch_supervoxel_splits_from_kafka(server, uuid, instance, kafka_msgs=kafka_msgs, session=session)
    # Drop the last field of each split-event row; the remaining columns are uint64 IDs.
    split_tables = list(map(lambda t: np.asarray([row[:-1] for row in t], np.uint64), split_events.values()))
    if split_tables:
        split_table = np.concatenate(split_tables)
        # 'old' column holds the supervoxels that were retired by splits.
        retired_svs = split_table[:, SplitEvent._fields.index('old')] #@UndefinedVariable
        retired_svs = set(retired_svs)
    else:
        retired_svs = set()
    def extract_cleave_fragments():
        for msg in kafka_msgs:
            if msg["Action"] == "cleave":
                yield msg["CleavedLabel"]
    # Cleave fragment IDs (i.e. bodies that were created via a cleave)
    # should not be included in the set of 'identity' rows.
    # (These IDs are guaranteed to be disjoint from supervoxel IDs.)
    cleave_fragments = set(extract_cleave_fragments())
    # Fetch base mapping
    base_mapping = fetch_mappings(server, uuid, instance, as_array=True, session=session)
    base_svs = base_mapping[:,0]
    base_bodies = base_mapping[:,1]
    # Augment with identity rows, which aren't included in the base.
    with Timer(f"Constructing missing identity-mappings", logger):
        missing_idents = set(base_bodies) - set(base_svs) - retired_svs - cleave_fragments
        missing_idents = np.fromiter(missing_idents, np.uint64)
        missing_idents_mapping = np.array((missing_idents, missing_idents)).transpose()
    parts = [base_mapping, missing_idents_mapping]
    # Optionally include 'retired' supervoxels -- mapped to 0
    if include_retired:
        retired_svs_array = np.fromiter(retired_svs, np.uint64)
        retired_mapping = np.zeros((len(retired_svs_array), 2), np.uint64)
        retired_mapping[:, 0] = retired_svs_array
        parts.append(retired_mapping)
    # Combine into a single table
    full_mapping = np.concatenate(parts)
    full_mapping = np.asarray(full_mapping, order='C')
    # Drop duplicates that may have been introduced via retired svs
    # (if DVID didn't filter them out)
    # keep='last' prefers the retired-row (sv -> 0) appended above.
    dupes = pd.Series(full_mapping[:,0]).duplicated(keep='last')
    full_mapping = full_mapping[(~dupes).values]
    # View as 1D buffer of structured dtype to sort in-place.
    # (Sorted index is more efficient with speed and RAM in pandas)
    mapping_view = memoryview(full_mapping.reshape(-1))
    np.frombuffer(mapping_view, dtype=[('sv', np.uint64), ('body', np.uint64)]).sort()
    # Construct pd.Series for fast querying
    s = pd.Series(index=full_mapping[:,0], data=full_mapping[:,1])
    if not include_retired:
        # Drop all rows with retired supervoxels, including:
        # identities we may have added that are now retired
        # any retired SVs erroneously included by DVID itself in the fetched mapping
        s.drop(retired_svs, inplace=True, errors='ignore')
    # Reload index to ensure most RAM-efficient implementation.
    # (This seems to make a big difference in RAM usage!)
    s.index = s.index.values
    s.index.name = 'sv'
    s.name = 'body'
    if sort == 'sv':
        s.sort_index(inplace=True)
    elif sort == 'body':
        s.sort_values(inplace=True)
    assert s.index.dtype == np.uint64
    assert s.dtype == np.uint64
    return s
@dvid_api_wrapper
def post_mappings(server, uuid, instance, mappings, mutid, *, batch_size=None, session=None):
    """
    Post a list of SV-to-body mappings to DVID, provided as a ``pd.Series``.
    Will be converted to protobuf structures for upload.

    Note:
        This is not intended for general use. It is used to initialize
        the mapping after an initial ingestion of supervoxels.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        mappings:
            ``pd.Series``, indexed by sv, bodies as value
        mutid:
            The mutation ID to use when posting the mappings
        batch_size:
            If provided, the mappings will be sent in batches, whose sizes will
            roughly correspond to the given size, in terms of the number of
            supervoxels in the batch.
    """
    assert isinstance(mappings, pd.Series)
    # Normalize into a two-column ['sv', 'body'] dataframe for grouping below.
    df = pd.DataFrame(mappings.rename('body'))
    df.index.name = 'sv'
    df = df.reset_index()
    def _post_mapping_ops(ops_list):
        # Serialize a protobuf MappingOps message containing the given ops and POST it.
        ops = MappingOps()
        ops.mappings.extend(ops_list)
        payload = ops.SerializeToString()
        r = session.post(f'http://{server}/api/node/{uuid}/{instance}/mappings', data=payload)
        r.raise_for_status()
    # Progress is displayed only when batching is enabled.
    progress_bar = tqdm_proxy(total=len(df), disable=(batch_size is None), logger=logger)
    progress_bar.update(0)
    # batch_ops_so_far counts supervoxels (not ops) accumulated in the current chunk.
    batch_ops_so_far = 0
    ops_list = []
    # One MappingOp per body, listing all of that body's supervoxels.
    for body_id, body_df in df.groupby('body'):
        op = MappingOp()
        op.mutid = mutid
        op.mapped = body_id
        op.original.extend( body_df['sv'] )
        # Add to this chunk of ops
        ops_list.append(op)
        batch_ops_so_far += len(op.original)
        # Send if chunk is full
        if (batch_size is not None) and (batch_ops_so_far >= batch_size):
            _post_mapping_ops(ops_list)
            progress_bar.update(batch_ops_so_far)
            ops_list = [] # reset
            batch_ops_so_far = 0
    # send last chunk, if there are leftovers
    if ops_list:
        _post_mapping_ops(ops_list)
        progress_bar.update(batch_ops_so_far)
def copy_mappings(src_info, dest_info, batch_size=None, *, session=None):
    """
    Copy the complete in-memory mapping from one server to another,
    performed in batches and with a progress display.

    Args:
        src_info:
            tuple (server, uuid, instance) to copy from
        dest_info:
            tuple (server, uuid, instance) to copy to
        batch_size:
            If provided, the mappings will be sent in batches, whose sizes will
            roughly correspond to the given size, in terms of the number of
            supervoxels in the batch.
        session:
            Forwarded to post_mappings() for the upload.
            NOTE(review): not forwarded to fetch_repo_info()/fetch_mappings()
            below -- those calls use their own default sessions; confirm intended.
    """
    # Pick the higher mutation id between the source and destination
    src_mutid = fetch_repo_info(*src_info[:2])["MutationID"]
    dest_mutid = fetch_repo_info(*dest_info[:2])["MutationID"]
    mutid = max(src_mutid, dest_mutid)
    mappings = fetch_mappings(*src_info)
    post_mappings(*dest_info, mappings, mutid, batch_size=batch_size, session=session)
@dvid_api_wrapper
def fetch_mutation_id(server, uuid, instance, body_id, *, session=None):
    """
    Return the mutation id of the given body, as reported
    by DVID's ``/lastmod`` endpoint.
    """
    endpoint = f'http://{server}/api/node/{uuid}/{instance}/lastmod/{body_id}'
    return fetch_generic_json(endpoint, session=session)["mutation id"]
@dvid_api_wrapper
def fetch_sparsevol_coarse(server, uuid, instance, label_id, supervoxels=False, *, session=None):
    """
    Return the 'coarse sparsevol' representation of a given body/supervoxel.
    This is similar to the sparsevol representation at scale=6,
    EXCEPT that it is generated from the label index, so no blocks
    are lost from downsampling.

    Returns an array of coordinates of the form:

        [[Z,Y,X],
         [Z,Y,X],
         [Z,Y,X],
         ...
        ]

    See also: ``fetch_sparsevol_coarse_via_labelindex()``

    Note: The returned coordinates are not necessarily sorted.
    """
    sv_str = str(bool(supervoxels)).lower()
    endpoint = (f'http://{server}/api/node/{uuid}/{instance}'
                f'/sparsevol-coarse/{label_id}?supervoxels={sv_str}')
    resp = session.get(endpoint)
    resp.raise_for_status()
    return parse_rle_response(resp.content)
def fetch_sparsevol_coarse_threaded(server, uuid, instance, labels, supervoxels=False, num_threads=2):
    """
    Call fetch_sparsevol_coarse() for a list of labels using a ThreadPool.

    Returns:
        dict of { label: coords }
        If any of the sparsevols can't be found due to error 404,
        'coords' for that label will be None.
    """
    def fetch_coords(label_id):
        try:
            coords = fetch_sparsevol_coarse(server, uuid, instance, label_id, supervoxels)
            return (label_id, coords)
        except HTTPError as ex:
            # Missing bodies are reported as None rather than raising.
            if (ex.response is not None and ex.response.status_code == 404):
                return (label_id, None)
            raise

    with ThreadPool(num_threads) as pool:
        labels_coords = pool.imap_unordered(fetch_coords, labels)
        # BUGFIX: 'logger' was previously passed to list() instead of tqdm_proxy(),
        # which raised "TypeError: list() takes no keyword arguments" at runtime.
        labels_coords = list(tqdm_proxy(labels_coords, total=len(labels), logger=logger))

    return dict(labels_coords)
@dvid_api_wrapper
def fetch_sparsevol(server, uuid, instance, label, supervoxels=False, scale=0, dtype=np.int32, *, session=None):
    """
    Return coordinates of all voxels in the given body/supervoxel at the given scale.

    For dtype arg, see parse_rle_response()

    Note: At scale 0, this will be a LOT of data for any reasonably large body.
          Use with caution.
    """
    rle_payload = fetch_sparsevol_rles(server, uuid, instance, label, supervoxels, scale, session=session)
    return parse_rle_response(rle_payload, dtype)
def compute_changed_bodies(instance_info_a, instance_info_b, *, session=None):
    """
    Returns the list of all bodies whose supervoxels changed
    between uuid_a and uuid_b.
    This includes bodies that were changed, added, or removed completely.

    Args:
        instance_info_a:
            (server, uuid, instance)
        instance_info_b:
            (server, uuid, instance)

    Returns:
        Sorted ndarray of unique changed body IDs (excluding 0,
        which merely marks additions/removals in the outer merge).
    """
    mapping_a = fetch_mappings(*instance_info_a, session=session)
    mapping_b = fetch_mappings(*instance_info_b, session=session)

    assert mapping_a.name == 'body'
    assert mapping_b.name == 'body'

    mapping_a = pd.DataFrame(mapping_a)
    mapping_b = pd.DataFrame(mapping_b)

    logger.info("Aligning mappings")
    df = mapping_a.merge(mapping_b, 'outer', left_index=True, right_index=True, suffixes=['_a', '_b'], copy=False)

    # BUGFIX: The previous fillna(inplace=True) operated on the result of
    # query(), which may be a view (pandas SettingWithCopyWarning).
    # Fill NaNs (missing svs from the outer merge) on a fresh frame instead.
    changed_df = df.query('body_a != body_b').fillna(0)
    changed_bodies = np.unique(changed_df.values.astype(np.uint64))

    # BUGFIX: Guard against an empty result before inspecting element [0].
    if len(changed_bodies) and changed_bodies[0] == 0:
        changed_bodies = changed_bodies[1:]
    return changed_bodies
@dvid_api_wrapper
def generate_sample_coordinate(server, uuid, instance, label_id, supervoxels=False, *, session=None):
    """
    Return an arbitrary coordinate that lies within the given body.
    Usually faster than fetching all the RLEs.

    This function fetches the sparsevol-coarse to select a block
    in which the body of interest can be found, then it fetches the segmentation
    for that block and picks a point within it that lies within the body of interest.

    Args:
        server:
            dvid server, e.g. 'emdata3:8900'
        uuid:
            dvid uuid, e.g. 'abc9'
        instance:
            dvid instance name, e.g. 'segmentation'
        label_id:
            A body or supervoxel ID (if supervoxels=True)
        supervoxels:
            If True, treat ``label_id`` as a supervoxel ID.

    Returns:
        [Z,Y,X] -- An arbitrary point within the body of interest.

    Raises:
        RuntimeError: If the chosen block doesn't actually contain the label
        (i.e. the coarse sparsevol is out-of-sync with the scale-0 data).
    """
    SCALE = 6 # sparsevol-coarse is always scale 6
    coarse_block_coords = fetch_sparsevol_coarse(server, uuid, instance, label_id, supervoxels, session=session)
    num_blocks = len(coarse_block_coords)
    # Pick the middle block (arbitrary) and convert its scale-6 coordinate
    # to a 64px-aligned scale-0 block corner.
    middle_block_coord = (2**SCALE) * np.array(coarse_block_coords[num_blocks//2]) // 64 * 64
    middle_block_box = (middle_block_coord, middle_block_coord + 64)
    # NOTE(review): fetch_labelarray_voxels is defined elsewhere in this module --
    # presumably a legacy alias of fetch_labelmap_voxels; confirm.
    block = fetch_labelarray_voxels(server, uuid, instance, middle_block_box, supervoxels=supervoxels, session=session)
    nonzero_coords = np.transpose((block == label_id).nonzero())
    if len(nonzero_coords) == 0:
        label_type = {False: 'body', True: 'supervoxel'}[supervoxels]
        raise RuntimeError(f"The sparsevol-coarse info for this {label_type} ({label_id}) "
                           "appears to be out-of-sync with the scale-0 segmentation.")
    return middle_block_coord + nonzero_coords[0]
@dvid_api_wrapper
def fetch_labelmap_voxels(server, uuid, instance, box_zyx, scale=0, throttle=False, supervoxels=False, *, format='array', session=None):
"""
Fetch a volume of voxels from the given instance.
Args:
server:
dvid server, e.g. 'emdata3:8900'
uuid:
dvid uuid, e.g. 'abc9'
instance:
dvid instance name, e.g. 'segmentation'
box_zyx:
The bounds of the volume to fetch in the coordinate system for the requested scale.
Given as a pair of coordinates (start, stop), e.g. [(0,0,0), (10,20,30)], in Z,Y,X order.
The box need not be block-aligned, but the request to DVID will be block aligned
to 64px boundaries, and the retrieved volume will be truncated as needed before
it is returned.
scale:
Which downsampling scale to fetch from
throttle:
If True, passed via the query string to DVID, in which case DVID might return a '503' error
if the server is too busy to service the request.
It is your responsibility to catch DVIDExceptions in that case.