Skip to content
Permalink
Browse files Browse the repository at this point in the history
remove frag_deflator_thread
frag_deflator_thread compress fragments.
Replace the deflator_thread with a function and
use the function instead of the to_frag queue.
  • Loading branch information
lynxis committed Jan 13, 2017
1 parent 0574cd6 commit afc0c76
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 68 deletions.
5 changes: 0 additions & 5 deletions squashfs-tools/info.c
Expand Up @@ -96,11 +96,6 @@ void dump_state()
printf("compressed block queue (deflate thread(s) -> main thread)\n");
dump_seq_queue(to_main, 0);

printf("uncompressed packed fragment queue (main thread -> fragment"
" deflate thread(s))\n");
dump_queue(to_frag);


printf("locked frag queue (compressed frags waiting while multi-block"
" file is written)\n");
dump_queue(locked_fragment);
Expand Down
76 changes: 27 additions & 49 deletions squashfs-tools/mksquashfs.c
Expand Up @@ -270,10 +270,10 @@ unsigned int sid_count = 0, suid_count = 0, sguid_count = 0;
struct cache *reader_buffer, *fragment_buffer, *reserve_cache;
struct cache *bwriter_buffer, *fwriter_buffer;
struct queue *to_reader, *to_deflate, *to_writer, *from_writer,
*to_frag, *locked_fragment, *to_process_frag;
*locked_fragment, *to_process_frag;
struct seq_queue *to_main;
pthread_t reader_thread, writer_thread, main_thread;
pthread_t *deflator_thread, *frag_deflator_thread, *frag_thread;
pthread_t *deflator_thread, *frag_thread;
pthread_t *restore_thread = NULL;
pthread_mutex_t fragment_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t pos_mutex = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -323,7 +323,7 @@ struct dir_info *scan1_opendir(char *pathname, char *subpath, int depth);
void write_filesystem_tables(struct squashfs_super_block *sBlk, int nopad);
unsigned short get_checksum_mem(char *buff, int bytes);
void check_usable_phys_mem(int total_mem);

void frag_deflator(struct file_buffer *file_buffer);

void prep_exit()
{
Expand Down Expand Up @@ -1540,7 +1540,7 @@ void write_fragment(struct file_buffer *fragment)
pthread_mutex_lock(&fragment_mutex);
fragment_table[fragment->block].unused = 0;
fragments_outstanding ++;
queue_put(to_frag, fragment);
frag_deflator(fragment);
pthread_cleanup_pop(1);
}

Expand Down Expand Up @@ -2412,51 +2412,34 @@ void *deflator(void *arg)
}


void *frag_deflator(void *arg)
void frag_deflator(struct file_buffer *file_buffer)
{
void *stream = NULL;
int res;

res = compressor_init(comp, &stream, block_size, 1);
if(res)
BAD_ERROR("frag_deflator:: compressor_init failed\n");

pthread_cleanup_push((void *) pthread_mutex_unlock, &fragment_mutex);

while(1) {
int c_byte, compressed_size;
struct file_buffer *file_buffer = queue_get(to_frag);
struct file_buffer *write_buffer =
int c_byte, compressed_size;
struct file_buffer *write_buffer =
cache_get(fwriter_buffer, file_buffer->block);

c_byte = mangle2(stream, write_buffer->data, file_buffer->data,
file_buffer->size, block_size, noF, 1);
compressed_size = SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte);
write_buffer->size = compressed_size;
pthread_mutex_lock(&fragment_mutex);
if(fragments_locked == FALSE) {
fragment_table[file_buffer->block].size = c_byte;
fragment_table[file_buffer->block].start_block = bytes;
write_buffer->block = bytes;
bytes += compressed_size;
fragments_outstanding --;
queue_put(to_writer, write_buffer);
pthread_mutex_unlock(&fragment_mutex);
TRACE("Writing fragment %lld, uncompressed size %d, "
"compressed size %d\n", file_buffer->block,
file_buffer->size, compressed_size);
} else {
add_pending_fragment(write_buffer, c_byte,
file_buffer->block);
pthread_mutex_unlock(&fragment_mutex);
}
cache_block_put(file_buffer);
c_byte = mangle2(stream, write_buffer->data, file_buffer->data,
file_buffer->size, block_size, noF, 1);
compressed_size = SQUASHFS_COMPRESSED_SIZE_BLOCK(c_byte);
write_buffer->size = compressed_size;
if(fragments_locked == FALSE) {
fragment_table[file_buffer->block].size = c_byte;
fragment_table[file_buffer->block].start_block = bytes;
write_buffer->block = bytes;
bytes += compressed_size;
fragments_outstanding --;
queue_put(to_writer, write_buffer);
TRACE("Writing fragment %lld, uncompressed size %d, "
"compressed size %d\n", file_buffer->block,
file_buffer->size, compressed_size);
} else {
add_pending_fragment(write_buffer, c_byte,
file_buffer->block);
}

pthread_cleanup_pop(0);
cache_block_put(file_buffer);
}


