Skip to content

Commit

Permalink
first working prototype - can send Atom diffs to callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
zh committed Jul 17, 2009
1 parent ea3bed0 commit 3765c68
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -4,3 +4,4 @@
*.log
*.db
backup
feeds/*.yml
Empty file added feeds/.gitkeep
Empty file.
112 changes: 112 additions & 0 deletions topics.rb
@@ -0,0 +1,112 @@
# Patch for feeds that carry 'xhtml' type content.
module Atom
  class Content
    class Xhtml < Base
      # Builds the XML node for this XHTML content: an element typed 'xhtml'
      # that wraps the parsed content inside an XHTML-namespaced <div>.
      #
      # name          - element name, resolved through namespace_map
      # namespace_map - maps Atom::NAMESPACE + name to the prefixed element name
      # (nodeonly and namespace are accepted but not used by this method)
      #
      # Returns the newly built node.
      def to_xml(nodeonly = true, name = 'content', namespace = nil, namespace_map = Atom::Xml::NamespaceMap.new)
        element = XML::Node.new(namespace_map.prefix(Atom::NAMESPACE, name).to_s)
        element['type'] = 'xhtml'
        # FriendFeed sends 'xhtml' type content WITHOUT xml_lang, so fall
        # back to "en" when no language is set.
        language = self.xml_lang
        element['xml:lang'] = language || "en"

        wrapper = XML::Node.new('div')
        wrapper['xmlns'] = XHTML

        # Parse our own string form and embed a deep copy of its root node.
        parsed_root = XML::Parser.string(to_s).parse.root
        wrapper << parsed_root.copy(true)

        element << wrapper
        element
      end
    end
  end
end

module WebGlue

  # Directory holding the last fetched copy of every topic feed (one YAML
  # file per topic).
  FEEDS_DIR = File.join(File.dirname(__FILE__), 'feeds').freeze

  # Raised when a topic URL is missing, cannot be fetched, or has no stored
  # feed. Subclasses StandardError (not Exception) so that a plain `rescue`
  # handles it and it is never confused with signals or interpreter exits.
  class InvalidTopicException < StandardError; end

  # Class-level helpers for fetching, storing and comparing topic feeds.
  # MyTimer and GIVEUP are expected to be defined by the application that
  # loads this file.
  class Topic

    attr_reader :entries

    # Returns the directory where feed snapshots are stored.
    def Topic.feeds_dir
      FEEDS_DIR
    end

    # Encodes a URL as Base64 so it can be used as a storage key.
    # Uses the non-destructive `strip`: `strip!` returns nil when there is
    # nothing to strip (e.g. for an empty URL), which leaked nil to callers.
    def Topic.to_hash(url)
      [url].pack("m*").strip
    end

    # Decodes a key produced by Topic.to_hash back into the URL.
    def Topic.to_url(hash)
      hash.unpack("m")[0]
    end

    # Fetches the feed behind +url+.
    # Raises InvalidTopicException when the URL is nil, the fetch fails, or
    # nothing is returned.
    def Topic.sync(url)
      raise InvalidTopicException unless url
      feed = nil
      begin
        feed = fetch_feed(url)
      rescue StandardError
        raise InvalidTopicException
      end
      raise InvalidTopicException unless feed
      feed
    end

    # Loads the stored feed snapshot for an already encoded topic key.
    # Raises InvalidTopicException when no snapshot exists.
    def Topic.load_file(hash)
      path = feed_path(hash)
      # File.exists? was removed in Ruby 3.2; File.exist? works everywhere.
      raise InvalidTopicException unless File.exist?(path)
      # Snapshots are full Ruby objects written by Topic.save!. Newer Psych
      # versions make load_file a safe load that rejects such objects, so
      # prefer the explicit unsafe loader when it is available.
      # NOTE(review): only files written by this application should ever be
      # placed in FEEDS_DIR, since this deserializes arbitrary objects.
      if YAML.respond_to?(:unsafe_load_file)
        YAML.unsafe_load_file(path)
      else
        YAML.load_file(path)
      end
    end

    # Loads the stored feed snapshot for a topic URL.
    # Raises InvalidTopicException when the URL is nil or nothing is stored.
    def Topic.load_url(url)
      raise InvalidTopicException unless url
      Topic.load_file(Topic.to_hash(url))
    end

    # Stores +feed+ as the current snapshot for +url+.
    # Raises InvalidTopicException when either argument is missing.
    def Topic.save!(url, feed)
      raise InvalidTopicException unless url && feed
      File.open(feed_path(Topic.to_hash(url)), "w") do |out|
        YAML.dump(feed, out)
      end
    end

    # Fetches the feed for +url+, stores it as the new snapshot and returns
    # only the entries that were not present in the previous snapshot.
    # Entries are compared by the href of their first link.
    #
    # Returns nil when there was no previous snapshot (so that the very
    # first fetch does not send everything) or when nothing is new.
    # Otherwise returns the new entries, or the feed XML when +to_atom+ is
    # true.
    #
    # Raises InvalidTopicException for a nil URL or an empty fetch result,
    # and RuntimeError (carrying the original message) when fetching fails.
    def Topic.diff(url, to_atom = false)
      raise InvalidTopicException unless url

      begin
        old_feed = Topic.load_url(url)
        urls = old_feed.entries.collect { |e| e.links.first.href }
      rescue InvalidTopicException
        urls = []
      end

      begin
        new_feed = fetch_feed(url)
      rescue StandardError => e
        # Keep the RuntimeError + message contract, but preserve the
        # backtrace and let signals / exits propagate untouched.
        raise RuntimeError, e.to_s, e.backtrace
      end
      raise InvalidTopicException unless new_feed

      # Persist the complete feed before trimming it down to the new entries.
      Topic.save!(url, new_feed)

      new_feed.entries.delete_if { |e| urls.include?(e.links.first.href) }
      return nil if urls.empty? # do not send everything first time
      return nil unless new_feed.entries.length > 0
      to_atom ? new_feed.to_xml : new_feed.entries
    end

    # Not implemented yet: has no body and returns nil.
    def Topic.atom_diff(url)
    end

    # Builds the snapshot path for an encoded topic key. Base64 output may
    # contain '/', which would be treated as a directory separator, so it is
    # replaced with '_' (a character Base64 never produces).
    def Topic.feed_path(hash)
      File.join(FEEDS_DIR, "#{hash.to_s.tr('/', '_')}.yml")
    end

    # Downloads and parses the feed at +url+, giving up after GIVEUP seconds.
    def Topic.fetch_feed(url)
      feed = nil
      MyTimer.timeout(GIVEUP) do
        feed = Atom::Feed.load_feed(URI.parse(url))
      end
      feed
    end

    private_class_method :feed_path, :fetch_feed
  end
end
60 changes: 41 additions & 19 deletions webglue.rb
Expand Up @@ -5,6 +5,7 @@
require 'json'
require 'crack'
require 'httpclient'
require 'atom'

begin
require 'system_timer'
Expand All @@ -14,7 +15,10 @@
MyTimer = Timeout
end

require 'topics'

configure do
GIVEUP = 10
DB = Sequel.connect(ENV['DATABASE_URL'] || 'sqlite://webglue.db')

unless DB.table_exists? "topics"
Expand Down Expand Up @@ -49,14 +53,6 @@ def gen_id
Zlib.crc32(base + salt).to_s(36)
end

# Encodes a URL as Base64 so it can be stored as a key.
# Uses the non-destructive `strip`: `strip!` returns nil when there is
# nothing to strip (e.g. for an empty URL), which leaked nil to callers.
def url2hash(url)
  [url].pack("m*").strip
end

# Decodes a Base64 key back into the URL it was built from.
def hash2url(hash)
  decoded = hash.unpack("m")
  decoded.first
end

# Formats a time as a UTC timestamp in the form used by Atom feeds,
# e.g. "2009-07-17T01:02:03Z".
def atom_time(date)
  utc = date.getgm
  utc.strftime("%Y-%m-%dT%H:%M:%SZ")
end
Expand All @@ -75,8 +71,26 @@ def atom_parse(text)
r
end

# Posts a message to a list of subscribers (callback URLs).
#
# subs - list of callback URLs
# msg  - Atom document to deliver as the request body
#
# Delivery is best effort: a timeout (after GIVEUP seconds) or any other
# error for one subscriber is printed and the remaining subscribers are
# still tried. Returns +subs+.
def postman(subs, msg)
  subs.each do |sub|
    begin
      MyTimer.timeout(GIVEUP) do
        # The payload is an Atom document, so declare it as such; without an
        # explicit header the body is not labelled as Atom.
        HTTPClient.post(sub, msg, 'Content-Type' => 'application/atom+xml')
      end
    rescue Timeout::Error
      puts "Timeout: #{sub}"
    rescue StandardError => e
      # Only ordinary errors are swallowed; Interrupt / SystemExit must
      # still be able to stop the process.
      puts e.to_s
    end
  end
end

# verify subscribers callback
# TODO: use it in /hub/subscribe
def do_verify(url, data)
return false unless url and data
begin
Expand All @@ -85,11 +99,12 @@ def do_verify(url, data)
'hub.topic' => data[:topic],
'hub.challenge' => challenge,
'hub.verify_token' => data[:vtoken]}
MyTimer.timeout(5) do
MyTimer.timeout(GIVEUP) do
res = HTTPClient.get_content(url, query)
return false unless res and res == challenge
end
rescue
rescue Exception => e
puts e.to_s
return false
end
return true
Expand All @@ -109,13 +124,19 @@ def do_verify(url, data)
end
throw :halt, [400, "Bad request: Empty 'hub.url' parameter"] if params['hub.url'].empty?
begin
hash = url2hash(params['hub.url'])
hash = WebGlue::Topic.to_hash(params['hub.url'])
topic = DB[:topics].filter(:url => hash)
if topic.first # already registered
# minimum 5 min interval between pings
time_diff = (Time.now - topic.first[:updated]).to_i
throw :halt, [200, "204 Try after #{(300-time_diff)/60 +1} min"] if time_diff < 300
topic.update(:updated => Time.now)
# TODO: find the differences from the previous feed fetch
subscribers = DB[:subscriptions].filter(:topic_id => topic.first[:id])
urls = subscribers.collect { |u| WebGlue::Topic.to_url(u[:callback]) }
atom_diff = WebGlue::Topic.diff(params['hub.url'], true)
postman(urls, atom_diff) if (urls.length > 0 and atom_diff)
else
DB[:topics] << { :url => hash, :created => Time.now }
DB[:topics] << { :url => hash, :created => Time.now, :updated => Time.now }
end
throw :halt, [204, "204 No Content"]
rescue Exception => e
Expand All @@ -142,25 +163,26 @@ def do_verify(url, data)

# For now, only using the first preference of verify mode
verify = verify.split(',').first
throw :halt, [400, "Bad request: Unrecognized verification mode"] unless ['sync', 'async'].include?(verify)
# throw :halt, [400, "Bad request: Unrecognized verification mode"] unless ['sync', 'async'].include?(verify)
# will support only 'sync' mode for now
throw :halt, [400, "Bad request: Unrecognized verification mode"] unless verify == 'sync'
begin
hash = url2hash(topic)
hash = WebGlue::Topic.to_hash(topic)
tp = DB[:topics].filter(:url => hash).first
throw :halt, [404, "Not Found"] unless tp[:id]

state = (verify == 'async') ? 1 : 0
data = { :mode => mode, :verify => verify, :vtoken => vtoken, :topic => topic }
if verify == 'sync'
# raise "sync do_verify() failed" unless do_verify(callback, data)
raise "sync do_verify() failed" unless do_verify(callback, data)
state = 0
end

# Add subscription
# subscribe/unsubscribe to/from ALL channels with that topic
cb = url2hash(callback)
cb = WebGlue::Topic.to_hash(callback)
if mode == 'subscribe'
unless DB[:subscriptions].filter(:topic_id => tp[:id], :callback => cb).first
puts "callback: #{cb}"
raise "DB insert failed" unless DB[:subscriptions] << {
:topic_id => tp[:id], :callback => cb, :vtoken => vtoken, :state => state }
end
Expand Down

0 comments on commit 3765c68

Please sign in to comment.