Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

simple client

  • Loading branch information...
commit 34d9f03a40eeaa456c8c1857d25df3635ad0b031 1 parent 4184f75
Roman Kamyk authored
Showing with 49 additions and 135 deletions.
  1. +4 −4 twitter/app/actions/UserTimeline.rb
  2. +45 −131 twitter/app/model.rb
View
8 twitter/app/actions/UserTimeline.rb
@@ -2,17 +2,17 @@ class UserTimeline < Cramp::Action
use_fiber_pool# :size => 1000
def start
- DB.new.db_for_user(params[:screen_name]) do |db, id, pool, fiber|
+ DB.new.db_for_user(params[:screen_name]) do |db, id, conns|
if db == nil
render "WON!"
finish
- pool.release(fiber)
+ conns.each { |c| c.close }
else
#puts "Query on #{fiber.inspect}"
q = db.aquery("SELECT * FROM statuses WHERE user_id = #{id} LIMIT 20")
q.errback do |r|
puts "Error123: #{r}"
- pool.release(fiber)
+ conns.each { |c| c.close }
finish
end
q.callback do |r|
@@ -23,7 +23,7 @@ def start
render result
render ']'
#puts "Releasing #{fiber.inspect} from #{db.inspect}"
- pool.release(fiber)
+ conns.each { |c| c.close }
finish
end
end
View
176 twitter/app/model.rb
@@ -1,131 +1,46 @@
require 'em-synchrony/mysql2'
require 'cramp/exception_handler'
-module Twit
- class ConnectionPool
- def initialize(opts, &block)
- @reserved = {} # map of in-progress connections
- @available = [] # pool of free connections
- @pending = [] # pending reservations (FIFO)
-
- opts[:size].times do
- @available.push(block.call) if block_given?
- end
- end
-
- # Choose first available connection and pass it to the supplied
- # block. This will block indefinitely until there is an available
- # connection to service the request.
- def execute(async)
- f = Fiber.current
-
- begin
- conn = acquire(f)
- yield conn, f
- ensure
- release(f) if not async
- end
- end
-
- # Acquire a lock on a connection and assign it to executing fiber
- # - if connection is available, pass it back to the calling block
- # - if pool is full, yield the current fiber until connection is available
- def acquire(fiber)
- #puts "ACQUIRE #{fiber.inspect}"
-
- if conn = @available.pop
- @reserved[fiber.object_id] = conn
- conn
- else
- Fiber.yield @pending.push fiber
- acquire(fiber)
- end
- end
-
- # Release connection assigned to the supplied fiber and
- # resume any other pending connections (which will
- # immediately try to run acquire on the pool)
- def release(fiber)
- #puts "RELEASEING: #{fiber.inspect}"
- @available.push(@reserved.delete(fiber.object_id))
-
- if pending = @pending.shift
- pending.resume
- end
- end
-
- # Allow the pool to behave as the underlying connection
- #
- # If the requesting method begins with "a" prefix, then
- # hijack the callbacks and errbacks to fire a connection
- # pool release whenever the request is complete. Otherwise
- # yield the connection within execute method and release
- # once it is complete (assumption: fiber will yield until
- # data is available, or request is complete)
- #
- def method_missing(method, *args, &blk)
- async = (method[0,1] == "a")
-
- execute(async) do |conn|
- df = conn.send(method, *args, &blk)
-
- if async
- fiber = Fiber.current
- df.callback { release(fiber) }
- df.errback { release(fiber) }
- end
-
- df
- end
- end
- end
-
-end
-
class DB
SHARD_COUNT = 4
- #MAX_CONN = 8
#DBCONN = {:host => "10.1.1.10", :username => "devcamp", :password => "devcamp"}
- #DBCONN = {:host => "localhost", :username => "root"}
- @@db = Twit::ConnectionPool.new(size: MAX_CONN/SHARD_COUNT) do
- #puts "PULA"
+ DBCONN = {:host => "localhost", :username => "root"}
+ @@db = Proc.new do
(1..SHARD_COUNT).map do |i|
conn = Mysql2::EM::Client.new(DBCONN.merge(:database => "twitter#{i}"))
- [conn, i]
end
end
def db_for_user(name)
counter = SHARD_COUNT
ret = []
- @@db.execute(true) do |acquired, fiber|
- #puts "Acquired #{fiber.inspect} on dbs: #{acquired.inspect}"
- acquired.each do |db, dbno|
- #puts "Querying: #{db.inspect}"
- q = db.aquery("SELECT id FROM users WHERE screen_name = '#{name}'")
- q.errback do |r|
- #puts "User get error: #{r}"
- counter -= 1
- end
- q.callback do |r|
- #puts "Result from #{db.inspect}"
- counter -= 1
- if r.size > 0
- r.each do |userRow|
- ret = [db, userRow['id']]
- end
- end
- if counter == 0
- yield ret[0], ret[1], @@db, fiber
+ acquired = @@db.call
+ #puts "Acquired #{fiber.inspect} on dbs: #{acquired.inspect}"
+ acquired.each do |db, dbno|
+ #puts "Querying: #{db.inspect}"
+ q = db.aquery("SELECT id FROM users WHERE screen_name = '#{name}'")
+ q.errback do |r|
+ #puts "User get error: #{r}"
+ counter -= 1
+ end
+ q.callback do |r|
+ #puts "Result from #{db.inspect}"
+ counter -= 1
+ if r.size > 0
+ r.each do |userRow|
+ ret = [db, userRow['id']]
end
end
+ if counter == 0
+ yield ret[0], ret[1], acquired
+ end
end
end
end
def home_timeline(name, &blk)
- db_for_user(name) do |db, user_id, pool, fiber|
- pool.release(fiber)
+ db_for_user(name) do |db, user_id, conns|
+ conns.each { |c| c.close }
if db.nil?
yield nil
else
@@ -137,31 +52,30 @@ def home_timeline(name, &blk)
def query_all(query)
counter = SHARD_COUNT
result = []
- @@db.execute(true) do |acquired, fiber|
- check_finish = Proc.new do
- #puts "Checking #{counter}"
- counter -= 1
- if counter == 0
- @@db.release(fiber)
- puts "SENDING RESULTS #{result.size}"
- yield result
- end
+ acquired = @@db.call
+ check_finish = Proc.new do
+ #puts "Checking #{counter}"
+ counter -= 1
+ if counter == 0
+ acquired.each { |c| c.close }
+ puts "SENDING RESULTS #{result.size}"
+ yield result
end
- acquired.each do |db, dbno|
- #puts "Querying: #{db.inspect}"
- begin
- q = db.aquery(query)
- rescue => e
- p e
- next
- end
- q.errback { |r| puts "ERROR in #{query} on #{db.inspect}:\n#{r}"; check_finish.call }
- q.callback do |r|
- puts "Partial results: #{r.size} for #{dbno}"
- result += r.each.to_a
- check_finish.call
- #puts "Bye"
- end
+ end
+ acquired.each do |db, dbno|
+ #puts "Querying: #{db.inspect}"
+ begin
+ q = db.aquery(query)
+ rescue => e
+ p e
+ next
+ end
+ q.errback { |r| puts "ERROR in #{query} on #{db.inspect}:\n#{r}"; check_finish.call }
+ q.callback do |r|
+ puts "Partial results: #{r.size} for #{dbno}"
+ result += r.each.to_a
+ check_finish.call
+ #puts "Bye"
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.