Skip to content

Commit

Permalink
Realize linear communication volume
Browse files Browse the repository at this point in the history
  • Loading branch information
jwbuurlage committed Aug 9, 2017
1 parent eaf7a00 commit b94ef24
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 68 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Expand Up @@ -7,7 +7,7 @@
[submodule "ext/bulk"]
path = ext/bulk
url = https://github.com/jwbuurlage/Bulk.git
branch = array_queue
branch = develop
[submodule "catch"]
path = ext/catch
url = https://github.com/philsquared/Catch
Expand Down
31 changes: 21 additions & 10 deletions distributed/overlaps.hpp
Expand Up @@ -15,14 +15,29 @@ struct overlap {
distributed::shadow region;
};

struct pod_shadow {
std::array<int, 2> min_pt;
std::array<int, 2> max_pt;
};

auto to_pod(distributed::shadow sh) {
return pod_shadow{math::vec_to_array<2_D, int>(sh.min_pt),
math::vec_to_array<2_D, int>(sh.max_pt)};
}

auto from_pod(pod_shadow sh) {
return distributed::shadow{math::array_to_vec<2_D, int>(sh.min_pt),
math::array_to_vec<2_D, int>(sh.max_pt)};
}

void communicate_overlaps(bulk::world& world,
projections<3_D, T>& local_proj_stack,
distributed::restricted_geometry<T>& local_geometry,
const std::vector<overlap>& overlaps) {
auto buffer = std::vector<T>();
auto& proj_data = local_proj_stack.mutable_data();

auto q = bulk::queue<T[], int, distributed::shadow>(world);
auto q = bulk::queue<T[], int, pod_shadow>(world);
for (auto ov : overlaps) {
auto offset = local_proj_stack.offset(ov.projection);
auto local_region = local_geometry.local_shadow(ov.projection);
Expand All @@ -40,12 +55,13 @@ void communicate_overlaps(bulk::world& world,
}
}

q(ov.target).send_many(buffer, ov.projection, ov.region);
q(ov.target).send(buffer, ov.projection, to_pod(ov.region));
}

world.sync();

for (auto && [ xs, proj, sh ] : q) {
for (auto && [ xs, proj, shpod ] : q) {
auto sh = from_pod(shpod);
auto local_region = local_geometry.local_shadow(proj);
auto w = distributed::shadow{sh.min_pt - local_region.min_pt,
sh.max_pt - local_region.min_pt};
Expand All @@ -66,14 +82,9 @@ std::vector<overlap>
compute_overlaps(bulk::world& world,
const distributed::restricted_geometry<T>& geometry) {

struct pod_shadow {
std::array<int, 2> min_pt;
std::array<int, 2> max_pt;
};

auto overlaps = std::vector<overlap>();

auto s = world.processor_id();
auto s = world.rank();
auto p = world.active_processors();

// STEP 1: communicate shadows for all projections
Expand All @@ -82,8 +93,8 @@ compute_overlaps(bulk::world& world,
bulk::coarray<pod_shadow>(world, p * geometry.projection_count());

for (int i = 0; i < geometry.projection_count(); ++i) {
auto sh = geometry.local_shadow(i);
for (int t = 0; t < p; ++t) {
auto sh = geometry.local_shadow(i);
shadows(t)[p * i + s] = {math::vec_to_array<2_D, int>(sh.min_pt),
math::vec_to_array<2_D, int>(sh.max_pt)};
}
Expand Down
135 changes: 115 additions & 20 deletions distributed/simple_distributed.cpp
@@ -1,3 +1,10 @@
/*
* FIXME:
* - Can we group sends together? this should greatly reduce data size
* with a constant factor.
*/

#include <cassert>
#include <sstream>

#include "bulk/backends/mpi/mpi.hpp"
Expand Down Expand Up @@ -40,44 +47,133 @@ auto calculate_local_volume(bulk::rectangular_partitioning<3, G>& partitioning,

struct shared_pixel {
int target;
int projection;
int local_line;
int remote_line;
};

std::pair<std::vector<shared_pixel>, std::vector<shared_pixel>>
compute_contributions(bulk::world& world,
const distributed::restricted_geometry<T>& geometry) {
world.sync();
(void)geometry;
auto s = world.rank();
auto p = world.active_processors();
auto& gg = geometry.global_geometry();

// block_size is number of local projections
auto block_size = ((geometry.projection_count() - 1) / p) + 1;
auto shadows = bulk::coarray<pod_shadow>(world, p * block_size);
auto offsets = bulk::coarray<int>(world, p * block_size);

// 1. send all `p` windows for all projections `i` to processor `i % p`
for (int i = 0; i < geometry.projection_count(); ++i) {
auto sh = geometry.local_shadow(i);
shadows(i % p)[(i / p) * p + s] = to_pod(sh);
offsets(i % p)[(i / p) * p + s] = geometry.offset(i);
}
world.sync();

// 2. for each projection, for each pixel store the contributors
auto global_shape = gg.projection_shape(0);
auto pixel_count = math::product<2_D, int>(global_shape);
auto pixels = std::vector<std::vector<int>>(pixel_count * block_size);
for (int b = 0; b < block_size; ++b) {
for (int t = 0; t < p; ++t) {
auto w = shadows[b * p + t];
for (int j = w.min_pt[1]; j <= w.max_pt[1]; ++j) {
for (int i = w.min_pt[0]; i <= w.max_pt[0]; ++i) {
if (j * global_shape[0] + i >= pixel_count) {
std::cout << b << " [" << w.min_pt[0] << ", "
<< w.max_pt[0] << "] -> " << i << " " << j
<< " / " << pixel_count << " = "
<< global_shape[0] << " x " << global_shape[1]
<< "\n";
assert(false);
}
pixels[b * pixel_count + j * global_shape[0] + i].push_back(
t);
}
}
}
}

// 3. decide who will be responsible for the pixel
// maybe (line_number % #contributors)
// 4. send to the owner the contributors, to the contributors the owner
auto local_idx = [](auto offset, auto lw, auto gw, auto idx) {
auto x = idx % gw[0] - lw.min_pt[0];
auto y = idx / gw[0] - lw.min_pt[1];
auto idx_in_proj = y * (lw.max_pt[0] - lw.min_pt[0] + 1) + x;
return offset + idx_in_proj;
};

auto my_contributions = bulk::queue<shared_pixel>(world);
auto my_responsibilities = bulk::queue<shared_pixel>(world);

for (int b = 0; b < block_size; ++b) {
for (int i = 0; i < pixel_count; ++i) {
auto pixel_idx = b * pixel_count + i;
if (pixels[pixel_idx].size() == 0) {
continue;
}
auto owner =
pixels[pixel_idx][pixel_idx % pixels[pixel_idx].size()];
// for contributors, we can compute the local index if we know the
// offset.. lets just send them... aaaaand got 'em..

auto owner_line =
local_idx(offsets[b * p + owner], shadows[b * p + owner],
gg.projection_shape(0), i);

for (auto j = 0u; j < pixels[pixel_idx].size(); ++j) {
auto t = pixels[pixel_idx][j];
if (t == owner) {
continue;
}

// 4. send to the owner the contributors, to the contributors
// the owner
auto contributor_line =
local_idx(offsets[b * p + t], shadows[b * p + t],
gg.projection_shape(0), i);
my_contributions(t).send({owner, contributor_line, owner_line});
my_responsibilities(owner).send(
{t, owner_line, contributor_line});
}
}
}

world.sync();

// 5. store contributors and owners in a vector
return {{}, {}};
auto contributions = std::vector<shared_pixel>(my_contributions.size());
auto responsibilities =
std::vector<shared_pixel>(my_responsibilities.size());
std::copy(my_contributions.begin(), my_contributions.end(),
contributions.begin());
std::copy(my_responsibilities.begin(), my_responsibilities.end(),
responsibilities.begin());

return {std::move(contributions), std::move(responsibilities)};
}

void communicate_contributions(bulk::world& world, projections<3_D, T>& projs,
const std::vector<shared_pixel>& contributions,
const std::vector<shared_pixel>& results) {
auto q = bulk::queue<int, int, T>(world);
auto q = bulk::queue<int, T>(world);
auto share = [&](const auto& xs) {
for (auto[target, projection, local, remote] : xs) {
q(target).send(projection, remote,
projs[projs.offset(projection) + local]);
for (auto [target, local, remote] : xs) {
q(target).send(remote, projs[local]);
}
world.sync();
};

share(contributions);
for (auto[projection, idx, value] : q) {
projs[projs.offset(projection) + idx] += value;
world.sync();
for (auto [idx, value] : q) {
projs[idx] += value;
}

share(results);
for (auto[projection, idx, value] : q) {
projs[projs.offset(projection) + idx] = value;
world.sync();
for (auto [idx, value] : q) {
projs[idx] = value;
}
}

Expand All @@ -99,8 +195,8 @@ void run(util::args options) {
// {k, k, k}, {2, 1, 4});

env.spawn(processors, [&](auto& world) {
auto s = world.processor_id();
auto vs = calculate_local_volume(partitioning, world.processor_id());
auto s = world.rank();
auto vs = calculate_local_volume(partitioning, world.rank());

// Make scene with s = 0, connect to this scene from other procs
util::ext_plotter<3_D, T> plotter("tcp://localhost:5555");
Expand Down Expand Up @@ -150,8 +246,7 @@ void run(util::args options) {
}
};

auto overlaps = compute_overlaps<T>(world, gs);
communicate_overlaps(world, p, gs, overlaps);
auto [go_forth, and_back] = compute_contributions(world, gs);

// communicate row and column sums
auto invert = [](auto x) { return (T)1.0 / x; };
Expand All @@ -160,7 +255,7 @@ void run(util::args options) {
};
auto r = projections<3_D, T>(gs);
fp(image<3_D, T>(vs, (T)1.0), gs, vs, r);
communicate_overlaps(world, r, gs, overlaps);
communicate_contributions(world, r, go_forth, and_back);
invert_all(r.mutable_data());
auto c = image<3_D, T>(vs);
bp(projections<3_D, T>(gs, (T)1.0), gs, vs, c);
Expand All @@ -175,7 +270,7 @@ void run(util::args options) {
for (int l = 0; l < gs.lines(); ++l) {
q[l] = r[l] * (p[l] - q[l]);
}
communicate_overlaps(world, q, gs, overlaps);
communicate_contributions(world, q, go_forth, and_back);
bp(q, gs, vs, z);
for (int i = 0; i < vs.cells(); ++i) {
x[i] += c[i] * z[i];
Expand Down
2 changes: 1 addition & 1 deletion ext/bulk
Submodule bulk updated 44 files
+4 −5 CHANGELOG.md
+1 −0 CMakeLists.txt
+14 −12 README.md
+3 −1 backends/mpi/memory_buffer.hpp
+208 −125 backends/mpi/world.hpp
+1 −1 backends/thread/examples/densematrix_sharedmemory.cpp
+5 −5 backends/thread/examples/hello_thread.cpp
+13 −1 backends/thread/world.hpp
+2 −2 benchmark/benchmark.cpp
+7 −6 docs/mkdocs.yml
+5 −5 docs/pages/api/world.md
+2 −2 docs/pages/api/world/next_rank.md
+2 −2 docs/pages/api/world/prev_rank.md
+2 −2 docs/pages/api/world/rank.md
+1 −1 docs/pages/backends/mpi.md
+4 −4 docs/pages/bsp.md
+42 −10 docs/pages/bulk_vs_bsplib.md
+5 −5 docs/pages/coarrays.md
+8 −8 docs/pages/environment_world.md
+12 −11 docs/pages/index.md
+8 −8 docs/pages/message_passing.md
+6 −27 docs/pages/tour.md
+5 −5 docs/pages/variables.md
+3 −3 examples/communication.cpp
+214 −0 examples/data/alice.txt
+1 −1 examples/dot_product.cpp
+5 −4 examples/hello.cpp
+2 −5 examples/messaging.cpp
+1 −1 examples/partitioning.cpp
+74 −0 examples/word_count.cpp
+1 −1 include/bulk/algorithm.hpp
+13 −1 include/bulk/array.hpp
+1 −1 include/bulk/communication.hpp
+45 −8 include/bulk/future.hpp
+21 −100 include/bulk/messages.hpp
+8 −6 include/bulk/partitioned_array.hpp
+45 −0 include/bulk/util/meta_helpers.hpp
+123 −0 include/bulk/util/serialize.hpp
+59 −20 include/bulk/variable.hpp
+46 −22 include/bulk/world.hpp
+7 −7 test/bulk_test_common.hpp
+84 −60 test/communication.cpp
+2 −2 test/initialization.cpp
+1 −1 test/partitioning.cpp
2 changes: 1 addition & 1 deletion ext/catch
Submodule catch updated from 8ebe94 to d5613f
2 changes: 1 addition & 1 deletion ext/cpptoml
Submodule cpptoml updated 1 files
+2 −0 include/cpptoml.h
2 changes: 1 addition & 1 deletion ext/cppzmq
2 changes: 1 addition & 1 deletion ext/fmt
Submodule fmt updated 4 files
+2 −0 README.rst
+30 −0 doc/api.rst
+14 −5 fmt/format.h
+1 −1 test/util-test.cc
2 changes: 1 addition & 1 deletion ext/libzmq
6 changes: 2 additions & 4 deletions include/tomos/distributed/recursive_bisectioning.hpp
Expand Up @@ -55,12 +55,13 @@ split_t find_split(const std::vector<math::line<D, T>>& lines,
std::size_t idx = 0;
for (auto line : lines) {
auto intersections = math::intersect_bounds<D, T>(line, bounds);
// FIXME: check if intersects, since line may start inside bounds
if (intersections) {
auto a = intersections.value().first;
auto b = intersections.value().second;
crossings.push_back({a, idx, math::sign<D, T>(b - a)});
crossings.push_back({b, idx, math::sign<D, T>(a - b)});
} else {
std::cout << "Line that should intersect does indeed not\n";
}
++idx;
}
Expand Down Expand Up @@ -190,11 +191,8 @@ partition_bisection(const tomo::geometry::base<D, T>& geometry,
assert(1 << depth == processors);

// containers for the current left and right lines
// TODO: needed here?
std::vector<math::line<D, T>> all_lines;
for (auto l : geometry) {
// FIXME actually; we need intersections here, not the truncation
// because it does a 'discrete intersection'
auto line = math::truncate_to_volume(l, object_volume);
if (line) {
all_lines.push_back(line.value());
Expand Down
18 changes: 16 additions & 2 deletions include/tomos/geometry.hpp
Expand Up @@ -190,8 +190,7 @@ class base {

/** Construct the geometry with a given number of lines. */
base(int projection_count, bool parallel = false)
: projection_count_(projection_count), parallel_(parallel) {
}
: projection_count_(projection_count), parallel_(parallel) {}

virtual ~base() = default;

Expand All @@ -217,12 +216,27 @@ class base {

virtual std::array<math::vec<D, T>, D - 1>
projection_delta(int i) const = 0;
int offset(int idx) const { return offsets_[idx]; }

protected:
std::vector<int> offsets_;

void compute_offsets_() {
offsets_.resize(projection_count());
offsets_[0] = 0;
std::iota(offsets_.begin() + 1, offsets_.end(), 0);
std::transform(
offsets_.begin() + 1, offsets_.end(), offsets_.begin() + 1,
[&](int i) { return math::reduce<D - 1>(projection_shape(i)); });
std::partial_sum(offsets_.begin() + 1, offsets_.end(),
offsets_.begin() + 1);
}

void compute_lines_() {
for (auto i = 0; i < projection_count_; ++i) {
line_count_ += math::reduce<D - 1>(this->projection_shape(i));
}
this->compute_offsets_();
}

int projection_count_ = 0;
Expand Down
3 changes: 2 additions & 1 deletion include/tomos/math.hpp
Expand Up @@ -322,7 +322,8 @@ intersect_bounds(line<D, T> l, std::array<vec2<int>, D> bounds) {
auto for_sure_far_enough = product<D, T>(bounds_sides);

auto result = aabb_intersection<D, T>(
l.origin, for_sure_far_enough * l.delta, bounds_sides, bounds_origin);
l.origin - for_sure_far_enough * l.delta, for_sure_far_enough * l.delta,
bounds_sides, bounds_origin);

if (result) {
return std::pair<vec<D, T>, vec<D, T>>{result.value().first,
Expand Down

0 comments on commit b94ef24

Please sign in to comment.