-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
onnxruntime_inference_collection.py
1028 lines (862 loc) · 40.5 KB
/
onnxruntime_inference_collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
from __future__ import annotations
import collections
import collections.abc
import os
import typing
import warnings
from typing import Any, Sequence
from onnxruntime.capi import _pybind_state as C
if typing.TYPE_CHECKING:
import onnxruntime
def get_ort_device_type(device_type: str, device_index) -> C.OrtDevice:
if device_type == "cuda":
return C.OrtDevice.cuda()
elif device_type == "cann":
return C.OrtDevice.cann()
elif device_type == "cpu":
return C.OrtDevice.cpu()
elif device_type == "dml":
return C.OrtDevice.dml()
elif device_type == "ort":
return C.get_ort_device(device_index).device_type()
else:
raise Exception("Unsupported device type: " + device_type)
def check_and_normalize_provider_args(
    providers: Sequence[str | tuple[str, dict[Any, Any]]] | None,
    provider_options: Sequence[dict[Any, Any]] | None,
    available_provider_names: Sequence[str],
):
    """
    Validates the 'providers' and 'provider_options' arguments and returns a
    normalized version.

    :param providers: Optional sequence of providers in order of decreasing
        precedence. Values can either be provider names or tuples of
        (provider name, options dict).
    :param provider_options: Optional sequence of options dicts corresponding
        to the providers listed in 'providers'.
    :param available_provider_names: The available provider names.
    :return: Tuple of (normalized 'providers' sequence, normalized
        'provider_options' sequence).

    'providers' can contain either names or names and options. When any options
    are given in 'providers', 'provider_options' should not be used.

    The normalized result is a tuple of:
    1. Sequence of provider names in the same order as 'providers'.
    2. Sequence of corresponding provider options dicts with string keys and
       values. Unspecified provider options yield empty dicts.
    """
    if providers is None:
        return [], []

    # Ordered mapping preserves caller precedence and detects duplicates.
    provider_name_to_options = collections.OrderedDict()

    def set_provider_options(name, options):
        if name not in available_provider_names:
            warnings.warn(
                f"Specified provider '{name}' is not in available provider names. "
                "Available providers: '{}'".format(", ".join(available_provider_names))
            )

        if name in provider_name_to_options:
            warnings.warn(f"Duplicate provider '{name}' encountered, ignoring.")
            return

        # Provider options are passed down to the C API as string key/value pairs.
        normalized_options = {str(key): str(value) for key, value in options.items()}
        provider_name_to_options[name] = normalized_options

    if not isinstance(providers, collections.abc.Sequence):
        raise ValueError("'providers' should be a sequence.")

    if provider_options is not None:
        # Separate options sequence: 'providers' must be plain names, paired
        # one-to-one with options dicts.
        if not isinstance(provider_options, collections.abc.Sequence):
            raise ValueError("'provider_options' should be a sequence.")

        if len(providers) != len(provider_options):
            raise ValueError("'providers' and 'provider_options' should be the same length if both are given.")

        if not all(isinstance(provider, str) for provider in providers):
            raise ValueError("Only string values for 'providers' are supported if 'provider_options' is given.")

        if not all(isinstance(options_for_provider, dict) for options_for_provider in provider_options):
            raise ValueError("'provider_options' values must be dicts.")

        for name, options in zip(providers, provider_options):
            set_provider_options(name, options)
    else:
        for provider in providers:
            if isinstance(provider, str):
                set_provider_options(provider, {})
            elif (
                isinstance(provider, tuple)
                and len(provider) == 2
                and isinstance(provider[0], str)
                and isinstance(provider[1], dict)
            ):
                set_provider_options(provider[0], provider[1])
            else:
                raise ValueError("'providers' values must be either strings or (string, dict) tuples.")

    return list(provider_name_to_options.keys()), list(provider_name_to_options.values())
