Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async query support #30

Merged
merged 32 commits into from
Feb 19, 2021
Merged

Conversation

apennamen
Copy link
Contributor

@apennamen apennamen commented Dec 23, 2020

General Information

WMI provides an ExecQueryAsync method. This PR aims at providing an implementation and expose an API looking like :

let result: Future<Vec<HashMap<String, Variant>>> = con.async_raw_query("SELECT Name FROM Win32_OperatingSystem")?;

Motivation is described in #13.

WIP

  • Find a way to implement IWbemObjectSink
  • Toglle functionnality with a feature flag : async-query
  • Implements indicate and set_status in QuerySink based on this example. Maybe find other inspirations.
  • Write tests for async_raw_query
  • Implements pub fn async_raw_query in query.rs
  • Check for memory leaks (see MIRI ?)
  • Benchmarks
  • Refactoring
  • DocTest
  • Documentation in readme (feature enabling...)

Copy link
Owner

@ohadravid ohadravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the small & fast PR very very much! 🎩

Left a few comments, but I didn't dive too deep / compiled locally (for now at least 😄 ).
From the linked docs, I think the actual signature should be something like

let result: impl Stream<Item=HashMap<String, Variant>> = con.async_raw_query("SELECT Name FROM Win32_OperatingSystem")?;

Since WMI will indicate to the sink multiple times as more items become available.

Guessing even more, I think WMI will call this function from another thread, so be sure to wrap everything in a real Mutex (and not the "just" an async one) or something equivalent (for example, using a queue between the sink and the actual Stream) [and document that either way]. I don't think that ComPtr handles that but again I didn't check it myself.

src/query_sink.rs Outdated Show resolved Hide resolved
src/query.rs Outdated Show resolved Hide resolved
src/query.rs Outdated Show resolved Hide resolved
src/query_sink.rs Show resolved Hide resolved
@apennamen
Copy link
Contributor Author

apennamen commented Dec 25, 2020

Hello @ohadravid and Merry Christmas !

Is it ok to trigger your azure pipeline each time I push ? Otherwise I could disabled it temporarily for this branch ?

I have to take into account several of your comments, thanks a lot for reviewing quickly !

EDIT:
To reply on the implementation part of Indicate => indead, I will have to use a Mutex as the Indicate function can be called multiple times. I have to look into Stream, I think it's an excellent idea.

For the public API, I think I won't pub the exec_async_query_native_wrapper.

@ohadravid
Copy link
Owner

Is should probably be public, since it's still useful as an API.
You could also try to use something like async-channel to communicate between the sink and the actual future, which might be simpler and align better with the idea of the sink than a Mutex.

I think the pipeline won't mind.

Merry Christmas! 🎄

@apennamen
Copy link
Contributor Author

We had the same idea ! I was looking at implementing an async mspc too :D

let (tx, rx) = mpsc::channel();

let p_sink: ComPtr<IWbemObjectSink> = QuerySink::new(tx);

But it would fail as tx isn't Sync.

I'll go with async_channel it's perfect for me.

Thanks a lot !

@apennamen
Copy link
Contributor Author

Hello,
I just began unit testing the indicate function, so far so good even if the code is stil "rough" 😄

