Skip to content

Commit

Permalink
Merge pull request #2 from sonots/sync_worker
Browse files Browse the repository at this point in the history
sync worker
  • Loading branch information
sonots committed Jun 6, 2014
2 parents 33375a8 + 4e4d24a commit 59ee584
Show file tree
Hide file tree
Showing 17 changed files with 295 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ HOST=0.0.0.0
# LOG_LEVEL=debug
# LOG_SHIFT_AGE=0
# LOG_SHIFT_SIZE=1048576
# LOCAL_STORAGE=false
# DATA_DIR=data
# SYNC_INTERVAL=60
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ rvm:
gemfile:
- Gemfile
env:
- DATA_DIR=
- DATA_DIR=spec/tmp
- LOCAL_STORAGE=false
- LOCAL_STORAGE=true DATA_DIR=spec/tmp
1 change: 1 addition & 0 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
web: bundle exec unicorn -E production -p $PORT -o $HOST -c config/unicorn.conf
job: bundle exec bin/fluentd-server job
sync: bundle exec bin/fluentd-server sync
serf: $(bundle exec gem path serf-td-agent)/bin/serf agent
19 changes: 14 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,26 +100,35 @@ HOST=0.0.0.0
# LOG_LEVEL=debug
# LOG_SHIFT_AGE=0
# LOG_SHIFT_SIZE=1048576
# LOCAL_STORAGE=false
# DATA_DIR=data
# SYNC_INTERVAL=60
```

### DATA_DIR (experimental)
### LOCAL FILE STORAGE

Configure `DATA_DIR` in `.env` file as:
Fluentd Server also supports to edit Fluentd configuration files as local files for the case using `git` (or any VCSs) to manage Fluentd configuration revisions.

To use this feature, enable `LOCAL_STORAGE` in `.env` file:

```
LOCAL_STORAGE=true
DATA_DIR=data
SYNC_INTERVAL=60
```

to store and load Fluentd config contents not from DB, but from local files located at the directory `DATA_DIR`.
This would be useful when you want to manage your config files with git.
The `DATA_DIR` is the location to place your configuration files locally, and the `SYNC_INTERVAL` is the interval where a synchronization worker works.

Putting any files whose name ends with `.erb` in `DATA_DIR` automatically synchronizes with DB by the `sync` worker. Removing `.erb` files is also synchronized with DB.

NOTE: Enabling this feature disables to edit the Fluentd configuration from the Web UI.

## HTTP API

See [API.md](API.md).

## ToDo

* Create a sync worker to synchronize local file existences with db (delete and create entries on db).
* Automatic deployment (restart) support like the one of chef-server

## ChangeLog
Expand Down
2 changes: 1 addition & 1 deletion fluentd-server.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler", "~> 1.3"
spec.add_development_dependency "rspec"
spec.add_development_dependency "rspec", "~> 3"
spec.add_development_dependency "capybara"
spec.add_development_dependency "pry"
spec.add_development_dependency "pry-nav"
Expand Down
11 changes: 11 additions & 0 deletions lib/fluentd_server/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ class FluentdServer::CLI < Thor
LOG_LEVEL=warn
LOG_SHIFT_AGE=0
LOG_SHIFT_SIZE=1048576
LOCAL_STORAGE=false
DATA_DIR=#{DATA_DIR}
SYNC_INTERVAL=60
EOS

DEFAULT_PROCFILE =<<-EOS
web: unicorn -E production -p $PORT -o $HOST -c config/unicorn.conf
job: fluentd-server job
sync: fluentd-server sync
serf: $(gem path serf-td-agent)/bin/serf agent
EOS

Expand Down Expand Up @@ -105,6 +109,13 @@ def job_clear
Delayed::Job.delete_all
end

desc "sync", "Sartup fluentd_server sync worker"
def sync
Dotenv.load
require 'fluentd_server/sync_worker'
FluentdServer::SyncWorker.start
end

no_tasks do
def abort(msg)
$stderr.puts msg
Expand Down
16 changes: 12 additions & 4 deletions lib/fluentd_server/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
Dotenv.load

module FluentdServer::Config
def self.data_dir
ENV['DATA_DIR'] == "" ? nil : ENV['DATA_DIR']
end

def self.database_url
ENV.fetch('DATABASE_URL', 'sqlite3:data/fluentd_server.db')
end
Expand Down Expand Up @@ -36,4 +32,16 @@ def self.log_shift_size
def self.task_max_num
ENV.fetch('TASK_MAX_NUM', '20').to_i
end

def self.local_storage
ENV.fetch('LOCAL_STORAGE', 'false') == 'true' ? true : false
end

def self.data_dir
ENV.fetch('DATA_DIR', 'data')
end

def self.sync_interval
ENV.fetch('SYNC_INTERVAL', '60').to_i
end
end
3 changes: 1 addition & 2 deletions lib/fluentd_server/model.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ class Post < ActiveRecord::Base
include FluentdServer::Logger

validates :name, presence: true
validates :body, presence: true

if FluentdServer::Config.data_dir
if FluentdServer::Config.local_storage
include ActsAsFile

def filename
Expand Down
70 changes: 70 additions & 0 deletions lib/fluentd_server/sync_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
require 'fluentd_server/model'
require 'fluentd_server/logger'

class FluentdServer::SyncRunner
include FluentdServer::Logger

def self.run(opts = {})
self.new(opts).run
end

def initialize(opts = {})
end

def run
return nil unless FluentdServer::Config.local_storage
plus, minus = find_diff
create(plus)
delete(minus)
end

def find_locals
return [] unless FluentdServer::Config.local_storage
names = []
Dir.chdir(FluentdServer::Config.data_dir) do
Dir.glob("*.erb") do |filename|
names << filename.chomp('.erb')
end
end
names
end

def create(names)
# ToDo: bulk insert with sqlite, postgresql? use activerecord-import for mysql2
logger.debug "[sync] create #{names}"
names.each do |name|
begin
Post.create(name: name)
rescue ActiveRecord::RecordNotUnique => e
logger.debug "#{e.class} #{e.message} #{name}"
rescue => e
logger.warn "#{e.class} #{e.message} #{name}"
end
end
end

def delete(names)
logger.debug "[sync] remove #{names}"
begin
Post.where(:name => names).delete_all
rescue => e
logger.warn "#{e.class} #{e.message} #{names}"
end
end

# Find difference between given array of paths and paths stored in DB
#
# @param [Integer] batch_size The batch size of a select query
# @return [Array] Plus (array) and minus (array) differences
def find_diff(batch_size: 1000)
names = find_locals
plus = names
minus = []
Post.select('id, name').find_in_batches(batch_size: batch_size) do |batches|
batches = batches.map(&:name)
plus -= batches
minus += (batches - names)
end
[plus, minus]
end
end
81 changes: 81 additions & 0 deletions lib/fluentd_server/sync_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
require "fluentd_server/config"
require "fluentd_server/logger"
require "fluentd_server/sync_runner"

# reference: https://github.com/focuslight/focuslight/blob/master/lib/focuslight/worker.rb
# thanks!
class FluentdServer::SyncWorker
include FluentdServer::Logger

DEFAULT_INTERVAL = 60
attr_reader :interval

def self.start(opts = {})
self.new(opts).start
end

def initialize(opts = {})
@opts = opts
@interval = opts[:interval] || FluentdServer::Config.sync_interval || DEFAULT_INTERVAL
@signals = []
end

def update_next!
now = Time.now
@next_time = now - ( now.to_i % @interval ) + @interval
end

def start
Signal.trap(:INT){ @signals << :INT }
Signal.trap(:HUP){ @signals << :HUP }
Signal.trap(:TERM){ @signals << :TERM }
Signal.trap(:PIPE, "IGNORE")

update_next!
logger.info("[sync] first updater start in #{@next_time}")

childpid = nil
while sleep(0.5) do
if childpid
begin
if Process.waitpid(childpid, Process::WNOHANG)
#TODO: $? (Process::Status object)
logger.debug("[sync] update finished pid: #{childpid}, code: #{$? >> 8}")
logger.debug("[sync] next updater start in #{@next_time}")
childpid = nil
end
rescue Errno::ECHILD
logger.warn("[sync] no child process");
childpid = nil
end
end

unless @signals.empty?
logger.warn("[sync] signals_received: #{@signals.join(',')}")
break
end

next if Time.now < @next_time
update_next!
logger.debug("[sync] (#{@next_time}) updater start")

if childpid
logger.warn("[sync] previous updater exists, skipping this time")
next
end

childpid = fork do
FluentdServer::SyncRunner.run(@opts)
end
end

if childpid
logger.warn("[sync] waiting for updater process finishing")
begin
waitpid childpid
rescue Errno::ECHILD
# ignore
end
end
end
end
6 changes: 3 additions & 3 deletions spec/model_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
after { Task.delete_all }

context '#new?' do
it { expect(Task.new.new?).to be_true }
it { expect(Task.create.new?).to be_false }
it { expect(Task.new.new?).to be_truthy }
it { expect(Task.create.new?).to be_falsey }
end

context '#filename' do
Expand All @@ -21,7 +21,7 @@
end

context '#create_and_delete' do
before { FluentdServer::Config.stub(:task_max_num).and_return(1) }
before { allow(FluentdServer::Config).to receive(:task_max_num).and_return(1) }
before { @oldest = Task.create(name: 'Restart') }
it {
Task.create_and_delete(name: 'Restart')
Expand Down
77 changes: 77 additions & 0 deletions spec/sync_runner_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
require_relative 'spec_helper'
require 'fluentd_server/sync_runner'

if FluentdServer::Config.local_storage
describe 'SyncRunner' do
around {
filenames = File.join(FluentdServer::Config.data_dir, '*.erb')
Dir.glob(filenames).each { |f| File.delete(f) rescue nil }
Post.delete_all
}
let(:runner) { FluentdServer::SyncRunner.new }

context '#find_locals' do
before { Post.create(name: 'post1') }
before { Post.create(name: 'post2') }
let(:subject) { runner.find_locals }
it { should =~ ['post1', 'post2' ] }
end

context '#find_diff' do
before { Post.new(name: 'post1').save_without_file }
before { Post.create(name: 'post2') }
before { File.open(Post.new(name: 'post3').filename, "w") {} }
it {
plus, minus = runner.find_diff
expect(minus).to eql(['post1'])
expect(plus).to eql(['post3'])
}
end

context '#create' do
before { Post.create(name: 'post1') }
before { runner.create(%w[post1 post2]) }
it {
expect(Post.find_by(name: 'post1').body).not_to be_nil
expect(Post.find_by(name: 'post2').body).to be_nil
}
end

context '#delete' do
before {
post1 = Post.create(name: 'post1')
post2 = Post.create(name: 'post2')
runner.delete(%w[post1])
}
it {
expect(Post.find_by(name: 'post1')).to be_nil
expect(Post.find_by(name: 'post2')).not_to be_nil
}
end

context '#run' do
before { Post.new(name: 'post1').save_without_file }
before { Post.create(name: 'post2') }
before { File.open(Post.new(name: 'post3').filename, "w") {} }
it {
runner.run
expect(Post.find_by(name: 'post1')).to be_nil
expect(Post.find_by(name: 'post2')).not_to be_nil
expect(Post.find_by(name: 'post3')).not_to be_nil
}
end
end
else
describe 'SyncRunner' do
context '#run' do
let(:subject) { FluentdServer::SyncRunner.new.run }
it { should be_nil }
end
context '#find_locals' do
before { Post.create(name: 'post1') }
before { Post.create(name: 'post2') }
let(:subject) { FluentdServer::SyncRunner.new.find_locals }
it { should == [] }
end
end
end

0 comments on commit 59ee584

Please sign in to comment.