Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Numeric IDs migration #2762

Merged
merged 8 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions lib/plausible/clickhouse_event_v2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
defmodule Plausible.ClickhouseEventV2 do
@moduledoc """
Event schema for when NumericIDs migration is complete
"""
use Ecto.Schema
import Ecto.Changeset

@primary_key false
schema "events_v2" do
field :name, :string
field :site_id, Ch.Types.UInt64
field :hostname, :string
field :pathname, :string
field :user_id, Ch.Types.UInt64
field :session_id, Ch.Types.UInt64
field :timestamp, :naive_datetime

field :referrer, :string
field :referrer_source, :string
field :utm_medium, :string
field :utm_source, :string
field :utm_campaign, :string
field :utm_content, :string
field :utm_term, :string

field :country_code, Ch.Types.FixedString, size: 2
field :subdivision1_code, :string
field :subdivision2_code, :string
field :city_geoname_id, Ch.Types.UInt32

field :screen_size, :string
field :operating_system, :string
field :operating_system_version, :string
field :browser, :string
field :browser_version, :string

field :"meta.key", {:array, :string}
field :"meta.value", {:array, :string}
field :transferred_from, :string
end

def new(attrs) do
%__MODULE__{}
|> cast(
attrs,
[
:name,
:site_id,
:hostname,
:pathname,
:user_id,
:timestamp,
:operating_system,
:operating_system_version,
:browser,
:browser_version,
:referrer,
:referrer_source,
:utm_medium,
:utm_source,
:utm_campaign,
:utm_content,
:utm_term,
:country_code,
:subdivision1_code,
:subdivision2_code,
:city_geoname_id,
:screen_size,
:"meta.key",
:"meta.value"
],
empty_values: [nil, ""]
)
|> validate_required([:name, :site_id, :hostname, :pathname, :user_id, :timestamp])
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Plausible.ClickhouseSession do
use Ecto.Schema
import Ecto.Changeset

@primary_key false
schema "sessions" do
Expand Down Expand Up @@ -47,35 +46,4 @@ defmodule Plausible.ClickhouseSession do
def random_uint64() do
:crypto.strong_rand_bytes(8) |> :binary.decode_unsigned()
end

def changeset(session, attrs) do
session
|> cast(attrs, [
:hostname,
:domain,
:entry_page,
:exit_page,
:fingerprint,
:start,
:length,
:is_bounce,
:operating_system,
:operating_system_version,
:browser_version,
:referrer,
:referrer_source,
:utm_medium,
:utm_source,
:utm_campaign,
:utm_content,
:utm_term,
:country_code,
:country_geoname_id,
:subdivision1_code,
:subdivision2_code,
:city_geoname_id,
:screen_size
])
|> validate_required([:hostname, :domain, :fingerprint, :is_bounce, :start])
end
end
52 changes: 52 additions & 0 deletions lib/plausible/clickhouse_session_v2.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Plausible.ClickhouseSessionV2 do
@moduledoc """
Session schema for when NumericIDs migration is complete
"""
use Ecto.Schema

@primary_key false
schema "sessions_v2" do
field :hostname, :string
field :site_id, Ch.Types.UInt64
field :user_id, Ch.Types.UInt64
field :session_id, Ch.Types.UInt64

field :start, :naive_datetime
field :duration, Ch.Types.UInt32
field :is_bounce, :boolean
field :entry_page, :string
field :exit_page, :string
field :pageviews, Ch.Types.Int32
field :events, Ch.Types.Int32
field :sign, Ch.Types.Int8

field :"entry_meta.key", {:array, :string}
field :"entry_meta.value", {:array, :string}

field :utm_medium, :string
field :utm_source, :string
field :utm_campaign, :string
field :utm_content, :string
field :utm_term, :string
field :referrer, :string
field :referrer_source, :string

field :country_code, Ch.Types.FixedString, size: 2
field :subdivision1_code, :string
field :subdivision2_code, :string
field :city_geoname_id, Ch.Types.UInt32

field :screen_size, :string
field :operating_system, :string
field :operating_system_version, :string
field :browser, :string
field :browser_version, :string
field :timestamp, :naive_datetime

field :transferred_from, :string
end

def random_uint64() do
:crypto.strong_rand_bytes(8) |> :binary.decode_unsigned()
end
end
64 changes: 64 additions & 0 deletions lib/plausible/data_migration.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Plausible.DataMigration do
@moduledoc """
Base module for coordinated Clickhouse data migrations
run via remote shell or otherwise (TBD).
"""

defmacro __using__(opts) do
dir = Keyword.fetch!(opts, :dir)
repo = Keyword.get(opts, :repo, Plausible.DataMigration.Repo)

quote bind_quoted: [dir: dir, repo: repo] do
@dir dir
@repo repo

def run_sql_confirm(name, assigns \\ []) do
query = unwrap(name, assigns)

confirm("Execute?", fn -> do_run(name, query) end)
end

def confirm(message, func) do
prompt = IO.ANSI.white() <> message <> " [Y/n]: " <> IO.ANSI.reset()

if String.downcase(String.trim(IO.gets(prompt))) == "n" do
IO.puts(" #{IO.ANSI.cyan()}Skipped.#{IO.ANSI.reset()}")
{:ok, :skip}
else
func.()
end
end

defp unwrap(name, assigns) do
"priv/data_migrations"
|> Path.join(@dir)
|> Path.join("sql")
|> Path.join(name <> ".sql.eex")
|> EEx.eval_file(assigns: assigns)
end

def run_sql(name, assigns \\ []) do
query = unwrap(name, assigns)
do_run(name, query)
end