class Session:
    """
    This is the main class used to run a model.

    Wraps an underlying C++ ``C.InferenceSession`` held in ``self._sess``; a
    derived class (e.g. :class:`InferenceSession`) is responsible for creating
    it and the associated metadata attributes. When fallback is enabled and a
    run fails with ``C.EPFail``, the session is re-created once with
    ``self._fallback_providers`` and the run is retried.
    """

    def __init__(self):
        # self._sess is managed by the derived class and relies on bindings from C.InferenceSession
        self._sess = None
        # When True, run()/run_with_ort_values() retry once on C.EPFail using
        # self._fallback_providers (populated by the derived class).
        self._enable_fallback = True

    def get_session_options(self):
        "Return the session options. See :class:`onnxruntime.SessionOptions`."
        return self._sess_options

    def get_inputs(self):
        "Return the inputs metadata as a list of :class:`onnxruntime.NodeArg`."
        return self._inputs_meta

    def get_outputs(self):
        "Return the outputs metadata as a list of :class:`onnxruntime.NodeArg`."
        return self._outputs_meta

    def get_overridable_initializers(self):
        "Return the inputs (including initializers) metadata as a list of :class:`onnxruntime.NodeArg`."
        return self._overridable_initializers

    def get_modelmeta(self):
        "Return the metadata. See :class:`onnxruntime.ModelMetadata`."
        return self._model_meta

    def get_providers(self):
        "Return list of registered execution providers."
        return self._providers

    def get_provider_options(self):
        "Return registered execution providers' configurations."
        return self._provider_options

    def set_providers(self, providers=None, provider_options=None):
        """
        Register the input list of execution providers. The underlying session is re-created.

        :param providers: Optional sequence of providers in order of decreasing
            precedence. Values can either be provider names or tuples of
            (provider name, options dict). If not provided, then all available
            providers are used with the default precedence.
        :param provider_options: Optional sequence of options dicts corresponding
            to the providers listed in 'providers'.

        'providers' can contain either names or names and options. When any options
        are given in 'providers', 'provider_options' should not be used.

        The list of providers is ordered by precedence. For example
        `['CUDAExecutionProvider', 'CPUExecutionProvider']`
        means execute a node using CUDAExecutionProvider if capable,
        otherwise execute using CPUExecutionProvider.
        """
        # recreate the underlying C.InferenceSession
        self._reset_session(providers, provider_options)

    def disable_fallback(self):
        """
        Disable session.run() fallback mechanism.
        """
        self._enable_fallback = False

    def enable_fallback(self):
        """
        Enable session.Run() fallback mechanism. If session.Run() fails due to an internal Execution Provider failure,
        reset the Execution Providers enabled for this session.
        If GPU is enabled, fall back to CUDAExecutionProvider.
        otherwise fall back to CPUExecutionProvider.
        """
        self._enable_fallback = True

    def _validate_input(self, feed_input_names):
        # Raise if any required (non-optional) model input is absent from the feed.
        missing_input_names = []
        for input in self._inputs_meta:
            # Inputs whose type starts with "optional" may legitimately be omitted.
            if input.name not in feed_input_names and not input.type.startswith("optional"):
                missing_input_names.append(input.name)
        if missing_input_names:
            raise ValueError(
                f"Required inputs ({missing_input_names}) are missing from input feed ({feed_input_names})."
            )

    def run(self, output_names, input_feed, run_options=None):
        """
        Compute the predictions.

        :param output_names: name of the outputs
        :param input_feed: dictionary ``{ input_name: input_value }``
        :param run_options: See :class:`onnxruntime.RunOptions`.
        :return: list of results, every result is either a numpy array,
            a sparse tensor, a list or a dictionary.

        ::

            sess.run([output_name], {input_name: x})
        """
        self._validate_input(list(input_feed.keys()))
        # Empty/None output_names means "fetch all model outputs".
        if not output_names:
            output_names = [output.name for output in self._outputs_meta]
        try:
            return self._sess.run(output_names, input_feed, run_options)
        except C.EPFail as err:
            if self._enable_fallback:
                print(f"EP Error: {err!s} using {self._providers}")
                print(f"Falling back to {self._fallback_providers} and retrying.")
                self.set_providers(self._fallback_providers)
                # Fallback only once.
                self.disable_fallback()
                return self._sess.run(output_names, input_feed, run_options)
            raise

    def run_async(self, output_names, input_feed, callback, user_data, run_options=None):
        """
        Compute the predictions asynchronously in a separate cxx thread from ort intra-op threadpool.

        :param output_names: name of the outputs
        :param input_feed: dictionary ``{ input_name: input_value }``
        :param callback: python function that accept array of results, and a status string on error.
            The callback will be invoked by a cxx thread from ort intra-op threadpool.
        :param run_options: See :class:`onnxruntime.RunOptions`.

        ::

            class MyData:
                def __init__(self):
                    # ...
                def save_results(self, results):
                    # ...

            def callback(results: np.ndarray, user_data: MyData, err: str) -> None:
                if err:
                    print (err)
                else:
                    # save results to user_data

            sess.run_async([output_name], {input_name: x}, callback)
        """
        # NOTE: no EPFail fallback here — the run happens on a C++ thread, so
        # errors are reported through the callback's status string instead.
        self._validate_input(list(input_feed.keys()))
        if not output_names:
            output_names = [output.name for output in self._outputs_meta]
        return self._sess.run_async(output_names, input_feed, callback, user_data, run_options)

    def run_with_ort_values(self, output_names, input_dict_ort_values, run_options=None):
        """
        Compute the predictions.

        :param output_names: name of the outputs
        :param input_dict_ort_values: dictionary ``{ input_name: input_ort_value }``
            See ``OrtValue`` class how to create `OrtValue`
            from numpy array or `SparseTensor`
        :param run_options: See :class:`onnxruntime.RunOptions`.
        :return: an array of `OrtValue`

        ::

            sess.run([output_name], {input_name: x})
        """

        # Local helper so the same call can be reissued verbatim after a fallback.
        def invoke(sess, output_names, input_dict_ort_values, run_options):
            # Unwrap the Python OrtValue objects to their C counterparts.
            input_dict = {}
            for n, v in input_dict_ort_values.items():
                input_dict[n] = v._get_c_value()
            result = sess.run_with_ort_values(input_dict, output_names, run_options)
            if not isinstance(result, C.OrtValueVector):
                raise TypeError("run_with_ort_values() must return a instance of type 'OrtValueVector'.")
            # Re-wrap the C results in Python OrtValue objects.
            ort_values = [OrtValue(v) for v in result]
            return ort_values

        self._validate_input(list(input_dict_ort_values.keys()))
        if not output_names:
            output_names = [output.name for output in self._outputs_meta]
        try:
            return invoke(self._sess, output_names, input_dict_ort_values, run_options)
        except C.EPFail as err:
            if self._enable_fallback:
                print(f"EP Error: {err!s} using {self._providers}")
                print(f"Falling back to {self._fallback_providers} and retrying.")
                self.set_providers(self._fallback_providers)
                # Fallback only once.
                self.disable_fallback()
                return invoke(self._sess, output_names, input_dict_ort_values, run_options)
            raise

    def end_profiling(self):
        """
        End profiling and return results in a file.

        The results are stored in a filename if the option
        :meth:`onnxruntime.SessionOptions.enable_profiling`.
        """
        return self._sess.end_profiling()

    def get_profiling_start_time_ns(self):
        """
        Return the nanoseconds of profiling's start time
        Comparable to time.monotonic_ns() after Python 3.3
        On some platforms, this timer may not be as precise as nanoseconds
        For instance, on Windows and MacOS, the precision will be ~100ns
        """
        # NOTE(review): accessed without parentheses — appears to be a property
        # on the C binding rather than a method; confirm against the pybind layer.
        return self._sess.get_profiling_start_time_ns

    def io_binding(self):
        "Return an onnxruntime.IOBinding object`."
        return IOBinding(self)

    def run_with_iobinding(self, iobinding, run_options=None):
        """
        Compute the predictions.

        :param iobinding: the iobinding object that has graph inputs/outputs bind.
        :param run_options: See :class:`onnxruntime.RunOptions`.
        """
        self._sess.run_with_iobinding(iobinding._iobinding, run_options)

    def get_tuning_results(self):
        # Return the tuning results collected by the underlying C session.
        return self._sess.get_tuning_results()

    def set_tuning_results(self, results, *, error_on_invalid=False):
        # Load previously collected tuning results into the underlying C session.
        # When error_on_invalid is True, invalid entries raise instead of being skipped.
        return self._sess.set_tuning_results(results, error_on_invalid)

    def run_with_ortvaluevector(self, run_options, feed_names, feeds, fetch_names, fetches, fetch_devices):
        """
        Compute the predictions similar to other run_*() methods but with minimal C++/Python conversion overhead.

        :param run_options: See :class:`onnxruntime.RunOptions`.
        :param feed_names: list of input names.
        :param feeds: list of input OrtValue.
        :param fetch_names: list of output names.
        :param fetches: list of output OrtValue.
        :param fetch_devices: list of output devices.
        """
        self._sess.run_with_ortvaluevector(run_options, feed_names, feeds, fetch_names, fetches, fetch_devices)
