Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dsl'
Browse files Browse the repository at this point in the history
  • Loading branch information
zorbash committed Oct 20, 2016
2 parents b0f0d70 + 655b8f6 commit 8e40a53
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 68 deletions.
12 changes: 7 additions & 5 deletions installer/templates/new/jobs/buzzwords.ex
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
random = fn -> :rand.uniform * 100 |> Float.round end
use Kitto.Job.DSL

job :buzzwords, every: :second do
random = fn -> :rand.uniform * 100 |> Float.round end

Kitto.Job.every 2, :seconds, fn (notifier) ->
list = ~w[synergy startup catalyst docker microservice container elixir react]
|> Enum.map(fn (w) -> %{ label: w, value: random.() } end)
|> Enum.shuffle
|> Enum.map(fn (w) -> %{ label: w, value: random.() } end)
|> Enum.shuffle

notifier.broadcast! :buzzwords, %{items: list}
broadcast! :buzzwords, %{items: list}
end
6 changes: 4 additions & 2 deletions installer/templates/new/jobs/convergence.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use Kitto.Job.DSL

defmodule Kitto.Jobs.Convergence do
def new, do: Agent.start(fn -> 0 end)

Expand All @@ -16,6 +18,6 @@ end
{:ok, convergence} = Kitto.Jobs.Convergence.new
points = &(&1 |> Kitto.Jobs.Convergence.points)

Kitto.Job.every 2, :seconds, fn (notifier) ->
notifier.broadcast! :convergence, %{points: convergence |> points.()}
job :convergence, every: {2, :seconds} do
broadcast! :convergence, %{points: convergence |> points.()}
end
6 changes: 4 additions & 2 deletions installer/templates/new/jobs/random.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Kitto.Job.every :second, fn (notifier) ->
notifier.broadcast! :random, %{value: :rand.uniform * 100 |> Float.round}
use Kitto.Job.DSL

job :random, every: :second do
broadcast! :random, %{value: :rand.uniform * 100 |> Float.round}
end
8 changes: 4 additions & 4 deletions installer/templates/new/jobs/text.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Kitto.Job.every 4, :seconds, fn (notifier) ->
use Kitto.Job.DSL

job :text, every: {4, :seconds} do
phrases = ["This is your shiny new dashboard", "Built on the Kitto Framework"]

with text <- (phrases |> Enum.shuffle |> Enum.take(1)) do
notifier.broadcast! :text, %{text: text}
end
broadcast! :text, %{text: (phrases |> Enum.shuffle |> Enum.take(1))}
end
55 changes: 27 additions & 28 deletions lib/job.ex
Original file line number Diff line number Diff line change
@@ -1,49 +1,48 @@
defmodule Kitto.Job do
@defaults first_at: nil
def start_link(job) do
{:ok, spawn_link(Kitto.Job, :new, [job])}
end

def start_link(interval, job, options \\ @defaults) do
pid = spawn_link(Kitto.Job, :new, [interval, job, options])
def register(name, options, job) do
import Kitto.Time

{:ok, pid}
end
opts = [interval: options[:every] |> mseconds,
first_at: options[:first_at] |> mseconds]

def new(job), do: register(job, interval: nil)
def new(nil, job, _) do
job.(Kitto.Notifier)
Kitto.Runner.register(name: name, job: job, options: opts)
end

receive do
def new(job) do
case job[:options][:interval] do
nil -> once(job)
_ -> with_interval(job)
end
end

def new(interval, job, options) do
first_at(options[:first_at], job)
defp with_interval(job) do
first_at(job[:options][:first_at], job)

receive do
after
interval ->
job.(Kitto.Notifier)
new(interval, job, first_at: false)
job[:options][:interval] ->
run job
with_interval(put_in(job[:options][:first_at], false))
end
end

def every(n, :seconds, job), do: register(job, interval: n * 1000)
def every(n, :minutes, job), do: register(job, interval: n * 1000 * 60)
def every(n, :hours, job), do: register(job, interval: n * 1000 * 60)
defp run(job), do: Kitto.StatsServer.measure(job[:name], job[:job])

def every(:second, job), do: every(1, :seconds, job)
def every(:minute, job), do: every(1, :minutes, job)
def every(:hour, job), do: every(60, :minutes, job)
def every(:day, job), do: every(24, :hours, job)
defp once(job) do
run job

defp first_at(false, _job), do: nil
receive do
end
end

defp first_at(false, _job), do: nil
defp first_at(t, job) do
if t, do: :timer.sleep(round(t * 1000))

job.(Kitto.Notifier)
end
if t, do: :timer.sleep(t)

defp register(job, options) do
Kitto.Runner.register(job: job, options: options)
run job
end
end
16 changes: 16 additions & 0 deletions lib/job_dsl.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Kitto.Job.DSL do
defmacro __using__(_opts) do
quote do
import Kitto.Job.DSL
import Kitto.Notifier, only: [broadcast!: 2]
end
end

defmacro job(name, options, contents \\ []) do
quote do
Kitto.Job.register unquote(name), unquote(options), fn ->
unquote(contents[:do])
end
end
end
end
1 change: 1 addition & 0 deletions lib/kitto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Kitto do

children = [worker(__MODULE__, [], function: :start_server),
worker(Kitto.Notifier, []),
worker(Kitto.StatsServer, []),
worker(Kitto.Runner, [])]

Supervisor.start_link(children, [strategy: :one_for_one, name: Kitto.Supervisor])
Expand Down
11 changes: 6 additions & 5 deletions lib/runner.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
defmodule Kitto.Runner do
use Supervisor
alias Kitto.Job.Workspace

@max_restarts Application.get_env :kitto, :job_max_restarts, 300

def start_link do
Supervisor.start_link(__MODULE__, :ok, name: :runner_sup)
Expand All @@ -10,11 +13,9 @@ defmodule Kitto.Runner do

load_jobs

children = jobs |> Enum.map(fn (job) ->
worker(Kitto.Job, [job[:options][:interval], job[:job]], id: make_ref)
end)
children = jobs |> Enum.map(&(worker(Kitto.Job, [&1], id: make_ref)))

supervise(children, strategy: :one_for_one)
supervise(children, strategy: :one_for_one, max_restarts: @max_restarts)
end

def register(job) do
Expand All @@ -25,7 +26,7 @@ defmodule Kitto.Runner do

def jobs, do: runner |> Agent.get(&(&1))

defp load_jobs, do: job_files |> Enum.each(&Code.eval_file/1)
defp load_jobs, do: job_files |> Enum.each(&Workspace.eval_file/1)

defp job_files do
Path.wildcard Path.join(System.cwd, "jobs/**/*.{ex,exs}")
Expand Down
72 changes: 72 additions & 0 deletions lib/stats_server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
defmodule Kitto.StatsServer do
@default_stats %{
times_triggered: 0,
times_completed: 0,
failures: 0,
avg_time_took: 0.0,
total_running_time: 0.0
}

def start_link do
Agent.start_link(fn -> %{} end, name: :stats_server)
end

def measure(name, job) do
name |> initialize_stats
name |> update_trigger_count
name |> measure_func(job)
end

def stats do
server |> Agent.get(&(&1))
end

defp initialize_stats(name) do
server |> Agent.update(fn (stats) ->
Map.merge(stats, %{name => Map.get(stats, name, @default_stats)})
end)
end

defp update_trigger_count(name) do
server |> Agent.update(fn (stats) ->
new_stats = stats[name]

stats |> Map.merge(%{name => %{new_stats |
times_triggered: new_stats[:times_triggered] + 1}})
end)
end

defp measure_func(name, job) do
run = timed_call(job)

server |> Agent.update(fn (stats) ->
new_stats = stats[name]

new_stats = case run do
{:ok, time_took} ->
new_times_completed = new_stats[:times_completed] + 1
new_total_running_time = new_stats[:total_running_time] + time_took

%{new_stats |
times_completed: new_times_completed,
total_running_time: new_total_running_time}
|> Map.merge(%{avg_time_took: new_total_running_time / new_times_completed})
{:error, _} -> %{new_stats | failures: new_stats[:failures] + 1}
end

stats |> Map.merge(%{name => new_stats})
end)

if (elem(run, 0) == :error), do: raise(elem(run, 1))
end

defp timed_call(f) do
try do
{:ok, ((:timer.tc(f) |> elem(0)) / 1_000_000)}
rescue
e -> {:error, e}
end
end

defp server, do: Process.whereis(:stats_server)
end
51 changes: 51 additions & 0 deletions lib/time.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Kitto.Time do
@doc """
Return the number of milliseconds when for n seconds
"""
def mseconds({n, :milliseconds}), do: n

@doc """
Return the number of milliseconds when for n seconds
"""
def mseconds({n, :seconds}), do: n * 1000

@doc """
Return the number of milliseconds when for n minutes
"""
def mseconds({n, :minutes}), do: n * mseconds({60, :seconds})

@doc """
Return the number of milliseconds when for n hours
"""
def mseconds({n, :hours}), do: n * mseconds({60, :minutes})

@doc """
Return the number of milliseconds when for n days
"""
def mseconds({n, :days}), do: n * mseconds({24, :hours})

@doc """
Return the number of milliseconds when nil is passed
"""
def mseconds(nil), do: nil

@doc """
Return the number of milliseconds in a second
"""
def mseconds(:second), do: mseconds({1, :seconds})

@doc """
Return the number of milliseconds in a minute
"""
def mseconds(:minute), do: mseconds({1, :minutes})

@doc """
Return the number of milliseconds in an hour
"""
def mseconds(:hour), do: mseconds({1, :hours})

@doc """
Return the number of milliseconds in a day
"""
def mseconds(:day), do: mseconds({24, :hours})
end
3 changes: 3 additions & 0 deletions lib/workspace.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Kitto.Job.Workspace do
defdelegate eval_file(file), to: Code, as: :eval_file
end

0 comments on commit 8e40a53

Please sign in to comment.