Skip to content

Commit

Permalink
Git commit - rehash. Incorrect earlier commit.
Browse files Browse the repository at this point in the history
Implement Separate metadata stream.
Fix blatant wrong check in Bzip2 compressor.
Implement E8E9 filter fallback in Dispack.
Improve dict buffer size checks.
Reduce thread count to control memory usage in archive mode.
  • Loading branch information
moinakg committed Oct 24, 2014
1 parent 1a00613 commit e7081eb
Show file tree
Hide file tree
Showing 18 changed files with 963 additions and 205 deletions.
4 changes: 2 additions & 2 deletions Makefile.in
Expand Up @@ -32,10 +32,10 @@ LIBVER=1
MAINSRCS = utils/utils.c allocator.c lzma_compress.c ppmd_compress.c \
adaptive_compress.c lzfx_compress.c lz4_compress.c none_compress.c \
utils/xxhash_base.c utils/heap.c utils/cpuid.c filters/analyzer/analyzer.c \
pcompress.c
meta_stream.c pcompress.c
MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h utils/heap.h \
utils/cpuid.h utils/xxhash.h archive/pc_archive.h filters/dispack/dis.hpp \
filters/analyzer/analyzer.h
meta_stream.h filters/analyzer/analyzer.h
MAINOBJS = $(MAINSRCS:.c=.o)

PROGSRCS = main.c
Expand Down
4 changes: 4 additions & 0 deletions README.md
Expand Up @@ -123,6 +123,10 @@ Standard Usage

The fastest checksum is the BLAKE2 family.

-T
Disable Metadata Streams. Pathname metadata is normally packed into separate
chunks distinct from file data. With this option this behavior is disabled.

<archive filename>
Pathname of the resulting archive. A '.pz' extension is automatically added
if not already present. This can also be specified as '-' in order to send
Expand Down
6 changes: 6 additions & 0 deletions archive/libarchive/libarchive/archive.h
Expand Up @@ -518,6 +518,12 @@ __LA_DECL int archive_read_set_options(struct archive *_a,
*/
__LA_DECL int archive_request_is_metadata(struct archive *a);

/*
* Indicate whether separate metadata handling is being done by the
* callbacks. This triggers special behavior during archive read.
*/
__LA_DECL int archive_set_metadata_streaming(struct archive *a, int flag);

/*-
* Convenience function to recreate the current entry (whose header
* has just been read) on disk.
Expand Down
9 changes: 9 additions & 0 deletions archive/libarchive/libarchive/archive_private.h
Expand Up @@ -127,6 +127,15 @@ struct archive {
* separately store/handle metadata and data.
*/
int cb_is_metadata;

/*
* Set the metadata handling flag. This indicates to libarchive
* that callback routines are processing metadata as a separate
* stream. This means that cb_is_metadata flag is set during
* read/write. In addition, archive reads are handled differently
* using a shadow filter structure with separate copy buffer.
*/
int is_metadata_streaming;
};

