Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 149 additions & 63 deletions src/core/mapping/base_mapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sstream>

#include "legion/legion_mapping.h"
#include "mappers/mapping_utilities.h"

#include "core/data/store.h"
#include "core/mapping/base_mapper.h"
Expand Down Expand Up @@ -502,6 +503,10 @@ void BaseMapper::map_task(const MapperContext ctx,
const MapTaskInput& input,
MapTaskOutput& output)
{
#ifdef DEBUG_LEGATE
logger.debug() << "Entering map_task for " << Utilities::to_string(runtime, ctx, task);
#endif

// Should never be mapping the top-level task here
assert(task.get_depth() > 0);

Expand Down Expand Up @@ -601,10 +606,51 @@ void BaseMapper::map_task(const MapperContext ctx,

output.chosen_instances.resize(task.regions.size());

// Map each field separately for each of the logical regions
std::vector<PhysicalInstance> needed_acquires;
std::map<PhysicalInstance, std::set<uint32_t>> instances_to_mappings;
for (uint32_t mapping_idx = 0; mapping_idx < mappings.size(); ++mapping_idx) {
bool can_fail = true;
std::map<PhysicalInstance, std::set<int32_t>> instance_to_mappings;
std::map<int32_t, PhysicalInstance> mapping_to_instance;
std::vector<bool> handled(mappings.size(), false);

// See case of failed instance creation below
auto tighten_write_reqs = [&]() {
for (int32_t mapping_idx = 0; mapping_idx < mappings.size(); ++mapping_idx) {
auto& mapping = mappings[mapping_idx];
PrivilegeMode priv = LEGION_NO_ACCESS;
#ifdef DEBUG_LEGATE
std::stringstream reqs_ss;
#endif
for (auto req_idx : mapping.requirement_indices()) {
const RegionRequirement& req = task.regions[req_idx];
if (!req.region.exists()) continue;
priv |= req.privilege;
#ifdef DEBUG_LEGATE
reqs_ss << " " << req_idx;
#endif
}
if (!(priv & LEGION_WRITE_PRIV) || mapping.policy.exact) continue;
#ifdef DEBUG_LEGATE
logger.debug() << "Task " << task.get_unique_id()
<< ": tightened mapping policy for reqs:" << reqs_ss.str();
#endif
mapping.policy.exact = true;
if (!handled[mapping_idx]) continue;
handled[mapping_idx] = false;
auto m2i_it = mapping_to_instance.find(mapping_idx);
if (m2i_it == mapping_to_instance.end()) continue;
PhysicalInstance inst = m2i_it->second;
mapping_to_instance.erase(m2i_it);
auto i2m_it = instance_to_mappings.find(inst);
i2m_it->second.erase(mapping_idx);
if (i2m_it->second.empty()) {
runtime->release_instance(ctx, inst);
instance_to_mappings.erase(i2m_it);
}
}
};

// Mapping each field separately for each of the logical regions
for (int32_t mapping_idx = 0; mapping_idx < mappings.size(); ++mapping_idx) {
if (handled[mapping_idx]) continue;
auto& mapping = mappings[mapping_idx];
auto req_indices = mapping.requirement_indices();

Expand All @@ -615,12 +661,14 @@ void BaseMapper::map_task(const MapperContext ctx,
if (target == StoreTarget::FBMEM) target = StoreTarget::ZCMEM;
#endif
output.future_locations.push_back(get_target_memory(task.target_proc, target));
handled[mapping_idx] = true;
continue;
} else if (mapping.for_unbound_stores()) {
}

if (mapping.for_unbound_stores()) {
for (auto req_idx : req_indices) {
output.output_targets[req_idx] = get_target_memory(task.target_proc, mapping.policy.target);
auto ndim = mapping.stores.front().dim();

// FIXME: Unbound stores can have more than one dimension later
std::vector<DimensionKind> dimension_ordering;
for (int32_t dim = ndim - 1; dim >= 0; --dim)
Expand All @@ -630,65 +678,79 @@ void BaseMapper::map_task(const MapperContext ctx,
output.output_constraints[req_idx].ordering_constraint =
OrderingConstraint(dimension_ordering, false);
}
handled[mapping_idx] = true;
continue;
}

std::vector<std::reference_wrapper<const RegionRequirement>> reqs;
#ifdef DEBUG_LEGATE
std::stringstream reqs_ss;
#endif
for (auto req_idx : req_indices) {
const auto& req = task.regions[req_idx];
if (!req.region.exists()) continue;
reqs.push_back(std::cref(req));
#ifdef DEBUG_LEGATE
reqs_ss << " " << req_idx;
#endif
}
if (reqs.empty()) {
handled[mapping_idx] = true;
continue;
}

if (reqs.empty()) continue;

// Get the reference to our valid instances in case we decide to use them
// Get an instance and acquire it if necessary. If the acquire fails then prune it from the
// mapper's data structures and retry, until we succeed or map_legate_store fails with an out of
// memory error.
PhysicalInstance result;
if (map_legate_store(ctx, task, mapping, reqs, task.target_proc, result))
needed_acquires.push_back(result);

for (auto req_idx : req_indices) output.chosen_instances[req_idx].push_back(result);
instances_to_mappings[result].insert(mapping_idx);
}

// Do an acquire on all the instances so we have our result
// Keep doing this until we succeed or we get an out of memory error
while (!needed_acquires.empty() &&
!runtime->acquire_and_filter_instances(ctx, needed_acquires, true /*filter on acquire*/)) {
assert(!needed_acquires.empty());
// If we failed to acquire any of the instances we need to prune them
// out of the mapper's data structure so do that first
std::set<PhysicalInstance> failed_acquires;
filter_failed_acquires(ctx, needed_acquires, failed_acquires);

for (auto failed_acquire : failed_acquires) {
auto affected_mappings = instances_to_mappings[failed_acquire];
instances_to_mappings.erase(failed_acquire);

for (auto& mapping_idx : affected_mappings) {
auto& mapping = mappings[mapping_idx];
auto req_indices = mapping.requirement_indices();

std::vector<std::reference_wrapper<const RegionRequirement>> reqs;
for (auto req_idx : req_indices) reqs.push_back(std::cref(task.regions[req_idx]));

for (auto req_idx : req_indices) {
auto& instances = output.chosen_instances[req_idx];
uint32_t inst_idx = 0;
for (; inst_idx < instances.size(); ++inst_idx)
if (instances[inst_idx] == failed_acquire) break;
instances.erase(instances.begin() + inst_idx);
}

PhysicalInstance result;
if (map_legate_store(ctx, task, mapping, reqs, task.target_proc, result))
needed_acquires.push_back(result);

for (auto req_idx : req_indices) output.chosen_instances[req_idx].push_back(result);
instances_to_mappings[result].insert(mapping_idx);
while (map_legate_store(ctx, task, mapping, reqs, task.target_proc, result, can_fail)) {
if (result == PhysicalInstance()) break;
if (instance_to_mappings.count(result) > 0 || runtime->acquire_instance(ctx, result)) {
#ifdef DEBUG_LEGATE
logger.debug() << "Task " << task.get_unique_id() << ": acquired instance " << result
<< " for reqs:" << reqs_ss.str();
#endif
break;
}
#ifdef DEBUG_LEGATE
logger.debug() << "Task " << task.get_unique_id() << ": failed to acquire instance " << result
<< " for reqs:" << reqs_ss.str();
#endif
AutoLock lock(ctx, local_instances->manager_lock());
local_instances->erase(result);
}

// If instance creation failed we try mapping all stores again, but request tight instances for
// write requirements. The hope is that these write requirements cover the entire region (i.e.
// they use a complete partition), so the new tight instances will invalidate any pre-existing
// "bloated" instances for the same region, freeing up enough memory so that mapping can succeed
if (result == PhysicalInstance()) {
#ifdef DEBUG_LEGATE
logger.debug() << "Task " << task.get_unique_id()
<< ": failed mapping for reqs:" << reqs_ss.str();
#endif
assert(can_fail);
tighten_write_reqs();
mapping_idx = -1;
can_fail = false;
continue;
}

// Success; record the instance for this mapping.
#ifdef DEBUG_LEGATE
logger.debug() << "Task " << task.get_unique_id()
<< ": completed mapping for reqs:" << reqs_ss.str();
#endif
instance_to_mappings[result].insert(mapping_idx);
mapping_to_instance[mapping_idx] = result;
handled[mapping_idx] = true;
}

// Succeeded in mapping all stores, record it on map_task output.
for (const auto& m2i : mapping_to_instance)
for (auto req_idx : mappings[m2i.first].requirement_indices())
if (task.regions[req_idx].region.exists())
output.chosen_instances[req_idx].push_back(m2i.second);
}

void BaseMapper::map_replicate_task(const MapperContext ctx,
Expand Down Expand Up @@ -747,7 +809,8 @@ bool BaseMapper::map_legate_store(const MapperContext ctx,
const StoreMapping& mapping,
std::vector<std::reference_wrapper<const RegionRequirement>> reqs,
Processor target_proc,
PhysicalInstance& result)
PhysicalInstance& result,
bool can_fail)
{
const auto& policy = mapping.policy;
std::vector<LogicalRegion> regions;
Expand Down Expand Up @@ -776,12 +839,29 @@ bool BaseMapper::map_legate_store(const MapperContext ctx,
// If we're making a reduction instance, we should just make it now
if (redop != 0) {
layout_constraints.add_constraint(SpecializedConstraint(REDUCTION_FOLD_SPECIALIZE, redop));

if (!runtime->create_physical_instance(
ctx, target_memory, layout_constraints, regions, result, true /*acquire*/))
size_t footprint = 0;
if (runtime->create_physical_instance(ctx,
target_memory,
layout_constraints,
regions,
result,
true /*acquire*/,
LEGION_GC_DEFAULT_PRIORITY,
false /*tight bounds*/,
&footprint)) {
#ifdef DEBUG_LEGATE
Realm::LoggerMessage msg = logger.debug();
msg << "Operation " << mappable.get_unique_id() << ": created reduction instance " << result
<< " for";
for (LogicalRegion r : regions) msg << " " << r;
msg << " (size: " << footprint << " bytes, memory: " << target_memory << ")";
#endif
// We already did the acquire
return false;
}
if (!can_fail)
report_failed_mapping(mappable, mapping.requirement_index(), target_memory, redop);
// We already did the acquire
return false;
return true;
}

auto& fields = layout_constraints.field_constraint.field_set;
Expand All @@ -797,8 +877,8 @@ bool BaseMapper::map_legate_store(const MapperContext ctx,
local_instances->find_instance(
regions.front(), fields.front(), target_memory, result, policy)) {
#ifdef DEBUG_LEGATE
logger.debug() << get_mapper_name() << " found instance " << result << " for "
<< regions.front();
logger.debug() << "Operation " << mappable.get_unique_id() << ": reused cached instance "
<< result << " for " << regions.front();
#endif
runtime->enable_reentrant(ctx);
// Needs acquire to keep the runtime happy
Expand Down Expand Up @@ -861,8 +941,12 @@ bool BaseMapper::map_legate_store(const MapperContext ctx,
assert(result.exists());
#ifdef DEBUG_LEGATE
if (created) {
logger.debug() << get_mapper_name() << " created instance " << result << " for " << *group
<< " (size: " << footprint << " bytes, memory: " << target_memory << ")";
logger.debug() << "Operation " << mappable.get_unique_id() << ": created instance " << result
<< " for " << *group << " (size: " << footprint
<< " bytes, memory: " << target_memory << ")";
} else {
logger.debug() << "Operation " << mappable.get_unique_id() << ": found instance " << result
<< " for " << *group;
}
#endif
// Only save the result for future use if it is not an external instance
Expand All @@ -879,8 +963,10 @@ bool BaseMapper::map_legate_store(const MapperContext ctx,
runtime->enable_reentrant(ctx);

// If we make it here then we failed entirely
auto req_indices = mapping.requirement_indices();
for (auto req_idx : req_indices) report_failed_mapping(mappable, req_idx, target_memory, redop);
if (!can_fail) {
auto req_indices = mapping.requirement_indices();
for (auto req_idx : req_indices) report_failed_mapping(mappable, req_idx, target_memory, redop);
}
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/core/mapping/base_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ class BaseMapper : public Legion::Mapping::Mapper, public LegateMapper {
const StoreMapping& mapping,
std::vector<std::reference_wrapper<const Legion::RegionRequirement>> reqs,
Legion::Processor target_proc,
Legion::Mapping::PhysicalInstance& result);
Legion::Mapping::PhysicalInstance& result,
bool can_fail);
bool map_raw_array(const Legion::Mapping::MapperContext ctx,
const Legion::Mappable& mappable,
unsigned index,
Expand Down