/
Interpreter.scala
113 lines (106 loc) · 3.25 KB
/
Interpreter.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.twitter.finagle.memcached
import com.twitter.finagle.memcached.protocol._
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer
import org.jboss.netty.util.CharsetUtil
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import com.twitter.util.Future
import com.twitter.finagle.Service
import com.twitter.finagle.memcached.util.{ParserUtils, AtomicMap}
/**
 * Evaluates a given Memcached operation against the backing map and returns
 * the result.
 *
 * Every read or write of the map is performed inside `map.lock(key)`, which
 * hands the block the data it may operate on for that key. The `flags` and
 * `expiry` fields of the storage commands are bound but not used by this
 * implementation.
 *
 * @param map the store that commands are evaluated against
 */
class Interpreter(map: AtomicMap[ChannelBuffer, ChannelBuffer]) {
  import ParserUtils._

  /**
   * Executes a single command and produces its response.
   *
   *  - `Set` always stores the value.
   *  - `Add` stores only when the key is absent; `Replace`, `Append` and
   *    `Prepend` store only when the key is present.
   *  - `Get` returns the values of the requested keys that are present;
   *    missing keys are simply omitted.
   *  - `Delete` removes the key if present.
   *  - `Incr` / `Decr` treat the stored bytes as an ASCII decimal number
   *    (an empty value counts as 0) and store the updated number.
   *
   * @param command the command to evaluate
   * @return the response for the command
   * @throws ClientError if `Incr` / `Decr` targets a non-empty value that
   *                     does not match `DIGITS`
   */
  def apply(command: Command): Response = {
    command match {
      case Set(key, flags, expiry, value) =>
        map.lock(key) { data =>
          data(key) = value
          Stored()
        }

      case Add(key, flags, expiry, value) =>
        map.lock(key) { data =>
          data.get(key) match {
            case Some(_) =>
              NotStored()
            case None =>
              data(key) = value
              Stored()
          }
        }

      case Replace(key, flags, expiry, value) =>
        map.lock(key) { data =>
          data.get(key) match {
            case Some(_) =>
              data(key) = value
              Stored()
            case None =>
              NotStored()
          }
        }

      case Append(key, flags, expiry, value) =>
        map.lock(key) { data =>
          data.get(key) match {
            case Some(existing) =>
              // Append: the new data goes AFTER the existing data.
              data(key) = wrappedBuffer(existing, value)
              Stored()
            case None =>
              NotStored()
          }
        }

      case Prepend(key, flags, expiry, value) =>
        map.lock(key) { data =>
          data.get(key) match {
            case Some(existing) =>
              // Prepend: the new data goes BEFORE the existing data.
              data(key) = wrappedBuffer(value, existing)
              Stored()
            case None =>
              NotStored()
          }
        }

      case Get(keys) =>
        Values(
          keys flatMap { key =>
            map.lock(key) { data =>
              // Wrap the stored buffer so the response gets its own view of it.
              data.get(key) map { datum =>
                Value(key, wrappedBuffer(datum))
              }
            }
          }
        )

      case Delete(key) =>
        map.lock(key) { data =>
          data.remove(key) match {
            case Some(_) => Deleted()
            case None => NotFound()
          }
        }

      case Incr(key, delta) =>
        map.lock(key) { data =>
          data.get(key) match {
            case Some(existing) =>
              val existingString = existing.toString(CharsetUtil.US_ASCII)
              if (!existingString.isEmpty && !existingString.matches(DIGITS))
                throw new ClientError("cannot increment or decrement non-numeric value")

              // An empty stored value is treated as zero.
              val existingValue =
                if (existingString.isEmpty) 0L
                else existingString.toLong

              val result = existingValue + delta
              data(key) = result.toString
              Number(result)
            case None =>
              NotFound()
          }
        }

      case Decr(key, value) =>
        // Decrement is an increment by the negated amount. The nested
        // apply(Incr(...)) locks the same key again, so this presumably
        // relies on AtomicMap.lock being reentrant -- TODO confirm.
        map.lock(key) { data =>
          apply(Incr(key, -value))
        }
    }
  }
}
/**
 * Exposes an [[Interpreter]] as a `Service` from `Command` to `Response`.
 *
 * @param interpreter the interpreter every incoming command is delegated to
 */
class InterpreterService(interpreter: Interpreter) extends Service[Command, Response] {

  /**
   * Evaluates `command` with the wrapped interpreter and wraps the outcome
   * in a `Future`.
   */
  def apply(command: Command): Future[Response] =
    Future {
      interpreter(command)
    }

  /** Nothing to clean up here, so releasing the service is a no-op. */
  override def release(): Unit = ()
}