Skip to content

Commit

Permalink
attaca: prepare for SSH backend, add ssh2 crate
Browse files Browse the repository at this point in the history
  • Loading branch information
sdleffler committed Nov 15, 2017
1 parent d4f4a8c commit 2cede9c
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 22 deletions.
70 changes: 70 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -32,6 +32,7 @@ serde = "1.0.11"
serde_derive = "1.0.11"
sha3 = "0.6.0"
slog = "2.0.6"
ssh2 = "0.3.2"
stable_deref_trait = "1.0.0"
toml = "0.4.4"
typenum = "1.9.0"
Expand Down
4 changes: 2 additions & 2 deletions src/bin/remote/add.rs
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;
use clap::{App, SubCommand, Arg, ArgGroup, ArgMatches};
use itertools::Itertools;

use attaca::repository::{RemoteCfg, CephCfg, EtcdCfg, Repository};
use attaca::repository::{RemoteCfg, ObjectStoreCfg, CephCfg, EtcdCfg, Repository};

use errors::*;

Expand Down Expand Up @@ -144,7 +144,7 @@ pub fn go(repository: &mut Repository, matches: &ArgMatches) -> Result<()> {
repository.config.remotes.insert(
name,
RemoteCfg {
object_store,
object_store: ObjectStoreCfg::Ceph(object_store),
ref_store: EtcdCfg::default(),
},
);
Expand Down
34 changes: 27 additions & 7 deletions src/bin/remote/list.rs
@@ -1,6 +1,8 @@
use std::fmt::Write;

use clap::{App, SubCommand, ArgMatches};

use attaca::Repository;
use attaca::repository::{ObjectStoreCfg, Repository};

use errors::*;

Expand All @@ -12,12 +14,30 @@ pub fn command() -> App<'static, 'static> {

pub fn go(repository: &mut Repository, _matches: &ArgMatches) -> Result<()> {
for (name, remote) in repository.config.remotes.iter() {
if let Some(ref ceph_conf) = remote.object_store.conf_file.as_ref() {
println!("{}: ceph.conf path `{}`", name, ceph_conf.display());
} else if let Some(ref hosts) = remote.object_store.conf_options.get("mon_host") {
println!("{}: from hosts `{}`", name, hosts);
} else {
println!("{}: no `mon_host` or ceph.conf entry", name);
match remote.object_store {
ObjectStoreCfg::Ceph(ref ceph_cfg) => {
if let Some(ref ceph_conf) = ceph_cfg.conf_file.as_ref() {
println!("{}: ceph.conf path `{}`", name, ceph_conf.display());
} else if let Some(ref hosts) = ceph_cfg.conf_options.get("mon_host") {
println!("{}: from hosts `{}`", name, hosts);
} else {
println!("{}: no `mon_host` or ceph.conf entry", name);
}
}
ObjectStoreCfg::Ssh(ref ssh_cfg) => {
let mut out = String::new();
write!(
out,
"{}: {}@{}",
name,
ssh_cfg.username,
ssh_cfg.address.ip()
)?;
if ssh_cfg.address.port() != 22 {
write!(out, ":[{}]", ssh_cfg.address.port())?;
}
println!("{}", out);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/errors.rs
Expand Up @@ -17,6 +17,7 @@ error_chain! {
Io(::std::io::Error);
Nul(::std::ffi::NulError);
ParseInt(::std::num::ParseIntError);
Ssh2(::ssh2::Error);
TomlSer(::toml::ser::Error);
TomlDe(::toml::de::Error);
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -41,6 +41,7 @@ extern crate seahash;
extern crate serde_derive;
extern crate sequence_trie;
extern crate sha3;
extern crate ssh2;
extern crate stable_deref_trait;
extern crate toml;
extern crate typenum;
Expand Down
45 changes: 38 additions & 7 deletions src/repository.rs
Expand Up @@ -18,6 +18,7 @@
use std::collections::HashMap;
use std::fs::{self, File};
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;

Expand All @@ -37,6 +38,16 @@ use store::{Local, Remote, Ceph};
use trace::Trace;


#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SshCfg {
/// Socket address to connect to with SSH.
pub address: SocketAddr,

/// SSH username.
pub username: String,
}


/// The persistent configuration data for a single pool of a RADOS cluster.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CephCfg {
Expand All @@ -55,6 +66,13 @@ pub struct CephCfg {
}


#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ObjectStoreCfg {
Ceph(CephCfg),
Ssh(SshCfg),
}


/// The persistent configuration data for an etcd cluster.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct EtcdCfg {
Expand All @@ -63,13 +81,20 @@ pub struct EtcdCfg {
}


#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RefStoreCfg {
Etcd(EtcdCfg),
Ssh(SshCfg),
}


/// The persistent configuration data for a single remote.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteCfg {
/// The remote object store.
///
/// TODO: Support object stores other than Ceph/RADOS.
pub object_store: CephCfg,
pub object_store: ObjectStoreCfg,

/// The remote ref store.
///
Expand Down Expand Up @@ -381,12 +406,18 @@ impl Repository {
},
)?;
let local = Local::new(&self.paths, &local_catalog, io_pool);
Remote::Ceph(Ceph::connect(
local,
&remote_catalog,
remote_config,
io_pool,
)?)

match remote_config.object_store {
ObjectStoreCfg::Ceph(ref ceph_cfg) => {
Remote::Ceph(Ceph::connect(
local,
&remote_catalog,
ceph_cfg,
io_pool,
)?)
}
ObjectStoreCfg::Ssh(ref _ssh_cfg) => unimplemented!(),
}
};

Ok(Context::new(self, trace, remote, marshal_pool, io_pool))
Expand Down
12 changes: 6 additions & 6 deletions src/store/ceph.rs
Expand Up @@ -14,7 +14,7 @@ use rad::{ConnectionBuilder, Connection};
use catalog::Catalog;
use errors::*;
use marshal::{Hashed, ObjectHash, Object};
use repository::RemoteCfg;
use repository::CephCfg;
use store::{ObjectStore, Local};


Expand Down Expand Up @@ -45,21 +45,21 @@ impl Ceph {
pub fn connect(
local: Local,
remote_catalog: &Catalog,
remote_config: &RemoteCfg,
remote_config: &CephCfg,
io_pool: &CpuPool,
) -> Result<Self> {
let conn = {
let mut builder = ConnectionBuilder::with_user(&remote_config.object_store.user)
let mut builder = ConnectionBuilder::with_user(&remote_config.user)
.chain_err(|| ErrorKind::RemoteConnectInit)?;

if let Some(ref conf_path) = remote_config.object_store.conf_file {
if let Some(ref conf_path) = remote_config.conf_file {
builder = builder.read_conf_file(conf_path).chain_err(|| {
ErrorKind::RemoteConnectReadConf
})?;
}

builder = remote_config
.object_store

.conf_options
.iter()
.fold(Ok(builder), |acc, (key, value)| {
Expand All @@ -70,7 +70,7 @@ impl Ceph {
Mutex::new(builder.connect().chain_err(|| ErrorKind::RemoteConnect)?)
};

let pool = remote_config.object_store.pool.clone();
let pool = remote_config.pool.clone();

Ok(Ceph {
local,
Expand Down

0 comments on commit 2cede9c

Please sign in to comment.