-
Notifications
You must be signed in to change notification settings - Fork 3
/
foreign_data_wrapper.ex
145 lines (120 loc) · 4.4 KB
/
foreign_data_wrapper.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
case Code.ensure_loaded(Ecto) do
{:module, _} ->
defmodule Surgex.DataPipe.ForeignDataWrapper do
@moduledoc """
Configures a PostgreSQL Foreign Data Wrapper linkage between two repos.

Specifically, it executes the following steps:

- adds the `postgres_fdw` extension to the local repo
- (re)creates the server and user mapping based on the remote repo's current config
- copies the remote repo's schema to the local repo (named after the underscored repo module name)

Everything is executed in one transaction, so it's safe to use while existing transactions that
depend on the connection to the foreign repo and its schema are running in the system (based on
https://robots.thoughtbot.com/postgres-foreign-data-wrapper).

## Usage

Refer to `Surgex.DataPipe` for a complete data pipe example.
"""
require Logger
alias Ecto.Adapters.SQL
@doc """
Links source repo to a given foreign repo.

Looks up `pg_foreign_server` through `source_repo` for a server named after `foreign_repo`, then
runs either `init_script/3` (no such server yet) or `update_script/3` (server already present),
statement by statement, inside a single `source_repo` transaction.

Returns the `{:ok, _}` tuple produced by the transaction. Raises if any statement fails or if
the transaction does not return `{:ok, _}`.
"""
def init(source_repo, foreign_repo) do
  local_name = source_repo |> Module.split() |> List.last()
  server = schema = build_foreign_alias(foreign_repo)

  # `foreign_repo` is a module held in a variable, so the call needs explicit parentheses;
  # the paren-less form is deprecated for function calls in recent Elixir releases.
  config = foreign_repo.config()

  Logger.info(fn -> "Preparing foreign data wrapper at #{local_name}.#{server}..." end)

  # The server name is bound as a query parameter rather than interpolated into the SQL text.
  existing_servers =
    SQL.query!(source_repo, "select 1 from pg_foreign_server where srvname = $1", [server])

  script =
    case existing_servers.num_rows do
      0 -> init_script(server, schema, config)
      _ -> update_script(server, schema, config)
    end

  {:ok, _} =
    source_repo.transaction(fn ->
      Enum.each(script, fn command ->
        SQL.query!(source_repo, command)
      end)
    end)
end
@doc """
Builds the list of SQL statements used when the foreign server already exists.

The statements alter the server and the current user's mapping with `SET` options taken from
`config`, then drop (with `CASCADE`), recreate and re-import the local `schema`.
"""
def update_script(server, schema, config) do
  server_options = build_server_opts(config, "SET")
  mapping_options = build_user_opts(config, "SET")

  alter_statements = [
    "ALTER SERVER #{server}#{server_options}",
    "ALTER USER MAPPING FOR CURRENT_USER SERVER #{server}#{mapping_options}"
  ]

  schema_statements = [
    "DROP SCHEMA IF EXISTS #{schema} CASCADE",
    "CREATE SCHEMA #{schema}",
    "IMPORT FOREIGN SCHEMA public FROM SERVER #{server} INTO #{schema}"
  ]

  alter_statements ++ schema_statements
end
@doc """
Builds the list of SQL statements used when the foreign server does not exist yet.

The statements ensure the `postgres_fdw` extension, (re)create the server and the current user's
mapping with options taken from `config`, then drop, create and import the local `schema`.
"""
def init_script(server, schema, config) do
  server_options = build_server_opts(config)
  mapping_options = build_user_opts(config)

  server_statements = [
    "CREATE EXTENSION IF NOT EXISTS postgres_fdw",
    "DROP SERVER IF EXISTS #{server} CASCADE",
    "CREATE SERVER #{server} FOREIGN DATA WRAPPER postgres_fdw#{server_options}",
    "CREATE USER MAPPING FOR CURRENT_USER SERVER #{server}#{mapping_options}"
  ]

  schema_statements = [
    "DROP SCHEMA IF EXISTS #{schema}",
    "CREATE SCHEMA #{schema}",
    "IMPORT FOREIGN SCHEMA public FROM SERVER #{server} INTO #{schema}"
  ]

  server_statements ++ schema_statements
end
@doc """
Sets the foreign repo's schema name as the prefix of a query.

When given a map (such as an already built query), the `:prefix` key is set to the alias derived
from `foreign_repo`. Any other value is first turned into a query with `Ecto.Query.from/1` and
then prefixed the same way, so the resulting query targets tables of the previously linked repo.
"""
def prefix(%{} = query, foreign_repo) do
  foreign_schema = build_foreign_alias(foreign_repo)
  Map.put(query, :prefix, foreign_schema)
end

def prefix(schema, foreign_repo) do
  # `from` is a macro, so the module has to be required before the qualified call.
  require Ecto.Query

  base_query = Ecto.Query.from(schema)
  prefix(base_query, foreign_repo)
end
# Builds the OPTIONS clause for the foreign server from the repo config, mapping
# `:hostname`, `:database` and `:port` to the `host`, `dbname` and `port` options.
# `command` is put in front of every option (e.g. "SET" when altering).
defp build_server_opts(config, command \\ "") do
  option_sources = [{"host", :hostname}, {"dbname", :database}, {"port", :port}]

  option_sources
  |> Enum.map(fn {option, config_key} -> {option, Keyword.get(config, config_key)} end)
  |> build_opts(command)
end
# Builds the OPTIONS clause for the user mapping from the repo config, mapping
# `:username` and `:password` to the `user` and `password` options.
# `command` is put in front of every option (e.g. "SET" when altering).
defp build_user_opts(config, command \\ "") do
  option_sources = [{"user", :username}, {"password", :password}]

  option_sources
  |> Enum.map(fn {option, config_key} -> {option, Keyword.get(config, config_key)} end)
  |> build_opts(command)
end
# Renders `mapping`, a list of `{option, value}` pairs, as an SQL ` OPTIONS (...)` clause.
#
# Pairs whose value is `nil` or `false` are skipped. Each remaining pair is rendered as
# `<command> <option> '<value>'` and the pairs are joined with ", ". Returns "" when no pair is
# left, so the result can be appended to a statement unconditionally.
defp build_opts(mapping, command) do
  opts_string =
    mapping
    |> Enum.filter(fn {_option, value} -> value end)
    |> Enum.map_join(", ", fn {option, value} ->
      # Double embedded single quotes so values such as passwords containing `'` cannot
      # terminate the SQL string literal early.
      # NOTE(review): assumes PostgreSQL standard-conforming strings, where a backslash
      # needs no escaping - confirm for the target servers.
      escaped = value |> to_string() |> String.replace("'", "''")
      "#{command} #{option} '#{escaped}'"
    end)

  case opts_string do
    "" -> ""
    _ -> " OPTIONS (#{opts_string})"
  end
end
# Derives the server/schema alias for a repo: the last segment of its module name, underscored.
defp build_foreign_alias(repo) do
  last_segment = List.last(Module.split(repo))
  Macro.underscore(last_segment)
end
end
_ ->
nil
end