Skip to content

Commit

Permalink
Add batched SQL for insert/update operations
Browse files Browse the repository at this point in the history
This introduce a new way of doing "migration" of data based on the
operation table. Before, we simply executed the SQL query as the
migration were run. Like for a "new" operation, we did an insert of a
"transactions" row and an update to the "operations" table _for each
entry in the synced file._ For a file containing 1000 keys, we did a
total of 2000 SQL queries.

After this commit, we only do 2 SQL queries (well 4 since we batch the
SQL params by 500). By aggregating operation like {:update_all,
operation.id, %{synced: true}}, we are able to batch the operations
efficiently.

This results in a much faster sync/add translations/correct/uncorrect
operations.
  • Loading branch information
simonprev committed Aug 29, 2022
1 parent a492e83 commit f4bad30
Show file tree
Hide file tree
Showing 22 changed files with 290 additions and 98 deletions.
8 changes: 7 additions & 1 deletion lib/accent/auth/user_remote/persister.ex
Expand Up @@ -15,6 +15,7 @@ defmodule Accent.UserRemote.Persister do
alias Accent.AuthProvider
alias Accent.Repo
alias Accent.User, as: RepoUser
alias Accent.UserRemote.TokenGiver
alias Accent.UserRemote.User, as: FetchedUser
alias Ecto.Changeset

Expand Down Expand Up @@ -44,7 +45,12 @@ defmodule Accent.UserRemote.Persister do
end

defp create_provider(user, name, uid), do: Repo.insert!(%AuthProvider{name: name, uid: uid, user_id: user.id})
defp create_user(fetched_user), do: Repo.insert!(%RepoUser{email: fetched_user.email, fullname: fetched_user.fullname, picture_url: fetched_user.picture_url})

defp create_user(fetched_user) do
user = Repo.insert!(%RepoUser{email: fetched_user.email, fullname: fetched_user.fullname, picture_url: fetched_user.picture_url})
TokenGiver.grant_global_token(user)
user
end

defp update_user(user, fetched_user) do
user
Expand Down
8 changes: 8 additions & 0 deletions lib/accent/auth/user_remote/token_giver.ex
Expand Up @@ -7,6 +7,14 @@ defmodule Accent.UserRemote.TokenGiver do
create_token(user)
end

def grant_global_token(user) do
Repo.insert!(%Accent.AccessToken{
user_id: user.id,
global: true,
token: Accent.Utils.SecureRandom.urlsafe_base64(70)
})
end

defp invalidate_tokens(user) do
user
|> Ecto.assoc(:private_access_tokens)
Expand Down
4 changes: 3 additions & 1 deletion lib/accent/schemas/operation.ex
Expand Up @@ -20,7 +20,9 @@ defmodule Accent.Operation do
:version_id,
:project_id,
:stats,
:previous_translation
:previous_translation,
:inserted_at,
:updated_at
]

schema "operations" do
Expand Down
16 changes: 14 additions & 2 deletions lib/graphql/resolvers/translation.ex
Expand Up @@ -40,7 +40,13 @@ defmodule Accent.GraphQL.Resolvers.Translation do
|> (&fn -> BasePersister.execute(&1) end).()
|> Repo.transaction()
|> case do
{:ok, {_context, [translation]}} ->
{:ok, {_context, _}} ->
translation =
Translation
|> Query.where(id: ^translation.id)
|> Repo.one()
|> Repo.preload(:revision)

{:ok, %{translation: translation, errors: nil}}

{:error, _reason} ->
Expand All @@ -57,7 +63,13 @@ defmodule Accent.GraphQL.Resolvers.Translation do
|> (&fn -> BasePersister.execute(&1) end).()
|> Repo.transaction()
|> case do
{:ok, {_context, [translation]}} ->
{:ok, {_context, _}} ->
translation =
Translation
|> Query.where(id: ^translation.id)
|> Repo.one()
|> Repo.preload(:revision)

