From 3f248b4e1b2ea3d14837b274689656f57694848e Mon Sep 17 00:00:00 2001 From: Jordan Hollinger Date: Wed, 24 Sep 2014 17:35:33 -0400 Subject: [PATCH] Remove relation callbacks and replace with threaded relation processing. --- middle-pgsql.cpp | 11 ++-- middle-pgsql.hpp | 2 +- middle-ram.cpp | 15 +++--- middle-ram.hpp | 2 +- middle.cpp | 3 -- middle.hpp | 12 ++--- osmdata.cpp | 107 +++++++++++++++++--------------------- output-gazetteer.cpp | 12 +++-- output-gazetteer.hpp | 4 +- output-multi.cpp | 70 +++++++++++-------------- output-multi.hpp | 17 ++---- output-null.cpp | 11 ++-- output-null.hpp | 4 +- output-pgsql.cpp | 71 +++++++++++-------------- output-pgsql.hpp | 18 ++----- output.hpp | 4 +- tests/middle-tests.cpp | 23 +++++--- tests/test-parse-xml2.cpp | 6 ++- 18 files changed, 176 insertions(+), 216 deletions(-) diff --git a/middle-pgsql.cpp b/middle-pgsql.cpp index c45107b5e..8f22ec43f 100644 --- a/middle-pgsql.cpp +++ b/middle-pgsql.cpp @@ -732,10 +732,10 @@ void middle_pgsql_t::iterate_ways(middle_t::pending_processor& pf) osmid_t id; while(id_tracker::is_valid(id = ways_pending_tracker->pop_mark())) { - pf.enqueue(id); + pf.enqueue_ways(id); } // in case we had higher ones than the middle - pf.enqueue(id); + pf.enqueue_ways(id); size_t pending_count = pf.size(); fprintf(stderr, "\t%zu ways are pending\n", pending_count); @@ -929,7 +929,7 @@ int middle_pgsql_t::relations_delete(osmid_t osm_id) return 0; } -void middle_pgsql_t::iterate_relations(middle_t::cb_func &callback) +void middle_pgsql_t::iterate_relations(pending_processor& pf) { // The flag we pass to indicate that the way in question might exist already in the database */ int exists = Append; @@ -968,9 +968,12 @@ void middle_pgsql_t::iterate_relations(middle_t::cb_func &callback) } //send it to the backends - callback(id, exists); + pf.enqueue_relations(id); } + //let the threads work on them + pf.process_relations(); + time(&end); fprintf(stderr, "\rProcess %i finished processing %i relations in %i sec\n", 0, count, (int)(end - start)); diff --git a/middle-pgsql.hpp b/middle-pgsql.hpp index fdc2b727c..1fd7667ae 100644 --- a/middle-pgsql.hpp +++ b/middle-pgsql.hpp @@ -46,7 +46,7 @@ struct middle_pgsql_t : public slim_middle_t { int relation_changed(osmid_t id); void iterate_ways(middle_t::pending_processor& pf); - void iterate_relations(cb_func &cb); + void iterate_relations(pending_processor& pf); size_t pending_count() const; diff --git a/middle-ram.cpp b/middle-ram.cpp index 6eb179e7a..55c387ef8 100644 --- a/middle-ram.cpp +++ b/middle-ram.cpp @@ -168,15 +168,10 @@ int middle_ram_t::nodes_get_list(struct osmNode *nodes, const osmid_t *ndids, in return count; } -void middle_ram_t::iterate_relations(middle_t::cb_func &callback) +void middle_ram_t::iterate_relations(pending_processor& pf) { int block, offset; - // to maintain backwards compatibility, we need to set this flag - // which fakes the previous behaviour of having deleted all the - // ways. - simulate_ways_deleted = true; - fprintf(stderr, "\n"); for(block=NUM_BLOCKS-1; block>=0; block--) { if (!rels[block]) @@ -189,12 +184,14 @@ void middle_ram_t::iterate_relations(middle_t::cb_func &callback) if (rel_out_count % 10 == 0) fprintf(stderr, "\rWriting relation (%u)", rel_out_count); - callback(id, 0); + pf.enqueue_relations(id); } } } + pf.enqueue_relations(id_tracker::max()); - fprintf(stderr, "\rWriting relation (%u)\n", rel_out_count); + //let the threads process the relations + pf.process_relations(); } size_t middle_ram_t::pending_count() const { @@ -205,7 +202,7 @@ void middle_ram_t::iterate_ways(middle_t::pending_processor& pf) { //let the outputs enqueue everything they have the non slim middle //has nothing of its own to enqueue as it doesnt have pending anything - pf.enqueue(id_tracker::max()); + pf.enqueue_ways(id_tracker::max()); //let the threads process the ways pf.process_ways(); diff --git a/middle-ram.hpp b/middle-ram.hpp index 93f0928e5..2889cd3bd 100644 --- a/middle-ram.hpp +++ b/middle-ram.hpp @@ -45,7 +45,7 @@ struct middle_ram_t : public middle_t { std::vector relations_using_way(osmid_t way_id) const; void iterate_ways(middle_t::pending_processor& pf); - void iterate_relations(cb_func &cb); + void iterate_relations(pending_processor& pf); size_t pending_count() const; diff --git a/middle.cpp b/middle.cpp index 9d377f071..63cc0a348 100644 --- a/middle.cpp +++ b/middle.cpp @@ -22,8 +22,5 @@ middle_t::~middle_t() { slim_middle_t::~slim_middle_t() { } -middle_t::cb_func::~cb_func() { -} - middle_t::pending_processor::~pending_processor() { } diff --git a/middle.hpp b/middle.hpp index 77bef4d39..397358aa2 100644 --- a/middle.hpp +++ b/middle.hpp @@ -46,22 +46,18 @@ struct middle_t : public middle_query_t { virtual int ways_set(osmid_t id, osmid_t *nds, int nd_count, struct keyval *tags) = 0; virtual int relations_set(osmid_t id, struct member *members, int member_count, struct keyval *tags) = 0; - struct cb_func { - virtual ~cb_func(); - virtual int operator()(osmid_t id, int exists) = 0; - virtual void finish(int exists) = 0; - }; - struct pending_processor { virtual ~pending_processor(); - virtual void enqueue(osmid_t id) = 0; + virtual void enqueue_ways(osmid_t id) = 0; virtual void process_ways() = 0; + virtual void enqueue_relations(osmid_t id) = 0; + virtual void process_relations() = 0; virtual int thread_count() = 0; virtual int size() = 0; }; virtual void iterate_ways(pending_processor& pf) = 0; - virtual void iterate_relations(cb_func &cb) = 0; + virtual void iterate_relations(pending_processor& pf) = 0; virtual size_t pending_count() const = 0; diff --git a/osmdata.cpp b/osmdata.cpp index 123e8d888..debbd8926 100644 --- a/osmdata.cpp +++ b/osmdata.cpp @@ -152,30 +152,6 @@ void osmdata_t::start() { namespace { -struct cb_func : public middle_t::cb_func { - cb_func() {} - void add(middle_t::cb_func *ptr) { m_ptrs.push_back(ptr); } - bool empty() const { return m_ptrs.empty(); } - virtual ~cb_func() { - BOOST_FOREACH(middle_t::cb_func *ptr, m_ptrs) { - delete ptr; - } - } - int operator()(osmid_t id, int exists) { - int status = 0; - BOOST_FOREACH(middle_t::cb_func *ptr, m_ptrs) { - status |= ptr->operator()(id, exists); - } - return status; - } - void finish(int exists) { - BOOST_FOREACH(middle_t::cb_func *ptr, m_ptrs) { - ptr->finish(exists); - } - } - std::vector m_ptrs; -}; - //TODO: have the main thread using the main middle to query the middle for batches of ways (configurable number) //and stuffing those into the work queue, so we have a single producer multi consumer threaded queue //since the fetching from middle should be faster than the processing in each backend. @@ -184,7 +160,7 @@ struct pending_threaded_processor : public middle_t::pending_processor { typedef std::vector > output_vec_t; typedef std::pair, output_vec_t> clone_t; - static void do_batch(output_vec_t const& outputs, pending_queue_t& queue, boost::atomic_size_t& ids_done, int append) { + static void do_batch_ways(output_vec_t const& outputs, pending_queue_t& queue, boost::atomic_size_t& ids_done, int append) { pending_job_t job; while (queue.pop(job)) { outputs.at(job.second)->pending_way(job.first, append); @@ -192,6 +168,14 @@ struct pending_threaded_processor : public middle_t::pending_processor { } } + static void do_batch_rels(output_vec_t const& outputs, pending_queue_t& queue, boost::atomic_size_t& ids_done, int append) { + pending_job_t job; + while (queue.pop(job)) { + outputs.at(job.second)->pending_relation(job.first, append); + ++ids_done; + } + } + //starts up count threads and works on the queue pending_threaded_processor(boost::shared_ptr mid, const output_vec_t& outs, size_t thread_count, size_t job_count, int append) : outs(outs), queue(job_count), ids_queued(0), append(append) { @@ -218,7 +202,7 @@ struct pending_threaded_processor : public middle_t::pending_processor { ~pending_threaded_processor() {} - void enqueue(osmid_t id) { + void enqueue_ways(osmid_t id) { for(size_t i = 0; i < outs.size(); ++i) { outs[i]->enqueue_ways(queue, id, i, ids_queued); } @@ -231,7 +215,7 @@ struct pending_threaded_processor : public middle_t::pending_processor { //make the threads and start them for (size_t i = 0; i < clones.size(); ++i) { - workers.create_thread(boost::bind(do_batch, boost::cref(clones[i].second), boost::ref(queue), boost::ref(ids_done), append)); + workers.create_thread(boost::bind(do_batch_ways, boost::cref(clones[i].second), boost::ref(queue), boost::ref(ids_done), append)); } //TODO: print out progress @@ -254,13 +238,37 @@ struct pending_threaded_processor : public middle_t::pending_processor { } } + void enqueue_relations(osmid_t id) { + for(size_t i = 0; i < outs.size(); ++i) { + outs[i]->enqueue_relations(queue, id, i, ids_queued); + } + } void process_relations() { - //after all processing + //reset the number we've done + ids_done = 0; + + //make the threads and start them + for (size_t i = 0; i < clones.size(); ++i) { + workers.create_thread(boost::bind(do_batch_rels, boost::cref(clones[i].second), boost::ref(queue), boost::ref(ids_done), append)); + } + //wait for them to really be done + workers.join_all(); + ids_queued = 0; - //TODO: commit all the outputs (will finish the copies and commit the transactions) - //TODO: collapse the expire_tiles trees + //collect all the new rels that became pending from each + //output in each thread back to their respective main outputs + BOOST_FOREACH(const clone_t& clone, clones) { + //for each clone/original output + for(output_vec_t::const_iterator original_output = outs.begin(), clone_output = clone.second.begin(); + original_output != outs.end() && clone_output != clone.second.end(); ++original_output, ++clone_output) { + //done copying ways for now + clone_output->get()->commit(); + //merge the expire tree from this threads copy of output back + original_output->get()->merge_expire_trees(*clone_output); + } + } } int thread_count() { @@ -309,37 +317,20 @@ void osmdata_t::stop() { //threaded pending processing pending_threaded_processor ptp(mid, outs, 1, pending_count, append); - /* Pending ways - * This stage takes ways which were processed earlier, but might be - * involved in a multipolygon relation. They could also be ways that - * were modified in diff processing. - */ if (!outs.empty()) { + /* Pending ways + * This stage takes ways which were processed earlier, but might be + * involved in a multipolygon relation. They could also be ways that + * were modified in diff processing. + */ mid->iterate_ways( ptp ); - } - /* Pending relations - * This is like pending ways, except there aren't pending relations - * on import, only on update. - * TODO: Can we skip this on import? - */ - { - cb_func callback; - BOOST_FOREACH(boost::shared_ptr& out, outs) { - middle_t::cb_func *rel_callback = out->relation_callback(); - if (rel_callback != NULL) { - callback.add(rel_callback); - } - } - if (!callback.empty()) { - mid->iterate_relations( callback ); - callback.finish(append); - - mid->commit(); - BOOST_FOREACH(boost::shared_ptr& out, outs) { - out->commit(); - } - } + /* Pending relations + * This is like pending ways, except there aren't pending relations + * on import, only on update. + * TODO: Can we skip this on import? + */ + mid->iterate_relations( ptp ); } /* Clustering, index creation, and cleanup. diff --git a/output-gazetteer.cpp b/output-gazetteer.cpp index d8c434a36..3ed41a4b1 100644 --- a/output-gazetteer.cpp +++ b/output-gazetteer.cpp @@ -1061,11 +1061,6 @@ void output_gazetteer_t::commit() { } -middle_t::cb_func *output_gazetteer_t::relation_callback() { - /* Process any remaining ways and relations */ - return NULL; -} - void output_gazetteer_t::enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { } @@ -1073,6 +1068,13 @@ int output_gazetteer_t::pending_way(osmid_t id, int exists) { return 0; } +void output_gazetteer_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { +} + +int output_gazetteer_t::pending_relation(osmid_t id, int exists) { + return 0; +} + void output_gazetteer_t::stop() { /* Stop any active copy */ diff --git a/output-gazetteer.hpp b/output-gazetteer.hpp index 1d8517d0e..d22f93cb2 100644 --- a/output-gazetteer.hpp +++ b/output-gazetteer.hpp @@ -16,13 +16,15 @@ class output_gazetteer_t : public output_t { virtual boost::shared_ptr clone(const middle_query_t* cloned_middle) const; int start(); - middle_t::cb_func *relation_callback(); void stop(); void commit(); void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); int pending_way(osmid_t id, int exists); + void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); + int pending_relation(osmid_t id, int exists); + int node_add(osmid_t id, double lat, double lon, struct keyval *tags); int way_add(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags); int relation_add(osmid_t id, struct member *members, int member_count, struct keyval *tags); diff --git a/output-multi.cpp b/output-multi.cpp index 6b07811b1..ce99c1838 100644 --- a/output-multi.cpp +++ b/output-multi.cpp @@ -57,16 +57,6 @@ int output_multi_t::start() { return 0; } -middle_t::cb_func *output_multi_t::relation_callback() { - /* Processing any remaing to be processed relations */ - /* During this stage output tables also need to stay out of - * extended transactions, as the delete_way_from_output, called - * from process_relation, can deadlock if using multi-processing. - */ - rel_cb_func *rel_callback = new rel_cb_func(this); - return rel_callback; -} - size_t output_multi_t::pending_count() const { return ways_pending_tracker->size() + rels_pending_tracker->size(); } @@ -122,50 +112,50 @@ int output_multi_t::pending_way(osmid_t id, int exists) { return ret; } -output_multi_t::rel_cb_func::rel_cb_func(output_multi_t *ptr) - : m_ptr(ptr), m_sql(), - m_next_internal_id(m_ptr->rels_pending_tracker->pop_mark()) { -} +void output_multi_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { + int ret = 0; + + //make sure we get the one passed in + job_queue.push(pending_job_t(id, output_id)); + added++; + + //grab the first one or bail if its not valid + osmid_t popped = rels_pending_tracker->pop_mark(); + if(!id_tracker::is_valid(popped)) + return; + + //get all the ones up to the id that was passed in + while (popped < id) { + job_queue.push(pending_job_t(popped, output_id)); + added++; + popped = rels_pending_tracker->pop_mark(); + } -output_multi_t::rel_cb_func::~rel_cb_func() {} + //make sure to get this one as well and move to the next + if(popped == id) { + popped = rels_pending_tracker->pop_mark(); + } + job_queue.push(pending_job_t(popped, output_id)); + added++; +} -int output_multi_t::rel_cb_func::do_single(osmid_t id, int exists) { +int output_multi_t::pending_relation(osmid_t id, int exists) { keyval tags_int; member *members_int; int count_int; int ret = 0; + initList(&tags_int); - if (!m_ptr->m_mid->relations_get(id, &members_int, &count_int, &tags_int)) { - ret = m_ptr->process_relation(id, members_int, count_int, &tags_int, exists); + // Try to fetch the relation from the DB + if (!m_mid->relations_get(id, &members_int, &count_int, &tags_int)) { + ret = process_relation(id, members_int, count_int, &tags_int, exists); free(members_int); } resetList(&tags_int); - return ret; -} - -int output_multi_t::rel_cb_func::operator()(osmid_t id, int exists) { - int ret = 0; - - //loop through the pending rels up to id - while (m_next_internal_id < id) { - ret = do_single(m_next_internal_id, exists) + ret > 0 ? 1 : 0; - m_next_internal_id = m_ptr->rels_pending_tracker->pop_mark(); - } - - //make sure to get this one as well and move to the next - ret = do_single(id, exists) + ret > 0 ? 1 : 0; - if(m_next_internal_id == id) { - m_next_internal_id = m_ptr->rels_pending_tracker->pop_mark(); - } - //non zero is bad return ret; } -void output_multi_t::rel_cb_func::finish(int exists) { - operator()(std::numeric_limits::max(), exists); -} - void output_multi_t::stop() { m_table->stop(); m_expire->output_and_destroy(); diff --git a/output-multi.hpp b/output-multi.hpp index fe5844cbe..0e1af41b1 100644 --- a/output-multi.hpp +++ b/output-multi.hpp @@ -31,13 +31,15 @@ class output_multi_t : public output_t { virtual boost::shared_ptr clone(const middle_query_t* cloned_middle) const; int start(); - middle_t::cb_func *relation_callback(); void stop(); void commit(); void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); int pending_way(osmid_t id, int exists); + void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); + int pending_relation(osmid_t id, int exists); + int node_add(osmid_t id, double lat, double lon, struct keyval *tags); int way_add(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags); int relation_add(osmid_t id, struct member *members, int member_count, struct keyval *tags); @@ -66,19 +68,6 @@ class output_multi_t : public output_t { int process_relation(osmid_t id, const member *members, int member_count, struct keyval *tags, bool exists); void copy_to_table(osmid_t id, const char *wkt, struct keyval *tags); - struct rel_cb_func : public middle_t::cb_func { - output_multi_t *m_ptr; - buffer m_sql; - osmid_t m_next_internal_id; - rel_cb_func(output_multi_t *ptr); - virtual ~rel_cb_func(); - int operator()(osmid_t id, int exists); - int do_single(osmid_t id, int exists); - void finish(int exists); - }; - - friend struct rel_cb_func; - boost::scoped_ptr m_tagtransform; boost::scoped_ptr m_export_list; boost::shared_ptr m_processor; diff --git a/output-null.cpp b/output-null.cpp index eeedad019..8aa117e02 100644 --- a/output-null.cpp +++ b/output-null.cpp @@ -20,10 +20,6 @@ int output_null_t::start() { return 0; } -middle_t::cb_func *output_null_t::relation_callback() { - return NULL; -} - void output_null_t::stop() { } @@ -37,6 +33,13 @@ int output_null_t::pending_way(osmid_t id, int exists) { return 0; } +void output_null_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { +} + +int output_null_t::pending_relation(osmid_t id, int exists) { + return 0; +} + int output_null_t::node_add(osmid_t a UNUSED, double b UNUSED, double c UNUSED, struct keyval *k UNUSED) { return 0; } diff --git a/output-null.hpp b/output-null.hpp index 88e51c4fd..4e329814d 100644 --- a/output-null.hpp +++ b/output-null.hpp @@ -15,7 +15,6 @@ class output_null_t : public output_t { virtual boost::shared_ptr clone(const middle_query_t* cloned_middle) const; int start(); - middle_t::cb_func *relation_callback(); void stop(); void commit(); void cleanup(void); @@ -23,6 +22,9 @@ class output_null_t : public output_t { void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); int pending_way(osmid_t id, int exists); + void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); + int pending_relation(osmid_t id, int exists); + int node_add(osmid_t id, double lat, double lon, struct keyval *tags); int way_add(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags); int relation_add(osmid_t id, struct member *members, int member_count, struct keyval *tags); diff --git a/output-pgsql.cpp b/output-pgsql.cpp index 849877980..8f0960482 100644 --- a/output-pgsql.cpp +++ b/output-pgsql.cpp @@ -258,17 +258,6 @@ extern "C" void *pthread_output_pgsql_stop_one(void *arg) { }; } // anonymous namespace -middle_t::cb_func *output_pgsql_t::relation_callback() -{ - /* Processing any remaing to be processed relations */ - /* During this stage output tables also need to stay out of - * extended transactions, as the delete_way_from_output, called - * from process_relation, can deadlock if using multi-processing. - */ - rel_cb_func *rel_callback = new rel_cb_func(this); - return rel_callback; -} - void output_pgsql_t::enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { int ret = 0; @@ -321,50 +310,50 @@ int output_pgsql_t::pending_way(osmid_t id, int exists) { return ret; } -output_pgsql_t::rel_cb_func::rel_cb_func(output_pgsql_t *ptr) - : m_ptr(ptr), m_sql(), - m_next_internal_id(m_ptr->rels_pending_tracker->pop_mark()) { -} +void output_pgsql_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { + int ret = 0; -output_pgsql_t::rel_cb_func::~rel_cb_func() {} + //make sure we get the one passed in + job_queue.push(pending_job_t(id, output_id)); + added++; -int output_pgsql_t::rel_cb_func::do_single(osmid_t id, int exists) { + //grab the first one or bail if its not valid + osmid_t popped = rels_pending_tracker->pop_mark(); + if(!id_tracker::is_valid(popped)) + return; + + //get all the ones up to the id that was passed in + while (popped < id) { + job_queue.push(pending_job_t(popped, output_id)); + added++; + popped = rels_pending_tracker->pop_mark(); + } + + //make sure to get this one as well and move to the next + if(popped == id) { + popped = rels_pending_tracker->pop_mark(); + } + job_queue.push(pending_job_t(popped, output_id)); + added++; +} + +int output_pgsql_t::pending_relation(osmid_t id, int exists) { keyval tags_int; member *members_int; int count_int; int ret = 0; + initList(&tags_int); - if (!m_ptr->m_mid->relations_get(id, &members_int, &count_int, &tags_int)) { - ret = m_ptr->pgsql_process_relation(id, members_int, count_int, &tags_int, exists); + // Try to fetch the relation from the DB + if (!m_mid->relations_get(id, &members_int, &count_int, &tags_int)) { + ret = pgsql_process_relation(id, members_int, count_int, &tags_int, exists); free(members_int); } resetList(&tags_int); - return ret; -} - -int output_pgsql_t::rel_cb_func::operator()(osmid_t id, int exists) { - int ret = 0; - - //loop through the pending rels up to id - while (m_next_internal_id < id) { - ret = do_single(m_next_internal_id, exists) + ret > 0 ? 1 : 0; - m_next_internal_id = m_ptr->rels_pending_tracker->pop_mark(); - } - - //make sure to get this one as well and move to the next - ret = do_single(id, exists) + ret > 0 ? 1 : 0; - if(m_next_internal_id == id) { - m_next_internal_id = m_ptr->rels_pending_tracker->pop_mark(); - } - //non zero is bad return ret; } -void output_pgsql_t::rel_cb_func::finish(int exists) { - operator()(std::numeric_limits::max(), exists); -} - void output_pgsql_t::commit() { for (int i=0; i clone(const middle_query_t* cloned_middle) const; int start(); - middle_t::cb_func *relation_callback(); void stop(); void commit(); void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); int pending_way(osmid_t id, int exists); - int pending_way_count(); + + void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added); + int pending_relation(osmid_t id, int exists); int node_add(osmid_t id, double lat, double lon, struct keyval *tags); int way_add(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags); @@ -59,19 +60,6 @@ class output_pgsql_t : public output_t { virtual boost::shared_ptr get_expire_tree(); protected: - - struct rel_cb_func : public middle_t::cb_func { - output_pgsql_t *m_ptr; - buffer m_sql; - osmid_t m_next_internal_id; - rel_cb_func(output_pgsql_t *ptr); - virtual ~rel_cb_func(); - int operator()(osmid_t id, int exists); - int do_single(osmid_t id, int exists); - void finish(int exists); - }; - - friend struct rel_cb_func; int pgsql_out_node(osmid_t id, struct keyval *tags, double node_lat, double node_lon); int pgsql_out_way(osmid_t id, struct keyval *tags, const struct osmNode *nodes, int count, int exists); diff --git a/output.hpp b/output.hpp index 660a117f2..251bb6061 100644 --- a/output.hpp +++ b/output.hpp @@ -30,13 +30,15 @@ class output_t : public boost::noncopyable { virtual boost::shared_ptr clone(const middle_query_t* cloned_middle) const = 0; virtual int start() = 0; - virtual middle_t::cb_func *relation_callback() = 0; virtual void stop() = 0; virtual void commit() = 0; virtual void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) = 0; virtual int pending_way(osmid_t id, int exists) = 0; + virtual void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) = 0; + virtual int pending_relation(osmid_t id, int exists) = 0; + virtual int node_add(osmid_t id, double lat, double lon, struct keyval *tags) = 0; virtual int way_add(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags) = 0; virtual int relation_add(osmid_t id, struct member *members, int member_count, struct keyval *tags) = 0; diff --git a/tests/middle-tests.cpp b/tests/middle-tests.cpp index d10dcb266..4f9dbad67 100644 --- a/tests/middle-tests.cpp +++ b/tests/middle-tests.cpp @@ -50,22 +50,29 @@ int test_node_set(middle_t *mid) return 0; } -struct way_processor : public middle_t::pending_processor { - way_processor(): pending_ways() {} - virtual ~way_processor() {} - virtual void enqueue(osmid_t id) { +struct test_pending_processor : public middle_t::pending_processor { + test_pending_processor(): pending_ways(), pending_rels() {} + virtual ~test_pending_processor() {} + virtual void enqueue_ways(osmid_t id) { pending_ways.push_back(id); } virtual void process_ways() { pending_ways.clear(); } + virtual void enqueue_relations(osmid_t id) { + pending_rels.push_back(id); + } + virtual void process_relations() { + pending_rels.clear(); + } virtual int thread_count() { return 0; } virtual int size() { - return 0; + return pending_ways.size() + pending_rels.size(); } std::list pending_ways; + std::list pending_rels; }; int test_way_set(middle_t *mid) @@ -125,8 +132,8 @@ int test_way_set(middle_t *mid) } // the way we just inserted should not be pending - way_processor way_cb; - mid->iterate_ways(way_cb); + test_pending_processor tpp; + mid->iterate_ways(tpp); if (mid->pending_count() != 0) { std::cerr << "ERROR: Was expecting no pending ways, but got " << mid->pending_count() << " from middle.\n"; @@ -144,7 +151,7 @@ int test_way_set(middle_t *mid) // pending, so any change must be due to the node changing. status = slim->node_changed(nds[0]); if (status != 0) { std::cerr << "ERROR: Unable to reset node.\n"; return 1; } - slim->iterate_ways(way_cb); + slim->iterate_ways(tpp); if (slim->pending_count() != 1) { std::cerr << "ERROR: Was expecting a single pending way from node update, but got " << slim->pending_count() << " from middle.\n"; diff --git a/tests/test-parse-xml2.cpp b/tests/test-parse-xml2.cpp index 8949f7b66..0387abd89 100644 --- a/tests/test-parse-xml2.cpp +++ b/tests/test-parse-xml2.cpp @@ -40,7 +40,7 @@ struct test_middle_t : public middle_t { int relations_get(osmid_t id, struct member **members, int *member_count, struct keyval *tags) const { return 0; } void iterate_ways(pending_processor& pf) { } - void iterate_relations(cb_func &cb) { } + void iterate_relations(pending_processor& pf) { } virtual size_t pending_count() const { return 0; } @@ -98,7 +98,6 @@ struct test_output_t : public output_t { int start() { return 0; } int connect(int startTransaction) { return 0; } - middle_t::cb_func *relation_callback() { return NULL; } void stop() { } void commit() { } void cleanup(void) { } @@ -107,6 +106,9 @@ struct test_output_t : public output_t { void enqueue_ways(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { } int pending_way(osmid_t id, int exists) { return 0; } + void enqueue_relations(pending_queue_t &job_queue, osmid_t id, size_t output_id, size_t& added) { } + int pending_relation(osmid_t id, int exists) { return 0; } + int node_modify(osmid_t id, double lat, double lon, struct keyval *tags) { return 0; } int way_modify(osmid_t id, osmid_t *nodes, int node_count, struct keyval *tags) { return 0; } int relation_modify(osmid_t id, struct member *members, int member_count, struct keyval *tags) { return 0; }