Make Squashfs filesystem creation reproducible
Ever since Mksquashfs was parallelised back in 2006, there
has been a certain randomness in how fragments and multi-block
files are ordered in the output filesystem even if the input
remains the same.

This is because the multiple parallel threads can be scheduled
differently between Mksquashfs runs.  For example, the thread
given fragment 10 to compress may finish before the thread
given fragment 9 to compress on one run (writing fragment 10
to the output filesystem before fragment 9), but, on the next
run it could be vice-versa.  There are many different scheduling
scenarios here, all of which can have a knock-on effect, causing
different scheduling and ordering later in the filesystem too.

Mksquashfs doesn't care about the ordering of fragments and
multi-block files within the filesystem, as this does not
affect the correctness of the filesystem.

In fact, not caring about the ordering allows Mksquashfs to
run as fast as possible, maximising CPU and I/O performance.

But, in the last couple of years, Squashfs has come to be used
in scenarios (cloud etc.) where this randomness causes problems.
Specifically, this appears to be where downloaders, installers
etc. try to work out the differences between Squashfs filesystem
updates, to minimise the amount of data that needs to be
transferred to update an image.

There are two changes which need to be made to Mksquashfs to
eliminate this random ordering, and to make Mksquashfs
generate reproducible filesystems that are identical across
multiple runs:

1. When starting to output a "multi-block file" Mksquashfs
needs to ensure no fragments are written interleaved
between the file blocks.  This is obviously because the
filesystem layout doesn't allow that to happen.

There are two solutions to prevent this interleaving by
the parallel fragment output threads.

1.1 The first is to "lock" the fragment threads so that they
cannot write fragments while a "multi-block file" is being
output.  During this time the fragment threads will continue
compressing, but will queue the fragments for later writing.

1.2 The second solution is that when a "multi-block file" is to
be output, Mksquashfs waits for all current "in-flight"
fragments to be compressed and written to disk first.

Initially Mksquashfs used the second solution, but switched
over to the first, as it doesn't produce a fragment
compression stall.

The first solution generates output randomness, because how
many outstanding fragments have been written before the
fragment threads get "locked" to output a "multi-block file"
is entirely dependent on scheduling.

The second solution always produces the same ordering (the
total number of fragments produced at that point is always
the same).  But this comes at the potential cost of a pipeline
stall (you need to wait).

To make the output reproducible, Mksquashfs needs to switch
back to the second (original) solution.

2. The second change relates to the behaviour of the multiple
parallel fragment compressor threads.  Which thread finishes
compressing a fragment block first, and so writes it to the
filesystem first, is entirely dependent on scheduling, and
that produces random ordering in the output.

The solution here is to add a sequence number, and use a
"sequenced queue".  The sequenced queue outputs fragments in
sequence order, rather than the order in which the fragments
were queued.  This makes the output reproducible.

This commit adds the necessary code changes.  Subsequent
commits will add various configuration options and Mksquashfs
options to control how Mksquashfs behaves.

It should also be clear that the changes necessary to make
Mksquashfs reproducible are fairly minimal, if done correctly.

Signed-off-by: Phillip Lougher <phillip@squashfs.org.uk>
plougher committed Jun 30, 2019
1 parent 46bdc17 commit 24da0c6
Showing 2 changed files with 129 additions and 18 deletions.
128 changes: 113 additions & 15 deletions squashfs-tools/mksquashfs.c
@@ -3,7 +3,7 @@
* filesystem.
*
* Copyright (c) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011,
* 2012, 2013, 2014, 2017
* 2012, 2013, 2014, 2017, 2019
* Phillip Lougher <phillip@squashfs.org.uk>
*
* This program is free software; you can redistribute it and/or
@@ -268,6 +268,13 @@ pthread_mutex_t fragment_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t pos_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t dup_mutex = PTHREAD_MUTEX_INITIALIZER;

/* reproducible image queues and threads */
struct seq_queue *to_order;
pthread_t order_thread;
pthread_cond_t fragment_waiting = PTHREAD_COND_INITIALIZER;

int reproducible = TRUE;

/* user options that control parallelisation */
int processors = -1;
int bwriter_size;
@@ -1469,6 +1476,18 @@ unsigned short get_fragment_checksum(struct file_info *file)
}