class InferenceSession(Session):
    """
    This is the main class used to run a model.

    Concrete :class:`Session` that loads a model from a file path or a byte
    string, creates the underlying ``C.InferenceSession``, and (optionally)
    falls back to a safer provider list if session creation fails.
    """

    def __init__(
        self,
        path_or_bytes: str | bytes | os.PathLike,
        sess_options: onnxruntime.SessionOptions | None = None,
        providers: Sequence[str | tuple[str, dict[Any, Any]]] | None = None,
        provider_options: Sequence[dict[Any, Any]] | None = None,
        **kwargs,
    ) -> None:
        """
        :param path_or_bytes: Filename or serialized ONNX or ORT format model in a byte string.
        :param sess_options: Session options.
        :param providers: Optional sequence of providers in order of decreasing
            precedence. Values can either be provider names or tuples of
            (provider name, options dict). If not provided, then all available
            providers are used with the default precedence.
        :param provider_options: Optional sequence of options dicts corresponding
            to the providers listed in 'providers'.

        The model type will be inferred unless explicitly set in the SessionOptions.
        To explicitly set:

        ::

            so = onnxruntime.SessionOptions()
            # so.add_session_config_entry('session.load_model_format', 'ONNX') or
            so.add_session_config_entry('session.load_model_format', 'ORT')

        A file extension of '.ort' will be inferred as an ORT format model.
        All other filenames are assumed to be ONNX format models.

        'providers' can contain either names or names and options. When any options
        are given in 'providers', 'provider_options' should not be used.

        The list of providers is ordered by precedence. For example
        `['CUDAExecutionProvider', 'CPUExecutionProvider']`
        means execute a node using `CUDAExecutionProvider`
        if capable, otherwise execute using `CPUExecutionProvider`.
        """
        super().__init__()

        # Accept either a filesystem path or the serialized model bytes;
        # exactly one of _model_path / _model_bytes is set.
        if isinstance(path_or_bytes, (str, os.PathLike)):
            self._model_path = os.fspath(path_or_bytes)
            self._model_bytes = None
        elif isinstance(path_or_bytes, bytes):
            self._model_path = None
            self._model_bytes = path_or_bytes  # TODO: This is bad as we're holding the memory indefinitely
        else:
            raise TypeError(f"Unable to load from type '{type(path_or_bytes)}'")

        self._sess_options = sess_options
        # Keep the caller's original options so _reset_session() can restore them.
        self._sess_options_initial = sess_options
        self._enable_fallback = True

        # An explicit kwarg overrides the ORT_LOAD_CONFIG_FROM_MODEL env var.
        if "read_config_from_model" in kwargs:
            self._read_config_from_model = int(kwargs["read_config_from_model"]) == 1
        else:
            self._read_config_from_model = os.environ.get("ORT_LOAD_CONFIG_FROM_MODEL") == "1"

        # internal parameters that we don't expect to be used in general so aren't documented
        disabled_optimizers = kwargs.get("disabled_optimizers")

        try:
            self._create_inference_session(providers, provider_options, disabled_optimizers)
        except (ValueError, RuntimeError) as e:
            if self._enable_fallback:
                try:
                    print("*************** EP Error ***************")
                    print(f"EP Error {e} when using {providers}")
                    print(f"Falling back to {self._fallback_providers} and retrying.")
                    print("****************************************")
                    self._create_inference_session(self._fallback_providers, None)
                    # Fallback only once.
                    self.disable_fallback()
                    return
                except Exception as fallback_error:
                    # Preserve the original creation error as the cause.
                    raise fallback_error from e
            # Fallback is disabled. Raise the original error.
            raise e

    def _create_inference_session(self, providers, provider_options, disabled_optimizers=None):
        # Build the C.InferenceSession, choose fallback providers, validate
        # provider args, and cache the session's metadata on self.
        available_providers = C.get_available_providers()

        # Tensorrt can fall back to CUDA if it's explicitly assigned. All others fall back to CPU.
        if "TensorrtExecutionProvider" in available_providers:
            if providers and any(
                provider == "CUDAExecutionProvider"
                or (isinstance(provider, tuple) and provider[0] == "CUDAExecutionProvider")
                for provider in providers
            ):
                self._fallback_providers = ["CUDAExecutionProvider", "CPUExecutionProvider"]
            else:
                self._fallback_providers = ["CPUExecutionProvider"]
        # MIGraphX can fall back to ROCM if it's explicitly assigned. All others fall back to CPU.
        elif "MIGraphXExecutionProvider" in available_providers:
            if providers and any(
                provider == "ROCMExecutionProvider"
                or (isinstance(provider, tuple) and provider[0] == "ROCMExecutionProvider")
                for provider in providers
            ):
                self._fallback_providers = ["ROCMExecutionProvider", "CPUExecutionProvider"]
            else:
                self._fallback_providers = ["CPUExecutionProvider"]
        else:
            self._fallback_providers = ["CPUExecutionProvider"]

        # validate providers and provider_options before other initialization
        providers, provider_options = check_and_normalize_provider_args(
            providers, provider_options, available_providers
        )

        session_options = self._sess_options if self._sess_options else C.get_default_session_options()

        self._register_ep_custom_ops(session_options, providers, provider_options, available_providers)

        # The third argument tells the C session whether it is loading from a
        # path (True) or from serialized bytes (False).
        if self._model_path:
            sess = C.InferenceSession(session_options, self._model_path, True, self._read_config_from_model)
        else:
            sess = C.InferenceSession(session_options, self._model_bytes, False, self._read_config_from_model)

        if disabled_optimizers is None:
            disabled_optimizers = set()
        elif not isinstance(disabled_optimizers, set):
            # convert to set. assumes iterable
            disabled_optimizers = set(disabled_optimizers)

        # initialize the C++ InferenceSession
        sess.initialize_session(providers, provider_options, disabled_optimizers)

        # Cache session metadata; these mirror the C session's state and are
        # cleared by _reset_session() before the session is released.
        self._sess = sess
        self._sess_options = self._sess.session_options
        self._inputs_meta = self._sess.inputs_meta
        self._outputs_meta = self._sess.outputs_meta
        self._overridable_initializers = self._sess.overridable_initializers
        self._model_meta = self._sess.model_meta
        self._providers = self._sess.get_providers()
        self._provider_options = self._sess.get_provider_options()
        self._profiling_start_time_ns = self._sess.get_profiling_start_time_ns

    def _reset_session(self, providers, provider_options):
        "release underlying session object."
        # meta data references session internal structures
        # so they must be set to None to decrement _sess reference count.
        self._sess_options = None
        self._inputs_meta = None
        self._outputs_meta = None
        self._overridable_initializers = None
        self._model_meta = None
        self._providers = None
        self._provider_options = None
        self._profiling_start_time_ns = None

        # create a new C.InferenceSession
        self._sess = None
        self._sess_options = self._sess_options_initial
        self._create_inference_session(providers, provider_options)

    def _register_ep_custom_ops(self, session_options, providers, provider_options, available_providers):
        # Register TensorRT plugins as custom ops when the TensorRT EP is
        # requested, whether it was given as a bare name or a (name, options) tuple.
        for i in range(len(providers)):
            if providers[i] in available_providers and providers[i] == "TensorrtExecutionProvider":
                C.register_tensorrt_plugins_as_custom_ops(session_options, provider_options[i])
            elif (
                isinstance(providers[i], tuple)
                and providers[i][0] in available_providers
                and providers[i][0] == "TensorrtExecutionProvider"
            ):
                C.register_tensorrt_plugins_as_custom_ops(session_options, providers[i][1])
