From 24da0c63c80be64e1adc3f24c27459ebe18a19af Mon Sep 17 00:00:00 2001 From: Phillip Lougher Date: Sun, 30 Jun 2019 02:40:12 +0100 Subject: [PATCH] Make Squashfs filesystem creation reproducible Ever since Mksquashfs was parallelised back in 2006, there has been a certain randomness in how fragments and multi-block files are ordered in the output filesystem even if the input remains the same. This is because the multiple parallel threads can be scheduled differently between Mksquashfs runs. For example, the thread given fragment 10 to compress may finish before the thread given fragment 9 to compress on one run (writing fragment 10 to the output filesystem before fragment 9), but, on the next run it could be vice-versa. There are many different scheduling scenarios here, all of which can have a knock on effect causing different scheduling and ordering later in the filesystem too. Mkquashfs doesn't care about the ordering of fragments and multi-block files within the filesystem, as this does not affect the correctness of the filesystem. In fact not caring about the ordering, as it doesn't matter, allows Mksquashfs to run as fast as possible, maximising CPU and I/O performance. But, in the last couple of years, Squashfs has become used in scenarios (cloud etc) where this randomness is causing problems. Specifically this appears to be where downloaders, installers etc. try to work out the differences between Squashfs filesystem updates to minimise the amount of data that needs to transferred to update an image. There are two changes which need to be made to Mksquashfs to eliminate this random ordering, and to make Mksquashfs generate reproducible filesystems, that are the same on multiple runs: 1. When starting to output a "multi-block file" Mksquashfs needs to ensure no fragments are written interleaved between the file blocks. This is obviously because the filesystem layout doesn't allow that to happen. There are two solutions to prevent this interleaving by the parallel fragment output threads. 1.1 The first is to "lock" the fragment threads so that they can not write fragments while a "multi-block file" is being output. During this time the fragment threads will continue compressing, but will queue the fragments for later writing. 1.2 The second solution is when a "muti-block file" is to output, Mksquashfs waits for all current "in-flight" fragments to be compressed and written to disk first. Initially Mksquashfs used the second solution, but, switched over to the first solution, as it doesn't produce a fragment compression stall. The first solution generates output randomness, because it is entirely dependent on scheduling how many outstanding fragments have been written before the fragment threads get "locked" to output a "multi-block file". The second solution always produces the same ordering (the total amount of fragments produced at that point is always the same). But, this is at the potential cost of a pipeline stall (you need to wait). To make the output reproducible, Mksquashfs needs to switch to the second original solution. 2. The second change relates to the behaviour of the multiple parallel fragment compressor threads. It is entirely dependent on scheduling which thread compresses a fragment block first, and outputs it to the filesystem, and that produces random ordering in the output. The solution here is to add a sequence number, and use a "sequenced queue". The sequenced queue outputs fragments in sequence order, rather than the order in which the fragments were queued. This makes the output reproducible. This commit adds the necessary code changes. Subsequent commits will add various configuration options and Mksquashfs options to control how Mksquashfs behaves. It should also be clear that the necessary changes to make Mksquashfs reproducible are fairly minimal, if it is done correctly. Signed-off-by: Phillip Lougher --- squashfs-tools/mksquashfs.c | 128 +++++++++++++++++++++++++++++++----- squashfs-tools/restore.c | 19 +++++- 2 files changed, 129 insertions(+), 18 deletions(-) diff --git a/squashfs-tools/mksquashfs.c b/squashfs-tools/mksquashfs.c index 8801a945..55faa4cd 100644 --- a/squashfs-tools/mksquashfs.c +++ b/squashfs-tools/mksquashfs.c @@ -3,7 +3,7 @@ * filesystem. * * Copyright (c) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, - * 2012, 2013, 2014, 2017 + * 2012, 2013, 2014, 2017, 2019 * Phillip Lougher * * This program is free software; you can redistribute it and/or @@ -268,6 +268,13 @@ pthread_mutex_t fragment_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t pos_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t dup_mutex = PTHREAD_MUTEX_INITIALIZER; +/* reproducible image queues and threads */ +struct seq_queue *to_order; +pthread_t order_thread; +pthread_cond_t fragment_waiting = PTHREAD_COND_INITIALIZER; + +int reproducible = TRUE; + /* user options that control parallelisation */ int processors = -1; int bwriter_size; @@ -1469,6 +1476,18 @@ unsigned short get_fragment_checksum(struct file_info *file) } +void ensure_fragments_flushed() +{ + pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); + pthread_mutex_lock(&fragment_mutex); + + while(fragments_outstanding) + pthread_cond_wait(&fragment_waiting, &fragment_mutex); + + pthread_cleanup_pop(1); +} + + void lock_fragments() { pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); @@ -1529,12 +1548,15 @@ void add_pending_fragment(struct file_buffer *write_buffer, int c_byte, void write_fragment(struct file_buffer *fragment) { + static long long sequence = 0; + if(fragment == NULL) return; pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); pthread_mutex_lock(&fragment_mutex); fragment_table[fragment->block].unused = 0; + fragment->sequence = sequence ++; fragments_outstanding ++; queue_put(to_frag, fragment); pthread_cleanup_pop(1); @@ -2452,6 +2474,60 @@ void *frag_deflator(void *arg) } +void *frag_order_deflator(void *arg) +{ + void *stream = NULL; + int res; + + res = compressor_init(comp, &stream, block_size, 1); + if(res) + BAD_ERROR("frag_deflator:: compressor_init failed\n"); + + while(1) { + int c_byte; + struct file_buffer *file_buffer = queue_get(to_frag); + struct file_buffer *write_buffer = + cache_get(fwriter_buffer, file_buffer->block); + + c_byte = mangle2(stream, write_buffer->data, file_buffer->data, + file_buffer->size, block_size, noF, 1); + write_buffer->block = file_buffer->block; + write_buffer->sequence = file_buffer->sequence; + write_buffer->size = SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte); + write_buffer->fragment = FALSE; + seq_queue_put(to_order, write_buffer); + TRACE("Writing fragment %lld, uncompressed size %d, " + "compressed size %d\n", file_buffer->block, + file_buffer->size, SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte)); + cache_block_put(file_buffer); + } +} + + +void *frag_orderer(void *arg) +{ + pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); + + while(1) { + struct file_buffer *write_buffer = seq_queue_get(to_order); + int block = write_buffer->block; + + pthread_mutex_lock(&fragment_mutex); + fragment_table[block].size = write_buffer->size; + fragment_table[block].start_block = bytes; + write_buffer->block = bytes; + bytes += SQUASHFS_COMPRESSED_SIZE_BLOCK(write_buffer->size); + fragments_outstanding --; + log_fragment(block, write_buffer->block); + queue_put(to_writer, write_buffer); + pthread_cond_signal(&fragment_waiting); + pthread_mutex_unlock(&fragment_mutex); + } + + pthread_cleanup_pop(0); +} + + struct file_buffer *get_file_buffer() { struct file_buffer *file_buffer = seq_queue_get(to_main); @@ -2525,7 +2601,10 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent, *duplicate_file = FALSE; - lock_fragments(); + if(reproducible) + ensure_fragments_flushed(); + else + lock_fragments(); file_bytes = 0; start = bytes; @@ -2562,7 +2641,9 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent, goto read_err; } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); + fragment = get_and_fill_fragment(fragment_buffer, dir_ent); if(duplicate_checking) @@ -2599,7 +2680,8 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent, BAD_ERROR("Failed to truncate dest file because %s\n", strerror(errno)); } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); free(block_list); cache_block_put(read_buffer); return status; @@ -2630,7 +2712,10 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent, if(buffer_list == NULL) MEM_ERROR(); - lock_fragments(); + if(reproducible) + ensure_fragments_flushed(); + else + lock_fragments(); file_bytes = 0; start = dup_start = bytes; @@ -2698,7 +2783,8 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent, } } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); cache_block_put(fragment_buffer); free(buffer_list); file_count ++; @@ -2740,7 +2826,8 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent, BAD_ERROR("Failed to truncate dest file because %s\n", strerror(errno)); } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); for(blocks = thresh; blocks < block; blocks ++) cache_block_put(buffer_list[blocks]); free(buffer_list); @@ -2771,7 +2858,10 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent, if(block_list == NULL) MEM_ERROR(); - lock_fragments(); + if(reproducible) + ensure_fragments_flushed(); + else + lock_fragments(); file_bytes = 0; start = bytes; @@ -2802,7 +2892,8 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent, } } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); fragment = get_and_fill_fragment(fragment_buffer, dir_ent); if(duplicate_checking) @@ -2850,7 +2941,8 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent, BAD_ERROR("Failed to truncate dest file because %s\n", strerror(errno)); } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); free(block_list); cache_block_put(read_buffer); return status; @@ -4295,8 +4387,8 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq, for(i = 0; i < processors; i++) { if(pthread_create(&deflator_thread[i], NULL, deflator, NULL)) BAD_ERROR("Failed to create thread\n"); - if(pthread_create(&frag_deflator_thread[i], NULL, frag_deflator, - NULL) != 0) + if(pthread_create(&frag_deflator_thread[i], NULL, reproducible ? + frag_order_deflator : frag_deflator, NULL) != 0) BAD_ERROR("Failed to create thread\n"); if(pthread_create(&frag_thread[i], NULL, frag_thrd, (void *) destination_file) != 0) @@ -4305,6 +4397,11 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq, main_thread = pthread_self(); + if(reproducible) { + to_order = seq_queue_init(); + pthread_create(&order_thread, NULL, frag_orderer, NULL); + } + if(!quiet) printf("Parallel mksquashfs: Using %d processor%s\n", processors, processors == 1 ? "" : "s"); @@ -5139,8 +5236,8 @@ void open_log_file(char *filename) #define VERSION() \ - printf("mksquashfs version 4.3-git (2019/04/27)\n");\ - printf("copyright (C) 2017 Phillip Lougher "\ + printf("mksquashfs version 4.3-git (2019/06/30)\n");\ + printf("copyright (C) 2019 Phillip Lougher "\ "\n\n"); \ printf("This program is free software; you can redistribute it and/or"\ "\n");\ @@ -6099,7 +6196,8 @@ int main(int argc, char *argv[]) while((fragment = get_frag_action(fragment))) write_fragment(*fragment); - unlock_fragments(); + if(!reproducible) + unlock_fragments(); pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); pthread_mutex_lock(&fragment_mutex); while(fragments_outstanding) { diff --git a/squashfs-tools/restore.c b/squashfs-tools/restore.c index 5e336b35..aedeca29 100644 --- a/squashfs-tools/restore.c +++ b/squashfs-tools/restore.c @@ -2,7 +2,7 @@ * Create a squashfs filesystem. This is a highly compressed read only * filesystem. * - * Copyright (c) 2013, 2014 + * Copyright (c) 2013, 2014, 2019 * Phillip Lougher * * This program is free software; you can redistribute it and/or @@ -46,12 +46,13 @@ #define FALSE 0 #define TRUE 1 -extern pthread_t reader_thread, writer_thread, main_thread; +extern pthread_t reader_thread, writer_thread, main_thread, order_thread; extern pthread_t *deflator_thread, *frag_deflator_thread, *frag_thread; extern struct queue *to_deflate, *to_writer, *to_frag, *to_process_frag; -extern struct seq_queue *to_main; +extern struct seq_queue *to_main, *to_order; extern void restorefs(); extern int processors; +extern int reproducible; static int interrupted = 0; static pthread_t restore_thread; @@ -131,6 +132,18 @@ void *restore_thrd(void *arg) for(i = 0; i < processors; i++) pthread_join(frag_deflator_thread[i], NULL); + if(reproducible) { + /* then flush the fragment deflator_threads(s) + * to frag orderer thread. The frag orderer + * thread will idle + */ + seq_queue_flush(to_order); + + /* now kill the frag orderer thread */ + pthread_cancel(order_thread); + pthread_join(order_thread, NULL); + } + /* * then flush the main thread/fragment deflator thread(s) * to writer thread queue. The writer thread will idle