/
server.go
143 lines (131 loc) · 3.59 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
// Package server is an interface for a micro server
package server
import (
"context"
"time"
"github.com/google/uuid"
"github.com/micro/go-micro/v3/codec"
"github.com/micro/go-micro/v3/registry"
)
// Server is a simple abstraction that every micro server implementation
// satisfies.
type Server interface {
	// Init initialises the server options.
	Init(opts ...Option) error
	// Options retrieves the server options.
	Options() Options
	// Handle registers a handler.
	Handle(h Handler) error
	// NewHandler creates a new handler from the given value and handler options.
	NewHandler(v interface{}, opts ...HandlerOption) Handler
	// NewSubscriber creates a new subscriber from the given string (the
	// topic, judging by Subscriber.Topic), value and subscriber options.
	NewSubscriber(topic string, v interface{}, opts ...SubscriberOption) Subscriber
	// Subscribe registers a subscriber.
	Subscribe(s Subscriber) error
	// Start starts the server.
	Start() error
	// Stop stops the server.
	Stop() error
	// String identifies the server implementation.
	String() string
}
// Router handles the serving of messages and requests.
type Router interface {
	// ProcessMessage processes a single message.
	ProcessMessage(ctx context.Context, msg Message) error
	// ServeRequest processes a request to completion, using the supplied
	// Response for the reply.
	ServeRequest(ctx context.Context, req Request, rsp Response) error
}
// Message is the interface for an asynchronous message.
type Message interface {
	// Topic returns the topic of the message.
	Topic() string
	// Payload returns the decoded payload value.
	Payload() interface{}
	// ContentType returns the content type of the payload.
	ContentType() string
	// Header returns the raw headers of the message.
	Header() map[string]string
	// Body returns the raw body of the message.
	Body() []byte
	// Codec returns the codec used to decode the message.
	Codec() codec.Reader
}
// Request is the interface for a synchronous request.
type Request interface {
	// Service returns the name of the requested service.
	Service() string
	// Method returns the requested action.
	Method() string
	// Endpoint returns the name of the requested endpoint.
	Endpoint() string
	// ContentType returns the content type provided with the request.
	ContentType() string
	// Header returns the header of the request.
	Header() map[string]string
	// Body returns the initial decoded value.
	Body() interface{}
	// Read returns the undecoded request body.
	Read() ([]byte, error)
	// Codec returns the reader for the encoded message stream.
	Codec() codec.Reader
	// Stream reports whether the request is a stream.
	Stream() bool
}
// Response is the response writer for unencoded messages.
type Response interface {
	// Codec returns the encoded writer.
	Codec() codec.Writer
	// WriteHeader writes the given header.
	WriteHeader(hdr map[string]string)
	// Write writes a response directly to the client.
	Write(b []byte) error
}
// Stream represents a stream established with a client.
//
// Whether the stream is bidirectional is indicated by the request. The
// last error is left in Error(), and EOF indicates the end of the stream.
type Stream interface {
	// Context returns the context.Context associated with the stream.
	Context() context.Context
	// Request returns the Request associated with the stream.
	Request() Request
	// Send takes a value to send on the stream.
	// NOTE(review): accepted value types are implementation-defined — confirm.
	Send(msg interface{}) error
	// Recv takes a value to receive into.
	// NOTE(review): presumably msg must be a pointer — confirm against implementations.
	Recv(msg interface{}) error
	// Error returns the last error left on the stream.
	Error() error
	// Close closes the stream.
	Close() error
}
// Handler represents a request handler. It is generated by passing any
// type of public concrete object with endpoints into server.NewHandler.
// Most callers will pass in a struct.
//
// Example:
//
//	type Greeter struct{}
//
//	func (g *Greeter) Hello(context, request, response) error {
//		return nil
//	}
type Handler interface {
	// Name returns the handler's name.
	Name() string
	// Handler returns the handler value.
	// NOTE(review): presumably the object given to NewHandler — confirm.
	Handler() interface{}
	// Endpoints returns the registry endpoints of the handler.
	Endpoints() []*registry.Endpoint
	// Options returns the handler's options.
	Options() HandlerOptions
}
// Subscriber represents a subscription to a given topic, backed by a
// specific subscriber function or an object with endpoints. Its behaviour
// mirrors that of Handler.
type Subscriber interface {
	// Topic returns the topic subscribed to.
	Topic() string
	// Subscriber returns the subscriber function or object.
	Subscriber() interface{}
	// Endpoints returns the registry endpoints of the subscriber.
	Endpoints() []*registry.Endpoint
	// Options returns the subscriber's options.
	Options() SubscriberOptions
}
// Option is a function that receives a pointer to the server Options,
// which lets it modify them in place. Server.Init accepts any number of
// Option values.
type Option func(o *Options)
// Package-level defaults for a server.
var (
	// DefaultAddress is the default server address: no host and port 0.
	DefaultAddress = ":0"
	// DefaultName is the default server name.
	DefaultName = "go.micro.server"
	// DefaultVersion is the default server version.
	DefaultVersion = "latest"
	// DefaultId is a UUID string generated once, when the package is
	// initialised. (Go's initialism convention would spell this DefaultID;
	// the existing name is kept so callers keep compiling.)
	DefaultId = uuid.New().String()
	// DefaultRegisterCheck is a register check that always succeeds: it
	// ignores its context and returns nil.
	DefaultRegisterCheck = func(context.Context) error { return nil }
	// DefaultRegisterInterval is the default register interval (30 seconds).
	DefaultRegisterInterval = 30 * time.Second
	// DefaultRegisterTTL is the default register TTL (90 seconds).
	DefaultRegisterTTL = 90 * time.Second
)