class IOBinding:
    """
    This class provides API to bind input/output to a specified device, e.g. GPU.

    Thin wrapper over ``C.SessionIOBinding``; it additionally keeps Python-side
    references to bound numpy arrays so their buffers outlive the binding.
    """

    def __init__(self, session: Session):
        self._iobinding = C.SessionIOBinding(session._sess)
        # name -> numpy array; keeps bound CPU buffers alive (see bind_cpu_input).
        self._numpy_obj_references = {}

    def bind_cpu_input(self, name, arr_on_cpu):
        """
        bind an input to array on CPU

        :param name: input name
        :param arr_on_cpu: input values as a python array on CPU
        """
        # Hold a reference to the numpy object as the bound OrtValue is backed
        # directly by the data buffer of the numpy object and so the numpy object
        # must be around until this IOBinding instance is around
        self._numpy_obj_references[name] = arr_on_cpu
        self._iobinding.bind_input(name, arr_on_cpu)

    def bind_input(self, name, device_type, device_id, element_type, shape, buffer_ptr):
        """
        Bind an input to a pre-allocated buffer on the given device.

        :param name: input name
        :param device_type: e.g. cpu, cuda, cann
        :param device_id: device id, e.g. 0
        :param element_type: input element type
        :param shape: input shape
        :param buffer_ptr: memory pointer to input data
        """
        self._iobinding.bind_input(
            name,
            C.OrtDevice(
                get_ort_device_type(device_type, device_id),
                C.OrtDevice.default_memory(),
                device_id,
            ),
            element_type,
            shape,
            buffer_ptr,
        )

    def bind_ortvalue_input(self, name, ortvalue):
        """
        Bind an input to an existing OrtValue.

        :param name: input name
        :param ortvalue: OrtValue instance to bind
        """
        self._iobinding.bind_ortvalue_input(name, ortvalue._ortvalue)

    def synchronize_inputs(self):
        # Wait until all bound input buffers are ready on their devices.
        self._iobinding.synchronize_inputs()

    def bind_output(
        self,
        name,
        device_type="cpu",
        device_id=0,
        element_type=None,
        shape=None,
        buffer_ptr=None,
    ):
        """
        Bind an output, either to a device (letting ORT allocate) or to a
        caller-provided pre-allocated buffer.

        :param name: output name
        :param device_type: e.g. cpu, cuda, cann, cpu by default
        :param device_id: device id, e.g. 0
        :param element_type: output element type
        :param shape: output shape
        :param buffer_ptr: memory pointer to output data
        """
        # Follow the `if` path when the user has not provided any pre-allocated buffer but still
        # would like to bind an output to a specific device (e.g. cuda).
        # Pre-allocating an output buffer may not be an option for the user as :
        # (1) They may not want to use a custom allocator specific to the device they want to bind the output to,
        # in which case ORT will allocate the memory for the user
        # (2) The output has a dynamic shape and hence the size of the buffer may not be fixed across runs
        if buffer_ptr is None:
            self._iobinding.bind_output(
                name,
                C.OrtDevice(
                    get_ort_device_type(device_type, device_id),
                    C.OrtDevice.default_memory(),
                    device_id,
                ),
            )
        else:
            if element_type is None or shape is None:
                raise ValueError("`element_type` and `shape` are to be provided if pre-allocated memory is provided")
            self._iobinding.bind_output(
                name,
                C.OrtDevice(
                    get_ort_device_type(device_type, device_id),
                    C.OrtDevice.default_memory(),
                    device_id,
                ),
                element_type,
                shape,
                buffer_ptr,
            )

    def bind_ortvalue_output(self, name, ortvalue):
        """
        Bind an output to an existing OrtValue.

        :param name: output name
        :param ortvalue: OrtValue instance to bind
        """
        self._iobinding.bind_ortvalue_output(name, ortvalue._ortvalue)

    def synchronize_outputs(self):
        # Wait until all bound output buffers have been written by the device.
        self._iobinding.synchronize_outputs()

    def get_outputs(self):
        """
        Returns the output OrtValues from the Run() that preceded the call.
        The data buffer of the obtained OrtValues may not reside on CPU memory
        """
        outputs = self._iobinding.get_outputs()
        if not isinstance(outputs, C.OrtValueVector):
            raise TypeError("get_outputs() must return an instance of type 'OrtValueVector'.")
        return [OrtValue(ortvalue) for ortvalue in outputs]

    def get_outputs_as_ortvaluevector(self):
        # Raw C.OrtValueVector, without wrapping each element in OrtValue.
        return self._iobinding.get_outputs()

    def copy_outputs_to_cpu(self):
        """Copy output contents to CPU (if on another device). No-op if already on the CPU."""
        return self._iobinding.copy_outputs_to_cpu()

    def clear_binding_inputs(self):
        # Remove all input bindings. NOTE(review): _numpy_obj_references is not
        # cleared here, so previously bound CPU arrays stay referenced.
        self._iobinding.clear_binding_inputs()

    def clear_binding_outputs(self):
        # Remove all output bindings.
        self._iobinding.clear_binding_outputs()
