Commit

Initial commit with database modeling efforts
sorentwo committed Jan 13, 2019
0 parents commit 0ac3cc8
Showing 13 changed files with 533 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
24 changes: 24 additions & 0 deletions .gitignore
@@ -0,0 +1,24 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
oban-*.tar

53 changes: 53 additions & 0 deletions README.md
@@ -0,0 +1,53 @@
Kiq is the most powerful software I have ever built. It is also the most
complex, which I believe to be unnecessary.

Oban is a refinement and simplification of Kiq/Sidekiq. Much of the complexity
inside of Kiq is an artifact of integrating with Sidekiq and adhering to its
hodgepodge use of Redis data types. With the release of Redis 5 we now have streams,
which are powerful enough to model queues, retries, scheduled jobs, backup jobs and
job resurrection _in a single data type_.
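
As a rough sketch of that idea (the module name, the `:redix` connection name, the
stream and group names, and the one-second block below are all hypothetical, and the
consumer group is assumed to already exist):

```elixir
defmodule StreamQueueSketch do
  @stream "oban:default"
  @group "oban:default:group"

  # Enqueue a payload as a stream entry; Redis assigns the entry id.
  def push(payload) when is_binary(payload) do
    Redix.command!(:redix, ["XADD", @stream, "*", "payload", payload])
  end

  # Block for up to a second waiting for new entries assigned to this consumer.
  def pull(consumer, count \\ 10) do
    case Redix.command!(:redix, [
           "XREADGROUP", "GROUP", @group, consumer,
           "COUNT", to_string(count), "BLOCK", "1000",
           "STREAMS", @stream, ">"
         ]) do
      [[_stream, entries]] -> entries
      nil -> []
    end
  end

  # Acknowledge a processed entry so it leaves the pending list.
  def ack(id), do: Redix.command!(:redix, ["XACK", @stream, @group, id])
end
```

Retries, scheduled jobs and resurrection can then be layered on the same stream, for
example by re-adding failed entries and using `XCLAIM` to recover entries that sit in
the pending list for too long.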

Here are my gripes, complaints and general thoughts about the shortcomings of
Kiq and Sidekiq:

* Many of the pro/enterprise features that Kiq provides are implemented in a
very brittle way. This was done for interoperability with the Sidekiq UI and
isn't necessary in a green field system.
* The client and pooling system are deeply intertwined with Redis, which makes
testing very opaque and running jobs in-line impossible.
* There is a heavy reliance on polling for pushing jobs, fetching jobs and
performing retries. With blocking stream operations we have a more responsive
system with greater accuracy.
* The reporter system introduces a layer of asynchrony that could cause jobs not
to be retried, logged or have statistics recorded. The reporter system will be
replaced with middleware that can run _synchronously_ before or after a job.
* The lack of key namespacing makes it impossible to run integration tests
asynchronously.
* As Kiq's feature set grew so did the reliance on Lua scripts to orchestrate
atomic operations such as dequeueing, descheduling and taking locks. With
streams we don't need to rely on scripting for any of our operations. Not that
there is anything wrong with Lua or using scripts, but it adds to the overall
complexity.
* There is integration with the Telemetry library, but it isn't leveraged.
Library users can build a logger on top of it themselves; we don't need to
provide one.
* Workers in Kiq are static and executed in isolation. Only job arguments are
passed to `perform/1`, which makes it impossible for the function to know
anything else about the job being executed. Job execution should be a step in
a pipeline where a `call` function is passed a job structure and must return a
job structure (see the sketch after this list).
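
A minimal sketch of that pipeline shape (the module and stage names are hypothetical,
not an actual API):

```elixir
defmodule PipelineSketch do
  defmodule Timer do
    # A middleware stage: take a job, return a (possibly updated) job.
    def call(job), do: Map.put(job, :started_at, System.monotonic_time())
  end

  defmodule Worker do
    # The worker itself is just another stage in the same shape.
    def call(%{args: args} = job) do
      IO.inspect(args, label: "performing")
      job
    end
  end

  # Running a job is a synchronous reduce over the stages, so middleware can run
  # before or after the worker without any extra asynchrony.
  def run(job, stages \\ [Timer, Worker]) do
    Enum.reduce(stages, job, fn stage, acc -> stage.call(acc) end)
  end
end
```

Calling `PipelineSketch.run(%{args: %{"id" => 1}})` returns the job map with
`:started_at` added by the middleware stage.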

Miscellaneous thoughts and notes:

* The XINFO command provides detail about the size of a stream and also the
number of pending jobs.
* Stats should be namespaced and contained in a single HASH.
* Process information is set to expire every minute, so there isn't any point
in recording it. Instead, use pubsub to broadcast information about running jobs.
* GenStage and Redix have been rock solid. Keep with that.
* Avoiding global state and `Application` configuration is perfect. Keep doing
that.
* Try to make more/better use of property-based tests (a small StreamData sketch
follows this list). Can I use Proper instead of StreamData to get stateful
properties defined?
* The underlying structure and behavior should be easily replicated in other
languages (ruby, python, node).
* Get rid of the `init` function and optional callback.
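
For reference, a tiny StreamData property in the shape the note above has in mind
(the test module and property are purely illustrative):

```elixir
defmodule Oban.JobPropertyTest do
  use ExUnit.Case, async: true
  use ExUnitProperties

  property "jobs survive a struct/map round trip" do
    check all id <- positive_integer(),
              stream <- string(:alphanumeric, min_length: 1) do
      job = %Oban.Job{id: id, stream: stream}

      assert job == struct(Oban.Job, Map.from_struct(job))
    end
  end
end
```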
18 changes: 18 additions & 0 deletions lib/oban.ex
@@ -0,0 +1,18 @@
defmodule Oban do
@moduledoc """
Documentation for Oban.
"""

@doc """
Hello world.

## Examples

    iex> Oban.hello()
    :world

"""
def hello do
:world
end
end
5 changes: 5 additions & 0 deletions lib/oban/config.ex
@@ -0,0 +1,5 @@
defmodule Oban.Config do
# "GROUP" is the wrong value here
# What is the overlap between IDENT and OTP_APP? Would I have multiple identities?
defstruct [:group, :ident, :maxlen, :otp_app, streams: []]
end
77 changes: 77 additions & 0 deletions lib/oban/database.ex
@@ -0,0 +1,77 @@
defmodule Oban.Database do
@moduledoc """
The Database behaviour specifies how workers can push, pull and otherwise interact with
persistent storage.
"""

alias Oban.{Config, Job}

@type db :: GenServer.server()
@type id :: binary() | integer()
@type cursor :: binary() | integer() | :ets.continuation()
@type jobs :: [Job.t()]
@type conf :: Config.t()
@type stream :: binary()
@type count :: pos_integer()
@type info_mode :: :all | :processes | :queues | :stats

@doc """
Start a new database process linked to the calling process.
"""
@callback start_link(opts :: Keyword.t()) :: GenServer.on_start()

@doc """
Push jobs into the database for asynchronous processing.
If storage is successful then the jobs will be returned with the `id` assigned by the
database and optional metadata additions.
"""
@callback push(db(), jobs(), conf()) :: jobs()

@doc """
Pull one or more jobs from the database for processing.
This is a blocking operation that will either return a list of raw job data or the atom
`:timeout`, indicating that no jobs were available within the blocking period. It is essential
that jobs remain in the database until they are acknowledged through `ack/2`.
"""
@callback pull(db(), stream(), count(), conf()) :: jobs() | :timeout

@doc """
Check what is coming up in the stream without pulling anything out.
Peeking can be done in chunks, where the `count` limits the number of entries returned per call
and the `id` is a cursor used for pagination.
The function returns a tuple with the list of jobs and the last matched cursor. The cursor may
be used to continue pagination.
"""
@callback peek(db(), stream(), count(), nil | cursor(), conf()) :: {jobs(), cursor()} | []

@doc """
Acknowledge that a job has been processed successfully.
This call ensures that a job won't be processed again. It is a safeguard against data loss if
the server is terminated during processing or there are unexpected errors. All jobs should be
acknowledged, regardless of whether they succeeded or failed.
The return value is `true` if the job was acknowledged, or `false` if it wasn't.
"""
@callback ack(db(), stream(), id(), conf()) :: boolean()

@doc """
Restore a pending job back into its stream for processing.
If a job is consumed from the stream via `pull/4` but never acknowledged via `ack/4`, it will
be stuck in a pending state. Calling `restore/4` will push a pending job back to its
original stream.
The return value is `true` if the job was restored, `false` if it wasn't.
"""
@callback restore(db(), stream(), id(), conf()) :: boolean()

@doc """
Purge all queues, stats and other data associated with this database instance.
"""
@callback clear(db(), conf()) :: :ok
end
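
As a rough illustration of how a process might drive a module implementing this behaviour
(the `WorkerLoopSketch` module, the `"default"` stream and the ten-job batch size are
hypothetical; `database` is any implementing module, such as `Oban.Database.Memory`):

defmodule WorkerLoopSketch do
  alias Oban.Config

  # Pull a batch from the stream, process each job, then ack it so the database
  # can drop it from its pending set.
  def run_once(database, db, %Config{} = conf) do
    case database.pull(db, "default", 10, conf) do
      :timeout ->
        :ok

      jobs ->
        Enum.each(jobs, fn job ->
          IO.inspect(job, label: "processing")
          true = database.ack(db, "default", job.id, conf)
        end)
    end
  end
end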
122 changes: 122 additions & 0 deletions lib/oban/database/memory.ex
@@ -0,0 +1,122 @@
defmodule Oban.Database.Memory do
@moduledoc false

use GenServer

alias Oban.{Config, Job}

@behaviour Oban.Database

@impl Oban.Database
def start_link(opts) do
name = Keyword.get(opts, :name)
conf = Keyword.get(opts, :conf)

GenServer.start_link(__MODULE__, conf, name: name)
end

# Database Callbacks

@impl Oban.Database
def push(_db, %Job{} = job, %Config{} = conf) do
jid = System.unique_integer([:positive, :monotonic])
job = %{job | id: jid}

true = :ets.insert(stream_table(conf), {{job.stream, job.id}, job})

job
end

@impl Oban.Database
def pull(_db, stream, limit, conf) when is_binary(stream) and limit > 0 do
stream_table = stream_table(conf)
claim_table = claim_table(conf)

reducer = fn {key, job}, acc ->
case :ets.take(stream_table, key) do
[{^key, ^job}] ->
:ets.insert(claim_table, {key, job})

[job | acc]

[] ->
acc
end
end

case :ets.select(stream_table, [{{{stream, :_}, :_}, [], [:"$_"]}], limit) do
{matches, _cont} ->
matches
|> Enum.reduce([], reducer)
|> Enum.reverse()

_ ->
[]
end
end

@impl Oban.Database
def peek(_db, stream, limit, nil, conf) when is_binary(stream) and limit > 0 do
case :ets.select(stream_table(conf), [{{{stream, :_}, :"$1"}, [], [:"$1"]}], limit) do
{_matches, _cont} = result -> result
_ -> []
end
end

# With a continuation cursor we resume the previous select rather than starting over.
def peek(_db, _stream, _limit, cont, _conf) when not is_nil(cont) do
case :ets.select(cont) do
{_matches, _cont} = result -> result
_ -> []
end
end

@impl Oban.Database
def ack(_db, stream, id, conf) when is_binary(stream) and is_integer(id) do
case :ets.select_delete(claim_table(conf), [{{{stream, id}, :_}, [], [true]}]) do
1 -> true
0 -> false
end
end

@impl Oban.Database
def restore(_db, stream, id, conf) when is_binary(stream) and is_integer(id) do
case :ets.take(claim_table(conf), {stream, id}) do
[{key, job}] ->
:ets.insert(stream_table(conf), {key, job})

[] ->
false
end
end

@impl Oban.Database
def clear(_db, conf) do
true = :ets.delete_all_objects(claim_table(conf))
true = :ets.delete_all_objects(stream_table(conf))

:ok
end

# GenServer Callbacks

@impl GenServer
def init(%Config{} = conf) do
maybe_create_table(claim_table(conf))
maybe_create_table(stream_table(conf))

{:ok, nil}
end

# Helpers

defp claim_table(%Config{otp_app: app}), do: Module.concat([app, "Claim"])

defp stream_table(%Config{otp_app: app}), do: Module.concat([app, "Streams"])

defp maybe_create_table(table_name) do
case :ets.whereis(table_name) do
:undefined -> :ets.new(table_name, [:ordered_set, :public, :named_table])
_ -> :ok
end
end
end
60 changes: 60 additions & 0 deletions lib/oban/database/redis.ex
@@ -0,0 +1,60 @@
defmodule Oban.Database.Redis do
@moduledoc false

@behaviour Oban.Database

alias Oban.{Config, Job}

@impl Oban.Database
def start_link(opts) do
{host, opts} = Keyword.pop(opts, :redis_url)

# initialize afterwards

Redix.start_link(host, exit_on_disconnection: true)
end

def init(db, %Config{ident: ident, streams: streams}) do
commands = for stream <- streams do
["XGROUP", "CREATE", stream, group_name(ident, stream), "$", "MKSTREAM"]
end

# If the group already exists this will cause a "BUSYGROUP" error; we don't want to raise
# here.
Redix.pipeline!(db, commands)

# This is also the point where we need to claim dead entries
end

@impl Oban.Database
def push(db, %Job{stream: stream} = job, %Config{maxlen: maxlen}) do
# The id value is either the * character or a future timestamp
command = ["XADD", stream, "MAXLEN", "~", maxlen, "*"] ++ Job.to_fields(job)

%{job | id: Redix.command!(db, command)}
end

@impl Oban.Database
def pull(db, stream, limit, %Config{group: group}) when is_binary(stream) and limit > 0 do
# Somehow we need to get new values UP TO the given ID. XRANGE can give us the values, but
# doesn't count as reading for a consumer group.
# What is the consumer name? That needs to be specific for this node/dyno type thing.
command =
["XREADGROUP", "GROUP", group, "ALICE"] ++
["COUNT", to_string(limit), "BLOCK", "1000"] ++ # NOTE: Don't hardcode the block value
["STREAMS", stream, ">"]

case Redix.command!(db, command) do
[[_stream, _entries]] -> [:do_something]
nil -> []
end
end

@impl Oban.Database
def ack(db, stream, id, %Config{group: group}) when is_binary(id) do
# XACK returns the number of acknowledged entries; the behaviour expects a boolean.
Redix.command!(db, ["XACK", stream, group, id]) == 1
end

# Helpers

defp group_name(ident, stream), do: "#{ident}:#{stream}"
end
3 changes: 3 additions & 0 deletions lib/oban/job.ex
@@ -0,0 +1,3 @@
defmodule Oban.Job do
defstruct [:id, :stream]
end