void ensure_fragments_flushed()
{
pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);
pthread_mutex_lock(&fragment_mutex);

while(fragments_outstanding)
pthread_cond_wait(&fragment_waiting, &fragment_mutex);

pthread_cleanup_pop(1);
}


void lock_fragments()
{
pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);
@@ -1529,12 +1548,15 @@ void add_pending_fragment(struct file_buffer *write_buffer, int c_byte,

void write_fragment(struct file_buffer *fragment)
{
static long long sequence = 0;

if(fragment == NULL)
return;

pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);
pthread_mutex_lock(&fragment_mutex);
fragment_table[fragment->block].unused = 0;
fragment->sequence = sequence ++;
fragments_outstanding ++;
queue_put(to_frag, fragment);
pthread_cleanup_pop(1);
@@ -2452,6 +2474,60 @@ void *frag_deflator(void *arg)
}


void *frag_order_deflator(void *arg)
{
void *stream = NULL;
int res;

res = compressor_init(comp, &stream, block_size, 1);
if(res)
BAD_ERROR("frag_deflator:: compressor_init failed\n");

while(1) {
int c_byte;
struct file_buffer *file_buffer = queue_get(to_frag);
struct file_buffer *write_buffer =
cache_get(fwriter_buffer, file_buffer->block);

c_byte = mangle2(stream, write_buffer->data, file_buffer->data,
file_buffer->size, block_size, noF, 1);
write_buffer->block = file_buffer->block;
write_buffer->sequence = file_buffer->sequence;
write_buffer->size = SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte);
write_buffer->fragment = FALSE;
seq_queue_put(to_order, write_buffer);
TRACE("Writing fragment %lld, uncompressed size %d, "
"compressed size %d\n", file_buffer->block,
file_buffer->size, SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte));
cache_block_put(file_buffer);
}
}


void *frag_orderer(void *arg)
{
pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);

while(1) {
struct file_buffer *write_buffer = seq_queue_get(to_order);
int block = write_buffer->block;

pthread_mutex_lock(&fragment_mutex);
fragment_table[block].size = write_buffer->size;
fragment_table[block].start_block = bytes;
write_buffer->block = bytes;
bytes += SQUASHFS_COMPRESSED_SIZE_BLOCK(write_buffer->size);
fragments_outstanding --;
log_fragment(block, write_buffer->block);
queue_put(to_writer, write_buffer);
pthread_cond_signal(&fragment_waiting);
pthread_mutex_unlock(&fragment_mutex);
}

pthread_cleanup_pop(0);
}


struct file_buffer *get_file_buffer()
{
struct file_buffer *file_buffer = seq_queue_get(to_main);
@@ -2525,7 +2601,10 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent,

*duplicate_file = FALSE;

lock_fragments();
if(reproducible)
ensure_fragments_flushed();
else
lock_fragments();

file_bytes = 0;
start = bytes;
@@ -2562,7 +2641,9 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent,
goto read_err;
}

unlock_fragments();
if(!reproducible)
unlock_fragments();

fragment = get_and_fill_fragment(fragment_buffer, dir_ent);

if(duplicate_checking)
@@ -2599,7 +2680,8 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent,
BAD_ERROR("Failed to truncate dest file because %s\n",
strerror(errno));
}
unlock_fragments();
if(!reproducible)
unlock_fragments();
free(block_list);
cache_block_put(read_buffer);
return status;
@@ -2630,7 +2712,10 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent,
if(buffer_list == NULL)
MEM_ERROR();

lock_fragments();
if(reproducible)
ensure_fragments_flushed();
else
lock_fragments();

file_bytes = 0;
start = dup_start = bytes;
@@ -2698,7 +2783,8 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent,
}
}

unlock_fragments();
if(!reproducible)
unlock_fragments();
cache_block_put(fragment_buffer);
free(buffer_list);
file_count ++;
@@ -2740,7 +2826,8 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent,
BAD_ERROR("Failed to truncate dest file because %s\n",
strerror(errno));
}
unlock_fragments();
if(!reproducible)
unlock_fragments();
for(blocks = thresh; blocks < block; blocks ++)
cache_block_put(buffer_list[blocks]);
free(buffer_list);
@@ -2771,7 +2858,10 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent,
if(block_list == NULL)
MEM_ERROR();

