Skip to content

Commit

Permalink
Merge pull request snapview#46 from snapview/update-dependencies
Browse files Browse the repository at this point in the history
Update to the newest `tungstenite-rs` version
  • Loading branch information
agalakhov committed Jul 19, 2018
2 parents 8b3f811 + b61ed96 commit d43c614
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 19 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Expand Up @@ -20,7 +20,7 @@ stream = ["bytes"]
futures = "0.1.17"

[dependencies.tungstenite]
version = "0.5.3"
version = "0.6.0"
default-features = false

[dependencies.bytes]
Expand All @@ -29,7 +29,7 @@ version = "0.4.6"

[dependencies.native-tls]
optional = true
version = "0.1.5"
version = "0.2.0"

[dependencies.tokio-dns-unofficial]
optional = true
Expand All @@ -46,3 +46,6 @@ version = "0.1.4"
[dev-dependencies]
tokio-core = "0.1.12"
url = "1.6.0"

[patch.crates-io]
tokio-tls = { git = "https://github.com/aep/tokio-tls.git", rev = "7865734d2167160cabd4422aca76b8478e643b41" }
3 changes: 1 addition & 2 deletions src/connect.rs
Expand Up @@ -57,8 +57,7 @@ mod encryption {
match mode {
Mode::Plain => Box::new(future::ok(StreamSwitcher::Plain(socket))),
Mode::Tls => {
Box::new(future::result(TlsConnector::builder())
.and_then(move |builder| future::result(builder.build()))
Box::new(future::result(TlsConnector::builder().build())
.and_then(move |connector| connector.connect_async(&domain, socket))
.map(|s| StreamSwitcher::Tls(s))
.map_err(|e| Error::Tls(e)))
Expand Down
106 changes: 91 additions & 15 deletions src/lib.rs
Expand Up @@ -31,12 +31,16 @@ use std::io::ErrorKind;
use futures::{Poll, Future, Async, AsyncSink, Stream, Sink, StartSend};
use tokio::io::{AsyncRead, AsyncWrite};

use tungstenite::handshake::client::{ClientHandshake, Response, Request};
use tungstenite::handshake::server::{ServerHandshake, Callback, NoCallback};
use tungstenite::handshake::{HandshakeRole, HandshakeError};
use tungstenite::protocol::{WebSocket, Message, Role};
use tungstenite::error::Error as WsError;
use tungstenite::server;
use tungstenite::{
error::Error as WsError,
handshake::{
HandshakeRole, HandshakeError,
client::{ClientHandshake, Response, Request},
server::{ServerHandshake, Callback, NoCallback},
},
protocol::{WebSocket, Message, Role, WebSocketConfig},
server,
};

#[cfg(feature="connect")]
pub use connect::{connect_async, client_async_tls};
Expand All @@ -53,14 +57,31 @@ pub use connect::{connect_async, client_async_tls};
///
/// This is typically used for clients who have already established, for
/// example, a TCP connection to the remote server.
pub fn client_async<'a, R, S>(request: R, stream: S) -> ConnectAsync<S>
pub fn client_async<'a, R, S>(
request: R,
stream: S,
) -> ConnectAsync<S>
where
R: Into<Request<'a>>,
S: AsyncRead + AsyncWrite
{
client_async_with_config(request, stream, None)
}

/// The same as `client_async()` but the one can specify a websocket configuration.
/// Please refer to `client_async()` for more details.
pub fn client_async_with_config<'a, R, S>(
request: R,
stream: S,
config: Option<WebSocketConfig>,
) -> ConnectAsync<S>
where
R: Into<Request<'a>>,
S: AsyncRead + AsyncWrite
{
ConnectAsync {
inner: MidHandshake {
inner: Some(ClientHandshake::start(stream, request.into()).handshake())
inner: Some(ClientHandshake::start(stream, request.into(), config).handshake())
}
}
}
Expand All @@ -83,19 +104,45 @@ where
accept_hdr_async(stream, NoCallback)
}

/// The same as `accept_async()` but the one can specify a websocket configuration.
/// Please refer to `accept_async()` for more details.
pub fn accept_async_with_config<S>(
stream: S,
config: Option<WebSocketConfig>,
) -> AcceptAsync<S, NoCallback>
where
S: AsyncRead + AsyncWrite,
{
accept_hdr_async_with_config(stream, NoCallback, config)
}

/// Accepts a new WebSocket connection with the provided stream.
///
/// This function does the same as `accept_async()` but accepts an extra callback
/// for header processing. The callback receives headers of the incoming
/// requests and is able to add extra headers to the reply.
pub fn accept_hdr_async<S, C>(stream: S, callback: C) -> AcceptAsync<S, C>
where
S: AsyncRead + AsyncWrite,
C: Callback,
{
accept_hdr_async_with_config(stream, callback, None)
}

/// The same as `accept_hdr_async()` but the one can specify a websocket configuration.
/// Please refer to `accept_hdr_async()` for more details.
pub fn accept_hdr_async_with_config<S, C>(
stream: S,
callback: C,
config: Option<WebSocketConfig>,
) -> AcceptAsync<S, C>
where
S: AsyncRead + AsyncWrite,
C: Callback,
{
AcceptAsync {
inner: MidHandshake {
inner: Some(server::accept_hdr(stream, callback))
inner: Some(server::accept_hdr_with_config(stream, callback, config))
}
}
}
Expand All @@ -116,15 +163,24 @@ pub struct WebSocketStream<S> {
impl<S> WebSocketStream<S> {
/// Convert a raw socket into a WebSocketStream without performing a
/// handshake.
pub fn from_raw_socket(stream: S, role: Role) -> Self {
let ws = WebSocket::from_raw_socket(stream, role);
pub fn from_raw_socket(
stream: S,
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
let ws = WebSocket::from_raw_socket(stream, role, config);
WebSocketStream { inner: ws }
}

/// Convert a raw socket into a WebSocketStream without performing a
/// handshake.
pub fn from_partially_read(stream: S, part: Vec<u8>, role: Role) -> Self {
let ws = WebSocket::from_partially_read(stream, part, role);
pub fn from_partially_read(
stream: S,
part: Vec<u8>,
role: Role,
config: Option<WebSocketConfig>,
) -> Self {
let ws = WebSocket::from_partially_read(stream, part, role, config);
WebSocketStream { inner: ws }
}
}
Expand All @@ -143,8 +199,7 @@ impl<T> Sink for WebSocketStream<T> where T: AsyncRead + AsyncWrite {
type SinkError = WsError;

fn start_send(&mut self, item: Message) -> StartSend<Message, WsError> {
try!(self.inner.write_message(item).to_async());
Ok(AsyncSink::Ready)
self.inner.write_message(item).to_start_send()
}

fn poll_complete(&mut self) -> Poll<(), WsError> {
Expand Down Expand Up @@ -238,3 +293,24 @@ impl<T> ToAsync for Result<T, WsError> {
}
}

trait ToStartSend {
type T;
type E;
fn to_start_send(self) -> StartSend<Self::T, Self::E>;
}

impl ToStartSend for Result<(), WsError> {
type T = Message;
type E = WsError;
fn to_start_send(self) -> StartSend<Self::T, Self::E> {
match self {
Ok(_) => Ok(AsyncSink::Ready),
Err(error) => match error {
WsError::Io(ref err) if err.kind() == ErrorKind::WouldBlock => Ok(AsyncSink::Ready),
WsError::SendQueueFull(msg) => Ok(AsyncSink::NotReady(msg)),
err => Err(err),
}
}
}
}

0 comments on commit d43c614

Please sign in to comment.