The async mpsc seems to work, it was a very good idea.
I think the SetStatus function will simply close the transmitter side of the mpsc, which will allow to finalize the Future by polling the state of the senders (with sender_count and is_closed method available on the receiver side.

For the moment, I send a Vec of results for each Indicate call, the Future will have to consolidate them once the channel is closed. I can't foresee any implementation hurdle right now after this quick and dirty try
There's still a lot of work but I guess the MVP should be quite close...


let lObjectCount = lObjectCount as usize;
let tx = self.sender.clone();
let mut result = Vec::<IWbemClassWrapper>::with_capacity(lObjectCount);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use a Sender<IWbemClassWrapper> directly, and send each object directly to the channel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see 2 reasons by now :

Sink implementations should process the event notification within 100 MSEC because the WMI thread that delivers the event notification cannot do other work until the sink object has completed processing. If the notification requires a large amount of processing, the sink can use an internal queue for another thread to handle the processing.

(https://docs.microsoft.com/en-us/windows/win32/api/wbemcli/nf-wbemcli-iwbemobjectsink-indicate) => maybe It would take too much time to call the transmitter on each element, but I could be wrong.

The other reason would be that I don't see yet the need to iterate on the receiver side on each element.

It's not set in stone yet

BTW, the way I understand the previous quote, I think the Indicate function won't be called from 2 different threads, but I have to confirm this. I don't think it will impact my implementation anyway, but that could be a good news

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that sending to the channel should be almost immediate as it's not blocking (or at most on the order of a millisecond).

The other reason would be that I don't see yet the need to iterate on the receiver side on each element.

If I understand correctly, indicate can be called multiple times, so if you use the vector the other side will receive an entire vector each time, and will need to iterate on it and return items from it, before moving on to the next vector. It seem simpler to have a "flat" channel so both side just send/recv a single object at a time.

src/query_sink.rs Outdated Show resolved Hide resolved
src/query_sink.rs Outdated Show resolved Hide resolved

let mut arr = vec![ptr, ptr2];

unsafe {p_sink.Indicate(arr.len() as i32, arr.as_mut_ptr());}
Copy link
Owner

@ohadravid ohadravid Dec 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool test! I like this very much :)
I like it, but thinking about this some more, a simpler & more "real" test would probably be executing a query which return a few items, and iterating over the resulting stream.

For example, here you don't simulate the "real" way Indicate passes/not passes ownership 🤔

Copy link
Contributor Author

@apennamen apennamen Dec 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I completed the test, it should be now splitted into several smaller tests, but I checked the refcount by releasing the first pointers, so that it mimics how the Indicate call will be done.

EDIT: now there should be too much Release at the end of the tests, I'm trying to find what happens in this case...
I'll quickly fix the tests first
EDIT2: quick fix available

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't resolve this conversation as I did not take exactly what you suggested into account.

Here I make some unit testing on the indicate and set_status methods. The "real" test is made in async_query.rs.

Copy link
Owner

@ohadravid ohadravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is looking really good!
Perhaps you can cleanup & handle the needed errors, and return a Result<impl Stream<Item=Result<IWbemClassWrapper, WMIError>>> by returning the receiver and we can merge this already!

src/query_sink.rs Outdated Show resolved Hide resolved
src/result_enumerator.rs Outdated Show resolved Hide resolved
@apennamen
Copy link
Contributor Author

apennamen commented Jan 11, 2021

Hello!

I wish you an happy new year, all the best for 2021 ;)

I tried a small experiment with the future crate, which is available here: https://github.com/apennamen/wmi-rs/tree/feat/async-query-with-future

I'm launching the tests with the following command line :
cargo test async --features async-query,test -- --show-output

As it is a different implementation, I made another branch. But at least I could craft a test for the exec_async_query_native_wrapper.
And that's when the problems begun... I was greeted at first with correct results, but after 3 or 4 runs, the dreaded stacktrace appeared :

running 2 tests
test query::tests::it_works_async ... thread '<unnamed>ok' panicked at '
attempt to subtract with overflow', C:\Users\apennamen\.cargo\registry\src\github.com-1ecc6299db9ec823\com-impl-0.1.1\src\lib.rs:139:9
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/b32e6e6ac8921035177256ab6806e6ab0d4b9b94\/library\std\src\panicking.rs:493
   1: core::panicking::panic_fmt
             at /rustc/b32e6e6ac8921035177256ab6806e6ab0d4b9b94\/library\core\src\panicking.rs:92
   2: core::panicking::panic
             at /rustc/b32e6e6ac8921035177256ab6806e6ab0d4b9b94\/library\core\src\panicking.rs:50
   3: com_impl::Refcount::release
             at C:\Users\apennamen\.cargo\registry\src\github.com-1ecc6299db9ec823\com-impl-0.1.1\src\lib.rs:139
   4: wmi::query_sink::QuerySink::__com_impl__IUnknown__Release
             at .\src\query_sink.rs:31
   5: CoMarshalInterface
   6: CoMarshalInterface
   7: CoMarshalInterface
   8: CoMarshalInterface
   9: NdrSendReceive
  10: NdrClientCall3
  11: NdrStubCall3
  12: CStdStubBuffer_Invoke
  13: CoWaitForMultipleHandles
  14: CoWaitForMultipleHandles
  15: CoGetMalloc
  16: CStdStubBuffer_QueryInterface
  17: CoGetApartmentType
  18: CoMarshalInterface
  19: CoMarshalInterface
  20: CoMarshalInterface
  21: RpcExceptionFilter
  22: RpcBindingCopy
  23: RpcBindingCopy
  24: I_RpcBCacheAllocate
  25: I_RpcBCacheAllocate
  26: I_RpcBCacheAllocate
  27: I_RpcBCacheAllocate
  28: I_RpcBCacheAllocate
  29: TpCallbackIndependent
  30: RtlInitializeResource
  31: BaseThreadInitThunk
  32: RtlUserThreadStart
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
fatal runtime error: Rust panics must be rethrown
error: test failed, to rerun pass '--lib