lock_fragments();
if(reproducible)
ensure_fragments_flushed();
else
lock_fragments();

file_bytes = 0;
start = bytes;
@@ -2802,7 +2892,8 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent,
}
}

unlock_fragments();
if(!reproducible)
unlock_fragments();
fragment = get_and_fill_fragment(fragment_buffer, dir_ent);

if(duplicate_checking)
@@ -2850,7 +2941,8 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent,
BAD_ERROR("Failed to truncate dest file because %s\n",
strerror(errno));
}
unlock_fragments();
if(!reproducible)
unlock_fragments();
free(block_list);
cache_block_put(read_buffer);
return status;
@@ -4295,8 +4387,8 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq,
for(i = 0; i < processors; i++) {
if(pthread_create(&deflator_thread[i], NULL, deflator, NULL))
BAD_ERROR("Failed to create thread\n");
if(pthread_create(&frag_deflator_thread[i], NULL, frag_deflator,
NULL) != 0)
if(pthread_create(&frag_deflator_thread[i], NULL, reproducible ?
frag_order_deflator : frag_deflator, NULL) != 0)
BAD_ERROR("Failed to create thread\n");
if(pthread_create(&frag_thread[i], NULL, frag_thrd,
(void *) destination_file) != 0)
@@ -4305,6 +4397,11 @@

main_thread = pthread_self();

if(reproducible) {
to_order = seq_queue_init();
pthread_create(&order_thread, NULL, frag_orderer, NULL);
}

if(!quiet)
printf("Parallel mksquashfs: Using %d processor%s\n", processors,
processors == 1 ? "" : "s");
@@ -5139,8 +5236,8 @@ void open_log_file(char *filename)


#define VERSION() \
printf("mksquashfs version 4.3-git (2019/04/27)\n");\
printf("copyright (C) 2017 Phillip Lougher "\
printf("mksquashfs version 4.3-git (2019/06/30)\n");\
printf("copyright (C) 2019 Phillip Lougher "\
"<phillip@squashfs.org.uk>\n\n"); \
printf("This program is free software; you can redistribute it and/or"\
"\n");\
@@ -6099,7 +6196,8 @@ int main(int argc, char *argv[])

while((fragment = get_frag_action(fragment)))
write_fragment(*fragment);
unlock_fragments();
if(!reproducible)
unlock_fragments();
pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);
pthread_mutex_lock(&fragment_mutex);
while(fragments_outstanding) {
19 changes: 16 additions & 3 deletions squashfs-tools/restore.c
@@ -2,7 +2,7 @@
* Create a squashfs filesystem. This is a highly compressed read only
* filesystem.
*
* Copyright (c) 2013, 2014
* Copyright (c) 2013, 2014, 2019
* Phillip Lougher <phillip@squashfs.org.uk>
*
* This program is free software; you can redistribute it and/or
@@ -46,12 +46,13 @@
#define FALSE 0
#define TRUE 1

extern pthread_t reader_thread, writer_thread, main_thread;
extern pthread_t reader_thread, writer_thread, main_thread, order_thread;
extern pthread_t *deflator_thread, *frag_deflator_thread, *frag_thread;
extern struct queue *to_deflate, *to_writer, *to_frag, *to_process_frag;
extern struct seq_queue *to_main;
extern struct seq_queue *to_main, *to_order;
extern void restorefs();
extern int processors;
extern int reproducible;

static int interrupted = 0;
static pthread_t restore_thread;
@@ -131,6 +132,18 @@ void *restore_thrd(void *arg)
for(i = 0; i < processors; i++)
pthread_join(frag_deflator_thread[i], NULL);

if(reproducible) {
/* then flush the fragment deflator_threads(s)
* to frag orderer thread. The frag orderer
* thread will idle
*/
seq_queue_flush(to_order);

/* now kill the frag orderer thread */
pthread_cancel(order_thread);
pthread_join(order_thread, NULL);
}

/*
* then flush the main thread/fragment deflator thread(s)
* to writer thread queue. The writer thread will idle