class OrtValue:
    """
    A data structure that supports all ONNX data formats (tensors and non-tensors) that allows users
    to place the data backing these on a device, for example, on a CUDA supported device.
    This class provides APIs to construct and deal with OrtValues.
    """

    def __init__(self, ortvalue, numpy_obj=None):
        """
        Internal constructor; use the factory methods to create instances.

        :param ortvalue: the C.OrtValue to wrap.
        :param numpy_obj: optional numpy array backing the OrtValue's buffer.
        """
        if isinstance(ortvalue, C.OrtValue):
            self._ortvalue = ortvalue
            # Hold a ref count to the numpy object if the OrtValue is backed directly
            # by its data buffer so that it isn't destroyed when the OrtValue is in use
            self._numpy_obj = numpy_obj
        else:
            # An end user won't hit this error
            raise ValueError(
                "`Provided ortvalue` needs to be of type `onnxruntime.capi.onnxruntime_pybind11_state.OrtValue`"
            )

    def _get_c_value(self):
        # Internal accessor to the wrapped C.OrtValue.
        return self._ortvalue

    @staticmethod
    def ortvalue_from_numpy(numpy_obj, device_type="cpu", device_id=0):
        """
        Factory method to construct an OrtValue (which holds a Tensor) from a given Numpy object
        A copy of the data in the Numpy object is held by the OrtValue only if the device is NOT cpu

        :param numpy_obj: The Numpy object to construct the OrtValue from
        :param device_type: e.g. cpu, cuda, cann, cpu by default
        :param device_id: device id, e.g. 0
        """
        # Hold a reference to the numpy object (if device_type is 'cpu') as the OrtValue
        # is backed directly by the data buffer of the numpy object and so the numpy object
        # must be around until this OrtValue instance is around
        return OrtValue(
            C.OrtValue.ortvalue_from_numpy(
                numpy_obj,
                C.OrtDevice(
                    get_ort_device_type(device_type, device_id),
                    C.OrtDevice.default_memory(),
                    device_id,
                ),
            ),
            numpy_obj if device_type.lower() == "cpu" else None,
        )

    @staticmethod
    def ortvalue_from_shape_and_type(shape=None, element_type=None, device_type="cpu", device_id=0):
        """
        Factory method to construct an OrtValue (which holds a Tensor) from given shape and element_type

        :param shape: List of integers indicating the shape of the OrtValue
        :param element_type: The data type of the elements in the OrtValue (numpy type)
        :param device_type: e.g. cpu, cuda, cann, cpu by default
        :param device_id: device id, e.g. 0
        :raises ValueError: if shape or element_type is not given.
        """
        if shape is None or element_type is None:
            # Fixed error message: the old one was copy-pasted from
            # IOBinding.bind_output and talked about "pre-allocated memory",
            # which does not apply to this factory.
            raise ValueError("`shape` and `element_type` must both be provided to construct an OrtValue")

        return OrtValue(
            C.OrtValue.ortvalue_from_shape_and_type(
                shape,
                element_type,
                C.OrtDevice(
                    get_ort_device_type(device_type, device_id),
                    C.OrtDevice.default_memory(),
                    device_id,
                ),
            )
        )

    @staticmethod
    def ort_value_from_sparse_tensor(sparse_tensor):
        """
        The function will construct an OrtValue instance from a valid SparseTensor
        The new instance of OrtValue will assume the ownership of sparse_tensor
        """
        return OrtValue(C.OrtValue.ort_value_from_sparse_tensor(sparse_tensor._get_c_tensor()))

    def as_sparse_tensor(self):
        """
        The function will return SparseTensor contained in this OrtValue
        """
        return SparseTensor(self._ortvalue.as_sparse_tensor())

    def data_ptr(self):
        """
        Returns the address of the first element in the OrtValue's data buffer
        """
        return self._ortvalue.data_ptr()

    def device_name(self):
        """
        Returns the name of the device where the OrtValue's data buffer resides e.g. cpu, cuda, cann
        """
        return self._ortvalue.device_name().lower()

    def shape(self):
        """
        Returns the shape of the data in the OrtValue
        """
        return self._ortvalue.shape()

    def data_type(self):
        """
        Returns the data type of the data in the OrtValue
        """
        return self._ortvalue.data_type()

    def element_type(self):
        """
        Returns the proto type of the data in the OrtValue
        if the OrtValue is a tensor.
        """
        return self._ortvalue.element_type()

    def has_value(self):
        """
        Returns True if the OrtValue corresponding to an
        optional type contains data, else returns False
        """
        return self._ortvalue.has_value()

    def is_tensor(self):
        """
        Returns True if the OrtValue contains a Tensor, else returns False
        """
        return self._ortvalue.is_tensor()

    def is_sparse_tensor(self):
        """
        Returns True if the OrtValue contains a SparseTensor, else returns False
        """
        return self._ortvalue.is_sparse_tensor()

    def is_tensor_sequence(self):
        """
        Returns True if the OrtValue contains a Tensor Sequence, else returns False
        """
        return self._ortvalue.is_tensor_sequence()

    def numpy(self):
        """
        Returns a Numpy object from the OrtValue.
        Valid only for OrtValues holding Tensors. Throws for OrtValues holding non-Tensors.
        Use accessors to gain a reference to non-Tensor objects such as SparseTensor
        """
        return self._ortvalue.numpy()

    def update_inplace(self, np_arr):
        """
        Update the OrtValue in place with a new Numpy array. The numpy contents
        are copied over to the device memory backing the OrtValue. It can be used
        to update the input values for an InferenceSession with CUDA graph
        enabled or other scenarios where the OrtValue needs to be updated while
        the memory address can not be changed.
        """
        self._ortvalue.update_inplace(np_arr)
