Permalink
Browse files

main: Improve RTMP protocol [PERFECTIVE]

  • Loading branch information...
1 parent 59d04bc commit 990792a24ae5c364f7325948fce1a4cfefc17af5 Janne Kulmala committed May 31, 2012
Showing with 55 additions and 9 deletions.
  1. +55 −9 main.cc
View
@@ -85,6 +85,11 @@ size_t send_all(int fd, const void *buf, size_t len)
return pos;
}
+bool is_safe(uint8_t b)
+{
+ return b >= ' ' && b < 128;
+}
+
void hexdump(const void *buf, size_t len)
{
const uint8_t *data = (const uint8_t *) buf;
@@ -97,7 +102,7 @@ void hexdump(const void *buf, size_t len)
}
for (int j = 0; j < 16; ++j) {
if (i + j < len) {
- putc((data[i + j] >= ' ') ? data[i + j] : '.',
+ putc(is_safe(data[i + j]) ? data[i + j] : '.',
stdout);
} else {
putc(' ', stdout);
@@ -192,6 +197,7 @@ void handle_fcpublish(Client *client, const RTMP_Message *msg, Decoder *dec)
Encoder invoke;
amf_write(&invoke, std::string("onFCPublish"));
amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
amf_write(&invoke, status);
rtmp_send(client, MSG_INVOKE, CONTROL_ID, invoke.buf);
@@ -231,12 +237,15 @@ void handle_publish(Client *client, const RTMP_Message *msg, Decoder *dec)
debug("publish %s\n", path.c_str());
amf_object_t status;
+ status.insert(std::make_pair("level", std::string("status")));
status.insert(std::make_pair("code", std::string("NetStream.Publish.Start")));
status.insert(std::make_pair("description", std::string("Stream is now published.")));
+ status.insert(std::make_pair("details", path));
Encoder invoke;
amf_write(&invoke, std::string("onStatus"));
amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
amf_write(&invoke, status);
rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
@@ -264,15 +273,37 @@ void handle_play(Client *client, const RTMP_Message *msg, Decoder *dec)
debug("play %s\n", path.c_str());
amf_object_t status;
- status.insert(std::make_pair("code", std::string("NetStream.Play.Start")));
- status.insert(std::make_pair("description", std::string("Stream is now playing.")));
+ status.insert(std::make_pair("level", std::string("status")));
+ status.insert(std::make_pair("code", std::string("NetStream.Play.Reset")));
+ status.insert(std::make_pair("description", std::string("Resetting and playing stream.")));
+ status.insert(std::make_pair("details", path));
Encoder invoke;
amf_write(&invoke, std::string("onStatus"));
amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
amf_write(&invoke, status);
rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
+ status.clear();
+ status.insert(std::make_pair("level", std::string("status")));
+ status.insert(std::make_pair("code", std::string("NetStream.Play.Start")));
+ status.insert(std::make_pair("description", std::string("Started playing.")));
+ status.insert(std::make_pair("details", path));
+
+ invoke.buf.clear();
+ amf_write(&invoke, std::string("onStatus"));
+ amf_write(&invoke, 0.0);
+ amf_write_null(&invoke);
+ amf_write(&invoke, status);
+ rtmp_send(client, MSG_INVOKE, STREAM_ID, invoke.buf);
+
+ invoke.buf.clear();
+ amf_write(&invoke, std::string("|RtmpSampleAccess"));
+ amf_write(&invoke, true);
+ amf_write(&invoke, true);
+ rtmp_send(client, MSG_NOTIFY, STREAM_ID, invoke.buf);
+
if (txid > 0) {
Encoder reply;
amf_write(&reply, std::string("_result"));
@@ -284,10 +315,12 @@ void handle_play(Client *client, const RTMP_Message *msg, Decoder *dec)
client->playing = true;
- Encoder notify;
- amf_write(&notify, std::string("onMetaData"));
- amf_write_ecma(&notify, metadata);
- rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf);
+ if (publisher != NULL) {
+ Encoder notify;
+ amf_write(&notify, std::string("onMetaData"));
+ amf_write_ecma(&notify, metadata);
+ rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf);
+ }
}
void handle_setdataframe(Client *client, const RTMP_Message *msg, Decoder *dec)
@@ -309,7 +342,7 @@ void handle_setdataframe(Client *client, const RTMP_Message *msg, Decoder *dec)
FOR_EACH(std::vector<Client *>, i, clients) {
Client *client = *i;
- if (client != NULL && client->ready) {
+ if (client != NULL && client->playing) {
rtmp_send(client, MSG_NOTIFY, STREAM_ID, notify.buf);
}
}
@@ -407,7 +440,14 @@ void handle_message(Client *client, const RTMP_Message *msg)
FOR_EACH(std::vector<Client *>, i, clients) {
Client *receiver = *i;
if (receiver != NULL && receiver->playing) {
- if (flags >> 4 == FLV_KEY_FRAME) {
+ if (flags >> 4 == FLV_KEY_FRAME &&
+ !receiver->ready) {
+ std::string control;
+ uint16_t type = htons(CONTROL_CLEAR_STREAM);
+ control.append((char *) &type, 2);
+ uint32_t stream = htonl(STREAM_ID);
+ control.append((char *) &stream, 4);
+ rtmp_send(receiver, MSG_USER_CONTROL, CONTROL_ID, control);
receiver->ready = true;
}
if (receiver->ready) {
@@ -586,6 +626,12 @@ void close_client(Client *client, size_t i)
if (client == publisher) {
printf("publisher disconnected.\n");
publisher = NULL;
+ FOR_EACH(std::vector<Client *>, i, clients) {
+ Client *client = *i;
+ if (client != NULL) {
+ client->ready = false;
+ }
+ }
}
delete client;

0 comments on commit 990792a

Please sign in to comment.