Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(push): streaming upload #2

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
**muster_storage_**
/_build
/cover
/deps
Expand Down
29 changes: 29 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "mix_task",
"name": "mix (Default task)",
"request": "launch",
"projectDir": "${workspaceRoot}"
},
{
"type": "mix_task",
"name": "mix test",
"request": "launch",
"task": "test",
"taskArgs": [
"--trace"
],
"startApps": true,
"projectDir": "${workspaceRoot}",
"requireFiles": [
"test/**/test_helper.exs",
"test/**/*_test.exs"
]
}
]
}
3 changes: 3 additions & 0 deletions apps/muster/lib/muster/blob.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Muster.Blob do

end
4 changes: 4 additions & 0 deletions apps/muster/lib/muster/repository.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ defmodule Muster.Repository do
)
end

def chunk_upload(repo, location, range, blob) when is_nil(range) do
GenServer.call(repo, {:upload_stream, %ChunkedUploadRequest{upload_id: location, range: nil, blob: blob}})
end

def chunk_upload(repo, location, range, blob) do
GenServer.call(
repo,
Expand Down
78 changes: 66 additions & 12 deletions apps/muster/lib/muster/repository/impl.ex
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
defmodule Muster.Repository.Impl do
alias Muster.Storage
require Logger

@enforce_keys [:name, :uploads, :layers, :tags, :manifests]
defstruct ~w[name uploads layers tags manifests]a

@type t :: %__MODULE__{name: String.t(), uploads: map(), layers: MapSet.t(), tags: map(), manifests: map()}

@namespace "all"

@spec new(any) :: Muster.Repository.Impl.t()
def new(name) do
%__MODULE__{
name: name,
# upload sessions
uploads: %{},
# digest to layer blob
layers: %{},
layers: MapSet.new(),
# tag to manifest
tags: %{},
# digest to tag, tag to tag -- single source of truth for manifest reference -> tag
# digest -> tag (union) tag -> tag -- single source of truth for manifest reference -> tag
manifests: %{}
}
end
Expand All @@ -38,7 +44,7 @@ defmodule Muster.Repository.Impl do
%__MODULE__{uploads: uploads, layers: layers} = state
) do
with {:ok, {:started, _any}} <- Map.fetch(uploads, upload_id) do
layers = Map.put(layers, digest, blob)
put_layer(digest, blob, layers, state.name)
uploads = Map.put(uploads, upload_id, {:completed, []})
state = %{state | layers: layers, uploads: uploads}
{:ok, digest, state}
Expand Down Expand Up @@ -67,17 +73,45 @@ defmodule Muster.Repository.Impl do
end
end

def upload_layer_stream(
upload_id, blob, %__MODULE__{uploads: uploads} = state
) do
Logger.debug("Streaming layer chunk for #{upload_id}")
with {:ok, {:started, chunks}} when is_list(chunks) <- Map.fetch(uploads, upload_id)
do
range_end = case chunks do
[prev | []] ->
{{_, prev_end}, _} = prev
prev_end
[prev | [_]] ->
{{_, prev_end}, _} = prev
prev_end
[] ->
0
end
range = range_end + byte_size(blob)
chunks = [{{nil, range}, blob} | chunks]
uploads = Map.put(uploads, upload_id, {:started, chunks})
state = %{state | uploads: uploads}
{:ok, range, state}
else
{:ok, {:started, nil}} -> {:error, :monolithic_only}
{:error, cause} -> {:error, cause}
error -> {:error, error}
end
end

defp verify_chunk_order(chunks, range_start, range_end, blob) do
chunks =
case chunks do
[] when range_start == 0 ->
[{range_end, blob}]
[{{0, range_end}, blob}]

chunks = [{prev_end, _blob} | _tail = []] when prev_end + 1 == range_start ->
[{range_end, blob} | chunks]
chunks = [{{_prev_start, prev_end}, _blob} | _tail = []] when prev_end + 1 == range_start ->
[{{range_start, range_end}, blob} | chunks]

chunks = [{prev_end, blob} | _tail] when prev_end + 1 == range_start ->
[{range_end, blob} | chunks]
chunks = [{{_prev_start, prev_end}, blob} | _tail] when prev_end + 1 == range_start ->
[{{range_start, range_end}, blob} | chunks]

_ ->
Logger.warn("Got invalid chunk sequence for range '#{range_start}-#{range_end}'")
Expand All @@ -103,7 +137,7 @@ defmodule Muster.Repository.Impl do
end

{:started, chunks} = Map.fetch!(state.uploads, upload_id)
layers = put_layer(digest, chunks, state.layers)
layers = put_layer(digest, chunks, state.layers, state.name)
uploads = Map.put(state.uploads, upload_id, {:completed, []})
{:ok, digest, %{state | layers: layers, uploads: uploads}}

Expand All @@ -112,13 +146,20 @@ defmodule Muster.Repository.Impl do
end
end

defp put_layer(digest, chunks, layers_state) do
defp put_layer(digest, chunks, layers_state, name) when is_list(chunks) do
layer =
chunks
|> Enum.map(fn {_, blob} -> blob end)
|> Enum.reduce(<<>>, fn a, b -> a <> b end)

Map.put(layers_state, digest, layer)
put_layer(digest, layer, layers_state, name)
end

defp put_layer(digest, blob, layers_state, name) when is_binary(blob) do
case Storage.write_blob(@namespace, name, digest, blob) do
:ok -> MapSet.put(layers_state, digest)
{:error, _} -> layers_state
end
end

def upload_manifest(
Expand All @@ -129,7 +170,7 @@ defmodule Muster.Repository.Impl do
) do
case manifest_layers
|> Enum.map(fn %{"digest" => digest} -> digest end)
|> Enum.all?(&Map.has_key?(state.layers, &1)) do
|> Enum.all?(&MapSet.member?(state.layers, &1)) do
true ->
state = state |> put_manifest(reference, manifest_digest, manifest)
{:ok, {reference, state}}
Expand All @@ -144,4 +185,17 @@ defmodule Muster.Repository.Impl do
manifests = Map.put(state.manifests, digest, reference) |> Map.put(reference, reference)
%{state | tags: tags, manifests: manifests}
end

def check_layer(digest, %__MODULE__{layers: layers} = _state) do
MapSet.member?(layers, digest)
end

def get_layer(digest, %__MODULE__{layers: layers} = state) do
l = layers |> Enum.map(fn l -> l["digest"] end) |> Enum.join(", ")
Logger.info("Checking layer #{digest} in layers for #{l}")
case MapSet.member?(layers, digest) do
false -> {:error, :not_found}
true -> Storage.get_blob(@namespace, state.name, digest)
end
end
end
39 changes: 22 additions & 17 deletions apps/muster/lib/muster/repository/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,7 @@ defmodule Muster.Repository.Server do

@impl GenServer
@spec init(any) ::
{:ok,
%Muster.Repository.Impl{
layers: %{},
manifests: %{},
name: any,
tags: %{},
uploads: %{}
}}
{:ok, Muster.Repository.Impl.t()}
def init(name) do
{
:ok,
Expand Down Expand Up @@ -76,6 +69,23 @@ defmodule Muster.Repository.Server do
end
end

@impl GenServer
def handle_call(
{:upload_stream,
%ChunkedUploadRequest{upload_id: upload_id, range: nil, blob: blob}},
_from,
state
) do
case Repository.Impl.upload_layer_stream(upload_id, blob, state) do
{:ok, range, state} ->
Logger.debug("Successfully streamed chunk to #{upload_id}")
{:reply, %{location: upload_id, range: range}, state}
{:error, cause} ->
Logger.debug("Error streaming chunk to #{upload_id}: #{cause}")
{:reply, {:error, cause}, state}
end
end

# complete upload with final layer
@impl GenServer
def handle_call(
Expand Down Expand Up @@ -178,19 +188,14 @@ defmodule Muster.Repository.Server do
end

@impl GenServer
def handle_call({:check_layer, digest}, _from, %{layers: layers} = state) do
exists? = Map.has_key?(layers, digest)
def handle_call({:check_layer, digest}, _from, state) do
exists? = Repository.Impl.check_layer(digest, state)
{:reply, exists?, state}
end

@impl GenServer
def handle_call({:get_layer, digest}, _from, %{layers: layers} = state) do
resp =
case Map.get(layers, digest) do
nil -> {:error, :not_found}
blob -> {:ok, blob}
end

def handle_call({:get_layer, digest}, _from, state) do
resp = Repository.Impl.get_layer(digest, state)
{:reply, resp, state}
end
end
31 changes: 31 additions & 0 deletions apps/muster/lib/muster/storage.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Muster.Storage do

# Application.compile_env!(:muster, :storage_root)
@storage_root Application.fetch_env!(:muster, :storage_root)

def storage_root(), do: @storage_root

# Storage Driver
def get_blob(namespace, name, digest) do
filepath(namespace, name, digest)
|> File.read()
|> case do
{:ok, blob} -> {:ok, blob}
{:error, :enomem} -> {:error, :ephemeral}
{:error, _cause} -> {:error, :not_found}
end
end

def write_blob(namespace, name, digest, blob) do
Path.join([@storage_root, namespace, name]) |> File.mkdir_p()
filepath(namespace, name, digest)
|> File.write(blob)
|> case do
{:error, :enomemt} -> {:error, :ephemeral}
{:error, :enoent} -> raise "storage error: enoent"
other -> other
end
end

defp filepath(namespace, name, digest), do: [@storage_root, namespace, name, digest] |> Path.join()
end
4 changes: 4 additions & 0 deletions apps/muster/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ defmodule Muster.MixProject do
[
app: :muster,
version: "0.1.0",
build_path: "../../_build",
config_path: "../../config/config.exs",
deps_path: "../../deps",
lockfile: "../../mix.lock",
elixir: "~> 1.12",
start_permanent: Mix.env() == :prod,
deps: deps(),
Expand Down
7 changes: 7 additions & 0 deletions apps/muster/test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
alias Muster.Storage

ExUnit.start()

ExUnit.after_suite(fn _ ->
Storage.storage_root()
|> File.rm_rf!()
end)
Loading