If I understand correctly, the Release method provided by com_impl is called too many time, so WMI releases the QuerySink and decrement the refcount below zero, which causes this stacktrace.
In my opinion, I should not call the AddRef method to try to get around the error. The QuerySink::create_raw(sender); in query_sink.rs is already initializing the RefCount to 1.

So I'm a bit stuck there at the moment... I don't understand why QuerySink is realeased more than once by WMI.
It struck my motivation a little bit tbh ^^

EDIT: I guess those are the worst errors, the one that are not consistent between 2 runs...:

---- query::tests::it_works_async stdout ----
[Ok(IWbemClassWrapper { inner: Some(0x22b2fefbba0) })]


successes:
    query::tests::it_works_async
    query_sink::tests::it_should_use_async_channel_to_send_result

test result: ok. 2 passed; 0 failed; 0 ignored; 0 measured; 39 filtered out; finished in 2.96s

My wild guess would be that the Release is sometime called before the AddRef occured, i don't know how it can be possible...
The only hint is in this documentation :

pResponseHandler

Pointer to the caller's implementation of IWbemObjectSink. This handler receives the objects in the query result set as they become available. If any error code is returned, then the supplied IWbemObjectSink pointer is not used. If WBEM_S_NO_ERROR is returned, then the user's IWbemObjectSink implementation is called to indicate the result of the operation. Windows Management Instrumentation (WMI) calls IWbemObjectSink::Indicate with the objects any number of times, followed by a single call to IWbemObjectSink::SetStatus to indicate the final status.

WMI only calls AddRef to the pointer when WBEM_S_NO_ERROR returns. When an error code returns, the reference count is the same as on entry. For a detailed explanation of asynchronous calling methods, see Calling a Method.

@apennamen
Copy link
Contributor Author

apennamen commented Jan 15, 2021

Hello there,

I tried a similar implementation as the one in the other branch but with async_channel. Test is still failing with the refcount problem...
At least I share a test of the exec_async_query_native_wrapper, even if it's failing. I wish it's only a silly mistake...

I thought maybe query_sink is released by rust because there is no real way for the compiler to know that query_sink has to exist after the .await.

// Execute the given query in async way, returns result in a Sink.
    #[cfg(feature = "async-query")]
    pub async fn exec_async_query_native_wrapper(
        &self,
        query: impl AsRef<str>,
    ) -> Result<Vec<Result<IWbemClassWrapper, WMIError>>, WMIError> {
        let query_language = BStr::from_str("WQL")?;
        let query = BStr::from_str(query.as_ref())?;

        let (tx, rx) = async_channel::unbounded();
        let p_sink: ComPtr<IWbemObjectSink> = QuerySink::new(tx);

        unsafe {
            check_hres((*self.svc()).ExecQueryAsync(
                query_language.as_bstr(),
                query.as_bstr(),
                WBEM_FLAG_BIDIRECTIONAL as i32,
                ptr::null_mut(),
                p_sink.as_raw(),
            ))?;
            
        }

        // TODO: transform to Result<Vec<IWbemClassWrapper>, WMIError>
        // TODO: try not to use collect as it adds dependency to futures::stream::StreamExt
        Ok(rx.collect::<Vec<_>>().await) // <= query_sink must be released only after await finishes
    }

