diff --git a/squashfs-tools/mksquashfs.c b/squashfs-tools/mksquashfs.c index 8801a945..55faa4cd 100644 --- a/squashfs-tools/mksquashfs.c +++ b/squashfs-tools/mksquashfs.c @@ -3,7 +3,7 @@ * filesystem. * * Copyright (c) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, - * 2012, 2013, 2014, 2017 + * 2012, 2013, 2014, 2017, 2019 * Phillip Lougher * * This program is free software; you can redistribute it and/or @@ -268,6 +268,13 @@ pthread_mutex_t fragment_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t pos_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t dup_mutex = PTHREAD_MUTEX_INITIALIZER; +/* reproducible image queues and threads */ +struct seq_queue *to_order; +pthread_t order_thread; +pthread_cond_t fragment_waiting = PTHREAD_COND_INITIALIZER; + +int reproducible = TRUE; + /* user options that control parallelisation */ int processors = -1; int bwriter_size; @@ -1469,6 +1476,18 @@ unsigned short get_fragment_checksum(struct file_info *file) } +void ensure_fragments_flushed() +{ + pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); + pthread_mutex_lock(&fragment_mutex); + + while(fragments_outstanding) + pthread_cond_wait(&fragment_waiting, &fragment_mutex); + + pthread_cleanup_pop(1); +} + + void lock_fragments() { pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); @@ -1529,12 +1548,15 @@ void add_pending_fragment(struct file_buffer *write_buffer, int c_byte, void write_fragment(struct file_buffer *fragment) { + static long long sequence = 0; + if(fragment == NULL) return; pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); pthread_mutex_lock(&fragment_mutex); fragment_table[fragment->block].unused = 0; + fragment->sequence = sequence ++; fragments_outstanding ++; queue_put(to_frag, fragment); pthread_cleanup_pop(1); @@ -2452,6 +2474,60 @@ void *frag_deflator(void *arg) } +void *frag_order_deflator(void *arg) +{ + void *stream = NULL; + int res; + + res = compressor_init(comp, &stream, block_size, 1); + if(res) + BAD_ERROR("frag_deflator:: compressor_init failed\n"); + + while(1) { + int c_byte; + struct file_buffer *file_buffer = queue_get(to_frag); + struct file_buffer *write_buffer = + cache_get(fwriter_buffer, file_buffer->block); + + c_byte = mangle2(stream, write_buffer->data, file_buffer->data, + file_buffer->size, block_size, noF, 1); + write_buffer->block = file_buffer->block; + write_buffer->sequence = file_buffer->sequence; + write_buffer->size = SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte); + write_buffer->fragment = FALSE; + seq_queue_put(to_order, write_buffer); + TRACE("Writing fragment %lld, uncompressed size %d, " + "compressed size %d\n", file_buffer->block, + file_buffer->size, SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte)); + cache_block_put(file_buffer); + } +} + + +void *frag_orderer(void *arg) +{ + pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); + + while(1) { + struct file_buffer *write_buffer = seq_queue_get(to_order); + int block = write_buffer->block; + + pthread_mutex_lock(&fragment_mutex); + fragment_table[block].size = write_buffer->size; + fragment_table[block].start_block = bytes; + write_buffer->block = bytes; + bytes += SQUASHFS_COMPRESSED_SIZE_BLOCK(write_buffer->size); + fragments_outstanding --; + log_fragment(block, write_buffer->block); + queue_put(to_writer, write_buffer); + pthread_cond_signal(&fragment_waiting); + pthread_mutex_unlock(&fragment_mutex); + } + + pthread_cleanup_pop(0); +} + + struct file_buffer *get_file_buffer() { struct file_buffer *file_buffer = seq_queue_get(to_main); @@ -2525,7 +2601,10 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent, *duplicate_file = FALSE; - lock_fragments(); + if(reproducible) + ensure_fragments_flushed(); + else + lock_fragments(); file_bytes = 0; start = bytes; @@ -2562,7 +2641,9 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent, goto read_err; } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); + fragment = get_and_fill_fragment(fragment_buffer, dir_ent); if(duplicate_checking) @@ -2599,7 +2680,8 @@ int write_file_process(squashfs_inode *inode, struct dir_ent *dir_ent, BAD_ERROR("Failed to truncate dest file because %s\n", strerror(errno)); } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); free(block_list); cache_block_put(read_buffer); return status; @@ -2630,7 +2712,10 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent, if(buffer_list == NULL) MEM_ERROR(); - lock_fragments(); + if(reproducible) + ensure_fragments_flushed(); + else + lock_fragments(); file_bytes = 0; start = dup_start = bytes; @@ -2698,7 +2783,8 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent, } } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); cache_block_put(fragment_buffer); free(buffer_list); file_count ++; @@ -2740,7 +2826,8 @@ int write_file_blocks_dup(squashfs_inode *inode, struct dir_ent *dir_ent, BAD_ERROR("Failed to truncate dest file because %s\n", strerror(errno)); } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); for(blocks = thresh; blocks < block; blocks ++) cache_block_put(buffer_list[blocks]); free(buffer_list); @@ -2771,7 +2858,10 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent, if(block_list == NULL) MEM_ERROR(); - lock_fragments(); + if(reproducible) + ensure_fragments_flushed(); + else + lock_fragments(); file_bytes = 0; start = bytes; @@ -2802,7 +2892,8 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent, } } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); fragment = get_and_fill_fragment(fragment_buffer, dir_ent); if(duplicate_checking) @@ -2850,7 +2941,8 @@ int write_file_blocks(squashfs_inode *inode, struct dir_ent *dir_ent, BAD_ERROR("Failed to truncate dest file because %s\n", strerror(errno)); } - unlock_fragments(); + if(!reproducible) + unlock_fragments(); free(block_list); cache_block_put(read_buffer); return status; @@ -4295,8 +4387,8 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq, for(i = 0; i < processors; i++) { if(pthread_create(&deflator_thread[i], NULL, deflator, NULL)) BAD_ERROR("Failed to create thread\n"); - if(pthread_create(&frag_deflator_thread[i], NULL, frag_deflator, - NULL) != 0) + if(pthread_create(&frag_deflator_thread[i], NULL, reproducible ? + frag_order_deflator : frag_deflator, NULL) != 0) BAD_ERROR("Failed to create thread\n"); if(pthread_create(&frag_thread[i], NULL, frag_thrd, (void *) destination_file) != 0) @@ -4305,6 +4397,11 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq, main_thread = pthread_self(); + if(reproducible) { + to_order = seq_queue_init(); + pthread_create(&order_thread, NULL, frag_orderer, NULL); + } + if(!quiet) printf("Parallel mksquashfs: Using %d processor%s\n", processors, processors == 1 ? "" : "s"); @@ -5139,8 +5236,8 @@ void open_log_file(char *filename) #define VERSION() \ - printf("mksquashfs version 4.3-git (2019/04/27)\n");\ - printf("copyright (C) 2017 Phillip Lougher "\ + printf("mksquashfs version 4.3-git (2019/06/30)\n");\ + printf("copyright (C) 2019 Phillip Lougher "\ "\n\n"); \ printf("This program is free software; you can redistribute it and/or"\ "\n");\ @@ -6099,7 +6196,8 @@ int main(int argc, char *argv[]) while((fragment = get_frag_action(fragment))) write_fragment(*fragment); - unlock_fragments(); + if(!reproducible) + unlock_fragments(); pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex); pthread_mutex_lock(&fragment_mutex); while(fragments_outstanding) { diff --git a/squashfs-tools/restore.c b/squashfs-tools/restore.c index 5e336b35..aedeca29 100644 --- a/squashfs-tools/restore.c +++ b/squashfs-tools/restore.c @@ -2,7 +2,7 @@ * Create a squashfs filesystem. This is a highly compressed read only * filesystem. * - * Copyright (c) 2013, 2014 + * Copyright (c) 2013, 2014, 2019 * Phillip Lougher * * This program is free software; you can redistribute it and/or @@ -46,12 +46,13 @@ #define FALSE 0 #define TRUE 1 -extern pthread_t reader_thread, writer_thread, main_thread; +extern pthread_t reader_thread, writer_thread, main_thread, order_thread; extern pthread_t *deflator_thread, *frag_deflator_thread, *frag_thread; extern struct queue *to_deflate, *to_writer, *to_frag, *to_process_frag; -extern struct seq_queue *to_main; +extern struct seq_queue *to_main, *to_order; extern void restorefs(); extern int processors; +extern int reproducible; static int interrupted = 0; static pthread_t restore_thread; @@ -131,6 +132,18 @@ void *restore_thrd(void *arg) for(i = 0; i < processors; i++) pthread_join(frag_deflator_thread[i], NULL); + if(reproducible) { + /* then flush the fragment deflator_threads(s) + * to frag orderer thread. The frag orderer + * thread will idle + */ + seq_queue_flush(to_order); + + /* now kill the frag orderer thread */ + pthread_cancel(order_thread); + pthread_join(order_thread, NULL); + } + /* * then flush the main thread/fragment deflator thread(s) * to writer thread queue. The writer thread will idle