{:ok, %{translation: translation, errors: nil}}

{:error, _reason} ->
Expand Down
1 change: 1 addition & 0 deletions lib/movement/builders/new_slave.ex
Expand Up @@ -32,6 +32,7 @@ defmodule Movement.Builders.NewSlave do
value_type: translation.value_type,
plural: translation.plural,
locked: translation.locked,
translation_id: translation.id,
placeholders: translation.placeholders,
options: assigns.new_slave_options
})
Expand Down
21 changes: 7 additions & 14 deletions lib/movement/ecto_migration_helper.ex
@@ -1,18 +1,11 @@
defmodule Movement.EctoMigrationHelper do
alias Accent.Repo
alias Movement.Migration
def update(struct, params), do: {:update, {struct, params}}

@doc """
Update given model by merging the existing parameters and the arguments.
"""
@spec update(model :: map, params :: map()) :: Migration.t()
def update(model, params) do
model
|> model.__struct__.changeset(params)
|> Repo.update!()
end
def insert(struct), do: {:insert, struct}

def insert(model) do
Repo.insert!(model)
end
def insert_all(schema, struct), do: {:insert_all, {schema, struct}}

def update_all_dynamic(struct, types, fields, values), do: {:update_all_dynamic, {struct.__struct__, struct.id, types, fields, values}}

def update_all(struct, params), do: {:update_all, {struct.__struct__, struct.id, params}}
end
1 change: 1 addition & 0 deletions lib/movement/entries_commit_processor.ex
Expand Up @@ -26,6 +26,7 @@ defmodule Movement.EntriesCommitProcessor do
value_type: entry.value_type,
plural: entry.plural,
locked: entry.locked,
translation_id: current_translation && current_translation.id,
revision_id: Map.get(assigns[:revision], :id),
version_id: assigns[:version] && Map.get(assigns[:version], :id),
placeholders: entry.placeholders
Expand Down
1 change: 1 addition & 0 deletions lib/movement/mappers/operation.ex
Expand Up @@ -15,6 +15,7 @@ defmodule Movement.Mappers.Operation do
revision_id: Map.get(suggested_translation, :revision_id),
document_id: Map.get(suggested_translation, :document_id),
version_id: Map.get(suggested_translation, :version_id),
translation_id: Map.get(suggested_translation, :translation_id),
previous_translation: PreviousTranslation.from_translation(current_translation),
placeholders: suggested_translation.placeholders
}
Expand Down
2 changes: 1 addition & 1 deletion lib/movement/migration.ex
@@ -1,5 +1,5 @@
defmodule Movement.Migration do
@type t :: map

@callback call(atom, map) :: t()
@callback call(atom, map) :: [t()] | t()
end
20 changes: 12 additions & 8 deletions lib/movement/migration/conflict.ex
Expand Up @@ -6,17 +6,21 @@ defmodule Movement.Migration.Conflict do
def call(:correct, operation) do
Accent.OperationBatcher.batch(operation)

update(operation.translation, %{
corrected_text: operation.text,
conflicted: false
})
update_all_dynamic(
operation.translation,
[:text, :boolean],
[:corrected_text, :conflicted],
[operation.text, false]
)
end

def call(:uncorrect, operation) do
update(operation.translation, %{
conflicted_text: operation.previous_translation && operation.previous_translation.conflicted_text,
conflicted: true
})
update_all_dynamic(
operation.translation,
[:text, :boolean],
[:conflicted_text, :conflicted],
[operation.previous_translation && operation.previous_translation.conflicted_text, true]
)
end

def call(:on_corrected, operation) do
Expand Down
18 changes: 12 additions & 6 deletions lib/movement/migration/rollback.ex
Expand Up @@ -4,17 +4,23 @@ defmodule Movement.Migration.Rollback do
import Movement.EctoMigrationHelper