defp do_run(name, query) do
{:ok, res} = @repo.query(query, [], timeout: :infinity)
IO.puts(" #{IO.ANSI.yellow()}#{name} #{IO.ANSI.green()}Done!#{IO.ANSI.reset()}\n")
IO.puts(String.duplicate("-", 78))
{:ok, res}
end

defp unwrap_with_io(name, assigns) do
IO.puts("#{IO.ANSI.yellow()}Running #{name}#{IO.ANSI.reset()}")
query = unwrap(name, assigns)

IO.puts("""
-> Query: #{IO.ANSI.blue()}#{String.trim(query)}#{IO.ANSI.reset()}
""")

query
end
end
end
end
141 changes: 141 additions & 0 deletions lib/plausible/data_migration/numeric_ids.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
defmodule Plausible.DataMigration.NumericIDs do
@moduledoc """
Numeric IDs migration, SQL files available at:
priv/data_migrations/NumericIDs/sql
"""
use Plausible.DataMigration, dir: "NumericIDs"

@table_settings "SETTINGS index_granularity = 8192, storage_policy = 'tiered'"

# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
def run(opts \\ []) do
interactive? = Keyword.get(opts, :interactive?, true)

db_url =
System.get_env(
"NUMERIC_IDS_MIGRATION_DB_URL",
Application.get_env(:plausible, Plausible.IngestRepo)[:url]
)

max_threads =
"NUMERIC_IDS_MIGRATION_MAX_THREADS" |> System.get_env("16") |> String.to_integer()

# TBD: There's most likely a bug in Clickhouse defining Postgres dictionaries,
# we'll use a static URL for now
dict_url = Keyword.get(opts, :dict_url) || System.get_env("DOMAINS_DICT_URL") || ""

dict_password =
Keyword.get(opts, :dict_password) || System.get_env("DOMAINS_DICT_PASSWORD") || ""

table_settings =
Keyword.get(opts, :table_settings) || System.get_env("NUMERIC_IDS_TABLE_SETTINGS") ||
@table_settings

start_from =
Keyword.get(opts, :start_from) || System.get_env("NUMERIC_IDS_PARTITION_START_FROM")

stop_at =
Keyword.get(opts, :stop_at) || System.get_env("NUMERIC_IDS_PARTITION_STOP_AT") ||
previous_part()

(byte_size(dict_url) > 0 and byte_size(dict_password) > 0) ||
raise "Set DOMAINS_DICT_URL and DOMAINS_DICT_PASSWORD"

@repo.start(db_url, max_threads)

cluster? =
case run_sql("check-replicas") do
{:ok, %{num_rows: 0}} -> false
{:ok, %{num_rows: 1}} -> true
end

{:ok, %{rows: partitions}} =
run_sql("list-partitions", start_from: start_from, stop_at: stop_at)

partitions = Enum.map(partitions, fn [part] -> part end)

start_from = start_from || List.first(partitions)

IO.puts("""
Got the following migration settings:

- max_threads: #{max_threads}
- dict_url: #{dict_url}
- dict_password: ✅
- table_settings: #{table_settings}
- db url: #{db_url}
- cluster?: #{cluster?}
- partitions to do: #{inspect(partitions, pretty: true, width: 80)}
- start from: #{start_from}
- stop at: #{stop_at}
""")

run_sql_fn =
if interactive? do
&run_sql_confirm/2
else
&run_sql/2
end

confirm_fn =
if interactive? do
&confirm/2
else
fn _, run_fn ->
run_fn.()
end
end

{:ok, _} = run_sql_fn.("drop-events-v2", cluster?: cluster?)
{:ok, _} = run_sql_fn.("drop-sessions-v2", cluster?: cluster?)
{:ok, _} = run_sql_fn.("drop-tmp-events-v2", [])
{:ok, _} = run_sql_fn.("drop-tmp-sessions-v2", [])
{:ok, _} = run_sql_fn.("drop-dict", [])

{:ok, _} =
run_sql_fn.("create-dict-from-static-file",
dict_url: dict_url,
dict_password: dict_password
)

{:ok, _} = run_sql_fn.("create-events-v2", table_settings: table_settings, cluster?: cluster?)

{:ok, _} =
run_sql_fn.("create-sessions-v2", table_settings: table_settings, cluster?: cluster?)

{:ok, _} = run_sql_fn.("create-tmp-events-v2", table_settings: table_settings)
{:ok, _} = run_sql_fn.("create-tmp-sessions-v2", table_settings: table_settings)

confirm_fn.("Start migration? (starting from partition: #{start_from})", fn ->
IO.puts("start.. #{DateTime.utc_now()}")

for part <- partitions do
part_start = System.monotonic_time()

confirm_fn.("Run partition: #{part}?", fn ->
{:ok, _} = run_sql("insert-into-tmp-events-v2", partition: part)
{:ok, _} = run_sql("attach-tmp-events-v2", partition: part)
{:ok, _} = run_sql("truncate-tmp-events-v2", [])
{:ok, _} = run_sql("insert-into-tmp-sessions-v2", partition: part)
{:ok, _} = run_sql("attach-tmp-sessions-v2", partition: part)
{:ok, _} = run_sql("truncate-tmp-sessions-v2", [])
end)

part_end = System.monotonic_time()

IO.puts(
"#{part} took #{System.convert_time_unit(part_end - part_start, :native, :second)} seconds"
)
end

IO.puts("end.. #{DateTime.utc_now()}")
end)
end

defp previous_part() do
now = NaiveDateTime.utc_now()
month = String.pad_leading("#{now.month - 1}", 2, "0")
year = "#{now.year}"
"#{year}#{month}"
end
end
Loading