Skip to content

Commit

Permalink
Merge pull request #224 from kevinrizza/graph-loader
Browse files Browse the repository at this point in the history
Graph Loader initial implementation
  • Loading branch information
openshift-merge-robot committed Mar 25, 2020
2 parents 100f3bc + 7ca004d commit d2e8855
Show file tree
Hide file tree
Showing 33 changed files with 18,894 additions and 0 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6
github.com/blang/semver v3.5.0+incompatible
github.com/docker/distribution v2.7.1+incompatible
github.com/ghodss/yaml v1.0.0
github.com/golang-migrate/migrate/v4 v4.6.2
Expand Down
8 changes: 8 additions & 0 deletions pkg/registry/empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ func (EmptyQuery) GetPackage(ctx context.Context, name string) (*PackageManifest
return nil, errors.New("empty querier: cannot get package")
}

func (EmptyQuery) GetDefaultPackage(ctx context.Context, name string) (string, error) {
return "", errors.New("empty querier: cannot get default package")
}

func (EmptyQuery) GetChannelEntriesFromPackage(ctx context.Context, packageName string) ([]ChannelEntryAnnotated, error) {
return nil, errors.New("empty querier: cannot get all channel entries for package")
}

func (EmptyQuery) GetBundle(ctx context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) {
return nil, errors.New("empty querier: cannot get bundle")
}
Expand Down
22 changes: 22 additions & 0 deletions pkg/registry/graph.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package registry

type Package struct {
Name string
DefaultChannel string
Channels map[string]Channel
}

type Channel struct {
Head BundleKey
Nodes map[BundleKey]map[BundleKey]struct{}
}

type BundleKey struct {
BundlePath string
Version string //semver string
CsvName string
}

