From 7767d9504855776c22cad8aa1ee4cf9ba05bd1e9 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 28 Mar 2022 15:34:41 +0800 Subject: [PATCH] metadata: avoid change metadata ref (#566) After metadata is received, every entry should be owned by the grpc c core, application should clone the byte slice if necessary. In the past, we get around the problem by increasing the reference count of the slice. However, it's unsafe to do so as the slice is not guaranteed to be accessible in the first place. This PR fixes the problem by introducing an unowned metadata type. It works just like Metadata, but accessing its content requires manual check for the lifetime of associated call. Signed-off-by: Jay Lee --- grpc-sys/grpc_wrap.cc | 20 -------------------- proto/Cargo.toml | 2 +- src/buf.rs | 2 +- src/call/client.rs | 35 ++++++++++++++++++++--------------- src/call/mod.rs | 9 +++++---- src/metadata.rs | 38 +++++++++++++++++++++++++++++++++----- src/task/mod.rs | 2 +- src/task/promise.rs | 14 +++++++------- 8 files changed, 68 insertions(+), 54 deletions(-) diff --git a/grpc-sys/grpc_wrap.cc b/grpc-sys/grpc_wrap.cc index 0aba43384..786214b30 100644 --- a/grpc-sys/grpc_wrap.cc +++ b/grpc-sys/grpc_wrap.cc @@ -283,16 +283,6 @@ grpcwrap_request_call_context_destroy(grpcwrap_request_call_context* ctx) { GPR_EXPORT void GPR_CALLTYPE grpcwrap_batch_context_take_recv_initial_metadata( grpcwrap_batch_context* ctx, grpc_metadata_array* res) { grpcwrap_metadata_array_move(res, &(ctx->recv_initial_metadata)); - - /* According to the documentation for struct grpc_op in grpc_types.h, - * ownership of keys and values for - * metadata stays with the call object. This means we have ref each of the - * keys and values here. */ - size_t i; - for (i = 0; i < res->count; i++) { - grpc_slice_ref(res->metadata[i].key); - grpc_slice_ref(res->metadata[i].value); - } } GPR_EXPORT void GPR_CALLTYPE @@ -300,16 +290,6 @@ grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata( grpcwrap_batch_context* ctx, grpc_metadata_array* res) { grpcwrap_metadata_array_move(res, &(ctx->recv_status_on_client.trailing_metadata)); - - /* According to the documentation for struct grpc_op in grpc_types.h, - * ownership of keys and values for - * metadata stays with the call object. This means we have ref each of the - * keys and values here. */ - size_t i; - for (i = 0; i < res->count; i++) { - grpc_slice_ref(res->metadata[i].key); - grpc_slice_ref(res->metadata[i].value); - } } GPR_EXPORT const char* GPR_CALLTYPE diff --git a/proto/Cargo.toml b/proto/Cargo.toml index 27cfdc948..d638d4798 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -27,5 +27,5 @@ protobuf = "2" lazy_static = { version = "1.3", optional = true } [build-dependencies] -protobuf-build = { version = ">=0.12", default-features = false } +protobuf-build = { version = ">=0.13", default-features = false } walkdir = "2.2" diff --git a/src/buf.rs b/src/buf.rs index de8fe5463..7bde49034 100644 --- a/src/buf.rs +++ b/src/buf.rs @@ -15,7 +15,7 @@ const INLINED_SIZE: usize = mem::size_of::() + mem::size_of::<*mut /// A convenient rust wrapper for the type `grpc_slice`. /// /// It's expected that the slice should be initialized. -#[repr(C)] +#[repr(transparent)] pub struct GrpcSlice(grpc_slice); impl GrpcSlice { diff --git a/src/call/client.rs b/src/call/client.rs index c87eac121..1032ad54f 100644 --- a/src/call/client.rs +++ b/src/call/client.rs @@ -19,7 +19,7 @@ use crate::call::{check_run, Call, MessageReader, Method}; use crate::channel::Channel; use crate::codec::{DeserializeFn, SerializeFn}; use crate::error::{Error, Result}; -use crate::metadata::{Metadata, MetadataBuilder}; +use crate::metadata::{Metadata, UnownedMetadata}; use crate::task::{BatchFuture, BatchType}; /// Update the flag bit in res. @@ -225,8 +225,8 @@ pub struct ClientUnaryReceiver { resp_de: DeserializeFn, finished: bool, message: Option, - initial_metadata: Metadata, - trailing_metadata: Metadata, + initial_metadata: UnownedMetadata, + trailing_metadata: UnownedMetadata, } impl ClientUnaryReceiver { @@ -237,8 +237,8 @@ impl ClientUnaryReceiver { resp_de, finished: false, message: None, - initial_metadata: MetadataBuilder::new().build(), - trailing_metadata: MetadataBuilder::new().build(), + initial_metadata: UnownedMetadata::empty(), + trailing_metadata: UnownedMetadata::empty(), } } @@ -274,12 +274,14 @@ impl ClientUnaryReceiver { /// Get the initial metadata. pub async fn headers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.initial_metadata) + // Because we have a reference to call, so it's safe to read. + Ok(unsafe { self.initial_metadata.assume_valid() }) } pub async fn trailers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.trailing_metadata) + // Because we have a reference to call, so it's safe to read. + Ok(unsafe { self.trailing_metadata.assume_valid() }) } pub fn receive_sync(&mut self) -> Result<(Metadata, T, Metadata)> { @@ -325,8 +327,8 @@ pub struct ClientCStreamReceiver { resp_de: DeserializeFn, finished: bool, message: Option, - initial_metadata: Metadata, - trailing_metadata: Metadata, + initial_metadata: UnownedMetadata, + trailing_metadata: UnownedMetadata, } impl ClientCStreamReceiver { @@ -337,8 +339,8 @@ impl ClientCStreamReceiver { resp_de, finished: false, message: None, - initial_metadata: MetadataBuilder::new().build(), - trailing_metadata: MetadataBuilder::new().build(), + initial_metadata: UnownedMetadata::empty(), + trailing_metadata: UnownedMetadata::empty(), } } @@ -378,12 +380,14 @@ impl ClientCStreamReceiver { /// Get the initial metadata. pub async fn headers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.initial_metadata) + // We still have a reference in share call. + Ok(unsafe { self.initial_metadata.assume_valid() }) } pub async fn trailers(&mut self) -> Result<&Metadata> { self.wait_for_batch_future().await?; - Ok(&self.trailing_metadata) + // We still have a reference in share call. + Ok(unsafe { self.trailing_metadata.assume_valid() }) } } @@ -550,7 +554,7 @@ struct ResponseStreamImpl { read_done: bool, finished: bool, resp_de: DeserializeFn, - headers_f: FutureOrValue, + headers_f: FutureOrValue, } impl ResponseStreamImpl { @@ -623,7 +627,8 @@ impl ResponseStreamImpl { self.headers_f = FutureOrValue::Value(Pin::new(f).await?.initial_metadata); } match &self.headers_f { - FutureOrValue::Value(v) => Ok(v), + // We still have reference to call. + FutureOrValue::Value(v) => Ok(unsafe { v.assume_valid() }), _ => unreachable!(), } } diff --git a/src/call/mod.rs b/src/call/mod.rs index 7aff27660..8a4f7e618 100644 --- a/src/call/mod.rs +++ b/src/call/mod.rs @@ -11,6 +11,7 @@ use std::task::{Context, Poll}; use std::{ptr, slice}; use crate::grpc_sys::{self, grpc_call, grpc_call_error, grpcwrap_batch_context}; +use crate::metadata::UnownedMetadata; use crate::{cq::CompletionQueue, Metadata, MetadataBuilder}; use futures_util::ready; use libc::c_void; @@ -291,8 +292,8 @@ impl BatchContext { /// /// If initial metadata is not fetched or the method has been called, empty metadata will be /// returned. - pub fn take_initial_metadata(&mut self) -> Metadata { - let mut res = MetadataBuilder::with_capacity(0).build(); + pub fn take_initial_metadata(&mut self) -> UnownedMetadata { + let mut res = UnownedMetadata::empty(); unsafe { grpcio_sys::grpcwrap_batch_context_take_recv_initial_metadata( self.ctx, @@ -306,8 +307,8 @@ impl BatchContext { /// /// If trailing metadata is not fetched or the method has been called, empty metadata will be /// returned. - pub fn take_trailing_metadata(&mut self) -> Metadata { - let mut res = MetadataBuilder::with_capacity(0).build(); + pub fn take_trailing_metadata(&mut self) -> UnownedMetadata { + let mut res = UnownedMetadata::empty(); unsafe { grpc_sys::grpcwrap_batch_context_take_recv_status_on_client_trailing_metadata( self.ctx, diff --git a/src/metadata.rs b/src/metadata.rs index a30143b32..296c6bc29 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -147,7 +147,7 @@ impl MetadataBuilder { /// /// Metadata value can be ascii string or bytes. They are distinguish by the /// key suffix, key of bytes value should have suffix '-bin'. -#[repr(C)] +#[repr(transparent)] pub struct Metadata(grpc_metadata_array); impl Metadata { @@ -234,10 +234,6 @@ impl Metadata { } &[] } - - pub(crate) fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array { - &mut self.0 as _ - } } impl fmt::Debug for Metadata { @@ -273,6 +269,38 @@ impl Drop for Metadata { unsafe impl Send for Metadata {} unsafe impl Sync for Metadata {} +/// A special metadata that only for receiving metadata from remote. +/// +/// gRPC C Core manages metadata internally, it's unsafe to read them unless +/// call is not destroyed. +#[repr(transparent)] +pub struct UnownedMetadata(grpc_metadata_array); + +impl UnownedMetadata { + #[inline] + pub fn empty() -> UnownedMetadata { + unsafe { mem::transmute(Metadata::with_capacity(0)) } + } + #[inline] + pub unsafe fn assume_valid(&self) -> &Metadata { + mem::transmute(self) + } + + pub fn as_mut_ptr(&mut self) -> *mut grpc_metadata_array { + &mut self.0 as _ + } +} + +impl Drop for UnownedMetadata { + #[inline] + fn drop(&mut self) { + unsafe { grpcio_sys::grpcwrap_metadata_array_destroy_metadata_only(&mut self.0) } + } +} + +unsafe impl Send for UnownedMetadata {} +unsafe impl Sync for UnownedMetadata {} + /// Immutable metadata iterator /// /// This struct is created by the iter method on `Metadata`. diff --git a/src/task/mod.rs b/src/task/mod.rs index 3c456db28..4f5654cd8 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -22,7 +22,7 @@ use crate::error::{Error, Result}; use crate::server::RequestCallContext; pub(crate) use self::executor::{Executor, Kicker, UnfinishedWork}; -pub use self::promise::BatchResult; +pub(crate) use self::promise::BatchResult; pub use self::promise::BatchType; /// A handle that is used to notify future that the task finishes. diff --git a/src/task/promise.rs b/src/task/promise.rs index 7d25c0c3a..e9b364688 100644 --- a/src/task/promise.rs +++ b/src/task/promise.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use super::Inner; use crate::call::{BatchContext, MessageReader, RpcStatusCode}; use crate::error::Error; -use crate::{Metadata, MetadataBuilder}; +use crate::metadata::UnownedMetadata; /// Batch job type. #[derive(PartialEq, Debug)] @@ -22,25 +22,25 @@ pub enum BatchType { /// A promise result which stores a message reader with bundled metadata. pub struct BatchResult { pub message_reader: Option, - pub initial_metadata: Metadata, - pub trailing_metadata: Metadata, + pub initial_metadata: UnownedMetadata, + pub trailing_metadata: UnownedMetadata, } impl BatchResult { pub fn new( message_reader: Option, - initial_metadata: Option, - trailing_metadata: Option, + initial_metadata: Option, + trailing_metadata: Option, ) -> BatchResult { let initial_metadata = if let Some(m) = initial_metadata { m } else { - MetadataBuilder::new().build() + UnownedMetadata::empty() }; let trailing_metadata = if let Some(m) = trailing_metadata { m } else { - MetadataBuilder::new().build() + UnownedMetadata::empty() }; BatchResult { message_reader,