Skip to content

Commit

Permalink
Squash commits.
Browse files Browse the repository at this point in the history
  • Loading branch information
efritz committed Aug 7, 2020
1 parent 00f5c2a commit 75199a9
Show file tree
Hide file tree
Showing 7 changed files with 396 additions and 16 deletions.
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-bundle-manager/internal/database"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-bundle-manager/internal/paths"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/scylladb"
sqlitereader "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/sqlite"
"github.com/sourcegraph/sourcegraph/internal/trace/ot"
)
Expand Down Expand Up @@ -377,20 +378,21 @@ func (s *Server) dbQueryErr(w http.ResponseWriter, r *http.Request, handler dbQu
span.Finish()
}()

return s.readerCache.WithReader(ctx, filename, func(reader persistence.Reader) error {
db, err := database.OpenDatabase(ctx, filename, persistence.NewObserved(reader, s.observationContext))
if err != nil {
return pkgerrors.Wrap(err, "database.OpenDatabase")
}
// return s.readerCache.WithReader(ctx, filename, func(reader persistence.Reader) error {
reader := scylladb.NewReader(int(idFromRequest(r)))
db, err := database.OpenDatabase(ctx, filename, persistence.NewObserved(reader, s.observationContext))
if err != nil {
return pkgerrors.Wrap(err, "database.OpenDatabase")
}

payload, err := handler(ctx, db)
if err != nil {
return err
}
payload, err := handler(ctx, db)
if err != nil {
return err
}

writeJSON(w, payload)
return nil
})
writeJSON(w, payload)
return nil
// })
}

// limitTransferRate applies a transfer limit to the given writer.
Expand Down
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-worker/internal/correlation"
"github.com/sourcegraph/sourcegraph/enterprise/cmd/precise-code-intel-worker/internal/metrics"
bundles "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/client"
sqlitewriter "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/sqlite"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/scylladb"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/types"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/gitserver"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/store"
Expand Down Expand Up @@ -91,7 +91,7 @@ func (p *processor) Process(ctx context.Context, store store.Store, upload store
return false, errors.Wrap(err, "correlation.Correlate")
}

