Skip to content

Commit

Permalink
make use of sqlite transactions
Browse files Browse the repository at this point in the history
this should help to eliminate more application-level locks in database
wrappers
  • Loading branch information
eugene-babichenko committed Jan 15, 2020
1 parent 2fcaba1 commit ef663ee
Showing 1 changed file with 100 additions and 24 deletions.
124 changes: 100 additions & 24 deletions chain-storage-sqlite-old/src/lib.rs
@@ -1,6 +1,6 @@
use chain_core::property::{Block, BlockId, Serialize};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{types::Value, OpenFlags};
use rusqlite::{types::Value, Connection, OpenFlags};
use std::path::Path;
use thiserror::Error;

Expand Down Expand Up @@ -142,7 +142,16 @@ where
pub fn put_block(&mut self, block: &B) -> Result<(), Error> {
let block_hash = block.id();

if self.block_exists(&block_hash)? {
let mut conn = self
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let tx = conn
.transaction()
.map_err(|err| Error::BackendError(Box::new(err)))?;

if self.block_exists_internal(&tx, &block_hash)? {
return Ok(());
}

Expand All @@ -157,10 +166,12 @@ where
let chain_length = if parent_hash == B::Id::zero() {
1
} else {
let parent_info = self.get_block_info(&parent_hash).map_err(|e| match e {
Error::BlockNotFound => Error::MissingParent,
e => e,
})?;
let parent_info =
self.get_block_info_internal(&tx, &parent_hash)
.map_err(|e| match e {
Error::BlockNotFound => Error::MissingParent,
e => e,
})?;
assert!(parent_info.chain_length > 0);
let chain_length = 1 + parent_info.chain_length;
let fast_link = compute_fast_link(chain_length);
Expand All @@ -183,15 +194,6 @@ where
back_links,
};

let mut conn = self
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let tx = conn
.transaction()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let worked = tx
.prepare_cached("insert into Blocks (hash, block) values(?, ?)")
.map_err(|err| Error::BackendError(Box::new(err)))?
Expand Down Expand Up @@ -270,9 +272,20 @@ where
}

pub fn get_block_info(&self, block_hash: &B::Id) -> Result<BlockInfo<B::Id>, Error> {
self.pool
let connection = self
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?
.map_err(|err| Error::BackendError(Box::new(err)))?;

self.get_block_info_internal(&connection, block_hash)
}

fn get_block_info_internal(
&self,
connection: &Connection,
block_hash: &B::Id,
) -> Result<BlockInfo<B::Id>, Error> {
connection
.prepare_cached(
"select depth, parent, fast_distance, fast_hash from BlockInfo where hash = ?",
)
Expand Down Expand Up @@ -343,7 +356,20 @@ where

/// Check whether a block exists.
pub fn block_exists(&self, block_hash: &B::Id) -> Result<bool, Error> {
match self.get_block_info(block_hash) {
let connection = self
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?;

self.block_exists_internal(&connection, block_hash)
}

fn block_exists_internal(
&self,
connection: &Connection,
block_hash: &B::Id,
) -> Result<bool, Error> {
match self.get_block_info_internal(connection, block_hash) {
Ok(_) => Ok(true),
Err(Error::BlockNotFound) => Ok(false),
Err(err) => Err(err),
Expand All @@ -363,13 +389,22 @@ where
return Ok(Some(0));
}

let descendent = self.get_block_info(&descendent)?;
let mut connection = self
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let tx = connection
.transaction()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let descendent = self.get_block_info_internal(&tx, &descendent)?;

if ancestor == &B::Id::zero() {
return Ok(Some(descendent.chain_length));
}

let ancestor = self.get_block_info(&ancestor)?;
let ancestor = self.get_block_info_internal(&tx, &ancestor)?;

// Bail out right away if the "descendent" does not have a
// higher chain_length.
Expand All @@ -379,7 +414,8 @@ where

// Seek back from the descendent to check whether it has the
// ancestor at the expected place.
let info = self.get_nth_ancestor(
let info = self.get_nth_ancestor_internal(
&tx,
&descendent.block_hash,
descendent.chain_length - ancestor.chain_length,
)?;
Expand All @@ -396,7 +432,24 @@ where
block_hash: &B::Id,
distance: u64,
) -> Result<BlockInfo<B::Id>, Error> {
for_path_to_nth_ancestor(self, block_hash, distance, |_| {})
let mut connection = self
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let tx = connection
.transaction()
.map_err(|err| Error::BackendError(Box::new(err)))?;
self.get_nth_ancestor_internal(&tx, block_hash, distance)
}

fn get_nth_ancestor_internal(
&self,
connection: &Connection,
block_hash: &B::Id,
distance: u64,
) -> Result<BlockInfo<B::Id>, Error> {
for_path_to_nth_ancestor_internal(self, connection, block_hash, distance, |_| {})
}
}

Expand All @@ -414,13 +467,36 @@ pub fn for_path_to_nth_ancestor<B, F>(
store: &BlockStore<B>,
block_hash: &B::Id,
distance: u64,
callback: F,
) -> Result<BlockInfo<B::Id>, Error>
where
B: Block,
F: FnMut(&BlockInfo<B::Id>),
{
let mut connection = store
.pool
.get()
.map_err(|err| Error::BackendError(Box::new(err)))?;

let tx = connection
.transaction()
.map_err(|err| Error::BackendError(Box::new(err)))?;

for_path_to_nth_ancestor_internal(store, &tx, block_hash, distance, callback)
}

fn for_path_to_nth_ancestor_internal<B, F>(
store: &BlockStore<B>,
connection: &Connection,
block_hash: &B::Id,
distance: u64,
mut callback: F,
) -> Result<BlockInfo<B::Id>, Error>
where
B: Block,
F: FnMut(&BlockInfo<B::Id>),
{
let mut cur_block_info = store.get_block_info(block_hash)?;
let mut cur_block_info = store.get_block_info_internal(connection, block_hash)?;

if distance >= cur_block_info.chain_length {
// FIXME: return error
Expand All @@ -446,7 +522,7 @@ where
.unwrap()
.clone();
callback(&cur_block_info);
cur_block_info = store.get_block_info(&best_link.block_hash)?;
cur_block_info = store.get_block_info_internal(connection, &best_link.block_hash)?;
}

assert_eq!(target, cur_block_info.chain_length);
Expand Down

0 comments on commit ef663ee

Please sign in to comment.