-
Notifications
You must be signed in to change notification settings - Fork 116
/
fpga.py
2277 lines (1958 loc) · 114 KB
/
fpga.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2019-2021 ETH Zurich and the DaCe authors. All rights reserved.
from six import StringIO
import collections
import enum
import functools
import itertools
import re
import warnings
import sympy as sp
import numpy as np
from typing import Dict, Iterable, List, Set, Tuple, Union
import copy
import dace
from dace.codegen.targets import cpp
from dace import subsets, data as dt, dtypes, memlet, symbolic
from dace.config import Config
from dace.frontend import operations
from dace.sdfg import SDFG, nodes, utils, dynamic_map_inputs
from dace.sdfg import ScopeSubgraphView, find_input_arraynode, find_output_arraynode
from dace.codegen import exceptions as cgx
from dace.codegen.codeobject import CodeObject
from dace.codegen.dispatcher import DefinedType
from dace.codegen.prettycode import CodeIOStream
from dace.codegen.targets.target import (TargetCodeGenerator, IllegalCopy, make_absolute)
from dace.codegen import cppunparse
from dace.properties import Property, make_properties, indirect_properties
from dace.sdfg.state import SDFGState
from dace.sdfg.utils import is_fpga_kernel
from dace.symbolic import evaluate
from dace.transformation.dataflow import MapUnroll
from collections import defaultdict
_CPU_STORAGE_TYPES = {dtypes.StorageType.CPU_Heap, dtypes.StorageType.CPU_ThreadLocal, dtypes.StorageType.CPU_Pinned}
_FPGA_STORAGE_TYPES = {
dtypes.StorageType.FPGA_Global, dtypes.StorageType.FPGA_Local, dtypes.StorageType.FPGA_Registers,
dtypes.StorageType.FPGA_ShiftRegister
}
_FPGA_LOCAL_STORAGE_TYPES = {
dtypes.StorageType.FPGA_Local, dtypes.StorageType.FPGA_Registers, dtypes.StorageType.FPGA_ShiftRegister
}
def vector_element_type_of(dtype):
    """Return the scalar element type underlying ``dtype``.

    Pointer types are dereferenced (repeatedly, if nested) and vector types
    yield their base type; any other type is returned unchanged.
    """
    current = dtype
    # Strip pointer wrappers until a non-pointer type remains
    while isinstance(current, dace.pointer):
        current = current.base_type
    if isinstance(current, dace.vector):
        return current.base_type
    return current
def is_external_stream(node: dace.sdfg.nodes.Node, subgraph: Union[dace.sdfg.SDFGState, ScopeSubgraphView]):
    '''
    Given a node and a subgraph, returns whether this is an external stream (the other endpoint is in
    another FPGA Kernel) or not.

    :return: True if node represent an external stream, False otherwise
    '''
    # Only stream access nodes can be external
    if not (isinstance(node, dace.nodes.AccessNode) and isinstance(node.desc(subgraph), dt.Stream)):
        return False
    stream_desc = node.desc(subgraph)
    # The stream is internal iff another access node in this kernel/subgraph
    # refers to the same descriptor (i.e., both endpoints live here)
    has_local_partner = any(
        other != node and isinstance(other, dace.nodes.AccessNode) and stream_desc == other.desc(subgraph)
        for other in subgraph.nodes())
    return not has_local_partner
def is_multibank_array(array: dt.Data):
    """
    :return: True if this array is placed on HBM/DDR on FPGA Global memory
    """
    # Only arrays in FPGA global memory can be multibank
    if not isinstance(array, dt.Array) or array.storage != dtypes.StorageType.FPGA_Global:
        return False
    bank_info = parse_location_bank(array)
    if bank_info is None:
        return False
    return bank_info[0] in ("HBM", "DDR")
def is_multibank_array_with_distributed_index(array: dt.Data):
    """
    :return: True if this array is placed on HBM/DDR and has an extra first
        dimension equal to the number of banks is placed on. For HBM/DDR arrays
        spanning across multiple banks this is always true.
    """
    if not is_multibank_array(array):
        return False
    bank_info = parse_location_bank(array)
    low, high = get_multibank_ranges_from_subset(bank_info[1], None)
    if high - low > 1:
        # Spans several banks: always carries a distributed (bank) index
        return True
    # Single bank: treated as distributed only when a leading dimension of
    # exactly 1 is present in addition to the data dimensions
    return len(array.shape) > 1 and str(array.shape[0]) == "1"
def is_fpga_array(array: dt.Data):
    """
    :return: True if this array is placed on FPGA memory
    """
    if not isinstance(array, dt.Array):
        return False
    return array.storage in _FPGA_STORAGE_TYPES
def iterate_multibank_interface_ids(array: dt.Array, interface_ids: Union[int, List[Tuple[int, int]]]):
    """
    Works on the interface_ids generated by make_parameter. If the array is a hbm/ddr multibank array,
    interface_ids is a list of tuples of the form (bank, id), and the method will yield the values
    one by one. If it is not, it will return a tuple of 0 (bank) and the interface id once.
    """
    if not is_multibank_array_with_distributed_index(array):
        # Single-bank case: one interface on bank 0
        yield (0, interface_ids)
        return
    for bank, interface in interface_ids:
        yield (bank, interface)
def iterate_distributed_subset(desc: dt.Array, access_memlet: memlet.Memlet, is_write: bool, sdfg: SDFG):
    """
    :param desc: The array accessed by the memlet
    :param access_memlet: The memlet
    :param is_write: If we care about the write or read direction. is_write means we write to desc,
        not is_write means we read from it
    :return: if access_memlet contains a distributed subset the method will count from the lower to the upper
        end of it. Otherwise returns 0 once.
    """
    if not is_multibank_array_with_distributed_index(desc):
        yield 0
        return
    # Pick the subset on the side of the memlet that touches desc
    if is_write:
        bank_subset = access_memlet.dst_subset or access_memlet.subset
    else:
        bank_subset = access_memlet.src_subset or access_memlet.subset
    if bank_subset is None:
        yield 0
        return
    # We can assume anywhere in the FPGA codegen that distributed subsets
    # are evaluatable, because all maps are unrolled before codegen
    low, high = get_multibank_ranges_from_subset(bank_subset, sdfg)
    yield from range(low, high)
