Skip to content

Commit

Permalink
convert: add "status=delayed" to filter process protocol
Browse files Browse the repository at this point in the history
Some `clean` / `smudge` filters might require a significant amount of
time to process a single blob (e.g. the Git LFS smudge filter might
perform network requests). During this process the Git checkout
operation is blocked and Git needs to wait until the filter is done to
continue with the checkout.

Teach the filter process protocol (introduced in edcc858) to accept the
status "delayed" as response to a filter request. Upon this response Git
continues with the checkout operation. After the checkout operation Git
calls "finish_delayed_checkout" which queries the filter for remaining
blobs. If the filter is still working on the completion, then the filter
is expected to block. If the filter has completed all remaining blobs
then an empty response is expected.

Git has a multiple code paths that checkout a blob. Support delayed
checkouts only in `clone` (in unpack-trees.c) and `checkout` operations.

Signed-off-by: Lars Schneider <larsxschneider@gmail.com>
  • Loading branch information
larsxschneider committed Jun 1, 2017
1 parent e32ab92 commit 23001c0
Show file tree
Hide file tree
Showing 9 changed files with 498 additions and 91 deletions.
65 changes: 61 additions & 4 deletions Documentation/gitattributes.txt
Expand Up @@ -425,8 +425,8 @@ packet: git< capability=clean
packet: git< capability=smudge
packet: git< 0000
------------------------
Supported filter capabilities in version 2 are "clean" and
"smudge".
Supported filter capabilities in version 2 are "clean", "smudge",
and "delay".

Afterwards Git sends a list of "key=value" pairs terminated with
a flush packet. The list will contain at least the filter command
Expand Down Expand Up @@ -512,12 +512,69 @@ the protocol then Git will stop the filter process and restart it
with the next file that needs to be processed. Depending on the
`filter.<driver>.required` flag Git will interpret that as error.

After the filter has processed a blob it is expected to wait for
the next "key=value" list containing a command. Git will close
After the filter has processed a command it is expected to wait for
a "key=value" list containing the next command. Git will close
the command pipe on exit. The filter is expected to detect EOF
and exit gracefully on its own. Git will wait until the filter
process has stopped.

Delay
^^^^^

If the filter supports the "delay" capability, then Git can send the
flag "can-delay" after the filter command and pathname. This flag
denotes that the filter can delay filtering the current blob (e.g. to
compensate network latencies) by responding with no content but with
the status "delayed" and a flush packet.
------------------------
packet: git> command=smudge
packet: git> pathname=path/testfile.dat
packet: git> can-delay=1
packet: git> 0000
packet: git> CONTENT
packet: git> 0000
packet: git< status=delayed
packet: git< 0000
------------------------

If the filter supports the "delay" capability then it must support the
"list_available_blobs" command. If Git sends this command, then the
filter is expected to return a list of pathnames of blobs that are
available. The list must be terminated with a flush packet followed
by a "success" status that is also terminated with a flush packet. If
no blobs for the delayed paths are available, yet, then the filter is
expected to block the response until at least one blob becomes
available. The filter can tell Git that it has no more delayed blobs
by sending an empty list.
------------------------
packet: git> command=list_available_blobs
packet: git> 0000
packet: git< pathname=path/testfile.dat
packet: git< pathname=path/otherfile.dat
packet: git< 0000
packet: git< status=success
packet: git< 0000
------------------------

After Git received the pathnames, it will request the corresponding
blobs again. These requests contain a pathname and an empty content
section. The filter is expected to respond with the smudged content
in the usual way as explained above.
------------------------
packet: git> command=smudge
packet: git> pathname=path/testfile.dat
packet: git> 0000
packet: git> 0000 # empty content!
packet: git< status=success
packet: git< 0000
packet: git< SMUDGED_CONTENT
packet: git< 0000
packet: git< 0000 # empty list, keep "status=success" unchanged!
------------------------

Example
^^^^^^^

