Skip to content

Commit

Permalink
Remove relation callbacks and replace with threaded relation processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
hollinger committed Sep 24, 2014
1 parent 5c86f4f commit 3f248b4
Show file tree
Hide file tree
Showing 18 changed files with 176 additions and 216 deletions.
11 changes: 7 additions & 4 deletions middle-pgsql.cpp
Expand Up @@ -732,10 +732,10 @@ void middle_pgsql_t::iterate_ways(middle_t::pending_processor& pf)
osmid_t id;
while(id_tracker::is_valid(id = ways_pending_tracker->pop_mark()))
{
pf.enqueue(id);
pf.enqueue_ways(id);
}
// in case we had higher ones than the middle
pf.enqueue(id);
pf.enqueue_ways(id);

size_t pending_count = pf.size();
fprintf(stderr, "\t%zu ways are pending\n", pending_count);
Expand Down Expand Up @@ -929,7 +929,7 @@ int middle_pgsql_t::relations_delete(osmid_t osm_id)
return 0;
}

void middle_pgsql_t::iterate_relations(middle_t::cb_func &callback)
void middle_pgsql_t::iterate_relations(pending_processor& pf)
{
// The flag we pass to indicate that the way in question might exist already in the database */
int exists = Append;
Expand Down Expand Up @@ -968,9 +968,12 @@ void middle_pgsql_t::iterate_relations(middle_t::cb_func &callback)
}

//send it to the backends
callback(id, exists);
pf.enqueue_relations(id);
}

//let the threads work on them
pf.process_relations();

time(&end);
fprintf(stderr, "\rProcess %i finished processing %i relations in %i sec\n", 0, count, (int)(end - start));

Expand Down
2 changes: 1 addition & 1 deletion middle-pgsql.hpp
Expand Up @@ -46,7 +46,7 @@ struct middle_pgsql_t : public slim_middle_t {
int relation_changed(osmid_t id);

void iterate_ways(middle_t::pending_processor& pf);
void iterate_relations(cb_func &cb);
void iterate_relations(pending_processor& pf);

size_t pending_count() const;

Expand Down
15 changes: 6 additions & 9 deletions middle-ram.cpp
Expand Up @@ -168,15 +168,10 @@ int middle_ram_t::nodes_get_list(struct osmNode *nodes, const osmid_t *ndids, in
return count;
}

void middle_ram_t::iterate_relations(middle_t::cb_func &callback)
void middle_ram_t::iterate_relations(pending_processor& pf)
{
int block, offset;

// to maintain backwards compatibility, we need to set this flag
// which fakes the previous behaviour of having deleted all the
// ways.
simulate_ways_deleted = true;

fprintf(stderr, "\n");
for(block=NUM_BLOCKS-1; block>=0; block--) {
if (!rels[block])
Expand All @@ -189,12 +184,14 @@ void middle_ram_t::iterate_relations(middle_t::cb_func &callback)
if (rel_out_count % 10 == 0)
fprintf(stderr, "\rWriting relation (%u)", rel_out_count);

callback(id, 0);
pf.enqueue_relations(id);
}
}
}
pf.enqueue_relations(id_tracker::max());

fprintf(stderr, "\rWriting relation (%u)\n", rel_out_count);
//let the threads process the relations
pf.process_relations();
}

