Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Bunch of work

  • Loading branch information...
commit 3f213c82d5d464742083632143964726d9857323 1 parent 7de1e47
Peter Kieltyka authored
View
12 Gemfile.lock
@@ -1,6 +1,6 @@
GIT
remote: git://github.com/igrigorik/em-http-request.git
- revision: ba1c1a3f34cb80c4bf4a49596238472f44e84522
+ revision: c7848ff95a603c0d97b465ed1f2058178b3f78c0
specs:
em-http-request (1.0.0.beta.4)
addressable (>= 2.2.3)
@@ -18,14 +18,14 @@ GIT
PATH
remote: .
specs:
- uber-s3 (0.1.0)
+ uber-s3 (0.1.1)
GEM
remote: http://rubygems.org/
specs:
addressable (2.2.6)
archive-tar-minitar (0.5.2)
- columnize (0.3.2)
+ columnize (0.3.3)
diff-lcs (1.1.2)
em-socksify (0.1.0)
eventmachine
@@ -33,15 +33,15 @@ GEM
eventmachine (1.0.0.beta.3-java)
http_parser.rb (0.5.1)
http_parser.rb (0.5.1-java)
- linecache (0.43)
+ linecache (0.45)
linecache19 (0.5.12)
ruby_core_source (>= 0.1.4)
- rake (0.9.1)
+ rake (0.9.2)
rspec (2.6.0)
rspec-core (~> 2.6.0)
rspec-expectations (~> 2.6.0)
rspec-mocks (~> 2.6.0)
- rspec-core (2.6.3)
+ rspec-core (2.6.4)
rspec-expectations (2.6.0)
diff-lcs (~> 1.1.2)
rspec-mocks (2.6.0)
View
6 README.md
@@ -33,7 +33,7 @@ o.save
# or..
-o = UberS3::Object.new(client.bucket, '/test.txt', 'heyo')
+o = UberS3::Object.new(s3.bucket, '/test.txt', 'heyo')
o.save # => true
@@ -125,7 +125,7 @@ Benchmarks were run with a speedy MBP on a 10Mbit connection
### Saving lots of 1KB files
<pre>
- user system total real
+ user system total real
saving 100x1024 byte objects (net-http) 0.160000 0.080000 0.240000 ( 26.128499)
saving 100x1024 byte objects (em-http-fibered) 0.080000 0.030000 0.110000 ( 0.917334)
</pre>
@@ -133,7 +133,7 @@ saving 100x1024 byte objects (em-http-fibered) 0.080000 0.030000 0.11000
### Saving lots of 500KB files
<pre>
- user system total real
+ user system total real
saving 100x512000 byte objects (net-http) 0.190000 0.740000 0.930000 ( 91.559123)
saving 100x512000 byte objects (em-http-fibered) 0.230000 0.700000 0.930000 ( 45.119033)
</pre>
View
2  lib/uber-s3/bucket.rb
@@ -56,7 +56,7 @@ def fetch(marker=nil)
# response = bucket.connection.get("/?prefix=#{CGI.escape(key)}&marker=#{marker}&max-keys=2")
if response[:status] != 200
- raise UberS3Error
+ raise UberS3Error, response.inspect
else
@objects = parse_contents(response[:body])
end
View
2  lib/uber-s3/connection.rb
@@ -35,7 +35,7 @@ def initialize(client, options={})
# Default headers
headers['Date'] = Time.now.httpdate if !headers.keys.include?('Date')
headers['User-Agent'] ||= "UberS3 v#{UberS3::VERSION}"
- headers['Connection'] = (persistent ? 'keep-alive' : 'close')
+ # headers['Connection'] = (persistent ? 'keep-alive' : 'close')
if body
headers['Content-Type'] ||= 'application/octet-stream'
View
8 lib/uber-s3/connection/em_http_fibered.rb
@@ -10,13 +10,15 @@ def request(verb, url, headers={}, body=nil)
params = {}
params[:head] = headers
params[:body] = body if body
-
+ # params[:keepalive] = true if persistent # causing issues ...?
+
r = EM::HttpRequest.new(url).send(verb, params)
-
+
{
:status => r.response_header.status,
:header => r.response_header,
- :body => r.response
+ :body => r.response,
+ :raw => r
}
end
View
3  lib/uber-s3/connection/net_http.rb
@@ -15,7 +15,8 @@ def request(verb, url, headers={}, body=nil)
{
:status => r.code.to_i,
:header => r.header.to_hash,
- :body => r.body
+ :body => r.body,
+ :raw => r
}
end
View
7 research/iterate/Gemfile
@@ -0,0 +1,7 @@
+source :rubygems
+
+gem 'eventmachine', '1.0.0.beta.3'
+gem 'em-http-request', :git => 'git://github.com/igrigorik/em-http-request.git'
+gem 'em-synchrony', :git => 'git://github.com/igrigorik/em-synchrony.git'
+
+gem 'ruby-debug19', :platforms => :mri_19
View
48 research/iterate/Gemfile.lock
@@ -0,0 +1,48 @@
+GIT
+ remote: git://github.com/igrigorik/em-http-request.git
+ revision: c7848ff95a603c0d97b465ed1f2058178b3f78c0
+ specs:
+ em-http-request (1.0.0.beta.4)
+ addressable (>= 2.2.3)
+ em-socksify
+ eventmachine (>= 1.0.0.beta.3)
+ http_parser.rb (>= 0.5.1)
+
+GIT
+ remote: git://github.com/igrigorik/em-synchrony.git
+ revision: 5c2aa0ec22509102a1e2fda659c1d2cea5cb6d5e
+ specs:
+ em-synchrony (0.3.0.beta.1)
+ eventmachine (>= 1.0.0.beta.1)
+
+GEM
+ remote: http://rubygems.org/
+ specs:
+ addressable (2.2.6)
+ archive-tar-minitar (0.5.2)
+ columnize (0.3.3)
+ em-socksify (0.1.0)
+ eventmachine
+ eventmachine (1.0.0.beta.3)
+ http_parser.rb (0.5.1)
+ linecache19 (0.5.12)
+ ruby_core_source (>= 0.1.4)
+ ruby-debug-base19 (0.11.25)
+ columnize (>= 0.3.1)
+ linecache19 (>= 0.5.11)
+ ruby_core_source (>= 0.1.4)
+ ruby-debug19 (0.11.6)
+ columnize (>= 0.3.1)
+ linecache19 (>= 0.5.11)
+ ruby-debug-base19 (>= 0.11.19)
+ ruby_core_source (0.1.5)
+ archive-tar-minitar (>= 0.5.2)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ em-http-request!
+ em-synchrony!
+ eventmachine (= 1.0.0.beta.3)
+ ruby-debug19
View
82 research/iterate/fiber_pool.rb
@@ -0,0 +1,82 @@
+# Author:: Mohammad A. Ali (mailto:oldmoe@gmail.com)
+# Copyright:: Copyright (c) 2008 eSpace, Inc.
+# License:: Distributes under the same terms as Ruby
+
+require 'fiber'
+
+class Fiber
+
+ #Attribute Reference--Returns the value of a fiber-local variable, using
+ #either a symbol or a string name. If the specified variable does not exist,
+ #returns nil.
+ def [](key)
+ local_fiber_variables[key]
+ end
+
+ #Attribute Assignment--Sets or creates the value of a fiber-local variable,
+ #using either a symbol or a string. See also Fiber#[].
+ def []=(key,value)
+ local_fiber_variables[key] = value
+ end
+
+ private
+
+ def local_fiber_variables
+ @local_fiber_variables ||= {}
+ end
+end
+
+class FiberPool
+
+ # gives access to the currently free fibers
+ attr_reader :fibers
+ attr_reader :busy_fibers
+
+ # Code can register a proc with this FiberPool to be called
+ # every time a Fiber is finished. Good for releasing resources
+ # like ActiveRecord database connections.
+ attr_accessor :generic_callbacks
+
+ # Prepare a list of fibers that are able to run different blocks of code
+ # every time. Once a fiber is done with its block, it attempts to fetch
+ # another one from the queue
+ def initialize(count = 100)
+ @fibers,@busy_fibers,@queue,@generic_callbacks = [],{},[],[]
+ count.times do |i|
+ fiber = Fiber.new do |block|
+ loop do
+ block.call
+ # callbacks are called in a reverse order, much like c++ destructor
+ Fiber.current[:callbacks].pop.call while Fiber.current[:callbacks].length > 0
+ generic_callbacks.each do |cb|
+ cb.call
+ end
+ unless @queue.empty?
+ block = @queue.shift
+ else
+ @busy_fibers.delete(Fiber.current.object_id)
+ @fibers.unshift Fiber.current
+ block = Fiber.yield
+ end
+ end
+ end
+ fiber[:callbacks] = []
+ fiber[:em_keys] = []
+ @fibers << fiber
+ end
+ end
+
+ # If there is an available fiber use it, otherwise, leave it to linger
+ # in a queue
+ def spawn(&block)
+ if fiber = @fibers.shift
+ fiber[:callbacks] = []
+ @busy_fibers[fiber.object_id] = fiber
+ fiber.resume(block)
+ else
+ @queue << block
+ end
+ self # we are keen on hiding our queue
+ end
+
+end
View
72 research/iterate/iterate.rb
@@ -0,0 +1,72 @@
+$:.unshift "."
+$:.unshift "../../lib"
+
+require 'bundler'
+Bundler.setup
+
+require 'ruby-debug'
+require 'eventmachine'
+require 'em-http'
+require 'em-synchrony'
+require 'em-synchrony/em-http'
+
+require 'fiber_pool'
+require 'uber-s3'
+
+
+# ************* Main issue:
+# At a pool size of 50 or above .. this thing will crash within 15 seconds
+# .. but at pool of 10 .. it will keep going and going..
+# no idea why .. perhaps my connection is tapped, and the queue increases
+# or file descriptor limit.. etc. eventually the EM deferred_status will
+# be :failed
+@fiber_pool = FiberPool.new(100)
+
+EM.run do
+
+ s3 = UberS3.new({
+ :access_key => 'x',
+ :secret_access_key => 'y',
+ :bucket => 'bucket-goes-here',
+ :adapter => :em_http_fibered
+ })
+
+ @num = 0
+ @start = Time.now
+
+ trap(:INT) do
+ finish = Time.now
+ puts "I'm outta here -- #{@num} -- #{finish - @start}"
+ exit
+ end
+
+ Fiber.new {
+
+ # Grab up to 50 objects
+ list = []
+ s3.objects('/').each do |obj|
+ list << obj
+ break if list.length > 50
+ end
+
+ # We duplicate the list to make sure we have enough objects to iterate
+ (list*10000).each do |obj|
+ @fiber_pool.spawn do
+ x = obj.bucket.connection.head(obj.key)
+
+ if x[:status] == 0
+ puts "ERROR: we got 0 status.. weird.. here's the raw response"
+ # For more details, throw a debugger in here and look at x[:raw] closer
+ puts x[:raw].inspect
+
+ exit
+ end
+
+ puts obj.to_s + " -- #{x[:status]}"
+ @num += 1 if x
+ end
+ end
+
+ }.resume
+
+end
View
118 research/parallel-http.rb
@@ -1,6 +1,7 @@
require 'bundler'
Bundler.setup
+require 'ruby-debug'
require 'eventmachine'
require 'em-http-request'
require 'fiber'
@@ -13,7 +14,114 @@ def http_get(url)
req.callback { f.resume(req) }
req.errback { f.resume(req) }
- return Fiber.yield
+ Fiber.yield
+end
+
+EM.run do
+
+ urls = [
+ 'http://nulayer.com/',
+ 'http://google.ca/'# ,
+ # 'http://facebook.com/',
+ # 'http://nulayer.com',
+ # 'http://data.crowdreel.com.s3.amazonaws.com/2011/05/17-02/1f55685cc1e9c91d80079a49485c718b1f53d97f.jpg',
+ # 'http://data.crowdreel.com.s3.amazonaws.com/2011/05/17-02/54acc4f94b81268982aab94e0d273693962741ab.jpg',
+ # 'http://data.crowdreel.com.s3.amazonaws.com/2011/05/17-02/66ca6cb906f1b517cc576477b3d69e155dc21284.jpg',
+ # 'http://data.crowdreel.com.s3.amazonaws.com/2011/05/17-02/cfeb066a4f0ba75a2081898a6c750d371aafd7e9.jpg'
+ ]
+ processed = 0
+
+Fiber.new {
+ urls.each do |url|
+ # url = urls.first
+
+ x = nil
+ page = nil
+
+ Fiber.new {
+ puts "START => #{url}"
+ page = http_get(url)
+ puts "DONE:"+page.response_header.status.to_s+" from #{url}"
+
+ t = nil
+ f = Fiber.current
+
+ j = Fiber.new {
+ # 1. This fiber will execute.. then yield once it setups the async callbacks
+ puts "************ 1"
+ t = http_get(url).response_header.status
+
+ # **** But... we want the code to pause here... and let another fiber go
+ # or just block .. like typical sync API
+
+ puts "############ #{t}"
+
+ f.resume
+
+ puts "************ 3"
+ # 3. code will resume from here once the callback is done...
+
+ # puts "b:"+t.to_s
+ }.resume
+
+ Fiber.yield
+
+
+ puts "************ 2"
+ # 2. code will continue here even before the callback returns...? .. we don't want
+ # that tho .. if we can figure that out .. then we're golden..
+
+
+ # NOTE: we will never get the varaibles we need.. with async+fiber .. we have to put
+ # the context in blocks .. only way .. we can't return shit either
+ # puts ">>>b:"+t.to_s
+
+ # j = Fiber.new {
+ # puts "c:"+http_get("http://google.ca/").response_header.status.to_s
+ # }.resume
+ #
+ #
+ # j = Fiber.new {
+ # puts "d:"+http_get("http://google.ca/").response_header.status.to_s
+ # }.resume
+
+ # puts "jjjjjjj #{j}"
+
+ x = "WOOOOOOOOOT"
+ processed += 1
+ }.resume
+
+ puts "zzzzzz #{x}"
+ puts ">>>>>> #{page}"
+
+ # processed += 1
+ # if processed == urls.length
+ # EM.stop
+ # end
+ end
+}.resume
+
+end
+
+
+
+__END__
+require 'bundler'
+Bundler.setup
+
+require 'eventmachine'
+require 'em-http-request'
+require 'fiber'
+
+
+def http_get(url)
+ f = Fiber.current
+
+ req = EM::HttpRequest.new(url).get
+ req.callback { f.resume(req) }
+ req.errback { f.resume(req) }
+
+ Fiber.yield
end
EM.run do
@@ -30,9 +138,9 @@ def http_get(url)
]
processed = 0
-Fiber.new {
+# Fiber.new {
urls.each do |url|
- # Fiber.new {
+ Fiber.new {
puts "START => #{url}"
page = http_get(url)
puts "DONE => #{url}"
@@ -42,8 +150,8 @@ def http_get(url)
EM.stop
end
- # }.resume
+ }.resume
end
-}.resume
+# }.resume
end
View
5 spec/spec_helper.rb
@@ -43,9 +43,8 @@ def spec(client, &block)
when 'UberS3::Connection::NetHttp'
block.call(client)
when 'UberS3::Connection::EmHttpFibered'
- EM.synchrony do
- block.call(client)
- EM.stop
+ EM.run do
+ Fiber.new { block.call(client); EM.stop }.resume
end
else
raise "Unknown connection adapter"
Please sign in to comment.
Something went wrong with that request. Please try again.