-
Notifications
You must be signed in to change notification settings - Fork 149
/
hyper-client.rs
144 lines (128 loc) · 4.2 KB
/
hyper-client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//! An HTTP+TLS client based on `hyper` and `async-native-tls`.
//!
//! Run with:
//!
//! ```
//! cargo run --example hyper-client
//! ```
use std::convert::TryInto;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Result};
use async_native_tls::TlsStream;
use http_body_util::{BodyStream, Empty};
use hyper::body::Incoming;
use hyper::{Request, Response};
use macro_rules_attribute::apply;
use smol::{io, net::TcpStream, prelude::*, Executor};
use smol_hyper::rt::FuturesIo;
use smol_macros::main;
/// Sends `req` over a freshly opened connection and returns the response.
///
/// The host, port and scheme come from the request URI: `http` uses a plain
/// TCP stream (port 80 unless the URI names one), `https` layers TLS over the
/// TCP stream (port 443 unless the URI names one). The HTTP/1 connection
/// driver is spawned on `ex` and detached.
///
/// # Errors
///
/// Fails if the URI has no host, if the scheme is neither `http` nor `https`,
/// or if connecting, either handshake, or sending the request fails.
async fn fetch(
    ex: &Executor<'static>,
    req: Request<Empty<&'static [u8]>>,
) -> Result<Response<Incoming>> {
    // Open the transport. The borrows of `req` stay inside this block so the
    // request can be moved into `send_request` afterwards.
    let transport = {
        let uri = req.uri();
        let host = uri.host().context("cannot parse host")?;
        match uri.scheme_str() {
            Some("http") => {
                let port = uri.port_u16().unwrap_or(80);
                SmolStream::Plain(TcpStream::connect((host, port)).await?)
            }
            Some("https") => {
                // HTTPS: connect over TCP first, then establish TLS on top of it.
                let port = uri.port_u16().unwrap_or(443);
                let tcp = TcpStream::connect((host, port)).await?;
                SmolStream::Tls(async_native_tls::connect(host, tcp).await?)
            }
            other => bail!("unsupported scheme: {:?}", other),
        }
    };

    // Perform the HTTP/1 handshake, then drive the connection in the background.
    let (mut sender, conn) =
        hyper::client::conn::http1::handshake(FuturesIo::new(transport)).await?;
    let driver = async move {
        if let Err(err) = conn.await {
            println!("Connection failed: {:?}", err);
        }
    };
    ex.spawn(driver).detach();

    // Send the request and hand the response back to the caller.
    Ok(sender.send_request(req).await?)
}
// Example entry point: builds a GET request for https://www.rust-lang.org,
// sends it with `fetch`, then prints the response head followed by the body.
//
// Returns an error if the URL cannot be parsed or has no authority, if the
// request cannot be built, or if fetching or reading the body fails.
#[apply(main!)]
async fn main(ex: &Executor<'static>) -> Result<()> {
    // Create a request. The `Host` header is taken from the URL's authority;
    // the authority is only borrowed for the header value, and a missing one
    // is reported as an error instead of panicking.
    let url: hyper::Uri = "https://www.rust-lang.org".try_into()?;
    let req = Request::builder()
        .header(
            hyper::header::HOST,
            url.authority().context("URL has no authority")?.as_str(),
        )
        .uri(url)
        .body(Empty::new())?;

    // Fetch the response and print its status line and headers.
    let resp = fetch(ex, req).await?;
    println!("{:#?}", resp);

    // Read the message body, appending every data frame to one buffer.
    // Frames without data are skipped.
    let body: Vec<u8> = BodyStream::new(resp.into_body())
        .try_fold(Vec::new(), |mut body, chunk| {
            if let Some(chunk) = chunk.data_ref() {
                body.extend_from_slice(chunk);
            }
            Ok(body)
        })
        .await?;

    // The body is not guaranteed to be UTF-8, so print it lossily.
    println!("{}", String::from_utf8_lossy(&body));
    Ok(())
}
/// A client transport: either a plain TCP stream or a TLS session over TCP.
///
/// `fetch` builds one of these from the request's URI scheme, and the
/// `AsyncRead` / `AsyncWrite` impls in this file forward to the wrapped stream.
enum SmolStream {
/// A plain TCP connection (built for `http` requests).
Plain(TcpStream),
/// A TCP connection secured by TLS (built for `https` requests).
Tls(TlsStream<TcpStream>),
}
/// Reads are delegated to whichever stream the enum wraps.
impl AsyncRead for SmolStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // Unwrap the pin to reach the variant, then re-pin the inner stream
        // for the delegated call.
        match self.get_mut() {
            Self::Plain(tcp) => Pin::new(tcp).poll_read(cx, buf),
            Self::Tls(tls) => Pin::new(tls).poll_read(cx, buf),
        }
    }
}
/// Writes, flushes and closes are delegated to whichever stream the enum wraps.
impl AsyncWrite for SmolStream {
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // Unwrap the pin to reach the variant, then re-pin the inner stream
        // for the delegated call.
        match self.get_mut() {
            Self::Plain(tcp) => Pin::new(tcp).poll_write(cx, buf),
            Self::Tls(tls) => Pin::new(tls).poll_write(cx, buf),
        }
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.get_mut() {
            Self::Plain(tcp) => Pin::new(tcp).poll_flush(cx),
            Self::Tls(tls) => Pin::new(tls).poll_flush(cx),
        }
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match self.get_mut() {
            Self::Plain(tcp) => Pin::new(tcp).poll_close(cx),
            Self::Tls(tls) => Pin::new(tls).poll_close(cx),
        }
    }
}