Skip to content

Commit

Permalink
enhancement:maptool:improve processing speed and fix some memory holes (
Browse files Browse the repository at this point in the history
#901)

* Convert turn relation processing to multi threaded
* Enhance item read function to block read the items for faster overall reading speed.
* Fix some memory holes found by valgrind. There are still many left. Specially in coastline and Country border code.
  • Loading branch information
metalstrolch committed Oct 18, 2019
1 parent 6c16e5a commit f1c0429
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 29 deletions.
44 changes: 38 additions & 6 deletions navit/maptool/itembin_buffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,51 @@
#include "maptool.h"
#include "debug.h"


/** Buffer for temporarily storing an item. */
static char misc_item_buffer[20000000];
/** An item_bin for temporary use. */
struct item_bin *tmp_item_bin=(struct item_bin *)(void *)misc_item_buffer;
/** A node_item for temporary use. */
static struct node_item *tmp_node_item=(struct node_item *)(void *)misc_item_buffer;

struct node_item *
read_node_item(FILE *in) {
if (fread(tmp_node_item, sizeof(struct node_item), 1, in) != 1)
return NULL;
return tmp_node_item;
#define ITEM_COUNT 1*1024*1024
/* we read in bigger chunks as file IO in small chunks is slow as hell */
static FILE * last_in = NULL;
static long last_pos = 0;
static int in_count;
static int out_count;
static struct node_item item_buffer[sizeof(struct node_item) * ITEM_COUNT];

struct node_item * retval = NULL;

if((last_in != in) || (last_pos != ftell(in))) {
if((out_count - in_count) > 0)
fprintf(stderr, "change file. Still %d items\n", out_count - in_count);
/* got new file. flush buffer. */
in_count=0;
out_count=0;
last_in=in;
}

/* check if we need to really read from file */
if ((in_count - out_count) > 0) {
/* no, return item from buffer */
retval=&(item_buffer[out_count]);
} else {
out_count=0;
in_count=fread(item_buffer, sizeof(struct node_item), ITEM_COUNT, in);
//fprintf(stderr, "read %d items\n", in_count);
if(in_count < 1) {
/* buffer still empty after read */
return NULL;
}
/* yes, try to read full buffer at once */
retval=&item_buffer[0];
}
out_count ++;
last_pos=ftell(in);

return retval;
}

struct item_bin *
Expand Down
4 changes: 4 additions & 0 deletions navit/maptool/maptool.c
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,8 @@ static void maptool_assemble_map(struct maptool_params *p, char *suffix, char **
map_information_attrs[1].u.str=p->url;
}
index_init(zip_info, 1);
g_free(zipdir);
g_free(zipindex);
}
if (!g_strcmp0(suffix,ch_suffix)) { /* Makes compiler happy due to bug 35903 in gcc */
ch_assemble_map(suffix0,suffix,zip_info);
Expand Down Expand Up @@ -1125,5 +1127,7 @@ int main(int argc, char **argv) {
}
phase+=2;
start_phase(&p,"done");
if(p.timestamp != NULL)
g_free(p.timestamp);
return 0;
}
1 change: 1 addition & 0 deletions navit/maptool/maptool.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ void relations_add_relation_member_entry(struct relations *rel, struct relations
void *member_priv, enum relation_member_type type, osmid id);
void relations_add_relation_default_entry(struct relations *rel, struct relations_func *func);
void relations_process(struct relations *rel, FILE *nodes, FILE *ways);
void relations_process_multi(struct relations **rel, int count, FILE *nodes, FILE *ways);
void relations_destroy(struct relations *rel);