def call(:new, operation) do
update(operation, %{rollbacked: true})
update(operation.translation, %{removed: true})
[
update_all(operation, %{rollbacked: true}),
update_all(operation.translation, %{removed: true})
]
end

def call(:remove, operation) do
update(operation, %{rollbacked: true})
update(operation.translation, %{removed: false})
[
update_all(operation, %{rollbacked: true}),
update_all(operation.translation, %{removed: false})
]
end

def call(:restore, operation) do
update(operation, %{rollbacked: true})
update(operation.translation, Map.from_struct(operation.previous_translation))
[
update_all(operation, %{rollbacked: true}),
update(operation.translation, Map.from_struct(operation.previous_translation))
]
end
end
62 changes: 36 additions & 26 deletions lib/movement/migration/translation.ex
Expand Up @@ -4,19 +4,18 @@ defmodule Movement.Migration.Translation do
import Movement.EctoMigrationHelper

alias Accent.{Operation, Translation}
alias Ecto.UUID

def call(:update_proposed, operation) do
operation.translation
|> update(%{
update(operation.translation, %{
proposed_text: operation.text
})
end

def call(:update, operation) do
Accent.OperationBatcher.batch(operation)

operation.translation
|> update(%{
update(operation.translation, %{
value_type: operation.value_type,
corrected_text: operation.text,
conflicted_text: operation.previous_translation && operation.previous_translation.corrected_text,
Expand All @@ -25,25 +24,25 @@ defmodule Movement.Migration.Translation do
end

def call(:remove, operation) do
update(operation.translation, %{removed: true})
update_all(operation.translation, %{removed: true})
end

def call(:renew, operation) do
new_translation = %{
proposed_text: operation.text,
corrected_text: operation.text,
conflicted: true,
removed: false
}

update(operation, %{rollbacked: false})
update(operation.translation, new_translation)
[
update_all(operation, %{rollbacked: false}),
update_all_dynamic(
operation.translation,
[:text, :text, :boolean, :boolean],
[:proposed_text, :corrected_text, :conflicted, :removed],
[operation.text, operation.text, true, false]
)
]
end

def call(:new, operation) do
id = Ecto.UUID.generate()
id = UUID.generate()

translation = %Translation{
translation = %{
id: id,
key: operation.key,
proposed_text: operation.text,
Expand All @@ -58,17 +57,22 @@ defmodule Movement.Migration.Translation do
revision_id: operation.revision_id,
document_id: operation.document_id,
version_id: operation.version_id,
placeholders: operation.placeholders
source_translation_id: operation.translation_id,
placeholders: operation.placeholders,
inserted_at: DateTime.utc_now(),
updated_at: DateTime.utc_now()
}

insert(translation)
update(operation, %{translation_id: id})
[
insert_all(Translation, translation),
update_all_dynamic(operation, [:uuid], [:translation_id], [UUID.dump!(id)])
]
end

def call(:version_new, operation) do
id = Ecto.UUID.generate()
id = UUID.generate()

translation = %Translation{
translation = %{
id: id,
key: operation.key,
proposed_text: operation.text,
Expand All @@ -82,17 +86,23 @@ defmodule Movement.Migration.Translation do
document_id: operation.document_id,
version_id: operation.version_id,
source_translation_id: operation.translation_id,
placeholders: operation.placeholders
placeholders: operation.placeholders,
inserted_at: DateTime.utc_now(),
updated_at: DateTime.utc_now()
}

version_operation = Operation.copy(operation, %{action: "add_to_version", translation_id: id})

insert(translation)
insert(version_operation)
[
insert_all(Translation, translation),
insert(version_operation)
]
end

def call(:restore, operation) do
update(operation, %{rollbacked: false})
update(operation.translation, Map.from_struct(operation.previous_translation))
[
update_all(operation, %{rollbacked: false}),
update(operation.translation, Map.from_struct(operation.previous_translation))
]
end
end

0 comments on commit f4bad30

Please sign in to comment.