Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit b61810777bb2398bd52811993485239dbc48d24d 0 parents
@lusis authored
4 .gitignore
@@ -0,0 +1,4 @@
+*.gem
+.bundle
+Gemfile.lock
+pkg/*
1  .rvmrc
@@ -0,0 +1 @@
+rvm 1.9.2@noah_agent --create
4 Gemfile
@@ -0,0 +1,4 @@
+source "http://rubygems.org"
+
+# Specify your gem's dependencies in noah-agent.gemspec
+gemspec
2  Rakefile
@@ -0,0 +1,2 @@
+require 'bundler'
+Bundler::GemHelper.install_tasks
87 experiments/test.rb
@@ -0,0 +1,87 @@
+require 'rubygems'
+require 'logger'
+require 'eventmachine'
+require 'em-hiredis'
+require 'em-http-request'
+require 'multi_json'
+
+LOGGER = Logger.new(STDOUT)
+LOGGER.progname = __FILE__
+@watchlist = {}
+
+module Noah
+ module Agents
+ class Dummy
+
+ end
+ end
+end
+
+def parse_watch(watch)
+ LOGGER.info("Parsing watch: #{watch["id"]}")
+ LOGGER.debug("Watch contents: #{watch}")
+ {watch["id"] => {:pattern => watch["pattern"], :endpoint => watch["endpoint"]}}
+end
+
+def load_initial_watchers(watch_list)
+ LOGGER.debug("Parsing initial watch list")
+ watch_list.each do |watch|
+ @watchlist.merge! parse_watch(watch)
+ LOGGER.debug(@watchlist)
+ end
+end
+
+def reread_watchers(new_watch)
+ LOGGER.info("Watch message found")
+ w = MultiJson.decode(new_watch)
+ case w["action"]
+ when "delete"
+ LOGGER.info("Deleting watch: #{w["id"]}")
+ @watchlist.delete w["id"]
+ else
+ LOGGER.info("Adding new watch: #{w["id"]}")
+ @watchlist.merge! parse_watch(w)
+ end
+ LOGGER.debug("New watch list size: #{@watchlist.keys.size}")
+ LOGGER.debug("Current watchlist: #{@watchlist.keys}")
+end
+
+def broker(msg)
+ LOGGER.info("#{msg}")
+end
+
+EM.run do
+ EM.error_handler do |e|
+ LOGGER.warn(e)
+ end
+
+ trap("INT") { LOGGER.warn("Shutting down. Watches will not be fired");EM.stop }
+ h = EM::HttpRequest.new('http://localhost:5678/watches').get
+ h.callback {
+ case h.response_header.status
+ when 404
+ LOGGER.info("No registered watches found")
+ when 500
+ LOGGER.error("Noah returned an error: #{h.response}")
+ EM.stop
+ else
+ LOGGER.info("Pulled list of current watches from Noah")
+ w = MultiJson.decode(h.response)
+ load_initial_watchers(w)
+ LOGGER.info("Starting up with #{@watchlist.keys.size} watchers")
+ LOGGER.debug("#{@watchlist.keys}")
+ end
+ }
+ h.errback {
+ LOGGER.error("Unable to pull the current list of watchers.")
+ }
+ r = EM::Hiredis.connect "redis://localhost:6379"
+ r.errback {|x| LOGGER.fatal("Unable to connect to redis: #{x}");EM.stop}
+ LOGGER.info("Connected to redis")
+ r.psubscribe("*")
+ r.on(:pmessage) do |pattern, event, message|
+ LOGGER.debug("Message recieved")
+ reread_watchers(message) if event =~ /^\/\/noah\/watchers\/.*/
+ broker "#{event}|#{message}" unless @watchlist.keys.size == 0
+ end
+end
71 experiments/test2.rb
@@ -0,0 +1,71 @@
+require 'rubygems'
+require 'celluloid'
+require "logger"
+require "redis"
+
+LOGGER = Logger.new(STDOUT)
+LOGGER.progname = __FILE__
+Celluloid.logger = LOGGER
+
+@watchlist = {}
+
+class RedisWatcher
+ include Celluloid::Actor
+
+ def initialize
+ LOGGER.info "Starting Redis"
+ @r = Redis.connect
+ end
+
+ def watch
+ @r.psubscribe("*") do |on|
+ on.pmessage do |p, e, m|
+ LOGGER.debug("Message recieved")
+ case e
+ when /^\/\/noah\/tags\/.*$/
+ LOGGER.debug("Sending to Tags")
+ Celluloid::Actor[:tags].notify!(m)
+ when /^\/\/noah\/services\/.*$/
+ LOGGER.debug("Sending to Services")
+ Celluloid::Actor[:services].notify!(m)
+ else
+ LOGGER.debug("Sending to CatchAll")
+ Celluloid::Actor[:catchall].notify!(m)
+ end
+ end
+ end
+ end
+
+end
+
+class ServicesActor
+ include Celluloid::Actor
+
+ def notify(message)
+ LOGGER.info("Services got message: #{message}")
+ end
+end
+
+class TagsActor
+ include Celluloid::Actor
+
+ def notify(message)
+ LOGGER.info("Tags got message: #{message}")
+ end
+end
+
+class CatchallActor
+ include Celluloid::Actor
+
+ def notify(message)
+ LOGGER.info("Catchall got message: #{message}")
+ end
+end
+
+services_supervisor = ServicesActor.supervise_as :services
+tags_supervisor = TagsActor.supervise_as :tags
+catchall_supervisor = CatchallActor.supervise_as :catchall
+redis_supervisor = RedisWatcher.supervise_as :redis_watcher
+Celluloid::Actor[:redis_watcher].watch!
+LOGGER.info("Noah Watcher Agent started up")
+sleep
24 gems.txt
@@ -0,0 +1,24 @@
+addressable (2.2.6)
+awesome_print (0.3.2)
+bundler (1.0.12, 1.0.10)
+celluloid (0.2.1)
+colored (1.2)
+diff-lcs (1.1.2)
+em-hiredis (0.1.0)
+em-http-request (1.0.0.beta.4)
+em-socksify (0.1.0)
+eventmachine (1.0.0.beta.3)
+excon (0.6.5)
+git-up (0.5.0)
+grit (2.4.1)
+hirb (0.4.5, 0.4.0)
+hiredis (0.3.2)
+http_parser.rb (0.5.1)
+interactive_editor (0.0.8, 0.0.7)
+mime-types (1.16)
+multi_json (1.0.3)
+rake (0.8.7)
+rdoc (3.5.3)
+redis (2.2.1)
+spoon (0.0.1)
+wirble (0.1.3)
5 lib/noah-agent.rb
@@ -0,0 +1,5 @@
+module Noah
+ module Agent
+ # Your code goes here...
+ end
+end
0  lib/noah-agent/handler/dummy.rb
No changes.
0  lib/noah-agent/handler/http.rb
No changes.
0  lib/noah-agent/logging.rb
No changes.
24 lib/noah-agent/redis.rb
@@ -0,0 +1,24 @@
+module Noah::Agent
+ class Redis
+ include Celluloid::Actor
+
+ def initialize(host,port)
+ LOGGER.debug("Initializing Redis Actor")
+ @r = Redis.connect :host => host, :port => port
+ end
+
+ def watch
+ @r.psubscribe("*") do |on|
+ on.pmessage do |pattern, event, message|
+ LOGGER.info("Message recieved")
+ LOGGER.debug("Message contents: #{message}")
+ if event =~ /^\/\/noah\/watchers\/.*/
+ Celluloid::Actor[:watchlist_manager].reread_watchers(message)
+ else
+ Celluloid::Actor[:broker_manager].
+ end
+ end
+ end
+
+ end
+end
5 lib/noah-agent/version.rb
@@ -0,0 +1,5 @@
+module Noah
+ module Agent
+ VERSION = "0.0.1"
+ end
+end
59 lib/noah-agent/watchlist.rb
@@ -0,0 +1,59 @@
+module Noah::Agent
+ class WatchList
+ include Celluloid::Actor
+ attr_reader :watchlist
+
+ def initialize(noah_url)
+ LOGGER.info("Starting watchlist actor")
+ @noah_url = noah_url
+ end
+
+ def get_watchlist
+ LOGGER.info("Loading watchlist from Noah")
+ noah = Excon.get(@noah_url)
+ case noah.status
+ when 404
+ LOGGER.warn("Noah returned 404. Assuming empty watchlist")
+ when 500
+ data = MultiJSON.decode(noah.body)
+ LOGGER.fatal("Noah returned 500. This could be bad")
+ LOGGER.debug("Noah error: #{data["error_message"]}")
+ when 200
+ data = MultiJSON.decode(noah.body)
+ load_initial_watchers(data)
+ LOGGER.info("Starting up with #{@watchlist.keys.size} watchers")
+ LOGGER.debug("#{@watchlist.keys}")
+ end
+ end
+
+ def reread_watchers(new_watch)
+ LOGGER.info("Watch message found")
+ w = MultiJSON.decode(new_watch)
+ case w["action"]
+ when "delete"
+ LOGGER.info("Deleting watch: #{w["id"]}")
+ @watchlist.delete w["id"]
+ else
+ LOGGER.info("Adding new watch: #{w["id"]}")
+ @watchlist.merge! parse_watch(w)
+ end
+ LOGGER.info("New watch list size: #{@watchlist.keys.size}")
+ LOGGER.debug("Current watchlist: #{@watchlist.keys}")
+ end
+
+ private
+ def load_initial_watchers(watch_list)
+ LOGGER.debug("Parsing initial watch list")
+ watch_list.each do |watch|
+ @watchlist.merge! parse_watch(watch)
+ LOGGER.debug(@watchlist)
+ end
+ end
+
+ def parse_watch(watch)
+ LOGGER.info("Parsing watch: #{watch["id"]}")
+ LOGGER.debug("Watch contents: #{watch}")
+ {watch["id"] => {:pattern => watch["pattern"], :endpoint => watch["endpoint"]}}
+ end
+ end
+end
21 noah-agent.gemspec
@@ -0,0 +1,21 @@
+# -*- encoding: utf-8 -*-
+$:.push File.expand_path("../lib", __FILE__)
+require "noah-agent/version"
+
+Gem::Specification.new do |s|
+ s.name = "noah-agent"
+ s.version = Noah::Agent::VERSION
+ s.platform = Gem::Platform::RUBY
+ s.authors = ["TODO: Write your name"]
+ s.email = ["TODO: Write your email address"]
+ s.homepage = ""
+ s.summary = %q{TODO: Write a gem summary}
+ s.description = %q{TODO: Write a gem description}
+
+ s.rubyforge_project = "noah-agent"
+
+ s.files = `git ls-files`.split("\n")
+ s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
+ s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
+ s.require_paths = ["lib"]
+end
Please sign in to comment.
Something went wrong with that request. Please try again.