-
Notifications
You must be signed in to change notification settings - Fork 239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Graph Loader initial implementation #224
Changes from all commits
ff4daf8
70d01de
67704c5
235c245
7ca004d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package registry | ||
|
||
type Package struct { | ||
Name string | ||
DefaultChannel string | ||
Channels map[string]Channel | ||
} | ||
|
||
type Channel struct { | ||
Head BundleKey | ||
Nodes map[BundleKey]map[BundleKey]struct{} | ||
} | ||
|
||
type BundleKey struct { | ||
BundlePath string | ||
Version string //semver string | ||
CsvName string | ||
} | ||
|
||
func (b *BundleKey) IsEmpty() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we're using the I guess we don't have to for the purposes of defining the map (Go does this for us) but it may be a helpful utility function to have alongside There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's a fair thing to suggest, but perhaps it should just be included when something that needs to do the comparison is written? |
||
return b.BundlePath == "" && b.Version == "" && b.CsvName == "" | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
package sqlite | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
|
||
"github.com/operator-framework/operator-registry/pkg/registry" | ||
) | ||
|
||
type SQLGraphLoader struct { | ||
Querier registry.Query | ||
PackageName string | ||
} | ||
|
||
func NewSQLGraphLoader(dbFilename, name string) (*SQLGraphLoader, error) { | ||
querier, err := NewSQLLiteQuerier(dbFilename) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return &SQLGraphLoader{ | ||
Querier: querier, | ||
PackageName: name, | ||
}, nil | ||
} | ||
|
||
func NewSQLGraphLoaderFromDB(db *sql.DB, name string) (*SQLGraphLoader, error) { | ||
return &SQLGraphLoader{ | ||
Querier: NewSQLLiteQuerierFromDb(db), | ||
PackageName: name, | ||
}, nil | ||
} | ||
|
||
func (g *SQLGraphLoader) Generate() (*registry.Package, error) { | ||
ctx := context.TODO() | ||
defaultChannel, err := g.Querier.GetDefaultPackage(ctx, g.PackageName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
channelEntries, err := g.Querier.GetChannelEntriesFromPackage(ctx, g.PackageName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
channels, err := graphFromEntries(channelEntries) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
return ®istry.Package{ | ||
Name: g.PackageName, | ||
DefaultChannel: defaultChannel, | ||
Channels: channels, | ||
}, nil | ||
} | ||
|
||
// graphFromEntries builds the graph from a set of channel entries | ||
func graphFromEntries(channelEntries []registry.ChannelEntryAnnotated) (map[string]registry.Channel, error) { | ||
channels := map[string]registry.Channel{} | ||
|
||
type replaces map[registry.BundleKey]map[registry.BundleKey]struct{} | ||
|
||
channelGraph := map[string]replaces{} | ||
channelHeadCandidates := map[string]map[registry.BundleKey]struct{}{} | ||
|
||
// add all channels and nodes to the graph | ||
for _, entry := range channelEntries { | ||
// create channel if we haven't seen it yet | ||
if _, ok := channelGraph[entry.ChannelName]; !ok { | ||
channelGraph[entry.ChannelName] = replaces{} | ||
} | ||
|
||
key := registry.BundleKey{ | ||
BundlePath: entry.BundlePath, | ||
Version: entry.Version, | ||
CsvName: entry.BundleName, | ||
} | ||
channelGraph[entry.ChannelName][key] = map[registry.BundleKey]struct{}{} | ||
|
||
// every bundle in a channel is a potential head of that channel | ||
if _, ok := channelHeadCandidates[entry.ChannelName]; !ok { | ||
channelHeadCandidates[entry.ChannelName] = map[registry.BundleKey]struct{}{key: {}} | ||
} else { | ||
channelHeadCandidates[entry.ChannelName][key] = struct{}{} | ||
} | ||
} | ||
|
||
for _, entry := range channelEntries { | ||
key := registry.BundleKey{ | ||
BundlePath: entry.BundlePath, | ||
Version: entry.Version, | ||
CsvName: entry.BundleName, | ||
} | ||
replacesKey := registry.BundleKey{ | ||
BundlePath: entry.BundlePath, | ||
Version: entry.ReplacesVersion, | ||
CsvName: entry.Replaces, | ||
} | ||
|
||
if !replacesKey.IsEmpty() { | ||
channelGraph[entry.ChannelName][key][replacesKey] = struct{}{} | ||
} | ||
|
||
delete(channelHeadCandidates[entry.ChannelName], replacesKey) | ||
} | ||
|
||
for channelName, candidates := range channelHeadCandidates { | ||
if len(candidates) == 0 { | ||
return nil, fmt.Errorf("no channel head found for %s", channelName) | ||
} | ||
if len(candidates) > 1 { | ||
return nil, fmt.Errorf("multiple candidate channel heads found for %s: %v", channelName, candidates) | ||
} | ||
|
||
for head := range candidates { | ||
channel := registry.Channel{ | ||
Head: head, | ||
Nodes: channelGraph[channelName], | ||
} | ||
channels[channelName] = channel | ||
} | ||
} | ||
|
||
return channels, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package sqlite | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
"github.com/operator-framework/operator-registry/pkg/registry" | ||
"math/rand" | ||
"os" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func createLoadedTestDb(t *testing.T) (*sql.DB, func()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. woo nice test 🎉 |
||
dbName := fmt.Sprintf("test-%d.db", rand.Int()) | ||
|
||
db, err := sql.Open("sqlite3", dbName) | ||
require.NoError(t, err) | ||
|
||
dbLoader, err := NewSQLLiteLoader(db) | ||
require.NoError(t, err) | ||
|
||
err = dbLoader.Migrate(context.TODO()) | ||
require.NoError(t, err) | ||
|
||
loader := NewSQLLoaderForDirectory(dbLoader, "./testdata/loader_data") | ||
err = loader.Populate() | ||
require.NoError(t, err) | ||
|
||
return db, func() { | ||
defer func() { | ||
if err := os.Remove(dbName); err != nil { | ||
t.Fatal(err) | ||
} | ||
}() | ||
if err := db.Close(); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
} | ||
|
||
func TestLoadPackageGraph_Etcd(t *testing.T) { | ||
expectedGraph := ®istry.Package{ | ||
Name: "etcd", | ||
DefaultChannel: "alpha", | ||
Channels: map[string]registry.Channel{ | ||
"alpha": { | ||
Head: registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"}, | ||
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{ | ||
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: {}, | ||
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: {}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: { | ||
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: struct{}{}, | ||
}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"}: { | ||
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: struct{}{}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: struct{}{}, | ||
}, | ||
}, | ||
}, | ||
"beta": { | ||
Head: registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}, | ||
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{ | ||
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: {}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: { | ||
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: struct{}{}, | ||
}, | ||
}, | ||
}, | ||
"stable": { | ||
Head: registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"}, | ||
Nodes: map[registry.BundleKey]map[registry.BundleKey]struct{}{ | ||
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: {}, | ||
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: {}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: { | ||
registry.BundleKey{BundlePath: "", Version: "0.6.1", CsvName: "etcdoperator.v0.6.1"}: struct{}{}, | ||
}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.2", CsvName: "etcdoperator.v0.9.2"}: { | ||
registry.BundleKey{BundlePath: "", Version: "", CsvName: "etcdoperator.v0.9.1"}: struct{}{}, | ||
registry.BundleKey{BundlePath: "", Version: "0.9.0", CsvName: "etcdoperator.v0.9.0"}: struct{}{}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
db, cleanup := createLoadedTestDb(t) | ||
defer cleanup() | ||
|
||
graphLoader, err := NewSQLGraphLoaderFromDB(db, "etcd") | ||
require.NoError(t, err) | ||
|
||
result, err := graphLoader.Generate() | ||
require.NoError(t, err) | ||
|
||
require.Equal(t, "etcd", result.Name) | ||
require.Equal(t, 3, len(result.Channels)) | ||
|
||
for channelName, channel := range result.Channels { | ||
expectedChannel := expectedGraph.Channels[channelName] | ||
require.Equal(t, expectedChannel.Head, channel.Head) | ||
require.EqualValues(t, expectedChannel.Nodes, channel.Nodes) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -113,6 +113,76 @@ func (s *SQLQuerier) GetPackage(ctx context.Context, name string) (*registry.Pac | |
return pkg, nil | ||
} | ||
|
||
func (s *SQLQuerier) GetDefaultPackage(ctx context.Context, name string) (string, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: should this be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also - is the query interface updated with these? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. latest updated includes the interface update |
||
query := `SELECT default_channel | ||
FROM package WHERE package.name=?` | ||
rows, err := s.db.QueryContext(ctx, query, name) | ||
if err != nil { | ||
return "", err | ||
} | ||
defer rows.Close() | ||
|
||
var defaultChannel sql.NullString | ||
if !rows.Next() { | ||
return "", fmt.Errorf("package %s not found", name) | ||
} | ||
if err := rows.Scan(&defaultChannel); err != nil { | ||
return "", err | ||
} | ||
|
||
if !defaultChannel.Valid { | ||
return "", fmt.Errorf("default channel not valid") | ||
} | ||
|
||
return defaultChannel.String, nil | ||
} | ||
|
||
func (s *SQLQuerier) GetChannelEntriesFromPackage(ctx context.Context, packageName string) ([]registry.ChannelEntryAnnotated, error) { | ||
query := `SELECT channel_entry.package_name, channel_entry.channel_name, channel_entry.operatorbundle_name, op_bundle.version, op_bundle.bundlepath, replaces.operatorbundle_name, replacesbundle.version, replacesbundle.bundlepath | ||
FROM channel_entry | ||
LEFT JOIN channel_entry replaces ON channel_entry.replaces = replaces.entry_id | ||
LEFT JOIN operatorbundle op_bundle ON channel_entry.operatorbundle_name = op_bundle.name | ||
LEFT JOIN operatorbundle replacesbundle ON replaces.operatorbundle_name = replacesbundle.name | ||
WHERE channel_entry.package_name = ?;` | ||
|
||
var entries []registry.ChannelEntryAnnotated | ||
rows, err := s.db.QueryContext(ctx, query, packageName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer rows.Close() | ||
|
||
var pkgName sql.NullString | ||
var channelName sql.NullString | ||
var bundleName sql.NullString | ||
var replaces sql.NullString | ||
var version sql.NullString | ||
var bundlePath sql.NullString | ||
var replacesVersion sql.NullString | ||
var replacesBundlePath sql.NullString | ||
|
||
for rows.Next() { | ||
if err := rows.Scan(&pkgName, &channelName, &bundleName, &version, &bundlePath, &replaces, &replacesVersion, &replacesBundlePath); err != nil { | ||
return nil, err | ||
} | ||
|
||
channelEntryNode := registry.ChannelEntryAnnotated{ | ||
PackageName: pkgName.String, | ||
ChannelName: channelName.String, | ||
BundleName: bundleName.String, | ||
Version: version.String, | ||
BundlePath: bundlePath.String, | ||
Replaces: replaces.String, | ||
ReplacesVersion: replacesVersion.String, | ||
ReplacesBundlePath: replacesBundlePath.String, | ||
} | ||
|
||
entries = append(entries, channelEntryNode) | ||
} | ||
|
||
return entries, nil | ||
} | ||
|
||
func (s *SQLQuerier) GetBundle(ctx context.Context, pkgName, channelName, csvName string) (*api.Bundle, error) { | ||
query := `SELECT DISTINCT channel_entry.entry_id, operatorbundle.name, operatorbundle.bundle, operatorbundle.bundlepath, operatorbundle.version, operatorbundle.skiprange | ||
FROM operatorbundle INNER JOIN channel_entry ON operatorbundle.name=channel_entry.operatorbundle_name | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
expected:[]registry.OperatorBundle{registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x9, Patch: 0x2, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.2", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{registry.BundleRef{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x9, Patch: 0x0, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.0"}}}, registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x0, Patch: 0x0, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.1", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{}}, registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x9, Patch: 0x0, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.9.0", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{registry.BundleRef{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x6, Patch: 0x1, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.6.1"}}}, registry.OperatorBundle{BundlePath: "", Version: semver.Version{Major: 0x0, Minor: 0x6, Patch: 0x1, Pre: []semver.PRVersion(nil), Build: []string(nil)}, CsvName: "etcdoperator.v0.6.1", ReplacesBundles: []registry.OperatorBundle{}, Replaces: []registry.BundleRef{}}} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not type this as a semver field?