diff --git a/.gitignore b/.gitignore index a9d37c56..8a84b3eb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target Cargo.lock +bin diff --git a/Cargo.toml b/Cargo.toml index ca75013c..5863e26b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,4 +11,7 @@ dev = ["clippy"] protobuf = "1.0" quick-error = "0.2" clippy = {version = "*", optional = true} -fnv = "1.0.3" \ No newline at end of file +fnv = "1.0.3" + +[dev-dependencies.hyper] +git = "https://github.com/hyperium/hyper.git" diff --git a/Makefile b/Makefile index c8d99230..19fea5a7 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -all: format build test +all: format build test examples build: cargo build --features default @@ -14,4 +14,11 @@ format: cargo fmt -- --write-mode overwrite clean: - cargo clean \ No newline at end of file + cargo clean + +examples: + cargo build --example example_embed + cargo build --example example_hyper + cp target/debug/examples/* bin/ + +.PHONY: all examples diff --git a/examples/example_embed.rs b/examples/example_embed.rs new file mode 100644 index 00000000..8c675c26 --- /dev/null +++ b/examples/example_embed.rs @@ -0,0 +1,62 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +extern crate prom; + +use std::thread; +use std::time::Duration; + +use prom::encoder::{TextEncoder, Encoder}; +use prom::{Counter, Opts, Registry}; + +fn main() { + let opts = Opts::new("test", "test help").const_label("a", "1").const_label("b", "2"); + let counter = Counter::with_opts(opts).unwrap(); + + let r = Registry::new(); + r.register(Box::new(counter.clone())).unwrap(); + + counter.inc(); + assert_eq!(counter.get() as u64, 1); + counter.inc_by(42.0).unwrap(); + assert_eq!(counter.get() as u64, 43); + + let c2 = counter.clone(); + thread::spawn(move || { + for _ in 0..10 { + thread::sleep(Duration::from_millis(500)); + c2.inc(); + } + }); + + thread::spawn(move || { + for _ in 0..5 { + thread::sleep(Duration::from_secs(1)); + counter.inc(); + } + }); + + // Choose your writer and encoder. + let mut buffer = Vec::::new(); + let encoder = TextEncoder::new(); + for _ in 0..5 { + let metric_familys = r.gather(); + encoder.encode(&metric_familys, &mut buffer).unwrap(); + + // Output to the standard output. + println!("{}", String::from_utf8(buffer.clone()).unwrap()); + + buffer.clear(); + thread::sleep(Duration::from_secs(1)); + } +} diff --git a/examples/example_hyper.rs b/examples/example_hyper.rs new file mode 100644 index 00000000..3ee199e1 --- /dev/null +++ b/examples/example_hyper.rs @@ -0,0 +1,133 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +extern crate prom; +extern crate hyper; + +use std::thread; +use std::time::Duration; + +use hyper::{StatusCode, Decoder, Next, Encoder as HyperEncoder}; +use hyper::header::{ContentLength, ContentType}; +use hyper::net::HttpStream; +use hyper::server::{Server, Handler, Request, Response}; +use hyper::mime::Mime; + +use prom::errors::{Error, Result}; +use prom::encoder::{Encoder, TextEncoder}; +use prom::{Counter, Opts, Registry}; + +fn main() { + let opts = Opts::new("test", "test help").const_label("a", "1").const_label("b", "2"); + let counter = Counter::with_opts(opts).unwrap(); + + let r = Registry::new(); + r.register(Box::new(counter.clone())).unwrap(); + + counter.inc(); + assert_eq!(counter.get() as u64, 1); + counter.inc_by(42.0).unwrap(); + assert_eq!(counter.get() as u64, 43); + + let c2 = counter.clone(); + thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(300)); + c2.inc(); + } + }); + + thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(1500)); + counter.inc(); + } + }); + + let encoder = TextEncoder::new(); + // Http server + run("127.0.0.1:9898", &r, &encoder).unwrap(); +} + +// run runs a http server with a Registry and a Encoder, it blocks current thread. +pub fn run<'a>(addr: &str, reg: &Registry, encoder: &'a Encoder) -> Result<()> { + let reg = reg.clone(); + + let addr = try!(addr.parse().or_else(|e| Err(Error::Msg(format!("{:?}", e))))); + let server = try!(Server::http(&addr).or_else(|e| Err(Error::Msg(format!("{:?}", e))))); + if let Ok((listener, server)) = server.handle(|_| HttpHandler::new(reg.clone(), encoder)) { + println!("listening {}", listener); + + server.run(); + } + + Err(Error::Msg("http server error".to_owned())) +} + +pub struct HttpHandler<'a> { + registry: Registry, + encoder: &'a (Encoder + 'a), + buffer: Vec, + write_pos: usize, +} + +impl<'a> HttpHandler<'a> { + pub fn new(registry: Registry, encoder: &'a Encoder) -> HttpHandler<'a> { + HttpHandler { + registry: registry, + encoder: encoder, + buffer: Vec::new(), + write_pos: 0, + } + } +} + +impl<'a> Handler for HttpHandler<'a> { + fn on_request(&mut self, _: Request) -> Next { + // TODO: route requests + Next::write() + } + + fn on_request_readable(&mut self, _: &mut Decoder) -> Next { + Next::write() + } + + fn on_response(&mut self, res: &mut Response) -> Next { + let metric_familys = self.registry.gather(); + if let Ok(_) = self.encoder.encode(&metric_familys, &mut self.buffer) { + res.headers_mut().set(ContentLength(self.buffer.len() as u64)); + } else { + return Next::remove(); + } + + res.set_status(StatusCode::Ok); + res.headers_mut().set(ContentType((&self.encoder.format_type()).parse::().unwrap())); + Next::write() + } + + fn on_response_writable(&mut self, encoder: &mut HyperEncoder) -> Next { + match encoder.try_write(&self.buffer[self.write_pos..]) { + Ok(Some(n)) => { + if (self.write_pos + n) == self.buffer.len() { + Next::end() + } else { + // a partial write + self.write_pos += n; + Next::write() + } + } + Ok(None) => Next::write(), + Err(_) => Next::remove(), + } + } +} diff --git a/src/encoder.rs b/src/encoder.rs new file mode 100644 index 00000000..06e36f83 --- /dev/null +++ b/src/encoder.rs @@ -0,0 +1,222 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Write; + +use errors::{Result, Error}; +use proto::MetricFamily; +use proto::{self, MetricType}; + +pub trait Encoder { + /// `encode` converts a slice of MetricFamily proto messages into target + /// format and writes the resulting lines to `writer`. It returns the number + /// of bytes written and any error encountered. This function does not + /// perform checks on the content of the metric and label names, + /// i.e. invalid metric or label names will result in invalid text format + /// output. + fn encode(&self, &[MetricFamily], &mut Write) -> Result<()>; + + /// `format_type` returns target format. + fn format_type(&self) -> &str; +} + +pub type Format = &'static str; + +pub const TEXT_FORMAT: Format = "text/plain; version=0.0.4"; + +/// Implementation of an `Encoder` that converts a `MetricFamily` proto message +/// into text format. +#[derive(Debug, Default)] +pub struct TextEncoder; + +impl TextEncoder { + pub fn new() -> TextEncoder { + TextEncoder + } +} + +impl Encoder for TextEncoder { + fn encode(&self, metric_familys: &[MetricFamily], writer: &mut Write) -> Result<()> { + for mf in metric_familys { + if mf.get_metric().is_empty() { + return Err(Error::Msg("MetricFamily has no metrics".to_owned())); + } + + let name = mf.get_name(); + if name.is_empty() { + return Err(Error::Msg("MetricFamily has no name".to_owned())); + } + + let help = mf.get_help(); + if !help.is_empty() { + try!(write!(writer, "# HELP {} {}\n", name, escape_string(help, false))); + } + + let metric_type = mf.get_field_type(); + let lowercase_type = format!("{:?}", metric_type).to_lowercase(); + try!(write!(writer, "# TYPE {} {}\n", name, lowercase_type)); + + for m in mf.get_metric() { + match metric_type { + MetricType::COUNTER => { + try!(write_sample(name, m, "", "", m.get_counter().get_value(), writer)); + } + MetricType::GAUGE | MetricType::SUMMARY | MetricType::HISTOGRAM | + MetricType::UNTYPED => unimplemented!(), + } + } + } + + Ok(()) + } + + fn format_type(&self) -> &str { + TEXT_FORMAT + } +} + +/// `write_sample` writes a single sample in text format to `writer`, given the +/// metric name, the metric proto message itself, optionally an additional label +/// name and value (use empty strings if not required), and the value. +/// The function returns the number of bytes written and any error encountered. +fn write_sample(name: &str, + mc: &proto::Metric, + additional_label_name: &str, + additional_label_value: &str, + value: f64, + writer: &mut Write) + -> Result<()> { + try!(writer.write_all(name.as_bytes())); + + try!(label_pairs_to_text(mc.get_label(), + additional_label_name, + additional_label_value, + writer)); + + try!(write!(writer, " {}", value)); + + let timestamp = mc.get_timestamp_ms(); + if timestamp != 0 { + try!(write!(writer, " {}", timestamp)); + } + + try!(writer.write_all(b"\n")); + + Ok(()) +} + +/// `label_pairs_to_text` converts a slice of `LabelPair` proto messages plus +/// the explicitly given additional label pair into text formatted as required +/// by the text format and writes it to `writer`. An empty slice in combination +/// with an empty string `additional_label_name` results in nothing being +/// written. Otherwise, the label pairs are written, escaped as required by the +/// text format, and enclosed in '{...}'. The function returns the number of +/// bytes written and any error encountered. +fn label_pairs_to_text(pairs: &[proto::LabelPair], + additional_label_name: &str, + additional_label_value: &str, + writer: &mut Write) + -> Result<()> { + if pairs.is_empty() && additional_label_name.is_empty() { + return Ok(()); + } + + let mut separator = "{"; + for lp in pairs { + try!(write!(writer, + "{}{}=\"{}\"", + separator, + lp.get_name(), + escape_string(lp.get_value(), true))); + + separator = ","; + } + + if !additional_label_name.is_empty() { + try!(write!(writer, + "{}{}=\"{}\"", + separator, + additional_label_name, + escape_string(additional_label_value, true))); + } + + try!(writer.write_all(b"}")); + + Ok(()) +} + +/// `escape_string` replaces '\' by '\\', new line character by '\n', and - if +/// `include_double_quote` is true - '"' by '\"'. +pub fn escape_string(v: &str, include_double_quote: bool) -> String { + let mut escaped = String::with_capacity(v.len() * 2); + + for c in v.chars() { + match c { + '\\' | '\n' => { + escaped.extend(c.escape_default()); + } + '"' if include_double_quote => { + escaped.extend(c.escape_default()); + } + _ => { + escaped.push(c); + } + } + } + + escaped.shrink_to_fit(); + + escaped +} + +#[cfg(test)] +mod tests { + use counter::Counter; + use metrics::{Opts, Collector}; + + use super::*; + + #[test] + fn test_ecape_string() { + assert_eq!(r"\\", escape_string("\\", false)); + assert_eq!(r"a\\", escape_string("a\\", false)); + assert_eq!(r"\n", escape_string("\n", false)); + assert_eq!(r"a\n", escape_string("a\n", false)); + assert_eq!(r"\\n", escape_string("\\n", false)); + + assert_eq!(r##"\\n\""##, escape_string("\\n\"", true)); + assert_eq!(r##"\\\n\""##, escape_string("\\\n\"", true)); + assert_eq!(r##"\\\\n\""##, escape_string("\\\\n\"", true)); + assert_eq!(r##"\"\\n\""##, escape_string("\"\\n\"", true)); + } + + #[test] + fn test_text_encoder() { + let opts = Opts::new("test", "test help").const_label("a", "1").const_label("b", "2"); + let counter = Counter::with_opts(opts).unwrap(); + counter.inc(); + + let mf = counter.collect(); + let mut writer = Vec::::new(); + let encoder = TextEncoder::new(); + let txt = encoder.encode(&[mf], &mut writer); + assert!(txt.is_ok()); + + let ans = r##"# HELP test test help +# TYPE test counter +test{a="1",b="2"} 1 +"##; + + assert_eq!(ans.as_bytes(), writer.as_slice()); + } +} diff --git a/src/lib.rs b/src/lib.rs index 9184abdd..89cebff3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,13 +19,15 @@ extern crate quick_error; extern crate protobuf; extern crate fnv; -mod metrics; pub mod proto; +pub mod errors; +pub mod encoder; + +mod metrics; mod desc; // TODO: remove dead_code later. #[allow(dead_code)] mod value; -mod errors; #[allow(dead_code)] mod counter; #[allow(dead_code)] diff --git a/src/registry.rs b/src/registry.rs index 55fc2fcf..9142d74f 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::sync::{Arc, RwLock}; -use std::io::Write; +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use proto; use metrics::Collector; use errors::{Result, Error}; @@ -25,7 +26,7 @@ struct RegistryCore { } impl RegistryCore { - pub fn register(&mut self, c: Box) -> Result<()> { + fn register(&mut self, c: Box) -> Result<()> { // TODO: should simplify later. let id = { let desc = c.desc(); @@ -53,7 +54,7 @@ impl RegistryCore { Ok(()) } - pub fn unregister(&mut self, c: Box) -> Result<()> { + fn unregister(&mut self, c: Box) -> Result<()> { let desc = c.desc(); if self.colloctors_by_id.remove(&desc.id).is_none() { return Err(Error::Msg(format!("collector {:?} is not registered", desc))); @@ -63,6 +64,35 @@ impl RegistryCore { // throughout the lifetime of a program. Ok(()) } + + fn gather(&self) -> Vec { + let mut mf_by_name = HashMap::new(); + + for c in self.colloctors_by_id.values() { + let mut mf = c.collect(); + let name = mf.get_name().to_owned(); + + match mf_by_name.entry(name) { + Entry::Vacant(entry) => { + entry.insert(mf); + } + Entry::Occupied(mut entry) => { + let mut existent_mf = entry.get_mut(); + let mut existent_metrics = existent_mf.mut_metric(); + + // TODO: check type. + // TODO: check consistency. + for metric in mf.take_metric().into_iter() { + existent_metrics.push(metric); + } + } + } + } + + // TODO: metric_family injection hook. + // TODO: sort metrics. + mf_by_name.into_iter().map(|(_, m)| m).collect() + } } #[derive(Clone)] @@ -70,6 +100,17 @@ pub struct Registry { r: Arc>, } +impl Default for Registry { + fn default() -> Registry { + let r = RegistryCore { + colloctors_by_id: HashMap::new(), + dim_hashes_by_name: HashMap::new(), + }; + + Registry { r: Arc::new(RwLock::new(r)) } + } +} + impl Registry { pub fn new() -> Registry { Registry::default() @@ -83,48 +124,37 @@ impl Registry { self.r.write().unwrap().unregister(c) } - pub fn write_pb(&self, _: &mut T) -> Result<()> { - Ok(()) - } - - pub fn write_text(&self, _: &mut T) -> Result<()> { - Ok(()) - } -} - -impl Default for Registry { - fn default() -> Registry { - let r = RegistryCore { - colloctors_by_id: HashMap::new(), - dim_hashes_by_name: HashMap::new(), - }; - - Registry { r: Arc::new(RwLock::new(r)) } + pub fn gather(&self) -> Vec { + self.r.read().unwrap().gather() } } #[cfg(test)] mod tests { use std::thread; - use super::*; + use counter::Counter; + use super::*; + #[test] fn test_registry() { let r = Registry::new(); - let r1 = r.clone(); - thread::spawn(move || { - let mut w = vec![]; - r1.write_pb(&mut w).unwrap(); - }); - let counter = Counter::new("test", "test help").unwrap(); - r.register(Box::new(counter.clone())).unwrap(); counter.inc(); + let r1 = r.clone(); + let handler = thread::spawn(move || { + let metric_familys = r1.gather(); + assert_eq!(metric_familys.len(), 1); + }); + assert!(r.register(Box::new(counter.clone())).is_err()); + + assert!(handler.join().is_ok()); + assert!(r.unregister(Box::new(counter.clone())).is_ok()); assert!(r.unregister(Box::new(counter.clone())).is_err()); }