diff --git a/src/fasta.rs b/src/fasta.rs index f78e6f1..c0bdf2e 100644 --- a/src/fasta.rs +++ b/src/fasta.rs @@ -1,354 +1,220 @@ // The Computer Language Benchmarks Game // http://benchmarksgame.alioth.debian.org/ // -// contributed by Joshua Landau -// -// built on top of Rust versions -// contributed by the Rust Project Developers -// contributed by TeXitoi -// multi-threaded version contributed by Alisdair Owens -// -// based off of Haskell version -// contributed by Bryan O'Sullivan -// parallelized by Maxim Sokolov based on -// go variant by Chris Bainbridge et al. - - +// contributed by the Rust Project Developers +// contributed by TeXitoi +// multi-threaded version contributed by Alisdair Owens extern crate num_cpus; - use std::cmp::min; use std::io; -use std::sync::{Arc, Mutex}; +use std::io::{Write, BufWriter, ErrorKind}; +use std::sync::{Mutex,Arc}; use std::thread; +const LINE_LENGTH: usize = 60; +const IM: u32 = 139968; +const LINES: usize = 1024; +const BLKLEN: usize = LINE_LENGTH * LINES; -const LINE_LEN: usize = 60; - -const BLOCK_LINES: usize = 512; -const BLOCK_THOROUGHPUT: usize = LINE_LEN * BLOCK_LINES; -const BLOCK_LEN: usize = BLOCK_THOROUGHPUT + BLOCK_LINES; - -const STDIN_BUF: usize = (LINE_LEN + 1) * 1024; - - -const ALU: &'static [u8] = - b"GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTTGG\ - GAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTCGAGA\ - CCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACTAAAAAT\ - ACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTGTAATCCCA\ - GCTACTCGGGAGGCTGAGGCAGGAGAATCGCTTGAACCCGGG\ - AGGCGGAGGTTGCAGTGAGCCGAGATCGCGCCACTGCACTCC\ - AGCCTGGGCGACAGAGCGAGACTCCGTCTCAAAAA"; - -const IUB: &'static [(u8, f32)] = - &[(b'a', 0.27), (b'c', 0.12), (b'g', 0.12), - (b't', 0.27), (b'B', 0.02), (b'D', 0.02), - (b'H', 0.02), (b'K', 0.02), (b'M', 0.02), - (b'N', 0.02), (b'R', 0.02), (b'S', 0.02), - (b'V', 0.02), (b'W', 0.02), (b'Y', 0.02)]; - -const HOMOSAPIENS: &'static [(u8, f32)] = - &[(b'a', 0.3029549426680), - (b'c', 0.1979883004921), - (b'g', 0.1975473066391), - (b't', 0.3015094502008)]; - - -// We need a specific Rng, -// so implement this manually -mod random { - const MODULUS: u32 = 139968; - const MULTIPLIER: u32 = 3877; - const ADDITIVE: u32 = 29573; +struct MyStdOut { + thread_count: u16, + next_thread_num: u16, + stdout: io::Stdout, +} - // Why doesn't rust already have this? - // Algorithm directly taken from Wikipedia - fn powmod(mut base: u64, mut exponent: u32, modulus: u64) -> u64 { - let mut ret = 1; - base %= modulus; +struct MyRandom { + last: u32, + count: usize, + thread_count: u16, + next_thread_num: u16, +} - while exponent > 0 { - if exponent & 1 == 1 { - ret *= base; - ret %= modulus; - } - exponent >>= 1; - base *= base; - base %= modulus; +impl MyRandom { + fn new(count: usize, thread_count: u16) -> MyRandom { + MyRandom { + last: 42, + count: count, + thread_count: thread_count, + next_thread_num: 0 } - - ret } + + fn normalize(p: f32) -> u32 {(p * IM as f32).floor() as u32} - // Just a typical LCRNG - pub struct Rng { - last: u32 + fn reset(&mut self, count: usize) { + self.next_thread_num = 0; + self.count = count; } - impl Rng { - pub fn new() -> Rng { - Rng { last: 42 } - } + fn gen(&mut self, buf: &mut [u32], cur_thread: u16) -> Result { - pub fn max_value() -> u32 { - MODULUS - 1 + if self.next_thread_num != cur_thread { + return Err(()) } - pub fn normalize(p: f32) -> u32 { - (p * MODULUS as f32).floor() as u32 + self.next_thread_num+=1; + if self.next_thread_num == self.thread_count { + self.next_thread_num = 0; } - pub fn gen(&mut self) -> u32 { - self.last = (self.last * MULTIPLIER + ADDITIVE) % MODULUS; - self.last - } - - // This allows us to fast-forward the RNG, - // allowing us to run it in parallel. - pub fn future(&self, n: u32) -> Rng { - let a = MULTIPLIER as u64; - let b = ADDITIVE as u64; - let m = MODULUS as u64; - - // (a^n - 1) mod (a-1) m - // x_k = ((a^n x_0 mod m) + --------------------- b) mod m - // a - 1 - // - // Since (a - 1) divides (a^n - 1) mod (a-1) m, - // the subtraction does not overflow and thus can be non-modular. - // - let new_seed = - (powmod(a, n, m) * self.last as u64) % m + - (powmod(a, n, (a-1) * m) - 1) / (a-1) * b; - - Rng { last: (new_seed % m) as u32 } + let to_gen = min(buf.len(), self.count); + for i in 0..to_gen { + self.last = (self.last * 3877 + 29573) % IM; + buf[i] = self.last; } + self.count -= to_gen; + Ok(to_gen) } } -// 'Cause modules are so old-hat -use random::Rng; - - -// This will end up keeping track of threads, like -// in the other multithreaded Rust version, in -// order to keep writes in order. -// -// This is stolen from another multithreaded Rust -// implementation, although that implementation -// was not able to parallelize the RNG itself. -struct BlockSubmitter { - writer: W, - pub waiting_on: usize, -} - -impl BlockSubmitter { - fn submit(&mut self, data: &[u8], block_num: usize) -> Option> { - if block_num == self.waiting_on { - self.waiting_on += 1; - Some(self.submit_async(data)) +impl MyStdOut { + fn new(thread_count: u16) -> MyStdOut { + MyStdOut { + thread_count: thread_count, + next_thread_num: 0, + stdout: io::stdout() + } + } + fn write(&mut self, data: &[u8], cur_thread: u16) -> io::Result<()> { + if self.next_thread_num != cur_thread { + return Err(io::Error::new(ErrorKind::Other, "")); } - else { - None + + self.next_thread_num+=1; + if self.next_thread_num == self.thread_count { + self.next_thread_num = 0; } - } - fn submit_async(&mut self, data: &[u8]) -> io::Result<()> { - self.writer.write_all(data) + self.stdout.write_all(data) } } +fn make_random(data: &[(char, f32)]) -> Vec<(u32, u8)> { + let mut acc = 0.; + data.iter() + .map(|&(ch, p)| { + acc += p; + (MyRandom::normalize(acc), ch as u8) + }) + .collect() +} -// For repeating strings as output -fn fasta_static( - writer: &mut W, - header: &[u8], - data: &[u8], - mut n: usize -) -> io::Result<()> -{ - // The aim here is to print a short(ish) string cyclically - // with line breaks as appropriate. - // - // The secret technique is to repeat the string such that - // any wanted line is a single offset in the string. - // - // This technique is stolen from the Haskell version. - - try!(writer.write_all(header)); - - // Maximum offset is data.len(), - // Maximum read len is LINE_LEN - let stream = data.iter().cloned().cycle(); - let mut extended: Vec = stream.take(data.len() + LINE_LEN + 1).collect(); - - let mut offset = 0; +fn make_fasta2>(header: &str, mut it: I, mut n: usize) + -> io::Result<()> { + let mut sysout = BufWriter::new(io::stdout()); + try!(sysout.write_all(header.as_bytes())); + let mut line = [0u8; LINE_LENGTH + 1]; while n > 0 { - let write_len = min(LINE_LEN, n); - let end = offset + write_len; - n -= write_len; - - let tmp = extended[end]; - extended[end] = b'\n'; - try!(writer.write_all(&extended[offset..end + 1])); - extended[end] = tmp; - - offset = end; - offset %= data.len(); + let nb = min(LINE_LENGTH, n); + for i in (0..nb) { + line[i] = it.next().unwrap(); + } + n -= nb; + line[nb] = '\n' as u8; + try!(sysout.write_all(&line[..(nb+1)])); } - Ok(()) } +fn do_fasta(thread_num: u16, rng: Arc>, + wr: Arc>, data: Vec<(u32, u8)>) { + + let mut rng_buf = [0u32; BLKLEN]; + let mut out_buf = [0u8; BLKLEN + LINES]; + let mut count; + loop { + loop { + if let Ok(x) = rng.lock().unwrap().gen(&mut rng_buf, thread_num) { + count = x; + break; + } + }; -// For RNG streams as output -fn fasta( - submitter: &Arc>>, - header: &[u8], - table: &[(u8, f32)], - rng: &mut Rng, - n: usize -) -> io::Result<()> -{ - // There's another secret technique in use here: - // we generate a lookup table to cache search of the - // aa buffer. - // - // The secret technique used is stolen from Haskell's - // implementation, and is the main secret to the Haskell - // implementation's speed. - fn gen_lookup_table(aa: &[(u8, f32)]) -> Vec { - let mut table = Vec::with_capacity(Rng::max_value() as usize + 1); - - let mut cumulative_prob = 0.0; - let mut cumulative_norm = 0; - - for &(byte, prob) in aa { - let last_norm = cumulative_norm; - cumulative_prob += prob; - cumulative_norm = min(Rng::max_value(), Rng::normalize(cumulative_prob)) + 1; - - table.extend((0..cumulative_norm - last_norm).map(|_| byte)); + if count == 0 { + break; } + let mut line_count = 0; + for i in 0..count { + if i % LINE_LENGTH == 0 && i > 0 { + out_buf[i+line_count] = b'\n'; + line_count += 1; + } + let rn = rng_buf[i]; + for j in &data { + if j.0 >= rn { + out_buf[i+line_count] = j.1; + break; + } + } + } + out_buf[count+line_count] = b'\n'; - table - } - - { - try!(submitter.lock().unwrap().submit_async(header)); + while let Err(_) = wr.lock() + .unwrap() + .write(&out_buf[..(count+line_count+1)], thread_num) {}; } +} - let lookup_table = Arc::new(gen_lookup_table(table)); +fn make_fasta(header: &str, rng: Arc>, + data: Vec<(u32, u8)>, num_threads: u16 + ) -> io::Result<()> { - let thread_count = num_cpus::get(); + let stdout = Arc::new(Mutex::new(MyStdOut::new(num_threads))); + try!(io::stdout().write_all(header.as_bytes())); let mut threads = Vec::new(); - for block_num in 0..thread_count { - let offset = BLOCK_THOROUGHPUT * block_num; - - let local_submitter = submitter.clone(); - let local_lookup_table = lookup_table.clone(); - let local_rng = rng.future(offset as u32); - + for thread in 0..num_threads { + let d = data.clone(); + let rng_clone = rng.clone(); + let stdout_clone = stdout.clone(); threads.push(thread::spawn(move || { - gen_block( - local_submitter, - local_lookup_table, - local_rng, - n.saturating_sub(offset), - block_num, - thread_count - ) + do_fasta(thread, rng_clone, stdout_clone, d); })); } - - for thread in threads { - try!(thread.join().unwrap()); - } - - *rng = rng.future(n as u32); - - Ok(()) -} - -// A very optimized writer. -// I have a feeling a simpler version wouldn't slow -// things down too much, though, since the RNG -// is the really heavy hitter. -fn gen_block( - submitter: Arc>>, - lookup_table: Arc>, - mut rng: Rng, - mut length: usize, - mut block_num: usize, - block_stride: usize, -) -> io::Result<()> -{ - // Include newlines in block - length += length / LINE_LEN; - let block: &mut [u8] = &mut [b'\n'; BLOCK_LEN]; - - while length > 0 { - { - let gen_into = &mut block[..min(length, BLOCK_LEN)]; - - // Write random numbers, skipping newlines - for (i, byte) in gen_into.iter_mut().enumerate() { - if (i + 1) % (LINE_LEN + 1) != 0 { - *byte = lookup_table[rng.gen() as usize]; - } - } - } - - let write_out = { - if length >= BLOCK_LEN { &mut *block } - else if length % (LINE_LEN + 1) == 0 { &mut block[..length] } - else { &mut block[..length + 1] } - }; - - *write_out.last_mut().unwrap() = b'\n'; - loop { - match submitter.lock().unwrap().submit(write_out, block_num) { - Some(result) => { try!(result); break; } - None => std::thread::yield_now() - } - } - block_num += block_stride; - rng = rng.future((BLOCK_THOROUGHPUT * (block_stride - 1)) as u32); - length = length.saturating_sub(BLOCK_LEN * (block_stride - 1)); - - length = length.saturating_sub(BLOCK_LEN); + for thread_guard in threads { + thread_guard.join().unwrap(); } - Ok(()) } - -fn run(writer: W) -> io::Result<()> { +fn main() { let n = std::env::args_os().nth(1) .and_then(|s| s.into_string().ok()) .and_then(|n| n.parse().ok()) .unwrap_or(1000); - - let rng = &mut Rng::new(); - - // Use automatic buffering for the static version... - let mut writer = io::BufWriter::with_capacity(STDIN_BUF, writer); - try!(fasta_static(&mut writer, b">ONE Homo sapiens alu\n", ALU, n * 2)); - - // ...but the dynamic version does its own buffering already - let writer = try!(writer.into_inner()); - let submitter = Arc::new(Mutex::new(BlockSubmitter { writer: writer, waiting_on: 0 })); - - { submitter.lock().unwrap().waiting_on = 0; } - try!(fasta(&submitter, b">TWO IUB ambiguity codes\n", &IUB, rng, n * 3)); - { submitter.lock().unwrap().waiting_on = 0; } - try!(fasta(&submitter, b">THREE Homo sapiens frequency\n", &HOMOSAPIENS, rng, n * 5)); - - Ok(()) -} - - -fn main() { - run(io::stdout()).unwrap() + + let num_threads: u16 = num_cpus::get() as u16; + + let rng = Arc::new(Mutex::new(MyRandom::new(n*3, num_threads))); + let alu: &[u8] = b"GGCCGGGCGCGGTGGCTCACGCCTGTAATCCCAGCACTTT\ + GGGAGGCCGAGGCGGGCGGATCACCTGAGGTCAGGAGTTC\ + GAGACCAGCCTGGCCAACATGGTGAAACCCCGTCTCTACT\ + AAAAATACAAAAATTAGCCGGGCGTGGTGGCGCGCGCCTG\ + TAATCCCAGCTACTCGGGAGGCTGAGGCAGGAGAATCGCT\ + TGAACCCGGGAGGCGGAGGTTGCAGTGAGCCGAGATCGCG\ + CCACTGCACTCCAGCCTGGGCGACAGAGCGAGACTCCGTCT\ + CAAAAA"; + + let iub = &[('a', 0.27), ('c', 0.12), ('g', 0.12), + ('t', 0.27), ('B', 0.02), ('D', 0.02), + ('H', 0.02), ('K', 0.02), ('M', 0.02), + ('N', 0.02), ('R', 0.02), ('S', 0.02), + ('V', 0.02), ('W', 0.02), ('Y', 0.02)]; + + let homosapiens = &[('a', 0.3029549426680), + ('c', 0.1979883004921), + ('g', 0.1975473066391), + ('t', 0.3015094502008)]; + + make_fasta2(">ONE Homo sapiens alu\n", + alu.iter().cycle().map(|c| *c), n * 2).unwrap(); + make_fasta(">TWO IUB ambiguity codes\n", + rng.clone(), make_random(iub), num_threads).unwrap(); + + rng.lock().unwrap().reset(n*5); + + make_fasta(">THREE Homo sapiens frequency\n", + rng, make_random(homosapiens), num_threads).unwrap(); + + io::stdout().flush().unwrap(); }