struct file_buffer *get_file_buffer()
{
struct file_buffer *file_buffer = seq_queue_get(to_main);
Expand Down Expand Up @@ -4257,19 +4240,17 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq,
multiply_overflow(processors * 3, sizeof(pthread_t)))
BAD_ERROR("Processors too large\n");

deflator_thread = malloc(processors * 3 * sizeof(pthread_t));
deflator_thread = malloc(processors * 2 * sizeof(pthread_t));
if(deflator_thread == NULL)
MEM_ERROR();

frag_deflator_thread = &deflator_thread[processors];
frag_thread = &frag_deflator_thread[processors];
frag_thread = &deflator_thread[processors];

to_reader = queue_init(1);
to_deflate = queue_init(reader_size);
to_process_frag = queue_init(reader_size);
to_writer = queue_init(bwriter_size + fwriter_size);
from_writer = queue_init(1);
to_frag = queue_init(fragment_size);
locked_fragment = queue_init(fragment_size);
to_main = seq_queue_init();
reader_buffer = cache_init(block_size, reader_size, 0, 0);
Expand All @@ -4285,9 +4266,6 @@ void initialise_threads(int readq, int fragq, int bwriteq, int fwriteq,
for(i = 0; i < processors; i++) {
if(pthread_create(&deflator_thread[i], NULL, deflator, NULL))
BAD_ERROR("Failed to create thread\n");
if(pthread_create(&frag_deflator_thread[i], NULL, frag_deflator,
NULL) != 0)
BAD_ERROR("Failed to create thread\n");
if(pthread_create(&frag_thread[i], NULL, frag_thrd,
(void *) destination_file) != 0)
BAD_ERROR("Failed to create thread\n");
Expand Down
2 changes: 1 addition & 1 deletion squashfs-tools/mksquashfs.h
Expand Up @@ -135,7 +135,7 @@ struct append_file {
extern struct cache *reader_buffer, *fragment_buffer, *reserve_cache;
struct cache *bwriter_buffer, *fwriter_buffer;
extern struct queue *to_reader, *to_deflate, *to_writer, *from_writer,
*to_frag, *locked_fragment, *to_process_frag;
*locked_fragment, *to_process_frag;
extern struct append_file **file_mapping;
extern struct seq_queue *to_main;
extern pthread_mutex_t fragment_mutex, dup_mutex;
Expand Down
15 changes: 2 additions & 13 deletions squashfs-tools/restore.c
Expand Up @@ -47,8 +47,8 @@
#define TRUE 1

extern pthread_t reader_thread, writer_thread, main_thread;
extern pthread_t *deflator_thread, *frag_deflator_thread, *frag_thread;
extern struct queue *to_deflate, *to_writer, *to_frag, *to_process_frag;
extern pthread_t *deflator_thread, *frag_thread;
extern struct queue *to_deflate, *to_writer, *to_process_frag;
extern struct seq_queue *to_main;
extern void restorefs();
extern int processors;
Expand Down Expand Up @@ -120,17 +120,6 @@ void *restore_thrd(void *arg)
pthread_cancel(main_thread);
pthread_join(main_thread, NULL);

/* then flush the main thread to fragment deflator thread(s)
* queue. The fragment deflator thread(s) will idle
*/
queue_flush(to_frag);

/* now kill the fragment deflator thread(s) */
for(i = 0; i < processors; i++)
pthread_cancel(frag_deflator_thread[i]);
for(i = 0; i < processors; i++)
pthread_join(frag_deflator_thread[i], NULL);

/*
* then flush the main thread/fragment deflator thread(s)
* to writer thread queue. The writer thread will idle
Expand Down

0 comments on commit afc0c76

Please sign in to comment.