class OrtDevice:
    """
    A data structure that exposes the underlying C++ OrtDevice
    """

    def __init__(self, c_ort_device):
        """
        Internal constructor

        :param c_ort_device: the pybind OrtDevice instance to wrap.
        :raises ValueError: if the argument is not a ``C.OrtDevice``.
        """
        # Guard clause: reject anything that is not the pybind device type.
        if not isinstance(c_ort_device, C.OrtDevice):
            raise ValueError(
                "`Provided object` needs to be of type `onnxruntime.capi.onnxruntime_pybind11_state.OrtDevice`"
            )
        self._ort_device = c_ort_device

    def _get_c_device(self):
        """
        Internal accessor to underlying object
        """
        return self._ort_device

    @staticmethod
    def make(ort_device_name, device_id):
        """Build an OrtDevice from a device name (e.g. "cuda") and a device id."""
        c_device = C.OrtDevice(
            get_ort_device_type(ort_device_name, device_id),
            C.OrtDevice.default_memory(),
            device_id,
        )
        return OrtDevice(c_device)

    def device_id(self):
        """Return the numeric id of the wrapped device."""
        return self._ort_device.device_id()

    def device_type(self):
        """Return the device type constant of the wrapped device."""
        return self._ort_device.device_type()
class SparseTensor:
"""
A data structure that project the C++ SparseTensor object
The class provides API to work with the object.
Depending on the format, the class will hold more than one buffer
depending on the format
"""
def __init__(self, sparse_tensor):
    """
    Internal constructor.

    :param sparse_tensor: an onnxruntime.capi.onnxruntime_pybind11_state.SparseTensor
    :raises ValueError: when the argument is not of the expected native type
        (an end user will not hit this error).
    """
    if not isinstance(sparse_tensor, C.SparseTensor):
        raise ValueError(
            "`Provided object` needs to be of type `onnxruntime.capi.onnxruntime_pybind11_state.SparseTensor`"
        )
    self._tensor = sparse_tensor