def modify_distributed_subset(subset: subsets.Subset, change: int):
    """
    Modifies the first index of :param subset: (the one used for distributed subsets).

    :param subset: is deepcopied before any modification to it is done.
    :param change: the first index is set to this value, unless it's (-1) in which case
        the first index is completly removed
    """
    modified = copy.deepcopy(subset)
    if change != -1:
        # Pin the distributed (first) dimension to the single requested bank
        modified[0] = (change, change, 1)
    else:
        # -1 means: drop the distributed dimension entirely
        modified.pop([0])
    return modified
def get_multibank_ranges_from_subset(subset: Union[subsets.Subset, str], sdfg: SDFG) -> Tuple[int, int]:
    """
    Returns the upper and lower end of the accessed multibank-range, evaluated using the
    constants on the SDFG.

    :param subset: The subset (or its string representation) whose first dimension is the bank range.
    :param sdfg: The SDFG used to resolve symbolic bank indices to constants.
    :returns: (low, high) where low = the lowest accessed bank and high the
        highest accessed bank + 1.
    :raises NotImplementedError: if the bank dimension is strided.
    :raises ValueError: if the bank range cannot be evaluated to constants.
    """
    if isinstance(subset, str):
        subset = subsets.Range.from_string(subset)
    low, high, stride = subset[0]
    if stride != 1:
        # No f-string needed: the message is constant
        raise NotImplementedError("Strided multibank subsets not supported.")
    try:
        low = int(symbolic.resolve_symbol_to_constant(low, sdfg))
        high = int(symbolic.resolve_symbol_to_constant(high, sdfg))
    except Exception as ex:
        # Narrowed from a bare `except:` (which also swallowed KeyboardInterrupt
        # and SystemExit); chain the original cause for debuggability
        raise ValueError("Only constant evaluatable indices allowed for multibank-memlets on the bank index.") from ex
    return (low, high + 1)
def parse_location_bank(array: dt.Array) -> Tuple[str, str]:
    """
    :param array: an array on FPGA global memory
    :return: None if an array is given which does not have a location['memorytype'] value.
        Otherwise it will return a tuple (bank_type, bank_assignment), where bank_type
        is one of 'DDR', 'HBM' and bank_assignment a string that describes which banks are
        used.
    :raises ValueError: if 'memorytype' is present without 'bank', or if the memory
        type is neither HBM nor DDR.
    """
    if "memorytype" not in array.location:
        return None
    if "bank" not in array.location:
        raise ValueError("If 'memorytype' is specified for an array 'bank' must also be specified")
    bank: str = array.location["bank"]
    memorytype: str = array.location["memorytype"].upper()
    # Only HBM and DDR are recognized multibank memory types
    if memorytype not in ("DDR", "HBM"):
        raise ValueError(f"{memorytype} is an invalid memorytype. Supported are HBM and DDR.")
    # Reuse the already-fetched bank string (the original re-read the dict and
    # left an unused local behind)
    return (memorytype, bank)
def fpga_ptr(name: str,
             desc: dt.Data = None,
             sdfg: SDFG = None,
             subset_info: Union[subsets.Subset, int] = None,
             is_write: bool = None,
             dispatcher=None,
             ancestor: int = 0,
             is_array_interface: bool = False,
             interface_id: int = None,
             decouple_array_interfaces: bool = False):
    """
    Returns a string that points to the data based on its name, and various other conditions
    that may apply for that data field.

    :param name: Data name.
    :param desc: Data descriptor.
    :param sdfg: SDFG, required to evaluate a subset-based ``subset_info``.
    :param subset_info: Any additional information about the accessed subset.
    :param is_write: Whether the pointer is used for writing (required for array interfaces
        when ``decouple_array_interfaces`` is set).
    :param dispatcher: Optional dispatcher used to look up already-defined interface variables.
    :param ancestor: The ancestor level where the variable should be searched for if
        is_array_interface is True when dispatcher is not None
    :param is_array_interface: Data is pointing to an interface in FPGA-Kernel compilation
    :param interface_id: An optional interface id that will be added to the name (only for array interfaces)
    :param decouple_array_interfaces: if True it will qualify the name of an array interface, depending whether
        it is used for reading from or writing to memory
    :return: C-compatible name that can be used to access the data.
    """
    if desc is not None and is_multibank_array_with_distributed_index(desc):
        # Prefix the name with the memory type and bank of the accessed bank
        location_bank = parse_location_bank(desc)
        mem_type = ""
        if location_bank is not None:
            mem_type = location_bank[0].lower()
        # Note: identity comparisons with None instead of `== None` (PEP 8)
        if subset_info is None:
            raise ValueError("Cannot generate name for bank without subset info")
        elif isinstance(subset_info, int):
            name = f"{mem_type}{subset_info}_{name}"
        elif isinstance(subset_info, subsets.Subset):
            if sdfg is None:
                raise ValueError("Cannot generate name for bank using subset if sdfg not provided")
            low, high = get_multibank_ranges_from_subset(subset_info, sdfg)
            if low + 1 != high:
                raise ValueError("ptr cannot generate names for subsets accessing more than one memory bank")
            name = f"{mem_type}{low}_{name}"
            subset_info = low  # used for arrayinterface name where it must be int
    if is_array_interface:
        if decouple_array_interfaces:
            # qualify the name
            if is_write is None:
                raise ValueError("is_write must be set for ArrayInterface.")
            ptr_in = f"__{name}_in"
            ptr_out = f"__{name}_out"
            if dispatcher is not None:
                # DaCe allows reading from an output connector, even though it
                # is not an input connector. If this occurs, panic and read
                # from the output interface instead
                if is_write or not dispatcher.defined_vars.has(ptr_in, ancestor):
                    # Throw a KeyError if this pointer also doesn't exist
                    dispatcher.defined_vars.get(ptr_out, ancestor)
                    # Otherwise use it
                    name = ptr_out
                else:
                    name = ptr_in
            else:
                # We might call this before the variable is even defined (e.g., because
                # we are about to define it), so if the dispatcher is not passed, just
                # return the appropriate string
                name = ptr_out if is_write else ptr_in
        # Append the interface id, if provided
        if interface_id is not None:
            name = f"{name}_{interface_id}"
    return name
