Skip to content

Commit

Permalink
added log search node
Browse files Browse the repository at this point in the history
  • Loading branch information
javasoze committed Feb 26, 2011
1 parent 0ee7089 commit 48a3dfa
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 43 deletions.
26 changes: 19 additions & 7 deletions config/TwitterStreamer.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,29 @@ socketTimeout = 60000
# chirp specific confs
kafka.host = "localhost"
kafka.port = 9092
kafka.topic = "tweets"

tweet.kafka.topic = "tweets"
log.kafka.topic = "logs"

voldemort.url = "tcp://localhost:6666"
voldemort.store = "tweets"
tweet.voldemort.store = "tweets"

zookeeper.url = "localhost:2181"
zookeeper.cluster = "chirp"

search.node.id = 0
search.node.port = 1234
search.node.partitions = "1"
search.node.index.dir = "config/sensei/index"
tweet.zookeeper.cluster = "chirp"
log.zookeeper.cluster = "chopchop"

tweet.search.node.id = 0
tweet.search.node.port = 1234
tweet.search.node.partitions = "1"
tweet.search.node.index.dir = "config/sensei/index"

log.search.node.id = 0
log.search.node.port = 1234
log.search.node.partitions = "1"
log.search.node.index.dir = "config/sensei/log-index"


search.node.index.batch = 100
search.perPage = 10

Expand Down
8 changes: 8 additions & 0 deletions config/log4j/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@ log4j.appender.console1.layout=org.apache.log4j.PatternLayout
log4j.appender.console1.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c] [%x] %m%n


log4j.appender.kafka=kafka.KafkaAppender
log4j.appender.kafka.host=localhost
log4j.appender.kafka.port=9092
log4j.appender.kafka.topic=chopchop
log4j.appender.kafka.encoder=com.linkedin.chopchop.log.ChopchopLogMessageEncoder

log4j.logger.com.linkedin.chiper=info,kafka
log4j.logger.com.linkedin.chopchop=info,kafka
Binary file modified lib/sensei-0.0.1.jar
Binary file not shown.
10 changes: 9 additions & 1 deletion src/main/scala/com/linkedin/chirper/DefaultConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,20 @@ object DefaultConfigs{
val UTF8Charset = Charset.forName("UTF-8")

val kafkahost = Config.readString("kafka.host")
val port = Config.readInt("kafka.port")
val kafkaport = Config.readInt("kafka.port")

val batch = Config.readInt("search.node.index.batch")

// zookeeper settings
val zkurl = Config.readString("zookeeper.url")
val timeout = 30000

// zoie configuration, use default
val zoieConfig = new ZoieConfig[DefaultZoieVersion](new DefaultZoieVersionFactory());

// voldemort configuration
val voldemortUrl = Config.readString("voldemort.url")

// query builder
// define query parser builder
val queryParser = new QueryParser(Version.LUCENE_29,"contents",new StandardAnalyzer(Version.LUCENE_29))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ class ChirperServlet extends ScalatraServlet with ScalateSupport {

val log = Logger.get

val clusterName = Config.readString("zookeeper.cluster")
val clusterName = Config.readString("tweet.zookeeper.cluster")
val zkurl = Config.readString("zookeeper.url")
val timeout = 30000

val voldemortUrl = Config.readString("voldemort.url")
val voldemortStore = Config.readString("voldemort.store")
val voldemortStore = Config.readString("tweet.voldemort.store")

val factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(voldemortUrl));
val tweetStore: StoreClient[String, String] = factory.getStoreClient[String, String](voldemortStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import com.sensei.dataprovider.kafka.KafkaStreamIndexLoaderFactory.DefaultJsonFa

object ChirpSearchConfig{
// kafka config
val kafkatopic = Config.readString("kafka.topic")
val kafkatopic = Config.readString("tweet.kafka.topic")

val tweetIndexLoaderFactory = new DefaultJsonFactory(DefaultConfigs.kafkahost,DefaultConfigs.port,kafkatopic,DefaultConfigs.batch,30000)
val tweetIndexLoaderFactory = new DefaultJsonFactory(DefaultConfigs.kafkahost,DefaultConfigs.kafkaport,kafkatopic,DefaultConfigs.batch,30000)

// how do we convert an indexing event, in this case a json obj, into a lucene document
val interpreter = new ChirpJSONInterpreter()
Expand Down
27 changes: 10 additions & 17 deletions src/main/scala/com/linkedin/chirper/search/ChirpSearchNode.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,15 @@ import proj.zoie.api.DefaultZoieVersion
import proj.zoie.api.DefaultZoieVersion.DefaultZoieVersionFactory
import proj.zoie.hourglass.impl.HourGlassScheduler
import proj.zoie.hourglass.impl.HourGlassScheduler.FREQUENCY

import proj.zoie.impl.indexing.ZoieConfig

import java.util._
import java.text.SimpleDateFormat

import com.linkedin.norbert.javacompat.cluster.ClusterClient
import com.linkedin.norbert.javacompat.cluster.ZooKeeperClusterClient
import com.linkedin.norbert.javacompat.network.NettyNetworkServer
import com.linkedin.norbert.javacompat.network.NetworkServer
import com.sensei.search.nodes.impl.SenseiBuilderHelper
import com.sensei.search.nodes.SenseiHourglassFactory
import com.sensei.search.nodes.SenseiIndexLoaderFactory
import com.sensei.search.nodes.SenseiIndexReaderDecorator
import com.sensei.search.nodes.SenseiServer
import com.linkedin.norbert.javacompat.cluster.{ClusterClient,ZooKeeperClusterClient}
import com.linkedin.norbert.javacompat.network.{NetworkServer,NettyNetworkServer}
import com.sensei.search.nodes.{SenseiHourglassFactory,SenseiIndexLoaderFactory,SenseiIndexReaderDecorator,SenseiServer}
import com.sensei.search.nodes.impl._


Expand All @@ -33,24 +28,22 @@ import java.io.File
object ChirpSearchNode{
def main(args: Array[String]) = {

val nodeid = Config.readInt("search.node.id")
val port = Config.readInt("search.node.port")
val partList = Config.readString("search.node.partitions")
val nodeid = Config.readInt("tweet.search.node.id")
val port = Config.readInt("tweet.search.node.port")
val partList = Config.readString("tweet.search.node.partitions")

// where to put the index
val idxDir = new File(Config.readString("search.node.index.dir"))
val idxDir = new File(Config.readString("tweet.search.node.index.dir"))

// rolls daily at midnight, keep 7 days
val hfFactory = new SenseiHourglassFactory[JSONObject, DefaultZoieVersion](idxDir,ChirpSearchConfig.interpreter,
new SenseiIndexReaderDecorator(ChirpSearchConfig.handlerList,null),
DefaultConfigs.zoieConfig, "00 00 00", 7, FREQUENCY.DAILY)

val clusterName = Config.readString("zookeeper.cluster")
val zkurl = Config.readString("zookeeper.url")
val timeout = 30000
val clusterName = Config.readString("tweet.zookeeper.cluster")

// zookeeper cluster client
val clusterClient = new ZooKeeperClusterClient(clusterName,zkurl,timeout);
val clusterClient = new ZooKeeperClusterClient(clusterName,DefaultConfigs.zkurl,DefaultConfigs.timeout);

// build a default netty-based network server
val networkServer = SenseiBuilderHelper.buildDefaultNetworkServer(clusterClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,20 @@ import kafka.message._
import kafka.producer._

import voldemort.scalmert.client.StoreClient
import voldemort.client.ClientConfig
import voldemort.client.SocketStoreClientFactory
import voldemort.client.{SocketStoreClientFactory,ClientConfig}
import voldemort.scalmert.Implicits._
import voldemort.scalmert.versioning._
import com.linkedin.chirper.DefaultConfigs

// processes each tweet from the streamer
class ChirperStreamProcessor extends StreamProcessor{
val kafkaHost = Config.readString("kafka.host")
val kafkaPort = Config.readInt("kafka.port")
val kafkaTopic = Config.readString("kafka.topic")
val voldemortUrl = Config.readString("voldemort.url")
val voldemortStore = Config.readString("voldemort.store")

val kafkaTopic = Config.readString("tweet.kafka.topic")
val voldemortStore = Config.readString("tweet.voldemort.store")

val factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(voldemortUrl));
val factory = new SocketStoreClientFactory(new ClientConfig().setBootstrapUrls(DefaultConfigs.voldemortUrl));
val tweetStore: StoreClient[String, String] = factory.getStoreClient[String, String](voldemortStore)
val kafkaProducer = new SimpleProducer(kafkaHost,kafkaPort, 64 * 1024, 100000, 10000)
val kafkaProducer = new SimpleProducer(DefaultConfigs.kafkahost,DefaultConfigs.kafkaport, 64 * 1024, 100000, 10000)

def shutdown() = {
kafkaProducer.close()
Expand All @@ -48,10 +45,12 @@ class ChirperStreamProcessor extends StreamProcessor{
println(line)
val jsonObj = new JSONObject(line)
val id = jsonObj.getString("id_str")
// send to voldemort store
tweetStore(id) = line

// send to kafka
kafkaProducer.send(kafkaTopic,new ByteBufferMessageSet(new Message(line.getBytes(DefaultConfigs.UTF8Charset))))
// send to voldemort store
tweetStore(id) = line

}
catch{
case je: JSONException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,19 @@ import com.linkedin.chirper.DefaultConfigs
import com.sensei.dataprovider.kafka.KafkaStreamIndexLoaderFactory.DefaultJsonFactory

class ChopchopLogMessageEncoder extends Encoder[LoggingEvent]{

/** Encodes a log4j [[LoggingEvent]] as a Kafka [[Message]]: the event is
 *  rendered to its JSON string form and the UTF-8 bytes are wrapped.
 */
override def toMessage(event: LoggingEvent): Message =
  new Message(toString(event).getBytes(DefaultConfigs.UTF8Charset))


/** Renders a log4j [[LoggingEvent]] as a JSON string (via [[toJson]]). */
def toString(event: LoggingEvent): String = toJson(event).toString()

def toJson(event: LoggingEvent):JSONObject = {
val jsonObj = new JSONObject()

jsonObj.put("class",event.getFQNOfLoggerClass())
Expand All @@ -21,7 +33,7 @@ class ChopchopLogMessageEncoder extends Encoder[LoggingEvent]{

//TODO: add properties
//TODO: add throwable info
val jsonString = jsonObj.toString()
new Message(jsonString.getBytes(DefaultConfigs.UTF8Charset))

jsonObj
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ class LogIndexEvent(uid:Long,timeStamp:Long){

@Text(name="contents")
var logMessage = ""

@Store(name="json")
var json = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class LogMessageInterpreter extends JSONValueInterpreter[LogIndexEvent](classOf[
event.logLevel = jsonObj.optString("level")
event.logName = jsonObj.optString("logger")
event.logMessage = jsonObj.optString("message")
event.json = jsonObj.toString()
event
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.linkedin.chirper.DefaultConfigs

object LogSearchConfig{
val kafkaLogtopic = Config.readString("kafka.log.topic")
val logIndexLoaderFactory = new DefaultJsonFactory(DefaultConfigs.kafkahost,DefaultConfigs.port,kafkaLogtopic,DefaultConfigs.batch,30000)
val logIndexLoaderFactory = new DefaultJsonFactory(DefaultConfigs.kafkahost,DefaultConfigs.kafkaport,kafkaLogtopic,DefaultConfigs.batch,30000)

val logIndexInterpreter = new LogMessageInterpreter()

Expand Down
63 changes: 63 additions & 0 deletions src/main/scala/com/linkedin/chopchop/search/LogSearchNode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.linkedin.chopchop.search

import com.linkedin.led.twitter.config._

import com.linkedin.chirper.DefaultConfigs

import org.json.JSONObject

import proj.zoie.api.DefaultZoieVersion
import proj.zoie.api.DefaultZoieVersion.DefaultZoieVersionFactory
import proj.zoie.hourglass.impl.HourGlassScheduler
import proj.zoie.hourglass.impl.HourGlassScheduler.FREQUENCY

import proj.zoie.impl.indexing.ZoieConfig

import java.util._
import java.io.File
import java.text.SimpleDateFormat

import com.linkedin.norbert.javacompat.cluster.{ClusterClient,ZooKeeperClusterClient}
import com.linkedin.norbert.javacompat.network.{NetworkServer,NettyNetworkServer}
import com.sensei.search.nodes.{SenseiHourglassFactory,SenseiIndexLoaderFactory,SenseiIndexReaderDecorator,SenseiServer}
import com.sensei.search.nodes.impl._


// Build a search node
/** Entry point that boots a Sensei search node for log messages.
 *
 *  Reads its identity and index location from the `log.search.node.*`
 *  configuration keys, joins the `log.zookeeper.cluster` via ZooKeeper,
 *  and serves an hourglass (time-rolled) index of log events.
 */
object LogSearchNode {
  def main(args: Array[String]) = {

    // Node identity and network endpoint, from configuration.
    val nodeId        = Config.readInt("log.search.node.id")
    val servicePort   = Config.readInt("log.search.node.port")
    // Comma-separated list of partition ids this node serves.
    val partitionList = Config.readString("log.search.node.partitions")
    // Directory holding the on-disk log index.
    val indexDir      = new File(Config.readString("log.search.node.index.dir"))

    // Hourglass factory: rolls the index daily at midnight ("00 00 00")
    // and retains 7 days of segments.
    // NOTE(review): LogSearchConfig reads key "kafka.log.topic" while the
    // config file defines "log.kafka.topic" — verify the key name matches.
    val hourglassFactory = new SenseiHourglassFactory[JSONObject, DefaultZoieVersion](
      indexDir,
      LogSearchConfig.logIndexInterpreter,
      new SenseiIndexReaderDecorator(LogSearchConfig.logHandlerList, null),
      DefaultConfigs.zoieConfig, "00 00 00", 7, FREQUENCY.DAILY)

    // Join the log cluster through ZooKeeper (shared url/timeout settings).
    val zkClusterName   = Config.readString("log.zookeeper.cluster")
    val zkClusterClient = new ZooKeeperClusterClient(zkClusterName, DefaultConfigs.zkurl, DefaultConfigs.timeout)

    // Default netty-based network transport for the node.
    val nettyServer = SenseiBuilderHelper.buildDefaultNetworkServer(zkClusterClient)

    // Assemble the Sensei server over the configured partitions.
    val senseiServer = new SenseiServer(
      nodeId, servicePort, partitionList.split(",").map(_.toInt),
      indexDir, nettyServer,
      zkClusterClient, hourglassFactory, LogSearchConfig.logIndexLoaderFactory,
      DefaultConfigs.queryBuilderFactory)

    // Ensure a clean shutdown on JVM exit.
    DefaultConfigs.addShutdownHook { senseiServer.shutdown }

    // Blocking start: serve until the process is terminated.
    senseiServer.start(true)
  }
}

0 comments on commit 48a3dfa

Please sign in to comment.