func (b *BundleKey) IsEmpty() bool {
return b.BundlePath == "" && b.Version == "" && b.CsvName == ""
}
9 changes: 9 additions & 0 deletions pkg/registry/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Query interface {
ListTables(ctx context.Context) ([]string, error)
ListPackages(ctx context.Context) ([]string, error)
GetPackage(ctx context.Context, name string) (*PackageManifest, error)
GetDefaultPackage(ctx context.Context, name string) (string, error)
GetChannelEntriesFromPackage(ctx context.Context, packageName string) ([]ChannelEntryAnnotated, error)
GetBundle(ctx context.Context, pkgName, channelName, csvName string) (*api.Bundle, error)
GetBundleForChannel(ctx context.Context, pkgName string, channelName string) (*api.Bundle, error)
// Get all channel entries that say they replace this one
Expand Down Expand Up @@ -47,3 +49,10 @@ type Query interface {
// Get CurrentCSV name for channel and package
GetCurrentCSVNameForChannel(ctx context.Context, pkgName, channel string) (string, error)
}

// GraphLoader generates a graph
// GraphLoader supports multiple different loading schemes
// GraphLoader from SQL, GraphLoader from old format (filesystem), GraphLoader from SQL + input bundles
type GraphLoader interface {
Generate() (*Package, error)
}
12 changes: 12 additions & 0 deletions pkg/registry/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ type ChannelEntry struct {
Replaces string
}

// ChannelEntryAnnotated is a denormalized node in a channel graph annotated with additional entry level info
type ChannelEntryAnnotated struct {
PackageName string
ChannelName string
BundleName string
BundlePath string
Version string
Replaces string
ReplacesVersion string
ReplacesBundlePath string
}

// AnnotationsFile holds annotation information about a bundle
type AnnotationsFile struct {
// annotations is a list of annotations for a given bundle
Expand Down
127 changes: 127 additions & 0 deletions pkg/sqlite/graphloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package sqlite

import (
"context"
"database/sql"
"fmt"

"github.com/operator-framework/operator-registry/pkg/registry"
)

type SQLGraphLoader struct {
Querier registry.Query
PackageName string
}

func NewSQLGraphLoader(dbFilename, name string) (*SQLGraphLoader, error) {
querier, err := NewSQLLiteQuerier(dbFilename)
if err != nil {
return nil, err
}

return &SQLGraphLoader{
Querier: querier,
PackageName: name,
}, nil
}

func NewSQLGraphLoaderFromDB(db *sql.DB, name string) (*SQLGraphLoader, error) {
return &SQLGraphLoader{
Querier: NewSQLLiteQuerierFromDb(db),
PackageName: name,
}, nil
}

func (g *SQLGraphLoader) Generate() (*registry.Package, error) {
ctx := context.TODO()
defaultChannel, err := g.Querier.GetDefaultPackage(ctx, g.PackageName)
if err != nil {
return nil, err
}

channelEntries, err := g.Querier.GetChannelEntriesFromPackage(ctx, g.PackageName)
if err != nil {
return nil, err
}

channels, err := graphFromEntries(channelEntries)
if err != nil {
return nil, err
}

return &registry.Package{
Name: g.PackageName,
DefaultChannel: defaultChannel,
Channels: channels,
}, nil
}

// graphFromEntries builds the graph from a set of channel entries
func graphFromEntries(channelEntries []registry.ChannelEntryAnnotated) (map[string]registry.Channel, error) {
channels := map[string]registry.Channel{}

type replaces map[registry.BundleKey]map[registry.BundleKey]struct{}

channelGraph := map[string]replaces{}
channelHeadCandidates := map[string]map[registry.BundleKey]struct{}{}

// add all channels and nodes to the graph
for _, entry := range channelEntries {
// create channel if we haven't seen it yet
if _, ok := channelGraph[entry.ChannelName]; !ok {
channelGraph[entry.ChannelName] = replaces{}
}

key := registry.BundleKey{
BundlePath: entry.BundlePath,
Version: entry.Version,
CsvName: entry.BundleName,
}
channelGraph[entry.ChannelName][key] = map[registry.BundleKey]struct{}{}

// every bundle in a channel is a potential head of that channel
if _, ok := channelHeadCandidates[entry.ChannelName]; !ok {
channelHeadCandidates[entry.ChannelName] = map[registry.BundleKey]struct{}{key: {}}
} else {
channelHeadCandidates[entry.ChannelName][key] = struct{}{}
}
}

for _, entry := range channelEntries {
key := registry.BundleKey{
BundlePath: entry.BundlePath,
Version: entry.Version,
CsvName: entry.BundleName,
}
replacesKey := registry.BundleKey{
BundlePath: entry.BundlePath,
Version: entry.ReplacesVersion,
CsvName: entry.Replaces,
}

if !replacesKey.IsEmpty() {
channelGraph[entry.ChannelName][key][replacesKey] = struct{}{}
}

delete(channelHeadCandidates[entry.ChannelName], replacesKey)
}

for channelName, candidates := range channelHeadCandidates {
if len(candidates) == 0 {
return nil, fmt.Errorf("no channel head found for %s", channelName)
}
if len(candidates) > 1 {
return nil, fmt.Errorf("multiple candidate channel heads found for %s: %v", channelName, candidates)
}

for head := range candidates {
channel := registry.Channel{
Head: head,
Nodes: channelGraph[channelName],
}
channels[channelName] = channel
}
}

return channels, nil
}
105 changes: 105 additions & 0 deletions pkg/sqlite/graphloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package sqlite

import (
"context"
"database/sql"
"fmt"
"github.com/operator-framework/operator-registry/pkg/registry"
"math/rand"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func createLoadedTestDb(t *testing.T) (*sql.DB, func()) {
dbName := fmt.Sprintf("test-%d.db", rand.Int())

db, err := sql.Open("sqlite3", dbName)
require.NoError(t, err)

dbLoader, err := NewSQLLiteLoader(db)
require.NoError(t, err)

err = dbLoader.Migrate(context.TODO())
require.NoError(t, err)

loader := NewSQLLoaderForDirectory(dbLoader, "./testdata/loader_data")
err = loader.Populate()
require.NoError(t, err)

return db, func() {
defer func() {
if err := os.Remove(dbName); err != nil {
t.Fatal(err)
}
}()
if err := db.Close(); err != nil {
t.Fatal(err)
}
}
}

func TestLoadPackageGraph_Etcd(t *testing.T) {
expectedGraph := &registry.Package{
Name: "etcd",
DefaultChannel: "alpha",
Channels: map[string]registry.Channel{
"alpha": {
Head: registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"},
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: {},
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: {},
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: {
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: struct{}{},
},
registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"}: {
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: struct{}{},
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: struct{}{},
},
},
},
"beta": {
Head: registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"},
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: {},
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: {
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: struct{}{},
},
},
},
"stable": {
Head: registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"},
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: {},
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: {},
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: {
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: struct{}{},
},
registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"}: {
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: struct{}{},
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: struct{}{},
},
},
},
},
}

