Skip to content
This repository has been archived by the owner on Sep 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #50 from ustulation/websocket-logger
Browse files Browse the repository at this point in the history
Web Socket Logging
  • Loading branch information
madadam committed Apr 15, 2016
2 parents bcb3f67 + 3506dfd commit 6e7e527
Show file tree
Hide file tree
Showing 7 changed files with 379 additions and 45 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ quick-error = "1.0.0"
regex = "~0.1.62"
rustc-serialize = "~0.3.19"
toml = "~0.1.28"
ws = "~0.4.6"

[dev-dependencies]
time = "~0.1.35"
3 changes: 2 additions & 1 deletion src/event_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ impl<Category: fmt::Debug + Clone, EventSubset: fmt::Debug> EventSender<Category
// it requires EventSubset to be clonable even though mpsc::Sender<EventSubset> does
// not require EventSubset to be clonable for itself being cloned.
impl<Category: fmt::Debug + Clone, EventSubset: fmt::Debug> Clone for EventSender<Category,
EventSubset> {
EventSubset>
{
fn clone(&self) -> EventSender<Category, EventSubset> {
EventSender {
event_tx: self.event_tx.clone(),
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ extern crate toml;
#[macro_use]
extern crate log as logger;
extern crate regex;
extern crate ws;

#[allow(unused_extern_crates)]
// Needed because the crate is only used for macros
Expand All @@ -70,5 +71,3 @@ pub mod log;
pub mod event_sender;
/// Functions for serialisation and deserialisation
pub mod serialisation;

mod async_log;
138 changes: 117 additions & 21 deletions src/async_log.rs → src/log/async_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@ use std::fs::{File, OpenOptions};
use std::io::{self, Stdout, Write};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{self, Sender};
use std::net::{SocketAddr, TcpStream};
use std::net::{ToSocketAddrs, SocketAddr, TcpStream};

use std::str::FromStr;
use std::borrow::Borrow;
use thread::RaiiThreadJoiner;
use log::web_socket::WebSocket;

use regex::Regex;

Expand All @@ -39,7 +42,6 @@ use toml::{Table, Value};
/// demarcates the end of a particular log message.
pub const MSG_TERMINATOR: [u8; 3] = [254, 253, 255];

/// Appender that writes to the stdout asynchronously.
pub struct AsyncConsoleAppender;

impl AsyncConsoleAppender {
Expand All @@ -62,7 +64,6 @@ impl AsyncConsoleAppenderBuilder {
}
}

/// Appender that writes to a file asynchronously.
pub struct AsyncFileAppender;

impl AsyncFileAppender {
Expand Down Expand Up @@ -109,7 +110,80 @@ impl AsyncFileAppenderBuilder {
}
}

/// Creator for `AsyncConsoleAppender`
pub struct AsyncServerAppender;

impl AsyncServerAppender {
pub fn builder<A: ToSocketAddrs>(server_addr: A) -> AsyncServerAppenderBuilder<A> {
AsyncServerAppenderBuilder {
addr: server_addr,
pattern: PatternLayout::default(),
no_delay: true,
}
}
}

pub struct AsyncServerAppenderBuilder<A> {
addr: A,
pattern: PatternLayout,
no_delay: bool,
}

impl<A: ToSocketAddrs> AsyncServerAppenderBuilder<A> {
pub fn pattern(self, pattern: PatternLayout) -> Self {
AsyncServerAppenderBuilder {
addr: self.addr,
pattern: pattern,
no_delay: self.no_delay,
}
}

pub fn no_delay(self, no_delay: bool) -> Self {
AsyncServerAppenderBuilder {
addr: self.addr,
pattern: self.pattern,
no_delay: no_delay,
}
}

pub fn build(self) -> io::Result<AsyncAppender> {
use net2::TcpStreamExt;

let stream = try!(TcpStream::connect(self.addr));
try!(stream.set_nodelay(self.no_delay));
Ok(AsyncAppender::new(stream, self.pattern))
}
}

pub struct AsyncWebSockAppender;

impl AsyncWebSockAppender {
pub fn builder<U: Borrow<str>>(server_url: U) -> AsyncWebSockAppenderBuilder<U> {
AsyncWebSockAppenderBuilder {
url: server_url,
pattern: PatternLayout::default(),
}
}
}

pub struct AsyncWebSockAppenderBuilder<U> {
url: U,
pattern: PatternLayout,
}

impl<U: Borrow<str>> AsyncWebSockAppenderBuilder<U> {
pub fn pattern(self, pattern: PatternLayout) -> Self {
AsyncWebSockAppenderBuilder {
url: self.url,
pattern: pattern,
}
}

pub fn build(self) -> io::Result<AsyncAppender> {
let ws = try!(WebSocket::new(self.url));
Ok(AsyncAppender::new(ws, self.pattern))
}
}

pub struct AsyncConsoleAppenderCreator;

impl CreateAppender for AsyncConsoleAppenderCreator {
Expand All @@ -119,7 +193,6 @@ impl CreateAppender for AsyncConsoleAppenderCreator {
}
}

/// Creator for `AsyncFileAppender`
pub struct AsyncFileAppenderCreator;

impl CreateAppender for AsyncFileAppenderCreator {
Expand All @@ -146,25 +219,47 @@ impl CreateAppender for AsyncFileAppenderCreator {
}
}

/// Creator for `AsyncServerAppender`
pub struct AsyncServerAppenderCreator;

impl CreateAppender for AsyncServerAppenderCreator {
fn create_appender(&self, mut config: Table) -> Result<Box<Append>, Box<Error>> {
use net2::TcpStreamExt;

let server_addr = match config.remove("server_addr") {
Some(Value::String(addr)) => try!(SocketAddr::from_str(&addr[..])),
Some(_) => {
return Err(Box::new(ConfigError("`server_addr` must be a string".to_owned())))
}
None => return Err(Box::new(ConfigError("`server_addr` is required".to_owned()))),
};
let no_delay = match config.remove("no_delay") {
Some(Value::Boolean(no_delay)) => no_delay,
Some(_) => return Err(Box::new(ConfigError("`no_delay` must be a boolean".to_owned()))),
None => true,
};
let pattern = try!(parse_pattern(&mut config));

Ok(Box::new(try!(AsyncServerAppender::builder(server_addr)
.pattern(pattern)
.no_delay(no_delay)
.build())))
}
}

pub struct AsyncWebSockAppenderCreator;

impl CreateAppender for AsyncWebSockAppenderCreator {
fn create_appender(&self, mut config: Table) -> Result<Box<Append>, Box<Error>> {
let server_url = match config.remove("server_url") {
Some(Value::String(url)) => url,
Some(_) => {
return Err(Box::new(ConfigError("`server_url` must be a string".to_owned())))
}
None => return Err(Box::new(ConfigError("`server_url` is required".to_owned()))),
};
let pattern = try!(parse_pattern(&mut config));

let stream = try!(TcpStream::connect(server_addr));
try!(stream.set_nodelay(true));
Ok(Box::new(AsyncAppender::new(stream, pattern)))
Ok(Box::new(try!(AsyncWebSockAppender::builder(server_url)
.pattern(pattern)
.build())))
}
}

Expand Down Expand Up @@ -203,8 +298,7 @@ pub struct AsyncAppender {
}

impl AsyncAppender {
/// Construct an AsyncAppender
pub fn new<W: 'static + SyncWrite + Send>(mut writer: W, pattern: PatternLayout) -> Self {
fn new<W: 'static + SyncWrite + Send>(mut writer: W, pattern: PatternLayout) -> Self {
let (tx, rx) = mpsc::channel::<AsyncEvent>();

let joiner = thread!("AsyncLog", move || {
Expand Down Expand Up @@ -253,32 +347,34 @@ impl Drop for AsyncAppender {
}
}

/// Trait to be implemented for anything utilising `AsyncAppender`
pub trait SyncWrite {
trait SyncWrite {
fn sync_write(&mut self, buf: &[u8]) -> io::Result<()>;
}

impl SyncWrite for Stdout {
fn sync_write(&mut self, buf: &[u8]) -> io::Result<()> {
let mut out = self.lock();
try!(out.write_all(buf));
try!(out.flush());
Ok(())
out.flush()
}
}

impl SyncWrite for File {
fn sync_write(&mut self, buf: &[u8]) -> io::Result<()> {
try!(self.write_all(buf));
try!(self.flush());
Ok(())
self.flush()
}
}

impl SyncWrite for TcpStream {
fn sync_write(&mut self, buf: &[u8]) -> io::Result<()> {
let _ = try!(self.write_all(&buf));
let _ = try!(self.write_all(&MSG_TERMINATOR[..]));
Ok(())
self.write_all(&MSG_TERMINATOR[..])
}
}

impl SyncWrite for WebSocket {
fn sync_write(&mut self, buf: &[u8]) -> io::Result<()> {
self.write_all(buf)
}
}
Loading

0 comments on commit 6e7e527

Please sign in to comment.