Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
[split] refactoring snowflake configs to make it easier for release t…
Browse files Browse the repository at this point in the history
…o deploy to smfd Mostly just yak-factoring to make configs easier to manage (and be more validated).

- another config refactor
- refactor to put the worker ids in the config file
- validate configs in snowflake-twitter
- validate snowflake configs when testing
- comment out this test that requires a server. let us find a better way to test this
- take zookeeper out of libs
- switch scala-zookeeper-config to a project dependency
  • Loading branch information
Ryan King committed Nov 14, 2011
1 parent 085fb4d commit 6d4634a
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 128 deletions.
35 changes: 18 additions & 17 deletions config/development.scala
Expand Up @@ -2,28 +2,29 @@ import com.twitter.service.snowflake.{SnowflakeConfig, ReporterConfig}
import com.twitter.logging.config.{LoggerConfig, FileHandlerConfig}
import com.twitter.logging.Level
import com.twitter.zookeeper.ZookeeperClientConfig
import java.net.InetAddress

new SnowflakeConfig {
val serverPort = 7609
val datacenterId = 0
val workerId = 0
val adminPort = 9990
val adminBacklog = 100
val workerIdZkPath = "/snowflake-servers"
val skipSanityChecks = false
val startupSleepMs = 10000
val thriftServerThreads = 2
serverPort = 7609
datacenterId = 0
workerIdMap = Map(0 -> InetAddress.getLocalHost.getHostName)
adminPort = 9990
adminBacklog = 100
workerIdZkPath = "/snowflake-servers"
skipSanityChecks = false
startupSleepMs = 10000
thriftServerThreads = 2

val zookeeperClientConfig = new ZookeeperClientConfig {
val hostList = "localhost"
zookeeperClientConfig = new ZookeeperClientConfig {
hostList = "localhost"
}

val reporterConfig = new ReporterConfig {
val scribeCategory = "snowflake"
val scribeHost = "localhost"
val scribePort = 1463
val scribeSocketTimeout = 5000
val flushQueueLimit = 100000
reporterConfig = new ReporterConfig {
scribeCategory = "snowflake"
scribeHost = "localhost"
scribePort = 1463
scribeSocketTimeout = 5000
flushQueueLimit = 100000
}

loggers = new LoggerConfig {
Expand Down
35 changes: 18 additions & 17 deletions config/development2.scala
Expand Up @@ -2,28 +2,29 @@ import com.twitter.service.snowflake.{SnowflakeConfig, ReporterConfig}
import com.twitter.logging.config.{LoggerConfig, FileHandlerConfig}
import com.twitter.logging.Logger
import com.twitter.zookeeper.ZookeeperClientConfig
import java.net.InetAddress

new SnowflakeConfig {
val serverPort = 7610
val datacenterId = 0
val workerId = 1
val adminPort = 9991
val adminBacklog = 100
val workerIdZkPath = "/snowflake-servers"
val skipSanityChecks = false
val startupSleepMs = 10000
val thriftServerThreads = 2
serverPort = 7610
datacenterId = 0
workerIdMap = Map(1 -> InetAddress.getLocalHost.getHostName)
adminPort = 9991
adminBacklog = 100
workerIdZkPath = "/snowflake-servers"
skipSanityChecks = false
startupSleepMs = 10000
thriftServerThreads = 2

val zookeeperClientConfig = new ZookeeperClientConfig {
val hostList = "localhost"
zookeeperClientConfig = new ZookeeperClientConfig {
hostList = "localhost"
}

val reporterConfig = new ReporterConfig {
val scribeCategory = "snowflake"
val scribeHost = "localhost"
val scribePort = 1463
val scribeSocketTimeout = 5000
val flushQueueLimit = 100000
reporterConfig = new ReporterConfig {
scribeCategory = "snowflake"
scribeHost = "localhost"
scribePort = 1463
scribeSocketTimeout = 5000
flushQueueLimit = 100000
}

loggers = new LoggerConfig {
Expand Down
33 changes: 17 additions & 16 deletions config/test.scala
@@ -1,22 +1,23 @@
import com.twitter.service.snowflake.{SnowflakeConfig, ReporterConfig}
import com.twitter.zookeeper.ZookeeperClientConfig
import java.net.InetAddress

new SnowflakeConfig {
val serverPort = 7609
val datacenterId = 0
val workerId = 0
val adminPort = 9990
val adminBacklog = 100
val workerIdZkPath = "/snowflake-servers"
val zkHostlist = "localhost"
val skipSanityChecks = false
val startupSleepMs = 10000
val thriftServerThreads = 2
serverPort = 7609
datacenterId = 0
workerIdMap = Map(0 -> InetAddress.getLocalHost.getHostName)
adminPort = 9990
adminBacklog = 100
workerIdZkPath = "/snowflake-servers"
skipSanityChecks = false
startupSleepMs = 10000
thriftServerThreads = 2

val reporterConfig = new ReporterConfig {
val scribeCategory = "snowflake"
val scribeHost = "localhost"
val scribePort = 1463
val scribeSocketTimeout = 5000
val flushQueueLimit = 100000
zookeeperClientConfig = new ZookeeperClientConfig {
hostList = "localhost"
}

reporterConfig = new ReporterConfig {
flushQueueLimit = 100000
}
}
Binary file removed libs/zookeeper-3.3.3.jar
Binary file not shown.
7 changes: 4 additions & 3 deletions project/build/Snowflake.scala
Expand Up @@ -10,15 +10,16 @@ class SnowflakeProject(info: ProjectInfo) extends StandardServiceProject(info)
with NoisyDependencies {
val slf4jApi = "org.slf4j" % "slf4j-api" % "1.5.8"
val slf4jLog = "org.slf4j" % "slf4j-nop" % "1.5.8"
val sp = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5"
val sp = "org.scala-tools.testing" % "specs_2.8.1" % "1.6.8"
val thrift = "thrift" % "libthrift" % "0.5.0"
val commonsCodec = "commons-codec" % "commons-codec" % "1.4"
val zookeeperClient = "com.twitter" % "zookeeper-client" % "3.0.1"

projectDependencies(
"ostrich",
"util" ~ "util-logging",
"util" ~ "util-thrift")
"util" ~ "util-thrift",
"scala-zookeeper-client"
)

override def ivyXML =
<dependencies>
Expand Down
5 changes: 2 additions & 3 deletions src/main/scala/com/twitter/service/snowflake/IdWorker.scala
Expand Up @@ -12,11 +12,10 @@ import com.twitter.logging.Logger
* we ever want to support multiple worker threads
* per process
*/
class IdWorker(workerId: Long, datacenterId: Long, reporterConfig: ReporterConfig) extends Snowflake.Iface {
class IdWorker(workerId: Long, datacenterId: Long, private val reporter: Reporter)
extends Snowflake.Iface {
private val genCounter = Stats.getCounter("ids_generated")
private val exceptionCounter = Stats.getCounter("exceptions")
private val reporter = new Reporter(reporterConfig.scribeCategory, reporterConfig.scribeHost,
reporterConfig.scribePort, reporterConfig.scribeSocketTimeout, reporterConfig.flushQueueLimit)
private val log = Logger.get
private val rand = new Random

Expand Down
18 changes: 12 additions & 6 deletions src/main/scala/com/twitter/service/snowflake/ReporterConfig.scala
@@ -1,9 +1,15 @@
package com.twitter.service.snowflake

trait ReporterConfig {
val scribeCategory: String
val scribeHost: String
val scribePort: Int
val scribeSocketTimeout: Int
val flushQueueLimit: Int
import com.twitter.util.Config

trait ReporterConfig extends Config[Reporter] {
var scribeCategory = "snowflake"
var scribeHost = "localhost"
var scribePort = 1463
var scribeSocketTimeout = 5000
var flushQueueLimit = 100000

/** Builds a Reporter from the scribe settings currently held by this config. */
def apply =
  new Reporter(
    scribeCategory,
    scribeHost,
    scribePort,
    scribeSocketTimeout,
    flushQueueLimit)
}
43 changes: 31 additions & 12 deletions src/main/scala/com/twitter/service/snowflake/SnowflakeConfig.scala
Expand Up @@ -3,23 +3,42 @@ package com.twitter.service.snowflake
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.ostrich.admin.config.ServerConfig
import com.twitter.zookeeper.ZookeeperClientConfig
import java.net.InetAddress
import com.twitter.zookeeper.ZookeeperClientConfig


trait SnowflakeConfig extends ServerConfig[SnowflakeServer] {
val serverPort: Int
val datacenterId: Int
val workerId: Int
val adminPort: Int
val adminBacklog: Int
val workerIdZkPath: String
val skipSanityChecks: Boolean
val startupSleepMs: Int
val thriftServerThreads: Int
var serverPort = 7609
var datacenterId = required[Int]
var workerIdMap = required[Map[Int, String]]
var adminPort = 9990
var adminBacklog = 10
var workerIdZkPath = "/snowflake-servers"
var skipSanityChecks = false
var startupSleepMs = 10000
var thriftServerThreads = 2

var reporterConfig = required[ReporterConfig]

val reporterConfig: ReporterConfig
var zookeeperClientConfig = required[ZookeeperClientConfig]

val zookeeperClientConfig: ZookeeperClientConfig
/**
 * Looks up the worker id configured for `host` in `workerIdMap`.
 *
 * Both the configured names and the host's own name are compared with
 * everything from the first ':' onwards removed, so an entry written as
 * "name:port" matches a bare "name".
 *
 * @param host the address whose host name is looked up
 * @return the key of the first `workerIdMap` entry whose name matches
 * @throws NoSuchElementException if no entry matches the host
 */
def workerIdFor(host: InetAddress) = {
  // Drop an optional ":port" suffix; unlike split(':')(0) this cannot fail
  // on a name that consists only of separators.
  def stripPort(name: String): String = name.takeWhile(_ != ':')

  // Resolve the host name once instead of once per map entry.
  val localName = stripPort(host.getHostName)
  val idsByHost: Map[Int, String] = workerIdMap

  idsByHost.find {
    case (_, name) => stripPort(name) == localName
  }.map(_._1).getOrElse {
    // Same exception type as the previous Option.get, but with a message
    // that says which host was missing from the configuration.
    throw new NoSuchElementException(
      "no worker id configured for host %s (configured hosts: %s)".format(
        localName, idsByHost.values.mkString(", ")))
  }
}

def apply(runtime: RuntimeEnvironment) = {
new SnowflakeServer(this)
new SnowflakeServer(serverPort, datacenterId, workerIdFor(InetAddress.getLocalHost),
adminPort, adminBacklog, workerIdZkPath, skipSanityChecks, startupSleepMs,
thriftServerThreads, reporterConfig(), zookeeperClientConfig())
}

/**
 * Validates the nested ZooKeeper client and reporter configs, then runs the
 * inherited validation.
 */
override def validate = {
// Nested configs are checked before delegating to super.validate.
zookeeperClientConfig.validate
reporterConfig.validate
super.validate
}
}
56 changes: 28 additions & 28 deletions src/main/scala/com/twitter/service/snowflake/SnowflakeServer.scala
Expand Up @@ -17,7 +17,6 @@ import scala.collection.mutable
import java.net.InetAddress
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.ostrich.stats.Stats
import com.twitter.ostrich.admin.config.ServerConfig
import com.twitter.ostrich.admin.Service
import com.twitter.logging.Logger
import com.twitter.logging.config.LoggerConfig
Expand All @@ -40,16 +39,16 @@ object SnowflakeServer {
}
}

class SnowflakeServer(config: SnowflakeConfig) extends Service {
// NOTE: this is a bit unwieldy. If we start using it in more than one place we should refactor
class SnowflakeServer(serverPort: Int, datacenterId: Int, workerId: Int, adminPort: Int,
adminBacklog: Int, workerIdZkPath: String, skipSanityChecks: Boolean, startupSleepMs: Int,
thriftServerThreads: Int, reporter: Reporter, zkClient: ZooKeeperClient) extends Service {

private val log = Logger.get
var server: TServer = null
lazy val zkClient = {
log.info("Creating ZooKeeper client connected to %s", config.zookeeperClientConfig.hostList)
new ZooKeeperClient(config.zookeeperClientConfig)
}

Stats.addGauge("datacenter_id") { config.datacenterId }
Stats.addGauge("worker_id") { config.workerId }
Stats.addGauge("datacenter_id") { datacenterId }
Stats.addGauge("worker_id") { workerId }

def shutdown(): Unit = {
if (server != null) {
Expand All @@ -60,26 +59,27 @@ class SnowflakeServer(config: SnowflakeConfig) extends Service {
}

def start {
if (!config.skipSanityChecks) {
if (!skipSanityChecks) {
sanityCheckPeers()
}

registerWorkerId(config.workerId)
val admin = new AdminService(config.adminPort, config.adminBacklog, new RuntimeEnvironment(getClass))
registerWorkerId(workerId)
// TODO: move to config application
val admin = new AdminService(adminPort, adminBacklog, new RuntimeEnvironment(getClass))

Thread.sleep(config.startupSleepMs)
Thread.sleep(startupSleepMs)

try {
val worker = new IdWorker(config.workerId, config.datacenterId, config.reporterConfig)
val worker = new IdWorker(workerId, datacenterId, reporter)

val processor = new Snowflake.Processor(worker)
val transport = new TNonblockingServerSocket(config.serverPort)
val transport = new TNonblockingServerSocket(serverPort)
val serverOpts = new THsHaServer.Options
serverOpts.workerThreads = config.thriftServerThreads
serverOpts.workerThreads = thriftServerThreads

val server = new THsHaServer(processor, transport, serverOpts)

log.info("Starting server on port %s with workerThreads=%s", config.serverPort, serverOpts.workerThreads)
log.info("Starting server on port %s with workerThreads=%s", serverPort, serverOpts.workerThreads)
server.serve()
} catch {
case e: Exception => {
Expand All @@ -94,8 +94,8 @@ class SnowflakeServer(config: SnowflakeConfig) extends Service {
var tries = 0
while (true) {
try {
zkClient.create("%s/%s".format(config.workerIdZkPath, i),
(getHostname + ':' + config.serverPort).getBytes(), EPHEMERAL)
zkClient.create("%s/%s".format(workerIdZkPath, i),
(getHostname + ':' + serverPort).getBytes(), EPHEMERAL)
return
} catch {
case e: NodeExistsException => {
Expand All @@ -115,17 +115,17 @@ class SnowflakeServer(config: SnowflakeConfig) extends Service {
def peers(): mutable.HashMap[Int, Peer] = {
var peerMap = new mutable.HashMap[Int, Peer]
try {
zkClient.get(config.workerIdZkPath)
zkClient.get(workerIdZkPath)
} catch {
case _ => {
log.info("%s missing, trying to create it", config.workerIdZkPath)
zkClient.create(config.workerIdZkPath, Array(), PERSISTENT)
log.info("%s missing, trying to create it", workerIdZkPath)
zkClient.create(workerIdZkPath, Array(), PERSISTENT)
}
}

val children = zkClient.getChildren(config.workerIdZkPath)
val children = zkClient.getChildren(workerIdZkPath)
children.foreach { i =>
val peer = zkClient.get("%s/%s".format(config.workerIdZkPath, i))
val peer = zkClient.get("%s/%s".format(workerIdZkPath, i))
val list = new String(peer).split(':')
peerMap(i.toInt) = new Peer(new String(list(0)), list(1).toInt)
}
Expand All @@ -137,7 +137,7 @@ class SnowflakeServer(config: SnowflakeConfig) extends Service {
def sanityCheckPeers() {
var peerCount = 0
val timestamps = peers().filter{ case (id: Int, peer: Peer) =>
!(peer.hostname == getHostname && peer.port == config.serverPort)
!(peer.hostname == getHostname && peer.port == serverPort)
}.map { case (id: Int, peer: Peer) =>
try {
log.info("connecting to %s:%s".format(peer.hostname, peer.port))
Expand All @@ -149,17 +149,17 @@ class SnowflakeServer(config: SnowflakeConfig) extends Service {
}

val reportedDatacenterId = c.get_datacenter_id()
if (reportedDatacenterId != config.datacenterId) {
log.error("Worker at %s:%s has datacenter_id %d, but ours is %d",
peer.hostname, peer.port, reportedDatacenterId, config.datacenterId)
if (reportedDatacenterId != datacenterId) {
log.error("Worker at %s:%s has datacenter_id %d, but ours is %d",
peer.hostname, peer.port, reportedDatacenterId, datacenterId)
throw new IllegalStateException("Datacenter id insanity.")
}

peerCount = peerCount + 1
c.get_timestamp().toLong
} catch {
case e: TTransportException => {
log.error("Couldn't talk to peer %s at %s:%s", config.workerId, peer.hostname, peer.port)
log.error("Couldn't talk to peer %s at %s:%s", workerId, peer.hostname, peer.port)
throw e
}
}
Expand Down

0 comments on commit 6d4634a

Please sign in to comment.