Skip to content

Commit

Permalink
parallel-checkout: reallocate tasks when a worker completes early
Browse files Browse the repository at this point in the history
Some entries may take longer to be checked out than others. To maximize
parallelism, allow the main process to reallocate entries from slower
workers to workers that have already completed their tasks.

Original-patch-by: Nguyễn Thái Ngọc Duy <pclouds@gmail.com>
Signed-off-by: Nguyễn Thái Ngọc Duy <pclouds@gmail.com>
Signed-off-by: Matheus Tavares <matheus.bernardino@usp.br>
  • Loading branch information
matheustavares committed Nov 16, 2020
1 parent 7deb5b3 commit a483df5
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 30 deletions.
40 changes: 22 additions & 18 deletions builtin/checkout--helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,30 +78,34 @@ static void release_pc_item_data(struct parallel_checkout_item *pc_item)

static void worker_loop(struct checkout *state)
{
struct parallel_checkout_item *items = NULL;
size_t i, nr = 0, alloc = 0;

while (1) {
int len;
char *line = packet_read_line(0, &len);
struct parallel_checkout_item *items = NULL;
size_t i, nr = 0, alloc = 0;

if (!line)
break;
while (1) {
int len;
char *line = packet_read_line(0, &len);

ALLOC_GROW(items, nr + 1, alloc);
packet_to_pc_item(line, len, &items[nr++]);
}
if (!line)
break;

for (i = 0; i < nr; i++) {
struct parallel_checkout_item *pc_item = &items[i];
write_pc_item(pc_item, state);
report_result(pc_item, state);
release_pc_item_data(pc_item);
}
ALLOC_GROW(items, nr + 1, alloc);
packet_to_pc_item(line, len, &items[nr++]);
}

packet_flush(1);
if (!nr)
break;

for (i = 0; i < nr; ++i) {
struct parallel_checkout_item *pc_item = &items[i];
write_pc_item(pc_item, state);
report_result(pc_item, state);
release_pc_item_data(pc_item);
}

free(items);
packet_flush(1);
free(items);
}
}

static const char * const checkout_helper_usage[] = {
Expand Down
107 changes: 95 additions & 12 deletions parallel-checkout.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
struct pc_worker {
struct child_process cp;
size_t next_to_complete, nr_to_complete;
size_t next_to_send, nr_to_send;
};

struct parallel_checkout {
Expand Down Expand Up @@ -474,12 +475,68 @@ static void send_one_item(int fd, struct parallel_checkout_item *pc_item)
free(data);
}

static void send_batch(int fd, size_t start, size_t nr)
#define PC_BATCH_SIZE 20

static int steal_work(struct pc_worker *this, struct pc_worker *workers,
int num_workers)
{
size_t i;
for (i = 0; i < nr; i++)
send_one_item(fd, &parallel_checkout.items[start + i]);
int i, laziest_id;
size_t laziest_nr = 0;

/* Should not steal if we still have items to send to this worker. */
assert(!this->nr_to_send);

for (i = 0; i < num_workers; i++) {
if (workers[i].nr_to_send > laziest_nr) {
laziest_nr = workers[i].nr_to_send;
laziest_id = i;
}
}

if (laziest_nr > 2 * PC_BATCH_SIZE) {
struct pc_worker *laziest = &workers[laziest_id];
size_t steal_size = DIV_ROUND_UP(laziest_nr, 2);
size_t remaining = laziest_nr - steal_size;

laziest->nr_to_send = remaining;
this->next_to_send = laziest->next_to_send + remaining;
this->nr_to_send = steal_size;

return steal_size;
}

return 0;
}

static int send_next_batch(struct pc_worker *this, struct pc_worker *workers,
int num_workers)
{
int fd = this->cp.in;
size_t sent = 0, to_send_now;

/* Worker must have finished the previous batch to receive a new one. */
assert(!this->nr_to_complete);

if (!this->nr_to_send && !steal_work(this, workers, num_workers))
goto out;

to_send_now = PC_BATCH_SIZE < this->nr_to_send ?
PC_BATCH_SIZE : this->nr_to_send;

this->next_to_complete = this->next_to_send;

while (sent < to_send_now) {
send_one_item(fd, &parallel_checkout.items[this->next_to_send]);
this->next_to_send++;
sent++;
}

this->nr_to_send -= sent;
this->nr_to_complete = sent;

out:
packet_flush(fd);
return sent;
}

static struct pc_worker *setup_workers(struct checkout *state, int num_workers)
Expand Down Expand Up @@ -518,9 +575,9 @@ static struct pc_worker *setup_workers(struct checkout *state, int num_workers)
if (i < workers_with_one_extra_item)
batch_size++;

send_batch(worker->cp.in, next_to_assign, batch_size);
worker->next_to_complete = next_to_assign;
worker->nr_to_complete = batch_size;
worker->next_to_send = next_to_assign;
worker->nr_to_send = batch_size;
worker->nr_to_complete = 0;

next_to_assign += batch_size;
}
Expand Down Expand Up @@ -596,12 +653,35 @@ static void parse_and_save_result(const char *line, int len,
advance_progress_meter();
}

static void gather_results_from_workers(struct pc_worker *workers,
int num_workers, struct checkout *state)
/*
* Protocol between main checkout process and the workers is quite
* simple. All messages are packaged in pkt-line format. PKT-FLUSH
* marks the end of input (from both sides):
*
* Main process Worker
* ---- <binary data for item n> ---->
* ---- <binary data for item n+1> -->
* ...
* ------------- <flush> ------------>
* <------- <binary result n> --------
* <------- <binary result n+1> ------
* ...
* <------------ <flush> -------------
* Repeat from begining, or main
* process sends a flush with no
* items to indicate conclusion.
*/
static void main_loop(struct pc_worker *workers, int num_workers,
struct checkout *state)
{
int i, active_workers = num_workers;
struct pollfd *pfds;

for (i = 0; i < num_workers; i++) {
if (!send_next_batch(&workers[i], workers, num_workers))
BUG("intialized worker with no work");
}

CALLOC_ARRAY(pfds, num_workers);
for (i = 0; i < num_workers; i++) {
pfds[i].fd = workers[i].cp.out;
Expand Down Expand Up @@ -629,8 +709,11 @@ static void gather_results_from_workers(struct pc_worker *workers,
const char *line = packet_read_line(pfd->fd, &len);

if (!line) {
pfd->fd = -1;
active_workers--;
if (!send_next_batch(worker, workers,
num_workers)) {
pfd->fd = -1;
active_workers--;
}
} else {
parse_and_save_result(line, len, worker,
state);
Expand Down Expand Up @@ -680,7 +763,7 @@ int run_parallel_checkout(struct checkout *state, int num_workers, int threshold
write_items_sequentially(state);
} else {
struct pc_worker *workers = setup_workers(state, num_workers);
gather_results_from_workers(workers, num_workers, state);
main_loop(workers, num_workers, state);
finish_workers(workers, num_workers);
}

Expand Down

0 comments on commit a483df5

Please sign in to comment.