This repository has been archived by the owner on Sep 18, 2021. It is now read-only.
/
production.scala
136 lines (119 loc) · 3.92 KB
/
production.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import com.twitter.flockdb.config._
import com.twitter.gizzard.config._
import com.twitter.querulous.config._
import com.twitter.querulous.StatsCollector
import com.twitter.conversions.time._
import com.twitter.conversions.storage._
import com.twitter.flockdb.shards.QueryClass
import com.twitter.flockdb.Priority
import com.twitter.ostrich.admin.config.AdminServiceConfig
import com.twitter.logging.Level
import com.twitter.logging.config._
/**
 * Login shared by the database connections declared in this file.
 *
 * Mixed into a `Connection`; the connections that use it add their own
 * `hostnames` and `database`.
 *
 * NOTE(review): the `root` user with an empty password is hard-coded here --
 * confirm this is acceptable for the hosts this file targets.
 */
trait Credentials extends Connection {
  val password: String = ""
  val username: String = "root"
}
/**
 * Query evaluator settings reused by the evaluators declared in this file.
 *
 * Turns memoization on, installs a throttled pool of size 40 with a 100 ms
 * open timeout, and assigns a timeout to each listed `QueryClass`.
 */
class ProductionQueryEvaluator extends AsyncQueryEvaluator {
  database.memoize = true
  database.pool = new ThrottledPoolingDatabase {
    size = 40
    openTimeout = 100.millis
  }

  query.timeouts = Map(
    // Query classes limited to one second.
    QueryClass.Select -> QueryTimeout(1.second),
    QueryClass.SelectSingle -> QueryTimeout(1.second),
    QueryClass.SelectIntersection -> QueryTimeout(1.second),
    QueryClass.SelectIntersectionSmall -> QueryTimeout(1.second),
    QueryClass.SelectMetadata -> QueryTimeout(1.second),
    QueryClass.Execute -> QueryTimeout(1.second),
    // Query classes given a longer limit.
    QueryClass.SelectModify -> QueryTimeout(3.seconds),
    QueryClass.SelectCopy -> QueryTimeout(15.seconds)
  )
}
/**
 * Name-server replica backed by a single host.
 *
 * @param host hostname used as the only entry of the connection's `hostnames`
 */
class ProductionNameServerReplica(host: String) extends Mysql {
  // Points at the "flock_edges_production" database on `host`, logging in
  // with the shared Credentials.
  val connection = new Connection with Credentials {
    val database: String = "flock_edges_production"
    val hostnames: Seq[String] = Seq(host)
  }

  // Memoized evaluator with a pool of size 1 and a one-second open timeout.
  queryEvaluator = new QueryEvaluator {
    database.memoize = true
    database.pool = new ThrottledPoolingDatabase {
      size = 1
      openTimeout = 1.second
    }
  }
}
// Production FlockDB configuration. This anonymous FlockDB instance is the
// last expression in the file.
new FlockDB {
  mappingFunction = ByteSwapper
  jobRelay = NoJobRelay

  // One name-server replica per host.
  nameServerReplicas = Seq(
    new ProductionNameServerReplica("flockdb001.twitter.com"),
    new ProductionNameServerReplica("flockdb002.twitter.com")
  )

  jobInjector.timeout = 100.millis
  jobInjector.idleTimeout = 60.seconds
  jobInjector.threadPool.minThreads = 30

  // Connection to the "edges" database on localhost, using the shared
  // Credentials and the rewriteBatchedStatements URL option.
  val databaseConnection = new Credentials {
    val database: String = "edges"
    val hostnames: Seq[String] = Seq("localhost")
    urlOptions = Map("rewriteBatchedStatements" -> "true")
  }

  val edgesQueryEvaluator = new ProductionQueryEvaluator
  val lowLatencyQueryEvaluator = new ProductionQueryEvaluator

  // Same settings as the other evaluators except for the pool, which is
  // replaced by one of size 1 with a one-second open timeout.
  val materializingQueryEvaluator = new ProductionQueryEvaluator {
    database.pool = new ThrottledPoolingDatabase {
      size = 1
      openTimeout = 1.second
    }
  }

  /**
   * Scheduler settings common to every job queue below.
   *
   * @param name queue name; the job queue is called `name + "_jobs"`
   */
  class ProductionScheduler(val name: String) extends Scheduler {
    jobQueueName = name + "_jobs"

    val schedulerType = new KestrelScheduler {
      path = "/var/spool/kestrel"
      maxMemorySize = 36.megabytes
    }

    errorLimit = 100
    errorRetryDelay = 15.minutes
    errorStrobeInterval = 1.second
    perFlushItemLimit = 100
    jitterRate = 0
  }

  // One scheduler per priority; each sets its own thread count.
  val jobQueues = Map(
    Priority.High.id -> new ProductionScheduler("edges") {
      threads = 32
    },
    // The copy queue also replaces the 15-minute retry delay set in
    // ProductionScheduler with 60 seconds.
    Priority.Medium.id -> new ProductionScheduler("copy") {
      threads = 12
      errorRetryDelay = 60.seconds
    },
    Priority.Low.id -> new ProductionScheduler("edges_slow") {
      threads = 2
    }
  )

  val adminConfig = new AdminServiceConfig {
    httpPort = Some(9990)
  }

  loggers = List(
    // Logger with no `node` set (presumably the root logger -- confirm):
    // INFO level, hourly-rolled file behind a throttled handler.
    new LoggerConfig {
      level = Some(Level.INFO)
      handlers = List(
        new ThrottledHandlerConfig {
          duration = 60.seconds
          maxToDisplay = 10
          handler = new FileHandlerConfig {
            filename = "/var/log/flock/production.log"
            roll = Policy.Hourly
          }
        }
      )
    },
    // "stats" logger: INFO level, not passed to parent loggers, sent to the
    // "flock-stats" scribe category.
    new LoggerConfig {
      node = "stats"
      useParents = false
      level = Some(Level.INFO)
      handlers = List(
        new ScribeHandlerConfig {
          category = "flock-stats"
        }
      )
    },
    // "bad_jobs" logger: INFO level, not passed to parent loggers, written
    // to a file that is never rolled.
    new LoggerConfig {
      node = "bad_jobs"
      useParents = false
      level = Some(Level.INFO)
      handlers = List(
        new FileHandlerConfig {
          roll = Policy.Never
          filename = "/var/log/flock/bad_jobs.log"
        }
      )
    }
  )
}