-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #4 from Pchelolo/travis
Set up infrastructure for testing with kafka locally and in travis
- Loading branch information
Showing
10 changed files
with
298 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,3 +3,6 @@ coverage | |
node_modules | ||
npm-debug.log | ||
*~ | ||
|
||
.idea/* | ||
config.yaml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
num_workers: 0 | ||
logging: | ||
name: changeprop | ||
level: info | ||
services: | ||
- name: changeprop | ||
module: hyperswitch | ||
conf: | ||
port: 7273 | ||
spec: | ||
title: The Change Propagation root | ||
paths: | ||
/{domain:a}/sys/queue: | ||
x-modules: | ||
- path: sys/kafka.js | ||
options: | ||
uri: 127.0.0.1:2181 | ||
templates: | ||
simple_test_rule: | ||
topic: test_topic_simple_test_rule | ||
exec: | ||
method: post | ||
uri: 'http://mock.com' | ||
headers: | ||
test_header_name: test_header_value | ||
content-type: application/json | ||
body: | ||
test_field_name: test_field_value | ||
derived_field: '{{message.message}}' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
"use strict"; | ||
|
||
var kafka = require('wmf-kafka-node'); | ||
var uuid = require('cassandra-uuid'); | ||
var P = require('bluebird'); | ||
|
||
/** | ||
* Utility class providing high-level interfaces to kafka modules. | ||
* | ||
* @param {Object} kafkaConf Kafka connection configuration | ||
* @param {string} kafkaConf.uri Zookeper URI with host and port | ||
* @param {string} kafkaConf.clientId Client identification string | ||
* @constructor | ||
*/ | ||
function KafkaFactory(kafkaConf) { | ||
this.kafkaConf = kafkaConf; | ||
} | ||
|
||
/** | ||
* Creates a new kafka client. | ||
* | ||
* @returns {Client} | ||
*/ | ||
KafkaFactory.prototype.newClient = function() { | ||
return new kafka.Client(this.kafkaConf.uri, | ||
this.kafkaConf.clientId + '-' + uuid.TimeUuid.now() + '-' + uuid.Uuid.random(), | ||
{} | ||
); | ||
}; | ||
|
||
/** | ||
* Creates and initializes an new kafka producer. | ||
* | ||
* @param {Client} client a kafka client to use. | ||
* @param {Object} [options] producer options | ||
* | ||
* @returns {Promise<HighLevelProducer>} | ||
*/ | ||
KafkaFactory.prototype.newProducer = function(client, options) { | ||
return new P(function(resolve, reject) { | ||
var producer = new kafka.HighLevelProducer(client, options || {}); | ||
producer.once('ready', function() { | ||
resolve(P.promisifyAll(producer)); | ||
}); | ||
producer.once('error', reject); | ||
}); | ||
}; | ||
|
||
/** | ||
* Creates a kafka consumer. | ||
* | ||
* @param {Client} client a kafka client to use | ||
* @param {string} topic a topic name to consume | ||
* @param {string} groupId consumer group ID | ||
* @param {Number} [offset] an offset which to start consumption from. | ||
* | ||
* @returns {Promise} a promise that's resolved when a consumer is ready | ||
*/ | ||
KafkaFactory.prototype.newConsumer = function(client, topic, groupId, offset) { | ||
var topicConf = { topic: topic }; | ||
if (offset !== undefined) { | ||
topicConf.offset = offset; | ||
} | ||
return new P(function(resolve, reject) { | ||
var consumer = new kafka.HighLevelConsumer(client, [ topicConf ], { groupId: groupId }); | ||
consumer.once('error', reject); | ||
consumer.once('rebalanced', function() { | ||
resolve(consumer); | ||
}); | ||
}); | ||
}; | ||
|
||
module.exports = KafkaFactory; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
"use strict"; | ||
|
||
var ChangeProp = require('../utils/changeProp'); | ||
var KafkaFactory = require('../../lib/kafka_factory'); | ||
var nock = require('nock'); | ||
|
||
describe('Basic rule management', function() { | ||
var changeProp = new ChangeProp('config.test.yaml'); | ||
var kafkaFactory = new KafkaFactory({ | ||
uri: 'localhost:2181/', // TODO: find out from the config | ||
clientId: 'change-prop-test-suite' | ||
}); | ||
var producer; | ||
|
||
before(function() { | ||
return kafkaFactory.newProducer(kafkaFactory.newClient()) | ||
.then(function(newProducer) { | ||
producer = newProducer; | ||
return producer.createTopicsAsync([ 'test_topic_simple_test_rule' ], false) | ||
}) | ||
.then(function() { | ||
return changeProp.start(); | ||
}); | ||
}); | ||
|
||
|
||
it('Should call simple executor', function() { | ||
var service = nock('http://mock.com', { | ||
reqheaders: { | ||
test_header_name: 'test_header_value', | ||
'content-type': 'application/json' | ||
} | ||
}) | ||
.post('/', { | ||
'test_field_name': 'test_field_value', | ||
'derived_field': 'test' | ||
}).reply({}); | ||
|
||
return producer.sendAsync([{ | ||
topic: 'test_topic_simple_test_rule', | ||
messages: [ JSON.stringify({ message: 'test' }) ] | ||
}]) | ||
.delay(100) | ||
.then(function() { service.done(); }) | ||
.finally(function() { nock.cleanAll(); }); | ||
}); | ||
|
||
after(function() { return changeProp.stop(); }); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
'use strict'; | ||
|
||
var ServiceRunner = require('service-runner'); | ||
var fs = require('fs'); | ||
var yaml = require('js-yaml'); | ||
var P = require('bluebird'); | ||
|
||
var ChangeProp = function(configPath) { | ||
this._configPath = configPath; | ||
this._config = this._loadConfig(); | ||
this._config.num_workers = 0; | ||
this._config.logging = { | ||
name: 'change-prop', | ||
level: 'fatal', | ||
streams: [{ type: 'stdout'}] | ||
}; | ||
this._runner = new ServiceRunner(); | ||
}; | ||
|
||
ChangeProp.prototype._loadConfig = function() { | ||
return yaml.safeLoad(fs.readFileSync(this._configPath).toString()); | ||
}; | ||
|
||
ChangeProp.prototype.start = function() { | ||
var self = this; | ||
self.port = self._config.services[0].conf.port; | ||
self.hostPort = 'http://localhost:' + self.port; | ||
return self._runner.run(self._config) | ||
.then(function(servers) { | ||
self._servers = servers; | ||
return true; | ||
}); | ||
}; | ||
|
||
ChangeProp.prototype.stop = function() { | ||
var self = this; | ||
if (self._servers) { | ||
return P.each(self._servers, function(server) { | ||
return server.close(); | ||
}) | ||
.then(function() { | ||
self._servers = undefined; | ||
}); | ||
} else { | ||
return P.resolve(); | ||
} | ||
}; | ||
|
||
module.exports = ChangeProp; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
#!/bin/bash | ||
|
||
dropTopics ( ) { | ||
if [ "$#" -eq 1 ] | ||
then | ||
PATTERN=$1 | ||
echo "looking for topics named '*${PATTERN}*'..." | ||
TOPICS=`${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --list \ | ||
| grep ${PATTERN} \ | ||
| grep -v 'marked for deletion$'` | ||
for TOPIC in ${TOPICS} | ||
do | ||
echo "dropping topic ${TOPIC}" | ||
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic ${TOPIC} > /dev/null | ||
done | ||
fi | ||
} | ||
|
||
dropTopics "test_topic" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
#!/bin/bash | ||
|
||
if [ "x$KAFKA_HOME" = "x" ]; then | ||
echo "Please set KAFKA_HOME env variable to the kafka install directory" | ||
exit 1 | ||
fi | ||
|
||
if [ "$1" = "start" ]; then | ||
if [ `nc localhost 2181 < /dev/null; echo $?` != 0 ]; then | ||
sh $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties > /dev/null & | ||
while [ `nc localhost 2181 < /dev/null; echo $?` != 0 ]; do | ||
echo "waiting for Zookeeper..." | ||
sleep 1 ; | ||
done | ||
else | ||
echo "Zookeper already running" | ||
fi | ||
|
||
if [ `nc localhost 9092 < /dev/null; echo $?` != 0 ]; then | ||
sh $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > /dev/null & | ||
while [ `nc localhost 9092 < /dev/null; echo $?` != 0 ]; do | ||
echo "waiting for Kafka..." ; | ||
sleep 1 ; | ||
done | ||
else | ||
echo "Kafka already running"; | ||
fi | ||
elif [ "$1" = "stop" ]; then | ||
sh $KAFKA_HOME/bin/kafka-server-stop.sh & | ||
sh $KAFKA_HOME/bin/zookeeper-server-stop.sh & | ||
fi |