So I tried the following version but it's failing in the exact same way...

// Execute the given query in async way, returns result in a Sink.
    #[cfg(feature = "async-query")]
    pub async fn exec_async_query_native_wrapper(
        &self,
        query: impl AsRef<str>,
    ) -> Result<Vec<Result<IWbemClassWrapper, WMIError>>, WMIError> {
        let query_language = BStr::from_str("WQL")?;
        let query = BStr::from_str(query.as_ref())?;

        let (tx, rx) = async_channel::unbounded();
        let p_sink: ComPtr<IWbemObjectSink> = QuerySink::new(tx);

        unsafe {
            p_sink.AddRef();
            check_hres((*self.svc()).ExecQueryAsync(
                query_language.as_bstr(),
                query.as_bstr(),
                WBEM_FLAG_BIDIRECTIONAL as i32,
                ptr::null_mut(),
                p_sink.as_raw(),
            ))?;
            
        }

        // TODO: transform to Result<Vec<IWbemClassWrapper>, WMIError>
        // TODO: try not to use collect as it adds dependency to futures::stream::StreamExt
        let result = rx.collect::<Vec<_>>().await;
        unsafe { p_sink.Release(); } // now p_sink shouldn't be released before the end of await
        Ok(result)
    }

I don't know if you can spot something obviously wrong, I hope you can :)

EDIT: woops broke the pipeline ! Quick precision, I launch my tests with cargo test async --features async-query,test -- --show-output, that's why I include _async_ at the beginning of the names of my tests to target them when launching tests.

@apennamen
Copy link
Contributor Author

Hello,

Quick update on the subject: just to be able to go forward, I "hacked" the RefCount by calling p_sink.AddRef() the required number of time (apparently it's 4 times ^^). At least I have a passing test, I'll have to ask for help to the community on this weird behaviour.
I'm almost sure it's not a problem with the implementation, rather how WMI or com_impl handles AddRef & Release.

Otherwise I made some refactoring, to isolate all the code related to the feature in a separate async_query file, it's much nicer for the feature flipping! Also changed the signature of exec_async_query_native_wrapper :

pub fn exec_async_query_native_wrapper(
        &self,
        query: impl AsRef<str>,
    ) -> Result<impl Stream<Item=Result<IWbemClassWrapper, WMIError>>, WMIError> {

As you suggested, it's now returning a Stream.

Regards,
AP.

@apennamen apennamen changed the title [WIP] Add async query support Add async query support Jan 24, 2021
@apennamen
Copy link
Contributor Author

Hello,
I do not have anymore ideas on how to improve the code, tests or documentation, so I'm asking for a review.

There's 2 things to "fix":

  1. The com-impl link in Cargo.toml, as soon as a new one is released. I currently point to the git repo instead
  2. Pipeline is failing because of code in Readme, maybe you have a simple fix, otherwise I simply remove it from the readme.

Regards,
AP

@ohadravid
Copy link
Owner

Hi @apennamen, I'll try to review this over the weekend 😁

src/async_query.rs Outdated Show resolved Hide resolved
src/lib.rs Outdated Show resolved Hide resolved
src/query_sink.rs Outdated Show resolved Hide resolved
}

let lObjectCount = lObjectCount as usize;
let tx = self.sender.clone();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the clone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be unnecessary, I wanted to be sure that each thread gets its own Sender

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the channel is Send+Sync, so even for different threads, it should be ok to use the same sender (I think)

src/query_sink.rs Outdated Show resolved Hide resolved
return E_FAIL;
}
} else {
if let Err(e) = tx.try_send(Err(WMIError::NullPointerResult)) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice :)

src/async_query.rs Show resolved Hide resolved
Copy link
Owner

@ohadravid ohadravid left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apennamen awesome, thank you for working on this :)

I'll merge it now and release a version soon.

Cheers!

}

let lObjectCount = lObjectCount as usize;
let tx = self.sender.clone();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the channel is Send+Sync, so even for different threads, it should be ok to use the same sender (I think)

@ohadravid ohadravid merged commit 6362967 into ohadravid:master Feb 19, 2021
@ohadravid ohadravid mentioned this pull request Feb 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants