From 75199a91cc0c4e4bad872fb197e738c25e4d1b63 Mon Sep 17 00:00:00 2001 From: Eric Fritz Date: Fri, 7 Aug 2020 14:28:53 -0500 Subject: [PATCH] Squash commits. --- .../internal/server/handler.go | 26 +-- .../internal/worker/processor.go | 15 +- .../bundles/persistence/scylladb/reader.go | 164 ++++++++++++++++++ .../bundles/persistence/scylladb/session.go | 72 ++++++++ .../bundles/persistence/scylladb/writer.go | 128 ++++++++++++++ go.mod | 3 + go.sum | 4 + 7 files changed, 396 insertions(+), 16 deletions(-) create mode 100644 enterprise/internal/codeintel/bundles/persistence/scylladb/reader.go create mode 100644 enterprise/internal/codeintel/bundles/persistence/scylladb/session.go create mode 100644 enterprise/internal/codeintel/bundles/persistence/scylladb/writer.go diff --git a/enterprise/cmd/precise-code-intel-bundle-manager/internal/server/handler.go b/enterprise/cmd/precise-code-intel-bundle-manager/internal/server/handler.go index 070c284bda0ae4..c9ed7065b97d3a 100644 --- a/enterprise/cmd/precise-code-intel-bundle-manager/internal/server/handler.go +++ b/enterprise/cmd/precise-code-intel-bundle-manager/internal/server/handler.go @@ -18,6 +18,7 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-bundle-manager/internal/database" "github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-bundle-manager/internal/paths" "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/scylladb" sqlitereader "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/sqlite" "github.com/sourcegraph/sourcegraph/internal/trace/ot" ) @@ -377,20 +378,21 @@ func (s *Server) dbQueryErr(w http.ResponseWriter, r *http.Request, handler dbQu span.Finish() }() - return s.readerCache.WithReader(ctx, filename, func(reader persistence.Reader) error { - db, err := database.OpenDatabase(ctx, filename, persistence.NewObserved(reader, s.observationContext)) - if err != nil { - return pkgerrors.Wrap(err, "database.OpenDatabase") - } + // return s.readerCache.WithReader(ctx, filename, func(reader persistence.Reader) error { + reader := scylladb.NewReader(int(idFromRequest(r))) + db, err := database.OpenDatabase(ctx, filename, persistence.NewObserved(reader, s.observationContext)) + if err != nil { + return pkgerrors.Wrap(err, "database.OpenDatabase") + } - payload, err := handler(ctx, db) - if err != nil { - return err - } + payload, err := handler(ctx, db) + if err != nil { + return err + } - writeJSON(w, payload) - return nil - }) + writeJSON(w, payload) + return nil + // }) } // limitTransferRate applies a transfer limit to the given writer. diff --git a/enterprise/cmd/precise-code-intel-worker/internal/worker/processor.go b/enterprise/cmd/precise-code-intel-worker/internal/worker/processor.go index a9dfa17bcf842f..8f2a001a4725ed 100644 --- a/enterprise/cmd/precise-code-intel-worker/internal/worker/processor.go +++ b/enterprise/cmd/precise-code-intel-worker/internal/worker/processor.go @@ -13,7 +13,7 @@ import ( "github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-worker/internal/correlation" "github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-worker/internal/metrics" bundles "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/client" - sqlitewriter "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/sqlite" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/scylladb" "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/types" "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/gitserver" "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/store" @@ -91,7 +91,7 @@ func (p *processor) Process(ctx context.Context, store store.Store, upload store return false, errors.Wrap(err, "correlation.Correlate") } - if err := p.write(ctx, tempDir, groupedBundleData); err != nil { + if err := p.write(ctx, upload.ID, tempDir, groupedBundleData); err != nil { return false, err } @@ -147,14 +147,21 @@ func (p *processor) isRepoCurrentlyCloning(ctx context.Context, repoID int, comm } // write commits the correlated data to disk. -func (p *processor) write(ctx context.Context, dirname string, groupedBundleData *correlation.GroupedBundleData) (err error) { +func (p *processor) write(ctx context.Context, id int, dirname string, groupedBundleData *correlation.GroupedBundleData) (err error) { ctx, endOperation := p.metrics.WriteOperation.With(ctx, &err, observation.Args{}) defer endOperation(1, observation.Args{}) - writer, err := sqlitewriter.NewWriter(ctx, filepath.Join(dirname, "sqlite.db")) + f, err := os.OpenFile(filepath.Join(dirname, "sqlite.db"), os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return err } + f.Close() + + writer := scylladb.NewWriter(id) + // writer, err := sqlitewriter.NewWriter(ctx, filepath.Join(dirname, "sqlite.db")) + // if err != nil { + // return err + // } defer func() { err = writer.Close(err) }() diff --git a/enterprise/internal/codeintel/bundles/persistence/scylladb/reader.go b/enterprise/internal/codeintel/bundles/persistence/scylladb/reader.go new file mode 100644 index 00000000000000..a4a1a8587be32e --- /dev/null +++ b/enterprise/internal/codeintel/bundles/persistence/scylladb/reader.go @@ -0,0 +1,164 @@ +package scylladb + +import ( + "context" + "fmt" + "strings" + + "github.com/gocql/gocql" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/serialization" + gobserializer "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/serialization/gob" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/types" +) + +type reader struct { + dumpID int + serializer serialization.Serializer +} + +var _ persistence.Reader = &reader{} + +func NewReader(dumpID int) persistence.Reader { + return &reader{ + dumpID: dumpID, + serializer: gobserializer.New(), + } +} + +func (r *reader) ReadMeta(ctx context.Context) (types.MetaData, error) { + var numResultChunks int + + if err := session.Query( + `SELECT num_result_chunks FROM metadata WHERE dump_id = ? LIMIT 1`, + r.dumpID, + ).Scan(&numResultChunks); err != nil { + return types.MetaData{}, err + } + + return types.MetaData{NumResultChunks: numResultChunks}, nil +} + +func (r *reader) PathsWithPrefix(ctx context.Context, prefix string) (px []string, _ error) { + iter := session.Query( + `SELECT path FROM documents WHERE dump_id = ?`, + r.dumpID, + ).Iter() + + var path string + for iter.Scan(&path) { + if strings.HasPrefix(path, prefix) { + px = append(px, path) + } + } + + if err := iter.Close(); err != nil { + return nil, err + } + + return px, nil +} + +func (r *reader) ReadDocument(ctx context.Context, path string) (types.DocumentData, bool, error) { + var data string + if err := session.Query( + `SELECT data FROM documents WHERE dump_id = ? AND path = ? LIMIT 1`, + r.dumpID, + path, + ).Scan(&data); err != nil { + if err == gocql.ErrNotFound { + err = nil + } + return types.DocumentData{}, false, err + } + + documentData, err := r.serializer.UnmarshalDocumentData([]byte(data)) + if err != nil { + return types.DocumentData{}, false, err + } + + return documentData, true, nil +} + +func (r *reader) ReadResultChunk(ctx context.Context, id int) (types.ResultChunkData, bool, error) { + var data string + if err := session.Query( + `SELECT data FROM result_chunks WHERE dump_id = ? AND idx = ? LIMIT 1`, + r.dumpID, + id, + ).Scan(&data); err != nil { + if err == gocql.ErrNotFound { + err = nil + } + return types.ResultChunkData{}, false, err + } + + resultChunkData, err := r.serializer.UnmarshalResultChunkData([]byte(data)) + if err != nil { + return types.ResultChunkData{}, false, err + } + + return resultChunkData, true, nil +} + +func (r *reader) ReadDefinitions(ctx context.Context, scheme, identifier string, skip, take int) ([]types.Location, int, error) { + return r.defref(ctx, "definitions", scheme, identifier, skip, take) +} + +func (r *reader) ReadReferences(ctx context.Context, scheme, identifier string, skip, take int) ([]types.Location, int, error) { + return r.defref(ctx, "references", scheme, identifier, skip, take) +} + +func (r *reader) defref(ctx context.Context, tableName, scheme, identifier string, skip, take int) ([]types.Location, int, error) { + locations, err := r.readDefinitionReferences(ctx, tableName, scheme, identifier) + if err != nil { + return nil, 0, err + } + + if skip == 0 && take == 0 { + // Pagination is disabled, return full result set + return locations, len(locations), nil + } + + lo := skip + if lo >= len(locations) { + // Skip lands past result set, return nothing + return nil, len(locations), nil + } + + hi := skip + take + if hi >= len(locations) { + hi = len(locations) + } + + return locations[lo:hi], len(locations), nil +} + +func (r *reader) readDefinitionReferences(ctx context.Context, tableName, scheme, identifier string) ([]types.Location, error) { + iter := session.Query( + fmt.Sprintf(`SELECT data FROM %s WHERE dump_id = ? AND scheme = ? AND identifier = ?`, tableName), + r.dumpID, + scheme, + identifier, + ).Iter() + + var data string + var allLocations []types.Location + for iter.Scan(&data) { + locations, err := r.serializer.UnmarshalLocations([]byte(data)) + if err != nil { + return nil, err + } + + allLocations = append(allLocations, locations...) + } + + if err := iter.Close(); err != nil { + return nil, err + } + return allLocations, nil +} + +func (r *reader) Close() error { + return nil +} diff --git a/enterprise/internal/codeintel/bundles/persistence/scylladb/session.go b/enterprise/internal/codeintel/bundles/persistence/scylladb/session.go new file mode 100644 index 00000000000000..49da299dc14770 --- /dev/null +++ b/enterprise/internal/codeintel/bundles/persistence/scylladb/session.go @@ -0,0 +1,72 @@ +package scylladb + +import ( + "sync" + + "github.com/gocql/gocql" +) + +var session *gocql.Session + +func init() { + cluster := gocql.NewCluster("localhost") + cluster.Keyspace = "lsif" + cluster.Consistency = gocql.One + + var err error + if session, err = cluster.CreateSession(); err != nil { + panic(err.Error()) + } +} + +// +// + +type batchWriter struct { + m sync.RWMutex + err error + wg sync.WaitGroup + ch chan struct { + query string + args []interface{} + } +} + +func newBatchWriter() *batchWriter { + w := &batchWriter{ + ch: make(chan struct { + query string + args []interface{} + }), + } + + for i := 0; i < 100; i++ { + w.wg.Add(1) + go func() { + defer w.wg.Done() + + for v := range w.ch { + if err := session.Query(v.query, v.args...).Exec(); err != nil { + w.m.Lock() + w.err = err + w.m.Unlock() + } + } + }() + } + + return w +} + +func (w *batchWriter) Write(query string, args ...interface{}) { + w.ch <- struct { + query string + args []interface{} + }{query, args} +} + +func (w *batchWriter) Flush() error { + close(w.ch) + w.wg.Wait() + return w.err +} diff --git a/enterprise/internal/codeintel/bundles/persistence/scylladb/writer.go b/enterprise/internal/codeintel/bundles/persistence/scylladb/writer.go new file mode 100644 index 00000000000000..f5279c2b5748bf --- /dev/null +++ b/enterprise/internal/codeintel/bundles/persistence/scylladb/writer.go @@ -0,0 +1,128 @@ +package scylladb + +import ( + "context" + + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/serialization" + gobserializer "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/serialization/gob" + "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/types" +) + +type writer struct { + dumpID int + serializer serialization.Serializer + writer *batchWriter +} + +var _ persistence.Writer = &writer{} + +func NewWriter(dumpID int) persistence.Writer { + return &writer{ + dumpID: dumpID, + serializer: gobserializer.New(), + writer: newBatchWriter(), + } +} + +func (w *writer) WriteMeta(ctx context.Context, meta types.MetaData) error { + w.writer.Write( + `INSERT INTO metadata (dump_id, num_result_chunks) VALUES (?, ?)`, + w.dumpID, + meta.NumResultChunks, + ) + + return nil +} + +func (w *writer) WriteDocuments(ctx context.Context, documents map[string]types.DocumentData) error { + for path, document := range documents { + data, err := w.serializer.MarshalDocumentData(document) + if err != nil { + return err + } + + w.writer.Write( + `INSERT INTO documents (dump_id, path, data) VALUES (?, ?, ?)`, + w.dumpID, + path, + data, + ) + } + + return nil +} + +func (w *writer) WriteResultChunks(ctx context.Context, resultChunks map[int]types.ResultChunkData) error { + for idx, resultChunk := range resultChunks { + data, err := w.serializer.MarshalResultChunkData(resultChunk) + if err != nil { + return err + } + + w.writer.Write( + `INSERT INTO result_chunks (dump_id, idx, data) VALUES (?, ?, ?)`, + w.dumpID, + idx, + data, + ) + } + + return nil +} + +func (w *writer) WriteDefinitions(ctx context.Context, monikerLocations []types.MonikerLocations) error { + for _, v := range monikerLocations { + data, err := w.serializer.MarshalLocations(v.Locations) + if err != nil { + return err + } + + w.writer.Write( + `INSERT INTO definitions (dump_id, scheme, identifier, data) VALUES (?, ?, ?, ?)`, + w.dumpID, + v.Scheme, + v.Identifier, + data, + ) + } + + return nil +} + +func (w *writer) WriteReferences(ctx context.Context, monikerLocations []types.MonikerLocations) error { + for _, v := range monikerLocations { + data, err := w.serializer.MarshalLocations(v.Locations) + if err != nil { + return err + } + + w.writer.Write( + `INSERT INTO references (dump_id, scheme, identifier, data) VALUES (?, ?, ?, ?)`, + w.dumpID, + v.Scheme, + v.Identifier, + data, + ) + } + + return nil +} + +func (w *writer) Close(err error) error { + return w.writer.Flush() +} + +// Run scylladb: +// docker run -p 9042:9042 -d scylladb/scylla + +// Prepare schema: +// docker exec -it {container-id} cqlsh +// +// cqlsh> CREATE KEYSPACE lsif WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; +// cqlsh> USE lsif; +// cqlsh:lsif> CREATE TABLE metadata (dump_id int, num_result_chunks int, PRIMARY KEY (dump_id)); +// cqlsh:lsif> CREATE TABLE documents (dump_id int, path text, data blob, PRIMARY KEY (dump_id, path)); +// cqlsh:lsif> CREATE TABLE result_chunks (dump_id int, idx int, data blob, PRIMARY KEY (dump_id, idx)); +// cqlsh:lsif> CREATE TABLE definitions (dump_id int, scheme text, identifier text, data blob, PRIMARY KEY (dump_id, scheme, identifier)); +// cqlsh:lsif> CREATE TABLE references (dump_id int, scheme text, identifier text, data blob, PRIMARY KEY (dump_id, scheme, identifier)); diff --git a/go.mod b/go.mod index 5389be677d5c54..afbf2c1778758b 100644 --- a/go.mod +++ b/go.mod @@ -49,6 +49,7 @@ require ( github.com/go-playground/validator/v10 v10.3.0 // indirect github.com/go-redsync/redsync v1.4.2 github.com/gobwas/glob v0.2.3 + github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4 github.com/golang-migrate/migrate/v4 v4.11.0 github.com/golang/gddo v0.0.0-20200611223618-a4829ef13274 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e @@ -208,3 +209,5 @@ replace github.com/golang/lint => golang.org/x/lint v0.0.0-20191125180803-fdd1cd // See: https://github.com/ghodss/yaml/pull/65 replace github.com/ghodss/yaml => github.com/sourcegraph/yaml v1.0.1-0.20200714132230-56936252f152 + +replace github.com/gocql/gocql => github.com/scylladb/gocql v1.4.0 diff --git a/go.sum b/go.sum index dbbcf3932e5cbc..4e898a72422655 100644 --- a/go.sum +++ b/go.sum @@ -664,6 +664,7 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= github.com/hashicorp/consul/api v1.3.0/go.mod h1:MmDNSzIMUjNpY/mQ398R4bk2FnqQLoPndWW5VkKPlCE= github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= @@ -1117,6 +1118,8 @@ github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/progressbar/v3 v3.3.4 h1:nMinx+JaEm/zJz4cEyClQeAw5rsYSB5th3xv+5lV6Vg= github.com/schollz/progressbar/v3 v3.3.4/go.mod h1:Rp5lZwpgtYmlvmGo1FyDwXMqagyRBQYSDwzlP9QDu84= +github.com/scylladb/gocql v1.4.0 h1:JVePQWv+o9ezElfVESvNAzgxG1nmiiojYOaJRxO1pqo= +github.com/scylladb/gocql v1.4.0/go.mod h1:S154F0u6zQlF3JjuHAidQIExQf9H45yT8z68h0FQYdU= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/securego/gosec v0.0.0-20200103095621-79fbf3af8d83/go.mod h1:vvbZ2Ae7AzSq3/kywjUDxSNq2SJ27RxCz2un0H3ePqE= @@ -1780,6 +1783,7 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gcfg.v1 v1.2.3/go.mod h1:yesOnuUOFQAhST5vPY4nbZsb/huCgGGXlipJsBn0b3o= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=