Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
muga committed May 2, 2012
0 parents commit f54a19a
Show file tree
Hide file tree
Showing 21 changed files with 1,451 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*~
*.gem
.bundle
Gemfile.lock
pkg/*
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Muga Nishizawa <muga.nishizawa _at_ gmail.com>
1 change: 1 addition & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Release 0.1.0 - 2012/05/02
105 changes: 105 additions & 0 deletions README.rdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
= Flume input/output plugin for Fluent

== Overview

This is a plugin for fluentd[https://github.com/fluentd] event collector. This plugin adds the Flume[https://github.com/apache/flume] compatible interface to fluentd.

== What's Flume?

Flume[https://github.com/apache/flume] is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.

It uses Thrift[http://thrift.apache.org/], a cross-language RPC framework, to communicate between clients and servers.

== What's Flume plugin for fluent?

The Flume plugin for fluentd, which enables fluentd to talk the Flume protocol. Flume protocol is defined as follows, in Thrift-IDL format:

typedef i64 Timestamp

enum Priority {
FATAL = 0,
ERROR = 1,
WARN = 2,
INFO = 3,
DEBUG = 4,
TRACE = 5
}

enum EventStatus {
ACK = 0,
COMMITED = 1,
ERR = 2
}

struct ThriftFlumeEvent {
1: Timestamp timestamp,
2: Priority priority,
3: binary body,
4: i64 nanos,
5: string host,
6: map<string,binary> fields
}

# Instead of using thrift's serialization, we just assume the contents are serialized already.
struct RawEvent {
1: binary raw
}

service ThriftFlumeEventServer {
oneway void append( 1:ThriftFlumeEvent evt ),
oneway void rawAppend( 1:RawEvent evt),
EventStatus ackedAppend( 1: ThriftFlumeEvent evt ),

void close(),
}

A value that is stored in the ThriftFlumeEvent.fields map is used as fluentd 'tag'. A key of the value enables be specified by users as configuration parameter.

== How to use?

fluent-plugin-flume contains both input and output.

== Flume Input

Please add the following configurations to fluent.conf.

# Flume input
<source>
type flume
port 56789
</source>

These options are supported.

* port: port number (default: 56789)
* bind: bind address (default: 0.0.0.0)
* server_type: server architecture either in 'simple', 'threaded', 'thread_pool', (default: simple)
* is_framed: use framed protocol or not (default: false)
* tag_field: key name of fluentd 'tag' that is stored in ThriftFlumeEvent.fields (default: nil)
* default_tag: default fluentd 'tag' (default: 'category')
* add_prefix: prefix string, added to the tag (default: nil)

== Flume Output

Please add the following configurations to fluent.conf. This allows fluentd to output its logs into another Flume server. Note that fluentd conveys semi-structured data while Flume conveys unstructured data. Thus the plugin translates semi-structured data into JSON data and conveys it to Flume.

# Flume output
<match *>
type flume
host flume-host.local
port 56789
</match>

These options are supported.

* host: host name or address (default: localhost)
* port: port number (default: 35863)
* timeout: thrift protocol timeout (default: 30)
* remove_prefix: prefix string, removed from the tag (default: nil)

== Contributors

== Copyright

Copyright:: Copyright (c) 2012 Treasure Data, Inc.
License:: Apache License, Version 2.0
57 changes: 57 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
require 'rake'
require 'rake/testtask'
require 'rake/clean'

begin
require 'jeweler'
Jeweler::Tasks.new do |gemspec|
gemspec.name = "fluent-plugin-flume"
gemspec.summary = "Flume Input/Output plugin for Fluentd event collector"
gemspec.author = "Muga Nishizawa"
gemspec.email = "muga.nishizawa@gmail.com"
gemspec.homepage = "https://github.com/muga/fluent-plugin-flume"
gemspec.has_rdoc = false
gemspec.require_paths = ["lib"]
gemspec.add_dependency "fluentd", "~> 0.10.16"
gemspec.add_dependency "thrift", "~> 0.6.0"
gemspec.test_files = Dir["test/**/*.rb"]
gemspec.files = Dir["bin/**/*", "lib/**/*", "test/**/*.rb"] +
%w[example.conf VERSION AUTHORS Rakefile fluent-plugin-flume.gemspec]
gemspec.executables = ['fluent-flume-remote']
end
Jeweler::GemcutterTasks.new
rescue LoadError
puts "Jeweler not available. Install it with: gem install jeweler"
end

