Permalink
Browse files

Had to refactor whole system of handling timestamps in RTMP protocol

  • Loading branch information...
maxlapshin committed Nov 23, 2009
1 parent 76bc923 commit 6dbee2e310955416ccbc193e89e8312315767a77
Showing with 47 additions and 18 deletions.
  1. +1 −0 include/ems.hrl
  2. +1 −1 src/media/stream_media.erl
  3. +45 −17 src/rtmp.erl
View
@@ -215,6 +215,7 @@
-record(channel,{
id = undefined,
timestamp = undefined,
+ delta = undefined,
length = undefined,
type = undefined,
stream = undefined,
@@ -120,7 +120,7 @@ handle_call({set_owner, _Owner}, _From, #media_info{owner = Owner} = MediaInfo)
handle_call({publish, Channel}, _From, #media_info{device = Device, clients = Clients} = Recorder) ->
Tag = ems_flv:to_tag(Channel),
- ?D({"Record",Channel#channel.id, Channel#channel.type,Channel#channel.timestamp}),
+ ?D({"Record",Channel#channel.id, Channel#channel.type,size(Channel#channel.msg), Channel#channel.delta, Channel#channel.timestamp}),
case Device of
undefined -> ok;
_ -> file:write(Device, Tag)
View
@@ -110,49 +110,70 @@ decode_channel_id(#rtmp_client{buff = <<Format:2, Id:6,Rest/binary>>} = State) -
% Now extracting channel header
decode_channel_header(Rest, ?RTMP_HDR_CONTINUE, Id, State) ->
- {value, Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
- decode_channel(Channel, Rest, State);
+ {value, #channel{msg = Msg, timestamp = Timestamp, delta = Delta} = Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
+ Channel1 = case size(Msg) of
+ 0 -> Channel#channel{timestamp = Timestamp + Delta};
+ _ -> Channel
+ end,
+ % case Channel#channel.type of
+ % 8 -> ?D({" Continue", Id, size(Msg), size(Rest), Channel1#channel.length, Delta, Channel1#channel.timestamp});
+ % _ -> ok
+ % end,
+ decode_channel(Channel1, Rest, State);
decode_channel_header(<<16#ffffff:24, TimeStamp:24, Rest/binary>>, ?RTMP_HDR_TS_CHG, Id, State) ->
{value, Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
- decode_channel(Channel#channel{timestamp = TimeStamp+16#ffffff}, Rest, State);
+ decode_channel(Channel#channel{timestamp = TimeStamp+16#ffffff, delta = undefined}, Rest, State);
decode_channel_header(<<Delta:24, Rest/binary>>, ?RTMP_HDR_TS_CHG, Id, State) ->
- {value, #channel{timestamp = TimeStamp, type = Type} = Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
- decode_channel(Channel#channel{timestamp = TimeStamp + Delta}, Rest, State);
+ {value, #channel{timestamp = TimeStamp} = Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
+ % case Channel#channel.type of
+ % 8 -> ?D({" TSDelta", Id, size(Channel#channel.msg), size(Rest), Channel#channel.length, Delta, TimeStamp+Delta});
+ % _ -> ok
+ % end,
+ decode_channel(Channel#channel{timestamp = TimeStamp + Delta, delta = Delta}, Rest, State);
decode_channel_header(<<16#ffffff:24,Length:24,Type:8,TimeStamp:24,Rest/binary>>, ?RTMP_HDR_SAME_SRC, Id, State) ->
{value, Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
- decode_channel(Channel#channel{timestamp=TimeStamp+16#ffffff,length=Length,type=Type},Rest,State);
+ decode_channel(Channel#channel{timestamp=TimeStamp+16#ffffff, delta = undefined, length=Length,type=Type},Rest,State);
decode_channel_header(<<Delta:24,Length:24,Type:8,Rest/binary>>, ?RTMP_HDR_SAME_SRC, Id, State) ->
{value, #channel{timestamp = TimeStamp} = Channel} = lists:keysearch(Id, #channel.id, State#rtmp_client.channels),
- decode_channel(Channel#channel{timestamp=TimeStamp + Delta,length=Length,type=Type},Rest,State);
+ % case Channel#channel.type of
+ % 8 -> ?D({" NewTS", Id, size(Channel#channel.msg), Length, Channel#channel.length, Delta, TimeStamp+Delta});
+ % _ -> ok
+ % end,
+ decode_channel(Channel#channel{timestamp=TimeStamp + Delta, delta = Delta, length=Length,type=Type},Rest,State);
decode_channel_header(<<16#ffffff:24,Length:24,Type:8,StreamId:32/little,TimeStamp:24,Rest/binary>>,?RTMP_HDR_NEW,Id, State) ->
case lists:keysearch(Id, #channel.id, State#rtmp_client.channels) of
{value, Channel} -> ok;
_ -> Channel = #channel{}
end,
- decode_channel(Channel#channel{id=Id,timestamp=TimeStamp+16#ffffff,length=Length,type=Type,stream=StreamId},Rest,State);
+ decode_channel(Channel#channel{id=Id,timestamp=TimeStamp+16#ffffff,delta = undefined, length=Length,type=Type,stream=StreamId},Rest,State);
decode_channel_header(<<TimeStamp:24,Length:24,Type:8,StreamId:32/little,Rest/binary>>,?RTMP_HDR_NEW,Id, State) ->
case lists:keysearch(Id, #channel.id, State#rtmp_client.channels) of
{value, Channel} -> ok;
_ -> Channel = #channel{}
end,
- decode_channel(Channel#channel{id=Id,timestamp=TimeStamp,length=Length,type=Type,stream=StreamId},Rest,State);
+ % case Type of
+ % 8 -> ?D({" New", Id, Type, 0, TimeStamp});
+ % _ -> ok
+ % end,
+ decode_channel(Channel#channel{id=Id,timestamp=TimeStamp,delta = undefined, length=Length,type=Type,stream=StreamId},Rest,State);
decode_channel_header(_Rest,_Type, _Id, State) -> % Still small buffer
State.
% Now trying to fill channel with required data
-bytes_for_channel(#channel{length = Length, msg = Msg}, #rtmp_client{client_chunk_size = ChunkSize}) ->
- RemainingBytes = Length - size(Msg),
- if
- RemainingBytes < ChunkSize -> RemainingBytes;
- true -> ChunkSize
- end.
+bytes_for_channel(#channel{length = Length, msg = Msg}, _) when size(Msg) == Length ->
+ 0;
+
+bytes_for_channel(#channel{length = Length, msg = Msg}, #rtmp_client{client_chunk_size = ChunkSize}) when Length - size(Msg) < ChunkSize ->
+ Length - size(Msg);
+
+bytes_for_channel(_, #rtmp_client{client_chunk_size = ChunkSize}) -> ChunkSize.
decode_channel(Channel, Data, State) ->
@@ -162,14 +183,16 @@ decode_channel(Channel, Data, State) ->
% Nothing to do when buffer is small
-push_channel_packet(#channel{} = _Channel, Data, State, BytesRequired) when size(Data) < BytesRequired ->
+push_channel_packet(#channel{} = _Channel, Data, State, BytesRequired) when size(Data) < BytesRequired ->
State;
% And decode channel when bytes required are in buffer
push_channel_packet(#channel{msg = Msg} = Channel, Data, State, BytesRequired) ->
<<Chunk:BytesRequired/binary, Rest/binary>> = Data,
decode_channel_packet(Channel#channel{msg = <<Msg/binary, Chunk/binary>>}, State#rtmp_client{buff = Rest}).
+
+
% When chunked packet hasn't arived, just accumulate it
decode_channel_packet(#channel{msg = Msg, length = Length} = Channel, #rtmp_client{channels = Channels} = State) when size(Msg) < Length ->
NextChannelList = lists:keystore(Channel#channel.id, #channel.id, Channels, Channel),
@@ -192,7 +215,7 @@ command(#channel{type = ?RTMP_TYPE_WINDOW_ACK_SIZE, msg = <<WindowSize:32/big-in
State;
command(#channel{type = ?RTMP_TYPE_CHUNK_SIZE, msg = <<ChunkSize:32/big-integer>>} = _Channel, State) ->
- % ?D({"Change Chunk Size",Channel,ChunkSize}),
+ ?D({"Change Chunk Size",ChunkSize}),
State#rtmp_client{client_chunk_size = ChunkSize};
command(#channel{type = ?RTMP_TYPE_CONTROL, msg = <<?RTMP_CONTROL_STREAM_PONG:16/big-integer, _Timestamp:32/big-integer>>}, State) ->
@@ -218,6 +241,11 @@ command(#channel{type = ?RTMP_TYPE_CONTROL, msg = <<EventType:16/big-integer, _/
State;
+command(#channel{type = Type, delta = 0}, State)
+ when (Type =:= ?RTMP_TYPE_AUDIO) or (Type =:= ?RTMP_TYPE_VIDEO) or (Type =:= ?RTMP_TYPE_METADATA_AMF0) ->
+ ?D({"Throw away garbage audio"}),
+ State;
+
command(#channel{type = Type} = Channel, State)
when (Type =:= ?RTMP_TYPE_AUDIO) or (Type =:= ?RTMP_TYPE_VIDEO) or (Type =:= ?RTMP_TYPE_METADATA_AMF0) ->
% ?D({"Recording",Type}),

0 comments on commit 6dbee2e

Please sign in to comment.