From 68cf602ca72d523053a253ee2eb323f653c02d44 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Thu, 11 Feb 2021 16:24:40 +0100 Subject: [PATCH 01/13] No MappingToOrdered in VertexSetIntersection. --- sphynx/lynxkite-sphynx/vertex_set_intersection.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sphynx/lynxkite-sphynx/vertex_set_intersection.go b/sphynx/lynxkite-sphynx/vertex_set_intersection.go index e9d7a19460..1afd1caa41 100644 --- a/sphynx/lynxkite-sphynx/vertex_set_intersection.go +++ b/sphynx/lynxkite-sphynx/vertex_set_intersection.go @@ -71,11 +71,16 @@ func doVertexSetIntersection(vertexSets []*VertexSet) (intersection *VertexSet, } firstEmbedding = NewEdgeBundle(len(allHaveIt), len(allHaveIt)) - mapping := vs0.GetMappingToOrdered() - for idx, id := range allHaveIt { - firstEmbedding.Src[idx] = SphynxId(idx) - firstEmbedding.Dst[idx] = mapping[id] - firstEmbedding.EdgeMapping[idx] = id + for j, k := 0, 0; j < len(mergeVertices) && k < len(allHaveIt); { + if mergeVertices[j].id == allHaveIt[k] { + firstEmbedding.Src[k] = SphynxId(k) + firstEmbedding.Dst[k] = SphynxId(j) + firstEmbedding.EdgeMapping[k] = allHaveIt[k] + j++ + k++ + } else { + j++ + } } return } From 9f6a83f2a67ba78e41200857f0193eafd026d0ef Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Thu, 11 Feb 2021 16:27:19 +0100 Subject: [PATCH 02/13] Use Printf from log instead of fmt. --- sphynx/lynxkite-sphynx/entity_cache.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sphynx/lynxkite-sphynx/entity_cache.go b/sphynx/lynxkite-sphynx/entity_cache.go index 7e2f9f6dee..3cebb4194e 100644 --- a/sphynx/lynxkite-sphynx/entity_cache.go +++ b/sphynx/lynxkite-sphynx/entity_cache.go @@ -4,6 +4,7 @@ package main import ( "fmt" + "log" "os" "sort" "strconv" @@ -108,12 +109,12 @@ func (entityCache *EntityCache) maybeGarbageCollect() { for i := 0; i < len(evictionCandidates) && memEvicted < howMuchMemoryToRecycle; i++ { guid := evictionCandidates[i].guid - fmt.Printf("Evicting: %v\n", evictionCandidates[i]) + log.Printf("Evicting: %v\n", evictionCandidates[i]) delete(entityCache.cache, guid) memEvicted += evictionCandidates[i].memUsage itemsEvicted++ } - fmt.Printf("Evicted %d entities (out of %d), estimated size: %d time: %d\n", + log.Printf("Evicted %d entities (out of %d), estimated size: %d time: %d\n", itemsEvicted, len(evictionCandidates), memEvicted, (ourTimestamp()-start)/1000000) entityCache.totalMemUsage -= memEvicted } From 7fc14cb1a7128c24011ae74e49283a1ffd978a1c Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Thu, 11 Feb 2021 19:00:35 +0100 Subject: [PATCH 03/13] Handle reading unordered files by sorting and merging instead of MappingToOrdered. 
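
MappingToOrdered was a per-vertex-set map[int64]SphynxId built from
MappingToUnordered on demand. Keeping MappingToUnordered sorted lets
us translate unordered (Spark) IDs to ordered (Sphynx) IDs with a
two-pointer merge join instead. The idiom, as a standalone sketch
(the function and variable names here are illustrative, not the real
identifiers):

    package main

    import "fmt"

    // translate returns, for each ID in ids, its ordinal position in
    // mapping. Both slices must be ascending; absent IDs yield -1.
    func translate(mapping, ids []int64) []int64 {
        out := make([]int64, len(ids))
        for i, j := 0, 0; j < len(ids); {
            switch {
            case i == len(mapping) || mapping[i] > ids[j]:
                out[j] = -1 // not present in mapping
                j++
            case mapping[i] == ids[j]:
                out[j] = int64(i)
                j++
            default:
                i++
            }
        }
        return out
    }

    func main() {
        fmt.Println(translate([]int64{10, 20, 30}, []int64{10, 30})) // [0 2]
    }

After the O(n log n) sorts each translation pass is a linear scan,
and no hash map has to be held in memory alongside the slice.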
--- .../networkit_community_detection.go | 6 +- sphynx/lynxkite-sphynx/types.go | 16 +-- sphynx/lynxkite-sphynx/unordered_disk_io.go | 100 +++++++++++++----- .../vertex_attribute_filter.go | 3 - .../vertex_set_intersection.go | 33 +----- 5 files changed, 79 insertions(+), 79 deletions(-) diff --git a/sphynx/lynxkite-sphynx/networkit_community_detection.go b/sphynx/lynxkite-sphynx/networkit_community_detection.go index a0d12bf8ef..fead4f8006 100644 --- a/sphynx/lynxkite-sphynx/networkit_community_detection.go +++ b/sphynx/lynxkite-sphynx/networkit_community_detection.go @@ -38,12 +38,12 @@ func init() { defer networkit.DeletePartition(p) vs := &VertexSet{} vs.MappingToUnordered = make([]int64, p.NumberOfSubsets()) - vs.MappingToOrdered = make(map[int64]SphynxId) + mappingToOrdered := make(map[int64]SphynxId) ss := p.GetSubsetIdsVector() defer networkit.DeleteUint64Vector(ss) for i := range vs.MappingToUnordered { vs.MappingToUnordered[i] = int64(ss.Get(i)) - vs.MappingToOrdered[int64(ss.Get(i))] = SphynxId(i) + mappingToOrdered[int64(ss.Get(i))] = SphynxId(i) } es := &EdgeBundle{} es.EdgeMapping = make([]int64, p.NumberOfElements()) @@ -54,7 +54,7 @@ func init() { for i := range es.EdgeMapping { es.EdgeMapping[i] = int64(i) es.Src[i] = SphynxId(i) - es.Dst[i] = SphynxId(vs.MappingToOrdered[int64(v.Get(i))]) + es.Dst[i] = SphynxId(mappingToOrdered[int64(v.Get(i))]) } ea.output("partitions", vs) ea.output("belongsTo", es) diff --git a/sphynx/lynxkite-sphynx/types.go b/sphynx/lynxkite-sphynx/types.go index 6a0c893854..5cf62bb109 100644 --- a/sphynx/lynxkite-sphynx/types.go +++ b/sphynx/lynxkite-sphynx/types.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" pb "github.com/lynxkite/lynxkite/sphynx/proto" - "sync" ) type Server struct { @@ -42,21 +41,8 @@ func NewEdgeBundle(size int, maxSize int) *EdgeBundle { } type VertexSet struct { - sync.Mutex + // This slice contains the Spark IDs in ascending order. MappingToUnordered []int64 - MappingToOrdered map[int64]SphynxId -} - -func (vs *VertexSet) GetMappingToOrdered() map[int64]SphynxId { - vs.Lock() - defer vs.Unlock() - if vs.MappingToOrdered == nil { - vs.MappingToOrdered = make(map[int64]SphynxId, len(vs.MappingToUnordered)) - for i, j := range vs.MappingToUnordered { - vs.MappingToOrdered[j] = SphynxId(i) - } - } - return vs.MappingToOrdered } // A scalar is stored as its JSON encoding. If you need the real value, unmarshal it for yourself. 
diff --git a/sphynx/lynxkite-sphynx/unordered_disk_io.go b/sphynx/lynxkite-sphynx/unordered_disk_io.go index cfda520e66..32610d562b 100644 --- a/sphynx/lynxkite-sphynx/unordered_disk_io.go +++ b/sphynx/lynxkite-sphynx/unordered_disk_io.go @@ -14,6 +14,7 @@ import ( "log" "os" "reflect" + "sort" "strings" ) @@ -28,6 +29,40 @@ func toUnorderedRows(e TabularEntity, vs1 *VertexSet, vs2 *VertexSet) []interfac } } +func sortIds(ids []int64) { + sort.Sort(Int64Slice(ids)) +} + +type Int64Slice []int64 + +func (a Int64Slice) Len() int { + return len(a) +} +func (a Int64Slice) Less(i, j int) bool { + return a[i] < a[j] +} +func (a Int64Slice) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +type unorderedEdgesSorter struct { + rows []UnorderedEdgeRow + less func(i, j int) bool +} + +func (self unorderedEdgesSorter) Len() int { + return len(self.rows) +} +func (self unorderedEdgesSorter) Swap(i, j int) { + self.rows[i], self.rows[j] = self.rows[j], self.rows[i] +} +func (self unorderedEdgesSorter) Less(i, j int) bool { + return self.less(i, j) +} +func sortEdgeRows(rows []UnorderedEdgeRow, less func(i, j int) bool) { + sort.Sort(unorderedEdgesSorter{rows, less}) +} + func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorderedDiskRequest) (*pb.WriteToUnorderedDiskReply, error) { const numGoRoutines int64 = 4 guid := GUID(in.Guid) @@ -148,14 +183,12 @@ func (s *Server) ReadFromUnorderedDisk( rows = append(rows, partialRows...) } mappingToUnordered := make([]int64, numRows) - mappingToOrdered := make(map[int64]SphynxId) for i, v := range rows { mappingToUnordered[i] = v.Id - mappingToOrdered[v.Id] = SphynxId(i) } + sortIds(mappingToUnordered) entity = &VertexSet{ MappingToUnordered: mappingToUnordered, - MappingToOrdered: mappingToOrdered, } case *EdgeBundle: vs1, err := s.getVertexSet(GUID(in.Vsguid1)) @@ -167,7 +200,6 @@ func (s *Server) ReadFromUnorderedDisk( return nil, err } rows := make([]UnorderedEdgeRow, 0) - numRows := 0 for _, fr := range fileReaders { pr, err := reader.NewParquetReader(fr, new(UnorderedEdgeRow), numGoRoutines) if err != nil { @@ -175,28 +207,41 @@ func (s *Server) ReadFromUnorderedDisk( } partialNumRows := int(pr.GetNumRows()) partialRows := make([]UnorderedEdgeRow, partialNumRows) - numRows = numRows + partialNumRows if err := pr.Read(&partialRows); err != nil { return nil, fmt.Errorf("Failed to read parquet file of EdgeBundle: %v", err) } pr.ReadStop() rows = append(rows, partialRows...) } - edgeMapping := make([]int64, numRows) - src := make([]SphynxId, numRows) - dst := make([]SphynxId, numRows) - mappingToOrdered1 := vs1.GetMappingToOrdered() - mappingToOrdered2 := vs2.GetMappingToOrdered() - for i, row := range rows { - edgeMapping[i] = row.Id - src[i] = mappingToOrdered1[row.Src] - dst[i] = mappingToOrdered2[row.Dst] + // Translate Src to ordered IDs. + sortEdgeRows(rows, func(i, j int) bool { return rows[i].Src < rows[j].Src }) + for i, j := 0, 0; i < len(vs1.MappingToUnordered) && j < len(rows); { + if vs1.MappingToUnordered[i] == rows[j].Src { + rows[j].Src = int64(i) + j++ + } else { + i++ + } } - entity = &EdgeBundle{ - Src: src, - Dst: dst, - EdgeMapping: edgeMapping, + // Translate Dst to ordered IDs. + sortEdgeRows(rows, func(i, j int) bool { return rows[i].Dst < rows[j].Dst }) + for i, j := 0, 0; i < len(vs2.MappingToUnordered) && j < len(rows); { + if vs2.MappingToUnordered[i] == rows[j].Dst { + rows[j].Dst = int64(i) + j++ + } else { + i++ + } + } + // Store the results ordered by edge ID. 
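+ // (The merge passes above left the rows in Dst order; sorting by Id
+ // below also makes EdgeMapping come out in ascending order.)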
+ sortEdgeRows(rows, func(i, j int) bool { return rows[i].Id < rows[j].Id }) + es := NewEdgeBundle(len(rows), len(rows)) + for i, row := range rows { + es.EdgeMapping[i] = row.Id + es.Src[i] = SphynxId(row.Src) + es.Dst[i] = SphynxId(row.Dst) } + entity = es case TabularEntity: vs, err := s.getVertexSet(GUID(in.Vsguid1)) if err != nil { @@ -207,8 +252,6 @@ func (s *Server) ReadFromUnorderedDisk( rowSliceType := reflect.SliceOf(rowType) rowsPointer := reflect.New(rowSliceType) rows := rowsPointer.Elem() - - numRows := 0 for _, fr := range fileReaders { pr, err := reader.NewParquetReader(fr, e.unorderedRow(), numGoRoutines) if err != nil { @@ -218,7 +261,6 @@ func (s *Server) ReadFromUnorderedDisk( partialRowsPointer := reflect.New(rowSliceType) partialRows := partialRowsPointer.Elem() partialRows.Set(reflect.MakeSlice(rowSliceType, partialNumRows, partialNumRows)) - numRows = partialNumRows + numRows if err := pr.Read(partialRowsPointer.Interface()); err != nil { return nil, fmt.Errorf("Failed to read parquet file of %v: %v", reflect.TypeOf(e), err) } @@ -232,13 +274,17 @@ func (s *Server) ReadFromUnorderedDisk( defined := attr.Elem().FieldByName("Defined") idIndex := fieldIndex(rowType, "Id") valueIndex := fieldIndex(rowType, "Value") - mappingToOrdered := vs.GetMappingToOrdered() true := reflect.ValueOf(true) - for i := 0; i < numRows; i++ { - row := rows.Index(i) - orderedId := mappingToOrdered[row.Field(idIndex).Int()] - values.Index(int(orderedId)).Set(row.Field(valueIndex)) - defined.Index(int(orderedId)).Set(true) + sort.Slice(rows.Interface(), func(i, j int) bool { + return rows.Index(i).Field(idIndex).Int() < rows.Index(j).Field(idIndex).Int() + }) + for i, j := 0, 0; i < len(vs.MappingToUnordered) && j < rows.Len(); i++ { + row := rows.Index(j) + if vs.MappingToUnordered[i] == row.Field(idIndex).Int() { + values.Index(i).Set(row.Field(valueIndex)) + defined.Index(i).Set(true) + j++ + } } case *Scalar: sc, err := readScalar(dirName) diff --git a/sphynx/lynxkite-sphynx/vertex_attribute_filter.go b/sphynx/lynxkite-sphynx/vertex_attribute_filter.go index 09f8f1ddf0..dcd67a88ab 100644 --- a/sphynx/lynxkite-sphynx/vertex_attribute_filter.go +++ b/sphynx/lynxkite-sphynx/vertex_attribute_filter.go @@ -4,7 +4,6 @@ package main import ( "fmt" "strconv" - "sync" ) type filterType struct { @@ -151,9 +150,7 @@ func doVertexAttributeFilter(job filterJobDescription, vs *VertexSet, attr Tabul identity *EdgeBundle) { fvs = &VertexSet{ - Mutex: sync.Mutex{}, MappingToUnordered: make([]int64, 0, len(vs.MappingToUnordered)), - MappingToOrdered: nil, } identity = NewEdgeBundle(0, len(vs.MappingToUnordered)) switch a := attr.(type) { diff --git a/sphynx/lynxkite-sphynx/vertex_set_intersection.go b/sphynx/lynxkite-sphynx/vertex_set_intersection.go index 1afd1caa41..d0e725e35d 100644 --- a/sphynx/lynxkite-sphynx/vertex_set_intersection.go +++ b/sphynx/lynxkite-sphynx/vertex_set_intersection.go @@ -4,49 +4,21 @@ package main import ( "fmt" - "sort" - "sync" ) type MergeVertexEntry struct { id int64 count int } -type MergeVertexEntrySlice []MergeVertexEntry - -func (a MergeVertexEntrySlice) Len() int { - return len(a) -} -func (a MergeVertexEntrySlice) Less(i, j int) bool { - return a[i].id < a[j].id -} -func (a MergeVertexEntrySlice) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -type Int64Slice []int64 - -func (a Int64Slice) Len() int { - return len(a) -} -func (a Int64Slice) Less(i, j int) bool { - return a[i] < a[j] -} -func (a Int64Slice) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} func 
doVertexSetIntersection(vertexSets []*VertexSet) (intersection *VertexSet, firstEmbedding *EdgeBundle) { - mergeVertices := make(MergeVertexEntrySlice, len(vertexSets[0].MappingToUnordered)) + mergeVertices := make([]MergeVertexEntry, len(vertexSets[0].MappingToUnordered)) vs0 := vertexSets[0] for idx, id := range vs0.MappingToUnordered { mergeVertices[idx].id = id } - sort.Sort(mergeVertices) for i := 1; i < len(vertexSets); i++ { - w := make([]int64, len(vertexSets[i].MappingToUnordered)) - copy(w, vertexSets[i].MappingToUnordered) - sort.Sort(Int64Slice(w)) + w := vertexSets[i].MappingToUnordered for j, k := 0, 0; j < len(mergeVertices) && k < len(w); { if mergeVertices[j].id == w[k] { mergeVertices[j].count++ @@ -66,7 +38,6 @@ func doVertexSetIntersection(vertexSets []*VertexSet) (intersection *VertexSet, } } intersection = &VertexSet{ - Mutex: sync.Mutex{}, MappingToUnordered: allHaveIt, } From f85f115702357eceb7561fd1d521b9fde4737954 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Thu, 11 Feb 2021 19:00:53 +0100 Subject: [PATCH 04/13] Fix/delete NetworKit tests. --- sphynx/lynxkite-sphynx/networkit_test.go | 26 +----------------------- 1 file changed, 1 insertion(+), 25 deletions(-) diff --git a/sphynx/lynxkite-sphynx/networkit_test.go b/sphynx/lynxkite-sphynx/networkit_test.go index b62a7251fb..a2e1582667 100644 --- a/sphynx/lynxkite-sphynx/networkit_test.go +++ b/sphynx/lynxkite-sphynx/networkit_test.go @@ -56,30 +56,6 @@ func TestNewVertexAttribute(t *testing.T) { } } -func TestGraphToSphynx(t *testing.T) { - c := networkit.NewBarabasiAlbertGenerator(uint64(2), uint64(5)) - defer networkit.DeleteBarabasiAlbertGenerator(c) - g := c.Generate() - defer networkit.DeleteGraph(g) - vs, es := ToSphynx(g) - if len(vs.MappingToUnordered) != 5 { - t.Errorf("Vertex set is %v, expected 5.", vs.MappingToUnordered) - } - expectedSrc := []SphynxId{1, 1, 2, 2, 3, 3, 4, 4} - if len(es.Src) != len(expectedSrc) { - t.Errorf("Source list is %v, expected %v.", es.Src, expectedSrc) - } - if len(es.Dst) != len(expectedSrc) { - t.Errorf("Destination list is %v, expected length %v.", es.Dst, len(expectedSrc)) - } - for i := range es.Src { - if es.Src[i] != expectedSrc[i] { - t.Errorf("Source list is %v, expected %v.", es.Src, expectedSrc) - break - } - } -} - func TestVectorVector(t *testing.T) { c := networkit.NewBarabasiAlbertGenerator(uint64(3), uint64(10)) defer networkit.DeleteBarabasiAlbertGenerator(c) @@ -93,7 +69,7 @@ func TestVectorVector(t *testing.T) { for i := 0; i < int(points.Size()); i += 1 { x := points.Get(i).At(0) y := points.Get(i).At(1) - if x < -2 || x > 2 || x == 0 || y < -2 || y > 2 || y == 0 { + if x < -3 || x > 3 || x == 0 || y < -3 || y > 3 || y == 0 { t.Errorf("Unexpected coordinates: %v %v", x, y) } } From 950e9540711f5d7c218de8beba0258c1338020bc Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Thu, 11 Feb 2021 19:16:06 +0100 Subject: [PATCH 05/13] Sort when loading, assert when saving ordered data. 
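
MappingToUnordered is now required to be ascending everywhere:
writing panics if the invariant is broken, and reading sorts, since
files written before this invariant existed may be unsorted. In
isolation the load-time step is equivalent to this sketch (the real
code goes through the Int64Slice helper from unordered_disk_io.go,
which avoids the reflection-based swapping of sort.Slice):

    package main

    import (
        "fmt"
        "sort"
    )

    func main() {
        ids := []int64{3, 1, 2} // e.g. loaded from a pre-invariant file
        less := func(i, j int) bool { return ids[i] < ids[j] }
        if !sort.SliceIsSorted(ids, less) {
            sort.Slice(ids, less)
        }
        fmt.Println(ids) // [1 2 3]
    }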
--- sphynx/lynxkite-sphynx/entity_io.go | 3 +++ sphynx/lynxkite-sphynx/unordered_disk_io.go | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/sphynx/lynxkite-sphynx/entity_io.go b/sphynx/lynxkite-sphynx/entity_io.go index 98a7152d67..0dda2e5bc4 100644 --- a/sphynx/lynxkite-sphynx/entity_io.go +++ b/sphynx/lynxkite-sphynx/entity_io.go @@ -55,6 +55,7 @@ var vertexSetSchema = arrow.NewSchema( }, nil) func (v *VertexSet) toOrderedRows() array.Record { + assertSorted(v.MappingToUnordered) b := array.NewInt64Builder(arrowAllocator) defer b.Release() b.AppendValues(v.MappingToUnordered, nil) @@ -69,6 +70,8 @@ func (v *VertexSet) readFromOrdered(rec array.Record) error { for i, d := range data { v.MappingToUnordered[i] = d } + // For backward compatibility. + sortIds(v.MappingToUnordered) return nil } diff --git a/sphynx/lynxkite-sphynx/unordered_disk_io.go b/sphynx/lynxkite-sphynx/unordered_disk_io.go index 32610d562b..b8aa6c199a 100644 --- a/sphynx/lynxkite-sphynx/unordered_disk_io.go +++ b/sphynx/lynxkite-sphynx/unordered_disk_io.go @@ -32,6 +32,12 @@ func toUnorderedRows(e TabularEntity, vs1 *VertexSet, vs2 *VertexSet) []interfac func sortIds(ids []int64) { sort.Sort(Int64Slice(ids)) } +func assertSorted(ids []int64) { + if !sort.IsSorted(Int64Slice(ids)) { + // The previous loglines will point out which entity this is. + panic("These IDs are not sorted.") + } +} type Int64Slice []int64 From 6d3c23188a7331458b40bedd0cd79eddb380dc47 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Fri, 12 Feb 2021 10:52:53 +0100 Subject: [PATCH 06/13] Bit clearer logging. --- sphynx/lynxkite-sphynx/main.go | 1 + sphynx/lynxkite-sphynx/sphynxdisk.go | 4 ++-- sphynx/lynxkite-sphynx/unordered_disk_io.go | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sphynx/lynxkite-sphynx/main.go b/sphynx/lynxkite-sphynx/main.go index 501fda9cb2..bf36bcb367 100644 --- a/sphynx/lynxkite-sphynx/main.go +++ b/sphynx/lynxkite-sphynx/main.go @@ -62,6 +62,7 @@ func (s *Server) CanCompute(ctx context.Context, in *pb.CanComputeRequest) (*pb. 
func (s *Server) Compute(ctx context.Context, in *pb.ComputeRequest) (*pb.ComputeReply, error) { opInst := OperationInstanceFromJSON(in.Operation) + log.Printf("Computing %v.", shortOpName(opInst)) switch in.Domain { case "SphynxMemory": op, exists := operationRepository[shortOpName(opInst)] diff --git a/sphynx/lynxkite-sphynx/sphynxdisk.go b/sphynx/lynxkite-sphynx/sphynxdisk.go index 04837a02de..a38d554dae 100644 --- a/sphynx/lynxkite-sphynx/sphynxdisk.go +++ b/sphynx/lynxkite-sphynx/sphynxdisk.go @@ -38,8 +38,8 @@ func createEntity(typeName string) (Entity, error) { } func saveToOrderedDisk(e Entity, dataDir string, guid GUID) error { - log.Printf("saveToOrderedDisk guid %v", guid) typeName := e.typeName() + log.Printf("Writing %v %v to ordered disk.", typeName, guid) dirName := fmt.Sprintf("%v/%v", dataDir, guid) _ = os.Mkdir(dirName, 0775) typeFName := fmt.Sprintf("%v/type_name", dirName) @@ -96,7 +96,6 @@ func saveToOrderedDisk(e Entity, dataDir string, guid GUID) error { } func loadFromOrderedDisk(dataDir string, guid GUID) (Entity, error) { - log.Printf("loadFromOrderedDisk: %v", guid) dirName := fmt.Sprintf("%v/%v", dataDir, guid) typeFName := fmt.Sprintf("%v/type_name", dirName) typeData, err := ioutil.ReadFile(typeFName) @@ -104,6 +103,7 @@ func loadFromOrderedDisk(dataDir string, guid GUID) (Entity, error) { return nil, fmt.Errorf("Failed to read type of %v: %v", dirName, err) } typeName := string(typeData) + log.Printf("Reading %v %v from ordered disk.", typeName, guid) e, err := createEntity(typeName) if err != nil { return nil, err diff --git a/sphynx/lynxkite-sphynx/unordered_disk_io.go b/sphynx/lynxkite-sphynx/unordered_disk_io.go index b8aa6c199a..cb1ed32244 100644 --- a/sphynx/lynxkite-sphynx/unordered_disk_io.go +++ b/sphynx/lynxkite-sphynx/unordered_disk_io.go @@ -76,7 +76,7 @@ func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorder if !exists { return nil, fmt.Errorf("Guid %v is missing", guid) } - log.Printf("Reindexing entity with guid %v to use spark IDs.", guid) + log.Printf("Writing %v %v to unordered disk.", entity.typeName(), guid) dirName := fmt.Sprintf("%v/%v", s.unorderedDataDir, guid) _ = os.Mkdir(dirName, 0775) fname := fmt.Sprintf("%v/part-00000.parquet", dirName) @@ -134,7 +134,6 @@ func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorder func (s *Server) ReadFromUnorderedDisk( ctx context.Context, in *pb.ReadFromUnorderedDiskRequest) (*pb.ReadFromUnorderedDiskReply, error) { const numGoRoutines int64 = 4 - log.Printf("Reindexing entity with guid %v to use Sphynx IDs.", in.Guid) dirName := fmt.Sprintf("%v/%v", s.unorderedDataDir, in.Guid) files, err := ioutil.ReadDir(dirName) if err != nil { @@ -166,6 +165,7 @@ func (s *Server) ReadFromUnorderedDisk( in.Type = attributeType + in.Type } } + log.Printf("Reading %v %v from unordered disk.", in.Type, in.Guid) entity, err := createEntity(in.Type) if err != nil { return nil, err From 3acca47948eaaa6fd43c16acb4f85d1acaaa5a22 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Fri, 12 Feb 2021 10:53:26 +0100 Subject: [PATCH 07/13] Sort edge bundle in StripDuplicateEdgesFromBundle. 
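
An EdgeBundle stores Src, Dst and EdgeMapping as three parallel
slices, so sorting by edge ID has to swap all three in lockstep.
Implementing sort.Interface on the struct is the standard Go idiom
for keeping parallel slices aligned; the same pattern in a minimal
standalone form (byKey is an illustrative name):

    package main

    import (
        "fmt"
        "sort"
    )

    // byKey sorts key and keeps val aligned with it.
    type byKey struct {
        key []int64
        val []string
    }

    func (p byKey) Len() int           { return len(p.key) }
    func (p byKey) Less(i, j int) bool { return p.key[i] < p.key[j] }
    func (p byKey) Swap(i, j int) {
        p.key[i], p.key[j] = p.key[j], p.key[i]
        p.val[i], p.val[j] = p.val[j], p.val[i]
    }

    func main() {
        p := byKey{key: []int64{2, 0, 1}, val: []string{"b", "c", "a"}}
        sort.Sort(p)
        fmt.Println(p.key, p.val) // [0 1 2] [c a b]
    }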
--- .../strip_duplicate_edges_from_bundle.go | 1 + sphynx/lynxkite-sphynx/types.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/sphynx/lynxkite-sphynx/strip_duplicate_edges_from_bundle.go b/sphynx/lynxkite-sphynx/strip_duplicate_edges_from_bundle.go index bbadd0ed93..c6aa55f5da 100644 --- a/sphynx/lynxkite-sphynx/strip_duplicate_edges_from_bundle.go +++ b/sphynx/lynxkite-sphynx/strip_duplicate_edges_from_bundle.go @@ -25,6 +25,7 @@ func doStripDuplicateEdgesFromBundle(es *EdgeBundle) *EdgeBundle { uniqueBundle.EdgeMapping[i] = id i++ } + uniqueBundle.Sort() return uniqueBundle } diff --git a/sphynx/lynxkite-sphynx/types.go b/sphynx/lynxkite-sphynx/types.go index 5cf62bb109..8fcfcb5a15 100644 --- a/sphynx/lynxkite-sphynx/types.go +++ b/sphynx/lynxkite-sphynx/types.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" pb "github.com/lynxkite/lynxkite/sphynx/proto" + "sort" ) type Server struct { @@ -40,6 +41,21 @@ func NewEdgeBundle(size int, maxSize int) *EdgeBundle { } } +func (self *EdgeBundle) Len() int { + return len(self.EdgeMapping) +} +func (self *EdgeBundle) Swap(i, j int) { + self.Src[i], self.Src[j] = self.Src[j], self.Src[i] + self.Dst[i], self.Dst[j] = self.Dst[j], self.Dst[i] + self.EdgeMapping[i], self.EdgeMapping[j] = self.EdgeMapping[j], self.EdgeMapping[i] +} +func (self *EdgeBundle) Less(i, j int) bool { + return self.EdgeMapping[i] < self.EdgeMapping[j] +} +func (self *EdgeBundle) Sort() { + sort.Sort(self) +} + type VertexSet struct { // This slice contains the Spark IDs in ascending order. MappingToUnordered []int64 From eb7a6667ae98a83d4fae6ce3916d9caa80d8e4d1 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Fri, 12 Feb 2021 12:16:01 +0100 Subject: [PATCH 08/13] Upgrade testcontainers to fix the Neo4j test. --- build.sbt | 4 ++-- dependency-licenses/scala.md | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/build.sbt b/build.sbt index 13e8b5f2c2..912c2e3fe1 100644 --- a/build.sbt +++ b/build.sbt @@ -94,8 +94,8 @@ libraryDependencies ++= Seq( // Used for working with AVRO files. "org.apache.spark" %% "spark-avro" % sparkVersion.value, // For Neo4j tests. - "org.testcontainers" % "testcontainers" % "1.14.3" % Test, - "org.testcontainers" % "neo4j" % "1.14.3" % Test, + "org.testcontainers" % "testcontainers" % "1.15.2" % Test, + "org.testcontainers" % "neo4j" % "1.15.2" % Test, // Used for working with Delta tables. 
"io.delta" %% "delta-core" % "0.6.1" ) diff --git a/dependency-licenses/scala.md b/dependency-licenses/scala.md index 8fc3fc3495..b93fe4fd25 100644 --- a/dependency-licenses/scala.md +++ b/dependency-licenses/scala.md @@ -41,7 +41,6 @@ Apache | [Apache License](LICENSE.txt) | org.apache.httpcomponents # httpmime # Apache | [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) | com.ning # async-http-client # 1.8.14 | Apache | [Apache License Version 2.0](LICENSE.txt) | org.yaml # snakeyaml # 1.15 | Apache | [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | net.java.dev.jna # jna # 5.5.0 | -Apache | [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | net.java.dev.jna # jna-platform # 5.5.0 | Apache | [Apache License, Version 2.0](https://aws.amazon.com/apache2.0) | com.amazonaws # aws-java-sdk # 1.7.4 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.clearspring.analytics # stream # 2.7.0 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.javaparser # javaparser-core # 3.2.5 | @@ -50,8 +49,6 @@ Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2. Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.cloud.bigdataoss # util # 1.6.1 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.cloud.bigdataoss # util-hadoop # 1.6.1-hadoop2 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.jamesmurty.utils # java-xmlbuilder # 1.1 | -Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.kohlschutter.junixsocket # junixsocket-common # 2.0.4 | -Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.kohlschutter.junixsocket # junixsocket-native-common # 2.0.4 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.typesafe # config # 1.2.1 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.typesafe.akka # akka-actor_2.11 # 2.3.4 | Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.typesafe.akka # akka-slf4j_2.11 # 2.3.4 | @@ -103,10 +100,14 @@ Apache | [Similar to Apache License but with the acknowledgment clause removed]( Apache | [The Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | io.opencensus # opencensus-api # 0.21.0 | Apache | [The Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | io.opencensus # opencensus-contrib-grpc-metrics # 0.21.0 | Apache | [The Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.spark-project.spark # unused # 1.0.0 | +Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-annotations # 2.10.3 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-annotations # 2.6.5 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-core # 2.6.5 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-databind # 2.6.5 | Apache | [The Apache Software License, Version 
2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.dataformat # jackson-dataformat-yaml # 2.6.5 | +Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.docker-java # docker-java-api # 3.2.7 | +Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.docker-java # docker-java-transport # 3.2.7 | +Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.docker-java # docker-java-transport-zerodep # 3.2.7 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.api-client # google-api-client # 1.20.0 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.api-client # google-api-client-jackson2 # 1.20.0 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.api-client # google-api-client-java6 # 1.20.0 | @@ -132,7 +133,6 @@ Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licens Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.codehaus.jackson # jackson-mapper-asl # 1.9.13 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.ejml # ejml-core # 0.34 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.ejml # ejml-ddense # 0.34 | -Apache | [The Apache Software License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0.txt) | org.jetbrains # annotations # 19.0.0 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.seleniumhq.selenium # selenium-android-driver # 2.39.0 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.seleniumhq.selenium # selenium-api # 2.39.0 | Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.seleniumhq.selenium # selenium-chrome-driver # 2.39.0 | @@ -204,7 +204,6 @@ BSD | [The BSD License](http://www.antlr.org/license.html) | org.antlr # antlr4- BSD | [Three-clause BSD-style](https://github.com/mpilquist/simulacrum/blob/master/LICENSE) | com.github.mpilquist # simulacrum_2.11 # 0.10.0 | BSD | [Three-clause BSD-style](https://github.com/scodec/scodec-bits/blob/master/LICENSE) | org.scodec # scodec-bits_2.11 # 1.0.9 | BSD | [Two-clause BSD-style license](http://github.com/sbt/junit-interface/blob/master/LICENSE.txt) | com.novocode # junit-interface # 0.11-RC1 | -CC0 | [CC0 1.0 Universal License](http://creativecommons.org/publicdomain/zero/1.0/) | org.scijava # native-lib-loader # 2.0.2 | CDDL | [COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0](https://glassfish.dev.java.net/public/CDDLv1.0.html) | javax.activation # activation # 1.1.1 | GPL | [ Dual license consisting of the CDDL v1.1 and GPL v2 ](https://glassfish.java.net/public/CDDL+GPL_1_1.html) | org.glassfish # javax.json # 1.0.4 | GPL | [GPL2 w/ CPE](https://www.gnu.org/software/classpath/license.html) | jakarta.ws.rs # jakarta.ws.rs-api # 2.1.6 | @@ -226,7 +225,6 @@ MIT | [MIT](http://opensource.org/licenses/MIT) | com.github.julien-truffaut # m MIT | [MIT](http://www.opensource.org/licenses/mit-license.html) | com.lihaoyi # fansi_2.11 # 0.2.4 | MIT | 
[MIT](http://www.opensource.org/licenses/mit-license.html) | com.lihaoyi # pprint_2.11 # 0.5.2 | MIT | [MIT](http://www.opensource.org/licenses/mit-license.html) | com.lihaoyi # sourcecode_2.11 # 0.1.4 | -MIT | [MIT](http://opensource.org/licenses/MIT) | org.rnorth # tcp-unix-socket-proxy # 1.0.2 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.rnorth.duct-tape # duct-tape # 1.0.8 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.rnorth.visible-assertions # visible-assertions # 2.1.2 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.scalaz.stream # scalaz-stream_2.11 # 0.8 | @@ -234,8 +232,8 @@ MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # jawn-parser_2 MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # kind-projector_2.11 # 0.7.1 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # spire-macros_2.11 # 0.13.0 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # spire_2.11 # 0.13.0 | -MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # neo4j # 1.14.3 | -MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # testcontainers # 1.14.3 | +MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # neo4j # 1.15.2 | +MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # testcontainers # 1.15.2 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.typelevel # cats-core_2.11 # 0.9.0 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.typelevel # cats-kernel_2.11 # 0.9.0 | MIT | [MIT](http://opensource.org/licenses/MIT) | org.typelevel # cats-macros_2.11 # 0.9.0 | From 564095530de052ed391f3fe03cb3edaf208ac4ef Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Fri, 12 Feb 2021 19:26:30 +0100 Subject: [PATCH 09/13] Improve "guid missing" error message. --- sphynx/lynxkite-sphynx/entity_cache.go | 7 +++++++ sphynx/lynxkite-sphynx/main.go | 4 ++-- sphynx/lynxkite-sphynx/operations.go | 2 +- sphynx/lynxkite-sphynx/sphynxdisk.go | 2 +- sphynx/lynxkite-sphynx/unordered_disk_io.go | 2 +- 5 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sphynx/lynxkite-sphynx/entity_cache.go b/sphynx/lynxkite-sphynx/entity_cache.go index 3cebb4194e..ce8b0ab596 100644 --- a/sphynx/lynxkite-sphynx/entity_cache.go +++ b/sphynx/lynxkite-sphynx/entity_cache.go @@ -78,6 +78,13 @@ func (entityCache *EntityCache) Set(guid GUID, entity Entity) { // But we do not want to update the timestamp for those. } +func NotInCacheError(kind string, guid GUID) error { + // If we drop something from the cache it will be reloaded before the next use. + // The exception is when we drop it right after loading it. This generally means + // the cache is too small. + return fmt.Errorf("Could not fit %v %v into memory. 
Increase SPHYNX_CACHED_ENTITIES_MAX_MEM_MB?") +} + type entityEvictionItem struct { guid GUID timestamp int64 diff --git a/sphynx/lynxkite-sphynx/main.go b/sphynx/lynxkite-sphynx/main.go index bf36bcb367..10bbffc865 100644 --- a/sphynx/lynxkite-sphynx/main.go +++ b/sphynx/lynxkite-sphynx/main.go @@ -118,7 +118,7 @@ func (s *Server) GetScalar(ctx context.Context, in *pb.GetScalarRequest) (*pb.Ge log.Printf("Received GetScalar request with GUID %v.", guid) entity, exists := s.entityCache.Get(guid) if !exists { - return nil, fmt.Errorf("Guid %v is missing", guid) + return nil, NotInCacheError("scalar", guid) } switch scalar := entity.(type) { @@ -139,7 +139,7 @@ func (s *Server) HasInSphynxMemory(ctx context.Context, in *pb.HasInSphynxMemory func (s *Server) getVertexSet(guid GUID) (*VertexSet, error) { entity, exists := s.entityCache.Get(guid) if !exists { - return nil, fmt.Errorf("Guid %v is missing", guid) + return nil, NotInCacheError("vertex set", guid) } switch vs := entity.(type) { case *VertexSet: diff --git a/sphynx/lynxkite-sphynx/operations.go b/sphynx/lynxkite-sphynx/operations.go index 4075c9ead6..6bb5e26dcf 100644 --- a/sphynx/lynxkite-sphynx/operations.go +++ b/sphynx/lynxkite-sphynx/operations.go @@ -19,7 +19,7 @@ func collectInputs(server *Server, opInst *OperationInstance) (map[string]Entity for name, guid := range opInst.Inputs { entity, exists := server.entityCache.Get(guid) if !exists { - return nil, fmt.Errorf("Guid %v is missing", guid) + return nil, NotInCacheError("input", guid) } inputs[name] = entity } diff --git a/sphynx/lynxkite-sphynx/sphynxdisk.go b/sphynx/lynxkite-sphynx/sphynxdisk.go index a38d554dae..4f85231704 100644 --- a/sphynx/lynxkite-sphynx/sphynxdisk.go +++ b/sphynx/lynxkite-sphynx/sphynxdisk.go @@ -160,7 +160,7 @@ func (s *Server) WriteToOrderedDisk( e, exists := s.entityCache.Get(guid) if !exists { - return nil, fmt.Errorf("Guid %v is missing", guid) + return nil, NotInCacheError("entity", guid) } if err := saveToOrderedDisk(e, s.dataDir, guid); err != nil { diff --git a/sphynx/lynxkite-sphynx/unordered_disk_io.go b/sphynx/lynxkite-sphynx/unordered_disk_io.go index cb1ed32244..b99440d89d 100644 --- a/sphynx/lynxkite-sphynx/unordered_disk_io.go +++ b/sphynx/lynxkite-sphynx/unordered_disk_io.go @@ -74,7 +74,7 @@ func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorder guid := GUID(in.Guid) entity, exists := s.entityCache.Get(guid) if !exists { - return nil, fmt.Errorf("Guid %v is missing", guid) + return nil, NotInCacheError("entity", guid) } log.Printf("Writing %v %v to unordered disk.", entity.typeName(), guid) dirName := fmt.Sprintf("%v/%v", s.unorderedDataDir, guid) From db6e7d63265b88f327203b9596b6fc3815eaf297 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Mon, 15 Feb 2021 20:16:05 +0100 Subject: [PATCH 10/13] Expose pprof on a debug port. 
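
The blank import of net/http/pprof registers the /debug/pprof/*
handlers on http.DefaultServeMux, and http.ListenAndServe with a nil
handler serves exactly that mux, so setting SPHYNX_DEBUG_PORT is all
it takes to enable profiling. The pattern in isolation:

    package main

    import (
        "log"
        "net/http"
        _ "net/http/pprof" // registers /debug/pprof/* on DefaultServeMux
    )

    func main() {
        // A nil handler means DefaultServeMux, which now carries pprof.
        log.Fatal(http.ListenAndServe(":6060", nil))
    }

With the debug port set to 6060, for example, a heap profile can then
be fetched with "go tool pprof http://localhost:6060/debug/pprof/heap".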
--- sphynx/lynxkite-sphynx/main.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sphynx/lynxkite-sphynx/main.go b/sphynx/lynxkite-sphynx/main.go index 10bbffc865..507962c0f1 100644 --- a/sphynx/lynxkite-sphynx/main.go +++ b/sphynx/lynxkite-sphynx/main.go @@ -14,6 +14,8 @@ import ( "google.golang.org/grpc/credentials" "log" "net" + "net/http" + _ "net/http/pprof" "os" "strings" ) @@ -187,6 +189,12 @@ func main() { if port == "" { log.Fatalf("Please set SPHYNX_PORT.") } + debugPort := os.Getenv("SPHYNX_DEBUG_PORT") + if debugPort != "" { + go func() error { + return http.ListenAndServe(fmt.Sprintf(":%s", debugPort), nil) + }() + } keydir := flag.String( "keydir", "", "directory of cert.pem and private-key.pem files (for encryption)") flag.Parse() From 2b0aa50a64b373f006e51d6036141ac40c30fac9 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 2 Mar 2021 13:56:01 +0100 Subject: [PATCH 11/13] More flexible parallelism instead of fixed 4. --- sphynx/lynxkite-sphynx/entity_cache.go | 2 ++ sphynx/lynxkite-sphynx/unordered_disk_io.go | 10 ++++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sphynx/lynxkite-sphynx/entity_cache.go b/sphynx/lynxkite-sphynx/entity_cache.go index ce8b0ab596..885687a18b 100644 --- a/sphynx/lynxkite-sphynx/entity_cache.go +++ b/sphynx/lynxkite-sphynx/entity_cache.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "runtime" "sort" "strconv" "sync" @@ -34,6 +35,7 @@ type cacheEntry struct { } var cachedEntitiesMaxMem = getNumericEnv("SPHYNX_CACHED_ENTITIES_MAX_MEM_MB", 1*1024) * 1024 * 1024 +var sphynxThreads = getNumericEnv("SPHYNX_THREADS", runtime.NumCPU()) func (entityCache *EntityCache) Get(guid GUID) (Entity, bool) { ts := ourTimestamp() diff --git a/sphynx/lynxkite-sphynx/unordered_disk_io.go b/sphynx/lynxkite-sphynx/unordered_disk_io.go index b99440d89d..9bec014e76 100644 --- a/sphynx/lynxkite-sphynx/unordered_disk_io.go +++ b/sphynx/lynxkite-sphynx/unordered_disk_io.go @@ -70,7 +70,6 @@ func sortEdgeRows(rows []UnorderedEdgeRow, less func(i, j int) bool) { } func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorderedDiskRequest) (*pb.WriteToUnorderedDiskReply, error) { - const numGoRoutines int64 = 4 guid := GUID(in.Guid) entity, exists := s.entityCache.Get(guid) if !exists { @@ -88,7 +87,7 @@ func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorder } switch e := entity.(type) { case TabularEntity: - pw, err := writer.NewParquetWriter(fw, e.unorderedRow(), numGoRoutines) + pw, err := writer.NewParquetWriter(fw, e.unorderedRow(), int64(sphynxThreads)) if err != nil { return nil, fmt.Errorf("Failed to create parquet writer: %v", err) } @@ -133,7 +132,6 @@ func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorder func (s *Server) ReadFromUnorderedDisk( ctx context.Context, in *pb.ReadFromUnorderedDiskRequest) (*pb.ReadFromUnorderedDiskReply, error) { - const numGoRoutines int64 = 4 dirName := fmt.Sprintf("%v/%v", s.unorderedDataDir, in.Guid) files, err := ioutil.ReadDir(dirName) if err != nil { @@ -175,7 +173,7 @@ func (s *Server) ReadFromUnorderedDisk( rows := make([]UnorderedVertexRow, 0) numRows := 0 for _, fr := range fileReaders { - pr, err := reader.NewParquetReader(fr, e.unorderedRow(), numGoRoutines) + pr, err := reader.NewParquetReader(fr, e.unorderedRow(), int64(sphynxThreads)) if err != nil { return nil, fmt.Errorf("Failed to create parquet reader: %v", err) } @@ -207,7 +205,7 @@ func (s *Server) ReadFromUnorderedDisk( } rows := make([]UnorderedEdgeRow, 
0) for _, fr := range fileReaders { - pr, err := reader.NewParquetReader(fr, new(UnorderedEdgeRow), numGoRoutines) + pr, err := reader.NewParquetReader(fr, new(UnorderedEdgeRow), int64(sphynxThreads)) if err != nil { return nil, fmt.Errorf("Failed to create parquet reader: %v", err) } @@ -259,7 +257,7 @@ func (s *Server) ReadFromUnorderedDisk( rowsPointer := reflect.New(rowSliceType) rows := rowsPointer.Elem() for _, fr := range fileReaders { - pr, err := reader.NewParquetReader(fr, e.unorderedRow(), numGoRoutines) + pr, err := reader.NewParquetReader(fr, e.unorderedRow(), int64(sphynxThreads)) if err != nil { return nil, fmt.Errorf("Failed to create parquet reader: %v", err) } From 3861bb32f8eaf3f684f04fdc8f5373f7b018dede Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 2 Mar 2021 14:41:16 +0100 Subject: [PATCH 12/13] Use a fast concurrent sort library. (sorty) --- sphynx/go.mod | 1 + sphynx/go.sum | 2 + sphynx/lynxkite-sphynx/entity_io.go | 3 +- sphynx/lynxkite-sphynx/unordered_disk_io.go | 101 +++++++++++--------- 4 files changed, 60 insertions(+), 47 deletions(-) diff --git a/sphynx/go.mod b/sphynx/go.mod index 4c5e36ce85..55df476d39 100644 --- a/sphynx/go.mod +++ b/sphynx/go.mod @@ -5,6 +5,7 @@ go 1.14 require ( github.com/apache/arrow/go/arrow v0.0.0-20200701075601-f25a014ab157 github.com/golang/protobuf v1.4.2 + github.com/jfcg/sorty v1.0.12 github.com/juju/errors v0.0.0-20200330140219-3fe23663418f github.com/juju/testing v0.0.0-20200608005635-e4eedbc6f7aa // indirect github.com/xitongsys/parquet-go v1.5.2 diff --git a/sphynx/go.sum b/sphynx/go.sum index 3ba77f72e2..ee4c0d0db4 100644 --- a/sphynx/go.sum +++ b/sphynx/go.sum @@ -92,6 +92,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jfcg/sorty v1.0.12 h1:1Np/SBt2ODK981ZyadqOViwPrZ4ncRPp1y7C+JqWQms= +github.com/jfcg/sorty v1.0.12/go.mod h1:+v4Q9+K64VQk8A8FTAw6hHg1WAnHwm07TROXBVtsjWY= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= diff --git a/sphynx/lynxkite-sphynx/entity_io.go b/sphynx/lynxkite-sphynx/entity_io.go index 0dda2e5bc4..57eb0c8b30 100644 --- a/sphynx/lynxkite-sphynx/entity_io.go +++ b/sphynx/lynxkite-sphynx/entity_io.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" + "github.com/jfcg/sorty" "io/ioutil" "os" "reflect" @@ -71,7 +72,7 @@ func (v *VertexSet) readFromOrdered(rec array.Record) error { v.MappingToUnordered[i] = d } // For backward compatibility. 
- sortIds(v.MappingToUnordered) + sorty.SortI8(v.MappingToUnordered) return nil } diff --git a/sphynx/lynxkite-sphynx/unordered_disk_io.go b/sphynx/lynxkite-sphynx/unordered_disk_io.go index 9bec014e76..d5253b807f 100644 --- a/sphynx/lynxkite-sphynx/unordered_disk_io.go +++ b/sphynx/lynxkite-sphynx/unordered_disk_io.go @@ -5,6 +5,7 @@ package main import ( "context" "fmt" + "github.com/jfcg/sorty" pb "github.com/lynxkite/lynxkite/sphynx/proto" "github.com/xitongsys/parquet-go-source/local" "github.com/xitongsys/parquet-go/reader" @@ -14,7 +15,6 @@ import ( "log" "os" "reflect" - "sort" "strings" ) @@ -29,44 +29,27 @@ func toUnorderedRows(e TabularEntity, vs1 *VertexSet, vs2 *VertexSet) []interfac } } -func sortIds(ids []int64) { - sort.Sort(Int64Slice(ids)) -} func assertSorted(ids []int64) { - if !sort.IsSorted(Int64Slice(ids)) { + if sorty.IsSortedI8(ids) != 0 { // The previous loglines will point out which entity this is. panic("These IDs are not sorted.") } } -type Int64Slice []int64 - -func (a Int64Slice) Len() int { - return len(a) -} -func (a Int64Slice) Less(i, j int) bool { - return a[i] < a[j] -} -func (a Int64Slice) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -type unorderedEdgesSorter struct { - rows []UnorderedEdgeRow - less func(i, j int) bool -} - -func (self unorderedEdgesSorter) Len() int { - return len(self.rows) -} -func (self unorderedEdgesSorter) Swap(i, j int) { - self.rows[i], self.rows[j] = self.rows[j], self.rows[i] -} -func (self unorderedEdgesSorter) Less(i, j int) bool { - return self.less(i, j) -} -func sortEdgeRows(rows []UnorderedEdgeRow, less func(i, j int) bool) { - sort.Sort(unorderedEdgesSorter{rows, less}) +// Useful if you want to sort something by keys without messing with it. +func sortedPermutation(ids []int64) []int { + permutation := make([]int, len(ids), len(ids)) + for i := range permutation { + permutation[i] = i + } + sorty.Sort(len(ids), func(i, k, r, s int) bool { + if ids[permutation[i]] < ids[permutation[k]] { + permutation[r], permutation[s] = permutation[s], permutation[r] + return true + } + return false + }) + return permutation } func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorderedDiskRequest) (*pb.WriteToUnorderedDiskReply, error) { @@ -132,6 +115,7 @@ func (s *Server) WriteToUnorderedDisk(ctx context.Context, in *pb.WriteToUnorder func (s *Server) ReadFromUnorderedDisk( ctx context.Context, in *pb.ReadFromUnorderedDiskRequest) (*pb.ReadFromUnorderedDiskReply, error) { + sorty.Mxg = uint32(sphynxThreads) dirName := fmt.Sprintf("%v/%v", s.unorderedDataDir, in.Guid) files, err := ioutil.ReadDir(dirName) if err != nil { @@ -190,7 +174,7 @@ func (s *Server) ReadFromUnorderedDisk( for i, v := range rows { mappingToUnordered[i] = v.Id } - sortIds(mappingToUnordered) + sorty.SortI8(mappingToUnordered) entity = &VertexSet{ MappingToUnordered: mappingToUnordered, } @@ -218,7 +202,15 @@ func (s *Server) ReadFromUnorderedDisk( rows = append(rows, partialRows...) } // Translate Src to ordered IDs. - sortEdgeRows(rows, func(i, j int) bool { return rows[i].Src < rows[j].Src }) + sorty.Sort(len(rows), func(i, k, r, s int) bool { + if rows[i].Src < rows[k].Src { + if r != s { + rows[r], rows[s] = rows[s], rows[r] + } + return true + } + return false + }) for i, j := 0, 0; i < len(vs1.MappingToUnordered) && j < len(rows); { if vs1.MappingToUnordered[i] == rows[j].Src { rows[j].Src = int64(i) @@ -227,8 +219,15 @@ func (s *Server) ReadFromUnorderedDisk( i++ } } - // Translate Dst to ordered IDs. 
- sortEdgeRows(rows, func(i, j int) bool { return rows[i].Dst < rows[j].Dst }) + sorty.Sort(len(rows), func(i, k, r, s int) bool { + if rows[i].Dst < rows[k].Dst { + if r != s { + rows[r], rows[s] = rows[s], rows[r] + } + return true + } + return false + }) for i, j := 0, 0; i < len(vs2.MappingToUnordered) && j < len(rows); { if vs2.MappingToUnordered[i] == rows[j].Dst { rows[j].Dst = int64(i) @@ -238,7 +237,15 @@ func (s *Server) ReadFromUnorderedDisk( } } // Store the results ordered by edge ID. - sortEdgeRows(rows, func(i, j int) bool { return rows[i].Id < rows[j].Id }) + sorty.Sort(len(rows), func(i, k, r, s int) bool { + if rows[i].Id < rows[k].Id { + if r != s { + rows[r], rows[s] = rows[s], rows[r] + } + return true + } + return false + }) es := NewEdgeBundle(len(rows), len(rows)) for i, row := range rows { es.EdgeMapping[i] = row.Id @@ -278,15 +285,17 @@ func (s *Server) ReadFromUnorderedDisk( defined := attr.Elem().FieldByName("Defined") idIndex := fieldIndex(rowType, "Id") valueIndex := fieldIndex(rowType, "Value") - true := reflect.ValueOf(true) - sort.Slice(rows.Interface(), func(i, j int) bool { - return rows.Index(i).Field(idIndex).Int() < rows.Index(j).Field(idIndex).Int() - }) - for i, j := 0, 0; i < len(vs.MappingToUnordered) && j < rows.Len(); i++ { - row := rows.Index(j) - if vs.MappingToUnordered[i] == row.Field(idIndex).Int() { + trueValue := reflect.ValueOf(true) + ids := make([]int64, rows.Len(), rows.Len()) + for i := range ids { + ids[i] = rows.Index(i).Field(idIndex).Int() + } + permutation := sortedPermutation(ids) + for i, j := 0, 0; i < len(vs.MappingToUnordered) && j < len(permutation); i++ { + row := rows.Index(permutation[j]) + if vs.MappingToUnordered[i] == ids[permutation[j]] { values.Index(i).Set(row.Field(valueIndex)) - defined.Index(i).Set(true) + defined.Index(i).Set(trueValue) j++ } } From 38018b378dd7c9890a8583167d7300fab6110ef7 Mon Sep 17 00:00:00 2001 From: Daniel Darabos Date: Tue, 2 Mar 2021 15:11:30 +0100 Subject: [PATCH 13/13] Fix error. --- sphynx/lynxkite-sphynx/entity_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sphynx/lynxkite-sphynx/entity_cache.go b/sphynx/lynxkite-sphynx/entity_cache.go index 885687a18b..c20820341d 100644 --- a/sphynx/lynxkite-sphynx/entity_cache.go +++ b/sphynx/lynxkite-sphynx/entity_cache.go @@ -84,7 +84,7 @@ func NotInCacheError(kind string, guid GUID) error { // If we drop something from the cache it will be reloaded before the next use. // The exception is when we drop it right after loading it. This generally means // the cache is too small. - return fmt.Errorf("Could not fit %v %v into memory. Increase SPHYNX_CACHED_ENTITIES_MAX_MEM_MB?") + return fmt.Errorf("Could not fit %v %v into memory. Increase SPHYNX_CACHED_ENTITIES_MAX_MEM_MB?", kind, guid) } type entityEvictionItem struct {
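
A note on the sorty API adopted in PATCH 12, since the four-parameter
callback is easy to misread: sorty.Sort(n, lesswap) never touches the
collection itself. The single lesswap function acts as both comparison
and swap; per the library's documented contract it must behave like
the callback in this sketch:

    package main

    import (
        "fmt"

        "github.com/jfcg/sorty"
    )

    func main() {
        ar := []float64{3, 1, 2}
        sorty.Sort(len(ar), func(i, k, r, s int) bool {
            if ar[i] < ar[k] {
                if r != s { // r == s means "just compare, don't swap"
                    ar[r], ar[s] = ar[s], ar[r]
                }
                return true
            }
            return false
        })
        fmt.Println(ar) // [1 2 3]
    }

This is also what lets sortedPermutation order a permutation slice by
the keys it points at without ever moving the keys themselves.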