Skip to content

Commit

Permalink
Add alternate version of mysql upsert
Browse files Browse the repository at this point in the history
This one works by issuing select and then insert or update or skip depending
on what select returns. We use this on mysql 5.7.26 and 8.0.20 where
previous implementation using 'replace' or 'on conflict update' can cause
excessive deadlocks.
  • Loading branch information
prefiks committed Jun 7, 2023
1 parent 3eecf4a commit bb8e892
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 2 deletions.
31 changes: 30 additions & 1 deletion src/ejabberd_sql.erl
Expand Up @@ -73,7 +73,7 @@
-record(state,
{db_ref :: undefined | pid(),
db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
db_version :: undefined | non_neg_integer(),
db_version :: undefined | non_neg_integer() | {non_neg_integer(), atom(), non_neg_integer()},
reconnect_count = 0 :: non_neg_integer(),
host :: binary(),
pending_requests :: p1_queue:queue(),
Expand Down Expand Up @@ -1123,9 +1123,38 @@ get_db_version(#state{db_type = pgsql} = State) ->
?WARNING_MSG("Error getting pgsql version: ~p", [Res]),
State
end;
get_db_version(#state{db_type = mysql} = State) ->
case mysql_to_odbc(p1_mysql_conn:squery(State#state.db_ref,
[<<"select version();">>], self(),
[{timeout, 5000},
{result_type, binary}])) of
{selected, _, [SVersion]} ->
case re:run(SVersion, <<"(\\d+)\\.(\\d+)(?:\\.(\\d+))?(?:-([^-]*))?">>,
[{capture, all_but_first, binary}]) of
{match, [V1, V2, V3, Type]} ->
V = ((bin_to_int(V1)*1000)+bin_to_int(V2))*1000+bin_to_int(V3),
TypeA = binary_to_atom(Type, utf8),
Flags = case TypeA of
'MariaDB' -> 0;
_ when V >= 5007026 andalso V < 8000000 -> 1;
_ when V >= 8000020 -> 1;
_ -> 0
end,
State#state{db_version = {V, TypeA, Flags}};
_ ->
?WARNING_MSG("Error parsing mysql version: ~p", [SVersion]),
State
end;
Res ->
?WARNING_MSG("Error getting mysql version: ~p", [Res]),
State
end;
get_db_version(State) ->
State.

bin_to_int(<<>>) -> 0;
bin_to_int(V) -> binary_to_integer(V).

log(Level, Format, Args) ->
case Level of
debug -> ?DEBUG(Format, Args);
Expand Down
66 changes: 65 additions & 1 deletion src/ejabberd_sql_pt.erl
Expand Up @@ -567,7 +567,6 @@ parse_upsert_field1([$= | S], Acc, ParamPos, Loc) ->
parse_upsert_field1([C | S], Acc, ParamPos, Loc) ->
parse_upsert_field1(S, [C | Acc], ParamPos, Loc).


make_sql_upsert(Table, ParseRes, Pos) ->
check_upsert(ParseRes, Pos),
erl_syntax:fun_expr(
Expand All @@ -587,6 +586,11 @@ make_sql_upsert(Table, ParseRes, Pos) ->
erl_syntax:integer(90100))],
[make_sql_upsert_pgsql901(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:atom(mysql), erl_syntax:tuple([erl_syntax:underscore(), erl_syntax:underscore(), erl_syntax:integer(1)])],
[],
[make_sql_upsert_mysql_select(Table, ParseRes),
erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:atom(mysql), erl_syntax:underscore()],
[],
Expand Down Expand Up @@ -682,6 +686,66 @@ make_sql_upsert_insert(Table, ParseRes) ->
]),
State.

make_sql_upsert_select(Table, ParseRes) ->
{Fields0, Where0} =
lists:foldl(
fun({Field, key, ST}, {Fie, Whe}) ->
{Fie, [ST#state{
'query' = [{str, Field}, {str, "="}] ++ ST#state.'query'}] ++ Whe};
({Field, {true}, ST}, {Fie, Whe}) ->
{[ST#state{
'query' = [{str, Field}, {str, "="}] ++ ST#state.'query'}] ++ Fie, Whe};
(_, Acc) ->
Acc
end, {[], []}, ParseRes),
Fields = join_states(Fields0, " AND "),
Where = join_states(Where0, " AND "),
State =
concat_states(
[#state{'query' = [{str, "SELECT "}],
res_vars = [erl_syntax:variable("__VSel")],
res = [erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(to_bool),
[erl_syntax:variable("__VSel")])]},
Fields,
#state{'query' = [{str, " FROM "}, {str, Table}, {str, " WHERE "}]},
Where
]),
State.

make_sql_upsert_mysql_select(Table, ParseRes) ->
Select = make_sql_query(make_sql_upsert_select(Table, ParseRes)),
Insert = make_sql_query(make_sql_upsert_insert(Table, ParseRes)),
Update = make_sql_query(make_sql_upsert_update(Table, ParseRes)),
erl_syntax:case_expr(
erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Select]),
[erl_syntax:clause(
[erl_syntax:tuple([erl_syntax:atom(selected), erl_syntax:list([])])],
none,
[erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Insert])]),
erl_syntax:clause(
[erl_syntax:abstract({selected, [{true}]})],
[],
[erl_syntax:atom(ok)]),
erl_syntax:clause(
[erl_syntax:tuple([erl_syntax:atom(selected), erl_syntax:underscore()])],
none,
[erl_syntax:application(
erl_syntax:atom(ejabberd_sql),
erl_syntax:atom(sql_query_t),
[Update])]),
erl_syntax:clause(
[erl_syntax:variable("__SelectRes")],
none,
[erl_syntax:variable("__SelectRes")])]).

make_sql_upsert_mysql(Table, ParseRes) ->
Vals =
lists:map(
Expand Down

0 comments on commit bb8e892

Please sign in to comment.