Skip to content

Commit

Permalink
refactor of workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
manveru committed Oct 14, 2021
1 parent e6c408f commit c5c4aaa
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 99 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
/cicero
/db/database.sqlite3
11 changes: 7 additions & 4 deletions brain.go
Expand Up @@ -21,6 +21,7 @@ type BrainCmd struct {

type Workflow struct {
ID uint64
Name string
Certs map[string]interface{}
}

Expand All @@ -44,7 +45,7 @@ func runBrain(args *BrainCmd) error {
}

func brain(db *bun.DB) error {
streamName := "workflow.*.cert"
streamName := "workflow.*.*.cert"

client := connect([]string{streamName})
defer client.Close()
Expand All @@ -61,13 +62,14 @@ func brain(db *bun.DB) error {
fmt.Println(msg.Timestamp(), msg.Offset(), string(msg.Key()), string(msg.Value()))
fmt.Println("subject:", msg.Subject())
parts := strings.Split(msg.Subject(), ".")
id, err := strconv.ParseUint(parts[1], 10, 64)
workflowName := parts[1]
id, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
logger.Printf("Invalid Workflow ID received, ignoring: %s\n", msg.Subject())
return
}

logger.Printf("Received update for workflow %d", id)
logger.Printf("Received update for workflow %s %d", workflowName, id)

received := map[string]interface{}{}
unmarshalErr := json.Unmarshal(msg.Value(), &received)
Expand All @@ -77,14 +79,15 @@ func brain(db *bun.DB) error {
}

err = db.RunInTx(ctx, nil, func(ctx context.Context, tx bun.Tx) error {
existing := &Workflow{}
existing := &Workflow{Name: workflowName}
err = tx.NewSelect().
Model(existing).
Where("id = ?", id).
Scan(context.Background())

if err == sql.ErrNoRows {
workflow := &Workflow{
Name: workflowName,
ID: id,
Certs: received,
}
Expand Down
18 changes: 16 additions & 2 deletions builder.sh
@@ -1,7 +1,21 @@
#!/bin/sh
# shellcheck shell=bash
# shellcheck shell=bash disable=SC1090

set -exu

. "${scriptPath:-}"
case "${type:-bash}" in
bash)
. "${scriptPath:-}"
;;
ruby)
ruby "${scriptPath:-}"
;;
python)
python "${scriptPath:-}"
;;
crystal)
crystal run "${scriptPath:-}"
;;
esac

echo -n "${result:-}" > "${out:-}"
Binary file removed db/database.sqlite3
Binary file not shown.
1 change: 1 addition & 0 deletions db/migrations/20211011095324_create_workflows.sql
Expand Up @@ -2,6 +2,7 @@

create table workflows (
id bigint primary key unique not null,
name text not null,
certs jsonb not null
);

Expand Down
1 change: 1 addition & 0 deletions db/schema.sql
@@ -1,6 +1,7 @@
CREATE TABLE IF NOT EXISTS "schema_migrations" (version varchar(255) primary key);
CREATE TABLE workflows (
id bigint primary key unique not null,
name text not null,
certs jsonb not null
);
-- Dbmate schema migrations
Expand Down
12 changes: 10 additions & 2 deletions flake.nix
Expand Up @@ -22,11 +22,19 @@

cicero = prev.buildGoModule rec {
pname = "cicero";
version = "2021.10.11.001";
version = "2021.10.14.001";
vendorSha256 = "sha256-S1YuuiJUiQG9SMzXJUDmF4ZyYmAPNoVnemzBGdWMzdM=";

src = inputs.inclusive.lib.inclusive ./. [
./go.mod ./go.sum ./main.go ./brain.go ./invoker.go ./show.go
./go.mod
./go.sum
./main.go
./brain.go
./builder.sh
./db
./invoker.go
./lib.nix
./show.go
];

ldflags = [
Expand Down
43 changes: 27 additions & 16 deletions invoker.go
Expand Up @@ -28,34 +28,35 @@ func invoker() {
ctx := context.Background()
err := client.Subscribe(
ctx,
"workflow.*.invoke",
"workflow.*.*.invoke",
func(msg *liftbridge.Message, err error) {
inputs := string(msg.Value())
logger.Println(msg.Timestamp(), msg.Offset(), string(msg.Key()), inputs)

parts := strings.Split(msg.Subject(), ".")
id, err := strconv.ParseUint(parts[1], 10, 64)
workflowName := parts[1]
id, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
logger.Printf("Invalid Workflow ID received, ignoring: %s\n", msg.Subject())
return
}

instantiated, err := nixInstantiate(id, inputs)
workflow, err := nixInstantiate(workflowName, id, inputs)
if err != nil {
logger.Printf("Invalid Workflow Definition, ignoring: %s\n", err)
return
}

for key, value := range *instantiated {
if value.Run != nil {
fmt.Printf("building %s\n", key)
output, err := nixBuild(id, key, inputs)
for taskName, task := range workflow.Tasks {
if task.Run != nil {
fmt.Printf("building %s.%s\n", workflowName, taskName)
output, err := nixBuild(workflowName, id, taskName, inputs)

if err == nil {
publish(fmt.Sprintf("workflow.%d.cert", id), "workflow.*.cert", value.Success)
publish(fmt.Sprintf("workflow.%s.%d.cert", workflowName, id), "workflow.*.*.cert", task.Success)
} else {
fmt.Println(string(output))
publish(fmt.Sprintf("workflow.%d.cert", id), "workflow.*.cert", value.Failure)
publish(fmt.Sprintf("workflow.%s.%d.cert", workflowName, id), "workflow.*.*.cert", task.Failure)
fail(errors.WithMessage(err, "Failed to run nix-build"))
}
}
Expand All @@ -69,43 +70,53 @@ func invoker() {
}
}

func nixBuild(id uint64, name string, inputs string) ([]byte, error) {
func nixBuild(workflowName string, id uint64, name string, inputs string) ([]byte, error) {
return exec.Command(
"nix-build",
"--no-out-link",
"--argstr", "id", strconv.FormatUint(id, 10),
"--argstr", "inputsJSON", inputs,
"./workflow.nix",
"--attr", name+".run",
"./lib.nix",
"--attr", fmt.Sprintf("workflows.%s.%s.run", workflowName, name),
).CombinedOutput()
}

type workflowDefinitions map[string]workflowDefinition
type workflowDefinition struct {
Name string
Meta map[string]interface{}
Tasks map[string]workflowTask
}
type workflowTask struct {
Failure map[string]interface{} `json:"failure"`
Success map[string]interface{} `json:"success"`
Inputs []string `json:"inputs"`
When map[string]bool `json:"when"`
Run *string `json:"run"`
}

func nixInstantiate(id uint64, inputs string) (*workflowDefinitions, error) {
func nixInstantiate(workflowName string, id uint64, inputs string) (*workflowDefinition, error) {
output, err := exec.Command(
"nix-instantiate",
"--eval",
"--strict",
"--json",
"./workflow.nix",
"./lib.nix",
"--argstr", "inputsJSON", inputs,
"--argstr", "id", strconv.FormatUint(id, 10),
"--attr", fmt.Sprintf(`workflows."%s"`, workflowName),
).CombinedOutput()

if err != nil {
logger.Println(string(output))
return nil, errors.WithMessage(err, "Failed to run nix-instantiate")
}

result := &workflowDefinitions{}
result := &workflowDefinition{}
err = json.Unmarshal(output, result)
return result, err
if err != nil {
logger.Println(string(output))
return nil, errors.WithMessage(err, "While unmarshaling workflowDefinition")
}
return result, nil
}
105 changes: 105 additions & 0 deletions lib.nix
@@ -0,0 +1,105 @@
{ id, inputs ? { }, inputsJSON ? null }:
let
inherit (builtins)
all attrNames attrValues concatStringsSep fromJSON functionArgs getFlake
hashString seq readDir;

combinedInputs = inputs
// (if inputsJSON != null then (fromJSON inputsJSON) else { });

flake = getFlake (toString ./.);
pkgs = flake.inputs.nixpkgs.legacyPackages.x86_64-linux
// flake.legacyPackages.x86_64-linux;
inherit (pkgs) lib;

findRunner = type:
runners.${type} or (throw
"Invalid task type '${type}'. Available types are: ${
concatStringsSep ", " (attrNames runners)
}");

mkDerivation = { workflowName, taskName, script, ... }@args:
derivation ((rec {
name = lib.strings.sanitizeDerivationName "${workflowName}-${taskName}";
passAsFile = [ "script" ];
inherit script;
system = "x86_64-linux";
result = concatStringsSep "." [
workflowName
taskName
(hashString outputHashAlgo script)
];
outputHashAlgo = "sha256";
outputHashMode = "flat";
outputHash = hashString outputHashAlgo result;
builder = ./builder.sh;
}) // args);

runners = with pkgs;
let
run = ourArgs: taskArgs: mkDerivation (ourArgs // taskArgs);
makeBinPath = extra: lib.makeBinPath ([ liftbridge-cli ] ++ extra);
in {
bash = run { PATH = makeBinPath [ bash coreutils ]; };
ruby = run { PATH = makeBinPath [ ruby ]; };
python = run { PATH = makeBinPath [ python ]; };
crystal = run { PATH = makeBinPath [ crystal ]; };
};

mkTask = { workflowName, taskName, task, inputs, run, type ? "bash"
, when ? { }, success ? { ${taskName} = true; }
, failure ? { ${taskName} = false; } }:
let
pp = v: builtins.trace (builtins.toJSON v) v;
ok = all (a: a) (attrValues when);
runner = findRunner type;
drv = runner {
inherit workflowName taskName type;
script = run;
};
in {
run = if ok then drv else null;
type = seq runner type;
inherit when inputs success failure;
};

workflow = { name, tasks ? { }, meta ? { } }:
let
transformTask = taskName: task:
let
inputs = attrNames (functionArgs task);
intersection =
pkgs.lib.intersectLists inputs (attrNames combinedInputs);
filteredInputs = pkgs.lib.listToAttrs (map (input: {
name = input;
value = combinedInputs.${input} or null;
}) intersection);
in mkTask ({
workflowName = name;
inherit taskName task inputs;
} // (task filteredInputs));
transformedTasks = pkgs.lib.mapAttrs transformTask tasks;
in {
inherit name meta;
tasks = transformedTasks;
};

x = {
pingpong = { name = "pingpong"; };
"test/ruby" = { name = "test/ruby"; };
};

workflows = dir:
(lib.mapAttrsToList (name: type:
if (type == "regular") && (lib.hasSuffix ".nix" name) then
let
called = import (dir + "/${name}") {
id = toString id;
inherit workflow;
};
in [ (pkgs.lib.nameValuePair called.name called) ]
else if type == "directory" then
[ (workflows (dir + "/${name}")) ]
else
{ }) (readDir dir));
in { workflows = lib.listToAttrs (lib.flatten (workflows ./workflows)); }
7 changes: 4 additions & 3 deletions show.go
Expand Up @@ -7,16 +7,17 @@ import (
)

type ShowCmd struct {
ID uint64 `arg:"--id" default:0`
Inputs string `arg:"--inputs" default:"{}"`
WorkflowName string `arg:"--workflow,required"`
ID uint64 `arg:"--id" default:0`
Inputs string `arg:"--inputs" default:"{}"`
}

func runShow(args *ShowCmd) error {
return show(args)
}

func show(args *ShowCmd) error {
def, err := nixInstantiate(args.ID, args.Inputs)
def, err := nixInstantiate(args.WorkflowName, args.ID, args.Inputs)
if err != nil {
return err
}
Expand Down

0 comments on commit c5c4aaa

Please sign in to comment.