Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
pre-create namespaces during database initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
jlegrone committed Aug 4, 2021
1 parent c0be89d commit 011f820
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 18 deletions.
54 changes: 53 additions & 1 deletion internal/common/persistence/sql/sqlplugin/sqlite/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package sqlite

import (
"context"
gosql "database/sql"
_ "embed"
"errors"
Expand All @@ -34,13 +35,18 @@ import (
"strings"

"github.com/iancoleman/strcase"
"github.com/DataDog/temporalite/internal/common/persistence/sql/sqlplugin/tools"
"github.com/jmoiron/sqlx"
"go.temporal.io/api/enums/v1"
"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/persistence/sql/sqlplugin"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/resolver"
"go.temporal.io/server/tools/common/schema"

"github.com/DataDog/temporalite/internal/common/persistence/sql/sqlplugin/tools"
)

const (
Expand Down Expand Up @@ -78,6 +84,14 @@ func (p *plugin) CreateDB(
return nil, err
}
p.mainDB = newDB(dbKind, cfg.DatabaseName, conn, nil)

// Ensure namespaces exist
namespaces := strings.Split(cfg.ConnectAttributes["preCreateNamespaces"], ",")
for _, ns := range namespaces {
if err := createNamespaceIfNotExists(p.mainDB, ns); err != nil {
return nil, fmt.Errorf("error ensuring namespace exists: %w", err)
}
}
}
return p.mainDB, nil
}
Expand Down Expand Up @@ -168,3 +182,41 @@ func (p *plugin) createDBConnection(dbKind sqlplugin.DbKind, cfg *config.SQL, _

return db, nil
}

func createNamespaceIfNotExists(mainDB *db, namespace string) error {
// Return early if namespace already exists
rows, err := mainDB.SelectFromNamespace(context.Background(), sqlplugin.NamespaceFilter{
Name: &namespace,
})
if err == nil && len(rows) > 0 {
return nil
}

nsID := primitives.NewUUID()

d, err := serialization.NamespaceDetailToBlob(&persistence.NamespaceDetail{
Info: &persistence.NamespaceInfo{
Id: nsID.String(),
State: enums.NAMESPACE_STATE_REGISTERED,
Name: namespace,
},
Config: &persistence.NamespaceConfig{},
ReplicationConfig: &persistence.NamespaceReplicationConfig{},
})
if err != nil {
return err
}

if _, err := mainDB.InsertIntoNamespace(context.Background(), &sqlplugin.NamespaceRow{
ID: nsID,
Name: namespace,
Data: d.GetData(),
DataEncoding: d.GetEncodingType().String(),
IsGlobal: false,
NotificationVersion: 0,
}); err != nil {
return err
}

return nil
}
5 changes: 5 additions & 0 deletions internal/liteconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"time"

"github.com/DataDog/temporalite/internal/common/persistence/sql/sqlplugin/sqlite"
Expand Down Expand Up @@ -71,6 +72,10 @@ func Convert(cfg *Config) *config.Config {
sqliteConfig.DatabaseName = cfg.DatabaseFilePath
}

if len(cfg.Namespaces) > 0 {
sqliteConfig.ConnectAttributes["preCreateNamespaces"] = strings.Join(cfg.Namespaces, ",")
}

var (
metricsPort = cfg.FrontendPort + 200
pprofPort = cfg.FrontendPort + 201
Expand Down
21 changes: 4 additions & 17 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package server

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/DataDog/temporalite/internal/liteconfig"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/server/common/authorization"
Expand Down Expand Up @@ -73,28 +71,17 @@ func New(opts ...Option) (*Server, error) {
func (s *Server) Start() error {
if len(s.config.Namespaces) > 0 {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
nsClient, err := s.newNamespaceClient(ctx)
if err != nil {
panic(err)
}
defer nsClient.Close()

// Create namespaces
var errNamespaceExists *serviceerror.NamespaceAlreadyExists
for _, ns := range s.config.Namespaces {
if err := nsClient.Register(ctx, &workflowservice.RegisterNamespaceRequest{
Namespace: ns,
WorkflowExecutionRetentionPeriod: &s.config.DefaultNamespaceRetentionPeriod,
}); err != nil && !errors.As(err, &errNamespaceExists) {
panic(err)
}
}

// Wait for each namespace to be ready
for _, ns := range s.config.Namespaces {
c, err := s.newClient(context.Background(), client.Options{Namespace: ns})
c, err := s.newClient(ctx, client.Options{Namespace: ns})
if err != nil {
panic(err)
}
Expand All @@ -105,11 +92,11 @@ func (s *Server) Start() error {
backoff = 20 * time.Millisecond
)
for i := 0; i < maxAttempts; i++ {
_, err = c.ListOpenWorkflow(context.Background(), &workflowservice.ListOpenWorkflowExecutionsRequest{
_, err = c.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: ns,
})
if err == nil {
if _, err := c.DescribeTaskQueue(context.Background(), "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil {
if _, err := c.DescribeTaskQueue(ctx, "_404", enumspb.TASK_QUEUE_TYPE_UNSPECIFIED); err == nil {
fmt.Println(err)
break
}
Expand Down

0 comments on commit 011f820

Please sign in to comment.