Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,15 @@ end

## Usage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you mention the new config feature in The README. Sorry I missed that.


### ElasticSync.Schema
### ElasticSync.Index

Like Ecto, ElasticSync has a concept of a schema and a repo. Here's how you'd configure your schema:

```elixir
defmodule MyApp.Food do
defstruct [:id, :name]

use ElasticSync.Schema,
index: "foods",
type: "foods"
use ElasticSync.Index, index: "foods"

@doc """
Convert a struct to a plain ol' map. This will become our document.
Expand Down
6 changes: 0 additions & 6 deletions lib/elastic_sync.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,2 @@
defmodule ElasticSync do
import Tirexs.HTTP

def version do
{:ok, 200, info} = get("/")
info.version
end
end
146 changes: 101 additions & 45 deletions lib/elastic_sync/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,81 @@ defmodule ElasticSync.Index do
alias Tirexs.HTTP
alias Tirexs.Resources.APIs, as: API

# TODO: Allow developers to control mappings here
def create(names, config \\ []) do
names
|> API.index
|> HTTP.put(config)
defstruct [
name: nil,
type: nil,
alias: nil,
config: {ElasticSync.Index, :default_config}
]

defmacro __using__(opts) do
name = Keyword.get(opts, :index)
type = Keyword.get(opts, :type, name)
config = Keyword.get(opts, :config)

quote do
def __elastic_sync__ do
alias ElasticSync.Index

%Index{}
|> Index.put(:name, unquote(name))
|> Index.put(:type, unquote(type))
|> Index.put(:config, unquote(config))
end
end
end

def remove(names) do
names
def put(_index, :name, nil) do
raise ArgumentError, """
You must provide an index name. For example:

use ElasticSync.Index, index: "foods"
"""
end
def put(index, :config, nil), do: index
def put(_index, :config, value) when not is_tuple(value) do
raise ArgumentError, """
The index config must be a tuple in the format.

use ElasticSync.Index, index: "foods", config: {Food, :index_config}
"""
end
def put(index, key, value), do: Map.put(index, key, value)

def default_config do
%{}
end

def put_alias(%__MODULE__{name: name, alias: alias_name} = index) do
ms = :os.system_time(:milli_seconds)
base_name = alias_name || name
next_name = base_name <> "-" <> to_string(ms)
%__MODULE__{index | name: next_name, alias: base_name}
end

def create(%__MODULE__{name: name, config: {mod, fun}}) do
name
|> API.index
|> HTTP.delete
|> HTTP.put(apply(mod, fun, []))
end

def remove(%__MODULE__{name: name}) do
do_remove(name)
end

def exists?(name) do
def exists?(%__MODULE__{name: name}) do
case name |> API.index |> HTTP.get do
{:ok, _, _} -> true
{:error, _, _} -> false
end
end

def refresh(name) do
def refresh(%__MODULE__{name: name}) do
name
|> API._refresh
|> HTTP.post
end

def transition(name, config, fun) do
transition(name, get_new_alias_name(name), config, fun)
end

@doc """
Useful for reindexing. It will:

Expand All @@ -41,61 +86,72 @@ defmodule ElasticSync.Index do
4. Set the newly created index to the alias.
5. Remove old indicies.
"""
def transition(name, alias_name, config, fun) do
with {:ok, _, _} <- create(alias_name, config),
:ok <- fun.(alias_name),
{:ok, _, _} <- refresh(alias_name),
{:ok, _, _} <- replace_alias(name, index: alias_name),
{:ok, _, _} <- remove_indicies(name, except: [alias_name]),
do: :ok
def transition(%__MODULE__{} = index, fun) do
with index <- put_alias(index),
{:ok, _, _} <- create(index),
:ok <- fun.(index),
{:ok, _, _} <- refresh(index),
{:ok, _, _} <- replace_alias(index),
{:ok, _, _} <- clean_indicies(index),
do: {:ok, index}
end

def load(%__MODULE__{name: name, type: type}, data) do
import Tirexs.Bulk

# Tirexs requires keyword lists...
data = Enum.map data, fn
doc when is_list(doc) -> doc
doc -> Enum.into(doc, [])
end

payload =
[index: name, type: type]
|> bulk(do: index(data))

Tirexs.bump!(payload)._bulk()
end

@doc """
Attach the alias name to the newly created index. Remove
all old aliases.
"""
def replace_alias(name, index: index_name) do
add = %{add: %{alias: name, index: index_name}}
def replace_alias(%__MODULE__{name: name, alias: alias_name}) do
add = %{add: %{alias: alias_name, index: name}}

remove =
name
alias_name
|> get_aliases()
|> Enum.map(fn a ->
%{remove: %{alias: name, index: a}}
%{remove: %{alias: alias_name, index: a}}
end)

API._aliases()
|> HTTP.post(%{actions: remove ++ [add]})
end

@doc """
Generate an index name ending with the current timestamp in
milliseconds from a name.
"""
def get_new_alias_name(name) do
ms = :os.system_time(:milli_seconds)
name <> "-" <> to_string(ms)
HTTP.post(API._aliases(), %{actions: remove ++ [add]})
end

def remove_indicies(name, except: except) do
name
def clean_indicies(%__MODULE__{name: name, alias: alias_name}) do
alias_name
|> get_aliases()
|> Enum.filter(&(not &1 in except))
|> Enum.filter(&(&1 != name))
|> case do
[] ->
{:ok, 200, %{acknowledged: true}}
names ->
remove(names)
do_remove(names)
end
end

defp get_aliases(name) do
re = ~r/^#{name}-\d{13}$/
defp do_remove(names) do
names
|> API.index
|> HTTP.delete
end

API._aliases()
|> HTTP.get()
defp get_aliases(name) do
API._aliases
|> HTTP.get
|> normalize_aliases()
|> Enum.filter(&Regex.match?(re, &1))
|> Enum.filter(&Regex.match?(~r/^#{name}-\d{13}$/, &1))
end

defp normalize_aliases({:error, _, _}), do: []
Expand Down
107 changes: 36 additions & 71 deletions lib/elastic_sync/repo.ex
Original file line number Diff line number Diff line change
@@ -1,122 +1,87 @@
defmodule ElasticSync.Repo do
import Tirexs.Bulk

alias ElasticSync.{Schema, Index}
alias ElasticSync.Index
alias Tirexs.HTTP
alias Tirexs.Resources.APIs, as: API

def search(schema, query, opts \\ [])
def search(schema, query, opts) when is_binary(query) do
def search(schema, query) when is_binary(query) do
schema
|> to_search_url(opts)
|> to_search_url()
|> HTTP.get(%{q: query})
end
def search(schema, [search: query] = dsl, opts) do
opts =
dsl
|> Keyword.take([:index, :type])
|> Keyword.merge(opts)

search(schema, query, opts)
def search(schema, [search: query]) do
search(schema, query)
end
def search(schema, query, opts) do
def search(schema, query) do
schema
|> to_search_url(opts)
|> to_search_url()
|> HTTP.post(query)
end

def insert(record, opts \\ []) do
def insert(record) do
record.__struct__
|> to_index_url(opts)
|> to_index_url()
|> HTTP.post(%{id: record.id}, to_document(record))
end

def insert!(record, opts \\ []) do
def insert!(record) do
record.__struct__
|> to_index_url(opts)
|> to_index_url()
|> HTTP.post!(%{id: record.id}, to_document(record))
end

def update(record, opts \\ []) do
def update(record) do
record
|> to_document_url(opts)
|> to_document_url()
|> HTTP.put(to_document(record))
end

def update!(record, opts \\ []) do
def update!(record) do
record
|> to_document_url(opts)
|> to_document_url()
|> HTTP.put!(to_document(record))
end

def delete(record, opts \\ []) do
def delete(record) do
record
|> to_document_url(opts)
|> HTTP.delete!
|> to_document_url()
|> HTTP.delete
end

def delete!(record, opts \\ []) do
def delete!(record) do
record
|> to_document_url(opts)
|> to_document_url()
|> HTTP.delete!
end

def insert_all(schema, records, opts \\ []) when is_list(records) do
index_name =
schema
|> Schema.merge(opts)
|> Schema.get(:index)

with {:ok, 200, response} <- bulk_index(schema, records, opts),
{:ok, 200, _} <- Index.refresh(index_name),
def insert_all(schema, records) when is_list(records) do
with {:ok, 200, response} <- load(schema.__elastic_sync__, records),
{:ok, 200, _} <- Index.refresh(schema.__elastic_sync__),
do: {:ok, 200, response}
end

def bulk_index(schema, records, opts \\ []) when is_list(records) do
data = Enum.map(records, &to_reindex_document/1)

payload =
schema
|> Schema.merge(opts)
|> Map.take([:index, :type])
|> Map.to_list()
|> bulk(do: index(data))
@doc false
def load(index, records) do
Index.load(index, Enum.map(records, &to_document/1))
end

Tirexs.bump!(payload)._bulk()
def to_search_url(schema) do
url_for(:_search, schema)
end

def to_search_url(schema, opts \\ []) do
url_for(:_search, schema, opts)
def to_index_url(schema) do
url_for(:index, schema)
end

def to_index_url(schema, opts \\ []) do
url_for(:index, schema, opts)
def to_document_url(record) do
url_for(:doc, record.__struct__, [record.id])
end

def to_document_url(record, opts \\ []) do
url_for(:doc, record.__struct__, opts, [record.id])
defp url_for(fun_name, schema, paths \\ []) do
index = schema.__elastic_sync__
apply(API, fun_name, [index.name, index.type] ++ paths)
end

def to_document(record) do
record.__struct__.to_search_document(record)
end

defp url_for(fun_name, schema, opts, paths \\ []) do
schema = Schema.merge(schema, opts)
index = Schema.get(schema, :index)
type = Schema.get(schema, :type)
apply(API, fun_name, [index, type] ++ paths)
end

# Tirexs only accepts a list for bulk
defp to_reindex_document(record) do
document = to_document(record)

cond do
is_list(document) ->
document
true ->
Enum.into(document, [])
end
end
end
Loading