diff --git a/lib/net/rtmp.rb b/lib/net/rtmp.rb index 5bdc6e0..cd07575 100644 --- a/lib/net/rtmp.rb +++ b/lib/net/rtmp.rb @@ -17,4 +17,3 @@ def connect end end end - diff --git a/lib/net/rtmp/amf.rb b/lib/net/rtmp/amf.rb index f340e00..d9fc2fe 100644 --- a/lib/net/rtmp/amf.rb +++ b/lib/net/rtmp/amf.rb @@ -3,95 +3,94 @@ require 'stringio' module Net -class RTMP -class AMF - EndOfPacket = Class.new - - DECODE_DATA_TYPE = { - 0x00 => :number, - 0x01 => :boolean, - 0x02 => :string, - 0x03 => :object, - 0x04 => :movieclip, - 0x05 => :null, - 0x06 => :undefined, - 0x07 => :reference, - 0x08 => :ecma_array, - 0x09 => :object_end, - 0x0a => :strict_array, - 0x0b => :date, - 0x0c => :long_string, - 0x0d => :unsupported, - 0x0e => :record_set, - 0x0f => :xml_object, - 0x10 => :typed_object - } - ENCODE_DATA_TYPE = DECODE_DATA_TYPE.invert - - def initialize - @elements = [] - end - - def parse(data) - @elements = recursive_parse(Bytestream.new(StringIO.new(data))) - end + class RTMP + class AMF + EndOfPacket = Class.new + + DECODE_DATA_TYPE = { + 0x00 => :number, + 0x01 => :boolean, + 0x02 => :string, + 0x03 => :object, + 0x04 => :movieclip, + 0x05 => :null, + 0x06 => :undefined, + 0x07 => :reference, + 0x08 => :ecma_array, + 0x09 => :object_end, + 0x0a => :strict_array, + 0x0b => :date, + 0x0c => :long_string, + 0x0d => :unsupported, + 0x0e => :record_set, + 0x0f => :xml_object, + 0x10 => :typed_object + } + ENCODE_DATA_TYPE = DECODE_DATA_TYPE.invert + + def initialize + @elements = [] + end + + def parse(data) + @elements = recursive_parse(Bytestream.new(StringIO.new(data))) + end + + def to_a + @elements + end + + private + def recursive_parse(bytestream) + elements = [] + until bytestream.eof? || (e = next_element(bytestream)) == EndOfPacket + elements << e + end + elements + end + + def next_element(bytestream) + data_type = DECODE_DATA_TYPE[read_data_type(bytestream)] + value = __send__("read_#{data_type}", bytestream) + value + end + + def read_data_type(bytestream) + bytestream.read_uint8 + end + + def read_length_prefixed_data(bytestream) + length = bytestream.read_uint16_be + bytestream.read(length) + end + + alias_method :read_string, :read_length_prefixed_data + alias_method :read_long_string, :read_length_prefixed_data + + def read_number(bytestream) + bytestream.read_double_be + end + + def read_boolean(bytestream) + bytestream.read_uint8 != 0 + end + + def read_object(bytestream) + hash = {} + until (key = read_length_prefixed_data(bytestream)) == '' + hash[key] = next_element(bytestream) + end + hash + end + + def read_null(_) + nil + end + + def read_object_end(_) + EndOfPacket + end - def to_a - @elements - end - -private - - def recursive_parse(bytestream) - elements = [] - until bytestream.eof? || (e = next_element(bytestream)) == EndOfPacket - elements << e end - elements - end - - def next_element(bytestream) - data_type = DECODE_DATA_TYPE[read_data_type(bytestream)] - value = __send__("read_#{data_type}", bytestream) - value - end - - def read_data_type(bytestream) - bytestream.read_uint8 end - - def read_length_prefixed_data(bytestream) - length = bytestream.read_uint16_be - bytestream.read(length) - end - - alias_method :read_string, :read_length_prefixed_data - alias_method :read_long_string, :read_length_prefixed_data - - def read_number(bytestream) - bytestream.read_double_be - end - - def read_boolean(bytestream) - bytestream.read_uint8 != 0 - end - - def read_object(bytestream) - hash = {} - until (key = read_length_prefixed_data(bytestream)) == '' - hash[key] = next_element(bytestream) - end - hash - end - - def read_null(_) - nil - end - - def read_object_end(_) - EndOfPacket - end - -end -end end diff --git a/lib/net/rtmp/connection.rb b/lib/net/rtmp/connection.rb index bb92de6..bc2dbdd 100644 --- a/lib/net/rtmp/connection.rb +++ b/lib/net/rtmp/connection.rb @@ -2,85 +2,84 @@ require 'net/rtmp/errors' module Net -class RTMP -class Connection - HEADER_BYTE = "\x03" - HANDSHAKE_LENGTH = 1536 + class RTMP + class Connection + HEADER_BYTE = "\x03" + HANDSHAKE_LENGTH = 1536 - def initialize(socket) - @socket = WrappedSocket.new(socket) - @packets = {} - @headers = {} - end + def initialize(socket) + @socket = WrappedSocket.new(socket) + @packets = {} + @headers = {} + end - def handshake - @socket.write("\x03" + random_string(HANDSHAKE_LENGTH)) - shared = @socket.read(2 * HANDSHAKE_LENGTH + 1)[(HANDSHAKE_LENGTH + 1)..-1] - @socket.write(shared) - end + def handshake + @socket.write("\x03" + random_string(HANDSHAKE_LENGTH)) + shared = @socket.read(2 * HANDSHAKE_LENGTH + 1)[(HANDSHAKE_LENGTH + 1)..-1] + @socket.write(shared) + end - def get_data - header = Packet::Header.new - header.parse(@socket) - if packet = @packets[header.oid] - packet.endow(header) - else - if previous_header = @headers[header.oid] - header.inherit(previous_header) + def get_data + header = Packet::Header.new + header.parse(@socket) + if packet = @packets[header.oid] + packet.endow(header) + else + if previous_header = @headers[header.oid] + header.inherit(previous_header) + end + packet = @packets[header.oid] = Packet.new(header) + end + @headers[header.oid] = header + packet << @socket.read(packet.bytes_to_fetch) + if packet.complete? + @packets.delete(header.oid) + yield packet + end end - packet = @packets[header.oid] = Packet.new(header) - end - @headers[header.oid] = header - packet << @socket.read(packet.bytes_to_fetch) - if packet.complete? - @packets.delete(header.oid) - yield packet - end - end - def need_data? - @packets.any? - end + def need_data? + @packets.any? + end - def fetch(&blk) - get_data(&blk) - while need_data? - get_data(&blk) - end - end + def fetch(&blk) + get_data(&blk) + while need_data? + get_data(&blk) + end + end - def send(packet) - packet.generate do |chunk| - @socket.write(chunk) - end - end + def send(packet) + packet.generate do |chunk| + @socket.write(chunk) + end + end -private + private + def random_string(length) + (0...length).map{ rand(256) }.pack('C*') + end - def random_string(length) - (0...length).map{ rand(256) }.pack('C*') - end + class WrappedSocket + def initialize(socket) + @socket = socket + end - class WrappedSocket - def initialize(socket) - @socket = socket - end + def read(length=nil) + if length + data = @socket.read(length) + raise NoMoreData if data.nil? + return data + else + return @socket.read + end + end - def read(length=nil) - if length - data = @socket.read(length) - raise NoMoreData if data.nil? - return data - else - return @socket.read + def write(data) + @socket.write(data) + end end - end - def write(data) - @socket.write(data) end end - -end -end end diff --git a/lib/net/rtmp/constants.rb b/lib/net/rtmp/constants.rb index 5cdf58c..49e7318 100644 --- a/lib/net/rtmp/constants.rb +++ b/lib/net/rtmp/constants.rb @@ -1,36 +1,36 @@ module Net -class RTMP - PORT = 1935 + class RTMP + PORT = 1935 - SHARED_OBJECTS = { - :connect => 0x01, - :disconnect => 0x02, - :set_attribute => 0x03, - :update_data => 0x04, - :update_attribute => 0x05, - :send_message => 0x06, - :status => 0x07, - :clear_data => 0x08, - :delete_data => 0x09, - :delete_attribute => 0x0a, - :initial_data => 0x0b - } + SHARED_OBJECTS = { + :connect => 0x01, + :disconnect => 0x02, + :set_attribute => 0x03, + :update_data => 0x04, + :update_attribute => 0x05, + :send_message => 0x06, + :status => 0x07, + :clear_data => 0x08, + :delete_data => 0x09, + :delete_attribute => 0x0a, + :initial_data => 0x0b + } - DATATYPES = { - :chunk_size => 0x01, - :bytes_read => 0x03, - :ping => 0x04, - :server_bw => 0x05, - :client_bw => 0x06, - :audio_data => 0x08, - :video_data => 0x09, - :flex_stream => 0x0f, - :flex_shared_object => 0x10, - :flex_message => 0x11, - :notify => 0x12, - :shared_object => 0x13, - :invoke => 0x14, - :flv_data => 0x16 - } -end + DATATYPES = { + :chunk_size => 0x01, + :bytes_read => 0x03, + :ping => 0x04, + :server_bw => 0x05, + :client_bw => 0x06, + :audio_data => 0x08, + :video_data => 0x09, + :flex_stream => 0x0f, + :flex_shared_object => 0x10, + :flex_message => 0x11, + :notify => 0x12, + :shared_object => 0x13, + :invoke => 0x14, + :flv_data => 0x16 + } + end end diff --git a/lib/net/rtmp/errors.rb b/lib/net/rtmp/errors.rb index 2bc3c2b..f8c5c99 100644 --- a/lib/net/rtmp/errors.rb +++ b/lib/net/rtmp/errors.rb @@ -1,5 +1,5 @@ module Net -class RTMP - class NoMoreData < RuntimeError; end -end + class RTMP + class NoMoreData < RuntimeError; end + end end diff --git a/lib/net/rtmp/packet.rb b/lib/net/rtmp/packet.rb index 626fdfe..d3153f6 100644 --- a/lib/net/rtmp/packet.rb +++ b/lib/net/rtmp/packet.rb @@ -1,63 +1,62 @@ require 'net/rtmp/packet/header' module Net -class RTMP -class Packet - - attr_accessor :body, :oid, :timestamp, :content_type, :stream_id - - def initialize(header=nil) - parse_header(header) if header - @body = '' - end - - def endow(header) - header.inherit(@header) - end - - def <<(data) - @body << data - self - end - - def complete? - bytes_to_fetch <= 0 - end - - def bytes_to_fetch - [@body_length - @body.length, 128].min - end - - def generate - bytes_sent = 0 - header = build_header - while bytes_sent < @body.length - bytes_to_send = [@body.length - bytes_sent, 128].min - header_length = bytes_sent == 0 ? 12 : 1 - yield(header.generate(header_length) + body[bytes_sent, bytes_to_send]) - bytes_sent += bytes_to_send + class RTMP + class Packet + + attr_accessor :body, :oid, :timestamp, :content_type, :stream_id + + def initialize(header=nil) + parse_header(header) if header + @body = '' + end + + def endow(header) + header.inherit(@header) + end + + def <<(data) + @body << data + self + end + + def complete? + bytes_to_fetch <= 0 + end + + def bytes_to_fetch + [@body_length - @body.length, 128].min + end + + def generate + bytes_sent = 0 + header = build_header + while bytes_sent < @body.length + bytes_to_send = [@body.length - bytes_sent, 128].min + header_length = bytes_sent == 0 ? 12 : 1 + yield(header.generate(header_length) + body[bytes_sent, bytes_to_send]) + bytes_sent += bytes_to_send + end + end + + def parse_header(header) + @header = header + @body_length = header.body_length + @oid = header.oid + @timestamp = header.timestamp + @content_type = header.content_type + @stream_id = header.stream_id + end + + def build_header + header = Header.new + header.body_length = @body.length + header.oid = @oid + header.timestamp = @timestamp + header.content_type = @content_type + header.stream_id = @stream_id + header + end end end - - def parse_header(header) - @header = header - @body_length = header.body_length - @oid = header.oid - @timestamp = header.timestamp - @content_type = header.content_type - @stream_id = header.stream_id - end - - def build_header - header = Header.new - header.body_length = @body.length - header.oid = @oid - header.timestamp = @timestamp - header.content_type = @content_type - header.stream_id = @stream_id - header - end - -end -end end diff --git a/lib/net/rtmp/packet/header.rb b/lib/net/rtmp/packet/header.rb index 5c6a634..8b97a99 100644 --- a/lib/net/rtmp/packet/header.rb +++ b/lib/net/rtmp/packet/header.rb @@ -1,58 +1,57 @@ module Net -class RTMP -class Packet -class Header + class RTMP + class Packet + class Header - HEADER_LENGTHS = { - 0b00 => 12, - 0b01 => 8, - 0b10 => 4, - 0b11 => 1 - } + HEADER_LENGTHS = { + 0b00 => 12, + 0b01 => 8, + 0b10 => 4, + 0b11 => 1 + } - attr_accessor :oid, :timestamp, :body_length, :content_type, :stream_id + attr_accessor :oid, :timestamp, :body_length, :content_type, :stream_id - def inherit(previous) - @oid ||= previous.oid - @timestamp ||= previous.timestamp - @body_length ||= previous.body_length - @content_type ||= previous.content_type - @stream_id ||= previous.stream_id - end + def inherit(previous) + @oid ||= previous.oid + @timestamp ||= previous.timestamp + @body_length ||= previous.body_length + @content_type ||= previous.content_type + @stream_id ||= previous.stream_id + end - def body_length - @body_length || 128 - end + def body_length + @body_length || 128 + end - def parse(io) - byte = io.read(1).unpack('C')[0] - header_length = HEADER_LENGTHS[byte >> 6] - @oid = byte & 0b0011_1111 - if header_length >= 4 - @timestamp = ("\x00" + io.read(3)).unpack('N')[0] - end - if header_length >= 8 - @body_length = ("\x00" + io.read(3)).unpack('N')[0] - @content_type = io.read(1).unpack('C')[0] - end - if header_length == 12 - @stream_id = io.read(4).unpack('V')[0] - end - end + def parse(io) + byte = io.read(1).unpack('C')[0] + header_length = HEADER_LENGTHS[byte >> 6] + @oid = byte & 0b0011_1111 + if header_length >= 4 + @timestamp = ("\x00" + io.read(3)).unpack('N')[0] + end + if header_length >= 8 + @body_length = ("\x00" + io.read(3)).unpack('N')[0] + @content_type = io.read(1).unpack('C')[0] + end + if header_length == 12 + @stream_id = io.read(4).unpack('V')[0] + end + end - def generate(length=12) - length_marker = HEADER_LENGTHS.invert[length] << 6 - raw = [length_marker | @oid].pack('C') - if length == 12 - raw << [@timestamp].pack('N')[1,3] - raw << [@body_length].pack('N')[1,3] - raw << [@content_type].pack('C') - raw << [@stream_id].pack('V') + def generate(length=12) + length_marker = HEADER_LENGTHS.invert[length] << 6 + raw = [length_marker | @oid].pack('C') + if length == 12 + raw << [@timestamp].pack('N')[1,3] + raw << [@body_length].pack('N')[1,3] + raw << [@content_type].pack('C') + raw << [@stream_id].pack('V') + end + raw + end + end end - raw end - -end -end -end end diff --git a/test/test_amf.rb b/test/test_amf.rb index 9db6c81..c99c7dd 100644 --- a/test/test_amf.rb +++ b/test/test_amf.rb @@ -87,10 +87,10 @@ def self.to_bytes(array) 65 70 69 73 6f 64 65 2f 62 30 30 63 37 33 66 63 00 00 09 )) - + SAMPLE_2 = to_bytes(%w( 02 00 08 5f 63 68 65 63 6b 62 77 00 00 00 00 00 - 00 00 00 00 05 + 00 00 00 00 05 )) end diff --git a/test/test_connection.rb b/test/test_connection.rb index 260fd32..d21601f 100644 --- a/test/test_connection.rb +++ b/test/test_connection.rb @@ -45,8 +45,8 @@ def test_should_inherit_length_via_oid_even_if_oid_is_recycled 00 03 00 00 00 00 00 00 17 70 43 00 00 00 00 00 01 04 - 99 - c2 00 03 00 00 00 00 00 + 99 + c2 00 03 00 00 00 00 00 00 13 88 00 00 00 ].map{ |c| c.to_i(16) }.pack('C*') + "." * 1000 socket = MockSocket.new(sample)