def unqualify_fpga_array_name(sdfg: dace.SDFG, arr_name: str):
    '''
    Returns the unqualified array name if it refers to an array interface.
    Otherwise return it as it is.

    :param sdfg: The SDFG whose arrays are used to decide whether the name is qualified.
    :param arr_name: array name to unqualify
    '''
    # A decoupled array-interface name has the form __<name>_in or __<name>_out
    # and does not exist in the SDFG as-is. Bug fix: the suffix check previously
    # used endswith('out'), which wrongly matched names such as '__layout'.
    if arr_name not in sdfg.arrays and (arr_name.endswith('_in')
                                        or arr_name.endswith('_out')) and arr_name.startswith('__'):
        unqualified = re.sub('_in$|_out$', '', arr_name)
        unqualified = re.sub('^__', '', unqualified)
        return unqualified
    else:
        return arr_name
class FPGACodeGen(TargetCodeGenerator):
# Set by deriving class
target_name = None
title = None
language = None
    def __init__(self, frame_codegen, sdfg: SDFG):
        """Initialize the FPGA code generator and register all of its dispatchers.

        :param frame_codegen: The frame code generator this target attaches to;
            its dispatcher is used for all registrations below.
        :param sdfg: The top-level SDFG being compiled.
        """
        # The inheriting class must set target_name, title and language.
        self._in_device_code = False
        self._cpu_codegen = None
        self._frame = frame_codegen
        self._dispatcher = frame_codegen.dispatcher
        self._kernel_count = 0
        self._global_sdfg = sdfg
        self._program_name = sdfg.name
        # Verify that we did not miss the allocation of any global arrays, even
        # if they're nested deep in the SDFG
        self._allocated_global_arrays = set()
        self._unrolled_pes = set()
        # Dictionary node->kernel_id
        self._node_to_kernel = defaultdict()
        # Keep track of dependencies among kernels (if any)
        self._kernels_dependencies = dict()
        self._kernels_names_to_id = dict()
        # Register dispatchers
        self._cpu_codegen = self._dispatcher.get_generic_node_dispatcher()
        self._host_codes = []
        self._kernel_codes = []
        # any other kind of generated file if any (name, code object)
        self._other_codes = {}
        self._bank_assignments = {}  # {(data name, sdfg): (type, id)}
        self._stream_connections = {}  # { name: [src, dst] }
        # For generating kernel instrumentation code, is incremented every time
        # a kernel is instrumented
        self._kernel_instrumentation_index: int = 0
        self._decouple_array_interfaces = False
        # Register additional FPGA dispatchers
        self._dispatcher.register_map_dispatcher([dtypes.ScheduleType.FPGA_Device], self)
        # Whole FPGA states are dispatched to this target (predicate: is_fpga_kernel)
        self._dispatcher.register_state_dispatcher(self, predicate=is_fpga_kernel)
        # While inside device code, handle all nodes except SystemVerilog
        # tasklets (those are handled by a separate RTL code generator)
        self._dispatcher.register_node_dispatcher(
            self,
            predicate=lambda sdfg, state, node: self._in_device_code and not (isinstance(
                node, nodes.Tasklet) and node.language == dtypes.Language.SystemVerilog))
        fpga_storage = [
            dtypes.StorageType.FPGA_Global,
            dtypes.StorageType.FPGA_Local,
            dtypes.StorageType.FPGA_Registers,
            dtypes.StorageType.FPGA_ShiftRegister,
        ]
        self._dispatcher.register_array_dispatcher(fpga_storage, self)
        # Register permitted copies
        for storage_from in itertools.chain(fpga_storage, [dtypes.StorageType.Register]):
            for storage_to in itertools.chain(fpga_storage, [dtypes.StorageType.Register]):
                if (storage_from == dtypes.StorageType.Register and storage_to == dtypes.StorageType.Register):
                    # register this as copy dispatcher only if the destination is scheduled on FPGA
                    self._dispatcher.register_copy_dispatcher(storage_from, storage_to, dtypes.ScheduleType.FPGA_Device,
                                                              self)
                else:
                    self._dispatcher.register_copy_dispatcher(storage_from, storage_to, None, self)
        # Host <-> device copies, in both directions
        self._dispatcher.register_copy_dispatcher(dtypes.StorageType.FPGA_Global, dtypes.StorageType.CPU_Heap, None,
                                                  self)
        self._dispatcher.register_copy_dispatcher(dtypes.StorageType.FPGA_Global, dtypes.StorageType.CPU_ThreadLocal,
                                                  None, self)
        self._dispatcher.register_copy_dispatcher(dtypes.StorageType.CPU_Heap, dtypes.StorageType.FPGA_Global, None,
                                                  self)
        self._dispatcher.register_copy_dispatcher(dtypes.StorageType.CPU_ThreadLocal, dtypes.StorageType.FPGA_Global,
                                                  None, self)
        # Memory width converters (gearboxing) to generate globally
        self.converters_to_generate = set()
    @property
    def has_initializer(self):
        """True: FPGA targets always generate initialization code."""
        return True
    @property
    def has_finalizer(self):
        """False: no target-specific finalization code is generated."""
        return False
    def preprocess(self, sdfg: SDFG) -> None:
        """Prepare for code generation: expose the FPGA context in the state
        struct and run vendor-specific (subclass) preprocessing.

        :param sdfg: The top-level SDFG about to be generated.
        """
        # Right before finalizing code, write FPGA context to state structure
        self._frame.statestruct.append('dace_fpga_context *fpga_context;')
        # Call vendor-specific preprocessing
        self._internal_preprocess(sdfg)
    def _kernels_subgraphs(self, graph: Union[dace.sdfg.SDFGState, ScopeSubgraphView], dependencies: dict):
        '''
        Finds subgraphs of an SDFGState or ScopeSubgraphView that correspond to kernels.
        This is done by looking to which kernel, each node belongs.

        :param graph: the state/subgraph to consider
        :param dependencies: a dictionary containing for each kernel ID, the IDs of the kernels on which it
            depends on
        :return: a list of tuples (subgraph, kernel ID) topologically ordered according kernel dependencies.
        '''
        from dace.sdfg.scope import ScopeSubgraphView
        if not isinstance(graph, (dace.sdfg.SDFGState, ScopeSubgraphView)):
            raise TypeError("Expected SDFGState or ScopeSubgraphView, got: {}".format(type(graph).__name__))
        subgraphs = collections.defaultdict(list)  # {kernel_id: {nodes in subgraph}}
        # Go over the nodes and populate the kernels subgraphs
        # (node-to-kernel assignments were computed earlier by partition_kernels
        # and stored in self._node_to_kernel)
        for node in graph.nodes():
            if isinstance(node, dace.sdfg.SDFGState):
                continue
            node_repr = utils.unique_node_repr(graph, node)
            if node_repr in self._node_to_kernel:
                subgraphs[self._node_to_kernel[node_repr]].append(node)
            # add this node to the corresponding subgraph
            if isinstance(node, dace.nodes.AccessNode):
                # AccessNodes can be read from multiple kernels, so
                # check all out edges
                start_nodes = [e.dst for e in graph.out_edges(node)]
                for n in start_nodes:
                    n_repr = utils.unique_node_repr(graph, n)
                    if n_repr in self._node_to_kernel:
                        subgraphs[self._node_to_kernel[n_repr]].append(node)
        # Now stick each of the found components together in a ScopeSubgraphView and return
        # them. Sort according kernel dependencies order.
        # Build a dependency graph
        import networkx as nx
        kernels_graph = nx.DiGraph()
        for k in subgraphs.keys():
            # we could have no dependencies at all
            kernels_graph.add_node(k)
            if k in dependencies:
                kernel_dependencies = dependencies[k]
                for p in kernel_dependencies:
                    kernels_graph.add_edge(p, k)
        subgraph_views = []
        all_nodes = graph.nodes()
        # Use topological sort to order kernels according to their dependencies
        for kernel_id in nx.topological_sort(kernels_graph):
            # Return the subgraph and the kernel id
            subgraph_views.append((ScopeSubgraphView(graph, [n for n in all_nodes if n in subgraphs[kernel_id]],
                                                     None), kernel_id))
        del kernels_graph
        return subgraph_views
    def generate_state(self, sdfg: dace.SDFG, state: dace.SDFGState, function_stream: CodeIOStream,
                       callsite_stream: CodeIOStream):
        '''
        Generate an FPGA State, possibly comprising multiple Kernels and/or PEs.

        :param sdfg: The SDFG containing the state.
        :param state: The FPGA state to generate.
        :param function_stream: CPU code stream: contains global declarations (e.g. exported forward declaration of
            device specific host functions).
        :param callsite_stream: CPU code stream, contains the actual code (for creating global buffers, invoking
            device host functions, and so on).
        '''
        state_id = sdfg.node_id(state)
        if not self._in_device_code:
            # Host-side pass: partition the state into kernels, generate each
            # kernel, then emit one host function that launches them all
            # Unroll maps directly in the SDFG so the subgraphs can be
            # recognized as independent processing elements
            top_level_unrolled = [
                n for n in state.scope_children()[None]
                if isinstance(n, dace.sdfg.nodes.MapEntry) and n.schedule == dtypes.ScheduleType.Unrolled
            ]
            for map_entry in top_level_unrolled:
                MapUnroll.apply_to(sdfg, map_entry=map_entry)
            if top_level_unrolled:
                # Mark the unrolled-map dispatcher as used so its support code is emitted
                disp = self._dispatcher.get_scope_dispatcher(dtypes.ScheduleType.Unrolled)
                self._dispatcher._used_targets.add(disp)
            kernels = []  # List of tuples (subgraph, kernel_id)
            # Start a new state code generation: reset previous dependencies if any
            self._kernels_dependencies.clear()
            self._kernels_names_to_id.clear()
            # Determine independent components: these are our starting kernels.
            # Then, try to split these components further
            subgraphs = dace.sdfg.concurrent_subgraphs(state)
            start_kernel = 0
            for sg in subgraphs:
                # Determine kernels in state
                num_kernels, dependencies = self.partition_kernels(sg, default_kernel=start_kernel)
                if num_kernels > 1:
                    # For each kernel, derive the corresponding subgraphs
                    # and keep track of dependencies
                    kernels.extend(self._kernels_subgraphs(sg, dependencies))
                    self._kernels_dependencies.update(dependencies)
                else:
                    kernels.append((sg, start_kernel))
                start_kernel = start_kernel + num_kernels
            # There is no need to generate additional kernels if the number of found kernels
            # is equal to the number of connected components: use PEs instead (only one kernel)
            if len(subgraphs) == len(kernels):
                kernels = [(state, 0)]
            self._num_kernels = len(kernels)
            state_parameters = []
            # As long as we generate kernels, generate the host file for invoking kernels,
            # synchronize them, create transient buffers.
            state_host_header_stream = CodeIOStream()
            state_host_body_stream = CodeIOStream()
            instrumentation_stream = CodeIOStream()
            # Kernels are now sorted considering their dependencies
            for kern, kern_id in kernels:
                # Generate all kernels in this state
                subgraphs = dace.sdfg.concurrent_subgraphs(kern)
                shared_transients = set(sdfg.shared_transients())
                # Allocate global memory transients, unless they are shared with
                # other states
                all_transients = set(kern.all_transients())
                allocated = set(shared_transients)
                for node in kern.data_nodes():
                    data = node.desc(sdfg)
                    if node.data not in all_transients or node.data in allocated:
                        continue
                    if (data.storage == dtypes.StorageType.FPGA_Global and not isinstance(data, dt.View)):
                        allocated.add(node.data)
                        self._dispatcher.dispatch_allocate(sdfg, kern, state_id, node, data, function_stream,
                                                           callsite_stream)
                # Create a unique kernel name to avoid name clashes
                # If this kernels comes from a Nested SDFG, use that name also
                if sdfg.parent_nsdfg_node is not None:
                    kernel_name = f"{sdfg.parent_nsdfg_node.label}_{state.label}_{kern_id}_{sdfg.sdfg_id}"
                else:
                    kernel_name = f"{state.label}_{kern_id}_{sdfg.sdfg_id}"
                # Vitis HLS removes double underscores, which leads to a compilation
                # error down the road due to kernel name mismatch. Remove them here
                # to prevent this
                kernel_name = re.sub(r"__+", "_", kernel_name)
                self._kernels_names_to_id[kernel_name] = kern_id
                # Generate kernel code
                self.generate_kernel(sdfg, state, kernel_name, subgraphs, function_stream, callsite_stream,
                                     state_host_header_stream, state_host_body_stream, instrumentation_stream,
                                     state_parameters, kern_id)
            kernel_args_call_host = []
            kernel_args_opencl = []
            # Include state in args
            kernel_args_opencl.append(f"{self._global_sdfg.name}_t *__state")
            kernel_args_call_host.append(f"__state")
            for is_output, arg_name, arg, interface_id in state_parameters:
                # Streams and Views are not passed as arguments
                if (isinstance(arg, dt.Array)):
                    # Multibank arrays expand to one argument per accessed bank
                    for bank, _ in iterate_multibank_interface_ids(arg, interface_id):
                        current_name = fpga_ptr(arg_name,
                                                arg,
                                                sdfg,
                                                bank,
                                                decouple_array_interfaces=self._decouple_array_interfaces)
                        kernel_args_call_host.append(arg.as_arg(False, name=current_name))
                        kernel_args_opencl.append(FPGACodeGen.make_opencl_parameter(current_name, arg))
                elif (not isinstance(arg, dt.Stream) and not isinstance(arg, dt.View)):
                    kernel_args_call_host.append(arg.as_arg(False, name=arg_name))
                    kernel_args_opencl.append(FPGACodeGen.make_opencl_parameter(arg_name, arg))
            kernel_args_call_host = dtypes.deduplicate(kernel_args_call_host)
            kernel_args_opencl = dtypes.deduplicate(kernel_args_opencl)
            ## Generate the global function here
            kernel_host_stream = CodeIOStream()
            host_function_name = f"__dace_runstate_{sdfg.sdfg_id}_{state.name}_{state_id}"
            function_stream.write("\n\nDACE_EXPORTED void {}({});\n\n".format(host_function_name,
                                                                              ", ".join(kernel_args_opencl)))
            # add generated header information
            kernel_host_stream.write(state_host_header_stream.getvalue())
            kernel_host_stream.write(f"""\
DACE_EXPORTED void {host_function_name}({', '.join(kernel_args_opencl)}) {{""")
            if state.instrument == dtypes.InstrumentationType.FPGA:
                kernel_host_stream.write("""\
const unsigned long int _dace_fpga_begin_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
""")
            kernel_host_stream.write(f"""\
hlslib::ocl::Program program = __state->fpga_context->Get().CurrentlyLoadedProgram();\
""")
            # Create a vector to collect all events that are being generated to allow
            # waiting before exiting this state
            kernel_host_stream.write("std::vector<hlslib::ocl::Event> all_events;")
            # Kernels invocations
            kernel_host_stream.write(state_host_body_stream.getvalue())
            # Wait for all events
            kernel_host_stream.write("hlslib::ocl::WaitForEvents(all_events);")
            # Instrumentation
            if state.instrument == dtypes.InstrumentationType.FPGA:
                kernel_host_stream.write("""
// Begin FPGA kernel runtime instrumentation
cl_ulong first_start = std::numeric_limits<unsigned long int>::max();
cl_ulong last_end = std::numeric_limits<unsigned long int>::min();""")
                if Config.get_bool("instrumentation", "print_fpga_runtime"):
                    kernel_host_stream.write("""
std::cout << std::scientific;""")
                kernel_host_stream.write(instrumentation_stream.getvalue())
                kernel_host_stream.write(f"""\
const unsigned long int _dace_fpga_end_us = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count();
// Convert from nanoseconds (reported by OpenCL) to microseconds (expected by the profiler)
__state->report.add_completion("Full FPGA kernel runtime for {state.label}", "FPGA", 1e-3 * first_start, 1e-3 * last_end, {sdfg.sdfg_id}, {state_id}, -1);
__state->report.add_completion("Full FPGA state runtime for {state.label}", "FPGA", _dace_fpga_begin_us, _dace_fpga_end_us, {sdfg.sdfg_id}, {state_id}, -1);
""")
                if Config.get_bool("instrumentation", "print_fpga_runtime"):
                    kernel_host_stream.write(f"""
const double elapsed = 1e-6 * (_dace_fpga_end_us - _dace_fpga_begin_us);
std::cout << "FPGA program \\"{state.label}\\" executed in " << elapsed << " seconds.\\n";\
""")
            kernel_host_stream.write("}\n")
            callsite_stream.write("{}({});".format(host_function_name, ", ".join(kernel_args_call_host)))
            # Store code strings to be passed to compilation phase
            self._host_codes.append((kernel_name, kernel_host_stream.getvalue()))
        else:  # self._in_device_code == True
            # Device-side pass: only allocate local transients and recurse
            to_allocate = dace.sdfg.local_transients(sdfg, state, None)
            allocated = set()
            subgraphs = dace.sdfg.concurrent_subgraphs(state)
            for node in state.data_nodes():
                data = node.desc(sdfg)
                if node.data not in to_allocate or node.data in allocated:
                    continue
                # Make sure there are no global transients in the nested state
                # that are thus not gonna be allocated
                if data.storage == dtypes.StorageType.FPGA_Global and not isinstance(data, dt.View):
                    raise cgx.CodegenError("Cannot allocate global memory from device code.")
                allocated.add(node.data)
                # Allocate transients
                self._dispatcher.dispatch_allocate(sdfg, state, state_id, node, data, function_stream, callsite_stream)
            self.generate_nested_state(sdfg, state, state.label, subgraphs, function_stream, callsite_stream)