db, cleanup := createLoadedTestDb(t)
defer cleanup()

graphLoader, err := NewSQLGraphLoaderFromDB(db, "etcd")
require.NoError(t, err)

result, err := graphLoader.Generate()
require.NoError(t, err)

require.Equal(t, "etcd", result.Name)
require.Equal(t, 3, len(result.Channels))

for channelName, channel := range result.Channels {
expectedChannel := expectedGraph.Channels[channelName]
require.Equal(t, expectedChannel.Head, channel.Head)
require.EqualValues(t, expectedChannel.Nodes, channel.Nodes)
}
}
70 changes: 70 additions & 0 deletions pkg/sqlite/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,76 @@ func (s *SQLQuerier) GetPackage(ctx context.Context, name string) (*registry.Pac
return pkg, nil
}

func (s *SQLQuerier) GetDefaultPackage(ctx context.Context, name string) (string, error) {
query := `SELECT default_channel
FROM package WHERE package.name=?`
rows, err := s.db.QueryContext(ctx, query, name)
if err != nil {
return "", err
}
defer rows.Close()

var defaultChannel sql.NullString
if !rows.Next() {
return "", fmt.Errorf("package %s not found", name)
}
if err := rows.Scan(&defaultChannel); err != nil {
return "", err
}

if !defaultChannel.Valid {
return "", fmt.Errorf("default channel not valid")
}

return defaultChannel.String, nil
}

func (s *SQLQuerier) GetChannelEntriesFromPackage(ctx context.Context, packageName string) ([]registry.ChannelEntryAnnotated, error) {
query := `SELECT channel_entry.package_name, channel_entry.channel_name, channel_entry.operatorbundle_name, op_bundle.version, op_bundle.bundlepath, replaces.operatorbundle_name, replacesbundle.version, replacesbundle.bundlepath
FROM channel_entry
LEFT JOIN channel_entry replaces ON channel_entry.replaces = replaces.entry_id
LEFT JOIN operatorbundle op_bundle ON channel_entry.operatorbundle_name = op_bundle.name
LEFT JOIN operatorbundle replacesbundle ON replaces.operatorbundle_name = replacesbundle.name
WHERE channel_entry.package_name = ?;`

var entries []registry.ChannelEntryAnnotated
rows, err := s.db.QueryContext(ctx, query, packageName)
if err != nil {
return nil, err
}
defer rows.Close()

var pkgName sql.NullString
var channelName sql.NullString
var bundleName sql.NullString
var replaces sql.NullString
var version sql.NullString
var bundlePath sql.NullString
var replacesVersion sql.NullString
var replacesBundlePath sql.NullString

for rows.Next() {
if err := rows.Scan(&pkgName, &channelName, &bundleName, &version, &bundlePath, &replaces, &replacesVersion, &replacesBundlePath); err != nil {
return nil, err
}

channelEntryNode := registry.ChannelEntryAnnotated{
PackageName: pkgName.String,
ChannelName: channelName.String,
BundleName: bundleName.String,
Version: version.String,
BundlePath: bundlePath.String,
Replaces: replaces.String,
ReplacesVersion: replacesVersion.String,
ReplacesBundlePath: replacesBundlePath.String,
}

entries = append(entries, channelEntryNode)
}

return entries, nil
}

func (s *SQLQuerier) GetBundle(ctx context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) {
query := `SELECT DISTINCT channel_entry.entry_id, operatorbundle.name, operatorbundle.bundle, operatorbundle.bundlepath, operatorbundle.version, operatorbundle.skiprange
FROM operatorbundle INNER JOIN channel_entry ON operatorbundle.name=channel_entry.operatorbundle_name
Expand Down
1 change: 1 addition & 0 deletions pkg/sqlite/test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
expected:[]registry.OperatorBundle{registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x9, Patch: 0x2, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.2", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{registry.BundleRef{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x9, Patch: 0x0, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.0"}}}, registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x0, Patch: 0x0, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.1", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{}}, registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x9, Patch: 0x0, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.0", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{registry.BundleRef{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x6, Patch: 0x1, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.6.1"}}}, registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x6, Patch: 0x1, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.6.1", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{}}}

0 comments on commit d2e8855

Please sign in to comment.