Expand Down
203 changes: 180 additions & 23 deletions navit/maptool/osm.c
Original file line number Diff line number Diff line change
Expand Up @@ -2826,6 +2826,7 @@ static int process_multipolygons_find_loops(osmid relid, int in_count, struct it
sequence_count = process_multipolygons_find_loop(in_count, parts, sequence, used);
if(sequence_count < 0) {
done = 1;
g_free(sequence);
} else if(sequence_count == 0) {
osm_warning("relation",relid,0,"multipolygon: skipping non loop sequence\n");
/* skip empty sequence */
Expand Down Expand Up @@ -3260,11 +3261,15 @@ void process_multipolygons(FILE *in, FILE *coords, FILE *ways, FILE *ways_index,
if(ways)
fseek(ways, 0,SEEK_SET);
fprintf(stderr,"process_multipolygons:process (thread %d)\n", i);
/* we could use relations_process_multi here as well, but this would
* use way more memory. */
relations_process(relations[i], coords, ways);
fprintf(stderr,"process_multipolygons:finish (thread %d)\n", i);
process_multipolygons_finish(multipolygons[i], out);
relations_destroy(relations[i]);
}
if(multipolygons != NULL)
g_free(multipolygons);
g_free(relations);
sig_alrm(0);
sig_alrm_end();
Expand Down Expand Up @@ -3386,6 +3391,8 @@ static void process_turn_restrictions_finish(GList *tr, FILE *out) {

}
}
/* just for fun...*/
processed_relations ++;
g_free(t->c[0]);
g_free(t->c[1]);
g_free(t->c[2]);
Expand All @@ -3395,60 +3402,64 @@ static void process_turn_restrictions_finish(GList *tr, FILE *out) {
g_list_free(tr);
}

static GList *process_turn_restrictions_setup(FILE *in, struct relations *relations) {
/**
* @brief prepare one multipolygon relation for relattion processing
*
* @param ib the relation
* @param relations the relation processing structure
* @param relations_func function to use for the members
* @param turn_restrictions write the resulting turn_restriction to the list
*/
static void process_turn_restrictions_setup_one(struct item_bin * ib, struct relations * relations,
struct relations_func * relations_func, GList ** turn_restrictions) {
struct relation_member fromm,tom,viam,tmpm;
long long relid;
struct item_bin *ib;
struct relations_func *relations_func;
int min_count;
GList *turn_restrictions=NULL;

fseek(in, 0, SEEK_SET);
relations_func=relations_func_new(process_turn_restrictions_member, NULL);
while ((ib=read_item(in))) {
if(ib != NULL) {
struct turn_restriction *turn_restriction;
relid=item_bin_get_relationid(ib);
min_count=0;
if (!search_relation_member(ib, "from",&fromm,&min_count)) {
osm_warning("relation",relid,0,"turn restriction: from member missing\n");
continue;
return;
}
if (search_relation_member(ib, "from",&tmpm,&min_count)) {
osm_warning("relation",relid,0,"turn restriction: multiple from members\n");
continue;
return;
}
min_count=0;
if (!search_relation_member(ib, "to",&tom,&min_count)) {
osm_warning("relation",relid,0,"turn restriction: to member missing\n");
continue;
return;
}
if (search_relation_member(ib, "to",&tmpm,&min_count)) {
osm_warning("relation",relid,0,"turn restriction: multiple to members\n");
continue;
return;
}
min_count=0;
if (!search_relation_member(ib, "via",&viam,&min_count)) {
osm_warning("relation",relid,0,"turn restriction: via member missing\n");
continue;
return;
}
if (search_relation_member(ib, "via",&tmpm,&min_count)) {
osm_warning("relation",relid,0,"turn restriction: multiple via member\n");
continue;
return;
}
if (fromm.type != rel_member_way) {
osm_warning("relation",relid,0,"turn restriction: wrong type for from member ");
osm_warning(osm_types[fromm.type],fromm.id,1,"\n");
continue;
return;
}
if (tom.type != rel_member_way) {
osm_warning("relation",relid,0,"turn restriction: wrong type for to member ");
osm_warning(osm_types[tom.type],tom.id,1,"\n");
continue;
return;
}
if (viam.type != rel_member_node && viam.type != rel_member_way) {
osm_warning("relation",relid,0,"turn restriction: wrong type for via member ");
osm_warning(osm_types[viam.type],viam.id,1,"\n");
continue;
return;
}
turn_restriction=g_new0(struct turn_restriction, 1);
turn_restriction->relid=relid;
Expand All @@ -3458,19 +3469,165 @@ static GList *process_turn_restrictions_setup(FILE *in, struct relations *relati
relations_add_relation_member_entry(relations, relations_func, turn_restriction, (gpointer) 0, fromm.type, fromm.id);
relations_add_relation_member_entry(relations, relations_func, turn_restriction, (gpointer) 1, viam.type, viam.id);
relations_add_relation_member_entry(relations, relations_func, turn_restriction, (gpointer) 2, tom.type, tom.id);
turn_restrictions=g_list_append(turn_restrictions, turn_restriction);
*turn_restrictions=g_list_append(*turn_restrictions, turn_restriction);
}
}

/**
* @brief worker thread private storage
*/
struct process_turn_restrictions_setup_thread {
int number;
GAsyncQueue * queue;
struct relations * relations;
struct relations_func * relations_func;
GList* turn_restrictions;
GThread * thread;
};

/**
* @brief turn restrictions setup worker thread.
*
* This thread processes any item passed to it via async queue into it's local relations
* function.
* @param data this threads local storage
*/
static gpointer process_turn_restrictions_setup_worker (gpointer data) {
struct item_bin * ib;
//long long relid;
struct process_turn_restrictions_setup_thread * me = (struct process_turn_restrictions_setup_thread*) data;
fprintf(stderr,"worker %d up\n", me->number);
while((ib=g_async_queue_pop (me->queue)) != &killer) {
processed_relations ++;
//relid=item_bin_get_relationid(ib);
//fprintf(stderr,"worker %d processing %lld\n", me->number, relid);
process_turn_restrictions_setup_one(ib, me->relations, me->relations_func, &(me->turn_restrictions));
/* done with that. Free the item_bin */
g_free(ib);
}
fprintf(stderr,"worker %d exit\n", me->number);
g_thread_exit(NULL);
return NULL;
}

/**
* @brief prepare turn restriction way matching
*
* This function reads all turn restriction relations and prepares relations structures
* for later way matching. Since this scales quite ugly, (O^3) i think, we use multiple threads
* creating their own hash each. This way none of the hashes get's that big, and we can utilize
* more cpu power.
*
* @param in file containing the relations
* @param thread_count number of threads to use
* @param relations array of preallocated relations structures. One per thread.
*
* @returns array of GLists. One per thread containing the resulting structures.
*/
static GList ** process_turn_restrictions_setup(FILE *in, int thread_count, struct relations **relations) {
struct process_turn_restrictions_setup_thread *sthread;

struct item_bin *ib;
struct relations_func *relations_func;
int i;
GList **turn_restrictions=NULL;
/* allocate and reference async queue */
GAsyncQueue * ib_queue=g_async_queue_new ();
g_async_queue_ref(ib_queue);
/* allocate per thread storage */
sthread=g_malloc0(sizeof(struct process_turn_restrictions_setup_thread) * thread_count);

fseek(in, 0, SEEK_SET);
relations_func=relations_func_new(process_turn_restrictions_member, NULL);

/* start the threads */
for(i=0; i < thread_count; i ++) {
sthread[i].number = i;
sthread[i].queue = ib_queue;
sthread[i].relations_func = relations_func;
sthread[i].relations = relations[i];
sthread[i].turn_restrictions = NULL;
sthread[i].thread = g_thread_new ("process_turn_restrictions_setup_worker", process_turn_restrictions_setup_worker,
&(sthread[i]));
}

while ((ib=read_item(in))) {
/* get a duplicate of the returned item, as the one returned shares buffer */
struct item_bin * dup = item_bin_dup(ib);
//long long relid;
//relid=item_bin_get_relationid(dup);
//fprintf(stderr,"Pushing %lld\n", relid);
/* the dup's will be freed by the thread processing them*/
g_async_queue_push(ib_queue,dup);
/* limit queue size. This is ugly, but since GAsyncQueue doesn't support
* push to block when the queue reached a decent size, I help myself
* with this ugly hack */
while(g_async_queue_length(ib_queue) > 1000)
usleep(200);
}

/* stop iand join all remaining threads */
for(i = 0; i < thread_count; i ++)
g_async_queue_push(ib_queue,&killer);
for(i=0; i < thread_count; i ++)
g_thread_join(sthread[i].thread);

/* rescue the resulting glist */
turn_restrictions = g_malloc0(sizeof(GList *) * thread_count);
for(i =0; i < thread_count; i ++)
turn_restrictions[i]=sthread[i].turn_restrictions;

/* free the thread storage */
g_free(sthread);

/* release the queue */
g_async_queue_unref(ib_queue);

/* return the list of turn_restrictions */
return turn_restrictions;
}

void process_turn_restrictions(FILE *in, FILE *coords, FILE *ways, FILE *ways_index, FILE *out) {
struct relations *relations=relations_new();
GList *turn_restrictions;
/* thread count is from maptool.c as commandline parameter */
int i;
struct relations **relations;
GList **turn_restrictions = NULL;
sig_alrm(0);

relations = g_malloc0(sizeof(struct relations *) * thread_count);
for(i=0; i < thread_count; i ++)
relations[i] = relations_new();
fseek(in, 0, SEEK_SET);
turn_restrictions=process_turn_restrictions_setup(in, relations);
relations_process(relations, coords, ways);
process_turn_restrictions_finish(turn_restrictions, out);
relations_destroy(relations);
fprintf(stderr,"process_turn_restrictions:setup (threads %d)\n", thread_count);
turn_restrictions=process_turn_restrictions_setup(in,thread_count,relations);
/* Here we get an array of resulting relations structures and resultin
* GLists.
* Of course we need to iterate the ways multiple times, but that's fast
* compared to hashing the relations structures
* This even saves a lot of main memory, as we can process every result from
* every thread at once completely. Since we know it's self containing
*/
sig_alrm(0);
processed_relations=0;
processed_ways=0;
sig_alrm(0);
if(coords)
fseek(coords, 0,SEEK_SET);
if(ways)
fseek(ways, 0,SEEK_SET);
fprintf(stderr,"process_multipolygons:process (thread %d)\n", i);
relations_process_multi(relations, thread_count, coords, ways);
for( i=0; i < thread_count; i ++) {

fprintf(stderr,"process_turn_restrictions:finish (thread %d)\n", i);
process_turn_restrictions_finish(turn_restrictions[i], out);
relations_destroy(relations[i]);
}
if(turn_restrictions != NULL)
g_free(turn_restrictions);
g_free(relations);
sig_alrm(0);
sig_alrm_end();
}
#if 0
void process_turn_restrictions_old(FILE *in, FILE *coords, FILE *ways, FILE *ways_index, FILE *out) {
Expand Down

0 comments on commit f1c0429

Please sign in to comment.