@staticmethod
def shared_data(subgraphs):
"""Returns a set of data objects that are shared between two or more of
the specified subgraphs."""
shared = set()
if len(subgraphs) >= 2:
seen = {}
for sg in subgraphs:
for node in sg:
if isinstance(node, dace.sdfg.nodes.AccessNode):
if node.data in seen:
if seen[node.data] != sg:
shared.add(node.data)
else:
seen[node.data] = sg
return shared
def make_parameters(self, sdfg: SDFG, state: SDFGState, subgraphs):
    """
    Determines the parameters that must be passed to the passed list of
    subgraphs, as well as to the global kernel.

    :param sdfg: SDFG from which symbols, constants, and data descriptors
                 are read.
    :param state: State whose concurrent subgraphs (processing elements)
                  are being parameterized.
    :param subgraphs: The processing-element subgraphs of ``state``.
    :return: A tuple with the following six entries:
             - Data container parameters that should be passed from the
               host to the FPGA kernel.
             - Data containers that are local to the kernel, but must be
               allocated by the host prior to invoking the kernel.
             - A dictionary mapping from each processing element subgraph
               to which parameters it needs (from the total list of
               parameters).
             - Parameters that must be passed to the kernel from the host,
               but that do not exist before the CPU calls the kernel
               wrapper.
             - A dictionary of which memory interfaces should be assigned to
               which memory banks.
             - External streams that connect different FPGA kernels, and
               must be defined during the compilation flow.
    """
    # Get a set of data nodes that are shared across subgraphs.
    # NOTE: this local deliberately shadows the `shared_data` static method.
    shared_data = self.shared_data(subgraphs)
    # Transients that are accessed in other states in this SDFG
    used_outside = sdfg.shared_transients()
    # Build a dictionary of arrays to arbitrary data nodes referring to
    # them, needed to trace memory bank assignments and to pass to the array
    # allocator
    data_to_node: Dict[str, dace.nodes.Node] = {}
    global_data_parameters = set()
    # Count appearances of each global array to create multiple interfaces
    if self._decouple_array_interfaces:
        global_interfaces: Dict[str, int] = collections.defaultdict(int)
    else:
        # For Xilinx, even if we are not decoupling array interfaces we need anyway to use different interfaces
        # if we access the same container from different PEs.
        # Maps container name -> (interface id, index of the subgraph/PE that last used it).
        global_interfaces: Dict[str, Tuple[int, int]] = collections.defaultdict(lambda: (0, 0))
    top_level_local_data = set()
    subgraph_parameters = collections.OrderedDict()  # {subgraph: [params]}
    nested_global_transients = set()
    # [(Is an output, dataname string, data object, interface)]
    external_streams: Set[Tuple[bool, str, dt.Data, Dict[str, int]]] = set()
    # Mapping from global arrays to memory interfaces
    bank_assignments: Dict[str, Tuple[str, Union[int, subsets.Range]]] = {}
    # Mapping from symbol to a unique parameter tuple
    all_symbols = {k: (False, k, dt.Scalar(v), None) for k, v in sdfg.symbols.items() if k not in sdfg.constants}
    # Add symbols from inter-state edges
    global_symbols = copy.deepcopy(sdfg.symbols)
    interstate_symbols = {}
    for e in sdfg.dfs_edges(sdfg.start_state):
        symbols = e.data.new_symbols(sdfg, global_symbols)
        # Inferred symbols only take precedence if global symbol not defined or None
        symbols = {
            k: v if (k not in global_symbols or global_symbols[k] is None) else global_symbols[k]
            for k, v in symbols.items()
        }
        interstate_symbols.update(symbols)
        global_symbols.update(symbols)
    all_symbols.update({
        k: (False, k, dt.Scalar(v), None)
        for k, v in interstate_symbols.items() if k not in all_symbols and k not in sdfg.constants
    })
    # Symbols that will be passed as parameters to the top-level kernel.
    # NOTE: rebinds the name from the symbol-type dict above to a set of
    # parameter tuples gathered below.
    global_symbols = set()
    # Sorting by name, then by input/output, then by interface id
    sort_func = lambda t: f"{t[1]}{t[0]}{t[3]}"
    subgraph_counter = 0
    for subgraph in subgraphs:
        data_to_node.update(
            {node.data: node
             for node in subgraph.nodes() if isinstance(node, dace.sdfg.nodes.AccessNode)})
        is_rtl_subgraph = any([isinstance(node, nodes.RTLTasklet) for node in subgraph.nodes()])
        subsdfg = subgraph.parent
        candidates = []  # type: List[Tuple[bool, str, dt.Data]]
        # [(is an output, dataname string, data object)]
        array_to_banks_used_out: Dict[str, Set[int]] = {}
        array_to_banks_used_in: Dict[str, Set[int]] = {}
        for n in subgraph.source_nodes():
            # Check if the node is connected to an RTL tasklet, in which
            # case it should be an external stream
            is_external = is_rtl_subgraph
            is_output = True
            if not is_external and self._num_kernels > 1:
                if is_external_stream(n, subgraph):
                    is_external = True
                    is_output = False
            if is_external:
                external_streams |= {(is_output, e.data.data, subsdfg.arrays[e.data.data], None)
                                     for e in state.out_edges(n)
                                     if isinstance(subsdfg.arrays[e.data.data], dt.Stream)}
            else:
                candidates += [(False, e.data.data, subsdfg.arrays[e.data.data]) for e in state.in_edges(n)]
        for n in subgraph.sink_nodes():
            # Check if the node is connected to an RTL tasklet, in which
            # case it should be an external stream
            is_external = is_rtl_subgraph
            is_output = False
            if not is_external and self._num_kernels > 1:
                if is_external_stream(n, subgraph):
                    is_external = True
                    is_output = True
            if is_external:
                external_streams |= {(is_output, e.data.data, subsdfg.arrays[e.data.data], None)
                                     for e in state.in_edges(n)
                                     if isinstance(subsdfg.arrays[e.data.data], dt.Stream)}
            else:
                candidates += [(True, e.data.data, subsdfg.arrays[e.data.data]) for e in state.out_edges(n)]
        # Find other data nodes that are used internally
        for n, scope in subgraph.all_nodes_recursive():
            if isinstance(n, dace.sdfg.nodes.AccessNode):
                # Add nodes if they are outer-level, or an inner-level transient
                # (inner-level inputs/outputs are just connected to data in the outer layers,
                # whereas transients can be independent).
                # Views are not nested global transients
                if scope == subgraph or n.desc(scope).transient:
                    desc = n.desc(scope)
                    if scope.out_degree(n) > 0:
                        candidates.append((False, n.data, desc))
                    if scope.in_degree(n) > 0:
                        candidates.append((True, n.data, desc))
                    if is_multibank_array_with_distributed_index(desc):
                        # Record all banks used by this subgraph to generate interfaces for them
                        # inputs and outputs seperate, because using a bank as an input doesn't mean
                        # we also need an output interface
                        current_banks_out = set()
                        current_banks_in = set()
                        for edge in scope.in_edges(n):
                            for bank in iterate_distributed_subset(desc, edge.data, True, sdfg):
                                current_banks_out.add(bank)
                        for edge in scope.out_edges(n):
                            for bank in iterate_distributed_subset(desc, edge.data, False, sdfg):
                                current_banks_in.add(bank)
                        if n.data in array_to_banks_used_in:
                            array_to_banks_used_in[n.data].update(current_banks_in)
                        else:
                            array_to_banks_used_in[n.data] = current_banks_in
                        if n.data in array_to_banks_used_out:
                            array_to_banks_used_out[n.data].update(current_banks_out)
                        else:
                            array_to_banks_used_out[n.data] = current_banks_out
                if scope != subgraph:
                    if (isinstance(n.desc(scope), dt.Array)
                            and n.desc(scope).storage == dtypes.StorageType.FPGA_Global
                            and not isinstance(n.desc(scope), dt.View)):
                        nested_global_transients.add(n)
        subgraph_parameters[subgraph] = set()
        # For each subgraph, keep a listing of array to current interface ID
        data_to_interface: Dict[str, int] = {}
        # multibank data name -> is_output -> List of (bank, interface id)
        # same as data_to_interface, but for HBM/DDR-arrays with multiple banks
        multibank_data_to_interface: Dict[str, Dict[bool, List[Tuple[int, int]]]] = {}
        # Differentiate global and local arrays. The former are allocated
        # from the host and passed to the device code, while the latter are
        # (statically) allocated on the device side.
        for is_output, data_name, desc in candidates:
            # Ignore views, as these never need to be explicitly passed
            if isinstance(desc, dt.View):
                continue
            # Only distinguish between inputs and outputs for arrays
            if not isinstance(desc, dt.Array):
                is_output = None
            # If this is a global array, assign the correct interface ID and
            # memory interface (e.g., DDR or HBM bank)
            if (isinstance(desc, dt.Array) and desc.storage == dtypes.StorageType.FPGA_Global):
                if data_name in data_to_interface:
                    interface_id = data_to_interface[data_name]
                elif data_name in multibank_data_to_interface and is_output in multibank_data_to_interface[data_name]:
                    interface_id = multibank_data_to_interface[data_name][is_output]
                else:
                    # Get and update global memory interface ID
                    if is_multibank_array_with_distributed_index(desc):
                        tmp_interface_ids = []
                        if is_output:
                            banks_looked_at = array_to_banks_used_out[data_name]
                        else:
                            banks_looked_at = array_to_banks_used_in[data_name]
                        for bank in banks_looked_at:
                            ptr_str = fpga_ptr(data_name,
                                               desc,
                                               sdfg,
                                               bank,
                                               decouple_array_interfaces=self._decouple_array_interfaces)
                            if self._decouple_array_interfaces:
                                tmp_interface_id = global_interfaces[ptr_str]
                                global_interfaces[ptr_str] += 1
                            else:
                                if ptr_str not in global_interfaces:
                                    global_interfaces[ptr_str] = (0, subgraph_counter)
                                tmp_interface_id, last_used_in = global_interfaces[ptr_str]
                                if last_used_in != subgraph_counter:
                                    # we accessed the same container from a different subgraph/PE: we need
                                    # to use a different interface
                                    tmp_interface_id += 1
                                    global_interfaces[ptr_str] = (tmp_interface_id, subgraph_counter)
                            tmp_interface_ids.append((bank, tmp_interface_id))
                        interface_id = tuple(tmp_interface_ids)
                        if data_name not in multibank_data_to_interface:
                            multibank_data_to_interface[data_name] = {}
                        multibank_data_to_interface[data_name][is_output] = interface_id
                    else:
                        if self._decouple_array_interfaces:
                            interface_id = global_interfaces[data_name]
                            global_interfaces[data_name] += 1
                        else:
                            if data_name not in global_interfaces:
                                global_interfaces[data_name] = (0, subgraph_counter)
                            interface_id, last_used_in = global_interfaces[data_name]
                            if last_used_in != subgraph_counter:
                                # we accessed the same container from a different data subgraph/PE: we need
                                # to use a different interface
                                global_interfaces[data_name] = (interface_id + 1, subgraph_counter)
                                interface_id += 1
                        data_to_interface[data_name] = interface_id
                # Collect the memory bank specification, if present, by
                # traversing outwards to where the data container is
                # actually allocated
                inner_node = data_to_node[data_name]
                trace = utils.trace_nested_access(inner_node, subgraph, sdfg)
                bank = None
                bank_type = None
                for (trace_in, trace_out), _, _, trace_sdfg in trace:
                    trace_node = trace_in or trace_out
                    trace_name = trace_node.data
                    trace_desc = trace_node.desc(trace_sdfg)
                    if "bank" in trace_desc.location:
                        trace_type, trace_bank = parse_location_bank(trace_desc)
                        # All levels of nesting must agree on the bank.
                        if (bank is not None and bank_type is not None
                                and (bank != trace_bank or bank_type != trace_type)):
                            raise cgx.CodegenError("Found inconsistent memory bank "
                                                   f"specifier for {trace_name}.")
                        bank = trace_bank
                        bank_type = trace_type
                # Make sure the array has been allocated on this bank in the
                # outermost scope
                if bank_type is not None:
                    outer_node = trace[0][0][0] or trace[0][0][1]
                    outer_desc = outer_node.desc(trace[0][2])
                    okbank = False
                    if ("bank" in outer_desc.location):
                        trace_type, trace_bank = parse_location_bank(outer_desc)
                        okbank = (trace_type == bank_type and trace_bank == bank)
                    if not okbank:
                        raise cgx.CodegenError("Memory bank allocation must be present on "
                                               f"outermost data descriptor {outer_node.data} "
                                               "to be allocated correctly.")
                    bank_assignments[data_name] = (bank_type, bank)
                else:
                    bank_assignments[data_name] = None
            else:
                # Non-global containers do not get a memory interface.
                interface_id = None
            if (not desc.transient or desc.storage == dtypes.StorageType.FPGA_Global or data_name in used_outside):
                # Add the data as a parameter to this PE
                subgraph_parameters[subgraph].add((is_output, data_name, desc, interface_id))
                # Global data is passed from outside the kernel
                global_data_parameters.add((is_output, data_name, desc, interface_id))
            elif data_name in shared_data:
                # Add the data as a parameter to this PE
                subgraph_parameters[subgraph].add((is_output, data_name, desc, interface_id))
                # Must be allocated outside PEs and passed to them
                top_level_local_data.add(data_name)
        # Order by name
        subgraph_parameters[subgraph] = list(sorted(subgraph_parameters[subgraph], key=sort_func))
        # Append symbols used in this subgraph
        for k in sorted(self._frame.free_symbols(subgraph)):
            if k not in sdfg.constants:
                param = all_symbols[k]
                subgraph_parameters[subgraph].append(param)
                global_symbols.add(param)
        subgraph_counter += 1
    # Order by name
    global_data_parameters = list(sorted(global_data_parameters, key=sort_func))
    global_data_parameters += sorted(global_symbols, key=sort_func)
    external_streams = list(sorted(external_streams, key=sort_func))
    nested_global_transients = list(sorted(nested_global_transients))
    # External streams are defined by the compilation flow, so they must not
    # also be allocated as top-level local data.
    stream_names = {sname for _, sname, _, _ in external_streams}
    top_level_local_data = [data_to_node[name] for name in sorted(top_level_local_data) if name not in stream_names]
    return (global_data_parameters, top_level_local_data, subgraph_parameters, nested_global_transients,
            bank_assignments, external_streams)
def generate_nested_state(self, sdfg, state, nest_name, subgraphs, function_stream, callsite_stream):
    """Emit code for every concurrent component of a nested FPGA state.

    Each subgraph is handed to the dispatcher individually, without
    skipping its entry node.
    """
    for component in subgraphs:
        self._dispatcher.dispatch_subgraph(sdfg, component, sdfg.node_id(state), function_stream,
                                           callsite_stream, skip_entry_node=False)
def generate_scope(self, sdfg, dfg_scope, state_id, function_stream, callsite_stream):
    """Generate code for a map scope on the FPGA.

    When called from host-side code generation, this wraps the scope in its
    own kernel (named after the map label) and emits the kernel launch.
    When already inside device code, it simply generates the scope contents.
    """
    entry = dfg_scope.source_nodes()[0]
    if self._in_device_code:
        # Already generating kernel code: emit the scope node directly.
        self.generate_node(sdfg, dfg_scope, state_id, entry, function_stream, callsite_stream)
        return
    # Not yet in device code: set up a kernel launch around this scope.
    kernel_label = entry.map.label.replace(" ", "_")
    return self.generate_kernel(sdfg, sdfg.node(state_id), kernel_label, [dfg_scope], function_stream,
                                callsite_stream)