A long running filter demo implementation can be found in
`contrib/long-running-filter/example.pl` located in the Git
core repository. If you develop your own long running filter
Expand Down
3 changes: 3 additions & 0 deletions builtin/checkout.c
Expand Up @@ -376,6 +376,8 @@ static int checkout_paths(const struct checkout_opts *opts,
state.force = 1;
state.refresh_cache = 1;
state.istate = &the_index;

enable_delayed_checkout(&state);
for (pos = 0; pos < active_nr; pos++) {
struct cache_entry *ce = active_cache[pos];
if (ce->ce_flags & CE_MATCHED) {
Expand All @@ -390,6 +392,7 @@ static int checkout_paths(const struct checkout_opts *opts,
pos = skip_same_name(ce, pos) - 1;
}
}
errs |= finish_delayed_checkout(&state);

if (write_locked_index(&the_index, lock_file, COMMIT_LOCK))
die(_("unable to write new index file"));
Expand Down
6 changes: 5 additions & 1 deletion cache.h
Expand Up @@ -1543,16 +1543,20 @@ extern int ident_cmp(const struct ident_split *, const struct ident_split *);
struct checkout {
struct index_state *istate;
const char *base_dir;
struct delayed_checkout *delayed_checkout;
int base_dir_len;
unsigned force:1,
quiet:1,
not_new:1,
refresh_cache:1;
};
#define CHECKOUT_INIT { NULL, "" }
#define CHECKOUT_INIT { NULL, "", NULL }


#define TEMPORARY_FILENAME_LENGTH 25
extern int checkout_entry(struct cache_entry *ce, const struct checkout *state, char *topath);
extern void enable_delayed_checkout(struct checkout *state);
extern int finish_delayed_checkout(struct checkout *state);

struct cache_def {
struct strbuf path;
Expand Down
119 changes: 102 additions & 17 deletions convert.c
Expand Up @@ -496,6 +496,7 @@ static int apply_single_file_filter(const char *path, const char *src, size_t le

#define CAP_CLEAN (1u<<0)
#define CAP_SMUDGE (1u<<1)
#define CAP_DELAY (1u<<2)

struct cmd2process {
struct subprocess_entry subprocess; /* must be the first member! */
Expand Down Expand Up @@ -533,7 +534,8 @@ static int start_multi_file_filter_fn(struct subprocess_entry *subprocess)
if (err)
goto done;

err = packet_writel(process->in, "capability=clean", "capability=smudge", NULL);
err = packet_writel(process->in,
"capability=clean", "capability=smudge", "capability=delay", NULL);

for (;;) {
cap_buf = packet_read_line(process->out, NULL);
Expand All @@ -549,6 +551,8 @@ static int start_multi_file_filter_fn(struct subprocess_entry *subprocess)
entry->supported_capabilities |= CAP_CLEAN;
} else if (!strcmp(cap_name, "smudge")) {
entry->supported_capabilities |= CAP_SMUDGE;
} else if (!strcmp(cap_name, "delay")) {
entry->supported_capabilities |= CAP_DELAY;
} else {
warning(
"external filter '%s' requested unsupported filter capability '%s'",
Expand Down Expand Up @@ -590,9 +594,11 @@ static void handle_filter_error(const struct strbuf *filter_status,

static int apply_multi_file_filter(const char *path, const char *src, size_t len,
int fd, struct strbuf *dst, const char *cmd,
const unsigned int wanted_capability)
const unsigned int wanted_capability,
struct delayed_checkout *dco)
{
int err;
int can_delay = 0;
struct cmd2process *entry;
struct child_process *process;
struct strbuf nbuf = STRBUF_INIT;
Expand Down Expand Up @@ -647,6 +653,14 @@ static int apply_multi_file_filter(const char *path, const char *src, size_t len
if (err)
goto done;

if (CAP_DELAY & entry->supported_capabilities &&
dco && dco->state == CE_CAN_DELAY) {
can_delay = 1;
err = packet_write_fmt_gently(process->in, "can-delay=1\n");
if (err)
goto done;
}

err = packet_flush_gently(process->in);
if (err)
goto done;
Expand All @@ -662,14 +676,78 @@ static int apply_multi_file_filter(const char *path, const char *src, size_t len
if (err)
goto done;

err = strcmp(filter_status.buf, "success");
if (can_delay && !strcmp(filter_status.buf, "delayed")) {
dco->state = CE_DELAYED;
string_list_insert(&dco->filters, cmd);
string_list_insert(&dco->paths, path);
} else {
/* The filter got the blob and wants to send us a response. */
err = strcmp(filter_status.buf, "success");
if (err)
goto done;

err = read_packetized_to_strbuf(process->out, &nbuf) < 0;
if (err)
goto done;

err = subprocess_read_status(process->out, &filter_status);
if (err)
goto done;

err = strcmp(filter_status.buf, "success");
}

done:
sigchain_pop(SIGPIPE);

if (err)
handle_filter_error(&filter_status, entry, wanted_capability);
else
strbuf_swap(dst, &nbuf);
strbuf_release(&nbuf);
return !err;
}


int async_query_available_blobs(const char *cmd, struct string_list *delayed_paths)
{
int err;
char *line;
struct cmd2process *entry;
struct child_process *process;
struct strbuf filter_status = STRBUF_INIT;

assert(subprocess_map_initialized);
entry = (struct cmd2process *)subprocess_find_entry(&subprocess_map, cmd);
if (!entry) {
error("external filter '%s' is not available anymore although "
"not all paths have been filtered", cmd);
return 0;
}
process = &entry->subprocess.process;
sigchain_push(SIGPIPE, SIG_IGN);

err = packet_write_fmt_gently(
process->in, "command=list_available_blobs\n");
if (err)
goto done;

err = read_packetized_to_strbuf(process->out, &nbuf) < 0;
err = packet_flush_gently(process->in);
if (err)
goto done;

for (;;) {
const char* pre = "pathname=";
const int pre_len = strlen(pre);
line = packet_read_line(process->out, NULL);
if (!line)
break;
err = strlen(line) <= pre_len || strncmp(line, pre, pre_len);
if (err)
goto done;
string_list_insert(delayed_paths, xstrdup(line+pre_len));
}

err = subprocess_read_status(process->out, &filter_status);
if (err)
goto done;
Expand All @@ -680,10 +758,7 @@ static int apply_multi_file_filter(const char *path, const char *src, size_t len
sigchain_pop(SIGPIPE);

if (err)
handle_filter_error(&filter_status, entry, wanted_capability);
else
strbuf_swap(dst, &nbuf);
strbuf_release(&nbuf);
handle_filter_error(&filter_status, entry, 0);
return !err;
}

Expand All @@ -698,7 +773,8 @@ static struct convert_driver {

static int apply_filter(const char *path, const char *src, size_t len,
int fd, struct strbuf *dst, struct convert_driver *drv,
const unsigned int wanted_capability)
const unsigned int wanted_capability,
struct delayed_checkout *dco)
{
const char *cmd = NULL;

Expand All @@ -716,7 +792,8 @@ static int apply_filter(const char *path, const char *src, size_t len,
if (cmd && *cmd)
return apply_single_file_filter(path, src, len, fd, dst, cmd);
else if (drv->process && *drv->process)
return apply_multi_file_filter(path, src, len, fd, dst, drv->process, wanted_capability);
return apply_multi_file_filter(path, src, len, fd, dst,
drv->process, wanted_capability, dco);

return 0;
}
Expand Down Expand Up @@ -1057,7 +1134,7 @@ int would_convert_to_git_filter_fd(const char *path)
if (!ca.drv->required)
return 0;

return apply_filter(path, NULL, 0, -1, NULL, ca.drv, CAP_CLEAN);
return apply_filter(path, NULL, 0, -1, NULL, ca.drv, CAP_CLEAN, NULL);
}

const char *get_convert_attr_ascii(const char *path)
Expand Down Expand Up @@ -1094,7 +1171,7 @@ int convert_to_git(const char *path, const char *src, size_t len,

convert_attrs(&ca, path);

ret |= apply_filter(path, src, len, -1, dst, ca.drv, CAP_CLEAN);
ret |= apply_filter(path, src, len, -1, dst, ca.drv, CAP_CLEAN, NULL);
if (!ret && ca.drv && ca.drv->required)
die("%s: clean filter '%s' failed", path, ca.drv->name);

Expand All @@ -1119,7 +1196,7 @@ void convert_to_git_filter_fd(const char *path, int fd, struct strbuf *dst,
assert(ca.drv);
assert(ca.drv->clean || ca.drv->process);

if (!apply_filter(path, NULL, 0, fd, dst, ca.drv, CAP_CLEAN))
if (!apply_filter(path, NULL, 0, fd, dst, ca.drv, CAP_CLEAN, NULL))
die("%s: clean filter '%s' failed", path, ca.drv->name);

crlf_to_git(path, dst->buf, dst->len, dst, ca.crlf_action, checksafe);
Expand All @@ -1128,7 +1205,7 @@ void convert_to_git_filter_fd(const char *path, int fd, struct strbuf *dst,

static int convert_to_working_tree_internal(const char *path, const char *src,
size_t len, struct strbuf *dst,
int normalizing)
int normalizing, struct delayed_checkout *dco)
{
int ret = 0, ret_filter = 0;
struct conv_attrs ca;
Expand All @@ -1153,21 +1230,29 @@ static int convert_to_working_tree_internal(const char *path, const char *src,
}
}

ret_filter = apply_filter(path, src, len, -1, dst, ca.drv, CAP_SMUDGE);
ret_filter = apply_filter(
path, src, len, -1, dst, ca.drv, CAP_SMUDGE, dco);
if (!ret_filter && ca.drv && ca.drv->required)
die("%s: smudge filter %s failed", path, ca.drv->name);

return ret | ret_filter;
}

int async_convert_to_working_tree(const char *path, const char *src,
size_t len, struct strbuf *dst,
void *dco)
{
return convert_to_working_tree_internal(path, src, len, dst, 0, dco);
}

int convert_to_working_tree(const char *path, const char *src, size_t len, struct strbuf *dst)
{
return convert_to_working_tree_internal(path, src, len, dst, 0);
return convert_to_working_tree_internal(path, src, len, dst, 0, NULL);
}

int renormalize_buffer(const char *path, const char *src, size_t len, struct strbuf *dst)
{
int ret = convert_to_working_tree_internal(path, src, len, dst, 1);
int ret = convert_to_working_tree_internal(path, src, len, dst, 1, NULL);
if (ret) {
src = dst->buf;
len = dst->len;
Expand Down

0 comments on commit 23001c0

Please sign in to comment.