Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(subscriber): correct retain logic #447

Merged
merged 2 commits into from Aug 1, 2023
Merged

Conversation

hds
Copy link
Collaborator

@hds hds commented Jul 5, 2023

The current logic present in IdData::drop_closed marks an item (task,
resource, and async op stats) to be dropped in the case that the item
is dirty and there are watchers: (dirty && has_watchers).

This causes a case where if an item is first received and then completes
in between the aggregator push cycle, it will be discarded immediately
and never sent.

This logic has been in place since the concepts of watchers and dirty
items was introduced in #77. However since an item that is created and
then dropped within a single update cycle isn't likely to be missed in
the UI, it may never have been noticed.

Instead the logic should be to retain an item if any of the
following is true:

  • there are watchers and the item is dirty: (dirty && has_watchers)
  • item has been dropped less time than the retention period:
    dropped_for <= retention.

The current logic present in `IdData::drop_closed` marks an item (task,
resource, and async op stats) to be dropped in the case that the item
**is** dirty and there **are** watchers: `(dirty && has_watchers)`.

This causes a case where if an item is first received and then completes
in between the aggregator push cycle, it will be discarded immediately
and never sent.

This logic has been in place since the concepts of watchers and dirty
items was introduced in #77. However since an item that is created and
then dropped within a single update cycle isn't likely to be missed in
the UI, it may never have been noticed.

Instead the logic should be to **retain** an item if **any** of the
following is true:
* there are watchers and the item is dirty: `(dirty && has_watchers)`
* item has been dropped less time than the retention period:
  `dropped_for <= retention`.
@hds hds requested a review from a team as a code owner July 5, 2023 21:52
@hds hds marked this pull request as draft July 5, 2023 21:55
hds added a commit that referenced this pull request Jul 18, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests.

Each test comprises 2 parts:
- One or more "expcted tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and it's server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of expectations. These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are the `wakes` and `self_wakes` fields.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.
hds added a commit that referenced this pull request Jul 18, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests.

Each test comprises 2 parts:
- One or more "expcted tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and it's server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of expectations. These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are the `wakes` and `self_wakes` fields.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.
hds added a commit that referenced this pull request Jul 18, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests.

Each test comprises 2 parts:
- One or more "expcted tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and it's server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of expectations. These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are the `wakes` and `self_wakes` fields.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.
hds added a commit that referenced this pull request Jul 18, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests. It is the first step towards closing #450.

Each test comprises 2 parts:
- One or more "expcted tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and it's server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of expectations. These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are the `wakes` and `self_wakes` fields.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.
@hds hds marked this pull request as ready for review July 18, 2023 15:41
hds added a commit that referenced this pull request Aug 1, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests. It is the first step towards closing #450.

Each test comprises 2 parts:
- One or more "expcted tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and it's server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of expectations. These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are the `wakes` and `self_wakes` fields.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.
@hawkw hawkw merged commit fa9a45c into main Aug 1, 2023
12 checks passed
@hawkw hawkw deleted the hds/aggregator-should-retain branch August 1, 2023 17:34
hds added a commit that referenced this pull request Sep 6, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests. It is the first step towards closing #450.

Each test comprises 2 parts:
- One or more "expected tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and its server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of "expectations". These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are `wakes` and `self_wakes`.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.

Co-authored-by: Eliza Weisman <eliza@buoyant.io>
hawkw pushed a commit that referenced this pull request Sep 29, 2023
The current logic present in `IdData::drop_closed` marks an item (task,
resource, and async op stats) to be dropped in the case that the item
**is** dirty and there **are** watchers: `(dirty && has_watchers)`.

This causes a case where if an item is first received and then completes
in between the aggregator push cycle, it will be discarded immediately
and never sent.

This logic has been in place since the concepts of watchers and dirty
items was introduced in #77. However since an item that is created and
then dropped within a single update cycle isn't likely to be missed in
the UI, it may never have been noticed.

Instead the logic should be to **retain** an item if **any** of the
following is true:
* there are watchers and the item is dirty: `(dirty && has_watchers)`
* item has been dropped less time than the retention period:
  `dropped_for <= retention`.
hawkw added a commit that referenced this pull request Sep 29, 2023
The `console-subscriber` crate has no integration tests. There are some
unit tests, but without very high coverage of features.

Recently, we've found or fixed a few errors which probably could have
been caught by a medium level of integration testing.

However, testing `console-subscriber` isn't straight forward. It is
effectively a tracing subscriber (or layer) on one end, and a gRPC
server on the other end.

This change adds enough of a testing framework to write some initial
integration tests. It is the first step towards closing #450.

Each test comprises 2 parts:
- One or more "expected tasks"
- A future which will be driven to completion on a dedicated Tokio runtime.

Behind the scenes, a console subscriber layer is created and its server
part is connected to a duplex stream. The client of the duplex stream
then records incoming updates and reconstructs "actual tasks". The layer
itself is set as the default subscriber for the duration of `block_on`
which is used to drive the provided future to completioin.