size_t middle_ram_t::pending_count() const {
Expand All @@ -205,7 +202,7 @@ void middle_ram_t::iterate_ways(middle_t::pending_processor& pf)
{
//let the outputs enqueue everything they have the non slim middle
//has nothing of its own to enqueue as it doesnt have pending anything
pf.enqueue(id_tracker::max());
pf.enqueue_ways(id_tracker::max());

//let the threads process the ways
pf.process_ways();
Expand Down
2 changes: 1 addition & 1 deletion middle-ram.hpp
Expand Up @@ -45,7 +45,7 @@ struct middle_ram_t : public middle_t {
std::vector<osmid_t> relations_using_way(osmid_t way_id) const;

void iterate_ways(middle_t::pending_processor& pf);
void iterate_relations(cb_func &cb);
void iterate_relations(pending_processor& pf);

size_t pending_count() const;

Expand Down
3 changes: 0 additions & 3 deletions middle.cpp
Expand Up @@ -22,8 +22,5 @@ middle_t::~middle_t() {
// Out-of-line definition of the destructor; there is nothing to release here.
slim_middle_t::~slim_middle_t() {}

// Out-of-line definition of the callback interface's destructor; intentionally empty.
middle_t::cb_func::~cb_func() {}

// Out-of-line definition of the processor interface's destructor; intentionally empty.
middle_t::pending_processor::~pending_processor() {}
12 changes: 4 additions & 8 deletions middle.hpp
Expand Up @@ -46,22 +46,18 @@ struct middle_t : public middle_query_t {
virtual int ways_set(osmid_t id, osmid_t *nds, int nd_count, struct keyval *tags) = 0;
virtual int relations_set(osmid_t id, struct member *members, int member_count, struct keyval *tags) = 0;

// Callback interface accepted by iterate_relations(): the middle invokes it
// for the ids it visits and the driver calls finish() afterwards.
struct cb_func {
virtual ~cb_func();
// Invoked with an id and the caller's "exists" flag; returns an int status.
virtual int operator()(osmid_t id, int exists) = 0;
// Invoked by the driver once the iteration is over.
virtual void finish(int exists) = 0;
};

// Interface the middle uses to hand pending ids to a processor: ids are queued
// with the enqueue_* calls and then worked through by the matching process_* call.
struct pending_processor {
virtual ~pending_processor();
// NOTE(review): generic enqueue declared next to enqueue_ways — confirm both are still wanted.
virtual void enqueue(osmid_t id) = 0;
// Queue a way id; iterate_ways() calls this before process_ways().
virtual void enqueue_ways(osmid_t id) = 0;
// Work through what has been queued for ways.
virtual void process_ways() = 0;
// Queue a relation id; iterate_relations() calls this before process_relations().
virtual void enqueue_relations(osmid_t id) = 0;
// Work through what has been queued for relations.
virtual void process_relations() = 0;
virtual int thread_count() = 0;
// NOTE(review): callers print this as the number of pending ways — confirm what is counted.
virtual int size() = 0;
};

virtual void iterate_ways(pending_processor& pf) = 0;
virtual void iterate_relations(cb_func &cb) = 0;
virtual void iterate_relations(pending_processor& pf) = 0;

virtual size_t pending_count() const = 0;

Expand Down
107 changes: 49 additions & 58 deletions osmdata.cpp
Expand Up @@ -152,30 +152,6 @@ void osmdata_t::start() {

namespace {

// Fan-out callback: forwards every call to each child callback registered
// through add(). The children are deleted when this object is destroyed.
struct cb_func : public middle_t::cb_func {
    cb_func() {}

    virtual ~cb_func() {
        // the registered children are released here
        for (size_t i = 0; i < m_ptrs.size(); ++i) {
            delete m_ptrs[i];
        }
    }

    // Registers another child callback; the destructor will delete it.
    void add(middle_t::cb_func *ptr) { m_ptrs.push_back(ptr); }

    // True while no child callback has been registered.
    bool empty() const { return m_ptrs.empty(); }

    // Invokes every child with (id, exists) and returns the bitwise OR of
    // their results.
    int operator()(osmid_t id, int exists) {
        int combined = 0;
        for (size_t i = 0; i < m_ptrs.size(); ++i) {
            combined |= (*m_ptrs[i])(id, exists);
        }
        return combined;
    }

    // Calls finish(exists) on every child, in registration order.
    void finish(int exists) {
        for (size_t i = 0; i < m_ptrs.size(); ++i) {
            m_ptrs[i]->finish(exists);
        }
    }

    std::vector<middle_t::cb_func*> m_ptrs;
};

//TODO: have the main thread using the main middle to query the middle for batches of ways (configurable number)
//and stuffing those into the work queue, so we have a single producer multi consumer threaded queue
//since the fetching from middle should be faster than the processing in each backend.
Expand All @@ -184,14 +160,22 @@ struct pending_threaded_processor : public middle_t::pending_processor {
typedef std::vector<boost::shared_ptr<output_t> > output_vec_t;
typedef std::pair<boost::shared_ptr<const middle_query_t>, output_vec_t> clone_t;

static void do_batch(output_vec_t const& outputs, pending_queue_t& queue, boost::atomic_size_t& ids_done, int append) {
// Worker loop for pending ways: keeps popping jobs off the shared queue until
// pop() fails, passes each job's id to the output the job selects, and bumps
// ids_done once per processed job.
static void do_batch_ways(output_vec_t const& outputs, pending_queue_t& queue, boost::atomic_size_t& ids_done, int append) {
    for (pending_job_t next; queue.pop(next); ++ids_done) {
        // next.second picks the output, next.first is the id handed to it
        outputs.at(next.second)->pending_way(next.first, append);
    }
}

// Worker loop for pending relations: keeps popping jobs off the shared queue
// until pop() fails, passes each job's id to the output the job selects, and
// bumps ids_done once per processed job.
static void do_batch_rels(output_vec_t const& outputs, pending_queue_t& queue, boost::atomic_size_t& ids_done, int append) {
    for (pending_job_t next; queue.pop(next); ++ids_done) {
        // next.second picks the output, next.first is the id handed to it
        outputs.at(next.second)->pending_relation(next.first, append);
    }
}

//starts up count threads and works on the queue
pending_threaded_processor(boost::shared_ptr<middle_query_t> mid, const output_vec_t& outs, size_t thread_count, size_t job_count, int append)
: outs(outs), queue(job_count), ids_queued(0), append(append) {
Expand All @@ -218,7 +202,7 @@ struct pending_threaded_processor : public middle_t::pending_processor {

~pending_threaded_processor() {}

void enqueue(osmid_t id) {
void enqueue_ways(osmid_t id) {
for(size_t i = 0; i < outs.size(); ++i) {
outs[i]->enqueue_ways(queue, id, i, ids_queued);
}
Expand All @@ -231,7 +215,7 @@ struct pending_threaded_processor : public middle_t::pending_processor {

//make the threads and start them
for (size_t i = 0; i < clones.size(); ++i) {
workers.create_thread(boost::bind(do_batch, boost::cref(clones[i].second), boost::ref(queue), boost::ref(ids_done), append));
workers.create_thread(boost::bind(do_batch_ways, boost::cref(clones[i].second), boost::ref(queue), boost::ref(ids_done), append));
}

//TODO: print out progress
Expand All @@ -254,13 +238,37 @@ struct pending_threaded_processor : public middle_t::pending_processor {
}
}

// Offers the relation id to every output in turn, passing along the shared
// queue, the output's own index and the running ids_queued counter.
void enqueue_relations(osmid_t id) {
    size_t output_index = 0;
    while (output_index < outs.size()) {
        outs[output_index]->enqueue_relations(queue, id, output_index, ids_queued);
        ++output_index;
    }
}

// Works through the queued relation jobs: starts one worker thread per clone
// running do_batch_rels, waits for all of them, then commits every clone
// output and merges its expire tree back into the matching original output.
void process_relations() {
//start counting finished ids from zero for this run
ids_done = 0;

//make the threads and start them, one per clone, each working on its own
//clone's outputs but sharing the queue and the ids_done counter
for (size_t i = 0; i < clones.size(); ++i) {
workers.create_thread(boost::bind(do_batch_rels, boost::cref(clones[i].second), boost::ref(queue), boost::ref(ids_done), append));
}

//wait for them to really be done
workers.join_all();
//all workers have returned, so reset the queued counter
ids_queued = 0;

//fold the results of each thread's cloned outputs back into
//their respective main outputs
BOOST_FOREACH(const clone_t& clone, clones) {
//walk the original outputs and this clone's outputs in lockstep,
//stopping as soon as either list runs out
for(output_vec_t::const_iterator original_output = outs.begin(), clone_output = clone.second.begin();
original_output != outs.end() && clone_output != clone.second.end(); ++original_output, ++clone_output) {
//commit the work done through this thread's copy of the output
clone_output->get()->commit();
//merge the expire tree from this threads copy of output back
original_output->get()->merge_expire_trees(*clone_output);
}
}
}

int thread_count() {
Expand Down Expand Up @@ -309,37 +317,20 @@ void osmdata_t::stop() {
//threaded pending processing
pending_threaded_processor ptp(mid, outs, 1, pending_count, append);

/* Pending ways
* This stage takes ways which were processed earlier, but might be
* involved in a multipolygon relation. They could also be ways that
* were modified in diff processing.
*/
if (!outs.empty()) {
/* Pending ways
* This stage takes ways which were processed earlier, but might be
* involved in a multipolygon relation. They could also be ways that
* were modified in diff processing.
*/
mid->iterate_ways( ptp );
}

/* Pending relations
* This is like pending ways, except there aren't pending relations
* on import, only on update.
* TODO: Can we skip this on import?
*/
{
cb_func callback;
BOOST_FOREACH(boost::shared_ptr<output_t>& out, outs) {
middle_t::cb_func *rel_callback = out->relation_callback();
if (rel_callback != NULL) {
callback.add(rel_callback);
}
}
if (!callback.empty()) {
mid->iterate_relations( callback );
callback.finish(append);

mid->commit();
BOOST_FOREACH(boost::shared_ptr<output_t>& out, outs) {
out->commit();
}
}
/* Pending relations
* This is like pending ways, except there aren't pending relations
* on import, only on update.
* TODO: Can we skip this on import?
*/
mid->iterate_relations( ptp );
}

/* Clustering, index creation, and cleanup.
Expand Down
12 changes: 7 additions & 5 deletions output-gazetteer.cpp
Expand Up @@ -1061,18 +1061,20 @@ void output_gazetteer_t::commit()
{
}

// This output supplies no relation callback, so callers always receive NULL.
middle_t::cb_func *output_gazetteer_t::relation_callback()
{
    return NULL;
}

// No-op: this output adds no way jobs to the queue and leaves `added` untouched.
void output_gazetteer_t::enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added)
{
}

// No-op for this output: does nothing with the way and always returns 0.
int output_gazetteer_t::pending_way(osmid_t id, int exists)
{
    return 0;
}

// No-op: this output adds no relation jobs to the queue and leaves `added` untouched.
void output_gazetteer_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added)
{
}

// No-op for this output: does nothing with the relation and always returns 0.
int output_gazetteer_t::pending_relation(osmid_t id, int exists)
{
    return 0;
}

void output_gazetteer_t::stop()
{
/* Stop any active copy */
Expand Down
4 changes: 3 additions & 1 deletion output-gazetteer.hpp
Expand Up @@ -16,13 +16,15 @@ class output_gazetteer_t : public output_t {
virtual boost::shared_ptr<output_t> clone(const middle_query_t* cloned_middle) const;

int start();
middle_t::cb_func *relation_callback();
void stop();
void commit();

void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added);
int pending_way(osmid_t id, int exists);

void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added);
int pending_relation(osmid_t id, int exists);

int node_add(osmid_t id, double lat, double lon, struct keyval *tags);
int way_add(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags);
int relation_add(osmid_t id, struct member *members, int member_count, struct keyval *tags);
Expand Down

0 comments on commit 3f248b4

Please sign in to comment.