Skip to content

Commit

Permalink
add filters to logstash
Browse files Browse the repository at this point in the history
  • Loading branch information
GreatYYX committed Aug 29, 2017
1 parent 90892c2 commit 229b606
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Expand Up @@ -31,7 +31,7 @@ services:
KAFKA_ADVERTISED_HOST_NAME: kafka # ${DOCKER_HOST_NAME}
KAFKA_ADVERTISED_PORT: 9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
# KAFKA_CREATE_TOPICS: "test:1:1" # development
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
Expand Down
15 changes: 10 additions & 5 deletions manager.py
Expand Up @@ -51,7 +51,8 @@ def create_project():
ensure_topic_exists(output_topic, output_zookeeper_server, output_partition)

# update logstash pipeline
update_logstash_pipeline(args['project_name'])
output_server = project_config.get('output_server', config['output_server'])
update_logstash_pipeline(args['project_name'], output_server, output_topic)

return jsonify({}), 201

Expand Down Expand Up @@ -190,18 +191,22 @@ def kill_etk_process(project_name, ignore_error=False):
print 'kill_etk_process finish'


def update_logstash_pipeline(project_name):
def update_logstash_pipeline(project_name, output_server, output_topic):
content = \
'''input {
kafka {
bootstrap_servers => ["''' + '","'.join(config['output_server']) + '''"]
topics => ["''' + project_name + '''_out"]
bootstrap_servers => ["''' + '","'.join(output_server) + '''"]
topics => ["''' + output_topic + '''"]
consumer_threads => "4"
codec => json {}
type => "''' + project_name + '''"
}
}
filter {
if [type] == "''' + project_name + '''" {
mutate { remove_field => ["_id"] }
}
}
output {
if [type] == "''' + project_name + '''" {
elasticsearch {
Expand Down
2 changes: 1 addition & 1 deletion reset_offset.py
Expand Up @@ -35,7 +35,7 @@
consumer.assign(assigned_parts)
for p in assigned_parts:
consumer.seek(p, 0)
# sometimes it is blocked, need to restart them
# sometimes it is blocked, need to restart
consumer.commit({p:OffsetAndMetadata(0, meta)})

print 'done'

0 comments on commit 229b606

Please sign in to comment.