-
Notifications
You must be signed in to change notification settings - Fork 85
/
transport_stream.go
97 lines (86 loc) · 2.96 KB
/
transport_stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//
//
// Tencent is pleased to support the open source community by making tRPC available.
//
// Copyright (C) 2023 THL A29 Limited, a Tencent company.
// All rights reserved.
//
// If you have downloaded a copy of the tRPC source code from Tencent,
// please note that tRPC source code is licensed under the Apache 2.0 License,
// A copy of the Apache 2.0 License is included in this file.
//
//
package transport
import (
"context"
"reflect"
"sync"
)
var (
clientStreamTrans = make(map[string]ClientStreamTransport)
muxClientStreamTrans = sync.RWMutex{}
serverStreamTrans = make(map[string]ServerStreamTransport)
muxServerStreamTrans = sync.RWMutex{}
)
// ClientStreamTransport is the client stream transport interface.
// It's compatible with common RPC transport.
type ClientStreamTransport interface {
// Send sends stream messages.
Send(ctx context.Context, req []byte, opts ...RoundTripOption) error
// Recv receives stream messages.
Recv(ctx context.Context, opts ...RoundTripOption) ([]byte, error)
// Init inits the stream.
Init(ctx context.Context, opts ...RoundTripOption) error
// Close closes stream transport, return connection to the resource pool.
Close(ctx context.Context)
}
// ServerStreamTransport is the server stream transport interface.
// It's compatible with common RPC transport.
type ServerStreamTransport interface {
// ServerTransport is used to keep compatibility with common RPC transport.
ServerTransport
// Send sends messages.
Send(ctx context.Context, req []byte) error
// Close is called when server encounters an error and cleans up.
Close(ctx context.Context)
}
// RegisterServerStreamTransport Registers a named ServerStreamTransport.
func RegisterServerStreamTransport(name string, t ServerStreamTransport) {
tv := reflect.ValueOf(t)
if t == nil || tv.Kind() == reflect.Ptr && tv.IsNil() {
panic("transport: register nil server transport")
}
if name == "" {
panic("transport: register empty name of server transport")
}
muxServerStreamTrans.Lock()
serverStreamTrans[name] = t
muxServerStreamTrans.Unlock()
}
// RegisterClientStreamTransport registers a named ClientStreamTransport.
func RegisterClientStreamTransport(name string, t ClientStreamTransport) {
tv := reflect.ValueOf(t)
if t == nil || tv.Kind() == reflect.Ptr && tv.IsNil() {
panic("transport: register nil client transport")
}
if name == "" {
panic("transport: register empty name of client transport")
}
muxClientStreamTrans.Lock()
clientStreamTrans[name] = t
muxClientStreamTrans.Unlock()
}
// GetClientStreamTransport returns ClientStreamTransport by name.
func GetClientStreamTransport(name string) ClientStreamTransport {
muxClientStreamTrans.RLock()
t := clientStreamTrans[name]
muxClientStreamTrans.RUnlock()
return t
}
// GetServerStreamTransport returns ServerStreamTransport by name.
func GetServerStreamTransport(name string) ServerStreamTransport {
muxServerStreamTrans.RLock()
t := serverStreamTrans[name]
muxServerStreamTrans.RUnlock()
return t
}