Permalink
Browse files

refactored actors, improved branch name in config

  • Loading branch information...
1 parent b7cc34b commit 8ea211bf8afa838ce73765348f98af554b4cc907 @oleganza committed Nov 3, 2009
Showing with 106 additions and 44 deletions.
  1. +101 −39 ampoule
  2. +5 −5 experiments/actor.rb
View
140 ampoule
@@ -1094,57 +1094,119 @@ module Ampoule
end
#
- # Helpers
+ # Other
#
- class FileSyncBuffer
+ class Actor
+ attr_accessor :queue
+
+ def self.new_instance(*args)
+ actor = self.allocate
+ actor.send(:initialize, *args)
+ actor
+ end
+
+ def self.new(*args)
+ Receiver.new(new_instance(*args))
+ end
+
def initialize
- @buffer = {} # path => content
@queue = Queue.new
- @mutex = Mutex.new
-
- if !ENV["AMPOULE_NOPULL"]
- @puller = Thread.new do
- loop do
- sleep(1 + rand(4))
- @queue.push(:pull) if @queue.size < 1
- end
+ @thread = Thread.new do
+ loop do
+ msg, args, blk = @queue.pop
+ send(msg, *(args||[]), &blk)
end
end
-
- @pusher = Thread.new do
- while msg = @queue.pop
- if msg == :stop
- puts "Ampoule::FileSyncBuffer: stopped."
- break
- end
+ @thread.abort_on_exception = true
+ end
+
+ def stop; @thread.join; end
+ def kill; @thread.kill; end
+
+ class Receiver
+ def initialize(actor)
+ @actor = actor
+ end
+ def send(name, *args, &blk)
+ @actor.queue.push([name, args, blk])
+ self
+ end
+ def method_missing(name, *args, &blk)
+ send(name, *args, &blk)
+ end
+ def stop
+ @actor.queue.clear
+ send(:kill)
+ @actor.stop
+ end
+ end
- `cd .git/ampoule; git pull`
-
- if msg == :push
-
- paths = []
- @mutex.synchronize do
- @buffer.each do |path, data|
- paths << path
- File.open(path, 'w'){|f|f.write(data)}
- end
- @buffer = {}
- end
-
- if !ENV["AMPOULE_NOPUSH"]
- `cd .git/ampoule; git add .; git commit -m "synced #{paths.join(', ')}"; git push`
- end
+ end
+
+ class FileSyncBuffer
+
+ class Pusher < Actor
+ def push
+ # do not push if head did not change
+ new_head = (File.read(".git/ampoule/.git/refs/heads/ampoule-tasks") rescue nil)
+ if @head != new_head
+ @head = new_head
+ system %{cd .git/ampoule; git pull; git push} if !ENV["AMPOULE_NOPUSH"]
+ end
+ end
+ end
+
+ class Commiter < Actor
+ def initialize(pusher, buffer, mutex)
+ super()
+ @pusher = pusher
+ @buffer = buffer
+ @mutex = mutex
+ end
+ def commit
+ system "cd .git/ampoule; git pull"
+ paths = []
+ @mutex.synchronize do
+ @buffer.each do |path, data|
+ paths << path
+ File.open(path, 'w'){|f|f.write(data)}
end
+ @buffer.update({})
end
+ system %{cd .git/ampoule; git add .; git commit -m "synced #{paths.join(', ')}";}
+ @pusher.push
+ end
+ end
+
+ class Puller < Actor
+ def initialize(interval = 1)
+ super()
+ @interval = interval
+ self.queue.push([:pull])
+ end
+ def pull
+ puts `cd .git/ampoule; git pull` if !ENV["AMPOULE_NOPULL"]
+ sleep(rand(@interval*2))
+ self.queue.push([:pull])
end
end
+ def initialize
+ @buffer = {} # path => content
+ @mutex = Mutex.new
+ @pusher = Pusher.new
+ @commiter = Commiter.new(@pusher, @buffer, @mutex)
+ @puller = Puller.new
+ @puller.pull
+ end
+
def stop
puts "Ampoule::FileSyncBuffer: stopping..."
- @queue.push :stop
- @puller.kill if @puller
- @pusher.join
+ @puller.stop
+ @commiter.stop
+ @pusher.stop
+ puts "Ampoule::FileSyncBuffer: stopped."
end
def read(path)
@@ -1157,7 +1219,7 @@ module Ampoule
@mutex.synchronize do
@buffer[path] = data
end
- @queue.push(:push)
+ @commiter.commit
end
def glob(wildcard)
@@ -1197,7 +1259,7 @@ module Ampoule
configure = Proc.new do |url|
`cd .git/ampoule; git config remote.origin.url #{url}`
`cd .git/ampoule; git config remote.origin.fetch +refs/heads/ampoule-tasks:refs/remotes/origin/ampoule-tasks`
- `cd .git/ampoule; git config remote.origin.push ampoule-tasks:ampoule-tasks`
+ `cd .git/ampoule; git config remote.origin.push ampoule-tasks:refs/heads/ampoule-tasks`
`cd .git/ampoule; git config branch.ampoule-tasks.remote origin`
`cd .git/ampoule; git config branch.ampoule-tasks.merge refs/heads/ampoule-tasks`
View
@@ -23,7 +23,7 @@ def initialize
end
end
- def join
+ def stop
@thread.join
end
@@ -56,9 +56,9 @@ def send(name, *args, &blk)
def method_missing(name, *args, &blk)
send(name, *args, &blk)
end
- def join
+ def stop
send(:kill)
- @actor.join
+ @actor.stop
end
end
@@ -85,7 +85,7 @@ def on_timer_tick(t)
pinger.ping!
pinger.ping!
- sleep 3
+ sleep 2
- pinger.join
+ pinger.stop
end

0 comments on commit 8ea211b

Please sign in to comment.