The expected tasks have a set of "matches", which is how we find the
actual task that we want to validate against. Currently, the only value
we match on is the task's name.

The expected tasks also have a set of "expectations". These are other
fields on the actual task which are validated once a matching task is
found. Currently, the two fields which can have expectations set on them
are `wakes` and `self_wakes`.

So, to construct an expected task, which will match a task with the name
`"my-task"` and then validate that the matched task gets woken once, the
code would be:

```rust
ExpectedTask::default()
    .match_name("my-task")
    .expect_wakes(1);
```

A future which passes this test could be:

```rust
async {
    task::Builder::new()
        .name("my-task")
        .spawn(async {
            tokio::time::sleep(std::time::Duration::ZERO).await
        })
}
```

The full test would then look like:

```rust
fn wakes_once() {
    let expected_task = ExpectedTask::default()
        .match_name("my-task")
        .expect_wakes(1);

    let future = async {
        task::Builder::new()
            .name("my-task")
            .spawn(async {
                tokio::time::sleep(std::time::Duration::ZERO).await
            })
    };

    assert_task(expected_task, future);
}
```

The PR depends on 2 others:
 - #447 which fixes an error in the logic that determines whether a task
   is retained in the aggregator or not.
 - #451 which exposes the server parts and is necessary to allow us to
   connect the instrument server and client via a duplex channel.

This change contains some initial tests for wakes and self wakes which
would have caught the error fixed in #430. Additionally there are tests
for the functionality of the testing framework itself.

Co-authored-by: Eliza Weisman <eliza@buoyant.io>
hawkw added a commit that referenced this pull request Sep 29, 2023
# Changelog

All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## console-subscriber-v0.2.0 - (2023-09-29)

[0b0c1af](https://github.com/tokio-rs/console/commit/0b0c1aff18c3260d3a45a78f6c0d6f4206af1cbb)...[0b0c1af](https://github.com/tokio-rs/console/commit/0b0c1aff18c3260d3a45a78f6c0d6f4206af1cbb)

### <a id = "console-subscriber-v0.2.0-breaking"></a>Breaking Changes
- **Update Tonic and Prost dependencies ([#364](#364 ([f9b8e03](https://github.com/tokio-rs/console/commit/f9b8e03bd7ee1d0edb441c94a93a350d5b06ed3b))<br />This commit updates the public dependencies `prost` and `tonic` to
semver-incompatible versions (v0.11.0 and v0.8.0, respectively). This is
a breaking change for users who are integrating the `console-api` protos
with their own `tonic` servers or clients.
- **Update `tonic` to v0.10 and increase MSRV to 1.64 ([#464](#464 ([96e62c8](https://github.com/tokio-rs/console/commit/96e62c83ef959569bb062dc8fee98fa2b2461e8d))<br />This is a breaking change for users of `console-api` and
`console-subscriber`, as it changes the public `tonic` dependency to a
semver-incompatible version. This breaks compatibility with `tonic`
0.9.x and `prost` 0.11.x.

### Added

- [**breaking**](#console-subscriber-v0.2.0-breaking) Update Tonic and Prost dependencies ([#364](#364)) ([f9b8e03](f9b8e03))
- Add support for Unix domain sockets ([#388](#388)) ([a944dbc](a944dbc), closes [#296](#296))
- Add scheduled time per task ([#406](#406)) ([f280df9](f280df9))
- Add task scheduled times histogram ([#409](#409)) ([d92a399](d92a399))
- Update `tonic` to 0.9 ([#420](#420)) ([48af1ee](48af1ee))
- Update MSRV to Rust 1.60.0 ([b18ee47](b18ee47))
- Expose server parts ([#451](#451)) ([e51ac5a](e51ac5a))
- Add cfg `console_without_tokio_unstable` ([#446](#446)) ([7ed6673](7ed6673))
- Add warning for tasks that never yield ([#439](#439)) ([d05fa9e](d05fa9e))
- [**breaking**](#console-subscriber-v0.2.0-breaking) Update `tonic` to v0.10 and increase MSRV to 1.64 ([#464](#464)) ([96e62c8](96e62c8))

### Documented

- Fix unclosed code block ([#463](#463)) ([362bdbe](362bdbe))
- Update MSRV version docs to 1.64 ([#467](#467)) ([94a5a51](94a5a51))

### Fixed

- Fix build on tokio 1.21.0 ([#374](#374)) ([c34ac2d](c34ac2d))
- Fix off-by-one indexing for `callsites` ([#391](#391)) ([43891ab](43891ab))
- Bump minimum Tokio version ([#397](#397)) ([bbb8f25](bbb8f25), fixes [#386](#386))
- Fix self wakes count ([#430](#430)) ([d308935](d308935))
- Remove clock skew warning in `start_poll` ([#434](#434)) ([4a88b28](4a88b28))
- Do not report excessive polling ([#378](#378)) ([#440](#440)) ([8b483bf](8b483bf), closes [#378](#378))
- Correct retain logic ([#447](#447)) ([36ffc51](36ffc51))

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants