A replicator that takes a lazers DB and syncs couchdb data into it. It is an implementation of the algorithm described here: [here](http://docs.couchdb.org/en/1.6.1/replication/protocol. html#replication-protocol-algorithm).
extern crate lazers_traits;
extern crate futures;
extern crate backtrace;
#[macro_use]
extern crate error_chain;
use lazers_traits::prelude::*;
use futures::Future;
use futures::BoxFuture;
use futures::failed;
use futures::finished;
use std::sync::Arc;
use std::convert::From;
pub mod errors;
The standard replicator struct is just a pair of clients to sync from and to, along with the databases to use.
The two clients don't need to be of the same kind.
pub struct Replicator<From: Client + Send, To: Client + Send> {
from: From,
to: To,
from_db: DatabaseName,
to_db: DatabaseName,
create_target: bool
}
impl<From: Client + Send, To: Client + Send> Replicator<From, To> {
pub fn new(from: From, to: To, from_db: DatabaseName, to_db: DatabaseName, create_target: bool) -> Replicator<From, To> {
Replicator {
from: from,
to: to,
from_db: from_db,
to_db: to_db,
create_target: create_target
}
}
}
impl<From: Client + Send + 'static, To: Client + Send + 'static> Replicator<From, To> {
pub fn verify_peers(self) -> BoxFuture<(), Error> {
self.setup_peers(false)
}
pub fn setup_peers(self, create_target: bool) -> BoxFuture<(), Error> {
let verifier = VerifyPeers::new(self);
verifier.verify_source().and_then(|state| {
state.verify_target()
}).and_then(move |state| {
if create_target {
state.create_if_absent()
} else {
state.fail_if_absent()
}
}).and_then(|_| {
finished(())
}).boxed()
}
}
We implement the peer verification as described here.
We follow a state-machine like pattern here and name all possible states first. We label all states by using zero sized structs. They only serve as information for the type system.
trait State {}
struct Unconnected;
impl State for Unconnected {}
struct SourceExisting;
impl State for SourceExisting {}
impl From<Unconnected> for SourceExisting {
fn from(_: Unconnected) -> SourceExisting {
SourceExisting
}
}
struct TargetAbsent;
impl State for TargetAbsent {}
impl From<SourceExisting> for TargetAbsent {
fn from(_: SourceExisting) -> TargetAbsent {
TargetAbsent
}
}
struct TargetExisting;
impl State for TargetExisting {}
impl From<SourceExisting> for TargetExisting {
fn from(_: SourceExisting) -> TargetExisting {
TargetExisting
}
}
impl From<TargetAbsent> for TargetExisting {
fn from(_: TargetAbsent) -> TargetExisting {
TargetExisting
}
}
type VerifyError = String;
We then define a VerifyPeers
struct to define the flow used in the first
few steps. VerifyPeers
also wraps an instance of a CouchDB
client.
struct VerifyPeers<From: Client + Send, To: Client + Send, S: State> {
replicator: Replicator<From, To>,
#[allow(dead_code)]
state: S
}
impl<From: Client + Send, To: Client + Send, T: State> VerifyPeers<From, To, T> {
fn transition<X: State + std::convert::From<T>>(self, state: X) -> VerifyPeers<From, To, X> {
VerifyPeers { replicator: self.replicator, state: state }
}
}
impl<From: Client + Send + 'static, To: Client + Send + 'static> VerifyPeers<From, To, Unconnected> {
fn new(replicator: Replicator<From, To>) -> Self {
VerifyPeers { replicator: replicator, state: Unconnected }
}
fn verify_source(self) -> BoxFuture<VerifyPeers<From, To, SourceExisting>, Error> {
let database = self.replicator.from_db.clone();
let future_db_state = self.replicator.from.find_database(database);
future_db_state.and_then(|db_state| {
if db_state.existing() {
finished(self.transition(SourceExisting)).boxed()
} else {
failed(error("Source doesn't exist".into(), backtrace::Backtrace::new())).boxed()
}
}).boxed()
}
}
impl<From: Client + Send + 'static, To: Client + Send + 'static> VerifyPeers<From, To, SourceExisting> {
fn verify_target(self) -> BoxFuture<TargetBranch<From, To>, Error> {
let database = self.replicator.to_db.clone();
let future_db_state = self.replicator.to.find_database(database);
future_db_state.and_then(|db_state| {
if db_state.existing() {
finished(TargetBranch::Existing(self.transition(TargetExisting))).boxed()
} else {
finished(TargetBranch::Absent(self.transition(TargetAbsent))).boxed()
}
}).boxed()
}
}
impl<From: Client + Send + 'static, To: Client + Send + 'static> TargetBranch<From, To> {
fn create_if_absent(self) -> BoxFuture<VerifyPeers<From, To, TargetExisting>, Error> {
match self {
TargetBranch::Existing(s) => finished(s).boxed(),
TargetBranch::Absent(s) => {
s.create_target()
}
}
}
fn fail_if_absent(self) -> BoxFuture<VerifyPeers<From, To, TargetExisting>, Error> {
match self {
TargetBranch::Existing(s) => finished(s).boxed(),
TargetBranch::Absent(_) => {
failed(error("Target doesn't exist".into(), backtrace::Backtrace::new())).boxed()
}
}
}
}
impl<From: Client + Send + 'static, To: Client + Send + 'static> VerifyPeers<From, To, TargetAbsent> {
fn create_target(self) -> BoxFuture<VerifyPeers<From, To, TargetExisting>, Error> {
let database = self.replicator.to_db.clone();
let future_db_state = self.replicator.to.find_database(database);
future_db_state.or_create().and_then(|_| {
finished(self.transition(TargetExisting))
}).boxed()
}
}
enum TargetBranch<From: Client + Send, To: Client + Send> {
Existing(VerifyPeers<From, To, TargetExisting>),
Absent(VerifyPeers<From, To, TargetAbsent>)
}
fn error(message: String, backtrace: backtrace::Backtrace) -> Error {
Error(ErrorKind::ClientError(message), (None, Arc::new(backtrace)))
}