@@ -7,8 +7,9 @@ use std::{
7
7
8
8
use anyhow:: anyhow;
9
9
use client:: {
10
- blobs:: { BlobInfo , BlobStatus , IncompleteBlobInfo , WrapOption } ,
10
+ blobs:: { self , BlobInfo , BlobStatus , IncompleteBlobInfo , WrapOption } ,
11
11
tags:: TagInfo ,
12
+ MemConnector ,
12
13
} ;
13
14
use futures_buffered:: BufferedStreamExt ;
14
15
use futures_lite:: StreamExt ;
@@ -32,7 +33,11 @@ use proto::{
32
33
} ,
33
34
Request , RpcError , RpcResult , RpcService ,
34
35
} ;
35
- use quic_rpc:: server:: { ChannelTypes , RpcChannel , RpcServerError } ;
36
+ use quic_rpc:: {
37
+ server:: { ChannelTypes , RpcChannel , RpcServerError } ,
38
+ RpcClient , RpcServer ,
39
+ } ;
40
+ use tokio_util:: task:: AbortOnDropHandle ;
36
41
37
42
use crate :: {
38
43
export:: ExportProgress ,
@@ -56,6 +61,16 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64;
56
61
const RPC_BLOB_GET_CHANNEL_CAP : usize = 2 ;
57
62
58
63
impl < D : crate :: store:: Store > Blobs < D > {
64
+ /// Get a client for the blobs protocol
65
+ pub fn client ( self : Arc < Self > ) -> blobs:: MemClient {
66
+ let client = self
67
+ . rpc_handler
68
+ . get_or_init ( || RpcHandler :: new ( & self ) )
69
+ . client
70
+ . clone ( ) ;
71
+ blobs:: Client :: new ( client)
72
+ }
73
+
59
74
/// Handle an RPC request
60
75
pub async fn handle_rpc_request < C > (
61
76
self : Arc < Self > ,
@@ -871,3 +886,23 @@ impl<D: crate::store::Store> Blobs<D> {
871
886
Ok ( CreateCollectionResponse { hash, tag } )
872
887
}
873
888
}
889
+
890
+ #[ derive( Debug ) ]
891
+ pub ( crate ) struct RpcHandler {
892
+ /// Client to hand out
893
+ client : RpcClient < RpcService , MemConnector > ,
894
+ /// Handler task
895
+ _handler : AbortOnDropHandle < ( ) > ,
896
+ }
897
+
898
+ impl RpcHandler {
899
+ fn new < D : crate :: store:: Store > ( blobs : & Arc < Blobs < D > > ) -> Self {
900
+ let blobs = blobs. clone ( ) ;
901
+ let ( listener, connector) = quic_rpc:: transport:: flume:: channel ( 1 ) ;
902
+ let listener = RpcServer :: new ( listener) ;
903
+ let client = RpcClient :: new ( connector) ;
904
+ let _handler = listener
905
+ . spawn_accept_loop ( move |req, chan| blobs. clone ( ) . handle_rpc_request ( req, chan) ) ;
906
+ Self { client, _handler }
907
+ }
908
+ }
0 commit comments