Skip to content

Commit

Permalink
Replace reqwest with ureq (#1407)
Browse files Browse the repository at this point in the history
* Replace reqwest with ureq for fewer dependencies

* Simplify code

* Cleanup
  • Loading branch information
emilk committed Feb 26, 2023
1 parent f77420e commit 90db32b
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 160 deletions.
130 changes: 19 additions & 111 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ polars-core = "0.27.1"
polars-lazy = "0.27.1"
polars-ops = "0.27.1"
puffin = "0.14"
reqwest = { version = "0.11", default-features = false }
thiserror = "1.0"
time = "0.3"
tokio = "1.24"
Expand Down
4 changes: 3 additions & 1 deletion crates/re_analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ uuid = { version = "1.1", features = ["serde", "v4", "js"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
directories-next = "2"
reqwest = { workspace = true, features = ["blocking", "rustls-tls"] }
ureq = { version = "2.6", features = [
"json",
] } # TODO(emilk): use ehttp for web support

[target.'cfg(target_arch = "wasm32")'.dependencies]
web-sys = { version = "0.3.58", features = ["Storage"] }
Expand Down
3 changes: 0 additions & 3 deletions crates/re_analytics/src/pipeline_native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ pub enum PipelineError {

#[error(transparent)]
Serde(#[from] serde_json::Error),

#[error(transparent)]
Http(#[from] reqwest::Error),
}

/// An eventual, at-least-once(-ish) event pipeline, backed by a write-ahead log on the local disk.
Expand Down
107 changes: 65 additions & 42 deletions crates/re_analytics/src/sink_native.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::{collections::HashMap, time::Duration};
use std::collections::HashMap;

use once_cell::sync::OnceCell;
use reqwest::{blocking::Client as HttpClient, Url};
use time::OffsetDateTime;

use re_log::{debug, error};

use crate::{Event, Property};

// TODO(cmc): abstract away the concept of a `Sink` behind an actual trait when comes the time to
Expand All @@ -15,6 +11,9 @@ const PUBLIC_POSTHOG_PROJECT_KEY: &str = "phc_sgKidIE4WYYFSJHd8LEYY1UZqASpnfQKeM

// ---

/// Newtype wrapper around an owned `String`.
// NOTE(review): presumably holds a URL, going by the name — confirm against its users.
#[derive(Debug, Clone)]
struct Url(String);

#[derive(thiserror::Error, Debug)]
pub enum SinkError {
#[error(transparent)]
Expand All @@ -24,19 +23,46 @@ pub enum SinkError {
Serde(#[from] serde_json::Error),

#[error(transparent)]
Http(#[from] reqwest::Error),
HttpTransport(Box<ureq::Transport>),

#[error("HTTP status {status_code} {status_text}: {body}")]
HttpStatus {
status_code: u16,
status_text: String,
body: String,
},
}

impl From<ureq::Error> for SinkError {
fn from(err: ureq::Error) -> Self {
match err {
ureq::Error::Status(status_code, response) => Self::HttpStatus {
status_code,
status_text: response.status_text().to_owned(),
body: response.into_string().unwrap_or_default(),
},

ureq::Error::Transport(transport) => Self::HttpTransport(Box::new(transport)),
}
}
}

#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct PostHogSink {
// NOTE: We need to lazily build the underlying HTTP client, so that we can guarantee that it
// is initialized from a thread that is free of a Tokio runtime.
// This is necessary because `reqwest` will crash if we try and initialize a blocking HTTP
// client from within a thread that has a Tokio runtime instantiated.
//
// We also use this opportunity to upgrade our public HTTP endpoint into the final HTTP2/TLS
// URL by following all 301 redirects.
client: OnceCell<(Url, HttpClient)>,
agent: ureq::Agent,
// Lazily resolve the url so that we don't do blocking requests in `PostHogSink::default`
resolved_url: once_cell::sync::OnceCell<String>,
}

impl Default for PostHogSink {
    /// Creates a sink whose HTTP agent uses a 5-second timeout.
    ///
    /// No request is made here: `resolved_url` starts out in its default state.
    fn default() -> Self {
        let timeout = std::time::Duration::from_secs(5);
        let agent = ureq::AgentBuilder::new().timeout(timeout).build();

        Self {
            agent,
            resolved_url: Default::default(),
        }
    }
}

impl PostHogSink {
Expand All @@ -50,42 +76,39 @@ impl PostHogSink {
session_id: &str,
events: &[Event],
) -> Result<(), SinkError> {
let (resolved_url, client) = self.init()?;
let resolved_url = self.init()?;

let events = events
.iter()
.map(|event| PostHogEvent::from_event(analytics_id, session_id, event))
.collect::<Vec<_>>();
let batch = PostHogBatch::from_events(&events);

debug!("{}", serde_json::to_string_pretty(&batch)?);
let resp = client
.post(resolved_url.clone())
.body(serde_json::to_vec(&batch)?)
.send()?;

resp.error_for_status().map(|_| ()).map_err(Into::into)
re_log::debug!("{}", serde_json::to_string_pretty(&batch)?);
self.agent.post(resolved_url).send_json(&batch)?;
Ok(())
}

fn init(&self) -> Result<&(Url, HttpClient), SinkError> {
self.client.get_or_try_init(|| {
use reqwest::header;
let mut headers = header::HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/json"),
);

let client = HttpClient::builder()
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.pool_idle_timeout(Duration::from_secs(120))
.default_headers(headers)
.build()?;

let resolved_url = client.get(Self::URL).send()?.url().clone();

Ok((resolved_url, client))
fn init(&self) -> Result<&String, SinkError> {
self.resolved_url.get_or_try_init(|| {
// Make a dummy-request to resolve our final URL.
let resolved_url = match self.agent.get(Self::URL).call() {
Ok(response) => response.get_url().to_owned(),
Err(ureq::Error::Status(status, response)) => {
// We actually expect to get here, because we make a bad request (GET to an end-point that expects a POST).
// We only do this request to get redirected to the final URL.
let resolved_url = response.get_url().to_owned();
re_log::trace!("status: {status} {}", response.status_text().to_owned());
resolved_url
}
Err(ureq::Error::Transport(transport)) => {
return Err(SinkError::HttpTransport(Box::new(transport)))
}
};

// 2023-02-26 the resolved URL was https://eu.posthog.com/capture (in Europe)

Ok(resolved_url)
})
}
}
Expand Down

1 comment on commit 90db32b

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 90db32b Previous: 4f8468a Ratio
datastore/insert/batch/rects/insert 547348 ns/iter (± 2815) 549792 ns/iter (± 3167) 1.00
datastore/latest_at/batch/rects/query 1830 ns/iter (± 5) 1842 ns/iter (± 12) 0.99
datastore/latest_at/missing_components/primary 358 ns/iter (± 0) 356 ns/iter (± 2) 1.01
datastore/latest_at/missing_components/secondaries 425 ns/iter (± 2) 419 ns/iter (± 5) 1.01
datastore/range/batch/rects/query 152112 ns/iter (± 580) 151052 ns/iter (± 1292) 1.01
mono_points_arrow/generate_message_bundles 45855910 ns/iter (± 840388) 48445375 ns/iter (± 1774428) 0.95
mono_points_arrow/generate_messages 125104798 ns/iter (± 1052527) 136702497 ns/iter (± 1659147) 0.92
mono_points_arrow/encode_log_msg 152105812 ns/iter (± 1019000) 164855431 ns/iter (± 1057345) 0.92
mono_points_arrow/encode_total 326032772 ns/iter (± 1243889) 351685210 ns/iter (± 3606074) 0.93
mono_points_arrow/decode_log_msg 177194253 ns/iter (± 828451) 186173612 ns/iter (± 1905498) 0.95
mono_points_arrow/decode_message_bundles 64301048 ns/iter (± 714563) 73076209 ns/iter (± 1177197) 0.88
mono_points_arrow/decode_total 238352334 ns/iter (± 1853136) 256329369 ns/iter (± 2419251) 0.93
batch_points_arrow/generate_message_bundles 325314 ns/iter (± 1175) 322821 ns/iter (± 2754) 1.01
batch_points_arrow/generate_messages 6245 ns/iter (± 33) 6190 ns/iter (± 71) 1.01
batch_points_arrow/encode_log_msg 373141 ns/iter (± 1733) 365590 ns/iter (± 3324) 1.02
batch_points_arrow/encode_total 715043 ns/iter (± 2797) 712129 ns/iter (± 6658) 1.00
batch_points_arrow/decode_log_msg 350709 ns/iter (± 1761) 346470 ns/iter (± 3586) 1.01
batch_points_arrow/decode_message_bundles 2031 ns/iter (± 12) 1992 ns/iter (± 23) 1.02
batch_points_arrow/decode_total 356021 ns/iter (± 1070) 355692 ns/iter (± 2599) 1.00
arrow_mono_points/insert 6105989507 ns/iter (± 13623479) 6985053068 ns/iter (± 176020015) 0.87
arrow_mono_points/query 1737889 ns/iter (± 10434) 1717200 ns/iter (± 14682) 1.01
arrow_batch_points/insert 2679472 ns/iter (± 10887) 2603860 ns/iter (± 19505) 1.03
arrow_batch_points/query 17490 ns/iter (± 145) 17440 ns/iter (± 101) 1.00
tuid/Tuid::random 34 ns/iter (± 1) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.