if err := p.write(ctx, tempDir, groupedBundleData); err != nil {
if err := p.write(ctx, upload.ID, tempDir, groupedBundleData); err != nil {
return false, err
}

Expand Down Expand Up @@ -147,14 +147,21 @@ func (p *processor) isRepoCurrentlyCloning(ctx context.Context, repoID int, comm
}

// write commits the correlated data to disk.
func (p *processor) write(ctx context.Context, dirname string, groupedBundleData *correlation.GroupedBundleData) (err error) {
func (p *processor) write(ctx context.Context, id int, dirname string, groupedBundleData *correlation.GroupedBundleData) (err error) {
ctx, endOperation := p.metrics.WriteOperation.With(ctx, &err, observation.Args{})
defer endOperation(1, observation.Args{})

writer, err := sqlitewriter.NewWriter(ctx, filepath.Join(dirname, "sqlite.db"))
f, err := os.OpenFile(filepath.Join(dirname, "sqlite.db"), os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return err
}
f.Close()

writer := scylladb.NewWriter(id)
// writer, err := sqlitewriter.NewWriter(ctx, filepath.Join(dirname, "sqlite.db"))
// if err != nil {
// return err
// }
defer func() {
err = writer.Close(err)
}()
Expand Down
164 changes: 164 additions & 0 deletions enterprise/internal/codeintel/bundles/persistence/scylladb/reader.go
@@ -0,0 +1,164 @@
package scylladb

import (
"context"
"fmt"
"strings"

"github.com/gocql/gocql"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/serialization"
gobserializer "github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/persistence/serialization/gob"
"github.com/sourcegraph/sourcegraph/enterprise/internal/codeintel/bundles/types"
)

// reader serves codeintel bundle data for a single dump out of ScyllaDB,
// deserializing the stored blobs with a gob serializer.
type reader struct {
	dumpID     int // identifies the dump whose rows this reader queries
	serializer serialization.Serializer
}

// Compile-time check that reader satisfies persistence.Reader.
var _ persistence.Reader = &reader{}

// NewReader constructs a persistence.Reader backed by the package-level
// ScyllaDB session, scoped to the rows of the given dump identifier.
func NewReader(dumpID int) persistence.Reader {
	r := reader{dumpID: dumpID}
	r.serializer = gobserializer.New()
	return &r
}

// ReadMeta returns the dump's metadata row. Exactly one metadata row is
// expected per dump; a lookup failure (including a missing row) is
// surfaced to the caller unchanged.
func (r *reader) ReadMeta(ctx context.Context) (types.MetaData, error) {
	query := session.Query(
		`SELECT num_result_chunks FROM metadata WHERE dump_id = ? LIMIT 1`,
		r.dumpID,
	)

	var n int
	if err := query.Scan(&n); err != nil {
		return types.MetaData{}, err
	}
	return types.MetaData{NumResultChunks: n}, nil
}

// PathsWithPrefix returns every document path of the dump that begins with
// the given prefix. All paths for the dump are fetched and the prefix
// filter is applied client-side.
func (r *reader) PathsWithPrefix(ctx context.Context, prefix string) ([]string, error) {
	iter := session.Query(`SELECT path FROM documents WHERE dump_id = ?`, r.dumpID).Iter()

	var matches []string
	var p string
	for iter.Scan(&p) {
		if strings.HasPrefix(p, prefix) {
			matches = append(matches, p)
		}
	}

	if err := iter.Close(); err != nil {
		return nil, err
	}
	return matches, nil
}

// ReadDocument fetches and deserializes the document data stored for the
// given path. The boolean result reports whether the document exists; a
// missing row is not treated as an error.
func (r *reader) ReadDocument(ctx context.Context, path string) (types.DocumentData, bool, error) {
	var raw string
	err := session.Query(
		`SELECT data FROM documents WHERE dump_id = ? AND path = ? LIMIT 1`,
		r.dumpID, path,
	).Scan(&raw)
	if err == gocql.ErrNotFound {
		return types.DocumentData{}, false, nil
	}
	if err != nil {
		return types.DocumentData{}, false, err
	}

	document, err := r.serializer.UnmarshalDocumentData([]byte(raw))
	if err != nil {
		return types.DocumentData{}, false, err
	}
	return document, true, nil
}

// ReadResultChunk fetches and deserializes the result chunk with the given
// index. The boolean result reports whether the chunk exists; a missing
// row is not treated as an error.
func (r *reader) ReadResultChunk(ctx context.Context, id int) (types.ResultChunkData, bool, error) {
	var raw string
	err := session.Query(
		`SELECT data FROM result_chunks WHERE dump_id = ? AND idx = ? LIMIT 1`,
		r.dumpID, id,
	).Scan(&raw)
	if err == gocql.ErrNotFound {
		return types.ResultChunkData{}, false, nil
	}
	if err != nil {
		return types.ResultChunkData{}, false, err
	}

	chunk, err := r.serializer.UnmarshalResultChunkData([]byte(raw))
	if err != nil {
		return types.ResultChunkData{}, false, err
	}
	return chunk, true, nil
}

// ReadDefinitions returns the locations of definitions for the given
// moniker (scheme, identifier) pair, paginated by skip/take, along with
// the total number of matching locations.
func (r *reader) ReadDefinitions(ctx context.Context, scheme, identifier string, skip, take int) ([]types.Location, int, error) {
	return r.defref(ctx, "definitions", scheme, identifier, skip, take)
}

// ReadReferences returns the locations of references for the given moniker
// (scheme, identifier) pair, paginated by skip/take, along with the total
// number of matching locations.
func (r *reader) ReadReferences(ctx context.Context, scheme, identifier string, skip, take int) ([]types.Location, int, error) {
	return r.defref(ctx, "references", scheme, identifier, skip, take)
}

// defref implements the shared lookup-and-paginate logic behind
// ReadDefinitions and ReadReferences. It returns the requested page of
// locations together with the total count. Passing skip == 0 and take == 0
// disables pagination and returns the full result set.
func (r *reader) defref(ctx context.Context, tableName, scheme, identifier string, skip, take int) ([]types.Location, int, error) {
	all, err := r.readDefinitionReferences(ctx, tableName, scheme, identifier)
	if err != nil {
		return nil, 0, err
	}
	total := len(all)

	if skip == 0 && take == 0 {
		// Pagination is disabled; hand back everything.
		return all, total, nil
	}

	if skip >= total {
		// The requested page starts past the end of the result set.
		return nil, total, nil
	}

	end := skip + take
	if end > total {
		end = total
	}
	return all[skip:end], total, nil
}

// readDefinitionReferences collects and flattens every serialized location
// batch stored for the (scheme, identifier) pair in tableName. tableName is
// always one of the internal constants "definitions" or "references" (see
// defref's callers), never user input, so interpolating it is safe.
func (r *reader) readDefinitionReferences(ctx context.Context, tableName, scheme, identifier string) ([]types.Location, error) {
	stmt := fmt.Sprintf(`SELECT data FROM %s WHERE dump_id = ? AND scheme = ? AND identifier = ?`, tableName)
	iter := session.Query(stmt, r.dumpID, scheme, identifier).Iter()

	var all []types.Location
	var raw string
	for iter.Scan(&raw) {
		batch, err := r.serializer.UnmarshalLocations([]byte(raw))
		if err != nil {
			return nil, err
		}
		all = append(all, batch...)
	}

	if err := iter.Close(); err != nil {
		return nil, err
	}
	return all, nil
}

// Close implements persistence.Reader. The underlying gocql session is
// package-level and shared by every reader in this package, so there is
// nothing to release per reader.
func (r *reader) Close() error {
	return nil
}
@@ -0,0 +1,72 @@
package scylladb

import (
"sync"

"github.com/gocql/gocql"
)

// session is the shared ScyllaDB session used by every reader and writer in
// this package.
var session *gocql.Session

// init eagerly connects to a single-node cluster on localhost using the
// "lsif" keyspace with consistency level ONE.
// NOTE(review): hard-coded host/keyspace and panic-on-failure in init make
// this prototype-only; connection setup should move behind explicit,
// configurable initialization before this ships.
func init() {
	cluster := gocql.NewCluster("localhost")
	cluster.Keyspace = "lsif"
	cluster.Consistency = gocql.One

	var err error
	if session, err = cluster.CreateSession(); err != nil {
		panic(err.Error())
	}
}

//
//

// batchWriter fans queued CQL statements out to a pool of worker goroutines
// that execute them against the shared session. Create one with
// newBatchWriter, enqueue with Write, and finish with Flush.
type batchWriter struct {
	m   sync.RWMutex   // guards err
	err error          // last execution error observed by any worker
	wg  sync.WaitGroup // tracks worker goroutines so Flush can wait for them
	ch  chan struct {  // unbuffered work queue of (query, args) pairs
		query string
		args  []interface{}
	}
}

// newBatchWriter starts a fixed pool of worker goroutines that drain the
// writer's channel and execute each queued statement against the shared
// session. Workers exit once the channel is closed (see Flush).
func newBatchWriter() *batchWriter {
	writer := &batchWriter{
		ch: make(chan struct {
			query string
			args  []interface{}
		}),
	}

	const numWorkers = 100
	writer.wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer writer.wg.Done()

			for item := range writer.ch {
				err := session.Query(item.query, item.args...).Exec()
				if err == nil {
					continue
				}
				// Record the failure for Flush; the last error wins.
				writer.m.Lock()
				writer.err = err
				writer.m.Unlock()
			}
		}()
	}

	return writer
}

// Write enqueues a statement for asynchronous execution. It blocks until a
// worker accepts the item and must not be called after Flush.
func (w *batchWriter) Write(query string, args ...interface{}) {
	item := struct {
		query string
		args  []interface{}
	}{query: query, args: args}

	w.ch <- item
}

// Flush closes the work channel, waits for every in-flight statement to
// complete, and returns the last error any worker encountered (nil if all
// statements succeeded). Flush must be called exactly once, and Write must
// not be called afterwards — it would send on a closed channel and panic.
func (w *batchWriter) Flush() error {
	close(w.ch)
	w.wg.Wait()
	return w.err
}

0 comments on commit 75199a9

Please sign in to comment.