Skip to content

Commit

Permalink
Register latest changes
Browse files Browse the repository at this point in the history
  • Loading branch information
julik committed Feb 26, 2009
1 parent 1056536 commit 8be7c9d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 34 deletions.
2 changes: 2 additions & 0 deletions History.txt
@@ -1,3 +1,5 @@
* Use streams for file uploads instead of in-memory strings (Julik)

== 0.5.0 / 2009-01-07

* Fixed bug in user favorites (thanks Pius Uzamere)
Expand Down
39 changes: 19 additions & 20 deletions lib/youtube_g/chain_io.rb
@@ -1,42 +1,41 @@
# Stream wrapper that reads IOs in succession. Can be fed to Net::HTTP. We use it to send a mixture of StringIOs
# and File handles to Net::HTTP which will be used when sending the request, not when composing it. Will skip over
# depleted IOs. Can be also used to roll across files like so:
#
# tape = TapeIO.new(File.open(__FILE__), File.open('/etc/passwd'))
require 'delegate'
class YouTubeG
# Stream wrapper that reads IOs in succession. Can be fed to Net::HTTP as post body stream. We use it internally to stream file content
# instead of reading whole video files into memory. Strings passed to the constructor will be wrapped in StringIOs. By default it will auto-close
# file handles when they have been read completely to prevent our uploader from leaking file handles
#
# chain = ChainIO.new(File.open(__FILE__), File.open('/etc/passwd'), "abcd")
class ChainIO
attr_accessor :substreams
attr_accessor :release_after_use
attr_accessor :autoclose

def initialize(*any_ios)
@release_after_use = true
@pending = any_ios.flatten.map{|e| e.respond_to?(:read) ? e : StringIO.new(e.to_s) }
@autoclose = true
@chain = any_ios.flatten.map{|e| e.respond_to?(:read) ? e : StringIO.new(e.to_s) }
end

def read(buffer_size = 1024)
# Read off the first element in the stack
current_io = @pending.shift
current_io = @chain.shift
return false if !current_io

buf = current_io.read(buffer_size)
if !buf && @pending.empty? # End of streams
release_handle(current_io)
if !buf && @chain.empty? # End of streams
release_handle(current_io) if @autoclose
false
elsif !buf # This IO is depleted, but next one is available
release_handle(current_io)
release_handle(current_io) if @autoclose
read(buffer_size)
elsif buf.length < buffer_size # This IO is depleted, but there might be more
release_handle(current_io)
elsif buf.length < buffer_size # This IO is depleted, but we were asked for more
release_handle(current_io) if @autoclose
buf + (read(buffer_size - buf.length) || '') # and recurse
else # just return the buffer
@pending.unshift(current_io) # put the current back
@chain.unshift(current_io) # put the current back
buf
end
end

def expected_length
@pending.inject(0) do | len, io |
@chain.inject(0) do | len, io |
if io.respond_to?(:length)
len + (io.length - io.pos)
elsif io.is_a?(File)
Expand All @@ -49,20 +48,20 @@ def expected_length

private
def release_handle(io)
return unless @release_after_use
io.close if io.respond_to?(:close)
end
end

# Net::HTTP only can send chunks of 1024 bytes. This is very inefficient, so we have a spare IO that will send more when asked for 1024
class GreedyChainIO < DelegateClass(ChainIO)
CHUNK = 512 * 1024 # 500 kb
BIG_CHUNK = 512 * 1024 # 500 kb

def initialize(*with_ios)
__setobj__(ChainIO.new(with_ios))
end

def read(any_buffer_size)
__getobj__.read(CHUNK)
__getobj__.read(BIG_CHUNK)
end
end
end
28 changes: 14 additions & 14 deletions lib/youtube_g/request/video_upload.rb
Expand Up @@ -64,15 +64,14 @@ def upload data, opts = {}
"Slug" => "#{@opts[:filename]}",
"Content-Type" => "multipart/related; boundary=#{boundary}",
"Content-Length" => "#{post_body_io.expected_length}",
"Transfer-Encoding" => "chunked" # We will stream instead of posting at once
# "Transfer-Encoding" => "chunked" # We will stream instead of posting at once
}

Net::HTTP.start(base_url) do | session |
# Use the more convoluted request creation to use the IO as post body. Due to the
# fact that Net::HTTP has been written by aliens from Jupiter there is no other way to do it,
# at least currently
path = '/feeds/api/users/%s/uploads' % @user
post = HTTP::Post.new(nil, nil, path, upload_headers)
post = Net::HTTP::Post.new(path, upload_headers)

# Use the chained IO as body so that Net::HTTP reads into the socket for us
post.body_stream = post_body_io

response = session.request(post)
Expand Down Expand Up @@ -134,16 +133,17 @@ def video_xml #:nodoc:
end

def generate_upload_body(boundary, video_xml, data) #:nodoc:
post_body = []
post_body << "--#{boundary}\r\n"
post_body << "Content-Type: application/atom+xml; charset=UTF-8\r\n\r\n"
post_body << video_xml
post_body << "\r\n--#{boundary}\r\n"
post_body << "Content-Type: #{@opts[:mime_type]}\r\nContent-Transfer-Encoding: binary\r\n\r\n"
post_body << data
post_body << "\r\n--#{boundary}--\r\n"
post_body = [
"--#{boundary}\r\n",
"Content-Type: application/atom+xml; charset=UTF-8\r\n\r\n",
video_xml,
"\r\n--#{boundary}\r\n",
"Content-Type: #{@opts[:mime_type]}\r\nContent-Transfer-Encoding: binary\r\n\r\n",
data,
"\r\n--#{boundary}--\r\n",
]

YouTubeG::TapeIO.new(post_body)
YouTubeG::GreedyChainIO.new(post_body)
end

end
Expand Down

0 comments on commit 8be7c9d

Please sign in to comment.