def _get_c_tensor(self):
return self._tensor
@staticmethod
def sparse_coo_from_numpy(dense_shape, values, coo_indices, ort_device):
    """
    Factory method that builds a SparseTensor in COO format.

    :param dense_shape: 1-D numpy array(int64) or python list holding the dense
        shape of the sparse tensor; must reside in CPU memory.
    :param values: homogeneous, contiguous 1-D numpy array with the non-zero
        elements of the tensor.
    :param coo_indices: contiguous numpy array(int64) with the COO indices.
        It may be 1-D, holding a linear index per non-zero value (length equal
        to that of values), or 2-D, holding a coordinate pair per non-zero
        value (length exactly twice that of values).
    :param ort_device: describes the memory backing the supplied numpy arrays.
        Only CPU memory is supported for non-numeric data types.

    For primitive types the values and coo_indices arrays are mapped into
    native memory and used as backing storage; their reference counts are
    incremented and released on GC. The buffers may reside on either CPU or
    GPU. For strings and objects a CPU-side copy is made, since ORT does not
    support those types on other devices and their memory cannot be mapped.
    """
    device = ort_device._get_c_device()
    native = C.SparseTensor.sparse_coo_from_numpy(dense_shape, values, coo_indices, device)
    return SparseTensor(native)
@staticmethod
def sparse_csr_from_numpy(dense_shape, values, inner_indices, outer_indices, ort_device):
    """
    Factory method that builds a SparseTensor in CSR format.

    :param dense_shape: 1-D numpy array(int64) or python list holding the dense
        shape of the sparse tensor (rows, cols); must reside in CPU memory.
    :param values: contiguous, homogeneous 1-D numpy array with the non-zero
        elements of the tensor.
    :param inner_indices: contiguous 1-D numpy array(int64) with the CSR inner
        indices; its length must equal that of values.
    :param outer_indices: contiguous 1-D numpy array(int64) with the CSR outer
        indices; its length must equal the number of rows + 1.
    :param ort_device: describes the memory backing the supplied numpy arrays.
        Only CPU memory is supported for non-numeric data types.

    For primitive types the values and index arrays are mapped into native
    memory and used as backing storage; their reference counts are incremented
    and released on GC. The buffers may reside on either CPU or GPU. For
    strings and objects a CPU-side copy is made, since ORT does not support
    those types on other devices and their memory cannot be mapped.
    """
    device = ort_device._get_c_device()
    native = C.SparseTensor.sparse_csr_from_numpy(
        dense_shape, values, inner_indices, outer_indices, device
    )
    return SparseTensor(native)
