forked from twitter/finagle
/
Service.scala
160 lines (141 loc) · 5.04 KB
/
Service.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package com.twitter.finagle
import com.twitter.util.Future
import com.twitter.finagle.service.RefcountedService
/**
* A Service is an asynchronous function from Request to Future[Response]. It is the
* basic unit of an RPC interface.
*
* '''Note:''' this is an abstract class (vs. a trait) to maintain java
* compatibility, as it has implementation as well as interface.
*/
abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) {
def map[Req1](f: (Req1) => (Req)) = new Service[Req1, Rep] {
def apply(req1: Req1) = Service.this.apply(f(req1))
override def release() = Service.this.release()
}
/**
* This is the method to override/implement to create your own Service.
*/
def apply(request: Req): Future[Rep]
/**
* Relinquishes the use of this service instance. Behavior is
* undefined is apply() is called after resources are relinquished.
*/
def release() = ()
/**
* Determines whether this service is available (can accept requests
* with a reasonable likelihood of success).
*/
def isAvailable: Boolean = true
}
abstract class ServiceFactory[-Req, +Rep] {
/**
* Reserve the use of a given service instance. This pins the
* underlying channel and the returned service has exclusive use of
* its underlying connection. To relinquish the use of the reserved
* Service, the user must call Service.release().
*/
def make(): Future[Service[Req, Rep]]
/**
* Close the factory and its underlying resources.
*/
def close()
def isAvailable: Boolean = true
}
class FactoryToService[Req, Rep](factory: ServiceFactory[Req, Rep])
extends Service[Req, Rep]
{
def apply(request: Req) =
factory.make() flatMap { service =>
service(request) ensure { service.release() }
}
override def release() = factory.close()
override def isAvailable = factory.isAvailable
}
/**
* A Filter acts as a decorator/transformer of a service. It may apply
* transformations to the input and output of that service:
*
* (* MyService *)
* [ReqIn -> (ReqOut -> RepIn) -> RepOut]
*
* For example, you may have a POJO service that takes Strings and
* parses them as Ints. If you want to expose this as a Network
* Service via Thrift, it is nice to isolate the protocol handling
* from the business rules. Hence you might have a Filter that
* converts back and forth between Thrift structs. Again, your service
* deals with POJOs:
*
* [ThriftIn -> (String -> Int) -> ThriftOut]
*
*/
abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
extends ((ReqIn, Service[ReqOut, RepIn]) => Future[RepOut])
{
/**
* This is the method to override/implement to create your own Filter.
*
* @param request the input request type
* @param service a service that takes the output request type and the input response type
*
*/
def apply(request: ReqIn, service: Service[ReqOut, RepIn]): Future[RepOut]
/**
* Chains a series of filters together:
*
* myModularService = handleExcetions.andThen(thrift2Pojo.andThen(parseString))
*
* @param next another filter to follow after this one
*
*/
def andThen[Req2, Rep2](next: Filter[ReqOut, RepIn, Req2, Rep2]) =
new Filter[ReqIn, RepOut, Req2, Rep2] {
def apply(request: ReqIn, service: Service[Req2, Rep2]) = {
Filter.this.apply(request, new Service[ReqOut, RepIn] {
def apply(request: ReqOut): Future[RepIn] = next(request, service)
override def release() = service.release()
override def isAvailable = service.isAvailable
})
}
}
/**
* Terminates a filter chain in a service. For example,
*
* myFilter.andThen(myService)
*
* @param service a service that takes the output request type and the input response type.
*
*/
def andThen(service: Service[ReqOut, RepIn]) = new Service[ReqIn, RepOut] {
private[this] val refcounted = new RefcountedService(service)
def apply(request: ReqIn) = Filter.this.apply(request, refcounted)
override def release() = refcounted.release()
override def isAvailable = refcounted.isAvailable
}
def andThen(factory: ServiceFactory[ReqOut, RepIn]): ServiceFactory[ReqIn, RepOut] =
new ServiceFactory[ReqIn, RepOut] {
def make() = factory.make() map { Filter.this andThen _ }
override def close() = factory.close()
override def isAvailable = factory.isAvailable
}
/**
* Conditionally propagates requests down the filter chain. This may
* useful if you are statically wiring together filter chains based
* on a configuration file, for instance.
*
* @param condAndFilter a tuple of boolean and filter.
*
*/
def andThenIf[Req2 >: ReqOut, Rep2 <: RepIn](
condAndFilter: (Boolean, Filter[ReqOut, RepIn, Req2, Rep2])) =
condAndFilter match {
case (true, filter) => andThen(filter)
case (false, _) => this
}
}
abstract class SimpleFilter[Req, Rep] extends Filter[Req, Rep, Req, Rep]
object Filter {
def identity[Req, Rep] = new SimpleFilter[Req, Rep] {
def apply(request: Req, service: Service[Req, Rep]) = service(request)
}
}