Skip to content

Commit

Permalink
support text format (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Aug 23, 2016
1 parent 9b908f2 commit aff9075
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 34 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
target
Cargo.lock
bin
5 changes: 4 additions & 1 deletion Cargo.toml
Expand Up @@ -11,4 +11,7 @@ dev = ["clippy"]
protobuf = "1.0"
quick-error = "0.2"
clippy = {version = "*", optional = true}
fnv = "1.0.3"
fnv = "1.0.3"

[dev-dependencies.hyper]
git = "https://github.com/hyperium/hyper.git"
11 changes: 9 additions & 2 deletions Makefile
@@ -1,4 +1,4 @@
all: format build test
all: format build test examples

build:
cargo build --features default
Expand All @@ -14,4 +14,11 @@ format:
cargo fmt -- --write-mode overwrite

clean:
cargo clean
cargo clean

examples:
cargo build --example example_embed
cargo build --example example_hyper
cp target/debug/examples/* bin/

.PHONY: all examples
62 changes: 62 additions & 0 deletions examples/example_embed.rs
@@ -0,0 +1,62 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate prom;

use std::thread;
use std::time::Duration;

use prom::encoder::{TextEncoder, Encoder};
use prom::{Counter, Opts, Registry};

fn main() {
let opts = Opts::new("test", "test help").const_label("a", "1").const_label("b", "2");
let counter = Counter::with_opts(opts).unwrap();

let r = Registry::new();
r.register(Box::new(counter.clone())).unwrap();

counter.inc();
assert_eq!(counter.get() as u64, 1);
counter.inc_by(42.0).unwrap();
assert_eq!(counter.get() as u64, 43);

let c2 = counter.clone();
thread::spawn(move || {
for _ in 0..10 {
thread::sleep(Duration::from_millis(500));
c2.inc();
}
});

thread::spawn(move || {
for _ in 0..5 {
thread::sleep(Duration::from_secs(1));
counter.inc();
}
});

// Choose your writer and encoder.
let mut buffer = Vec::<u8>::new();
let encoder = TextEncoder::new();
for _ in 0..5 {
let metric_familys = r.gather();
encoder.encode(&metric_familys, &mut buffer).unwrap();

// Output to the standard output.
println!("{}", String::from_utf8(buffer.clone()).unwrap());

buffer.clear();
thread::sleep(Duration::from_secs(1));
}
}
133 changes: 133 additions & 0 deletions examples/example_hyper.rs
@@ -0,0 +1,133 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

extern crate prom;
extern crate hyper;

use std::thread;
use std::time::Duration;

use hyper::{StatusCode, Decoder, Next, Encoder as HyperEncoder};
use hyper::header::{ContentLength, ContentType};
use hyper::net::HttpStream;
use hyper::server::{Server, Handler, Request, Response};
use hyper::mime::Mime;

use prom::errors::{Error, Result};
use prom::encoder::{Encoder, TextEncoder};
use prom::{Counter, Opts, Registry};

fn main() {
let opts = Opts::new("test", "test help").const_label("a", "1").const_label("b", "2");
let counter = Counter::with_opts(opts).unwrap();

let r = Registry::new();
r.register(Box::new(counter.clone())).unwrap();

counter.inc();
assert_eq!(counter.get() as u64, 1);
counter.inc_by(42.0).unwrap();
assert_eq!(counter.get() as u64, 43);

let c2 = counter.clone();
thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(300));
c2.inc();
}
});

thread::spawn(move || {
loop {
thread::sleep(Duration::from_millis(1500));
counter.inc();
}
});

let encoder = TextEncoder::new();
// Http server
run("127.0.0.1:9898", &r, &encoder).unwrap();
}

// run runs a http server with a Registry and a Encoder, it blocks current thread.
pub fn run<'a>(addr: &str, reg: &Registry, encoder: &'a Encoder) -> Result<()> {
let reg = reg.clone();

let addr = try!(addr.parse().or_else(|e| Err(Error::Msg(format!("{:?}", e)))));
let server = try!(Server::http(&addr).or_else(|e| Err(Error::Msg(format!("{:?}", e)))));
if let Ok((listener, server)) = server.handle(|_| HttpHandler::new(reg.clone(), encoder)) {
println!("listening {}", listener);

server.run();
}

Err(Error::Msg("http server error".to_owned()))
}

pub struct HttpHandler<'a> {
registry: Registry,
encoder: &'a (Encoder + 'a),
buffer: Vec<u8>,
write_pos: usize,
}

impl<'a> HttpHandler<'a> {
pub fn new(registry: Registry, encoder: &'a Encoder) -> HttpHandler<'a> {
HttpHandler {
registry: registry,
encoder: encoder,
buffer: Vec::new(),
write_pos: 0,
}
}
}

impl<'a> Handler<HttpStream> for HttpHandler<'a> {
fn on_request(&mut self, _: Request<HttpStream>) -> Next {
// TODO: route requests
Next::write()
}

fn on_request_readable(&mut self, _: &mut Decoder<HttpStream>) -> Next {
Next::write()
}

fn on_response(&mut self, res: &mut Response) -> Next {
let metric_familys = self.registry.gather();
if let Ok(_) = self.encoder.encode(&metric_familys, &mut self.buffer) {
res.headers_mut().set(ContentLength(self.buffer.len() as u64));
} else {
return Next::remove();
}

res.set_status(StatusCode::Ok);
res.headers_mut().set(ContentType((&self.encoder.format_type()).parse::<Mime>().unwrap()));
Next::write()
}

fn on_response_writable(&mut self, encoder: &mut HyperEncoder<HttpStream>) -> Next {
match encoder.try_write(&self.buffer[self.write_pos..]) {
Ok(Some(n)) => {
if (self.write_pos + n) == self.buffer.len() {
Next::end()
} else {
// a partial write
self.write_pos += n;
Next::write()
}
}
Ok(None) => Next::write(),
Err(_) => Next::remove(),
}
}
}

0 comments on commit aff9075

Please sign in to comment.