/* Check magic value and state; return(ARCHIVE_FATAL) if it isn't valid. */
Expand Down
57 changes: 55 additions & 2 deletions archive/libarchive/libarchive/archive_read.c
Expand Up @@ -453,14 +453,16 @@ int
archive_read_open1(struct archive *_a)
{
struct archive_read *a = (struct archive_read *)_a;
struct archive_read_filter *filter, *tmp;
struct archive_read_filter *filter, *tmp = NULL; /* silence compiler */
int slot, e;
unsigned int i;

archive_check_magic(_a, ARCHIVE_READ_MAGIC, ARCHIVE_STATE_NEW,
"archive_read_open");
archive_clear_error(&a->archive);

if (_a->is_metadata_streaming)
_a->cb_is_metadata = 1;
if (a->client.reader == NULL) {
archive_set_error(&a->archive, EINVAL,
"No reader function provided to archive_read_open");
Expand All @@ -485,6 +487,13 @@ archive_read_open1(struct archive *_a)
filter = calloc(1, sizeof(*filter));
if (filter == NULL)
return (ARCHIVE_FATAL);
if (_a->is_metadata_streaming) {
tmp = calloc(1, sizeof(*filter));
if (tmp == NULL) {
free(filter);
return (ARCHIVE_FATAL);
}
}
filter->bidder = NULL;
filter->upstream = NULL;
filter->archive = a;
Expand All @@ -497,6 +506,10 @@ archive_read_open1(struct archive *_a)
filter->sswitch = client_switch_proxy;
filter->name = "none";
filter->code = ARCHIVE_FILTER_NONE;
if (_a->is_metadata_streaming) {
memcpy(tmp, filter, sizeof (*filter));
filter->shadow = tmp;
}

a->client.dataset[0].begin_position = 0;
if (!a->filter || !a->bypass_filter_bidding)
Expand Down Expand Up @@ -533,6 +546,8 @@ archive_read_open1(struct archive *_a)

/* Ensure libarchive starts from the first node in a multivolume set */
client_switch_proxy(a->filter, 0);
if (_a->is_metadata_streaming)
_a->cb_is_metadata = 0;
return (e);
}

Expand All @@ -546,7 +561,7 @@ choose_filters(struct archive_read *a)
{
int number_bidders, i, bid, best_bid;
struct archive_read_filter_bidder *bidder, *best_bidder;
struct archive_read_filter *filter;
struct archive_read_filter *filter, *tmp;
ssize_t avail;
int r;

Expand Down Expand Up @@ -585,9 +600,20 @@ choose_filters(struct archive_read *a)
= (struct archive_read_filter *)calloc(1, sizeof(*filter));
if (filter == NULL)
return (ARCHIVE_FATAL);
if (a->archive.is_metadata_streaming) {
tmp = calloc(1, sizeof(*filter));
if (tmp == NULL) {
free(filter);
return (ARCHIVE_FATAL);
}
}
filter->bidder = best_bidder;
filter->archive = a;
filter->upstream = a->filter;
if (a->archive.is_metadata_streaming) {
memcpy(tmp, filter, sizeof (*filter));
filter->shadow = tmp;
}
a->filter = filter;
r = (best_bidder->init)(a->filter);
if (r != ARCHIVE_OK) {
Expand Down Expand Up @@ -933,6 +959,8 @@ __archive_read_free_filters(struct archive_read *a)
{
while (a->filter != NULL) {
struct archive_read_filter *t = a->filter->upstream;
if (a->archive.is_metadata_streaming)
free(a->filter->shadow);
free(a->filter);
a->filter = t;
}
Expand Down Expand Up @@ -1221,6 +1249,15 @@ __archive_read_filter_ahead(struct archive_read_filter *filter,
return (NULL);
}

/*
* Switch to shadow filter for metadata reads when using separate
* metadata stream.
*/
if (filter->archive->archive.is_metadata_streaming &&
filter->archive->archive.cb_is_metadata) {
filter = filter->shadow;
}

/*
* Keep pulling more data until we can satisfy the request.
*/
Expand Down Expand Up @@ -1397,6 +1434,15 @@ __archive_read_filter_consume(struct archive_read_filter * filter,
if (request == 0)
return 0;

/*
* Switch to shadow filter for metadata reads when using separate
* metadata stream.
*/
if (filter->archive->archive.is_metadata_streaming &&
filter->archive->archive.cb_is_metadata) {
filter = filter->shadow;
}

skipped = advance_file_pointer(filter, request);
if (skipped == request)
return (skipped);
Expand Down Expand Up @@ -1522,6 +1568,13 @@ __archive_read_filter_seek(struct archive_read_filter *filter, int64_t offset,
if (filter->seek == NULL)
return (ARCHIVE_FAILED);

/*
* No seeking when metadata streams are enabled.
*/
if (filter->archive->archive.is_metadata_streaming) {
return (ARCHIVE_FAILED);
}

client = &(filter->archive->client);
switch (whence) {
case SEEK_CUR:
Expand Down
14 changes: 13 additions & 1 deletion archive/libarchive/libarchive/archive_read_append_filter.c
Expand Up @@ -40,7 +40,7 @@ archive_read_append_filter(struct archive *_a, int code)
int r1, r2, number_bidders, i;
char str[20];
struct archive_read_filter_bidder *bidder;
struct archive_read_filter *filter;
struct archive_read_filter *filter, *tmp;
struct archive_read *a = (struct archive_read *)_a;

r1 = r2 = (ARCHIVE_OK);
Expand Down Expand Up @@ -126,6 +126,18 @@ archive_read_append_filter(struct archive *_a, int code)
filter->bidder = bidder;
filter->archive = a;
filter->upstream = a->filter;
if (_a->is_metadata_streaming)
{
tmp = calloc(1, sizeof(*filter));
if (tmp == NULL)
{
free(filter);
archive_set_error(&a->archive, ENOMEM, "Out of memory");
return (ARCHIVE_FATAL);
}
memcpy(tmp, filter, sizeof (*filter));
filter->shadow = tmp;
}
a->filter = filter;
r2 = (bidder->init)(a->filter);
if (r2 != ARCHIVE_OK) {
Expand Down
6 changes: 6 additions & 0 deletions archive/libarchive/libarchive/archive_read_private.h
Expand Up @@ -114,6 +114,12 @@ struct archive_read_filter {
char end_of_file;
char closed;
char fatal;

/*
* Shadow structure for keeping track of metadata requests if
* metadata streaming is enabled.
*/
struct archive_read_filter *shadow;
};

/*
Expand Down
25 changes: 19 additions & 6 deletions archive/libarchive/libarchive/archive_virtual.c
Expand Up @@ -113,9 +113,11 @@ archive_write_header(struct archive *a, struct archive_entry *entry)
int rv;

++a->file_count;
a->cb_is_metadata = 1;
if (a->is_metadata_streaming)
a->cb_is_metadata = 1;
rv = (a->vtable->archive_write_header)(a, entry);
a->cb_is_metadata = 0;
if (a->is_metadata_streaming)
a->cb_is_metadata = 0;
return (rv);
}

Expand Down Expand Up @@ -148,9 +150,11 @@ archive_read_next_header(struct archive *a, struct archive_entry **entry)
{
int rv;

a->cb_is_metadata = 1;
if (a->is_metadata_streaming)
a->cb_is_metadata = 1;
rv = (a->vtable->archive_read_next_header)(a, entry);
a->cb_is_metadata = 0;
if (a->is_metadata_streaming)
a->cb_is_metadata = 0;
return (rv);
}

Expand All @@ -159,9 +163,11 @@ archive_read_next_header2(struct archive *a, struct archive_entry *entry)
{
int rv;

a->cb_is_metadata = 1;
if (a->is_metadata_streaming)
a->cb_is_metadata = 1;
rv = (a->vtable->archive_read_next_header2)(a, entry);
a->cb_is_metadata = 0;
if (a->is_metadata_streaming)
a->cb_is_metadata = 0;
return (rv);
}

Expand All @@ -177,3 +183,10 @@ archive_request_is_metadata(struct archive *a)
{
return (a->cb_is_metadata);
}

int
archive_set_metadata_streaming(struct archive *a, int flag)
{
a->is_metadata_streaming = flag;
return (0);
}
51 changes: 48 additions & 3 deletions archive/pc_archive.c
Expand Up @@ -48,7 +48,8 @@
#include <phash/phash.h>
#include <phash/extensions.h>
#include <phash/standard.h>
#include "pc_archive.h"
#include "archive/pc_archive.h"
#include "meta_stream.h"

#undef _FEATURES_H
#define _XOPEN_SOURCE 700
Expand Down Expand Up @@ -149,6 +150,24 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
return (-1);
}

if (archive_request_is_metadata(arc) && pctx->meta_stream) {
int rv;

/*
* Send the buf pointer over to the metadata thread.
*/
rv = meta_ctx_send(pctx->meta_ctx, &buf, &len);
if (rv == 0) {
archive_set_error(arc, ARCHIVE_EOF, "Metadata Thread communication error.");
return (-1);

} else if (rv == -1) {
archive_set_error(arc, ARCHIVE_EOF, "Error reported by Metadata Thread.");
return (-1);
}
return (len);
}

if (!pctx->arc_writing) {
Sem_Wait(&(pctx->write_sem));
}
Expand Down Expand Up @@ -178,7 +197,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len
} else {
if (pctx->arc_buf_pos < pctx->min_chunk) {
int diff = pctx->min_chunk - (int)(pctx->arc_buf_pos);
if (len > diff)
if (len >= diff)
pctx->btype = pctx->ctype;
else
pctx->ctype = pctx->btype;
Expand Down Expand Up @@ -282,6 +301,26 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf)
return (-1);
}

if (archive_request_is_metadata(arc) && pctx->meta_stream) {
int rv;
size_t len;

/*
* Send the buf pointer over to the metadata thread.
*/
len = 0;
rv = meta_ctx_send(pctx->meta_ctx, buf, &len);
if (rv == 0) {
archive_set_error(arc, ARCHIVE_EOF, "Metadata Thread communication error.");
return (-1);

} else if (rv == -1) {
archive_set_error(arc, ARCHIVE_EOF, "Error reported by Metadata Thread.");
return (-1);
}
return (len);
}

if (!pctx->arc_writing) {
Sem_Wait(&(pctx->read_sem));
} else {
Expand All @@ -295,6 +334,7 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf)
archive_set_error(arc, ARCHIVE_EOF, "End of file when extracting archive.");
return (-1);
}

pctx->arc_writing = 1;
*buf = pctx->arc_buf;

Expand Down Expand Up @@ -817,6 +857,9 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf)
unlink(tmpfile);
return (-1);
}

if (pctx->meta_stream)
archive_set_metadata_streaming(arc, 1);
archive_write_set_format_pax_restricted(arc);
archive_write_set_bytes_per_block(arc, 0);
archive_write_open(arc, pctx, arc_open_callback,
Expand Down Expand Up @@ -861,6 +904,8 @@ setup_extractor(pc_ctx_t *pctx)
close(pipefd[0]); close(pipefd[1]);
return (-1);
}
if (pctx->meta_stream)
archive_set_metadata_streaming(arc, 1);
archive_read_support_format_all(arc);
pctx->archive_ctx = arc;
pctx->arc_writing = 0;
Expand Down Expand Up @@ -1169,7 +1214,7 @@ archiver_thread_func(void *dat) {
} else {
archive_entry_set_size(entry, archive_entry_size(entry));
}
log_msg(LOG_VERBOSE, 0, "%5d/%5d %8" PRIu64 " %s", ctr, pctx->archive_members_count,
log_msg(LOG_VERBOSE, 0, "%5d/%d %8" PRIu64 " %s", ctr, pctx->archive_members_count,
archive_entry_size(entry), name);

archive_entry_linkify(resolver, &entry, &spare_entry);
Expand Down
2 changes: 1 addition & 1 deletion archive/wavpack_helper.c
Expand Up @@ -608,7 +608,7 @@ wavpack_filter_decode(uchar_t *in_buf, size_t len, uchar_t **out_buf, ssize_t ou
}
WavpackFreeWrapper(wpc);
} else {
log_msg(LOG_ERR, 0, "Wavpack: RIFF wrapper size if zero. File corrupt?");
log_msg(LOG_ERR, 0, "Wavpack: RIFF wrapper size is zero. File corrupt?");
WavpackCloseFile(wpc);
return (0);
}
Expand Down

0 comments on commit e7081eb

Please sign in to comment.