Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Back working MPEG TS after media refactoring

  • Loading branch information...
commit 11b662bdba612a2ef28690940e25359956c96729 1 parent 096ef12
@maxlapshin maxlapshin authored
View
36 src/formats/ts_lander.erl
@@ -74,25 +74,6 @@
-export([pat/4, pmt/4, pes/1]).
-% start() ->
-% Pid = start(?HOST, ?PORT, ?PATH),
-% %VideoPesExPid = start_pes_extractor(start_pes_writer("video.264")),
-% VideoPesExPid = start_pes_extractor(spawn(?MODULE, nal_receiver, [[]])),
-% %AudioPesExPid = start_pes_extractor(start_pes_writer("audio.aac")),
-% Pid ! {demuxer, {subscribe, 69, VideoPesExPid}},
-% %Pid ! {demuxer, {subscribe, 68, AudioPesExPid}},
-% %DtsCounterPid = start_dts_counter(),
-% %VideoPesExPid = start_pes_extractor(DtsCounterPid),
-% %AudioPesExPid = start_pes_extractor(DtsCounterPid),
-% %Pid ! {demuxer, {subscribe, video, fun(P) -> P == 101 end, VideoPesExPid}},
-% %Pid ! {demuxer, {subscribe, audio, fun(P) -> P == 100 end, AudioPesExPid}},
-%
-% %SubtitlesPesExPid = start_pes_extractor(start_pes_writer("subs.bin")),
-% %Pid ! {demuxer, {subscribe, 66, SubtitlesPesExPid}},
-%
-% Pid.
-
-
% {ok, Socket} = gen_tcp:connect("ya.ru", 80, [binary, {packet, http_bin}, {active, false}], 1000),
% gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"),
% {ok, Reply} = gen_tcp:recv(Socket, 0, 1000),
@@ -113,7 +94,6 @@ init([URL, Consumer]) ->
gen_tcp:send(Socket, "GET "++Path++"?"++Query++" HTTP/1.0\r\n\r\n"),
ok = inet:setopts(Socket, [{active, once}]),
- timer:send_after(10000, {stop}),
timer:send_after(3000, {byte_count}),
{ok, #ts_lander{socket = Socket, url = URL, consumer = Consumer, pids = [#stream{pid = 0, handler = pat}]}}.
@@ -351,7 +331,7 @@ pes(#stream{synced = true, pid = Pid, ts_buffer = Buf} = Stream) ->
pes_packet(_, #stream{type = unhandled} = Stream, _) -> Stream#stream{ts_buffer = []};
-pes_packet(<<1:24, _:5/binary, Length, _PESHeader:Length/binary, _/binary>> = Packet, #stream{type = audio} = Stream, Header) ->
+pes_packet(<<1:24, _:5/binary, Length, _PESHeader:Length/binary, Data/binary>> = Packet, #stream{type = audio, es_buffer = Buffer} = Stream, Header) ->
Stream1 = stream_timestamp(Packet, Stream, Header),
% ?D({"Audio", Stream1#stream.timestamp}),
Stream1;
@@ -360,7 +340,7 @@ pes_packet(<<1:24, _:5/binary, Length, _PESHeader:Length/binary, _/binary>> = Pa
pes_packet(<<1:24, _:5/binary, PESHeaderLength, _PESHeader:PESHeaderLength/binary, Rest/binary>> = Packet, #stream{es_buffer = Buffer, type = video} = Stream, Header) ->
% ?D({"Timestamp1", Stream#stream.timestamp, Stream#stream.start_time}),
Stream1 = stream_timestamp(Packet, Stream, Header),
- ?D({"Video", Stream1#stream.timestamp}),
+ % ?D({"Video", Stream1#stream.timestamp}),
decode_avc(Stream1#stream{es_buffer = <<Buffer/binary, Rest/binary>>}).
@@ -412,7 +392,7 @@ decode_aac(#stream{send_audio_config = false, consumer = Consumer} = Stream) ->
sound_rate = ?FLV_AUDIO_RATE_44,
raw_body = false
},
- ems_play:send(Consumer, AudioConfig),
+ Consumer ! {audio_config, AudioConfig},
?D({"Send audio config", AudioConfig}),
decode_aac(Stream#stream{send_audio_config = true});
@@ -430,7 +410,7 @@ decode_aac(#stream{es_buffer = <<_Syncword:12, _ID:1, _Layer:2, _ProtectionAbsen
send_aac(Stream#stream{es_buffer = Rest}).
send_aac(#stream{es_buffer = Data, consumer = Consumer, timestamp = Timestamp} = Stream) ->
- ?D({"Audio", Stream#stream.timestamp}),
+ % ?D({"Audio", Stream#stream.timestamp}),
AudioFrame = #video_frame{
type = ?FLV_TAG_TYPE_AUDIO,
timestamp_abs = Timestamp,
@@ -442,7 +422,7 @@ send_aac(#stream{es_buffer = Data, consumer = Consumer, timestamp = Timestamp} =
sound_rate = ?FLV_AUDIO_RATE_44,
raw_body = false
},
- ems_play:send(Consumer, AudioFrame),
+ Consumer ! {audio, AudioFrame},
Stream#stream{es_buffer = <<>>}.
@@ -486,7 +466,7 @@ send_video_config(#stream{consumer = Consumer} = Stream) ->
streamid = 1
},
?D({"Send decoder config to", Consumer}),
- ems_play:send(Consumer, VideoFrame),
+ Consumer ! {video_config, VideoFrame},
Stream#stream{video_config = DecoderConfig}
end.
@@ -543,7 +523,7 @@ decode_nal(<<0:1, _NalRefIdc:2, 1:5, _/binary>> = Data, #stream{timestamp = Time
streamid = 1
},
% ?D({"Send slice to", TimeStamp, Consumer}),
- ems_play:send(Consumer, VideoFrame),
+ Consumer ! {video, VideoFrame},
Stream;
@@ -574,7 +554,7 @@ decode_nal(<<0:1, _NalRefIdc:2, 5:5, _/binary>> = Data, #stream{timestamp = Time
streamid = 1
},
% ?D({"Send keyframe to", Consumer}),
- ems_play:send(Consumer, VideoFrame),
+ Consumer ! {video, VideoFrame},
Stream;
decode_nal(<<0:1, _NalRefIdc:2, 8:5, _/binary>> = PPS, Stream) ->
View
3  src/media/file_media.erl
@@ -64,9 +64,6 @@ handle_call({create_player, Options}, _From, #media_info{file_name = Name, clien
{reply, {ok, Pid}, MediaInfo};
-handle_call({first}, _From, #media_info{frames = FrameTable} = MediaInfo) ->
- {reply, ets:first(FrameTable), MediaInfo};
-
handle_call({codec_config, Type}, _From, #media_info{format = FileFormat} = MediaInfo) ->
{reply, FileFormat:codec_config(Type, MediaInfo), MediaInfo};
View
2  src/media/file_play.erl
@@ -154,7 +154,7 @@ play(#file_player{media_info = MediaInfo, pos = Key} = Player) ->
send_frame(#file_player{consumer = Consumer} = Player, {ok, done}) ->
?D("Video file finished"),
gen_fsm:send_event(Consumer, {status, ?NS_PLAY_COMPLETE, 1}),
- ?MODULE:ready(Player);
+ ?MODULE:ready(Player#file_player{sent_video_config = false, sent_audio_config = false, ts_prev = 0, pos = undefined});
send_frame(#file_player{consumer = Consumer, stream_id = StreamId} = Player, {ok, #video_frame{nextpos = NextPos} = Frame}) ->
% ?D({"Frame", Key, Frame#video_frame.timestamp_abs, NextPos}),
View
4 src/media/media_provider.erl
@@ -115,7 +115,7 @@ open_media_entry(Name, #media_provider{opened_media = OpenedMedia} = MediaProvid
ets:insert(OpenedMedia, #media_entry{name = Name, handler = Pid}),
?D({"Opened", Type, Name, Pid}),
Pid;
- ignore ->
+ _ ->
?D({"Error opening", Type, Name}),
{notfound, "Failed to open "++Name}
end;
@@ -127,7 +127,7 @@ detect_type(Name) ->
detect_mpeg_ts(Name).
detect_mpeg_ts(Name) ->
- {ok, Re} = re:compile("http://(.*).ts"),
+ {ok, Re} = re:compile("http://(.*)"),
case re:run(Name, Re) of
{match, _Captured} -> mpeg_ts;
_ -> detect_file(Name)
View
244 src/media/stream_media.erl
@@ -0,0 +1,244 @@
+% Media entry is instance of some resource
+
+-module(stream_media).
+-author(max@maxidoors.ru).
+-include("../include/ems.hrl").
+-include("../include/media_info.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-behaviour(gen_server).
+
+%% External API
+-export([start_link/2, codec_config/2, metadata/1, publish/2, set_owner/2]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+
+start_link(Path, Type) ->
+ gen_server:start_link(?MODULE, [Path, Type], []).
+
+metadata(Server) ->
+ gen_server:call(Server, {metadata}).
+
+codec_config(MediaEntry, Type) -> gen_server:call(MediaEntry, {codec_config, Type}).
+
+publish(Server, Frame) ->
+ gen_server:call(Server, {publish, Frame}).
+
+set_owner(Server, Owner) ->
+ gen_server:call(Server, {set_owner, Owner}).
+
+
+init([URL, mpeg_ts]) ->
+ process_flag(trap_exit, true),
+ error_logger:info_msg("HTTP MPEG TS ~p~n", [URL]),
+ Clients = ets:new(clients, [set, private]),
+ % Header = flv:header(#flv_header{version = 1, audio = 1, video = 1}),
+ {ok, Device} = ems_sup:start_ts_lander(URL, self()),
+ link(Device),
+ Recorder = #media_info{type=mpeg_ts, file_name = URL, ts_prev = 0, clients = Clients, device = Device},
+ {ok, Recorder, ?TIMEOUT};
+
+
+init([Name, live]) ->
+ process_flag(trap_exit, true),
+ error_logger:info_msg("Live streaming stream ~p~n", [Name]),
+ Clients = ets:new(clients, [set, private]),
+ % Header = flv:header(#flv_header{version = 1, audio = 1, video = 1}),
+ Recorder = #media_info{type=live, ts_prev = 0, clients = Clients},
+ {ok, Recorder, ?TIMEOUT};
+
+
+init([Name, record]) ->
+ process_flag(trap_exit, true),
+ error_logger:info_msg("Recording stream ~p~n", [Name]),
+ Clients = ets:new(clients, [set, private]),
+ FileName = filename:join([file_play:file_dir(), Name ++ ".flv"]),
+ (catch file:delete(FileName)),
+ Header = flv:header(#flv_header{version = 1, audio = 1, video = 1}),
+ ?D({"Recording to file", FileName}),
+ case file:open(FileName, [write, {delayed_write, 1024, 50}]) of
+ {ok, Device} ->
+ file:write(Device, Header),
+ Recorder = #media_info{type=record, device = Device, file_name = FileName, ts_prev = 0, clients = Clients},
+ {ok, Recorder, ?TIMEOUT};
+ _Error ->
+ ignore
+ end.
+
+
+
+%%-------------------------------------------------------------------------
+%% @spec (Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @doc Callback for synchronous server calls. If `{stop, ...}' tuple
+%% is returned, the server is stopped and `terminate/2' is called.
+%% @end
+%% @private
+%%-------------------------------------------------------------------------
+
+handle_call({create_player, Options}, {Caller, _Ref},
+ #media_info{file_name = Name, clients = Clients, video_decoder_config = Video, audio_decoder_config = Audio} = MediaInfo) ->
+ % {ok, Pid} = stream_play:start(self(), Options),
+ Pid = proplists:get_value(consumer, Options, Caller),
+ ets:insert(Clients, {Pid}),
+ link(Pid),
+ ?D({"Creating media player for", Name, "client", Pid}),
+ ems_play:send(Pid, Video),
+ ems_play:send(Pid, Audio),
+ {reply, {ok, self()}, MediaInfo};
+
+handle_call({codec_config, video}, _From, #media_info{video_decoder_config = Config} = MediaInfo) ->
+ {reply, Config, MediaInfo};
+
+handle_call({codec_config, audio}, _From, #media_info{audio_decoder_config = Config} = MediaInfo) ->
+ {reply, Config, MediaInfo};
+
+handle_call({metadata}, _From, MediaInfo) ->
+ {reply, undefined, MediaInfo};
+
+handle_call({set_owner, Owner}, _From, #media_info{owner = undefined} = MediaInfo) ->
+ ?D({"Setting owner to", Owner}),
+ {reply, ok, MediaInfo#media_info{owner = Owner}};
+
+handle_call({set_owner, _Owner}, _From, #media_info{owner = Owner} = MediaInfo) ->
+ {reply, {error, {owner_exists, Owner}}, MediaInfo};
+
+
+handle_call({publish, Channel}, _From, #media_info{ts_prev = PrevTs, device = Device, clients = Clients} = Recorder) ->
+ % ?D({"Record",Channel#channel.id, Channel#channel.type,size(Channel#channel.msg),Channel#channel.timestamp,PrevTs}),
+ {Tag,NextTimeStamp} = ems_flv:to_tag(Channel,PrevTs),
+ case Device of
+ undefined -> ok;
+ _ -> file:write(Device, Tag)
+ end,
+
+ NextTimeStamp = PrevTs + Channel#channel.timestamp,
+ % ?D({"Broadcast",Channel#channel.id,Channel#channel.type,size(Channel#channel.msg),NextTimeStamp}),
+ Packet = Channel#channel{id = ems_play:channel_id(Channel#channel.type,1), timestamp = NextTimeStamp},
+ % ?D({"Broadcast to", ets:info(Clients, size)}),
+ ets:foldl(fun send_packet/2, Packet, Clients),
+
+ {reply, ok, Recorder#media_info{ts_prev = NextTimeStamp}};
+
+handle_call(Request, _From, State) ->
+ ?D({"Undefined call", Request, _From}),
+ {stop, {unknown_call, Request}, State}.
+
+%%-------------------------------------------------------------------------
+%% @spec (Msg, State) ->{noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Callback for asyncrous server calls. If `{stop, ...}' tuple
+%% is returned, the server is stopped and `terminate/2' is called.
+%% @end
+%% @private
+%%-------------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ ?D({"Undefined cast", _Msg}),
+ {noreply, State}.
+
+%%-------------------------------------------------------------------------
+%% @spec (Msg, State) ->{noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @doc Callback for messages sent directly to server's mailbox.
+%% If `{stop, ...}' tuple is returned, the server is stopped and
+%% `terminate/2' is called.
+%% @end
+%% @private
+%%-------------------------------------------------------------------------
+
+handle_info({graceful}, #media_info{owner = undefined, file_name = FileName, clients = Clients} = MediaInfo) ->
+ case ets:info(Clients, size) of
+ 0 -> ?D({"No readers for file", FileName}),
+ {stop, normal, MediaInfo};
+ _ -> {noreply, MediaInfo}
+ end;
+
+
+handle_info({graceful}, #media_info{owner = _Owner} = MediaInfo) ->
+ {noreply, MediaInfo};
+
+handle_info({'EXIT', Owner, _Reason}, #media_info{owner = Owner, clients = Clients} = MediaInfo) ->
+ case ets:info(Clients, size) of
+ 0 -> timer:send_after(?FILE_CACHE_TIME, {graceful});
+ _ -> ok
+ end,
+ {noreply, MediaInfo#media_info{owner = Owner}};
+
+handle_info({'EXIT', Device, _Reason}, #media_info{device = Device, type = mpeg_ts, clients = Clients} = MediaInfo) ->
+ ?D("MPEG TS finished"),
+ ets:foldl(fun({Client}, Packet) -> gen_fsm:send_event(Client, Packet), Packet end, {status, ?NS_PLAY_COMPLETE, 1}, Clients),
+ {stop, normal, MediaInfo};
+
+handle_info({'EXIT', Client, _Reason}, #media_info{clients = Clients} = MediaInfo) ->
+ ets:delete(Clients, Client),
+ ?D({"Removing client", Client, "left", ets:info(Clients, size)}),
+ case ets:info(Clients, size) of
+ 0 -> timer:send_after(?FILE_CACHE_TIME, {graceful});
+ _ -> ok
+ end,
+ {noreply, MediaInfo};
+
+handle_info({video, Video}, #media_info{clients = Clients} = MediaInfo) ->
+ ets:foldl(fun({Client}, Packet) -> ems_play:send(Client, Packet), Packet end, Video, Clients),
+ {noreply, MediaInfo};
+
+handle_info({video_config, Video}, #media_info{clients = Clients} = MediaInfo) ->
+ ets:foldl(fun({Client}, Packet) -> ems_play:send(Client, Packet), Packet end, Video, Clients),
+ {noreply, MediaInfo#media_info{video_decoder_config = Video}};
+
+handle_info({audio, Audio}, #media_info{clients = Clients} = MediaInfo) ->
+ ets:foldl(fun({Client}, Packet) -> ems_play:send(Client, Packet), Packet end, Audio, Clients),
+ {noreply, MediaInfo};
+
+handle_info({audio_config, Audio}, #media_info{clients = Clients} = MediaInfo) ->
+ ets:foldl(fun({Client}, Packet) -> ems_play:send(Client, Packet), Packet end, Audio, Clients),
+ {noreply, MediaInfo#media_info{audio_decoder_config = Audio}};
+
+handle_info(start, State) ->
+ {noreply, State};
+
+handle_info(_Info, State) ->
+ ?D({"Undefined info", _Info}),
+ {noreply, State}.
+
+%%-------------------------------------------------------------------------
+%% @spec (Reason, State) -> any
+%% @doc Callback executed on server shutdown. It is only invoked if
+%% `process_flag(trap_exit, true)' is set by the server process.
+%% The return value is ignored.
+%% @end
+%% @private
+%%-------------------------------------------------------------------------
+terminate(_Reason, #media_info{device = Device} = _MediaInfo) ->
+ (catch file:close(Device)),
+ ?D({"Media entry terminating", _Reason}),
+ ok.
+
+%%-------------------------------------------------------------------------
+%% @spec (OldVsn, State, Extra) -> {ok, NewState}
+%% @doc Convert process state when code is changed.
+%% @end
+%% @private
+%%-------------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+send_packet({Client}, #channel{msg = Data} = Channel) ->
+ % ?D({"Send to", Client}),
+ gen_fsm:send_event(Client, {send, {Channel, Data}}),
+ Channel;
+
+send_packet({Client}, Packet) ->
+ % ?D({"Send to", Client}),
+ gen_fsm:send_event(Client, {send, Packet}),
+ Packet.
+
View
2  src/rtmp_client.erl
@@ -284,7 +284,7 @@ handle_info(_Info, StateName, StateData) ->
%%-------------------------------------------------------------------------
terminate(_Reason, _StateName, #rtmp_client{socket=Socket, video_player = Player}) ->
rtmp_server:logout(),
- Player ! exit,
+ (catch Player ! exit),
(catch gen_tcp:close(Socket)),
ok.
Please sign in to comment.
Something went wrong with that request. Please try again.