Skip to content

Commit

Permalink
update random-access for u64 support
Browse files Browse the repository at this point in the history
  • Loading branch information
substack committed Feb 2, 2020
1 parent 5053860 commit b514534
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 50 deletions.
10 changes: 2 additions & 8 deletions Cargo.toml
Expand Up @@ -8,17 +8,11 @@ bincode = "1.0.1"
failure = "0.1.5"
lru = "0.1.13"
num-traits = "0.2.6"
random-access-disk = "0.8.0"
random-access-storage = "2.0.0"
random-access-disk = "1.0.0"
random-access-storage = "3.0.0"
serde = "1.0.82"

[dev-dependencies]
rand = "0.6.1"
random = "0.12.2"
tempfile = "3.0.7"

[patch.crates-io.random-access-disk]
path = "../datrs/random-access-disk"

[patch.crates-io.random-access-storage]
path = "../datrs/random-access-storage"
14 changes: 8 additions & 6 deletions src/data.rs
Expand Up @@ -73,7 +73,7 @@ where S: RandomAccess<Error=Error>, P: Point, V: Value {
data[0..4].copy_from_slice(&len.to_be_bytes());
data[4..6].copy_from_slice(&(bitfield_len as u16).to_be_bytes());
let offset = self.store.len()? as u64;
self.store.write(offset as usize, &data)?;
self.store.write(offset, &data)?;
let bbox = match P::bounds(&rows.iter().map(|(p,_)| *p).collect()) {
None => bail!["invalid data at offset {}", offset],
Some(bbox) => bbox
Expand Down Expand Up @@ -165,7 +165,7 @@ where S: RandomAccess<Error=Error>, P: Point, V: Value {
};
let max_len = 2 + (max_i+7)/8;
let guess = max_len;
let header = read_block(&mut self.store, *block, max_len, guess)?;
let mut header = read_block(&mut self.store, *block, max_len, guess)?;
for index in indexes.iter() {
header[2+index/8] &= 1<<(index%8);
}
Expand Down Expand Up @@ -219,7 +219,7 @@ where S: RandomAccess<Error=Error>, P: Point {
}
pub fn list (&mut self) -> Result<Vec<(u64,P,u64)>,Error> {
let mut results = vec![];
let bsize = <P::Range as Point>::size_of();
let bsize = <P::Range as Point>::size_of() as u64;
let n = 8 + bsize + 8;
let len = self.store.len()?;
let buf_size = {
Expand All @@ -228,9 +228,11 @@ where S: RandomAccess<Error=Error>, P: Point {
};
for j in 0..(len+buf_size-1)/buf_size {
let buf = self.store.read(j*buf_size,((j+1)*buf_size).min(len))?;
for i in 0..buf.len()/n {
let offset = i * n;
results.push(self.bincode.deserialize(&buf[offset..offset+n])?);
for i in 0..buf.len()/(n as usize) {
let offset = i * (n as usize);
results.push(self.bincode.deserialize(
&buf[offset..offset+(n as usize)])?
);
}
}
Ok(results)
Expand Down
6 changes: 3 additions & 3 deletions src/read_block.rs
Expand Up @@ -7,7 +7,7 @@ pub fn read_block<S> (store: &mut S, offset: u64, max_size: u64, guess: u64)
where S: RandomAccess<Error=Error> {
let size_guess = guess.min(max_size - offset.min(max_size));
if size_guess < 4 { bail!["block too small for length field"] }
let fbuf: Vec<u8> = store.read(offset as usize, size_guess as usize)?;
let fbuf: Vec<u8> = store.read(offset, size_guess)?;
ensure_eq![fbuf.len() as u64, size_guess, "requested {} bytes, received {}",
size_guess, fbuf.len()];
let len = u32::from_be_bytes([fbuf[0],fbuf[1],fbuf[2],fbuf[3]]) as u64;
Expand All @@ -29,8 +29,8 @@ where S: RandomAccess<Error=Error> {
Ordering::Less => {
buf.extend_from_slice(&fbuf[4..]);
buf.extend(store.read(
(offset+(fbuf.len() as u64)) as usize,
(len-(fbuf.len() as u64)) as usize
offset+(fbuf.len() as u64),
len-(fbuf.len() as u64)
)?);
}
};
Expand Down
7 changes: 4 additions & 3 deletions src/staging.rs
Expand Up @@ -58,8 +58,9 @@ where S: RandomAccess<Error=Error>, P: Point, V: Value {
Ok(staging)
}
fn load (&mut self) -> Result<(),Error> {
let len = self.store.len()?;
let buf = self.store.read(0, len)?;
let slen = self.store.len()?;
let buf = self.store.read(0, slen)?;
let len = slen as usize;
self.rows.clear();
let mut offset = 0;
while offset < len {
Expand All @@ -86,7 +87,7 @@ where S: RandomAccess<Error=Error>, P: Point, V: Value {
self.rows.clear();
Ok(())
}
pub fn bytes (&mut self) -> Result<usize,Error> {
pub fn bytes (&mut self) -> Result<u64,Error> {
let len = self.store.len()?;
Ok(len)
}
Expand Down
4 changes: 2 additions & 2 deletions src/tree.rs
Expand Up @@ -175,7 +175,7 @@ where S: RandomAccess<Error=Error>, P: Point, V: Value {

impl<S,P,V> Tree<S,P,V>
where S: RandomAccess<Error=Error>, P: Point, V: Value {
pub fn open (mut opts: TreeOpts<S,P,V>) -> Result<Self,Error> {
pub fn open (opts: TreeOpts<S,P,V>) -> Result<Self,Error> {
let bytes = opts.store.len()? as u64;
let data_merge = Rc::new(RefCell::new(
DataMerge::new(Rc::clone(&opts.data_store))));
Expand Down Expand Up @@ -261,7 +261,7 @@ where S: RandomAccess<Error=Error>, P: Point, V: Value {
let alloc = &mut {|bytes| self.alloc(bytes) };
b.build(alloc)?
};
self.store.write(b.offset as usize, &data)?;
self.store.write(b.offset, &data)?;
self.bytes = self.bytes.max(b.offset + (data.len() as u64));
nbranches.extend(nb);
}
Expand Down
57 changes: 29 additions & 28 deletions src/write_cache.rs
Expand Up @@ -4,13 +4,13 @@ use std::io::Write;
#[derive(Debug,Clone)]
pub struct WriteCache<S> where S: RandomAccess {
store: S,
queue: Vec<(usize,Vec<u8>)>,
length: usize,
queue: Vec<(u64,Vec<u8>)>,
length: u64,
enabled: bool
}

impl<S> WriteCache<S> where S: RandomAccess {
pub fn open (mut store: S) -> Result<Self,S::Error> {
pub fn open (store: S) -> Result<Self,S::Error> {
let length = store.len()?;
Ok(Self {
store,
Expand All @@ -23,30 +23,31 @@ impl<S> WriteCache<S> where S: RandomAccess {

impl<S> RandomAccess for WriteCache<S> where S: RandomAccess {
type Error = S::Error;
fn write (&mut self, offset: usize, data: &[u8]) -> Result<(),Self::Error> {
fn write (&mut self, offset: u64, data: &[u8]) -> Result<(),Self::Error> {
if !self.enabled { return self.store.write(offset, data) }

let new_range = (offset,offset+data.len());
let new_range = (offset,offset+(data.len() as u64));
let overlapping: Vec<usize> = (0..self.queue.len()).filter(|i| {
let q = &self.queue[*i];
overlaps(new_range, (q.0,q.0+(q.1).len()))
overlaps(new_range, (q.0,q.0+((q.1).len() as u64)))
}).collect();

let mut start = new_range.0;
let mut end = new_range.1;
for i in overlapping.iter() {
let q = &self.queue[*i];
start = start.min(q.0);
end = end.max(q.0 + (q.1).len());
end = end.max(q.0 + ((q.1).len() as u64));
}
let mut merged = (start,vec![0;end-start]);
let mut merged = (start,vec![0;(end-start) as usize]);
for i in overlapping.iter() {
let q = &self.queue[*i];
merged.1[q.0-start..q.0-start+q.1.len()]
merged.1[(q.0-start) as usize..(q.0-start+(q.1.len() as u64)) as usize]
.copy_from_slice(&q.1.as_slice());
}
merged.1[new_range.0-start..new_range.0-start+data.len()]
.copy_from_slice(data);
merged.1[(new_range.0-start) as usize
.. (new_range.0-start+(data.len() as u64)) as usize
].copy_from_slice(data);

for (i,ov) in overlapping.iter().enumerate() {
self.queue.remove(ov-i);
Expand All @@ -65,7 +66,7 @@ impl<S> RandomAccess for WriteCache<S> where S: RandomAccess {
self.length = self.length.max(end);
Ok(())
}
fn read (&mut self, offset: usize, length: usize)
fn read (&mut self, offset: u64, length: u64)
-> Result<Vec<u8>,Self::Error> {
if !self.enabled { return self.store.read(offset, length) }
// TODO: analysis to know when to skip the read()
Expand All @@ -74,26 +75,26 @@ impl<S> RandomAccess for WriteCache<S> where S: RandomAccess {
let slen = self.store.len()?;
let mut d = if slen < offset { vec![] }
else { self.store.read(offset, (slen-offset).min(length))? };
let dlen = d.len();
let dlen = d.len() as u64;
if dlen < length {
d.extend(vec![0;length-dlen]);
d.extend(vec![0;(length-dlen) as usize]);
}
d
};
// TODO: turn these asserts into ensure_eq!
assert_eq![data.len(), length, "insufficient length"];
assert_eq![data.len() as u64, length, "insufficient length"];
for q in self.queue.iter() {
if overlaps(range,(q.0,q.0+q.1.len())) {
let q1 = q.0 + q.1.len();
let dstart = q.0.max(range.0) - range.0;
let dend = q1.min(range.1) - range.0;
let qstart = q.0.max(range.0) - q.0;
let qend = q1.min(range.1) - q.0;
if overlaps(range,(q.0,q.0+(q.1.len() as u64))) {
let q1 = q.0 + (q.1.len() as u64);
let dstart = (q.0.max(range.0) - range.0) as usize;
let dend = (q1.min(range.1) - range.0) as usize;
let qstart = (q.0.max(range.0) - q.0) as usize;
let qend = (q1.min(range.1) - q.0) as usize;
assert_eq![dend-dstart, qend-qstart, "data and range length mismatch"];
data[dstart..dend].copy_from_slice(&q.1[qstart..qend]);
}
}
assert_eq![length, data.len(),
assert_eq![length, data.len() as u64,
"requested read of {} bytes, returned {} bytes instead",
length, data.len()];
/*
Expand All @@ -103,23 +104,23 @@ impl<S> RandomAccess for WriteCache<S> where S: RandomAccess {
*/
Ok(data)
}
fn read_to_writer (&mut self, _offset: usize, _length: usize,
fn read_to_writer (&mut self, _offset: u64, _length: u64,
_buf: &mut impl Write) -> Result<(),Self::Error> {
unimplemented![];
}
fn del (&mut self, offset: usize, length: usize) -> Result<(),Self::Error> {
fn del (&mut self, offset: u64, length: u64) -> Result<(),Self::Error> {
self.store.del(offset, length)
}
fn truncate (&mut self, length: usize) -> Result<(),Self::Error> {
fn truncate (&mut self, length: u64) -> Result<(),Self::Error> {
if !self.enabled { return self.store.truncate(length) }
let mut i = 0;
while i < self.queue.len() {
let q0 = self.queue[i].0;
let qlen = self.queue[i].1.len();
let qlen = self.queue[i].1.len() as u64;
if q0 < length {
self.queue.remove(i);
} else if q0 + qlen < length {
self.queue[i].1.truncate(length - q0);
self.queue[i].1.truncate((length - q0 as u64) as usize);
i += 1;
} else {
i += 1;
Expand All @@ -129,7 +130,7 @@ impl<S> RandomAccess for WriteCache<S> where S: RandomAccess {
self.length = length;
Ok(())
}
fn len (&mut self) -> Result<usize,Self::Error> {
fn len (&self) -> Result<u64,Self::Error> {
if self.enabled { Ok(self.length) }
else { self.store.len() }
}
Expand Down

0 comments on commit b514534

Please sign in to comment.