Skip to content

Commit

Permalink
new plugin to work with kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Glushkov committed Apr 10, 2018
1 parent e309853 commit b8d06dc
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 2 deletions.
20 changes: 19 additions & 1 deletion common_apps/mzbench_utils/src/mzb_subprocess.erl
Expand Up @@ -3,7 +3,8 @@
-export([
remote_cmd/5,
remote_cmd/6,
exec_format/4
exec_format/4,
check_output/4
]).


Expand Down Expand Up @@ -77,6 +78,23 @@ exec_format(Format, Args, Opts, Logger, Handler, InitState) ->
erlang:error({cmd_failed, lists:flatten(Command), Code, Output})
end.
-type logger() :: any(). % FIXME
-spec check_output(string(), [any()], [any()], logger()) -> {integer(), string()}.
check_output(Format, Args, Opts, Logger) ->
Handler = fun (eof, Acc) -> lists:flatten(Acc);
(Data, Acc) -> [Acc|Data]
end,
Command = io_lib:format(Format, Args),
BeforeExec = os:timestamp(),
Logger(info, "[ EXEC ] ~s (~p)", [Command, self()]),
Port = open_port({spawn, lists:flatten(Command)}, [stream, eof, exit_status | Opts]),
{Code, _Output} = Res = get_data(Port, Handler, []),
Duration = timer:now_diff(os:timestamp(), BeforeExec),
Logger(info, "[ EXEC ] Command executed in ~p ms~nCmd: ~s~nExit code: ~p~n",
[Duration / 1000, Command, Code]),
Res.
get_data(Port, Handler, State) ->
receive
{Port, {data, Bytes}} ->
Expand Down
6 changes: 5 additions & 1 deletion server/src/mzb_api_bench.erl
Expand Up @@ -92,6 +92,10 @@ init([Id, Params]) ->
end,

Tags = mzb_bc:maps_get(tags, Params, []),
ProvisionNodes = case Cloud of
k8s -> false;
_ -> maps:get(provision_nodes, Params)
end,

Config = #{
id => Id,
Expand All @@ -106,7 +110,7 @@ init([Id, Params]) ->
node_install_spec => NodeInstallSpec,
env => mzbl_script:normalize_env(generate_bench_env(Id, Params)),
deallocate_after_bench => maps:get(deallocate_after_bench, Params),
provision_nodes => maps:get(provision_nodes, Params),
provision_nodes => ProvisionNodes,
req_host => maps:get(req_host, Params),
initial_user => maps:get(user, Params),
director_host => undefined,
Expand Down
166 changes: 166 additions & 0 deletions server/src/mzb_k8s_plugin.erl
@@ -0,0 +1,166 @@
-module(mzb_k8s_plugin).

-export([start/2, create_cluster/3, destroy_cluster/1]).

-define(POLL_INTERVAL, 2000).
-define(MAX_POLL_COUNT, 150).

% ===========================================================
% Public API
% ===========================================================

% Possible config:
% {cloud_plugins,
% [{k8s, #{module => mzb_k8s_plugin,
% context => "stage.k8s", % optional, will be used from k8s
% namespace => "testing-and-quality", % optional, will be used from k8s
% pod_user => "bench", % optional, default: root
% pod_spec => [
% {image, "http://docker.io/ridrisov/mzbench:mylabel3},
% {cpu, "1"}, % limits = request
% {memory, "128Mi"},
% ]}
% }
% ]}

-type plugin_opts() :: #{pod_spec:=[any()], pod_user:=string(), namespace:=string(), context:=string(), _=>_}.
-type cluster_id() :: {Namespace :: string(), RCName :: string()}.

-spec start(any(), plugin_opts()) -> any().
start(_Name, PluginOpts) ->
PluginOpts.


