Permalink
Browse files

No commit message

  • Loading branch information...
1 parent 0cebfc0 commit 6f783e0ee5b79670c9443696521e84f9acb84357 @prekageo committed Jul 8, 2011
Showing with 194 additions and 123 deletions.
  1. 0 __init__.py
  2. +175 โˆ’42 rtmp_protocol.py
  3. +12 โˆ’74 sample_rtmp_client.py
  4. +7 โˆ’7 sample_rtmp_server.py
View
No changes.
View
@@ -6,6 +6,7 @@
import pyamf.amf0
import pyamf.util
import rtmp_protocol_base
+import socket
class FileDataTypeMixIn(pyamf.util.DataTypeMixIn):
"""
@@ -16,10 +17,10 @@ class FileDataTypeMixIn(pyamf.util.DataTypeMixIn):
def __init__(self, fileobject):
self.fileobject = fileobject
pyamf.util.DataTypeMixIn.__init__(self)
-
+
def read(self, length):
return self.fileobject.read(length)
-
+
def write(self, data):
self.fileobject.write(data)
@@ -84,13 +85,13 @@ def next(self):
# and expect this field here.
if header.timestamp >= 0x00ffffff:
self.stream.read_ulong()
- assert(next_header.streamId == -1)
- assert(next_header.datatype == -1)
- assert(next_header.timestamp == -1)
- assert(next_header.bodyLength == -1)
- assert(header.bodyLength == msg_body_len)
+ assert next_header.streamId == -1, (header, next_header)
+ assert next_header.datatype == -1, (header, next_header)
+ assert next_header.timestamp == -1, (header, next_header)
+ assert next_header.bodyLength == -1, (header, next_header)
+ assert header.bodyLength == msg_body_len, (header, msg_body_len)
body_stream = pyamf.util.BufferedByteStream(''.join(message_body))
-
+
# Decode the message based on the datatype present in the header
ret = {'msg':header.datatype}
if ret['msg'] == DataTypes.USER_CONTROL:
@@ -124,8 +125,7 @@ def next(self):
commands.append(decoder.readElement())
ret['command'] = commands
else:
- print 'ERROR: unknown message type %d' % (ret['msg'],)
- assert(False)
+ assert False, header
return ret
@@ -139,42 +139,43 @@ def read_shared_object_event(self, body_stream, decoder):
event = {'type':so_body_type}
if event['type'] == SOEventTypes.USE:
- assert(so_body_size == 0)
+ assert so_body_size == 0, so_body_size
event['data'] = ''
elif event['type'] == SOEventTypes.RELEASE:
- assert(so_body_size == 0)
+ assert so_body_size == 0, so_body_size
event['data'] = ''
elif event['type'] == SOEventTypes.CHANGE:
start_pos = body_stream.tell()
changes = {}
while body_stream.tell() < start_pos + so_body_size:
attrib_name = decoder.readString()
attrib_value = decoder.readElement()
- assert(attrib_name not in changes)
+ assert attrib_name not in changes, (attrib_name,changes.keys())
changes[attrib_name] = attrib_value
- assert(body_stream.tell() == start_pos + so_body_size)
+ assert body_stream.tell() == start_pos + so_body_size,\
+ (body_stream.tell(),start_pos,so_body_size)
event['data'] = changes
elif event['type'] == SOEventTypes.MESSAGE:
start_pos = body_stream.tell()
msg_params = []
while body_stream.tell() < start_pos + so_body_size:
msg_params.append(decoder.readElement())
- assert(body_stream.tell() == start_pos + so_body_size)
+ assert body_stream.tell() == start_pos + so_body_size,\
+ (body_stream.tell(),start_pos,so_body_size)
event['data'] = msg_params
elif event['type'] == SOEventTypes.CLEAR:
- assert(so_body_size == 0)
+ assert so_body_size == 0, so_body_size
event['data'] = ''
elif event['type'] == SOEventTypes.DELETE:
event['data'] = decoder.readString()
elif event['type'] == SOEventTypes.USE_SUCCESS:
- assert(so_body_size == 0)
+ assert so_body_size == 0, so_body_size
event['data'] = ''
else:
- print 'ERROR: unknown SO body type %d' % (event['type'],)
- assert(False)
-
+ assert False, event['type']
+
return event
-
+
class RtmpWriter:
""" This class writes RTMP messages into a stream. """
@@ -186,14 +187,14 @@ def __init__(self, stream):
stream.
"""
self.stream = stream
-
+
def flush(self):
""" Flush the underlying stream. """
self.stream.flush()
def write(self, message):
""" Encode and write the specified message into the stream. """
- datatype = message['msg']
+ datatype = message['msg']
body_stream = pyamf.util.BufferedByteStream()
encoder = pyamf.amf0.Encoder(body_stream)
@@ -212,39 +213,37 @@ def write(self, message):
encoder.writeString(message['obj_name'],writeType=False)
body_stream.write_ulong(message['curr_version'])
body_stream.write(message['flags'])
-
+
for event in message['events']:
self.write_shared_object_event(event, body_stream)
else:
- print 'ERROR: unknown message type %d' % (datatype,)
- assert(False)
+ assert False, message
self.send_msg(datatype, body_stream.getvalue())
-
+
def write_shared_object_event(self, event, body_stream):
"""
Helper method that writes one shared object inside a shared object RTMP
message.
"""
-
+
inner_stream = pyamf.util.BufferedByteStream()
encoder = pyamf.amf0.Encoder(inner_stream)
event_type = event['type']
if event_type == SOEventTypes.USE:
- assert(event['data'] == '')
+ assert event['data'] == '', event['data']
elif event_type == SOEventTypes.CHANGE:
for attrib_name in event['data']:
attrib_value = event['data'][attrib_name]
encoder.writeString(attrib_name,writeType=False)
encoder.writeElement(attrib_value)
elif event['type'] == SOEventTypes.CLEAR:
- assert(event['data'] == '')
+ assert event['data'] == '', event['data']
elif event['type'] == SOEventTypes.USE_SUCCESS:
- assert(event['data'] == '')
+ assert event['data'] == '', event['data']
else:
- print 'ERROR: unknown SO body type %d' % (event_type,)
- assert(False)
+ assert False, event
body_stream.write_uchar(event_type)
body_stream.write_ulong(len(inner_stream))
@@ -256,7 +255,7 @@ def send_msg(self, datatype, body):
care to prepend the necessary headers and split the message into
appropriately sized chunks.
"""
-
+
# Values that just work. :-)
if datatype >= 1 and datatype <= 7:
channel_id = 2
@@ -273,7 +272,7 @@ def send_msg(self, datatype, body):
bodyLength=len(body),
timestamp=timestamp)
rtmp_protocol_base.header_encode(self.stream, header)
-
+
for i in xrange(0,len(body),self.chunk_size):
chunk = body[i:i+self.chunk_size]
self.stream.write(chunk)
@@ -315,7 +314,7 @@ def use(self, reader, writer):
}
writer.write(msg)
writer.flush()
-
+
def handle_message(self, message):
"""
Handle an incoming RTMP message. Check if it is of any relevance for the
@@ -326,27 +325,161 @@ def handle_message(self, message):
events = message['events']
if not self.use_success:
- assert(events[0]['type'] == SOEventTypes.USE_SUCCESS)
- assert(events[1]['type'] == SOEventTypes.CLEAR)
+ assert events[0]['type'] == SOEventTypes.USE_SUCCESS, events[0]
+ assert events[1]['type'] == SOEventTypes.CLEAR, events[1]
events = events[2:]
self.use_success = True
self.handle_events(events)
return True
else:
return False
-
+
def handle_events(self, events):
""" Handle SO events that target the specific SO. """
for event in events:
event_type = event['type']
if event_type == SOEventTypes.CHANGE:
for key in event['data']:
self.data[key] = event['data'][key]
+ self.on_change(key)
elif event_type == SOEventTypes.DELETE:
key = event['data']
- assert(key in self.data)
+ assert key in self.data, (key,self.data.keys())
del self.data[key]
+ self.on_delete(key)
+ elif event_type == SOEventTypes.MESSAGE:
+ self.on_message(event['data'])
else:
- print 'ERROR: unknown event %d' % (event_type,)
- assert(False)
+ assert False, event
+
+ def on_change(self, key):
+ pass
+
+ def on_delete(self, key):
+ pass
+
+ def on_message(self, data):
+ pass
+
+class RtmpClient:
+ """ Represents an RTMP client. """
+
+ def __init__(self, ip, port, tc_url, page_url, swf_url):
+ """ Initialize a new RTMP client. """
+ self.ip = ip
+ self.port = port
+ self.tc_url = tc_url
+ self.page_url = page_url
+ self.swf_url = swf_url
+ self.shared_objects = []
+
+ def handshake(self):
+ """ Perform the handshake sequence with the server. """
+ self.stream.write_uchar(3)
+ c1 = rtmp_protocol_base.Packet()
+ c1.first = 0
+ c1.second = 0
+ c1.payload = 'x'*1528
+ c1.encode(self.stream)
+ self.stream.flush()
+
+ self.stream.read_uchar()
+ s1 = rtmp_protocol_base.Packet()
+ s1.decode(self.stream)
+
+ c2 = rtmp_protocol_base.Packet()
+ c2.first = s1.first
+ c2.second = s1.second
+ c2.payload = s1.payload
+ c2.encode(self.stream)
+ self.stream.flush()
+
+ s2 = rtmp_protocol_base.Packet()
+ s2.decode(self.stream)
+
+ def connect_rtmp(self, connect_params):
+ """ Initiate a NetConnection with a Flash Media Server. """
+ msg = {
+ 'msg': DataTypes.COMMAND,
+ 'command':
+ [
+ u'connect',
+ 1,
+ {
+ 'videoCodecs': 252,
+ 'audioCodecs': 3191,
+ 'flashVer': u'WIN 10,1,85,3',
+ 'app': u'zoo_chat',
+ 'tcUrl': self.tc_url,
+ 'videoFunction': 1,
+ 'capabilities': 239,
+ 'pageUrl': self.page_url,
+ 'fpad': False,
+ 'swfUrl': self.swf_url,
+ 'objectEncoding': 0
+ }
+ ]
+ }
+ msg['command'].extend(connect_params)
+ self.writer.write(msg)
+ self.writer.flush()
+
+ while True:
+ msg = self.reader.next()
+ if self.handle_message_pre_connect(msg):
+ break
+
+ def handle_message_pre_connect(self, msg):
+ """ Handle messages arriving before the connection is established. """
+ if msg['msg'] == DataTypes.COMMAND:
+ assert msg['command'][0] == '_result', msg
+ assert msg['command'][1] == 1, msg
+ assert msg['command'][3]['code'] == \
+ 'NetConnection.Connect.Success', msg
+ return True
+ elif msg['msg'] == DataTypes.WINDOW_ACK_SIZE:
+ assert msg['window_ack_size'] == 2500000, msg
+ elif msg['msg'] == DataTypes.SET_PEER_BANDWIDTH:
+ assert msg['window_ack_size'] == 2500000, msg
+ assert msg['limit_type'] == 2, msg
+ elif msg['msg'] == DataTypes.USER_CONTROL:
+ assert msg['event_type'] == 0, msg
+ assert msg['event_data'] == '\x00\x00\x00\x00', msg
+ else:
+ assert False, msg
+
+ return False
+
+ def connect(self, connect_params):
+ """ Connect to the server with the given connect parameters. """
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.connect((self.ip, self.port))
+ self.file = self.socket.makefile()
+ self.stream = FileDataTypeMixIn(self.file)
+
+ self.handshake()
+
+ self.reader = RtmpReader(self.stream)
+ self.writer = RtmpWriter(self.stream)
+
+ self.connect_rtmp(connect_params)
+
+ def shared_object_use(self, so):
+ """ Use a shared object and add it to the managed list of SOs. """
+ if so in self.shared_objects:
+ return
+ so.use(self.reader, self.writer)
+ self.shared_objects.append(so)
+
+ def handle_messages(self):
+ """ Start the message handling loop. """
+ while True:
+ msg = self.reader.next()
+ handled = False
+ for so in self.shared_objects:
+ if so.handle_message(msg):
+ handled = True
+ break
+ if not handled:
+ assert False, msg
Oops, something went wrong.

0 comments on commit 6f783e0

Please sign in to comment.