From e7081eb5a38bd41906afe1e51074dc3dadb1cbc9 Mon Sep 17 00:00:00 2001 From: Moinak Ghosh Date: Fri, 24 Oct 2014 23:30:40 +0530 Subject: [PATCH] Git commit - rehash. Incorrect earlier commit. Implement Separate metadata stream. Fix blatant wrong check in Bzip2 compressor. Implement E8E9 filter fallback in Dispack. Improve dict buffer size checks. Reduce thread count to control memory usage in archive mode. --- Makefile.in | 4 +- README.md | 4 + archive/libarchive/libarchive/archive.h | 6 + .../libarchive/libarchive/archive_private.h | 9 + archive/libarchive/libarchive/archive_read.c | 57 ++- .../libarchive/archive_read_append_filter.c | 14 +- .../libarchive/archive_read_private.h | 6 + .../libarchive/libarchive/archive_virtual.c | 25 +- archive/pc_archive.c | 51 +- archive/wavpack_helper.c | 2 +- bzip2_compress.c | 26 +- filters/dict/DictFilter.cpp | 22 +- filters/dispack/dis.cpp | 163 ++++++- lz4_compress.c | 4 +- meta_stream.c | 440 ++++++++++++++---- meta_stream.h | 47 +- pcompress.c | 279 ++++++++--- pcompress.h | 9 +- 18 files changed, 963 insertions(+), 205 deletions(-) diff --git a/Makefile.in b/Makefile.in index 8a71942..d48e687 100644 --- a/Makefile.in +++ b/Makefile.in @@ -32,10 +32,10 @@ LIBVER=1 MAINSRCS = utils/utils.c allocator.c lzma_compress.c ppmd_compress.c \ adaptive_compress.c lzfx_compress.c lz4_compress.c none_compress.c \ utils/xxhash_base.c utils/heap.c utils/cpuid.c filters/analyzer/analyzer.c \ - pcompress.c + meta_stream.c pcompress.c MAINHDRS = allocator.h pcompress.h utils/utils.h utils/xxhash.h utils/heap.h \ utils/cpuid.h utils/xxhash.h archive/pc_archive.h filters/dispack/dis.hpp \ - filters/analyzer/analyzer.h + meta_stream.h filters/analyzer/analyzer.h MAINOBJS = $(MAINSRCS:.c=.o) PROGSRCS = main.c diff --git a/README.md b/README.md index cf1a162..f8f93f7 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,10 @@ Standard Usage The fastest checksum is the BLAKE2 family. + -T + Disable Metadata Streams. 
Pathname metadata is normally packed into separate + chunks distinct from file data. With this option this behavior is disabled. + Pathname of the resulting archive. A '.pz' extension is automatically added if not already present. This can also be specified as '-' in order to send diff --git a/archive/libarchive/libarchive/archive.h b/archive/libarchive/libarchive/archive.h index 7c5adef..9c99d63 100644 --- a/archive/libarchive/libarchive/archive.h +++ b/archive/libarchive/libarchive/archive.h @@ -518,6 +518,12 @@ __LA_DECL int archive_read_set_options(struct archive *_a, */ __LA_DECL int archive_request_is_metadata(struct archive *a); +/* + * Indicate whether separate metadata handling is being done by the + * callbacks. This triggers special behavior during archive read. + */ +__LA_DECL int archive_set_metadata_streaming(struct archive *a, int flag); + /*- * Convenience function to recreate the current entry (whose header * has just been read) on disk. diff --git a/archive/libarchive/libarchive/archive_private.h b/archive/libarchive/libarchive/archive_private.h index 720de77..614f47f 100644 --- a/archive/libarchive/libarchive/archive_private.h +++ b/archive/libarchive/libarchive/archive_private.h @@ -127,6 +127,15 @@ struct archive { * separately store/handle metadata and data. */ int cb_is_metadata; + + /* + * Set the metadata handling flag. This indicates to libarchive + * that callback routines are processing metadata as a separate + * stream. This means that cb_is_metadata flag is set during + * read/write. In addition, archive reads are handled differently + * using a shadow filter structure with separate copy buffer. + */ + int is_metadata_streaming; }; /* Check magic value and state; return(ARCHIVE_FATAL) if it isn't valid. 
*/ diff --git a/archive/libarchive/libarchive/archive_read.c b/archive/libarchive/libarchive/archive_read.c index 048c316..9fdbd1a 100644 --- a/archive/libarchive/libarchive/archive_read.c +++ b/archive/libarchive/libarchive/archive_read.c @@ -453,7 +453,7 @@ int archive_read_open1(struct archive *_a) { struct archive_read *a = (struct archive_read *)_a; - struct archive_read_filter *filter, *tmp; + struct archive_read_filter *filter, *tmp = NULL; /* silence compiler */ int slot, e; unsigned int i; @@ -461,6 +461,8 @@ archive_read_open1(struct archive *_a) "archive_read_open"); archive_clear_error(&a->archive); + if (_a->is_metadata_streaming) + _a->cb_is_metadata = 1; if (a->client.reader == NULL) { archive_set_error(&a->archive, EINVAL, "No reader function provided to archive_read_open"); @@ -485,6 +487,13 @@ archive_read_open1(struct archive *_a) filter = calloc(1, sizeof(*filter)); if (filter == NULL) return (ARCHIVE_FATAL); + if (_a->is_metadata_streaming) { + tmp = calloc(1, sizeof(*filter)); + if (tmp == NULL) { + free(filter); + return (ARCHIVE_FATAL); + } + } filter->bidder = NULL; filter->upstream = NULL; filter->archive = a; @@ -497,6 +506,10 @@ archive_read_open1(struct archive *_a) filter->sswitch = client_switch_proxy; filter->name = "none"; filter->code = ARCHIVE_FILTER_NONE; + if (_a->is_metadata_streaming) { + memcpy(tmp, filter, sizeof (*filter)); + filter->shadow = tmp; + } a->client.dataset[0].begin_position = 0; if (!a->filter || !a->bypass_filter_bidding) @@ -533,6 +546,8 @@ archive_read_open1(struct archive *_a) /* Ensure libarchive starts from the first node in a multivolume set */ client_switch_proxy(a->filter, 0); + if (_a->is_metadata_streaming) + _a->cb_is_metadata = 0; return (e); } @@ -546,7 +561,7 @@ choose_filters(struct archive_read *a) { int number_bidders, i, bid, best_bid; struct archive_read_filter_bidder *bidder, *best_bidder; - struct archive_read_filter *filter; + struct archive_read_filter *filter, *tmp; ssize_t avail; int 
r; @@ -585,9 +600,20 @@ choose_filters(struct archive_read *a) = (struct archive_read_filter *)calloc(1, sizeof(*filter)); if (filter == NULL) return (ARCHIVE_FATAL); + if (a->archive.is_metadata_streaming) { + tmp = calloc(1, sizeof(*filter)); + if (tmp == NULL) { + free(filter); + return (ARCHIVE_FATAL); + } + } filter->bidder = best_bidder; filter->archive = a; filter->upstream = a->filter; + if (a->archive.is_metadata_streaming) { + memcpy(tmp, filter, sizeof (*filter)); + filter->shadow = tmp; + } a->filter = filter; r = (best_bidder->init)(a->filter); if (r != ARCHIVE_OK) { @@ -933,6 +959,8 @@ __archive_read_free_filters(struct archive_read *a) { while (a->filter != NULL) { struct archive_read_filter *t = a->filter->upstream; + if (a->archive.is_metadata_streaming) + free(a->filter->shadow); free(a->filter); a->filter = t; } @@ -1221,6 +1249,15 @@ __archive_read_filter_ahead(struct archive_read_filter *filter, return (NULL); } + /* + * Switch to shadow filter for metadata reads when using separate + * metadata stream. + */ + if (filter->archive->archive.is_metadata_streaming && + filter->archive->archive.cb_is_metadata) { + filter = filter->shadow; + } + /* * Keep pulling more data until we can satisfy the request. */ @@ -1397,6 +1434,15 @@ __archive_read_filter_consume(struct archive_read_filter * filter, if (request == 0) return 0; + /* + * Switch to shadow filter for metadata reads when using separate + * metadata stream. + */ + if (filter->archive->archive.is_metadata_streaming && + filter->archive->archive.cb_is_metadata) { + filter = filter->shadow; + } + skipped = advance_file_pointer(filter, request); if (skipped == request) return (skipped); @@ -1522,6 +1568,13 @@ __archive_read_filter_seek(struct archive_read_filter *filter, int64_t offset, if (filter->seek == NULL) return (ARCHIVE_FAILED); + /* + * No seeking when metadata streams are enabled. 
+ */ + if (filter->archive->archive.is_metadata_streaming) { + return (ARCHIVE_FAILED); + } + client = &(filter->archive->client); switch (whence) { case SEEK_CUR: diff --git a/archive/libarchive/libarchive/archive_read_append_filter.c b/archive/libarchive/libarchive/archive_read_append_filter.c index 017d7c6..32fea06 100644 --- a/archive/libarchive/libarchive/archive_read_append_filter.c +++ b/archive/libarchive/libarchive/archive_read_append_filter.c @@ -40,7 +40,7 @@ archive_read_append_filter(struct archive *_a, int code) int r1, r2, number_bidders, i; char str[20]; struct archive_read_filter_bidder *bidder; - struct archive_read_filter *filter; + struct archive_read_filter *filter, *tmp; struct archive_read *a = (struct archive_read *)_a; r1 = r2 = (ARCHIVE_OK); @@ -126,6 +126,18 @@ archive_read_append_filter(struct archive *_a, int code) filter->bidder = bidder; filter->archive = a; filter->upstream = a->filter; + if (_a->is_metadata_streaming) + { + tmp = calloc(1, sizeof(*filter)); + if (tmp == NULL) + { + free(filter); + archive_set_error(&a->archive, ENOMEM, "Out of memory"); + return (ARCHIVE_FATAL); + } + memcpy(tmp, filter, sizeof (*filter)); + filter->shadow = tmp; + } a->filter = filter; r2 = (bidder->init)(a->filter); if (r2 != ARCHIVE_OK) { diff --git a/archive/libarchive/libarchive/archive_read_private.h b/archive/libarchive/libarchive/archive_read_private.h index 8a6c859..be0a890 100644 --- a/archive/libarchive/libarchive/archive_read_private.h +++ b/archive/libarchive/libarchive/archive_read_private.h @@ -114,6 +114,12 @@ struct archive_read_filter { char end_of_file; char closed; char fatal; + + /* + * Shadow structure for keeping track of metadata requests if + * metadata streaming is enabled. 
+ */ + struct archive_read_filter *shadow; }; /* diff --git a/archive/libarchive/libarchive/archive_virtual.c b/archive/libarchive/libarchive/archive_virtual.c index 5ab26b6..7edbf52 100644 --- a/archive/libarchive/libarchive/archive_virtual.c +++ b/archive/libarchive/libarchive/archive_virtual.c @@ -113,9 +113,11 @@ archive_write_header(struct archive *a, struct archive_entry *entry) int rv; ++a->file_count; - a->cb_is_metadata = 1; + if (a->is_metadata_streaming) + a->cb_is_metadata = 1; rv = (a->vtable->archive_write_header)(a, entry); - a->cb_is_metadata = 0; + if (a->is_metadata_streaming) + a->cb_is_metadata = 0; return (rv); } @@ -148,9 +150,11 @@ archive_read_next_header(struct archive *a, struct archive_entry **entry) { int rv; - a->cb_is_metadata = 1; + if (a->is_metadata_streaming) + a->cb_is_metadata = 1; rv = (a->vtable->archive_read_next_header)(a, entry); - a->cb_is_metadata = 0; + if (a->is_metadata_streaming) + a->cb_is_metadata = 0; return (rv); } @@ -159,9 +163,11 @@ archive_read_next_header2(struct archive *a, struct archive_entry *entry) { int rv; - a->cb_is_metadata = 1; + if (a->is_metadata_streaming) + a->cb_is_metadata = 1; rv = (a->vtable->archive_read_next_header2)(a, entry); - a->cb_is_metadata = 0; + if (a->is_metadata_streaming) + a->cb_is_metadata = 0; return (rv); } @@ -177,3 +183,10 @@ archive_request_is_metadata(struct archive *a) { return (a->cb_is_metadata); } + +int +archive_set_metadata_streaming(struct archive *a, int flag) +{ + a->is_metadata_streaming = flag; + return (0); +} diff --git a/archive/pc_archive.c b/archive/pc_archive.c index cd527f4..21cb58a 100644 --- a/archive/pc_archive.c +++ b/archive/pc_archive.c @@ -48,7 +48,8 @@ #include #include #include -#include "pc_archive.h" +#include "archive/pc_archive.h" +#include "meta_stream.h" #undef _FEATURES_H #define _XOPEN_SOURCE 700 @@ -149,6 +150,24 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len return (-1); } + if 
(archive_request_is_metadata(arc) && pctx->meta_stream) { + int rv; + + /* + * Send the buf pointer over to the metadata thread. + */ + rv = meta_ctx_send(pctx->meta_ctx, &buf, &len); + if (rv == 0) { + archive_set_error(arc, ARCHIVE_EOF, "Metadata Thread communication error."); + return (-1); + + } else if (rv == -1) { + archive_set_error(arc, ARCHIVE_EOF, "Error reported by Metadata Thread."); + return (-1); + } + return (len); + } + if (!pctx->arc_writing) { Sem_Wait(&(pctx->write_sem)); } @@ -178,7 +197,7 @@ creat_write_callback(struct archive *arc, void *ctx, const void *buf, size_t len } else { if (pctx->arc_buf_pos < pctx->min_chunk) { int diff = pctx->min_chunk - (int)(pctx->arc_buf_pos); - if (len > diff) + if (len >= diff) pctx->btype = pctx->ctype; else pctx->ctype = pctx->btype; @@ -282,6 +301,26 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf) return (-1); } + if (archive_request_is_metadata(arc) && pctx->meta_stream) { + int rv; + size_t len; + + /* + * Send the buf pointer over to the metadata thread. 
+ */ + len = 0; + rv = meta_ctx_send(pctx->meta_ctx, buf, &len); + if (rv == 0) { + archive_set_error(arc, ARCHIVE_EOF, "Metadata Thread communication error."); + return (-1); + + } else if (rv == -1) { + archive_set_error(arc, ARCHIVE_EOF, "Error reported by Metadata Thread."); + return (-1); + } + return (len); + } + if (!pctx->arc_writing) { Sem_Wait(&(pctx->read_sem)); } else { @@ -295,6 +334,7 @@ extract_read_callback(struct archive *arc, void *ctx, const void **buf) archive_set_error(arc, ARCHIVE_EOF, "End of file when extracting archive."); return (-1); } + pctx->arc_writing = 1; *buf = pctx->arc_buf; @@ -817,6 +857,9 @@ setup_archiver(pc_ctx_t *pctx, struct stat *sbuf) unlink(tmpfile); return (-1); } + + if (pctx->meta_stream) + archive_set_metadata_streaming(arc, 1); archive_write_set_format_pax_restricted(arc); archive_write_set_bytes_per_block(arc, 0); archive_write_open(arc, pctx, arc_open_callback, @@ -861,6 +904,8 @@ setup_extractor(pc_ctx_t *pctx) close(pipefd[0]); close(pipefd[1]); return (-1); } + if (pctx->meta_stream) + archive_set_metadata_streaming(arc, 1); archive_read_support_format_all(arc); pctx->archive_ctx = arc; pctx->arc_writing = 0; @@ -1169,7 +1214,7 @@ archiver_thread_func(void *dat) { } else { archive_entry_set_size(entry, archive_entry_size(entry)); } - log_msg(LOG_VERBOSE, 0, "%5d/%5d %8" PRIu64 " %s", ctr, pctx->archive_members_count, + log_msg(LOG_VERBOSE, 0, "%5d/%d %8" PRIu64 " %s", ctr, pctx->archive_members_count, archive_entry_size(entry), name); archive_entry_linkify(resolver, &entry, &spare_entry); diff --git a/archive/wavpack_helper.c b/archive/wavpack_helper.c index 81f32fa..4681d89 100644 --- a/archive/wavpack_helper.c +++ b/archive/wavpack_helper.c @@ -608,7 +608,7 @@ wavpack_filter_decode(uchar_t *in_buf, size_t len, uchar_t **out_buf, ssize_t ou } WavpackFreeWrapper(wpc); } else { - log_msg(LOG_ERR, 0, "Wavpack: RIFF wrapper size if zero. File corrupt?"); + log_msg(LOG_ERR, 0, "Wavpack: RIFF wrapper size is zero. 
File corrupt?"); WavpackCloseFile(wpc); return (0); } diff --git a/bzip2_compress.c b/bzip2_compress.c index 86b4537..3152f00 100644 --- a/bzip2_compress.c +++ b/bzip2_compress.c @@ -105,6 +105,19 @@ bzip2_compress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, char *dst1 = (char *)dst; char *src1 = (char *)src; + /* + * If the data is known to be compressed then certain types less compressed data + * can be attempted to be compressed again for a possible gain. For others it is + * a waste of time. + */ + if (PC_TYPE(btype) == TYPE_COMPRESSED && level < 7) { + int subtype = PC_SUBTYPE(btype); + + if (subtype != TYPE_COMPRESSED_LZW && subtype != TYPE_COMPRESSED_GZ && + subtype != TYPE_COMPRESSED_LZ && subtype != TYPE_COMPRESSED_LZO) { + return (-1); + } + } bzs.bzalloc = slab_alloc_i; bzs.bzfree = slab_free; bzs.opaque = NULL; @@ -174,19 +187,6 @@ bzip2_decompress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, char *dst1 = (char *)dst; char *src1 = (char *)src; - /* - * If the data is known to be compressed then certain types less compressed data - * can be attempted to be compressed again for a possible gain. For others it is - * a waste of time. 
- */ - if (PC_TYPE(btype) == TYPE_COMPRESSED && level < 7) { - int subtype = PC_SUBTYPE(btype); - - if (subtype != TYPE_COMPRESSED_LZW && subtype != TYPE_COMPRESSED_GZ && - subtype != TYPE_COMPRESSED_LZ && subtype != TYPE_COMPRESSED_LZO) { - return (-1); - } - } bzs.bzalloc = slab_alloc_i; bzs.bzfree = slab_free; bzs.opaque = NULL; diff --git a/filters/dict/DictFilter.cpp b/filters/dict/DictFilter.cpp index 759bd73..299b661 100644 --- a/filters/dict/DictFilter.cpp +++ b/filters/dict/DictFilter.cpp @@ -156,7 +156,7 @@ DictFilter::Forward_Dict(u8 *src, u32 size, u8 *dst, u32 *dstsize) u32 i,j,treePos = 0; u32 lastSymbol = 0; u32 dstSize = 0; - u32 idx; + int idx; for(i = 0; i < size-5;) { @@ -268,12 +268,20 @@ int dict_encode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) { DictFilter *df = static_cast(dict_ctx); - u32 fl = fromlen; - u32 dl = *dstlen; + u32 fl; + u32 dl; int atype; uchar_t *dst; DEBUG_STAT_EN(double strt, en); + /* + * Dict can't handle > 4GB buffers :-O + */ + if (fromlen > UINT32_MAX) + return (-1); + + fl = (u32)fromlen; + dl = (u32)(*dstlen); DEBUG_STAT_EN(strt = get_wtime_millis()); atype = analyze_buffer(from, fromlen); if (PC_TYPE(atype) == TYPE_TEXT) { @@ -298,11 +306,17 @@ int dict_decode(void *dict_ctx, uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) { DictFilter *df = static_cast(dict_ctx); - u32 fl = fromlen; + u32 fl; u32 dl; u8 *src; DEBUG_STAT_EN(double strt, en); + if (fromlen > UINT32_MAX) { + log_msg(LOG_ERR, 0, "Dict decode buffer too big!"); + return (-1); + } + + fl = (u32)fromlen; DEBUG_STAT_EN(strt = get_wtime_millis()); dl = U32_P(from); if (dl > *dstlen) { diff --git a/filters/dispack/dis.cpp b/filters/dispack/dis.cpp index 669a0a0..781fe25 100644 --- a/filters/dispack/dis.cpp +++ b/filters/dispack/dis.cpp @@ -150,8 +150,8 @@ using namespace std; #define DISFILTER_BLOCK (32768) #define DISFILTERED 1 -#define ORIGSIZE 2 -#define CLEAR_DISFILTER 0xfe +#define ORIGSIZE 2 +#define 
E8E9 4 #define NORMAL_HDR (1 + 2) #define EXTENDED_HDR (1 + 2 + 2) // Dispack min reduction should be 8%, otherwise we abort @@ -927,6 +927,135 @@ is_x86_code(uchar_t *buf, int len) return (freq[0x8b] > avgFreq && freq[0x00] > avgFreq * 2 && freq[0xE8] > 6); } +/* + * E8E9 Filter from CSC 3.2 (Fu Siyuan). This is applied to blocks that can't + * be Disfiltered. + */ +class EFilter +{ +public: + static void Forward_E89(sU8 *src, sU32 size) + { + sU32 i,j; + sS32 c; + + E89init(); + for(i=0, j=0; i < size; i++) { + c = E89forward(src[i]); + if (c >= 0) src[j++]=c; + } + while((c = E89flush()) >= 0) src[j++] = c; + } + + static void Inverse_E89( sU8* src, sU32 size) + { + sU32 i,j; + sS32 c; + + E89init(); + for(i=0, j=0; i < size; i++) { + c = E89inverse(src[i]); + if (c >= 0) src[j++]=c; + } + while((c = E89flush()) >= 0) src[j++] = c; + } + +protected: + static sU32 x0,x1; + static sU32 i,k; + static sU8 cs; // cache size, F8 - 5 bytes + + ~EFilter() {} + EFilter() {} + + static void E89init(void) + { + cs = 0xFF; + x0 = x1 = 0; + i = 0; + k = 5; + } + + static sS32 E89cache_byte(sS32 c) + { + sS32 d = cs&0x80 ? 
-1 : (sU8)(x1); + x1 >>= 8; + x1 |= (x0<<24); + x0 >>= 8; + x0 |= (c<<24); + cs <<= 1; i++; + return d; + } + + static sU32 E89xswap(sU32 x) + { + x<<=7; + return (x>>24)|((sU8)(x>>16)<<8)|((sU8)(x>>8)<<16)|((sU8)(x)<<(24-7)); + } + + static sU32 E89yswap(sU32 x) + { + x = ((sU8)(x>>24)<<7)|((sU8)(x>>16)<<8)|((sU8)(x>>8)<<16)|(x<<24); + return x>>7; + } + + static sS32 E89forward(sS32 c) + { + sU32 x; + if(i >= k) { + if((x1&0xFE000000) == 0xE8000000) { + k = i+4; + x= x0 - 0xFF000000; + if( x<0x02000000 ) { + x = (x+i) & 0x01FFFFFF; + x = E89xswap(x); + x0 = x + 0xFF000000; + } + } + } + return E89cache_byte(c); + } + + static sS32 E89inverse(sS32 c) + { + sU32 x; + if(i >= k) { + if((x1&0xFE000000) == 0xE8000000) { + k = i+4; + x = x0 - 0xFF000000; + if(x < 0x02000000) { + x = E89yswap(x); + x = (x-i) & 0x01FFFFFF; + x0 = x + 0xFF000000; + } + } + } + return E89cache_byte(c); + } + + static sS32 E89flush(void) + { + sS32 d; + if(cs != 0xFF) { + while(cs & 0x80) E89cache_byte(0),++cs; + d = E89cache_byte(0); ++cs; + return d; + } else { + E89init(); + return -1; + } + } +}; + +/* + * Linker weirdo! 
+ */ +sU32 EFilter::x0; +sU32 EFilter::x1; +sU32 EFilter::i; +sU32 EFilter::k; +sU8 EFilter::cs; + #ifdef __cplusplus extern "C" { #endif @@ -941,11 +1070,14 @@ int dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) { uchar_t *pos, *hdr, type, *pos_to, *to_last; - uint64_t len; + sU32 len; #ifdef DEBUG_STATS double strt, en; #endif + if (fromlen > UINT32_MAX) + return (-1); + if (fromlen < DISFILTER_BLOCK) return (-1); @@ -953,7 +1085,7 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) strt = get_wtime_millis(); #endif pos = from; - len = fromlen; + len = (sU32)fromlen; pos_to = to; to_last = to + *dstlen; while (len > 0) { @@ -962,6 +1094,7 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) sU16 origsize; sU32 out; sU8 *rv; + int dis_tried; if (len > DISFILTER_BLOCK) sz = DISFILTER_BLOCK; @@ -980,9 +1113,11 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) } out = sz; + dis_tried = 0; if (is_x86_code(pos, sz)) { ctx.ResetCtx(0, sz); rv = DisFilter(ctx, pos, sz, 0, pos_to, out); + dis_tried = 1; } else { rv = NULL; } @@ -990,11 +1125,19 @@ dispack_encode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) if (pos_to + origsize >= to_last) { return (-1); } - type &= CLEAR_DISFILTER; + memcpy(pos_to, pos, origsize); + + /* + * If Dispack failed, we apply a simple E8E9 filter + * on the block. + */ + if (dis_tried) { + EFilter::Forward_E89(pos_to, origsize); + type |= E8E9; + } *hdr = type; hdr++; U16_P(hdr) = LE16(origsize); - memcpy(pos_to, pos, origsize); pos_to += origsize; } else { sU16 csize; @@ -1069,6 +1212,14 @@ dispack_decode(uchar_t *from, uint64_t fromlen, uchar_t *to, uint64_t *dstlen) return (-1); } memcpy(pos_to, pos, cmpsz); + + /* + * If E8E9 was applied on this block, apply the inverse transform. + * This only happens if this block was detected as x86 instruction + * stream and Dispack was tried but it failed. 
+ */ + if (type & E8E9) + EFilter::Inverse_E89(pos_to, cmpsz); pos += cmpsz; pos_to += cmpsz; len -= cmpsz; diff --git a/lz4_compress.c b/lz4_compress.c index b0e5a36..dba0a66 100644 --- a/lz4_compress.c +++ b/lz4_compress.c @@ -20,7 +20,7 @@ * If not, see . * * moinakg@belenix.org, http://moinakg.wordpress.com/ - * + * */ #include @@ -89,7 +89,7 @@ int lz4_deinit(void **data) { struct lz4_params *lzdat = (struct lz4_params *)(*data); - + if (lzdat) { slab_free(NULL, lzdat); } diff --git a/meta_stream.c b/meta_stream.c index ba4212e..86f281e 100644 --- a/meta_stream.c +++ b/meta_stream.c @@ -34,31 +34,47 @@ #include #include #include +#include "pcompress.h" +#include "delta2/delta2.h" #include "utils/utils.h" +#include "lzma/lzma_crc.h" #include "allocator.h" #include "meta_stream.h" -extern int lz4_compress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, +#define METADATA_CHUNK_SIZE (2 * 1024 * 1024) + +extern int bzip2_compress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, int level, uchar_t chdr, int btype, void *data); +extern int bzip2_decompress(void *src, uint64_t srclen, void *dst, uint64_t *dstlen, + int level, uchar_t chdr, int btype, void *data); +extern void bzip2_props(algo_props_t *data, int level, uint64_t chunksize); enum { SRC_CHANNEL = 0, SINK_CHANNEL }; -/* - * Metadata chunk header format: - * 64-bit integer = 1: Compressed length: This indicates that this is a metadata chunk - * 64-bit integer: Real Compressed length - * 64-bit integer: Uncompressed original length - * 1 Byte: Chunk flag - * Upto 64-bytes: Checksum, HMAC if encrypting - * 32-bit integer: Header CRC32 if not encrypting - */ +struct _meta_ctx { + int meta_pipes[2]; + pc_ctx_t *pctx; + pthread_t meta_thread; + uchar_t *frombuf, *tobuf; + uint64_t frompos, topos, tosize; + uchar_t checksum[CKSUM_MAX_BYTES]; + void *bzip2_dat; + int comp_level, id; + int comp_fd; + int running; + int delta2_nstrides; + int do_compress; + mac_ctx_t chunk_hmac; + algo_props_t 
props; +}; + static int compress_and_write(meta_ctx_t *mctx) { - pc_ctx_t pctx = mctx->pctx; + pc_ctx_t *pctx = mctx->pctx; uchar_t type; uchar_t *comp_chunk, *tobuf; int rv; @@ -74,21 +90,42 @@ compress_and_write(meta_ctx_t *mctx) mctx->frompos, 0, 1); } - type = COMPRESSED; - U64_P(mctx->tobuf) = LE64(1); // Indicate metadata chunk - tobuf = mctx->tobuf + 8; - comp_chunk = mctx->tobuf + METADATA_HDR_SZ; - dstlen = METADATA_CHUNK_SIZE; + type = 0; + /* + * This is normally the compressed chunk size for data chunks. Here we + * set it to 1 to indicate that this is a metadata chunk. This value is + * always big-endian format. The next value is the real compressed + * chunk size. + */ + tobuf = mctx->tobuf; + U64_P(tobuf) = htonll(METADATA_INDICATOR); + comp_chunk = tobuf + METADATA_HDR_SZ; + dstlen = mctx->frompos; + + /* + * Apply Delta2 filter. + */ + rv = delta2_encode(mctx->frombuf, mctx->frompos, comp_chunk, &dstlen, + mctx->props.delta2_span, mctx->delta2_nstrides); + if (rv != -1) { + memcpy(mctx->frombuf, comp_chunk, dstlen); + mctx->frompos = dstlen; + type |= PREPROC_TYPE_DELTA2; + } else { + dstlen = mctx->frompos; + } /* * Ok, now compress. */ - rv = lz4_compress(mctx->frombuf, mctx->frompos, comp_chunk, &dstlen, mctx->level, - 0, TYPE_BINARY, mctx->lz4_dat); + rv = bzip2_compress(mctx->frombuf, mctx->frompos, comp_chunk, &dstlen, mctx->comp_level, + 0, TYPE_BINARY, mctx->bzip2_dat); if (rv < 0) { - type = UNCOMPRESSED; - memcpy(comp_chunk, mctx->frombuf, mctx->frompos); + dstlen = mctx->frompos; + memcpy(comp_chunk, mctx->frombuf, dstlen); + } else { + type |= PREPROC_COMPRESSED; } if (pctx->encrypt_type) { @@ -96,46 +133,47 @@ compress_and_write(meta_ctx_t *mctx) if (rv == -1) { pctx->main_cancel = 1; pctx->t_errored = 1; - log_msg(LOG_ERR, 0, "Metadata Encrypion failed") + log_msg(LOG_ERR, 0, "Metadata Encrypion failed"); return (0); } } /* - * Add header size to the compressed length minus the initial 64-bit value. 
- * That one is a flag value which is read separately during decompression. + * Store the compressed length of the data segment. While reading we have to account + * for the header. */ - dstlen += METADATA_HDR_SZ - COMPRESSED_CHUNKSIZE; - U64_P(mctx->tobuf + 8) = LE64(dstlen); - U64_P(mctx->tobuf + 16) = LE64(mctx->frompos); + U64_P(tobuf + 8) = LE64(dstlen); + U64_P(tobuf + 16) = LE64(mctx->frompos); + *(tobuf + 24) = type; + if (!pctx->encrypt_type) - serialize_checksum(mctx->checksum, mctx->tobuf + 24, pctx->cksum_bytes); + serialize_checksum(mctx->checksum, tobuf + 25, pctx->cksum_bytes); if (pctx->encrypt_type) { - uchar_t mac_ptr; uchar_t chash[pctx->mac_bytes]; unsigned int hlen; + uchar_t *mac_ptr; - mac_ptr = mctx->tobuf + 24 + pctx->cksum_bytes; // cksum_bytes will be 0 here but ... - memset(mac_ptr, 0, pctx->mac_bytes); + mac_ptr = tobuf + 25; + memset(mac_ptr, 0, CKSUM_MAX + CRC32_SIZE); hmac_reinit(&mctx->chunk_hmac); - hmac_update(&tdat->chunk_hmac, tobuf, dstlen); - hmac_final(&tdat->chunk_hmac, chash, &hlen); + hmac_update(&mctx->chunk_hmac, tobuf, dstlen + METADATA_HDR_SZ); + hmac_final(&mctx->chunk_hmac, chash, &hlen); serialize_checksum(chash, mac_ptr, hlen); } else { - uchar_t mac_ptr; uint32_t crc; + uchar_t *mac_ptr; - mac_ptr = mctx->tobuf + 24 + pctx->cksum_bytes; - memset(mac_ptr, 0, pctx->mac_bytes); - crc = lzma_crc32(tdat->cmp_seg, rbytes, 0); - U32_P(mac_ptr) = LE32(crc); + mac_ptr = tobuf + 25 + CKSUM_MAX; + memset(mac_ptr, 0, CRC32_SIZE); + crc = lzma_crc32(tobuf, METADATA_HDR_SZ, 0); + U32_P(tobuf + 25 + CKSUM_MAX) = LE32(crc); } /* * All done. Now grab lock and write. 
*/ - dstlen += COMPRESSED_CHUNKSIZE; // The 'full' chunk now + dstlen += METADATA_HDR_SZ; // The 'full' chunk now pthread_mutex_lock(&pctx->write_mutex); wbytes = Write(mctx->comp_fd, mctx->tobuf, dstlen); pthread_mutex_unlock(&pctx->write_mutex); @@ -153,56 +191,74 @@ compress_and_write(meta_ctx_t *mctx) void meta_ctx_close_sink_channel(meta_ctx_t *mctx) { - if (mctx->pipes[SINK_CHANNEL]) { - close(mctx->pipes[SINK_CHANNEL]); - mctx->pipes[SINK_CHANNEL] = 0; + if (mctx->meta_pipes[SINK_CHANNEL]) { + close(mctx->meta_pipes[SINK_CHANNEL]); + mctx->meta_pipes[SINK_CHANNEL] = 0; } } void meta_ctx_close_src_channel(meta_ctx_t *mctx) { - if (mctx->pipes[SRC_CHANNEL]) { - close(mctx->pipes[SRC_CHANNEL]); - mctx->pipes[SRC_CHANNEL] = 0; + if (mctx->meta_pipes[SRC_CHANNEL]) { + close(mctx->meta_pipes[SRC_CHANNEL]); + mctx->meta_pipes[SRC_CHANNEL] = 0; } } +/* + * Accumulate metadata into a memory buffer. Once the buffer gets filled or + * data stream ends, the buffer is compressed and written out. + */ static void * metadata_compress(void *dat) { meta_ctx_t *mctx = (meta_ctx_t *)dat; - meta_msg_t msg; + meta_msg_t *msgp; int ack; - while (Read(mctx->meta_pipes[SINK_CHANNEL], &msg, sizeof (msg)) == sizeof (msg)) { + mctx->running = 1; + while (Read(mctx->meta_pipes[SINK_CHANNEL], &msgp, sizeof (msgp)) == sizeof (msgp)) { ack = 0; - if (mctx->frompos + msg.len > METADATA_CHUNK_SIZE) { + if (mctx->frompos + msgp->len > METADATA_CHUNK_SIZE) { + /* + * Accumulating the metadata block will overflow buffer. Compress + * and write the current buffer and then copy the new data into it. 
+ */ if (!compress_and_write(mctx)) { Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); return (NULL); } mctx->frompos = 0; - memcpy(mctx->frombuf, msg.buf, msg.len); - mctx->frompos = msg.len; - - } else if (mctx->frompos + msg.len == METADATA_CHUNK_SIZE) { - memcpy(mctx->frombuf + mctx->frompos, msg.buf, msg.len); + memcpy(mctx->frombuf, msgp->buf, msgp->len); + mctx->frompos = msgp->len; + + } else if (mctx->frompos + msgp->len == METADATA_CHUNK_SIZE) { + /* + * Accumulating the metadata block fills the buffer. Fill it then + * compress and write the buffer. + */ + memcpy(mctx->frombuf + mctx->frompos, msgp->buf, msgp->len); if (!compress_and_write(mctx)) { Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); return (NULL); } mctx->frompos = 0; } else { - memcpy(mctx->frombuf + mctx->frompos, msg.buf, msg.len); - mctx->frompos += msg.len; + /* + * Accumulate the metadata block into the buffer for future + * compression. + */ + memcpy(mctx->frombuf + mctx->frompos, msgp->buf, msgp->len); + mctx->frompos += msgp->len; } ack = 1; - Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)) + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); } + mctx->running = 0; /* - * Flush pending chunk. + * Flush any accumulated data in the buffer. */ if (mctx->frompos) { if (!compress_and_write(mctx)) { @@ -212,16 +268,225 @@ metadata_compress(void *dat) } mctx->frompos = 0; } + return (NULL); +} + +static int +decompress_data(meta_ctx_t *mctx) +{ + uint64_t origlen, len_cmp, dstlen; + uchar_t *cbuf, *cseg, *ubuf, type; + pc_ctx_t *pctx = mctx->pctx; + uchar_t checksum[CKSUM_MAX_BYTES]; + int rv; + + cbuf = mctx->frombuf; + ubuf = mctx->tobuf; + len_cmp = LE64(U64_P(cbuf + 8)); + origlen = LE64(U64_P(cbuf + 16)); + type = *(cbuf + 24); + cseg = cbuf + METADATA_HDR_SZ; + dstlen = origlen; + + /* + * If this was encrypted: + * Verify HMAC first before anything else and then decrypt compressed data. 
+ */ + if (pctx->encrypt_type) { + unsigned int len; + + len = pctx->mac_bytes; + deserialize_checksum(checksum, cbuf + 25, pctx->mac_bytes); + memset(cbuf + 25, 0, pctx->mac_bytes + CRC32_SIZE); + hmac_reinit(&mctx->chunk_hmac); + hmac_update(&mctx->chunk_hmac, cbuf, mctx->frompos); + hmac_final(&mctx->chunk_hmac, mctx->checksum, &len); + if (memcmp(checksum, mctx->checksum, len) != 0) { + log_msg(LOG_ERR, 0, "Metadata chunk %d, HMAC verification failed", + mctx->id); + return (0); + } + rv = crypto_buf(&(pctx->crypto_ctx), cseg, cseg, len_cmp, mctx->id); + if (rv == -1) { + /* + * Decryption failure is fatal. + */ + log_msg(LOG_ERR, 0, "Metadata chunk %d, Decryption failed", + mctx->id); + return (0); + } + } else { + uint32_t crc1, crc2; + + /* + * Verify Header CRC32 in non-crypto mode. The CRC is stored little-endian. + */ + crc1 = LE32(U32_P(cbuf + 25 + CKSUM_MAX)); + memset(cbuf + 25 + CKSUM_MAX, 0, CRC32_SIZE); + crc2 = lzma_crc32(cbuf, METADATA_HDR_SZ, 0); + + if (crc1 != crc2) { + /* + * Header CRC32 verification failure is fatal.
+ */ + log_msg(LOG_ERR, 0, "Metadata chunk %d, Header CRC verification failed", + mctx->id); + return (0); + } + } + + if (type & PREPROC_COMPRESSED) { + rv = bzip2_decompress(cseg, len_cmp, ubuf, &dstlen, mctx->comp_level, + 0, TYPE_BINARY, mctx->bzip2_dat); + if (rv == -1) { + log_msg(LOG_ERR, 0, "Metadata chunk %d, decompression failed.", mctx->id); + return (0); + } + } else { + memcpy(ubuf, cseg, len_cmp); + } + + if (type & PREPROC_TYPE_DELTA2) { + dstlen = origlen; + rv = delta2_decode(ubuf, len_cmp, cseg, &dstlen); + if (rv == -1) { + log_msg(LOG_ERR, 0, "Metadata chunk %d, Delta2 decoding failed.", mctx->id); + return (0); + } + memcpy(ubuf, cseg, dstlen); + } + mctx->topos = 0; + mctx->tosize = dstlen; + return (1); } static void * metadata_decompress(void *dat) { + meta_ctx_t *mctx = (meta_ctx_t *)dat; + pc_ctx_t *pctx; + meta_msg_t *msgp; + int ack; + + pctx = mctx->pctx; + mctx->running = 1; + mctx->topos = mctx->tosize = 0; + mctx->id = -1; + while (Read(mctx->meta_pipes[SINK_CHANNEL], &msgp, sizeof (msgp)) == sizeof (msgp)) { + int64_t rb; + uint64_t len_cmp; + + /* + * Scan to the next metadata chunk and decompress it, if our in-memory data + * is fully consumed or not filled. rb keeps the raw Read() result for the checks below. + */ + if (mctx->topos == mctx->tosize) { + uchar_t *frombuf = mctx->frombuf; + + mctx->id++; + while ((rb = Read(mctx->comp_fd, &len_cmp, sizeof (len_cmp))) + == sizeof(len_cmp)) { + len_cmp = ntohll(len_cmp); + if (len_cmp != METADATA_INDICATOR) { + uint64_t skiplen; + + if (len_cmp == 0) { + /* + * We have reached the end of the file.
+ */ + msgp->len = 0; + ack = 1; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + skiplen = len_cmp + pctx->cksum_bytes + pctx->mac_bytes + + CHUNK_FLAG_SZ; + int64_t cpos = lseek(mctx->comp_fd, skiplen, SEEK_CUR); + if (cpos == -1) { + log_msg(LOG_ERR, 1, "Cannot find/seek next metadata block."); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + } else { + break; + } + } + if (rb == -1) { + log_msg(LOG_ERR, 1, "Failed read from metadata fd: "); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + + } else if (rb == 0) { + /* + * We have reached the end of the file. + */ + msgp->len = 0; + ack = 1; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + U64_P(frombuf) = htonll(len_cmp); + frombuf += 8; + + /* + * We are at the start of a metadata chunk. Read the size. + */ + if ((rb = Read(mctx->comp_fd, &len_cmp, sizeof (len_cmp))) + != sizeof(len_cmp)) { + log_msg(LOG_ERR, 1, "Failed to read size from metadata fd: %lld", (long long)rb); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + U64_P(frombuf) = len_cmp; + frombuf += 8; + len_cmp = LE64(len_cmp); + + /* + * Now read the rest of the chunk. This is rest of the header plus the + * data segment. frompos then records the total bytes held in the buffer. + */ + len_cmp = len_cmp + (METADATA_HDR_SZ - (frombuf - mctx->frombuf)); + rb = Read(mctx->comp_fd, frombuf, len_cmp); + if (rb != len_cmp) { + log_msg(LOG_ERR, 1, "Failed to read chunk from metadata fd: "); + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + mctx->topos = 0; + mctx->frompos = len_cmp + (frombuf - mctx->frombuf); + + /* + * Now decompress.
+ */ + if (!decompress_data(mctx)) { + ack = 0; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + return (NULL); + } + } + + msgp->buf = mctx->tobuf; + msgp->len = mctx->tosize; + mctx->topos = mctx->tosize; + ack = 1; + Write(mctx->meta_pipes[SINK_CHANNEL], &ack, sizeof (ack)); + } + + return (NULL); } +/* + * Create the metadata thread and associated buffers. This writes out compressed + * metadata chunks into the archive. This is libarchive metadata. + */ meta_ctx_t * -meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) +meta_ctx_create(void *pc, int file_version, int comp_fd) { + pc_ctx_t *pctx = (pc_ctx_t *)pc; meta_ctx_t *mctx; mctx = (meta_ctx_t *)malloc(sizeof (meta_ctx_t)); @@ -230,8 +495,10 @@ meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) return (NULL); } + mctx->running = 0; if (pctx->encrypt_type) { - if (hmac_init(&mctx->chunk_hmac, pctx->cksum, &(pctx->crypto_ctx)) == -1) { + if (hmac_init(&mctx->chunk_hmac, pctx->cksum, + &(pctx->crypto_ctx)) == -1) { (void) free(mctx); log_msg(LOG_ERR, 0, "Cannot initialize metadata hmac."); return (NULL); @@ -255,6 +522,10 @@ meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) return (NULL); } + /* + * The archiver passes metadata via this socketpair. Memory buffer pointers + * are passed through the socket for speed rather than the contents. 
+ */ mctx->pctx = pctx; if (socketpair(AF_UNIX, SOCK_STREAM, 0, mctx->meta_pipes) == -1) { (void) free(mctx->frombuf); @@ -264,60 +535,69 @@ meta_ctx_create(pc_ctx_t *pctx, int file_version, int comp_fd) return (NULL); } + if (pctx->level > 9) + mctx->delta2_nstrides = NSTRIDES_EXTRA; + else + mctx->delta2_nstrides = NSTRIDES_STANDARD; if (pctx->do_compress) { - int rv, level; + int rv; mctx->comp_level = pctx->level; - rv = lz4_init(&mctx->lz4_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, + rv = bzip2_init(&mctx->bzip2_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, file_version, COMPRESS); + bzip2_props(&mctx->props, pctx->level, METADATA_CHUNK_SIZE); if (rv != 0 || pthread_create(&(mctx->meta_thread), NULL, metadata_compress, (void *)mctx) != 0) { - (void) close(pctx->meta_pipes[0]); - (void) close(pctx->meta_pipes[1]); + (void) close(mctx->meta_pipes[0]); + (void) close(mctx->meta_pipes[1]); (void) free(mctx->frombuf); (void) free(mctx->tobuf); (void) free(mctx); if (rv == 0) - log_msg(LOG_ERR, 0, "LZ4 init failed."); + log_msg(LOG_ERR, 0, "Lzma init failed."); else log_msg(LOG_ERR, 1, "Unable to create metadata thread."); return (NULL); } } else { - int rv, level; + int rv; mctx->comp_level = pctx->level; - rv = lz4_init(&mctx->lz4_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, + rv = bzip2_init(&mctx->bzip2_dat, &mctx->comp_level, 1, METADATA_CHUNK_SIZE, file_version, DECOMPRESS); if (rv != 0 || pthread_create(&(mctx->meta_thread), NULL, metadata_decompress, (void *)mctx) != 0) { - (void) close(pctx->meta_pipes[0]); - (void) close(pctx->meta_pipes[1]); + (void) close(mctx->meta_pipes[0]); + (void) close(mctx->meta_pipes[1]); (void) free(mctx->frombuf); (void) free(mctx->tobuf); (void) free(mctx); if (rv == 0) - log_msg(LOG_ERR, 0, "LZ4 init failed."); + log_msg(LOG_ERR, 0, "Lzma init failed."); else log_msg(LOG_ERR, 1, "Unable to create metadata thread."); return (NULL); } } + mctx->do_compress = pctx->do_compress; return (mctx); } - int 
-meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg) +meta_ctx_send(meta_ctx_t *mctx, const void **buf, size_t *len) { int ack; - meta_msg_t msg; + meta_msg_t msg, *msgp; /* * Write the message buffer to the pipe. */ - if (Write(pctx->meta_pipes[SRC_CHANNEL], &msg, sizeof (msg)) < sizeof (meta_msg_t)) { + msg.buf = *buf; + msg.len = *len; + msgp = &msg; + if (Write(mctx->meta_pipes[SRC_CHANNEL], &msgp, sizeof (msgp)) < + sizeof (msgp)) { log_msg(LOG_ERR, 1, "Meta socket write error."); return (0); } @@ -325,7 +605,8 @@ meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg) /* * Wait for ACK. */ - if (Read(pctx->meta_pipes[SRC_CHANNEL], &ack, sizeof (ack)) < sizeof (ack)) { + if (Read(mctx->meta_pipes[SRC_CHANNEL], &ack, sizeof (ack)) < + sizeof (ack)) { log_msg(LOG_ERR, 1, "Meta socket read error."); return (0); } @@ -334,16 +615,19 @@ meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg) return (-1); } + *len = msg.len; + *buf = msg.buf; return (1); } -int -meta_ctx_read(meta_ctx_t *mctx, meta_msg_t *msg) -{ -} - int meta_ctx_done(meta_ctx_t *mctx) { + meta_ctx_close_src_channel(mctx); + meta_ctx_close_sink_channel(mctx); + if (!mctx->do_compress) + close(mctx->comp_fd); + pthread_join(mctx->meta_thread, NULL); + return (0); } diff --git a/meta_stream.h b/meta_stream.h index 65e34eb..d34cc17 100644 --- a/meta_stream.h +++ b/meta_stream.h @@ -23,39 +23,40 @@ * */ -#ifndef _PCOMPRESS_H -#define _PCOMPRESS_H - -#include -#include "utils/utils.h" +#ifndef _META_STREAM_H +#define _META_STREAM_H #ifdef __cplusplus extern "C" { #endif -#define METADATA_CHUNK_SIZE (2 * 1024 * 1024) -#define METADATA_HDR_SZ (CHUNK_HDR_SZ + COMPRESSED_CHUNKSIZE + pctx->mac_bytes) +/* + * The chunk size value which indicates a metadata chunk. 
+ */ +#define METADATA_INDICATOR 1 + +/* + * Metadata chunk header format: + * 64-bit integer = 1: Compressed length: This indicates that this is a metadata chunk + * 64-bit integer: Compressed length (data portion only) + * 64-bit integer: Uncompressed original length + * 1 Byte: Chunk flag + * Upto 64-bytes: Checksum. This is HMAC if encrypting + * 32-bit integer: Header CRC32 if not encrypting, otherwise empty. + */ +#define CKSUM_MAX 64 +#define CRC32_SIZE 4 +#define METADATA_HDR_SZ (8 * 3 + 1 + CKSUM_MAX + CRC32_SIZE) -struct _meta_ctx { - int meta_pipes[2]; - pc_ctx_t *pctx; - pthread_t meta_thread; - uchar_t *frombuf, *tobuf; - uint64_t frompos; - uchar_t checksum[CKSUM_MAX_BYTES]; - void *lz4_dat; - int comp_level; - mac_ctx_t chunk_hmac; -} meta_ctx_t; +typedef struct _meta_ctx meta_ctx_t; -struct _meta_msg { - uchar_t *buf; +typedef struct _meta_msg { + const uchar_t *buf; size_t len; } meta_msg_t; -meta_ctx_t *meta_ctx_create(pc_ctx_t *pctx, int file_version int comp_fd); -int meta_ctx_write(meta_ctx_t *mctx, meta_msg_t *msg); -int meta_ctx_read(meta_ctx_t *mctx, meta_msg_t *msg); +meta_ctx_t *meta_ctx_create(void *pc, int file_version, int comp_fd); +int meta_ctx_send(meta_ctx_t *mctx, const void **buf, size_t *len); int meta_ctx_done(meta_ctx_t *mctx); void meta_ctx_close_sink_channel(meta_ctx_t *mctx); void meta_ctx_close_src_channel(meta_ctx_t *mctx); diff --git a/pcompress.c b/pcompress.c index ea05d12..9ab84f9 100644 --- a/pcompress.c +++ b/pcompress.c @@ -109,6 +109,7 @@ usage(pc_ctx_t *pctx) " -v Enables verbose mode.\n\n" " -t \n" " Sets the number of compression threads. Default: core count.\n" +" -T Disable separate metadata stream.\n" " -S \n" " The chunk verification checksum. Default: BLAKE256. 
Others are: CRC64, SHA256,\n" " SHA512, KECCAK256, KECCAK512, BLAKE256, BLAKE512.\n" @@ -206,7 +207,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t algo_props_t *props, int interesting) { uchar_t *dest = (uchar_t *)dst, type = 0; - int64_t result; + int result; uint64_t _dstlen, fromlen; uchar_t *from, *to; int stype, dict; @@ -264,6 +265,7 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t #ifndef _MPLV2_LICENSE_ if (pctx->lzp_preprocess && stype != TYPE_BMP && stype != TYPE_TIFF) { int hashsize; + int64_t result; hashsize = lzp_hash_size(level); result = lzp_compress((const uchar_t *)from, to, fromlen, @@ -310,7 +312,8 @@ preproc_compress(pc_ctx_t *pctx, compress_func_ptr cmp_func, void *src, uint64_t U64_P(dest + 1) = htonll(srclen); _dstlen = srclen; DEBUG_STAT_EN(strt = get_wtime_millis()); - result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, (dict?TYPE_TEXT:btype), data); + result = cmp_func(src, srclen, dest+9, &_dstlen, level, chdr, + (dict?TYPE_TEXT:btype), data); DEBUG_STAT_EN(en = get_wtime_millis()); if (result > -1 && _dstlen < srclen) { @@ -341,7 +344,7 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 algo_props_t *props) { uchar_t *sorc = (uchar_t *)src, type; - int64_t result; + int result; uint64_t _dstlen = *dstlen, _dstlen1 = *dstlen; DEBUG_STAT_EN(double strt, en); @@ -380,6 +383,8 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 if (type & PREPROC_TYPE_LZP) { #ifndef _MPLV2_LICENSE_ int hashsize; + int64_t result; + hashsize = lzp_hash_size(level); result = lzp_decompress((const uchar_t *)src, (uchar_t *)dst, srclen, hashsize, LZP_DEFAULT_LZPMINLEN, 0); @@ -390,10 +395,11 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 _dstlen = result; } else { log_msg(LOG_ERR, 0, "LZP decompression failed."); - return (result); + return ((int)result); } #else - 
log_msg(LOG_ERR, 0, "LZP feature not available in this build (MPLv2). Aborting."); + log_msg(LOG_ERR, 0, "LZP feature not available in this build" + " (MPLv2). Aborting."); return (-1); #endif } @@ -422,7 +428,8 @@ preproc_decompress(pc_ctx_t *pctx, compress_func_ptr dec_func, void *src, uint64 } } - if (!(type & (PREPROC_COMPRESSED|PREPROC_TYPE_DELTA2|PREPROC_TYPE_LZP|PREPROC_TYPE_DISPACK)) + if (!(type & (PREPROC_COMPRESSED|PREPROC_TYPE_DELTA2|PREPROC_TYPE_LZP| + PREPROC_TYPE_DISPACK|PREPROC_TYPE_DICT)) && type > 0) { log_msg(LOG_ERR, 0, "Invalid preprocessing flags: %d", type); return (-1); @@ -526,6 +533,7 @@ perform_decompress(void *dat) /* * Decryption failure is fatal. */ + log_msg(LOG_ERR, 0, "Chunk %d, Decryption failed", tdat->id); pctx->main_cancel = 1; tdat->len_cmp = 0; Sem_Post(&tdat->cmp_done_sem); @@ -589,8 +597,9 @@ perform_decompress(void *dat) ubuf = tdat->uncompressed_chunk + RABIN_HDR_SIZE + dedupe_index_sz; if (HDR & COMPRESSED) { if (HDR & CHUNK_FLAG_PREPROC) { - rv = preproc_decompress(pctx, tdat->decompress, cmpbuf, dedupe_data_sz_cmp, - ubuf, &_chunksize, tdat->level, HDR, pctx->btype, tdat->data, tdat->props); + rv = preproc_decompress(pctx, tdat->decompress, cmpbuf, + dedupe_data_sz_cmp, ubuf, &_chunksize, tdat->level, + HDR, pctx->btype, tdat->data, tdat->props); } else { DEBUG_STAT_EN(double strt, en); @@ -753,10 +762,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) char algorithm[ALGO_SZ]; struct stat sbuf; struct wdata w; - int compfd = -1, p, dedupe_flag; + int compfd = -1, compfd2 = -1, p, dedupe_flag; int uncompfd = -1, err, np, bail; - int nprocs = 1, thread = 0, level; - unsigned int i; + int thread = 0, level; + uint32_t nprocs = 1, i; unsigned short version, flags; int64_t chunksize, compressed_chunksize; struct cmp_data **dary, *tdat; @@ -774,6 +783,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) */ if (!pctx->pipe_mode) { if (filename == NULL) { + pctx->pipe_mode 
= 1; compfd = fileno(stdin); if (compfd == -1) { log_msg(LOG_ERR, 1, "fileno "); @@ -849,7 +859,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) err = 1; goto uncomp_done; } - if (version < VERSION-3) { + if (version < VERSION-4) { log_msg(LOG_ERR, 0, "Unsupported version: %d", version); err = 1; goto uncomp_done; @@ -859,6 +869,17 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) * First check for archive mode. In that case the to_filename must be a directory. */ if (flags & FLAG_ARCHIVE) { + if (flags & FLAG_META_STREAM && version > 9) + pctx->meta_stream = 1; + + /* + * Archives with metadata streams cannot be decoded in pipe mode. + */ + if (pctx->pipe_mode && pctx->meta_stream) { + log_msg(LOG_ERR, 0, + "Cannot extract archive with metadata stream in pipe mode."); + } + /* * If to_filename is not set, we just use the current directory. */ @@ -873,13 +894,16 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) err = 1; goto uncomp_done; } - if (mkdir(to_filename, S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH) == -1) { - log_msg(LOG_ERR, 1, "Unable to create target directory %s.", to_filename); + if (mkdir(to_filename, + S_IRUSR|S_IWUSR|S_IXUSR|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH) == -1) { + log_msg(LOG_ERR, 1, "Unable to create target directory %s.", + to_filename); err = 1; goto uncomp_done; } if (stat(to_filename, &sbuf) == -1) { - log_msg(LOG_ERR, 1, "Unable to correctly create target directory %s.", to_filename); + log_msg(LOG_ERR, 1, "Unable to correctly create target directory %s.", + to_filename); err = 1; goto uncomp_done; } @@ -889,11 +913,24 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) err = 1; goto uncomp_done; } + + /* + * Open another fd to the compressed archive. This is used by the metadata + * thread. 
+ */ + if (pctx->meta_stream) { + if ((compfd2 = open(filename, O_RDONLY, 0)) == -1) { + log_msg(LOG_ERR, 1, "Cannot open: %s", filename); + err = 1; + goto uncomp_done; + } + } } else { const char *origf; if (pctx->list_mode) { - log_msg(LOG_ERR, 0, "Nothing to list. The compressed file is not an archive."); + log_msg(LOG_ERR, 0, "Nothing to list. The compressed file " + "is not an archive."); err = 1; goto uncomp_done; } @@ -908,7 +945,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) pctx->to_filename = pctx->archive_members_file; pos = strrchr(filename, '.'); if (pos != NULL) { - if ((pos[0] == 'p' || pos[0] == 'P') && (pos[1] == 'z' || pos[1] == 'Z')) { + if ((pos[0] == 'p' || pos[0] == 'P') && + (pos[1] == 'z' || pos[1] == 'Z')) { memcpy(to_filename, filename, pos - filename); } else { pos = NULL; @@ -987,8 +1025,10 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (pctx->cksum == CKSUM_BLAKE256) pctx->cksum = CKSUM_SKEIN256; if (pctx->cksum == CKSUM_BLAKE512) pctx->cksum = CKSUM_SKEIN512; } - if (get_checksum_props(NULL, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 1) == -1) { - log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. File corrupt ?", pctx->cksum); + if (get_checksum_props(NULL, &(pctx->cksum), &(pctx->cksum_bytes), + &(pctx->mac_bytes), 1) == -1) { + log_msg(LOG_ERR, 0, "Invalid checksum algorithm code: %d. 
" + "File corrupt ?", pctx->cksum); UNCOMP_BAIL; } @@ -1111,7 +1151,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (pw_len != -1) { if (pw_len > MAX_PW_LEN) pw_len = MAX_PW_LEN-1; lseek(fd, 0, SEEK_SET); - len = Read(fd, pw, pw_len); + len = (int)Read(fd, pw, pw_len); if (len != -1 && len == pw_len) { pw[pw_len] = '\0'; if (isspace(pw[pw_len - 1])) @@ -1139,7 +1179,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (pctx->user_pw) { if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, - pctx->encrypt_type, salt2, saltlen, pctx->keylen, nonce, DECRYPT_FLAG) == -1) { + pctx->encrypt_type, salt2, saltlen, pctx->keylen, nonce, + DECRYPT_FLAG) == -1) { memset(salt2, 0, saltlen); free(salt2); memset(salt1, 0, saltlen); @@ -1246,6 +1287,32 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) } add_fname(pctx->archive_temp_file); } + + /* + * If we are having a metadata stream, get the current position of the main + * fd. The secondary fd must be set to the same position so that metadata + * thread can start scanning for chunks after the header and any info chunks. + * + * NOTE: This is done here to allow setup_extractor() call later to work. + */ + if (pctx->meta_stream) { + off_t cpos = lseek(compfd, 0, SEEK_CUR); + cpos = lseek(compfd2, cpos, SEEK_SET); + if (cpos == -1) { + log_msg(LOG_ERR, 1, "Can't seek in metadata fd: "); + UNCOMP_BAIL; + } + + /* + * Finally create the metadata context. + */ + pctx->meta_ctx = meta_ctx_create(pctx, VERSION, compfd2); + if (pctx->meta_ctx == NULL) { + close(compfd2); + UNCOMP_BAIL; + } + } + uncompfd = -1; if (setup_extractor(pctx) == -1) { log_msg(LOG_ERR, 0, "Setup of extraction context failed."); @@ -1272,7 +1339,16 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) } } - nprocs = sysconf(_SC_NPROCESSORS_ONLN); + /* + * WARNING: NO Further file header/info chunk processing beyond this point. 
+ * Doing so will BREAK Separate Metadata stream processing. + */ + + nprocs = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN); + if (pctx->archive_mode) { + nprocs = nprocs > 1 ? nprocs-1:nprocs; + } + if (pctx->nthreads > 0 && pctx->nthreads < nprocs) nprocs = pctx->nthreads; else @@ -1333,9 +1409,9 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) * The last parameter is freeram. It is not needed during decompression. */ if (pctx->enable_rabin_scan || pctx->enable_fixed_scan || pctx->enable_rabin_global) { - tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, pctx->rab_blk_size, - pctx->algo, &props, pctx->enable_delta_encode, dedupe_flag, version, DECOMPRESS, 0, - NULL, pctx->pipe_mode, nprocs, 0); + tdat->rctx = create_dedupe_context(chunksize, compressed_chunksize, + pctx->rab_blk_size, pctx->algo, &props, pctx->enable_delta_encode, + dedupe_flag, version, DECOMPRESS, 0, NULL, pctx->pipe_mode, nprocs, 0); if (tdat->rctx == NULL) { UNCOMP_BAIL; } @@ -1348,7 +1424,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) UNCOMP_BAIL; } } else { - if ((tdat->rctx->out_fd = open(to_filename, O_RDONLY, 0)) == -1) { + if ((tdat->rctx->out_fd = open(to_filename, O_RDONLY, 0)) + == -1) { log_msg(LOG_ERR, 1, "Unable to get new read handle" " to output file"); UNCOMP_BAIL; @@ -1397,6 +1474,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) log_msg(LOG_ERR, 1, "Error in thread creation: "); UNCOMP_BAIL; } + thread = 2; /* * Now read from the compressed file in variable compressed chunk size. @@ -1419,6 +1497,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) tdat->id = pctx->chunk_num; if (tdat->rctx) tdat->rctx->id = tdat->id; +redo: /* * First read length of compressed chunk. 
*/ @@ -1448,6 +1527,30 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (tdat->len_cmp == 0) { bail = 1; break; + + } else if (tdat->len_cmp == METADATA_INDICATOR) { + if (!pctx->meta_stream) { + log_msg(LOG_ERR, 0, "Invalid chunk %d length: %" PRIu64 "\n", + pctx->chunk_num, tdat->len_cmp); + UNCOMP_BAIL; + } + /* + * If compressed length indicates a metadata chunk. Read it's length + * and skip the chunk. + */ + rb = Read(compfd, &tdat->len_cmp_be, sizeof (tdat->len_cmp_be)); + if (rb != sizeof (tdat->len_cmp_be)) { + if (rb < 0) log_msg(LOG_ERR, 1, "Read: "); + else + log_msg(LOG_ERR, 0, "Incomplete chunk %d header," + "file corrupt", pctx->chunk_num); + UNCOMP_BAIL; + } + + /* + * We will be reading and skipping this chunk next. + */ + tdat->len_cmp_be = LE64(tdat->len_cmp_be); } /* @@ -1457,7 +1560,7 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) * decompression allows to avoid allocating per-thread chunks which will * never be used. This can happen if chunk count < thread count. */ - if (!tdat->compressed_chunk) { + if (!tdat->compressed_chunk && tdat->len_cmp != METADATA_INDICATOR) { tdat->compressed_chunk = (uchar_t *)slab_alloc(NULL, compressed_chunksize); if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan)) @@ -1473,19 +1576,30 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) tdat->cmp_seg = tdat->uncompressed_chunk; } - if (tdat->len_cmp > pctx->largest_chunk) - pctx->largest_chunk = tdat->len_cmp; - if (tdat->len_cmp < pctx->smallest_chunk) - pctx->smallest_chunk = tdat->len_cmp; - pctx->avg_chunk += tdat->len_cmp; + if (tdat->len_cmp != METADATA_INDICATOR) { + if (tdat->len_cmp > pctx->largest_chunk) + pctx->largest_chunk = tdat->len_cmp; + if (tdat->len_cmp < pctx->smallest_chunk) + pctx->smallest_chunk = tdat->len_cmp; + pctx->avg_chunk += tdat->len_cmp; - /* - * Now read compressed chunk including the checksum. 
- */ - tdat->rbytes = Read(compfd, tdat->compressed_chunk, - tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + CHUNK_FLAG_SZ); + /* + * Now read compressed chunk including the checksum. + */ + rb = tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + + CHUNK_FLAG_SZ; + tdat->rbytes = Read(compfd, tdat->compressed_chunk, rb); + } else { + int64_t cpos = lseek(compfd, 0, SEEK_CUR); + + /* Two values already read */ + rb = tdat->len_cmp_be + METADATA_HDR_SZ - 16; + tdat->rbytes = lseek(compfd, rb, SEEK_CUR); + if (tdat->rbytes > 0) + tdat->rbytes = tdat->rbytes - cpos; + } if (pctx->main_cancel) break; - if (tdat->rbytes < tdat->len_cmp + pctx->cksum_bytes + pctx->mac_bytes + CHUNK_FLAG_SZ) { + if (tdat->rbytes < rb) { if (tdat->rbytes < 0) { log_msg(LOG_ERR, 1, "Read: "); UNCOMP_BAIL; @@ -1495,6 +1609,13 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) UNCOMP_BAIL; } } + + /* + * If we just skipped a metadata chunk, redo the read to go to the next one. + */ + if (tdat->len_cmp == METADATA_INDICATOR) { + goto redo; + } Sem_Post(&tdat->start_sem); ++(pctx->chunk_num); } @@ -1519,7 +1640,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) Sem_Post(&tdat->cmp_done_sem); pthread_join(tdat->thr, NULL); } - pthread_join(writer_thr, NULL); + if (thread == 2) + pthread_join(writer_thr, NULL); } /* @@ -1556,6 +1678,8 @@ start_decompress(pc_ctx_t *pctx, const char *filename, char *to_filename) if (uncompfd != -1) close(uncompfd); } if (pctx->archive_mode) { + if (pctx->meta_stream) + meta_ctx_done(pctx->meta_ctx); pthread_join(pctx->archive_thread, NULL); if (pctx->enable_rabin_global) { close(pctx->archive_temp_fd); @@ -1782,7 +1906,8 @@ perform_compress(void *dat) { len_cmp = tdat->len_cmp; *((typeof (len_cmp) *)(tdat->cmp_seg)) = htonll(tdat->len_cmp); if (!pctx->encrypt_type) - serialize_checksum(tdat->checksum, tdat->cmp_seg + sizeof (tdat->len_cmp), pctx->cksum_bytes); + serialize_checksum(tdat->checksum, 
tdat->cmp_seg + sizeof (tdat->len_cmp), + pctx->cksum_bytes); tdat->len_cmp += CHUNK_FLAG_SZ; tdat->len_cmp += sizeof (len_cmp); tdat->len_cmp += (pctx->cksum_bytes + pctx->mac_bytes); @@ -1879,14 +2004,16 @@ writer_thread(void *dat) { if (pctx->archive_mode && tdat->decompressing) { wbytes = archiver_write(pctx, tdat->cmp_seg, tdat->len_cmp); } else { + pthread_mutex_lock(&pctx->write_mutex); wbytes = Write(w->wfd, tdat->cmp_seg, tdat->len_cmp); + pthread_mutex_unlock(&pctx->write_mutex); } if (pctx->archive_temp_fd != -1 && wbytes == tdat->len_cmp) { wbytes = Write(pctx->archive_temp_fd, tdat->cmp_seg, tdat->len_cmp); } if (unlikely(wbytes != tdat->len_cmp)) { - log_msg(LOG_ERR, 1, "Chunk Write (expected: %" PRIu64 ", written: %" PRIu64 ") : ", - tdat->len_cmp, wbytes); + log_msg(LOG_ERR, 1, "Chunk Write (expected: %" PRIu64 + ", written: %" PRId64 ") : ", tdat->len_cmp, wbytes); do_cancel: pctx->main_cancel = 1; tdat->cancel = 1; @@ -1922,7 +2049,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev unsigned short version, flags; struct stat sbuf; int compfd = -1, uncompfd = -1, err; - int thread, wthread, bail, single_chunk; + int thread, bail, single_chunk; uint32_t i, nprocs, np, p, dedupe_flag; struct cmp_data **dary = NULL, *tdat; pthread_t writer_thr; @@ -1940,7 +2067,6 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev sbuf.st_size = 0; err = 0; thread = 0; - wthread = 0; dedupe_flag = RABIN_DEDUPE_SEGMENTED; // Silence the compiler compressed_chunksize = 0; @@ -1966,11 +2092,11 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev memset(zero, 0, MAX_PW_LEN); fd = open(pctx->pwd_file, O_RDWR); if (fd != -1) { - pw_len = lseek(fd, 0, SEEK_END); + pw_len = (int)lseek(fd, 0, SEEK_END); if (pw_len != -1) { if (pw_len > MAX_PW_LEN) pw_len = MAX_PW_LEN-1; lseek(fd, 0, SEEK_SET); - len = Read(fd, pw, pw_len); + len = (int)Read(fd, pw, pw_len); if (len != -1 && len == 
pw_len) { pw[pw_len] = '\0'; if (isspace(pw[pw_len - 1])) @@ -1989,8 +2115,8 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev close(fd); } if (pctx->user_pw) { - if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, pctx->encrypt_type, - NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) { + if (init_crypto(&(pctx->crypto_ctx), pctx->user_pw, pctx->user_pw_len, + pctx->encrypt_type, NULL, 0, pctx->keylen, 0, ENCRYPT_FLAG) == -1) { memset(pctx->user_pw, 0, pctx->user_pw_len); log_msg(LOG_ERR, 0, "Failed to initialize crypto."); return (1); @@ -2016,7 +2142,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev * Get number of lCPUs. When archiving with advanced filters, we use one less * lCPU to reduce threads due to increased memory requirements. */ - nprocs = sysconf(_SC_NPROCESSORS_ONLN); + nprocs = (uint32_t)sysconf(_SC_NPROCESSORS_ONLN); if (pctx->archive_mode && (pctx->enable_packjpg || pctx->enable_wavpack)) { nprocs = nprocs > 1 ? nprocs-1:nprocs; } @@ -2093,7 +2219,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev } } else { if (pctx->nthreads == 0 || pctx->nthreads > sbuf.st_size / chunksize) { - pctx->nthreads = sbuf.st_size / chunksize; + pctx->nthreads = (int)(sbuf.st_size / chunksize); if (sbuf.st_size % chunksize) pctx->nthreads++; } @@ -2183,15 +2309,16 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev my_sysinfo msys_info; get_sys_limits(&msys_info); - global_dedupe_bufadjust(pctx->rab_blk_size, &chunksize, 0, pctx->algo, pctx->cksum, - CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, pctx->nthreads, pctx->pipe_mode); + global_dedupe_bufadjust(pctx->rab_blk_size, &chunksize, 0, pctx->algo, + pctx->cksum, CKSUM_BLAKE256, sbuf.st_size, msys_info.freeram, + pctx->nthreads, pctx->pipe_mode); } /* * Compressed buffer size must include zlib/dedup scratch space and * chunk header space. 
* See http://www.zlib.net/manual.html#compress2 - * + * * We do this unconditionally whether user mentioned zlib or not * to keep it simple. While zlib scratch space is only needed at * runtime, chunk header is stored in the file. @@ -2321,6 +2448,18 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev COMP_BAIL; } } + + /* + * Now create the metadata handler context. This is relevant in archive mode where + * the underlying libarchive metadata is compressed into a separate stream of + * metadata chunks. + */ + if (pctx->meta_stream) { + pctx->meta_ctx = meta_ctx_create(pctx, VERSION, compfd); + if (pctx->meta_ctx == NULL) { + COMP_BAIL; + } + } thread = 1; /* @@ -2373,7 +2512,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev log_msg(LOG_ERR, 1, "Error in thread creation: "); COMP_BAIL; } - wthread = 1; + thread = 2; /* * Start the archiver thread if needed. @@ -2420,7 +2559,8 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev pos += 8; } else if (pctx->encrypt_type == CRYPTO_ALG_SALSA20) { - serialize_checksum(crypto_nonce(&(pctx->crypto_ctx)), pos, XSALSA20_CRYPTO_NONCEBYTES); + serialize_checksum(crypto_nonce(&(pctx->crypto_ctx)), pos, + XSALSA20_CRYPTO_NONCEBYTES); pos += XSALSA20_CRYPTO_NONCEBYTES; } *((int *)pos) = htonl(pctx->keylen); @@ -2613,8 +2753,10 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev */ if (!pctx->pipe_mode) { if (uncompfd != -1) close(uncompfd); - if (pctx->archive_mode) - archiver_close(pctx); + } + if (pctx->meta_stream) { + meta_ctx_done(pctx->meta_ctx); + archiver_close(pctx); } if (pctx->t_errored) err = pctx->t_errored; @@ -2629,7 +2771,7 @@ start_compress(pc_ctx_t *pctx, const char *filename, uint64_t chunksize, int lev if (pctx->encrypt_type) hmac_cleanup(&tdat->chunk_hmac); } - if (wthread) + if (thread == 2) pthread_join(writer_thr, NULL); } @@ -2867,6 +3009,7 @@ create_pc_context(void) 
ctx->pagesize = sysconf(_SC_PAGE_SIZE); ctx->btype = TYPE_UNKNOWN; ctx->delta2_nstrides = NSTRIDES_STANDARD; + pthread_mutex_init(&ctx->write_mutex, NULL); return (ctx); } @@ -2923,7 +3066,7 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) ff.enable_wavpack = 0; pthread_mutex_lock(&opt_parse); - while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:LPS:B:Fk:avmKjxi")) != -1) { + while ((opt = getopt(argc, argv, "dc:s:l:pt:MCDGEe:w:LPS:B:Fk:avmKjxiT")) != -1) { int ovr; int64_t chunksize; @@ -3099,6 +3242,9 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->dispack_preprocess = 1; break; + case 'T': + pctx->meta_stream = -1; + case '?': default: return (2); @@ -3214,7 +3360,8 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) */ if ((pctx->dispack_preprocess || ff.enable_packjpg || ff.enable_wavpack) && !pctx->archive_mode) { - log_msg(LOG_ERR, 0, "Dispack Executable Preprocessor and PackJPG are only valid when archiving."); + log_msg(LOG_ERR, 0, "Dispack Executable Preprocessor and PackJPG are " + "only valid when archiving."); return (1); } @@ -3337,7 +3484,8 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->main_cancel = 0; if (pctx->cksum == 0) - get_checksum_props(DEFAULT_CKSUM, &(pctx->cksum), &(pctx->cksum_bytes), &(pctx->mac_bytes), 0); + get_checksum_props(DEFAULT_CKSUM, &(pctx->cksum), &(pctx->cksum_bytes), + &(pctx->mac_bytes), 0); if ((pctx->enable_rabin_scan || pctx->enable_fixed_scan) && pctx->cksum == CKSUM_CRC64) { log_msg(LOG_ERR, 0, "CRC64 checksum is not suitable for Deduplication."); @@ -3382,6 +3530,12 @@ init_pc_context(pc_ctx_t *pctx, int argc, char *argv[]) pctx->enable_packjpg = ff.enable_packjpg; pctx->enable_wavpack = ff.enable_wavpack; if (pctx->level > 8) pctx->dispack_preprocess = 1; + if (pctx->meta_stream != -1) + pctx->meta_stream = 1; + else + pctx->meta_stream = 0; + if (pctx->pipe_mode) + pctx->meta_stream = 0; } /* @@ -3410,7 +3564,8 @@ init_pc_context(pc_ctx_t *pctx, int argc, char 
*argv[]) if (pctx->level < 9) { pctx->chunksize = DEFAULT_CHUNKSIZE; } else { - pctx->chunksize = DEFAULT_CHUNKSIZE + (pctx->level - 8) * DEFAULT_CHUNKSIZE/4; + pctx->chunksize = DEFAULT_CHUNKSIZE + (pctx->level - 8) * + DEFAULT_CHUNKSIZE/4; } } } else if (pctx->do_uncompress) { diff --git a/pcompress.h b/pcompress.h index 5546463..c4683e1 100644 --- a/pcompress.h +++ b/pcompress.h @@ -36,14 +36,16 @@ extern "C" { #include #include +#include "meta_stream.h" #define CHUNK_FLAG_SZ 1 #define ALGO_SZ 8 #define MIN_CHUNK 2048 -#define VERSION 9 +#define VERSION 10 #define FLAG_DEDUP 1 #define FLAG_DEDUP_FIXED 2 #define FLAG_SINGLE_CHUNK 4 +#define FLAG_META_STREAM 256 #define FLAG_ARCHIVE 2048 #define UTILITY_VERSION "3.1" #define MASK_CRYPTO_ALG 0x30 @@ -220,10 +222,11 @@ typedef struct pc_ctx { int encrypt_type; int archive_mode; int enable_archive_sort; - int pagesize; + long pagesize; int force_archive_perms; int no_overwrite_newer; int advanced_opts; + int meta_stream; /* * Archiving related context data. @@ -241,6 +244,7 @@ typedef struct pc_ctx { uint64_t temp_mmap_len; struct fn_list *fn; Sem_t read_sem, write_sem; + pthread_mutex_t write_mutex; uchar_t *arc_buf; uint64_t arc_buf_size, arc_buf_pos; int arc_closed, arc_writing; @@ -268,6 +272,7 @@ typedef struct pc_ctx { unsigned char *user_pw; int user_pw_len; char *pwd_file, *f_name; + meta_ctx_t *meta_ctx; } pc_ctx_t; /*