-
-
Notifications
You must be signed in to change notification settings - Fork 129
/
consul_interface.rs
117 lines (98 loc) · 3.66 KB
/
consul_interface.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//! Provides an interface to a Consul cluster
use log::error;
use hyper::body::Body;
use hyper::rt::Future;
use hyper::{Client, Request};
use serde_json;
use cluster::shard::Shard;
use cluster::ClusterError;
/// Key prefix, relative to Consul's `/v1/kv/` endpoint, under which Toshi entries are written.
static CONSUL_PREFIX: &str = "services/toshi/";
/// Connection settings for talking to a Consul server over its HTTP API.
///
/// Build one with `ConsulInterface::default()` and customise it through the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ConsulInterface {
    /// Host name or IP address of the Consul server.
    address: String,
    /// Port of the Consul HTTP API, stored as text because it is spliced into the URL.
    port: String,
    /// URL scheme used to reach Consul (`http` or `https`).
    scheme: String,
    /// Name of the Toshi cluster; `None` until `with_cluster_name` is called.
    cluster_name: Option<String>,
    /// ID of this node within the Toshi cluster; `None` until it has been assigned.
    pub node_id: Option<String>,
}
impl ConsulInterface {
/// Sets the address of the consul service
pub fn with_address(mut self, address: String) -> Self {
self.address = address;
self
}
/// Sets the port for the Consul HTTP library
pub fn with_port(mut self, port: String) -> Self {
self.port = port;
self
}
/// Sets the scheme (http or https) for the Consul server
pub fn with_scheme(mut self, scheme: String) -> Self {
self.scheme = scheme;
self
}
/// Sets the *Toshi* cluster name
pub fn with_cluster_name(mut self, cluster_name: String) -> Self {
self.cluster_name = Some(cluster_name);
self
}
/// Sets the ID of this specific node in the Toshi cluster
pub fn with_node_id(mut self, node_id: String) -> Self {
self.node_id = Some(node_id);
self
}
/// Registers this node with Consul via HTTP API
pub fn register_node(&mut self) -> impl Future<Item = (), Error = ClusterError> {
let uri = self.base_consul_url() + &self.cluster_name() + "/" + &self.node_id() + "/";
let client = Client::new();
let req = self.put_request(&uri);
client.request(req).map(|_| ()).map_err(|e| {
error!("Error registering node: {:?}", e);
std::process::exit(1);
})
}
/// Registers a cluster with Consul via the HTTP API
pub fn register_cluster(&self) -> impl Future<Item = (), Error = ClusterError> {
let uri = self.base_consul_url() + &self.cluster_name() + "/";
let client = Client::new();
let req = self.put_request(&uri);
client.request(req).map(|_| ()).map_err(|_| ClusterError::FailedRegisteringNode)
}
/// Registers a shard with the Consul cluster
pub fn register_shard<T: Shard + serde::Serialize>(&mut self, shard: &T) -> impl Future<Item = (), Error = ()> {
let uri = self.base_consul_url() + &self.cluster_name() + "/" + &shard.shard_id().to_hyphenated_ref().to_string() + "/";
let client = Client::new();
let json_body = serde_json::to_string(&shard).unwrap();
let req = self.put_request_with_body(&uri, json_body);
client.request(req).map(|_| ()).map_err(|e| {
error!("Error registering shard: {:?}", e);
std::process::exit(1);
})
}
fn base_consul_url(&self) -> String {
self.scheme.clone() + "://" + &self.address + ":" + &self.port + "/v1/kv/" + CONSUL_PREFIX
}
fn put_request(&self, uri: &str) -> Request<Body> {
Request::builder().method("PUT").uri(uri).body(Body::empty()).unwrap()
}
fn put_request_with_body(&self, uri: &str, payload: String) -> Request<Body> {
Request::builder().method("PUT").uri(uri).body(Body::from(payload)).unwrap()
}
fn cluster_name(&self) -> String {
self.cluster_name.clone().unwrap()
}
fn node_id(&self) -> String {
self.node_id.clone().unwrap()
}
}
impl Default for ConsulInterface {
fn default() -> ConsulInterface {
ConsulInterface {
address: "127.0.0.1".into(),
port: "8500".into(),
scheme: String::from("http"),
cluster_name: None,
node_id: None,
}
}
}