def values(self):
    """
    Return a numpy array with the tensor's non-zero values.

    For numeric data types the array is backed by the native memory;
    otherwise the returned array contains copies of the strings.
    """
    backing = self._tensor
    return backing.values()
def as_coo_view(self):
    """
    Return the COO representation of this sparse tensor, enabling queries of
    the COO indices. Throws if the instance does not hold the COO format.

    Query the indices as::

        coo_indices = sparse_tensor.as_coo_view().indices()

    which returns a numpy array backed by the native memory.
    """
    backing = self._tensor
    return backing.get_coo_data()
def as_csrc_view(self):
    """
    Return the CSR(C) representation of this sparse tensor, enabling queries
    of the CSR(C) indices. Throws if the instance does not hold the CSR(C)
    format.

    Query the indices as::

        inner_indices = sparse_tensor.as_csrc_view().inner()
        outer_indices = sparse_tensor.as_csrc_view().outer()

    which return numpy arrays backed by the native memory.
    """
    backing = self._tensor
    return backing.get_csrc_data()
def as_blocksparse_view(self):
    """
    Return the BlockSparse representation of this sparse tensor, enabling
    queries of the BlockSparse indices. Throws if the instance does not hold
    the BlockSparse format.

    Query the indices as::

        block_sparse_indices = sparse_tensor.as_blocksparse_view().indices()

    which returns a numpy array backed by the native memory.
    """
    backing = self._tensor
    return backing.get_blocksparse_data()
def to_cuda(self, ort_device):
"""
Returns a copy of this instance on the specified cuda device
:param ort_device: with name 'cuda' and valid gpu device id
The method will throw if:
- this instance contains strings
- this instance is already on GPU. Cross GPU copy is not supported