Skip to content

Commit

Permalink
export_uri file:// supported
Browse files Browse the repository at this point in the history
  • Loading branch information
Ole Rixmann committed Sep 9, 2015
1 parent 446f9da commit 2323e28
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 41 deletions.
9 changes: 6 additions & 3 deletions lib/ecto_export.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ defmodule Ecto.Export do
* `repo` - mandatory, the Ecto.Repo which contains the models to be exported
* `models` - mandatory, list of modules which implement Ecto.Model
* `options` - optional, Map whith the following options
* `"filename"` - mandatory, string, the filename to export/import from
* `"export_uri"` - mandatory, string, the uri to export/import from, supported protocols:
* `"file://"` - local filesystem
* `"zmq-tcp://"` - zero mq endpoint
* `"http://"` - http endpoint
* `"import"` - boolean, if true starts an import - default is false
* `"formatter"` - atom, defines which module to use for string conversion - default is `Ecto.Export.Formatter.JSON`
Expand All @@ -33,13 +36,13 @@ defmodule Ecto.Export do
def create(repo, models, options), do: Dispatcher.start_export(repo, models, options)

@doc """
Delete job.
Delete job.
Job process will be stopped if needed and job description will be removed from dispatcher.
## Params
* `job` - id or pid of job to stop
## Result
`:ok`
Expand Down
19 changes: 9 additions & 10 deletions lib/ecto_export/formatter/json.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
defmodule Ecto.Export.Formatter.JSON do

def export(filehandle, entries) do
IO.write(filehandle, <<"[">>)
export_entries(Enum.take(entries, 1), Stream.drop(entries, 1), filehandle)
IO.write(filehandle, <<"]">>)
:ok
def export(entries) do
case Enum.take(entries, 1) do
[] -> [<<"[]">>]
first_entry ->
formatted_entries = Stream.map Stream.drop(entries, 1), &export_entry/1
prefixed_formatted = Stream.concat [<<"[">>, :jsx.encode(first_entry)], formatted_entries
Stream.concat prefixed_formatted, [<<"]">>]
end
end

def import(repo, string_stream) do
Expand Down Expand Up @@ -32,11 +35,7 @@ defmodule Ecto.Export.Formatter.JSON do

defp string_to_elixir_atom(str), do: :erlang.list_to_atom(String.to_char_list(str))

defp export_entries([], _, _), do: nil
defp export_entries([first], entries, filehandle) do
IO.write(filehandle, :jsx.encode(first))
Enum.reduce entries, nil, fn(entry, _) -> IO.write(filehandle, <<",\n">> <> :jsx.encode(entry)) end
end
defp export_entry(entry), do: <<",\n">> <> :jsx.encode(entry)

def init([repo]) do
{{repo, 0}, :jsx_to_term.start_term([:return_maps])}
Expand Down
24 changes: 24 additions & 0 deletions lib/ecto_export/stream/file.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Ecto.Export.Stream.File do
alias Ecto.Export.Dispatcher

def send_stream formatted_entries, filename do
filehandle = File.open! filename, [:write, :utf8]
Enum.map formatted_entries, &IO.write(filehandle, &1)
File.close filehandle
end

def receive_stream filename do
Stream.resource(
fn -> {File.open!(filename, [:read, :utf8]), 0} end,
fn {filehandle, linecount} ->
case IO.read(filehandle, :line) do
:eof -> {:halt, filehandle}
{:error, _} = _err -> {:halt, filehandle}
data ->
Dispatcher.progress_update({:read_file, linecount})
{[data], {filehandle, linecount + 1}}
end
end,
fn filehandle -> File.close(filehandle) end)
end
end
40 changes: 17 additions & 23 deletions lib/ecto_export/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ defmodule Ecto.Export.Worker do
@default_batch_size 1000

def export_import(repo, modules, options \\ %{}) do
filename = options["filename"]
export_uri = options["export_uri"]
cond do
is_nil(filename) -> {:error, :no_filename}
options["import"] -> import(repo, options, filename)
true -> export(repo, modules, options, filename)
is_nil(export_uri) -> {:error, :no_export_uri}
options["import"] -> import(repo, options, export_uri)
true -> export(repo, modules, options, export_uri)
end
end

Expand All @@ -19,32 +19,24 @@ defmodule Ecto.Export.Worker do
end
end

defp import(repo, options, filename),
do: import_stream(filename) |> do_import(repo, options)
defp import(repo, options, export_uri),
do: import_stream(export_uri) |> do_import(repo, options)

defp export(repo, modules, options, filename) do
defp export(repo, modules, options, export_uri) do
formatter = options[:formatter] || Ecto.Export.Formatter.JSON
{streamer, cut_uri} = find_stream_module export_uri
ordered_modules = order_modules(modules)
entries = export_stream repo, ordered_modules
filehandle = File.open! filename, [:write, :utf8]
formatter.export(filehandle, entries)
File.close filehandle

formatted_entries = formatter.export(entries)
streamer.send_stream formatted_entries, cut_uri

Dispatcher.done # XXX: I don't sure that it is needed
end

def import_stream(filename) do
Stream.resource(
fn -> {File.open!(filename, [:read, :utf8]), 0} end,
fn {filehandle, linecount} ->
case IO.read(filehandle, :line) do
:eof -> {:halt, filehandle}
{:error, _} = _err -> {:halt, filehandle}
data ->
Dispatcher.progress_update({:read_file, linecount})
{[data], {filehandle, linecount + 1}}
end
end,
fn filehandle -> File.close(filehandle) end)
def import_stream(export_uri) do
{streamer, cut_uri} = find_stream_module export_uri
streamer.receive_stream cut_uri
end

defp export_stream(repo, modules) do
Expand Down Expand Up @@ -112,4 +104,6 @@ defmodule Ecto.Export.Worker do

defp batch_size, do: Application.get_env(:ecto_export, :batch_size, @default_batch_size)

defp find_stream_module("file://" <> rest), do: {Ecto.Export.Stream.File, rest}

end
10 changes: 5 additions & 5 deletions test/ecto_export_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ defmodule EctoExportTest do
test "export import" do
insert_data

assert {:ok, export_id} = Ecto.Export.create(Repo, @models, %{"filename" => "export.json"})
assert {:ok, export_id} = Ecto.Export.create(Repo, @models, %{"export_uri" => "file://export.json"})

assert :ok = wait_for_job(export_id)

delete_all

assert {:error, :not_found} == Ecto.Export.check(100)

assert {:ok, import_id} = Ecto.Export.create(Repo, @models, %{"filename" => "export.json", "import" => true})
assert {:ok, import_id} = Ecto.Export.create(Repo, @models, %{"export_uri" => "file://export.json", "import" => true})

assert :ok = wait_for_job(import_id)

Expand All @@ -51,11 +51,11 @@ defmodule EctoExportTest do
test "delete" do
insert_data

assert {:ok, id} = Ecto.Export.create(Repo, @models, %{"filename" => "export.json"})
assert {:ok, id} = Ecto.Export.create(Repo, @models, %{"export_uri" => "file://export.json"})
assert :ok = Ecto.Export.stop(id)
assert {:ok, :job_finished} == Ecto.Export.check(id)

assert {:ok, id} = Ecto.Export.create(Repo, @models, %{"filename" => "export.json"})
assert {:ok, id} = Ecto.Export.create(Repo, @models, %{"export_uri" => "file://export.json"})
:timer.sleep(10)
assert :ok = Ecto.Export.stop(id)
assert {:ok, :job_finished} == Ecto.Export.check(id)
Expand All @@ -69,7 +69,7 @@ defmodule EctoExportTest do
filename = "./abc/export.json"

# it will raise exception
assert {:ok, id} = Ecto.Export.create(Repo, @models, %{"filename" => filename})
assert {:ok, id} = Ecto.Export.create(Repo, @models, %{"export_uri" => "file://" <> filename})
assert :ok = wait_for_job(id)
assert false == File.exists?(filename)
assert :ok = Ecto.Export.stop(id)
Expand Down

0 comments on commit 2323e28

Please sign in to comment.