task "thrift_gen" do
system "mkdir -p tmp"
system "thrift --gen rb -o tmp lib/fluent/plugin/thrift/flume.thrift"
system "mv tmp/gen-rb/* lib/fluent/plugin/thrift/"
system "rm -fR tmp"
end

Rake::TestTask.new(:test) do |t|
t.test_files = Dir['test/plugin/*.rb']
t.ruby_opts = ['-rubygems'] if defined? Gem
t.ruby_opts << '-I.'
end

#VERSION_FILE = "lib/fluent/version.rb"
#
#file VERSION_FILE => ["VERSION"] do |t|
# version = File.read("VERSION").strip
# File.open(VERSION_FILE, "w") {|f|
# f.write <<EOF
#module Fluent
#
#VERSION = '#{version}'
#
#end
#EOF
# }
#end
#
#task :default => [VERSION_FILE, :build]

task :default => [:build]
1 change: 1 addition & 0 deletions VERSION
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0.1.0
28 changes: 28 additions & 0 deletions bin/fluent-flume-remote
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env ruby
require 'thrift'
require 'json'
$:.unshift File.join(File.dirname(__FILE__), '../lib/fluent/plugin/thrift')
require 'flume_types'
require 'flume_constants'
require 'thrift_flume_event_server'

host = 'localhost'
port = 3586

socket = Thrift::Socket.new host, port.to_i
transport = Thrift::BufferedTransport.new socket
protocol = Thrift::BinaryProtocol.new transport
client = ThriftFlumeEventServer::Client.new protocol
transport.open

# 2011/09/02 Kazuki Ohta <kazuki.ohta@gmail.com>
# explicitly specify TCP_NODELAY for low-latency communication.
raw_sock = socket.to_io
raw_sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

entry = ThriftFlumeEvent.new(:body=>{'a'=>'b'}.to_json,
:priority=>Priority::INFO,
:timestamp=>(Time.now.to_i * 1000))
client.append entry

transport.close
10 changes: 10 additions & 0 deletions example-in-flume.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Flume input
<source>
type flume
port 56789
</source>

# match tag=debug.** and dump to console
<match **>
type stdout
</match>
11 changes: 11 additions & 0 deletions example-out-flume.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# built-in TCP input
# $ echo <json> | fluent-cat <tag>
<source>
type tcp
</source>

<match **>
type flume
host localhost
port 56789
</match>
56 changes: 56 additions & 0 deletions fluent-plugin-flume.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Generated by jeweler
# DO NOT EDIT THIS FILE DIRECTLY
# Instead, edit Jeweler::Tasks in Rakefile, and run 'rake gemspec'
# -*- encoding: utf-8 -*-

Gem::Specification.new do |s|
s.name = "fluent-plugin-flume"
s.version = "0.0.0"

s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Muga Nishizawa"]
s.date = "2012-04-07"
s.email = "muga.nishizawa@gmail.com"
s.executables = ["fluent-flume-remote"]
s.extra_rdoc_files = [
"ChangeLog",
"README.rdoc"
]
s.files = [
"AUTHORS",
"Rakefile",
"VERSION",
"bin/fluent-flume-remote",
"example.conf",
"fluent-plugin-flume.gemspec",
"lib/fluent/plugin/in_flume.rb",
"lib/fluent/plugin/out_flume.rb",
"lib/fluent/plugin/thrift/flume.thrift",
"lib/fluent/plugin/thrift/flume.thrift.orig",
"lib/fluent/plugin/thrift/flume_constants.rb",
"lib/fluent/plugin/thrift/flume_types.rb",
"lib/fluent/plugin/thrift/thrift_flume_event_server.rb",
"test/plugin/out_flume.rb"
]
s.homepage = "https://github.com/muga/fluent-plugin-flume"
s.require_paths = ["lib"]
s.rubygems_version = "1.8.15"
s.summary = "Flume Input/Output plugin for Fluentd event collector"
s.test_files = ["test/plugin/out_flume.rb"]

if s.respond_to? :specification_version then
s.specification_version = 3

if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
s.add_runtime_dependency(%q<fluentd>, ["~> 0.10.16"])
s.add_runtime_dependency(%q<thrift>, ["~> 0.6.0"])
else
s.add_dependency(%q<fluentd>, ["~> 0.10.16"])
s.add_dependency(%q<thrift>, ["~> 0.6.0"])
end
else
s.add_dependency(%q<fluentd>, ["~> 0.10.16"])
s.add_dependency(%q<thrift>, ["~> 0.6.0"])
end
end

Loading

0 comments on commit f54a19a

Please sign in to comment.