Permalink
Browse files

Initiated refactoring of media_provider

  • Loading branch information...
maxlapshin committed Nov 21, 2009
1 parent 66c8d49 commit abc722fdd63922045ae19f8b913be9072dd69f60
Showing with 139 additions and 269 deletions.
  1. +1 −1 doc/media.edoc
  2. +12 −12 src/apps/apps_streaming.erl
  3. +1 −1 src/ems_rtmp.erl
  4. +30 −6 src/ems_sup.erl
  5. +15 −135 src/media/{media_entry.erl → file_media.erl}
  6. +15 −14 src/media/file_play.erl
  7. +65 −100 src/media/media_provider.erl
View
@@ -38,7 +38,7 @@ Media_stream may be a client of recorded stream from web camera or client of dec
-Media opening:
+Media playing:
1. media_provider makes lookup in table.
@@ -43,21 +43,21 @@
'WAIT_FOR_DATA'({play, Name, StreamId}, #rtmp_client{client_buffer = ClientBuffer} = State) ->
case media_provider:play(Name, [{stream_id, StreamId}, {client_buffer, ClientBuffer}]) of
- {ok, PlayerPid} ->
- ?D({"Player starting", PlayerPid}),
- PlayerPid ! {start},
- {next_state, 'WAIT_FOR_DATA', State#rtmp_client{video_player = PlayerPid}, ?TIMEOUT};
- {notfound} ->
+ {ok, Player} ->
+ ?D({"Player starting", Player}),
+ Player ! start,
+ {next_state, 'WAIT_FOR_DATA', State#rtmp_client{video_player = Player}, ?TIMEOUT};
+ {notfound, _Reason} ->
gen_fsm:send_event(self(), {status, ?NS_PLAY_STREAM_NOT_FOUND, 1}),
{next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
Reason ->
?D({"Failed to start video player", Reason}),
{error, Reason}
end;
-'WAIT_FOR_DATA'({stop}, #rtmp_client{video_player = PlayerPid} = State) when is_pid(PlayerPid) ->
- ?D({"Stopping video player", PlayerPid}),
- PlayerPid ! {stop},
+'WAIT_FOR_DATA'({stop}, #rtmp_client{video_player = Player} = State) when is_pid(Player) ->
+ ?D({"Stopping video player", Player}),
+ Player ! stop,
{next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
@@ -113,7 +113,7 @@ deleteStream(_AMF, #rtmp_client{video_player = undefined} = State) ->
State;
deleteStream(_AMF, #rtmp_client{video_player = Player} = State) when is_pid(Player) ->
- gen_fsm:send_event(Player, {stop}),
+ Player ! stop,
?D("invoke - deleteStream"),
State.
@@ -133,7 +133,7 @@ play(AMF, #rtmp_client{video_player = Player} = State) ->
?D({"invoke - play", Name, AMF}),
case Player of
undefined -> ok;
- _ -> Player ! {stop}
+ _ -> Player ! stop
end,
gen_fsm:send_event(self(), {control, ?RTMP_CONTROL_STREAM_RECORDED, StreamId}),
gen_fsm:send_event(self(), {control, ?RTMP_CONTROL_STREAM_BEGIN, StreamId}),
@@ -154,11 +154,11 @@ pause(AMF, #rtmp_client{video_player = Player} = State) ->
case Pausing of
true ->
- Player ! {pause},
+ Player ! pause,
gen_fsm:send_event(self(), {status, ?NS_PAUSE_NOTIFY, 1}),
State;
false ->
- Player ! {resume},
+ Player ! resume,
gen_fsm:send_event(self(), {status, ?NS_UNPAUSE_NOTIFY, 1}),
State
end.
View
@@ -208,7 +208,7 @@ command(#channel{type = ?RTMP_TYPE_CONTROL, msg = <<?RTMP_CONTROL_STREAM_BUFFER:
?D({"Buffer size on stream id", BufferSize, _StreamId}),
case Player of
undefined -> ok;
- _ -> gen_fsm:send_event(Player, {client_buffer, BufferSize})
+ _ -> Player ! {client_buffer, BufferSize}
end,
State#rtmp_client{client_buffer = BufferSize};
View
@@ -70,7 +70,9 @@ start_rtsp_client() -> supervisor:start_child(rtsp_client_sup, []).
%% To be called by the media provider.
%% @end
%%--------------------------------------------------------------------
-start_media(Name, Type) -> supervisor:start_child(media_entry_sup, [Name, Type]).
+start_media(Name, file = Type) -> supervisor:start_child(file_media_sup, [Name, Type]);
+start_media(Name, mpeg_ts = Type) -> supervisor:start_child(stream_media_sup, [Name, Type]);
+start_media(Name, live = Type) -> supervisor:start_child(stream_media_sup, [Name, Type]).
%%--------------------------------------------------------------------
%% @spec () -> any()
@@ -119,17 +121,32 @@ init([rtsp_client]) ->
]
}
};
-init([media_entry]) ->
+init([file_media]) ->
{ok,
{_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
[
% MediaEntry
{ undefined, % Id = internal id
- {media_entry,start_link,[]}, % StartFun = {M, F, A}
+ {file_media,start_link,[]}, % StartFun = {M, F, A}
temporary, % Restart = permanent | transient | temporary
2000, % Shutdown = brutal_kill | int() >= 0 | infinity
worker, % Type = worker | supervisor
- [media_entry] % Modules = [Module] | dynamic
+ [file_media] % Modules = [Module] | dynamic
+ }
+ ]
+ }
+ };
+init([stream_media]) ->
+ {ok,
+ {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
+ [
+ % MediaEntry
+ { undefined, % Id = internal id
+ {stream_media,start_link,[]}, % StartFun = {M, F, A}
+ temporary, % Restart = permanent | transient | temporary
+ 2000, % Shutdown = brutal_kill | int() >= 0 | infinity
+ worker, % Type = worker | supervisor
+ [stream_media] % Modules = [Module] | dynamic
}
]
}
@@ -184,8 +201,15 @@ init([]) ->
[media_provider] % Modules = [Module] | dynamic
},
% Media entry supervisor
- { media_entry_sup,
- {supervisor,start_link,[{local, media_entry_sup}, ?MODULE, [media_entry]]},
+ { file_media_sup,
+ {supervisor,start_link,[{local, file_media_sup}, ?MODULE, [file_media]]},
+ permanent, % Restart = permanent | transient | temporary
+ infinity, % Shutdown = brutal_kill | int() >= 0 | infinity
+ supervisor, % Type = worker | supervisor
+ [] % Modules = [Module] | dynamic
+ },
+ { stream_media_sup,
+ {supervisor,start_link,[{local, stream_media_sup}, ?MODULE, [stream_media]]},
permanent, % Restart = permanent | transient | temporary
infinity, % Shutdown = brutal_kill | int() >= 0 | infinity
supervisor, % Type = worker | supervisor
@@ -1,6 +1,6 @@
% Media entry is instance of some resource
--module(media_entry).
+-module(file_media).
-author(max@maxidoors.ru).
-include("../include/ems.hrl").
-include("../include/media_info.hrl").
@@ -9,25 +9,21 @@
-behaviour(gen_server).
%% External API
--export([start_link/2, subscribe/2, first/1, read/2, file_name/1, seek/2, metadata/1, publish/2, is_stream/1, set_owner/2]).
+-export([start_link/2, first/1, read/2, file_name/1, seek/2, metadata/1]).
%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
start_link(Path, Type) ->
gen_server:start_link(?MODULE, [Path, Type], []).
-subscribe(Server, Client) ->
- gen_server:call(Server, {subscribe, Client}).
-
+first(Server) ->
+ gen_server:call(Server, {first}).
+
read(Server, Player) ->
gen_server:call(Server, {read, Player}).
-first(Server) ->
- gen_server:call(Server, {first}).
-
file_name(Server) ->
gen_server:call(Server, {file_name}).
@@ -37,66 +33,15 @@ seek(Server, Timestamp) ->
metadata(Server) ->
gen_server:call(Server, {metadata}).
-publish(Server, Frame) ->
- gen_server:call(Server, {publish, Frame}).
-
-is_stream(Server) ->
- gen_server:call(Server, {is_stream}).
-
-set_owner(Server, Owner) ->
- gen_server:call(Server, {set_owner, Owner}).
-
init([Name, file]) ->
process_flag(trap_exit, true),
- FileName = filename:join([file_play:file_dir(), Name]),
- case filelib:is_regular(FileName) of
- true ->
- error_logger:info_msg("Opening file ~p~n", [FileName]),
- Clients = ets:new(clients, [set, private]),
- {ok, Info} = open_file(FileName),
- {ok, Info#media_info{clients = Clients, type = file}};
- _ ->
- ignore
- end;
-
-
-init([URL, mpeg_ts]) ->
- process_flag(trap_exit, true),
- error_logger:info_msg("HTTP MPEG TS ~p~n", [URL]),
- Clients = ets:new(clients, [set, private]),
- % Header = flv:header(#flv_header{version = 1, audio = 1, video = 1}),
- Device = ems_sup:start_ts_lander(URL, self()),
- Recorder = #media_info{type=mpeg_ts, file_name = URL, ts_prev = 0, clients = Clients, device = Device},
- {ok, Recorder, ?TIMEOUT};
-
-
-init([Name, live]) ->
- process_flag(trap_exit, true),
- error_logger:info_msg("Live streaming stream ~p~n", [Name]),
+ error_logger:info_msg("Opening file ~p~n", [Name]),
Clients = ets:new(clients, [set, private]),
- % Header = flv:header(#flv_header{version = 1, audio = 1, video = 1}),
- Recorder = #media_info{type=live, ts_prev = 0, clients = Clients},
- {ok, Recorder, ?TIMEOUT};
+ {ok, Info} = open_file(Name),
+ {ok, Info#media_info{clients = Clients, type = file}}.
-init([Name, record]) ->
- process_flag(trap_exit, true),
- error_logger:info_msg("Recording stream ~p~n", [Name]),
- Clients = ets:new(clients, [set, private]),
- FileName = filename:join([file_play:file_dir(), Name ++ ".flv"]),
- (catch file:delete(FileName)),
- Header = flv:header(#flv_header{version = 1, audio = 1, video = 1}),
- ?D({"Recording to file", FileName}),
- case file:open(FileName, [write, {delayed_write, 1024, 50}]) of
- {ok, Device} ->
- file:write(Device, Header),
- Recorder = #media_info{type=record, device = Device, file_name = FileName, ts_prev = 0, clients = Clients},
- {ok, Recorder, ?TIMEOUT};
- _Error ->
- ignore
- end.
-
%%-------------------------------------------------------------------------
@@ -112,12 +57,12 @@ init([Name, record]) ->
%% @private
%%-------------------------------------------------------------------------
-handle_call({subscribe, Client}, _From, #media_info{file_name = Name, clients = Clients} = MediaInfo) when is_pid(Client) ->
- ets:insert(Clients, {Client}),
- link(Client),
- ?D({"Link from to", Name, self(), Client, ets:info(Clients, size)}),
- % link(_From),
- {reply, ok, MediaInfo};
+handle_call({create_player, Options}, _From, #media_info{file_name = Name, clients = Clients} = MediaInfo) ->
+ {ok, Pid} = file_play:start(self(), Options),
+ ets:insert(Clients, {Pid}),
+ link(Pid),
+ ?D({"Creating media player for", Name, "client", proplists:get_value(consumer, Options)}),
+ {reply, {ok, Pid}, MediaInfo};
handle_call({first}, _From, #media_info{frames = FrameTable} = MediaInfo) ->
@@ -145,37 +90,6 @@ handle_call({metadata}, _From, MediaInfo) ->
{reply, undefined, MediaInfo};
-handle_call({is_stream}, _From, #media_info{type = record} = MediaInfo) ->
- {reply, true, MediaInfo};
-
-handle_call({is_stream}, _From, MediaInfo) ->
- {reply, false, MediaInfo};
-
-
-handle_call({set_owner, Owner}, _From, #media_info{owner = undefined} = MediaInfo) ->
- ?D({"Setting owner to", Owner}),
- {reply, ok, MediaInfo#media_info{owner = Owner}};
-
-handle_call({set_owner, _Owner}, _From, #media_info{owner = Owner} = MediaInfo) ->
- {reply, {error, {owner_exists, Owner}}, MediaInfo};
-
-
-handle_call({publish, Channel}, _From, #media_info{ts_prev = PrevTs, device = Device, clients = Clients} = Recorder) ->
- % ?D({"Record",Channel#channel.id, Channel#channel.type,size(Channel#channel.msg),Channel#channel.timestamp,PrevTs}),
- {Tag,NextTimeStamp} = ems_flv:to_tag(Channel,PrevTs),
- case Device of
- undefined -> ok;
- _ -> file:write(Device, Tag)
- end,
-
- NextTimeStamp = PrevTs + Channel#channel.timestamp,
- % ?D({"Broadcast",Channel#channel.id,Channel#channel.type,size(Channel#channel.msg),NextTimeStamp}),
- Packet = Channel#channel{id = ems_play:channel_id(Channel#channel.type,1), timestamp = NextTimeStamp},
- % ?D({"Broadcast to", ets:info(Clients, size)}),
- ets:foldl(fun send_packet/2, Packet, Clients),
-
- {reply, ok, Recorder#media_info{ts_prev = NextTimeStamp}};
-
handle_call(Request, _From, State) ->
?D({"Undefined call", Request, _From}),
{stop, {unknown_call, Request}, State}.
@@ -231,30 +145,6 @@ handle_info({'EXIT', Client, _Reason}, #media_info{clients = Clients} = MediaInf
end,
{noreply, MediaInfo};
-handle_info({'$gen_event', {stop}}, State) ->
- {noreply, State};
-
-handle_info({'$gen_event', {exit}}, State) ->
- {noreply, State};
-
-handle_info({'$gen_event', {start}}, State) ->
- {noreply, State};
-
-handle_info({'$gen_event', {video, Video}}, #media_info{type = mpeg_ts, clients = Clients} = MediaInfo) ->
- % Packet = Channel#channel{id = ems_play:channel_id(video,1), timestamp = NextTimeStamp},
- % ets:foldl(fun send_packet/2, Packet, Clients),
-
- {noreply, MediaInfo};
-
-handle_info({'$gen_event', {send, Packet}}, #media_info{type = mpeg_ts, clients = Clients} = MediaInfo) ->
- % ?D(Frame),
- % Packet = Channel#channel{id = ems_play:channel_id(video,1), timestamp = NextTimeStamp},
- ets:foldl(fun send_packet/2, Packet, Clients),
-
- {noreply, MediaInfo};
-
-
-
handle_info(_Info, State) ->
?D({"Undefined info", _Info}),
@@ -282,16 +172,6 @@ terminate(_Reason, #media_info{device = Device} = _MediaInfo) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-send_packet({Client}, #channel{msg = Data} = Channel) ->
- % ?D({"Send to", Client}),
- gen_fsm:send_event(Client, {send, {Channel, Data}}),
- Channel;
-
-send_packet({Client}, Packet) ->
- % ?D({"Send to", Client}),
- gen_fsm:send_event(Client, {send, Packet}),
- Packet.
-
open_file(Name) ->
FileName = filename:join([file_play:file_dir(), Name]),
{ok, Device} = file:open(FileName, [read, binary, {read_ahead, 100000}]),
Oops, something went wrong.

0 comments on commit abc722f

Please sign in to comment.