-spec create_cluster(plugin_opts(), NumNodes :: pos_integer(), ClusterConfig :: #{}) -> {ok, cluster_id(), string(), [string()]}.
create_cluster(PluginOpts, NumNodes, ClusterConfig) when is_integer(NumNodes), NumNodes > 0 ->
UserName = maps:get(pod_user, PluginOpts, root),
Namespace = maps:get(namespace, PluginOpts, undefined),
Context = maps:get(context, PluginOpts, undefined),
PodSpec = maps:get(pod_spec, PluginOpts),

BenchId = maps:get(bench_id, ClusterConfig),

BenchName = "mzbench-" ++ integer_to_list(BenchId),
ID = {Context, Namespace, BenchName},


try
{ok, _} = create_rc(ID, PodSpec, NumNodes),
{ok, PodData1} = get_pods(Context, Namespace, ["-l bench=" ++ BenchName]),
PodNames = get_pod_names(PodData1),
wait_pods_start(NumNodes, ID, PodNames, ?MAX_POLL_COUNT),
lager:info("Pods are running ~p", [PodNames]),
% IP addresses were not known before
{ok, PodData2} = get_pods(Context, Namespace, ["-l bench=" ++ BenchName]),
IPs = get_pod_ips(PodData2),
wait_pods_ssh(IPs, ?MAX_POLL_COUNT),
lager:info("Pods are ssh-ready ~p", [PodNames]),
{ok, ID, UserName, IPs}
catch
C:E ->
ST = erlang:get_stacktrace(),
destroy_cluster(ID),
erlang:raise(C,E,ST)
end.

destroy_cluster(ID) ->
R = delete_rc(ID),
lager:info("Destroyed RC: ~p, result: ~p", [ID, R]),
{ok, _} = R,
ok.


%%%%%%%%%%%%%%%%%%%%
% Internal API
%%%%%%%%%%%%%%%%%%%%

context_arg(undefined) -> [];
context_arg(Context) -> ["--context", Context].

namespace_arg(undefined) -> [];
namespace_arg(Namespace) -> ["--namespace", Namespace].

create_rc({Context, Namespace, BenchName}, PodSpec, NumNodes) ->
% Might be a good idea to be able to define these values in the benchmark
Image = get_config_value(image, PodSpec),
CPU = get_config_value(cpu, PodSpec),
Memory = get_config_value(memory, PodSpec),

Resources = "cpu=" ++ CPU ++ ",memory=" ++ Memory,
Cmd = ["kubectl", "run", BenchName] ++
context_arg(Context) ++
namespace_arg(Namespace) ++
["--labels", "app=mzbench-worker,bench=" ++ BenchName,
"--image", Image,
"--replicas", integer_to_list(NumNodes),
"--requests", Resources,
"--limits", Resources,
"--port", "22",
"--generator", "run/v1",
"--", "/usr/sbin/sshd", "-D"],
run_kubectl(Cmd, false).

delete_rc({Context, Namespace, BenchName}) ->
Cmd = ["kubectl", "delete", "rc", BenchName] ++
context_arg(Context) ++
namespace_arg(Namespace),
run_kubectl(Cmd, false).

get_pods(Context, Namespace, Args) ->
Cmd = ["kubectl -o json get pods"] ++
context_arg(Context) ++
namespace_arg(Namespace) ++
Args,
run_kubectl(Cmd, true).

-spec run_kubectl([any()], boolean()) -> {ok, Res :: any()} | {error, Why :: string()}.
run_kubectl(CmdList, ParseJson) ->
Logger = mzb_api_app:default_logger(),
Cmd = string:join(CmdList, " "),
case mzb_subprocess:check_output(Cmd, [], [], Logger) of
{0, Res} when ParseJson -> {ok, jiffy:decode(Res, [return_maps])};
{0, Res} -> {ok, Res};
{_Code, Res} -> {error, Res}
end.


get_pod_names(PodData) ->
[ get_map_value(<<"metadata">>, <<"name">>, Pod) || Pod <- maps:get(<<"items">>, PodData) ].

get_pod_ips(PodData) ->
[ get_map_value(<<"status">>, <<"podIP">>, Pod) || Pod <- maps:get(<<"items">>, PodData) ].

% wait_pods_start will fail if len(Pods) < NumNodes
wait_pods_start(_, _, _, C) when C < 0 -> erlang:error({ec2_error, cluster_start_timed_out});
wait_pods_start(0, _, _, _) -> [];
wait_pods_start(N, ID, [H | T], C) ->
{Context, Namespace, _} = ID,
{ok, Res} = get_pods(Context, Namespace, [H]),
lager:info("Waiting pods result: ~p", [Res]),
case get_map_value(<<"status">>, <<"phase">>, Res) of
"Running" ->
[H | wait_pods_start(N-1, ID, T, C - 1)];
_ ->
timer:sleep(?POLL_INTERVAL),
wait_pods_start(N, ID, T ++ [H], C - 1)
end.

wait_pods_ssh(_, C) when C < 0 -> erlang:error({ec2_error, cluster_ssh_start_timed_out});
wait_pods_ssh([], _) -> ok;
wait_pods_ssh([H | T], C) ->
lager:info("Checking port 22 on ~p", [H]),
case gen_tcp:connect(H, 22, [], ?POLL_INTERVAL) of
{ok, Socket} -> gen_tcp:close(Socket), wait_pods_ssh(T, C - 1);
_ -> timer:sleep(?POLL_INTERVAL), wait_pods_ssh([H | T], C - 1)
end.

get_map_value(Key1, Key2, Map) ->
V = maps:get(Key2, maps:get(Key1, Map)),
binary_to_list(V).

get_config_value(Key, PropList) ->
Value = proplists:get_value(Key, PropList),
case Value == undefined orelse not is_list(Value) of
true -> erlang:error({k8s_error, incorrect_config_value, Key});
_ -> Value
end.

0 comments on commit b8d06dc

Please sign in to comment.