Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions legate/core/_legion/geometry.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ def raw(self) -> Any:
self.rect.rect_data[dim + i] = self._hi[i]
return self.rect

def to_domain(self) -> Domain:
return Domain(self.raw())


class Domain:
def __init__(self, domain: Any) -> None:
Expand Down
12 changes: 9 additions & 3 deletions legate/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from ._legion.util import Dispatchable
from .communicator import Communicator
from .legate import Library
from .operation import AutoTask, Copy, ManualTask
from .operation import AutoTask, Copy, Fill, ManualTask
from .runtime import Runtime
from .shape import Shape
from .store import RegionField, Store
Expand Down Expand Up @@ -125,7 +125,6 @@ def _create_scope(
)

self._libname = library.get_name()
self._unique_op_id = 0
self._provenance: list[Union[str, None]] = [None]

def destroy(self) -> None:
Expand Down Expand Up @@ -312,7 +311,14 @@ def create_auto_task(
def create_copy(self, mapper_id: int = 0) -> Copy:
from .operation import Copy

return Copy(self, mapper_id)
return Copy(self, mapper_id, self.get_unique_op_id())

def create_fill(
self, lhs: Store, value: Store, mapper_id: int = 0
) -> Fill:
from .operation import Fill

return Fill(self, lhs, value, mapper_id, self.get_unique_op_id())

def dispatch(self, op: Dispatchable[T]) -> T:
return self._runtime.dispatch(op)
Expand Down
155 changes: 116 additions & 39 deletions legate/core/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
ArgumentMap,
BufferBuilder,
Copy as SingleCopy,
Fill as SingleFill,
Future,
FutureMap,
IndexCopy,
IndexFill,
IndexTask,
Partition as LegionPartition,
Task as SingleTask,
Expand All @@ -44,22 +46,14 @@
from .utils import OrderedSet

if TYPE_CHECKING:
from . import (
FieldID,
FieldSpace,
IndexSpace,
OutputRegion,
Point,
Rect,
Region,
)
from . import FieldSpace, IndexSpace, OutputRegion, Point, Rect, Region
from ._legion.util import FieldListLike
from .context import Context
from .store import RegionField, Store
from .types import DTType


LegionTask = Union[IndexTask, SingleTask, IndexCopy, SingleCopy]
LegionOp = Union[IndexTask, SingleTask, IndexCopy, SingleCopy]


@unique
Expand Down Expand Up @@ -226,7 +220,7 @@ def __str__(self) -> str:
return f"RegionFieldArg({self._dim}, {self._req}, {self._field_id})"


LegionTaskMethod = Any
AddReqMethod = Any


def _index_copy_add_rw_dst_requirement(*args: Any, **kwargs: Any) -> None:
Expand All @@ -239,31 +233,31 @@ def _single_copy_add_rw_dst_requirement(*args: Any, **kwargs: Any) -> None:
SingleCopy.add_dst_requirement(*args, **kwargs)


_single_task_calls: dict[Permission, LegionTaskMethod] = {
_single_task_calls: dict[Permission, AddReqMethod] = {
Permission.NO_ACCESS: SingleTask.add_no_access_requirement,
Permission.READ: SingleTask.add_read_requirement,
Permission.WRITE: SingleTask.add_write_requirement,
Permission.READ_WRITE: SingleTask.add_read_write_requirement,
Permission.REDUCTION: SingleTask.add_reduction_requirement,
}

_index_task_calls: dict[Permission, LegionTaskMethod] = {
_index_task_calls: dict[Permission, AddReqMethod] = {
Permission.NO_ACCESS: IndexTask.add_no_access_requirement,
Permission.READ: IndexTask.add_read_requirement,
Permission.WRITE: IndexTask.add_write_requirement,
Permission.READ_WRITE: IndexTask.add_read_write_requirement,
Permission.REDUCTION: IndexTask.add_reduction_requirement,
}

_index_copy_calls: dict[Permission, LegionTaskMethod] = {
_index_copy_calls: dict[Permission, AddReqMethod] = {
Permission.READ: IndexCopy.add_src_requirement,
Permission.WRITE: IndexCopy.add_dst_requirement,
Permission.READ_WRITE: _index_copy_add_rw_dst_requirement,
Permission.SOURCE_INDIRECT: IndexCopy.add_src_indirect_requirement,
Permission.TARGET_INDIRECT: IndexCopy.add_dst_indirect_requirement,
}

_single_copy_calls: dict[Permission, LegionTaskMethod] = {
_single_copy_calls: dict[Permission, AddReqMethod] = {
Permission.READ: SingleCopy.add_src_requirement,
Permission.WRITE: SingleCopy.add_dst_requirement,
Permission.READ_WRITE: _single_copy_add_rw_dst_requirement,
Expand All @@ -283,10 +277,10 @@ def __init__(self, part: Optional[LegionPartition], proj: int) -> None:

def add(
self,
task: LegionTask,
task: LegionOp,
req: RegionReq,
fields: FieldListLike,
methods: dict[Permission, LegionTaskMethod],
methods: dict[Permission, AddReqMethod],
) -> None:
f = methods[req.permission]
parent = req.region
Expand All @@ -308,10 +302,10 @@ def add(

def add_single(
self,
task: LegionTask,
task: LegionOp,
req: RegionReq,
fields: FieldListLike,
methods: dict[Permission, LegionTaskMethod],
methods: dict[Permission, AddReqMethod],
) -> None:
f = methods[req.permission]
if req.permission != Permission.REDUCTION:
Expand Down Expand Up @@ -344,10 +338,10 @@ def __init__(self, part: Optional[LegionPartition], proj: int) -> None:

def add(
self,
task: LegionTask,
task: LegionOp,
req: RegionReq,
fields: FieldListLike,
methods: dict[Permission, LegionTaskMethod],
methods: dict[Permission, AddReqMethod],
) -> None:
f = methods[req.permission]
if req.permission != Permission.REDUCTION:
Expand All @@ -365,10 +359,10 @@ def add(

def add_single(
self,
task: LegionTask,
task: LegionOp,
req: RegionReq,
fields: FieldListLike,
methods: dict[Permission, LegionTaskMethod],
methods: dict[Permission, AddReqMethod],
) -> None:
f = methods[req.permission]
if req.permission != Permission.REDUCTION:
Expand Down Expand Up @@ -568,10 +562,8 @@ def insert(
self._fields[field_id] = proj_set
proj_set.insert(perm, proj_info)

def coalesce(
self, error_on_interference: bool
) -> dict[Any, list[Union[int, FieldID]]]:
coalesced: dict[Any, list[Union[int, FieldID]]] = {}
def coalesce(self, error_on_interference: bool) -> dict[Any, list[int]]:
coalesced: dict[Any, list[int]] = {}
for field_id, proj_set in self._fields.items():
proj_infos = proj_set.coalesce(error_on_interference)
for key in proj_infos:
Expand All @@ -585,15 +577,13 @@ def coalesce(

class RequirementAnalyzer:
def __init__(self, error_on_interference: bool = True) -> None:
self._field_sets: dict[Any, FieldSet] = {}
self._requirements: list[tuple[RegionReq, Any]] = []
self._requirement_map: dict[
tuple[RegionReq, Union[int, FieldID]], int
] = {}
self._field_sets: dict[Region, FieldSet] = {}
self._requirements: list[tuple[RegionReq, list[int]]] = []
self._requirement_map: dict[tuple[RegionReq, int], int] = {}
self._error_on_interference = error_on_interference

@property
def requirements(self) -> list[tuple[RegionReq, Any]]:
def requirements(self) -> list[tuple[RegionReq, list[int]]]:
return self._requirements

@property
Expand Down Expand Up @@ -635,11 +625,11 @@ def get_requirement_index(self, req: RegionReq, field_id: int) -> int:
class OutputAnalyzer:
def __init__(self) -> None:
self._groups: dict[Any, OrderedSet[tuple[int, Store]]] = {}
self._requirements: list[tuple[OutputReq, Any]] = []
self._requirements: list[tuple[OutputReq, list[int]]] = []
self._requirement_map: dict[tuple[OutputReq, int], int] = {}

@property
def requirements(self) -> list[tuple[OutputReq, Any]]:
def requirements(self) -> list[tuple[OutputReq, list[int]]]:
return self._requirements

@property
Expand Down Expand Up @@ -1078,8 +1068,10 @@ def build_copy(self, launch_domain: Rect) -> IndexCopy:
Permission.TARGET_INDIRECT,
):
assert len(fields) == 1
fields = fields[0]
req.proj.add(copy, req, fields, _index_copy_calls)
req.proj.add(copy, req, fields[0], _index_copy_calls)
else:
req.proj.add(copy, req, fields, _index_copy_calls)

if self._sharding_space is not None:
copy.set_sharding_space(self._sharding_space)
copy.set_possible_src_indirect_out_of_range(self._source_oor)
Expand All @@ -1100,8 +1092,10 @@ def build_single_copy(self) -> SingleCopy:
Permission.TARGET_INDIRECT,
):
assert len(fields) == 1
fields = fields[0]
req.proj.add_single(copy, req, fields, _single_copy_calls)
req.proj.add_single(copy, req, fields[0], _single_copy_calls)
else:
req.proj.add_single(copy, req, fields, _single_copy_calls)

if self._sharding_space is not None:
copy.set_sharding_space(self._sharding_space)
if self._point is not None:
Expand All @@ -1119,3 +1113,86 @@ def execute(
def execute_single(self) -> None:
copy = self.build_single_copy()
self._context.dispatch_single(copy)


class FillLauncher:
def __init__(
self,
context: Context,
lhs: Store,
lhs_proj: Proj,
value: Store,
mapper_id: int = 0,
tag: int = 0,
provenance: Optional[str] = None,
) -> None:
self._context = context
self._lhs = lhs
self._lhs_proj = lhs_proj
self._value = value
self._mapper_id = mapper_id
self._tag = tag
self._sharding_space: Union[IndexSpace, None] = None
self._point: Union[Point, None] = None
self._provenance = provenance

@property
def library_mapper_id(self) -> int:
return self._mapper_id

@property
def legion_mapper_id(self) -> int:
return self._context.get_mapper_id(self._mapper_id)

def set_sharding_space(self, space: IndexSpace) -> None:
self._sharding_space = space

def set_point(self, point: Point) -> None:
self._point = point

def build_fill(self, launch_domain: Rect) -> IndexFill:
if TYPE_CHECKING:
assert isinstance(self._lhs.storage, RegionField)
assert isinstance(self._value.storage, Future)
assert self._lhs_proj.part is not None
fill = IndexFill(
self._lhs_proj.part,
self._lhs_proj.proj,
self._lhs.storage.region.get_root(),
self._lhs.storage.field.field_id,
self._value.storage,
self.legion_mapper_id,
self._tag,
launch_domain.to_domain(),
self._provenance,
)
if self._sharding_space is not None:
fill.set_sharding_space(self._sharding_space)
return fill

def build_single_fill(self) -> SingleFill:
if TYPE_CHECKING:
assert isinstance(self._lhs.storage, RegionField)
assert isinstance(self._value.storage, Future)
fill = SingleFill(
self._lhs.storage.region,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assumes that the Storage attached to a Store is always an exact match for the extent which that Store represents (i.e. any attached transforms that restrict the extent of the region have already been applied through partition operations on the attached Storage). I suspect this is true, but it's not trivially obvious from the code.

self._lhs.storage.region.get_root(),
self._lhs.storage.field.field_id,
self._value.storage,
self.legion_mapper_id,
self._tag,
self._provenance,
)
if self._sharding_space is not None:
fill.set_sharding_space(self._sharding_space)
if self._point is not None:
fill.set_point(self._point)
return fill

def execute(self, launch_domain: Rect) -> None:
fill = self.build_fill(launch_domain)
self._context.dispatch(fill)

def execute_single(self) -> None:
fill = self.build_single_fill()
self._context.dispatch_single(fill)
Loading