Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Make string interpolation error reporting great again.
Signed-off-by: Matthias Wahl <mwahl@wayfair.com>
  • Loading branch information
Matthias Wahl authored and mfelsche committed Nov 26, 2020
1 parent 317f2b8 commit 5918a05
Show file tree
Hide file tree
Showing 67 changed files with 647 additions and 433 deletions.
23 changes: 22 additions & 1 deletion .vscode/launch.json
Expand Up @@ -22,7 +22,8 @@
"args": [
//"server",
"run",
"temp/heredoc.tremor"
//"temp/end.tremor",
"tests/script_errors/lexer_string_interpolation3/script.tremor"
//"-f",
//"../tremor-www-docs/docs/workshop/examples/35_reverse_proxy_load_balancing/etc/tremor/config/config.yaml",
// "../tremor-www-docs/docs/workshop/examples/35_reverse_proxy_load_balancing/etc/tremor/config/request_handling.trickle",
Expand All @@ -36,6 +37,26 @@
},
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
"name": "Debug script_error unit test",
"cargo": {
"args": [
"test",
"--no-run",
"pp_embed_unrecognized_token2"
],
"filter": {
"name": "script_error",
"kind": "test"
}
},
"args": [
"${selectedText}"
],
"cwd": "${workspaceFolder}"
},
{
"type": "lldb",
"request": "launch",
Expand Down
3 changes: 3 additions & 0 deletions src/codec/binflux.rs
Expand Up @@ -30,9 +30,11 @@ const TYPE_FALSE: u8 = 4;
pub struct BInflux {}

impl BInflux {
#[allow(clippy::map_err_ignore)]
pub fn encode(v: &simd_json::BorrowedValue) -> Result<Vec<u8>> {
fn write_str<W: Write>(w: &mut W, s: &str) -> Result<()> {
w.write_u16::<BigEndian>(
#[allow(clippy::map_err_ignore)]
u16::try_from(s.len())
.map_err(|_| ErrorKind::InvalidBInfluxData("string too long".into()))?,
)?;
Expand Down Expand Up @@ -71,6 +73,7 @@ impl BInflux {

if let Some(fields) = v.get("fields").and_then(Value::as_object) {
res.write_u16::<BigEndian>(
#[allow(clippy::map_err_ignore)]
u16::try_from(fields.len())
.map_err(|_| ErrorKind::InvalidBInfluxData("too many fields".into()))?,
)?;
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Expand Up @@ -23,7 +23,7 @@
clippy::unnecessary_unwrap,
clippy::pedantic
)]

#![warn(clippy::option_if_let_else)]
#[macro_use]
extern crate serde_derive;
#[macro_use]
Expand Down Expand Up @@ -120,12 +120,13 @@ pub fn incarnate(config: config::Config) -> Result<IncarnatedConfig> {
})
}

// TODO: what the heck do we need this for?
/// Converts the configured onramp vector into the runtime onramp vector.
///
/// Currently an identity transform (`OnRampVec` elements pass through
/// unchanged); kept as a function so the conversion point stays explicit.
/// NOTE(review): presumably `config::OnRampVec` and `OnRampVec` share an
/// element type — confirm against the type aliases elsewhere in this file.
fn incarnate_onramps(config: config::OnRampVec) -> OnRampVec {
    config.into_iter().collect()
}

/// Converts the configured offramp vector into the runtime offramp vector.
///
/// Like `incarnate_onramps`, this is an identity transform today; the
/// `.map(|d| d)` the old code carried was a no-op and is dropped.
fn incarnate_offramps(config: config::OffRampVec) -> OffRampVec {
    config.into_iter().collect()
}

fn incarnate_links(config: &[Binding]) -> BindingVec {
Expand Down Expand Up @@ -160,7 +161,7 @@ pub async fn load_query_file(world: &World, file_name: &str) -> Result<usize> {
&*FN_REGISTRY.lock()?,
&aggr_reg,
)?;
let id = query.id().unwrap_or_else(|| &file_id);
let id = query.id().unwrap_or(&file_id);

let id = TremorURL::parse(&format!("/pipeline/{}", id))?;
info!("Loading {} from file {}.", id, file_name);
Expand Down
1 change: 1 addition & 0 deletions src/preprocessor.rs
Expand Up @@ -123,6 +123,7 @@ impl SliceTrim for [u8] {
c != b'\t' && c != b' '
}

#[allow(clippy::option_if_let_else)]
if let Some(first) = self.iter().position(|v| is_not_whitespace(*v)) {
if let Some(last) = self.iter().rposition(|v| is_not_whitespace(*v)) {
self.get(first..=last).unwrap_or_default()
Expand Down
30 changes: 15 additions & 15 deletions src/ramp/postgres.rs
Expand Up @@ -157,22 +157,22 @@ impl postgres::types::ToSql for Record<'_> {
// NOTE: cannot easily tell which types are accepted due to trait not accepting self
// which holds t context. This renders the function not as useful
fn accepts(ty: &postgres::types::Type) -> bool {
match ty {
matches!(
ty,
&postgres::types::Type::BOOL
| &postgres::types::Type::CHAR
| &postgres::types::Type::TEXT
| &postgres::types::Type::NAME
| &postgres::types::Type::INT2
| &postgres::types::Type::INT4
| &postgres::types::Type::INT8
| &postgres::types::Type::JSON
| &postgres::types::Type::JSONB
| &postgres::types::Type::TIMESTAMP
| &postgres::types::Type::TIMESTAMPTZ
| &postgres::types::Type::UNKNOWN
| &postgres::types::Type::VARCHAR => true,
_ => false,
}
| &postgres::types::Type::CHAR
| &postgres::types::Type::TEXT
| &postgres::types::Type::NAME
| &postgres::types::Type::INT2
| &postgres::types::Type::INT4
| &postgres::types::Type::INT8
| &postgres::types::Type::JSON
| &postgres::types::Type::JSONB
| &postgres::types::Type::TIMESTAMP
| &postgres::types::Type::TIMESTAMPTZ
| &postgres::types::Type::UNKNOWN
| &postgres::types::Type::VARCHAR
)
}

to_sql_checked!();
Expand Down
9 changes: 4 additions & 5 deletions src/repository/artefact.rs
Expand Up @@ -335,11 +335,10 @@ impl Artefact for OnrampArtefact {
type LinkRHS = TremorURL;
async fn spawn(&self, world: &World, servant_id: ServantId) -> Result<Self::SpawnResult> {
let stream = onramp::lookup(&self.binding_type, &servant_id, &self.config)?;
let codec = if let Some(codec) = &self.codec {
codec.clone()
} else {
stream.default_codec().to_string()
};
let codec = self.codec.as_ref().map_or_else(
|| stream.default_codec().to_string(),
std::clone::Clone::clone,
);
let codec_map = self
.codec_map
.clone()
Expand Down
9 changes: 4 additions & 5 deletions src/sink/debug.rs
Expand Up @@ -82,11 +82,10 @@ impl Sink for Debug {
}
self.buckets.clear();
}
let c = if let Some(s) = meta.get("class").and_then(Value::as_str) {
s
} else {
"<unclassified>"
};
let c = meta
.get("class")
.and_then(Value::as_str)
.unwrap_or("<unclassified>");
//TODO: return to entry
if let Some(entry) = self.buckets.get_mut(c) {
entry.cnt += 1;
Expand Down
24 changes: 12 additions & 12 deletions src/sink/kafka.rs
Expand Up @@ -126,19 +126,19 @@ impl offramp::Impl for Kafka {
}

fn is_fatal(e: &KafkaError) -> bool {
match e {
matches!(
e,
KafkaError::AdminOp(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::ConsumerCommit(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::Global(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::GroupListFetch(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::MessageConsumption(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::MessageProduction(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::MetadataFetch(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::OffsetFetch(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::SetPartitionOffset(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::StoreOffset(rdkafka::error::RDKafkaError::Fatal) => true,
_ => false,
}
| KafkaError::ConsumerCommit(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::Global(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::GroupListFetch(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::MessageConsumption(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::MessageProduction(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::MetadataFetch(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::OffsetFetch(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::SetPartitionOffset(rdkafka::error::RDKafkaError::Fatal)
| KafkaError::StoreOffset(rdkafka::error::RDKafkaError::Fatal)
)
}

unsafe fn get_fatal_error<C>(
Expand Down
1 change: 1 addition & 0 deletions src/sink/rest.rs
Expand Up @@ -65,6 +65,7 @@ impl Endpoint {
self.as_url_with_base("http://localhost")
}

#[allow(clippy::map_err_ignore)]
// expensive but seemingly correct way of populating a url from this struct
fn as_url_with_base(&self, base_url: &str) -> Result<url::Url> {
let mut res = url::Url::parse(base_url)?; //stupid placeholder, no other way to create a url
Expand Down
1 change: 1 addition & 0 deletions src/source.rs
Expand Up @@ -204,6 +204,7 @@ where
}
}

#[allow(clippy::option_if_let_else)]
async fn make_event_data(
&mut self,
stream: usize,
Expand Down
8 changes: 3 additions & 5 deletions src/source/crononome.rs
Expand Up @@ -263,12 +263,10 @@ impl ChronomicQueue {
trigger
}
pub fn next(&mut self) -> Option<(String, Option<BorrowedValue<'static>>)> {
if let Some(ti) = self.tpq.pop() {
self.tpq.pop().map(|ti| {
self.enqueue(&ti.what);
Some((ti.what.name, ti.what.payload))
} else {
None
}
(ti.what.name, ti.what.payload)
})
}
}

Expand Down
1 change: 1 addition & 0 deletions src/source/kafka.rs
Expand Up @@ -350,6 +350,7 @@ impl Source for Int {
// but only commit the offsets explicitly stored via `consumer.store_offset`.
.set("enable.auto.offset.store", "true");

#[allow(clippy::option_if_let_else)]
let client_config = if let Some(options) = self.config.rdkafka_options.as_ref() {
options
.iter()
Expand Down
7 changes: 1 addition & 6 deletions src/source/postgres.rs
Expand Up @@ -185,12 +185,7 @@ impl Source for Int {
self.rows = match Compat::new(client.query(statement, &[&cf, &ct])).await {
Ok(v) => v,
Err(e) => {
let code = if let Some(v) = e.code() {
v
} else {
&SqlState::CONNECTION_EXCEPTION
};

let code = e.code().unwrap_or(&SqlState::CONNECTION_EXCEPTION);
if code == &SqlState::CONNECTION_EXCEPTION {
self.cli = None;
self.stmt = None;
Expand Down
1 change: 1 addition & 0 deletions src/source/rest.rs
Expand Up @@ -343,6 +343,7 @@ fn make_response(
impl Source for Int {
// TODO possible to do this in source trait?
async fn pull_event(&mut self, id: u64) -> Result<SourceReply> {
#[allow(clippy::option_if_let_else)]
if let Some(listener) = self.listener.as_ref() {
match listener.try_recv() {
Ok(RestSourceReply(Some(response_tx), source_reply)) => {
Expand Down
1 change: 1 addition & 0 deletions src/source/tcp.rs
Expand Up @@ -77,6 +77,7 @@ impl Source for Int {
&self.onramp_id
}

#[allow(clippy::option_if_let_else)]
async fn pull_event(&mut self, _id: u64) -> Result<SourceReply> {
if let Some(listener) = self.listener.as_ref() {
match listener.try_recv() {
Expand Down
9 changes: 4 additions & 5 deletions src/source/ws.rs
Expand Up @@ -116,11 +116,9 @@ impl Int {
// for request/response style flow), but for websocket, arbitrary events can
// come in here.
// also messages keeps growing as events come in right now
if let Some(stream_id) = self.messages.get(&id) {
self.streams.get(&stream_id)
} else {
None
}
self.messages
.get(&id)
.and_then(|stream_id| self.streams.get(stream_id))
}
}

Expand Down Expand Up @@ -260,6 +258,7 @@ fn create_error_response(err: String, event_id: String, source_id: &TremorURL) -

#[async_trait::async_trait()]
impl Source for Int {
#[allow(clippy::option_if_let_else)]
async fn pull_event(&mut self, id: u64) -> Result<SourceReply> {
if let Some(listener) = self.listener.as_ref() {
match listener.try_recv() {
Expand Down
6 changes: 1 addition & 5 deletions src/url.rs
Expand Up @@ -208,11 +208,7 @@ impl TremorURL {
}
};

let host = if let Some(host) = r.host_str() {
host.to_owned()
} else {
"localhost".to_owned()
};
let host = r.host_str().unwrap_or("localhost").to_owned();
Ok(Self {
host,
scope,
Expand Down
1 change: 1 addition & 0 deletions src/utils.rs
Expand Up @@ -16,6 +16,7 @@
#[must_use]
#[cfg(not(tarpaulin_include))]
pub fn hostname() -> String {
#[allow(clippy::map_err_ignore)]
hostname::get()
.map_err(|_| ())
.and_then(|s| s.into_string().map_err(|_| ()))
Expand Down
1 change: 1 addition & 0 deletions tests/query_error.rs
Expand Up @@ -126,6 +126,7 @@ test_cases!(
local_in_select,
local_in_where,
local_in_group_by,
pp_mod_not_found,
pp_unrecognized_token,
pp_unrecognized_token2,
pp_unrecognized_token3,
Expand Down
2 changes: 1 addition & 1 deletion tests/query_errors/pp_embed_unrecognized_token/error.txt
Expand Up @@ -2,6 +2,6 @@ Error:
1 | define script snot
2 | script
3 | use ;
| Found the token `;` but expected `<ident>`
| ^ Found the token `;` but expected `<ident>`
4 | end;
5 |
12 changes: 7 additions & 5 deletions tests/query_errors/pp_mod_not_found/error.txt
@@ -1,6 +1,8 @@
Error:
1 | use foo as foo;
| Module `foo.tremor` not found or not readable error in module path:
- tests/script_errors/pp_mod_not_found/
2 |
3 | match event of
1 | define script snot
2 | script
3 | use foo as foo;
| ^ Module `foo` not found or not readable error in module path:
- tests/query_errors/pp_mod_not_found/
4 |
5 | match event of
14 changes: 14 additions & 0 deletions tests/query_errors/pp_mod_not_found/query.trickle
@@ -0,0 +1,14 @@
define script snot
script
use foo as foo;

match event of
case {} => "snot {snot}"
default => "ko"
end;
end;

create script snot;

select from in into snot;
select from snot into out;
6 changes: 0 additions & 6 deletions tests/query_errors/pp_mod_not_found/script.tremor

This file was deleted.

2 changes: 1 addition & 1 deletion tests/query_errors/pp_unrecognized_token/error.txt
@@ -1,5 +1,5 @@
Error:
1 | use ;
| Found the token `;` but expected `<ident>`
| ^ Found the token `;` but expected `<ident>`
2 |

6 changes: 6 additions & 0 deletions tests/script_error.rs
Expand Up @@ -134,6 +134,12 @@ test_cases!(
pp_cyclic,
pp_nest_cyclic,
//INSERT
lexer_string_interpolation3,
lexer_string_interpolation2,
lexer_string_interpolation,
lexer_heredoc_interpolation,
lexer_heredoc_interpolation2,
lexer_heredoc_interpolation3,
invalid_utf8_1,
invalid_utf8_2,
error_in_include,
Expand Down
2 changes: 1 addition & 1 deletion tests/script_errors/invalid_utf8_1/error.txt
@@ -1,3 +1,3 @@
Error:
1 | "\u000"
| ^ An unexpected escape code '"' was found
| ^^^^^ An invalid UTF8 escape sequence was found
2 changes: 1 addition & 1 deletion tests/script_errors/invalid_utf8_2/error.txt
@@ -1,3 +1,3 @@
Error:
1 | "invalid: \uD800
1 | "invalid: \uD800"
| ^^^^^^ An invalid UTF8 escape sequence was found

0 comments on commit 5918a05

Please sign in to comment.