Skip to content

Commit

Permalink
Merge pull request #141 from lynxkite/darabos-no-ordered-mapping
Browse files Browse the repository at this point in the history
Ditch ordered mapping
  • Loading branch information
darabos committed Mar 24, 2021
2 parents 46ae521 + 38018b3 commit 6083147
Show file tree
Hide file tree
Showing 16 changed files with 172 additions and 137 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Expand Up @@ -94,8 +94,8 @@ libraryDependencies ++= Seq(
// Used for working with AVRO files.
"org.apache.spark" %% "spark-avro" % sparkVersion.value,
// For Neo4j tests.
"org.testcontainers" % "testcontainers" % "1.14.3" % Test,
"org.testcontainers" % "neo4j" % "1.14.3" % Test,
"org.testcontainers" % "testcontainers" % "1.15.2" % Test,
"org.testcontainers" % "neo4j" % "1.15.2" % Test,
// Used for working with Delta tables.
"io.delta" %% "delta-core" % "0.6.1"
)
Expand Down
14 changes: 6 additions & 8 deletions dependency-licenses/scala.md
Expand Up @@ -41,7 +41,6 @@ Apache | [Apache License](LICENSE.txt) | org.apache.httpcomponents # httpmime #
Apache | [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0.html) | com.ning # async-http-client # 1.8.14 | <notextile></notextile>
Apache | [Apache License Version 2.0](LICENSE.txt) | org.yaml # snakeyaml # 1.15 | <notextile></notextile>
Apache | [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | net.java.dev.jna # jna # 5.5.0 | <notextile></notextile>
Apache | [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | net.java.dev.jna # jna-platform # 5.5.0 | <notextile></notextile>
Apache | [Apache License, Version 2.0](https://aws.amazon.com/apache2.0) | com.amazonaws # aws-java-sdk # 1.7.4 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.clearspring.analytics # stream # 2.7.0 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.javaparser # javaparser-core # 3.2.5 | <notextile></notextile>
Expand All @@ -50,8 +49,6 @@ Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.cloud.bigdataoss # util # 1.6.1 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.cloud.bigdataoss # util-hadoop # 1.6.1-hadoop2 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.jamesmurty.utils # java-xmlbuilder # 1.1 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.kohlschutter.junixsocket # junixsocket-common # 2.0.4 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.kohlschutter.junixsocket # junixsocket-native-common # 2.0.4 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.typesafe # config # 1.2.1 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.typesafe.akka # akka-actor_2.11 # 2.3.4 | <notextile></notextile>
Apache | [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0) | com.typesafe.akka # akka-slf4j_2.11 # 2.3.4 | <notextile></notextile>
Expand Down Expand Up @@ -103,10 +100,14 @@ Apache | [Similar to Apache License but with the acknowledgment clause removed](
Apache | [The Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | io.opencensus # opencensus-api # 0.21.0 | <notextile></notextile>
Apache | [The Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | io.opencensus # opencensus-contrib-grpc-metrics # 0.21.0 | <notextile></notextile>
Apache | [The Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.spark-project.spark # unused # 1.0.0 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-annotations # 2.10.3 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-annotations # 2.6.5 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-core # 2.6.5 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.core # jackson-databind # 2.6.5 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.fasterxml.jackson.dataformat # jackson-dataformat-yaml # 2.6.5 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.docker-java # docker-java-api # 3.2.7 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.docker-java # docker-java-transport # 3.2.7 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.github.docker-java # docker-java-transport-zerodep # 3.2.7 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.api-client # google-api-client # 1.20.0 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.api-client # google-api-client-jackson2 # 1.20.0 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | com.google.api-client # google-api-client-java6 # 1.20.0 | <notextile></notextile>
Expand All @@ -132,7 +133,6 @@ Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licens
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.codehaus.jackson # jackson-mapper-asl # 1.9.13 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.ejml # ejml-core # 0.34 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.ejml # ejml-ddense # 0.34 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](https://www.apache.org/licenses/LICENSE-2.0.txt) | org.jetbrains # annotations # 19.0.0 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.seleniumhq.selenium # selenium-android-driver # 2.39.0 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.seleniumhq.selenium # selenium-api # 2.39.0 | <notextile></notextile>
Apache | [The Apache Software License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt) | org.seleniumhq.selenium # selenium-chrome-driver # 2.39.0 | <notextile></notextile>
Expand Down Expand Up @@ -204,7 +204,6 @@ BSD | [The BSD License](http://www.antlr.org/license.html) | org.antlr # antlr4-
BSD | [Three-clause BSD-style](https://github.com/mpilquist/simulacrum/blob/master/LICENSE) | com.github.mpilquist # simulacrum_2.11 # 0.10.0 | <notextile></notextile>
BSD | [Three-clause BSD-style](https://github.com/scodec/scodec-bits/blob/master/LICENSE) | org.scodec # scodec-bits_2.11 # 1.0.9 | <notextile></notextile>
BSD | [Two-clause BSD-style license](http://github.com/sbt/junit-interface/blob/master/LICENSE.txt) | com.novocode # junit-interface # 0.11-RC1 | <notextile></notextile>
CC0 | [CC0 1.0 Universal License](http://creativecommons.org/publicdomain/zero/1.0/) | org.scijava # native-lib-loader # 2.0.2 | <notextile></notextile>
CDDL | [COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0](https://glassfish.dev.java.net/public/CDDLv1.0.html) | javax.activation # activation # 1.1.1 | <notextile></notextile>
GPL | [ Dual license consisting of the CDDL v1.1 and GPL v2 ](https://glassfish.java.net/public/CDDL+GPL_1_1.html) | org.glassfish # javax.json # 1.0.4 | <notextile></notextile>
GPL | [GPL2 w/ CPE](https://www.gnu.org/software/classpath/license.html) | jakarta.ws.rs # jakarta.ws.rs-api # 2.1.6 | <notextile></notextile>
Expand All @@ -226,16 +225,15 @@ MIT | [MIT](http://opensource.org/licenses/MIT) | com.github.julien-truffaut # m
MIT | [MIT](http://www.opensource.org/licenses/mit-license.html) | com.lihaoyi # fansi_2.11 # 0.2.4 | <notextile></notextile>
MIT | [MIT](http://www.opensource.org/licenses/mit-license.html) | com.lihaoyi # pprint_2.11 # 0.5.2 | <notextile></notextile>
MIT | [MIT](http://www.opensource.org/licenses/mit-license.html) | com.lihaoyi # sourcecode_2.11 # 0.1.4 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.rnorth # tcp-unix-socket-proxy # 1.0.2 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.rnorth.duct-tape # duct-tape # 1.0.8 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.rnorth.visible-assertions # visible-assertions # 2.1.2 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.scalaz.stream # scalaz-stream_2.11 # 0.8 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # jawn-parser_2.11 # 0.10.4 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # kind-projector_2.11 # 0.7.1 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # spire-macros_2.11 # 0.13.0 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.spire-math # spire_2.11 # 0.13.0 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # neo4j # 1.14.3 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # testcontainers # 1.14.3 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # neo4j # 1.15.2 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.testcontainers # testcontainers # 1.15.2 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.typelevel # cats-core_2.11 # 0.9.0 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.typelevel # cats-kernel_2.11 # 0.9.0 | <notextile></notextile>
MIT | [MIT](http://opensource.org/licenses/MIT) | org.typelevel # cats-macros_2.11 # 0.9.0 | <notextile></notextile>
Expand Down
1 change: 1 addition & 0 deletions sphynx/go.mod
Expand Up @@ -5,6 +5,7 @@ go 1.14
require (
github.com/apache/arrow/go/arrow v0.0.0-20200701075601-f25a014ab157
github.com/golang/protobuf v1.4.2
github.com/jfcg/sorty v1.0.12
github.com/juju/errors v0.0.0-20200330140219-3fe23663418f
github.com/juju/testing v0.0.0-20200608005635-e4eedbc6f7aa // indirect
github.com/xitongsys/parquet-go v1.5.2
Expand Down
2 changes: 2 additions & 0 deletions sphynx/go.sum
Expand Up @@ -92,6 +92,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jfcg/sorty v1.0.12 h1:1Np/SBt2ODK981ZyadqOViwPrZ4ncRPp1y7C+JqWQms=
github.com/jfcg/sorty v1.0.12/go.mod h1:+v4Q9+K64VQk8A8FTAw6hHg1WAnHwm07TROXBVtsjWY=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
Expand Down
14 changes: 12 additions & 2 deletions sphynx/lynxkite-sphynx/entity_cache.go
Expand Up @@ -4,7 +4,9 @@ package main

import (
"fmt"
"log"
"os"
"runtime"
"sort"
"strconv"
"sync"
Expand Down Expand Up @@ -33,6 +35,7 @@ type cacheEntry struct {
}

var cachedEntitiesMaxMem = getNumericEnv("SPHYNX_CACHED_ENTITIES_MAX_MEM_MB", 1*1024) * 1024 * 1024
var sphynxThreads = getNumericEnv("SPHYNX_THREADS", runtime.NumCPU())

func (entityCache *EntityCache) Get(guid GUID) (Entity, bool) {
ts := ourTimestamp()
Expand Down Expand Up @@ -77,6 +80,13 @@ func (entityCache *EntityCache) Set(guid GUID, entity Entity) {
// But we do not want to update the timestamp for those.
}

func NotInCacheError(kind string, guid GUID) error {
// If we drop something from the cache it will be reloaded before the next use.
// The exception is when we drop it right after loading it. This generally means
// the cache is too small.
return fmt.Errorf("Could not fit %v %v into memory. Increase SPHYNX_CACHED_ENTITIES_MAX_MEM_MB?", kind, guid)
}

type entityEvictionItem struct {
guid GUID
timestamp int64
Expand Down Expand Up @@ -108,12 +118,12 @@ func (entityCache *EntityCache) maybeGarbageCollect() {

for i := 0; i < len(evictionCandidates) && memEvicted < howMuchMemoryToRecycle; i++ {
guid := evictionCandidates[i].guid
fmt.Printf("Evicting: %v\n", evictionCandidates[i])
log.Printf("Evicting: %v\n", evictionCandidates[i])
delete(entityCache.cache, guid)
memEvicted += evictionCandidates[i].memUsage
itemsEvicted++
}
fmt.Printf("Evicted %d entities (out of %d), estimated size: %d time: %d\n",
log.Printf("Evicted %d entities (out of %d), estimated size: %d time: %d\n",
itemsEvicted, len(evictionCandidates), memEvicted, (ourTimestamp()-start)/1000000)
entityCache.totalMemUsage -= memEvicted
}
Expand Down
4 changes: 4 additions & 0 deletions sphynx/lynxkite-sphynx/entity_io.go
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/jfcg/sorty"
"io/ioutil"
"os"
"reflect"
Expand Down Expand Up @@ -55,6 +56,7 @@ var vertexSetSchema = arrow.NewSchema(
}, nil)

func (v *VertexSet) toOrderedRows() array.Record {
assertSorted(v.MappingToUnordered)
b := array.NewInt64Builder(arrowAllocator)
defer b.Release()
b.AppendValues(v.MappingToUnordered, nil)
Expand All @@ -69,6 +71,8 @@ func (v *VertexSet) readFromOrdered(rec array.Record) error {
for i, d := range data {
v.MappingToUnordered[i] = d
}
// For backward compatibility.
sorty.SortI8(v.MappingToUnordered)
return nil
}

Expand Down
13 changes: 11 additions & 2 deletions sphynx/lynxkite-sphynx/main.go
Expand Up @@ -14,6 +14,8 @@ import (
"google.golang.org/grpc/credentials"
"log"
"net"
"net/http"
_ "net/http/pprof"
"os"
"strings"
)
Expand Down Expand Up @@ -62,6 +64,7 @@ func (s *Server) CanCompute(ctx context.Context, in *pb.CanComputeRequest) (*pb.

func (s *Server) Compute(ctx context.Context, in *pb.ComputeRequest) (*pb.ComputeReply, error) {
opInst := OperationInstanceFromJSON(in.Operation)
log.Printf("Computing %v.", shortOpName(opInst))
switch in.Domain {
case "SphynxMemory":
op, exists := operationRepository[shortOpName(opInst)]
Expand Down Expand Up @@ -117,7 +120,7 @@ func (s *Server) GetScalar(ctx context.Context, in *pb.GetScalarRequest) (*pb.Ge
log.Printf("Received GetScalar request with GUID %v.", guid)
entity, exists := s.entityCache.Get(guid)
if !exists {
return nil, fmt.Errorf("Guid %v is missing", guid)
return nil, NotInCacheError("scalar", guid)
}

switch scalar := entity.(type) {
Expand All @@ -138,7 +141,7 @@ func (s *Server) HasInSphynxMemory(ctx context.Context, in *pb.HasInSphynxMemory
func (s *Server) getVertexSet(guid GUID) (*VertexSet, error) {
entity, exists := s.entityCache.Get(guid)
if !exists {
return nil, fmt.Errorf("Guid %v is missing", guid)
return nil, NotInCacheError("vertex set", guid)
}
switch vs := entity.(type) {
case *VertexSet:
Expand Down Expand Up @@ -186,6 +189,12 @@ func main() {
if port == "" {
log.Fatalf("Please set SPHYNX_PORT.")
}
debugPort := os.Getenv("SPHYNX_DEBUG_PORT")
if debugPort != "" {
go func() error {
return http.ListenAndServe(fmt.Sprintf(":%s", debugPort), nil)
}()
}
keydir := flag.String(
"keydir", "", "directory of cert.pem and private-key.pem files (for encryption)")
flag.Parse()
Expand Down
6 changes: 3 additions & 3 deletions sphynx/lynxkite-sphynx/networkit_community_detection.go
Expand Up @@ -38,12 +38,12 @@ func init() {
defer networkit.DeletePartition(p)
vs := &VertexSet{}
vs.MappingToUnordered = make([]int64, p.NumberOfSubsets())
vs.MappingToOrdered = make(map[int64]SphynxId)
mappingToOrdered := make(map[int64]SphynxId)
ss := p.GetSubsetIdsVector()
defer networkit.DeleteUint64Vector(ss)
for i := range vs.MappingToUnordered {
vs.MappingToUnordered[i] = int64(ss.Get(i))
vs.MappingToOrdered[int64(ss.Get(i))] = SphynxId(i)
mappingToOrdered[int64(ss.Get(i))] = SphynxId(i)
}
es := &EdgeBundle{}
es.EdgeMapping = make([]int64, p.NumberOfElements())
Expand All @@ -54,7 +54,7 @@ func init() {
for i := range es.EdgeMapping {
es.EdgeMapping[i] = int64(i)
es.Src[i] = SphynxId(i)
es.Dst[i] = SphynxId(vs.MappingToOrdered[int64(v.Get(i))])
es.Dst[i] = SphynxId(mappingToOrdered[int64(v.Get(i))])
}
ea.output("partitions", vs)
ea.output("belongsTo", es)
Expand Down
26 changes: 1 addition & 25 deletions sphynx/lynxkite-sphynx/networkit_test.go
Expand Up @@ -56,30 +56,6 @@ func TestNewVertexAttribute(t *testing.T) {
}
}

func TestGraphToSphynx(t *testing.T) {
c := networkit.NewBarabasiAlbertGenerator(uint64(2), uint64(5))
defer networkit.DeleteBarabasiAlbertGenerator(c)
g := c.Generate()
defer networkit.DeleteGraph(g)
vs, es := ToSphynx(g)
if len(vs.MappingToUnordered) != 5 {
t.Errorf("Vertex set is %v, expected 5.", vs.MappingToUnordered)
}
expectedSrc := []SphynxId{1, 1, 2, 2, 3, 3, 4, 4}
if len(es.Src) != len(expectedSrc) {
t.Errorf("Source list is %v, expected %v.", es.Src, expectedSrc)
}
if len(es.Dst) != len(expectedSrc) {
t.Errorf("Destination list is %v, expected length %v.", es.Dst, len(expectedSrc))
}
for i := range es.Src {
if es.Src[i] != expectedSrc[i] {
t.Errorf("Source list is %v, expected %v.", es.Src, expectedSrc)
break
}
}
}

func TestVectorVector(t *testing.T) {
c := networkit.NewBarabasiAlbertGenerator(uint64(3), uint64(10))
defer networkit.DeleteBarabasiAlbertGenerator(c)
Expand All @@ -93,7 +69,7 @@ func TestVectorVector(t *testing.T) {
for i := 0; i < int(points.Size()); i += 1 {
x := points.Get(i).At(0)
y := points.Get(i).At(1)
if x < -2 || x > 2 || x == 0 || y < -2 || y > 2 || y == 0 {
if x < -3 || x > 3 || x == 0 || y < -3 || y > 3 || y == 0 {
t.Errorf("Unexpected coordinates: %v %v", x, y)
}
}
Expand Down
2 changes: 1 addition & 1 deletion sphynx/lynxkite-sphynx/operations.go
Expand Up @@ -19,7 +19,7 @@ func collectInputs(server *Server, opInst *OperationInstance) (map[string]Entity
for name, guid := range opInst.Inputs {
entity, exists := server.entityCache.Get(guid)
if !exists {
return nil, fmt.Errorf("Guid %v is missing", guid)
return nil, NotInCacheError("input", guid)
}
inputs[name] = entity
}
Expand Down
6 changes: 3 additions & 3 deletions sphynx/lynxkite-sphynx/sphynxdisk.go
Expand Up @@ -38,8 +38,8 @@ func createEntity(typeName string) (Entity, error) {
}

func saveToOrderedDisk(e Entity, dataDir string, guid GUID) error {
log.Printf("saveToOrderedDisk guid %v", guid)
typeName := e.typeName()
log.Printf("Writing %v %v to ordered disk.", typeName, guid)
dirName := fmt.Sprintf("%v/%v", dataDir, guid)
_ = os.Mkdir(dirName, 0775)
typeFName := fmt.Sprintf("%v/type_name", dirName)
Expand Down Expand Up @@ -96,14 +96,14 @@ func saveToOrderedDisk(e Entity, dataDir string, guid GUID) error {
}

func loadFromOrderedDisk(dataDir string, guid GUID) (Entity, error) {
log.Printf("loadFromOrderedDisk: %v", guid)
dirName := fmt.Sprintf("%v/%v", dataDir, guid)
typeFName := fmt.Sprintf("%v/type_name", dirName)
typeData, err := ioutil.ReadFile(typeFName)
if err != nil {
return nil, fmt.Errorf("Failed to read type of %v: %v", dirName, err)
}
typeName := string(typeData)
log.Printf("Reading %v %v from ordered disk.", typeName, guid)
e, err := createEntity(typeName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -160,7 +160,7 @@ func (s *Server) WriteToOrderedDisk(

e, exists := s.entityCache.Get(guid)
if !exists {
return nil, fmt.Errorf("Guid %v is missing", guid)
return nil, NotInCacheError("entity", guid)
}

if err := saveToOrderedDisk(e, s.dataDir, guid); err != nil {
Expand Down
Expand Up @@ -25,6 +25,7 @@ func doStripDuplicateEdgesFromBundle(es *EdgeBundle) *EdgeBundle {
uniqueBundle.EdgeMapping[i] = id
i++
}
uniqueBundle.Sort()
return uniqueBundle
}

Expand Down

0 comments on commit 6083147

Please sign in to comment.