Futures timed out after [5 seconds] #84

Closed
java8964 opened this issue Mar 28, 2016 · 28 comments

@java8964

I am trying to test FiloDB 0.2 with a 6-node C* (DSE 4.8.5) cluster, running Spark 1.5.2.

The 100k sample data that comes with FiloDB works fine, but when I tried to load 50M rows for our use case, with a dataset I built for a POC, I got the following error message at this step:

newDF.write.format("filodb.spark").option("dataset", "poc1").option("row_keys", "Email").option("segment_key","Domain").option("partition_keys","Partition").mode(SaveMode.Overwrite).save()

java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$FiloContext.createNewDataset(package.scala:165)
at filodb.spark.package$FiloContext.createOrUpdateDataset(package.scala:216)
at filodb.spark.package$FiloContext.saveAsFilo(package.scala:268)
at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:53)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
at $iwC$$iwC$$iwC.<init>(<console>:38)
at $iwC$$iwC.<init>(<console>:40)
at $iwC.<init>(<console>:42)
at <init>(<console>:44)
at .<init>(<console>:48)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)

I am not sure whether this is related to our C* settings, but it looks like a timeout from FiloDB, and I don't see any setting in application.conf that can adjust it.

Which setting should I change for this?

Thanks

@velvia
Copy link
Member

velvia commented Mar 28, 2016

@java8964 huh, OK. I'll add a change so that the timeout can be adjusted, and maybe make the default higher. That's weird, though: that step only adjusts metadata and doesn't write the actual data, so it is normally very quick.

@java8964
Author

Is it hardcoded in the code? Is there any runtime configuration I can change? Or can you point me to the source code where I can modify the default value?

I want to test the read/query cases, so ingestion performance is not the real concern for us right now.

Thanks

@java8964
Author

I found the "5 seconds" in the source code and changed it to "60 seconds", but now I get a new error that I don't understand, still at the same step as above.

Here is the way I started my spark-shell:

~/spark/bin/spark-shell --jars /home/yzhang/FiloDB-0.2/spark/target/scala-2.10/filodb-spark-assembly-0.2.jar,/home/yzhang/spark-csv_2.10-1.4.0.jar,/home/yzhang/commons-csv-1.2.jar --driver-memory 3G --executor-memory 8g --total-executor-cores 18

But in "newDF.write.format("filodb.spark").option("dataset", "poc1").option("row_keys", "Email").option("segment_key","Domain").option("partition_keys","Partition").mode(SaveMode.Overwrite).save()" step, I got the follow errors on the console:

16/03/28 13:41:20 WARN ControlConnection: No rpc_address found for host /10.20.95.209 in 10.20.95.201:9042's peers system table. /10.20.95.209 will be ignored.
[Stage 2:> (0 + 18) / 78]16/03/28 13:41:55 WARN ThrowableSerializationWrapper: Task exception could not be deserialized
java.lang.ClassNotFoundException: filodb.coordinator.RowSource$IngestionErr
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

What I don't understand is that the class is in the jar file:

[yzhang@p2-s1cassana201 ~]$ jar tvf /home/yzhang/FiloDB-0.2/spark/target/scala-2.10/filodb-spark-assembly-0.2.jar | grep "RowSource$IngestionErr"
1774 Mon Mar 28 13:38:18 EDT 2016 filodb/coordinator/RowSource$IngestionErr$.class
2446 Mon Mar 28 13:38:18 EDT 2016 filodb/coordinator/RowSource$IngestionErr.class

@java8964
Copy link
Author

Here is the log I found in the Spark executor:

16/03/28 14:01:01 ERROR DatasetCoordinatorActor: Error in reprojection task (poc1/0)
java.lang.StringIndexOutOfBoundsException: String index out of range: 1836016516
at java.lang.String.checkBounds(String.java:385)
at java.lang.String.(String.java:462)
at com.google.flatbuffers.Table.__string(Table.java:50)
at org.velvia.filo.vector.SimpleStringVector.data(SimpleStringVector.java:18)
at org.velvia.filo.codecs.SimpleStringWrapper.apply(SimpleWrappers.scala:46)
at org.velvia.filo.codecs.SimpleStringWrapper.apply(SimpleWrappers.scala:40)
at org.velvia.filo.FastFiloRowReader.getString(RowReader.scala:162)
at org.velvia.filo.RowReader$StringFieldExtractor$.getField(RowReader.scala:118)
at org.velvia.filo.RowReader$StringFieldExtractor$.getField(RowReader.scala:117)
at filodb.core.SingleKeyTypeBase$$anonfun$getKeyFunc$1.apply(KeyType.scala:60)
at filodb.core.SingleKeyTypeBase$$anonfun$getKeyFunc$1.apply(KeyType.scala:58)
at filodb.core.store.AppendingChunkMergingStrategy$$anonfun$1.apply(ChunkMergingStrategy.scala:109)
at filodb.core.store.AppendingChunkMergingStrategy$$anonfun$1.apply(ChunkMergingStrategy.scala:108)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.toStream(Iterator.scala:1143)
at scala.collection.AbstractIterator.toStream(Iterator.scala:1157)
at scala.collection.Iterator$$anonfun$toStream$1.apply(Iterator.scala:1143)
at scala.collection.Iterator$$anonfun$toStream$1.apply(Iterator.scala:1143)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
at scala.collection.immutable.Stream.foldLeft(Stream.scala:563)
at scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:138)
at scala.collection.AbstractTraversable.$div$colon(Traversable.scala:105)
at scala.collection.immutable.TreeMap.$plus$plus(TreeMap.scala:162)
at filodb.core.store.UpdatableChunkRowMap.$plus$plus(ChunkRowMap.scala:56)
at filodb.core.store.UpdatableChunkRowMap$.apply(ChunkRowMap.scala:82)
at filodb.core.store.AppendingChunkMergingStrategy.mergeSegments(ChunkMergingStrategy.scala:111)
at filodb.core.store.CachedMergingColumnStore$$anonfun$appendSegment$1.apply(ColumnStore.scala:203)
at filodb.core.store.CachedMergingColumnStore$$anonfun$appendSegment$1.apply(ColumnStore.scala:202)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:01 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-poc1-0#1076727534] has terminated! Ingestion for (poc1,0) will stop.
16/03/28 14:01:02 ERROR Executor: Exception in task 10.0 in stage 2.0 (TID 89)
scala.MatchError: IngestionErr(Ingestion actors shut down, check error logs) (of class filodb.coordinator.RowSource$IngestionErr)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:248)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:246)
at filodb.spark.package$$anonfun$filodb$spark$package$$ingestRddRows$1.applyOrElse(package.scala:55)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:55)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 5.0 in stage 2.0 (TID 84)
scala.MatchError: IngestionErr(Ingestion actors shut down, check error logs) (of class filodb.coordinator.RowSource$IngestionErr)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:248)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:246)
at filodb.spark.package$$anonfun$filodb$spark$package$$ingestRddRows$1.applyOrElse(package.scala:55)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:55)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 15.0 in stage 2.0 (TID 94)
scala.MatchError: IngestionErr(Ingestion actors shut down, check error logs) (of class filodb.coordinator.RowSource$IngestionErr)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:248)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:246)
at filodb.spark.package$$anonfun$filodb$spark$package$$ingestRddRows$1.applyOrElse(package.scala:55)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:55)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 79)
scala.MatchError: IngestionErr(Ingestion actors shut down, check error logs) (of class filodb.coordinator.RowSource$IngestionErr)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:248)
at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:246)
at filodb.spark.package$$anonfun$filodb$spark$package$$ingestRddRows$1.applyOrElse(package.scala:55)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at filodb.spark.FiloRelation$.parse(FiloRelation.scala:36)
at filodb.spark.FiloRelation$.actorAsk(FiloRelation.scala:42)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:55)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 5.1 in stage 2.0 (TID 107)
akka.actor.InvalidActorNameException: actor name [poc1_0_5] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:54)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 0.1 in stage 2.0 (TID 108)
akka.actor.InvalidActorNameException: actor name [poc1_0_0] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:54)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 5.2 in stage 2.0 (TID 109)
akka.actor.InvalidActorNameException: actor name [poc1_0_5] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:54)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 0.2 in stage 2.0 (TID 110)
akka.actor.InvalidActorNameException: actor name [poc1_0_0] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:54)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 5.3 in stage 2.0 (TID 111)
akka.actor.InvalidActorNameException: actor name [poc1_0_5] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:54)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/03/28 14:01:02 ERROR Executor: Exception in task 0.3 in stage 2.0 (TID 112)
akka.actor.InvalidActorNameException: actor name [poc1_0_0] is not unique!
at akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:76)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:369)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:201)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:41)
at akka.actor.ActorCell.attachChild(ActorCell.scala:369)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:553)
at filodb.spark.package$.filodb$spark$package$$ingestRddRows(package.scala:54)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:301)
at filodb.spark.package$FiloContext$$anonfun$insertIntoFilo$1.apply(package.scala:297)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$18.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Let me explain what kind of data I generated to test FiloDB:

  1. I want to test whether FiloDB is a good candidate for a billion-row, slowly changing, large dimension dataset.
  2. The test data is 50M rows; the first 4 columns are HashID, Email, Domain and Partition.
  3. Email is unique, as in "1@gmail.com, 2@yahoo.com, 3@hotmail.com, 4@other1.com ... 50,000,000@other4.com", and is used as the row key.
  4. Domain is one of 7 unique domains ("gmail.com, yahoo.com, hotmail.com, other1.com, other2.com, other3.com, other4.com"), so the 50M rows are evenly distributed across the 7 domains; Domain is used as the segment key.
  5. HashID is Math.abs(email.hashCode()).
  6. The partition key is "HashID % 1024".
  7. So, in summary: Email is the row key, the data is segmented by Domain, and the partition key is "HashID % 1024" (see the sketch after this list).
  8. There are additional fields for each email, which are stored in individual columns of the row.
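For reference, a minimal sketch of that key derivation in Scala (an illustration only, based on items 5-7 above; the object and method names are hypothetical, not FiloDB code):

// Sketch of the HashID / partition-key derivation described above.
object KeyDerivation {
  def hashId(email: String): Int      = Math.abs(email.hashCode)   // item 5
  def partitionId(email: String): Int = hashId(email) % 1024       // item 6

  def main(args: Array[String]): Unit = {
    val email = "1@gmail.com"
    println(s"$email -> hashId=${hashId(email)}, partition=${partitionId(email)}")
  }
}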

Any hint about why I got the above errors?

Thanks

@velvia
Member

velvia commented Mar 28, 2016

Yeah, it's hardcoded in FiloRelation.scala, lines 35 and 40; see the "5 seconds".


@velvia
Member

velvia commented Mar 28, 2016

@java8964 many thanks for the detailed errors. Hmmm. I'd like to try to get the binary data from your Email column. Basically the output of the Cassandra CQL query:

SELECT partition, segmentid, chunkid, data from poc1_chunks WHERE columnname = 'Email';

as well as some of the emails that you are trying to ingest. This would help me debug. Thanks!

I would also try deleting the keyspace and restarting just to be sure.

Also, just a question: is your Spark data already partitioned by node? What I mean is, currently you need to ensure that data for the same partition key is only on one node; if it is not, you need to do a sort() by partition key in Spark before ingestion (see the sketch below).
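For example, a sketch based on the poc1 write command from earlier in this issue, with the shuffle added up front (column names as in that example; in spark-shell the $"..." syntax comes from the auto-imported sqlContext.implicits._):

import org.apache.spark.sql.SaveMode

// Co-locate rows with the same partition key before the FiloDB write.
newDF.sort($"Partition")
  .write.format("filodb.spark")
  .option("dataset", "poc1")
  .option("row_keys", "Email")
  .option("segment_key", "Domain")
  .option("partition_keys", "Partition")
  .mode(SaveMode.Overwrite)
  .save()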

thanks,
Evan

@java8964
Author

Hi, Evan:

Thanks for replying.

It looks like the query you want me to run returns lots of rows, so I had to add "allow filtering". I am only showing the first 3 rows below (not sure if you need more):

cqlsh:filodb> SELECT partition, segmentid, chunkid, data from poc1_chunks WHERE columnname = 'Email' allow filtering;

partition | segmentid | chunkid | data

0x00000130 | 0x676d61696c2e636f6d | 0 | 0x020100000c00000008000c0004000800080000002c010000040000000a000000280000003c0000005000000064000000780000008c000000a0000000b4000000c8000000dc000000110000003133363135313840676d61696c2e636f6d000000110000003133373038313340676d61696c2e636f6d000000110000003830353134373640676d61696c2e636f6d000000110000003830353235363540676d61696c2e636f6d000000110000003830353430333940676d61696c2e636f6d000000110000003830353435303140676d61696c2e636f6d000000110000003830353436323240676d61696c2e636f6d000000110000003830353531323840676d61696c2e636f6d000000110000003830363436363540676d61696c2e636f6d000000110000003830363738313140676d61696c2e636f6d00000008000800070000000800000000000000
0x00000130 | 0x676d61696c2e636f6d | 1 | 0x020100000c00000008000c0004000800080000008400000004000000040000001000000024000000380000004c000000110000003533373738343940676d61696c2e636f6d000000110000003533393337333340676d61696c2e636f6d000000110000003837323138303540676d61696c2e636f6d000000110000003837343134303740676d61696c2e636f6d00000008000800070000000800000000000000
0x00000130 | 0x676d61696c2e636f6d | 2 | 0x020100000c00000008000c0004000800080000000c0200000400000012000000480000005c000000700000008400000098000000ac000000c0000000d4000000e8000000fc0000001001000024010000380100004c0100006001000074010000880100009c010000110000003430343433303840676d61696c2e636f6d000000110000003430353634303840676d61696c2e636f6d0000001000000036383530303240676d61696c2e636f6d000000001000000036383736363440676d61696c2e636f6d000000001000000036383936303040676d61696c2e636f6d000000001000000036393131353140676d61696c2e636f6d000000001000000036393233363140676d61696c2e636f6d000000001000000037303133303440676d61696c2e636f6d000000001000000037303336303340676d61696c2e636f6d00000000110000003733383537353640676d61696c2e636f6d000000110000003733383839303240676d61696c2e636f6d000000110000003733393338343140676d61696c2e636f6d000000110000003733393433343740676d61696c2e636f6d000000110000003733393434363840676d61696c2e636f6d000000110000003733393533313540676d61696c2e636f6d000000110000003733393535353740676d61696c2e636f6d000000110000003733393537393940676d61696c2e636f6d000000110000003733393736313440676d61696c2e636f6d00000008000800070000000800000000000000
0x00000130 | 0x676d61696c2e636f6d | 3 |

I think I may have missed the "sort" part. I remember you mentioned it somewhere in the documentation, but can you help me understand why I have to put data with the same partition key onto one Spark node? Won't all the data eventually be stored in C*? This sounds like an expensive requirement for ingesting data. In your 100k test data, the partition key is GLOBALEVENTID rounded to 10,000, and I don't recall any sort being needed to put each partition on one Spark node. Is that right?

I will drop and reset the keyspace, and I will start with a smaller data size. I guess I was too eager to try 50M after finding that the 100k sample data works great in our environment :-)

Before I restart everything, I would like to discuss the data modeling I tried for this example data; maybe you can tell me if I have totally misunderstood how FiloDB works.

I could have billions of these emails, each with some attributes; you can think of them as rates, scores, counts, etc. Periodically (maybe daily for now), some portion of the rows (maybe small, like 10%) will be updated. That is one reason I don't want to use HDFS. More importantly, for the read path, I have to support:

  1. concurrent single-row retrieval by email address, fast
  2. concurrent retrieval by a list of email addresses (totally random; the size can range from hundreds to hundreds of thousands) with low latency
  3. maybe not concurrent, but batch retrieval of all the data, joined with other data in Spark, for data mining or data science research
  4. possibly retrieval of the data filtered by some attributes, without an email address

Here are the top 10 rows I generated for testing FiloDB:
head -10 data/emails.dat
HashID,Email,Domain,GlobalStop,RestrictedAddress,RestrictedDomain,RoleAddress,Active
1139904900,1@yahoo.com,yahoo.com,0,1,0,1,0
840853597,2@gmail.com,gmail.com,0,0,0,0,1
127424394,3@hotmail.com,hotmail.com,1,0,0,1,0
1145201472,4@other1.com,other1.com,1,1,0,0,0
1752933240,5@yahoo.com,yahoo.com,0,0,1,1,0
745082697,6@yahoo.com,yahoo.com,1,0,0,1,1
757028205,7@other2.edu,other2.edu,0,0,0,0,1
627019701,8@other3.gov,other3.gov,1,0,1,0,0
1632993648,9@hotmail.com,hotmail.com,1,1,0,0,1
1143440388,10@other3.gov,other3.gov,1,1,1,1,1

As you can see, I could have more attributes, but just for testing right now I append only 5 boolean attributes.
Before FiloDB, I was thinking of using HBase, but I don't think it can handle read paths 3) and 4). So I want to give FiloDB a try. From the documentation and all the videos I can find online, it looks like I shouldn't use the unique email address directly as the partition key (please confirm this). So instead I use Math.abs(hashCode) of the email address % 1024, as suggested as one option in the documentation. The tricky part is the segment key: as the documentation describes, each segment should hold some (not small) amount of data. Maybe later I will store time series data for these emails, and rounded time could be a candidate for the segment key, but for now I am not sure that will be the case, so I just use the domain part of the email as the segment key. This will sort each partition's data by domain, even though I really don't have any read path by domain name. Finally, I choose the email address itself as the row key, which is unique within each partition (globally, in fact). Does this sound like reasonable modeling, at least for the first step of testing the query performance of FiloDB for my case?

I will drop and recreate all the keyspaces, and start with small data first. Just want some feedback about my questions above.

Thanks

Yong

@velvia
Member

velvia commented Mar 29, 2016

Hi Yong, thanks for the detailed email. I'll respond inline later or tomorrow, but if you are coming to Strata San Jose at all, you should come by our FiloDB booth this week; it would be easy to chat in person :)

-Evan


@java8964
Author

Unfortunately, I am on the east coast. I would love to meet up with you guys in NY next time, if possible.

@velvia
Member

velvia commented Mar 29, 2016

Yong,

I’ll be in NYC for Scala Days the second week of May or so. Replies inlined:

I think I maybe missed the "sort" part. I remember you mentioned somewhere in the document, but can you help me understand why I have to put the same partition key data into one SPARK node? Won't all the data eventually stored in C*? This sounds like an expensive requirement for ingesting data. In your 100k test data, the partition key is round globaleventid of 10,000. I didn't recall there is any sort to make each partition on one spark node? Is that right?

Yes, all the data ends up in C*, but the process of converting rows into efficient columnar chunks works best when the data for one partition ends up on the same node. Strictly speaking, a sort is not what is needed; what is needed is a shuffle such that the data for each partition ends up together. Actually sorting the data within each node is not needed.

Cassandra does the same thing under the hood for regular row data: it shuffles data across the network to the right coordinator according to the partition key. Due to the columnar nature, Cassandra does not do that for this kind of data, so for now users have to do it themselves. If we integrated directly into Cassandra, you would need a custom C* installation, which would be much harder for folks to try out :-p
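To make that concrete, a small sketch (using the "Partition" column name from the poc1 example; repartition by column only exists from Spark 1.6 onward, so on Spark 1.5.x a sort is the simple stand-in):

// Spark 1.6+: shuffle so that rows with the same partition key are co-located,
// without sorting rows within each partition.
val coLocated = newDF.repartition($"Partition")

// Spark 1.5.x (as used in this thread): repartition-by-column is not available,
// so sorting by the partition key column achieves the same co-location.
val coLocated15 = newDF.sort($"Partition")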

As you can see, I could have more attributes, but just for testing right now, I append only 5 boolean attributes.

Before FiloDB, I was think using HBase, but I don't think it can handle read path 3) and 4). So I want to give FiloDB a try. From the document and all the videos I can find online, it looks like I shouldn't use unique email address directly as partition key (Please confirm this). In that case, I

If you only have one record or a few records for each email, then yeah email is not a good partition key.
just use the Math.abs(hashCode) of email address % 1024, as suggested as one way in the document. The trick part is the segment key, as the document describes, each segment should have some (not small) data. Maybe later I will store time series data of these emails, and rounding time could be a candidate as segment key. But for now, I am not sure this will be true, so I just use the domain part of the emails as the segment key. This will sort each partition's data by domain, even though I really don't have any read path by domain name. Finally, I choose the email address itself as the row key, which is unique each partition (globally in fact). Does this sound like reasonable modeling, at least for the first step of testing the query performance of filoDB for my case?

I think this is a reasonable setup, and probably perfect for read paths 3 and 4. For 1, where you want to retrieve a single record, the read would be less efficient for large domains, because you would be retrieving many records for that domain at the same time. This is where playing around with the segment key will yield a better balance; as you said, something along time-series lines might make sense. Hope that helps!


@java8964
Author

Evan, thanks for the detailed information. It helps me a lot in understanding the internal workings of FiloDB.

Here are the steps I took again to test loading a small dataset, based on your feedback:

  1. drop and recreate the keyspace
    cqlsh> desc keyspaces;
    filodb solr_admin cfs_archive "HiveMetaStore" dse_perf cfs
    dse_security system "OpsCenter" system_traces dse_system
    cqlsh> drop keyspace filodb;
    OperationTimedOut: errors={}, last_host=127.0.0.1
    cqlsh> desc keyspaces;
    dse_security system "OpsCenter" "HiveMetaStore" yzhang dse_system
    solr_admin cfs_archive system_traces dse_perf cfs
    cqlsh> exit

./filo-cli --command init
Initializing FiloDB Cassandra tables...
Succeeded.

  2. I loaded your 100k sample data without any issue, and I can query the data very fast.
  3. I tried to load my own sample data, but this time I generated only 1k rows, very small. Below are the top 10 rows:
    PartitionId,Email,Domain,GlobalStop,RestrictedAddress,RestrictedDomain,RoleAddress,Active
    334,1@other3.gov,other3.gov,0,0,1,1,1
    955,2@yahoo.com,yahoo.com,0,1,0,1,1
    656,3@other3.gov,other3.gov,1,1,0,0,1
    455,4@yahoo.com,yahoo.com,1,0,1,1,1
    26,5@gmail.com,gmail.com,0,1,1,1,0
    130,6@other1.com,other1.com,0,0,0,1,1
    276,7@other3.gov,other3.gov,1,0,1,1,0
    1009,8@hotmail.com,hotmail.com,0,1,1,0,0
    116,9@yahoo.com,yahoo.com,1,0,1,1,0
    886,10@hotmail.com,hotmail.com,0,0,1,1,0

So instead of the HashID, I generate the partition id directly using "hashCode % 1024", to keep everything as simple as possible.

  4. I started Spark 1.5.2 with 3G of driver memory and 16g of executor heap; each executor has 3 concurrent tasks, since it is a 5-worker-node Spark cluster:
    ~/spark/bin/spark-shell --jars /home/yzhang/FiloDB-0.2/spark/target/scala-2.10/filodb-spark-assembly-0.2.jar,/home/yzhang/spark-csv_2.10-1.4.0.jar,/home/yzhang/commons-csv-1.2.jar --driver-memory 3G --executor-memory 16g --total-executor-cores 15

  5. Try to load this simple 1k test data:
    import org.apache.spark.sql.SaveMode

val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/emails_1k.csv")

scala> csvDF.printSchema
root
|-- PartitionId: integer (nullable = true)
|-- Email: string (nullable = true)
|-- Domain: string (nullable = true)
|-- GlobalStop: integer (nullable = true)
|-- RestrictedAddress: integer (nullable = true)
|-- RestrictedDomain: integer (nullable = true)
|-- RoleAddress: integer (nullable = true)
|-- Active: integer (nullable = true)

csvDF.sort($"PartitionId").show(1000)

I can see all 1k rows sorted by PartitionId.

  6. Now I try to save it:
    csvDF.sort($"PartitionId").write.format("filodb.spark").
    option("dataset", "poc1").
    option("row_keys", "Email").
    option("segment_key", "Domain").
    option("partition_keys", "PartitionId").
    mode(SaveMode.Overwrite).save()
    The Spark action hangs at the sorting part, or I think actually at the ingest-into-C* part. From the Spark console, I can only see this:
    [Stage 5:> (0 + 15) / 200]

But in the spark executor log, I saw the following errors:
org.apache.spark.SparkException: Internal error: release called on 67108864 bytes but task only has 0
at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:264)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:294)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.cleanupResources(UnsafeExternalRowSorter.java:117)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.access$100(UnsafeExternalRowSorter.java:40)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:151)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:129)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:914)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:634)
at filodb.coordinator.RowSource$$anonfun$reading$1.applyOrElse(RowSource.scala:78)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at filodb.coordinator.BaseActor.aroundReceive(BaseActor.scala:6)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/29/2016 10:44:24.002] [filo-spark-akka.actor.default-dispatcher-6] [akka://filo-spark/user/poc1_0_3] Internal error: release called on 67108864 bytes but task only has 0
org.apache.spark.SparkException: Internal error: release called on 67108864 bytes but task only has 0
at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:264)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:294)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.cleanupResources(UnsafeExternalRowSorter.java:117)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.access$100(UnsafeExternalRowSorter.java:40)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:151)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:129)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:914)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:634)
at filodb.coordinator.RowSource$$anonfun$reading$1.applyOrElse(RowSource.scala:78)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at filodb.coordinator.BaseActor.aroundReceive(BaseActor.scala:6)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [03/29/2016 10:44:24.002] [filo-spark-akka.actor.default-dispatcher-6] [akka://filo-spark/user/poc1_0_13] Internal error: release called on 67108864 bytes but task only has 0
org.apache.spark.SparkException: Internal error: release called on 67108864 bytes but task only has 0
at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:264)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:294)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.cleanupResources(UnsafeExternalRowSorter.java:117)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.access$100(UnsafeExternalRowSorter.java:40)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:151)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:129)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:914)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:929)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:968)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:972)
at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:634)
at filodb.coordinator.RowSource$$anonfun$reading$1.applyOrElse(RowSource.scala:78)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at filodb.coordinator.BaseActor.aroundReceive(BaseActor.scala:6)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks

@velvia
Member

velvia commented Mar 29, 2016

Yong, can you please send me your 1k-row data file so I can try to repro it locally? Thanks.

-Evan
"Never doubt that a small group of thoughtful, committed citizens can change the world" - M. Mead

@java8964
Author

Sure. Attached here.
emails_1k.zip

@java8964
Author

Hi, Evan:

Just FYI, I can reproduce this problem on my local laptop using "spark.filodb.store=in-memory". The "in-memory" option is great for debugging.

Yong
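For readers who want to try the same setup, here is a minimal sketch of running against the in-memory store (an illustration under assumptions, not code from this thread: it assumes spark.filodb.store is picked up from the SparkConf, as the quoted key suggests, and that local mode is enough for debugging):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// run FiloDB against its in-memory column store instead of Cassandra (debugging only)
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("filodb-inmemory-debug")
  .set("spark.filodb.store", "in-memory")   // assumption: this key is read from the SparkConf
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// subsequent sqlContext.read/write with format("filodb.spark") then hits the in-memory store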

@velvia
Member

velvia commented Mar 29, 2016

Thanks, that’s really helpful. Will look at it when I can, most likely not until later this week.


@velvia
Member

velvia commented Mar 30, 2016

Hmm. So I can reproduce this on my laptop using in-memory, with the sort. That's interesting; I'll have to look into what this external row sorter does. If I remove the sort, then the ingest of all 1k rows works.


@java8964
Author

Yes, I noticed that too.

I even tested the following, partitioning without sorting, but got the same error:

scala> import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SaveMode

scala> val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/emails_1k.csv")
csvDF: org.apache.spark.sql.DataFrame = [PartitionId: int, Email: string, Domain: string, GlobalStop: int, RestrictedAddress: int, RestrictedDomain: int, RoleAddress: int, Active: int]

scala> import org.apache.spark.sql.functions.{rowNumber}
import org.apache.spark.sql.functions.rowNumber

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val w = Window.partitionBy($"PartitionId")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@3e03037f

scala> val newDF = csvDF.withColumn("rn", rowNumber.over(w)).drop("rn")
newDF: org.apache.spark.sql.DataFrame = [PartitionId: int, Email: string, Domain: string, GlobalStop: int, RestrictedAddress: int, RestrictedDomain: int, RoleAddress: int, Active: int]

scala> newDF.write.format("filodb.spark").option("dataset", "poc1").option("row_keys", "Email").option("segment_key", "Domain").option("partition_keys", "PartitionId").mode(SaveMode.Overwrite).save()

@java8964
Author

It looks like this problem only exists with the new Tungsten shuffle manager.
After I disabled it (spark.sql.tungsten.enabled=false) and used the old sort shuffle, everything works. It may be a Tungsten sort problem, so for now I will stick with the sort shuffle.
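For reference, a minimal sketch of that workaround (spark.sql.tungsten.enabled is a Spark 1.5.x SQL setting; whether you set it on the running SQLContext or at launch is up to you):

// disable the Tungsten execution path on the live SQLContext (Spark 1.5.x)
sqlContext.setConf("spark.sql.tungsten.enabled", "false")
// or pass it when launching the shell: spark-shell --conf spark.sql.tungsten.enabled=false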

I think 1k rows will ingest without problems; tomorrow I will test more data. I will update this ticket if I run into other issues on the way to my 50M-row goal.

Thanks

Yong

@velvia
Member

velvia commented Mar 31, 2016

Great, thanks. Will keep investigating this, though.


@java8964
Author

Hi, Evan:
I succeeded in loading the 50M rows. The important trick is to avoid the Tungsten sort. I am very impressed by the query speed, and I am confident that as FiloDB matures there will be lots of use cases that can benefit from it.

There are two issues I have found so far, which I understand is normal for a 0.2 release:

  1. I loaded 10M rows first, then loaded 50M rows (including the first 10M) again. The total row count after the second load is 49991285, so about 8k rows are gone.
  2. Querying this dataset by itself is super fast, but joining it with another small dataset in Spark causes problems.

I will try to understand more about FiloDB; thanks for such an amazing open-source idea.

Let me know how I can help mature it.

Yong

@velvia
Member

velvia commented Apr 1, 2016

Yong,

Thanks so much for the kind words.

  1. 8k data missing - I’m in the middle of adding a stress test to debug and make sure no data is missing.
  2. What do you mean by joining causing problems? Joining will be slower for sure; Spark's join optimization is not very good. I would make sure that you are pushing down all predicates. Is the other table also FiloDB, or regular Cassandra, or something else?

I wrote a blog post about doing fast joins recently.

I will still investigate the Tungsten sort failure. After Spark 1.6 you can't turn off Tungsten any more, so we do want to get the sort working.


@java8964
Author

java8964 commented Apr 1, 2016

Hi, Evan:
Is there a mailing list or Google group for FiloDB? I searched but didn't find any.
Yong


@velvia
Member

velvia commented Apr 1, 2016

https://groups.google.com/forum/#!forum/filodb-discuss

I haven’t advertised it yet. I just added a link to the top of the README. There is also a Gitter channel.


@velvia
Member

velvia commented Apr 3, 2016

BTW @java8964 I think I discovered why the error above (org.apache.spark.SparkException: Internal error: release called on 67108864 bytes but task only has 0 at org.apache.spark.shuffle.ShuffleMemoryManager.release(ShuffleMemoryManager.scala:117)) occurs on sort, and only for Tungsten. The ShuffleMemoryManager relies on a TaskContext, which uses a ThreadLocal. FiloDB, however, uses Akka actors for ingestion and messaging, so the thread on which ingestion happens is not the thread where the TaskContext lives, which leads to incorrect task memory info and the error above.

Alas, changing the ingestion such that the actor can run on the same thread as the task is not easy. I'll investigate a bit.
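A toy Scala sketch of the ThreadLocal behavior described above (illustrative only; ThreadLocalDemo and taskContext are made-up names, not FiloDB or Spark code):

object ThreadLocalDemo extends App {
  // stand-in for Spark's per-task, thread-local context
  val taskContext = new ThreadLocal[String]()
  taskContext.set("task-42")
  println("task thread sees: " + taskContext.get)   // prints "task-42"

  // a different thread, like an Akka dispatcher thread running an ingestion actor
  val actorLikeThread = new Thread(new Runnable {
    def run(): Unit =
      // the ThreadLocal is empty on this thread, so anything keyed off it
      // (such as task memory accounting) sees the wrong state
      println("actor thread sees: " + taskContext.get)   // prints "null"
  })
  actorLikeThread.start()
  actorLikeThread.join()
}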

@java8964
Author

java8964 commented Apr 4, 2016

Hi, Evan:

Thanks for the update. In the meantime, I have played with FiloDB for my different test cases, and I have more questions :-)

  1. Now I can load 100M rows on this 6-node cluster. I changed my modeling as follows:
    val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2.csv")
    val df = csvDF.withColumn("PartId", substring($"Email", 0, 2)).sort($"PartId").drop("PartId")
    df.write.format("filodb.spark").option("dataset", "poc2").option("row_keys", "Email").option("segment_key", "Domain").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()
    So I use the first 2 characters of the email as the partition key, which allows me to retrieve a single email out of 100M by email address in sub-second time:
    scala> time(sqlContext.sql("select * from poc2 where Email='8i3999@other1.com'").show)
    +----------------+-----------------+-----------------+----------+-----------+----------+------+
    |RestrictedDomain|RestrictedAddress| Email|GlobalStop|RoleAddress| Domain|Active|
    +----------------+-----------------+-----------------+----------+-----------+----------+------+
    | 1| 1|8i3999@other1.com| 1| 1|other1.com| 0|
    +----------------+-----------------+-----------------+----------+-----------+----------+------+
    time: 0.264 seconds
    This is very important for me.
    My first question: does the partition key still affect how the data is stored and sorted by the segment keys? For example, in the model above, if I use the first 2 characters of the email as the partition key but the domain as the segment key, then '8i3999@other1.com' and '8i255@gmail.com' belong to the same partition, but will they be stored in the same segment or not? Put another way, is the data stored/sorted by the segment keys ONLY, or really by partition keys + segment keys? Understanding this helps me design the correct modeling for our case.

  2. Originally, when I tested the following case, joining the 100M emails with random sets of 1k, 10k, and 100k emails, I forgot to add "broadcast", so the join was very slow and eventually failed in Spark. After I used the following approach, I got consistent latency:
    scala> time(poc2.join(broadcast(r_1k),poc2("Email")===r_1k("Address"),"inner").agg(sum("GlobalStop")).show)
    +---------------+
    |sum(GlobalStop)|
    +---------------+
    | 500|
    +---------------+
    time: 22.218 seconds
    scala> time(poc2.join(broadcast(r_10k),poc2("Email")===r_10k("Address"),"inner").agg(sum("GlobalStop")).show)
    +---------------+
    |sum(GlobalStop)|
    +---------------+
    | 4948|
    +---------------+
    time: 22.21 seconds
    scala>time(poc2.join(broadcast(r_100k),poc2("Email")===r_100k("Address"),"inner").agg(sum("GlobalStop")).show)
    +---------------+
    |sum(GlobalStop)|
    +---------------+
    | 50063|
    +---------------+
    time: 22.321 seconds

You can see that in this case I consistently get about 22 seconds, whether I randomly join by 1k, 10k, or 100k emails (I have to use "broadcast", which I think makes sense in this case).
But I want to know if there is anything I can do to push the limits of FiloDB; I want to see the smallest latency I can get for the above 3 queries. What suggestions can you think of?

  3. Then I changed my modeling as follows:
    val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2.csv")
    val df = csvDF.withColumn("PartId", substring($"Email", 0, 2)).sort($"PartId").drop("PartId")
    df.write.format("filodb.spark").option("dataset", "poc3").option("row_keys", "Email").option("segment_key", ":stringPrefix Email 3").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()

So instead of using the domain as the segment key, I use the first 3 characters of the email as the segment key.
To my surprise, I lost data while ingesting the same 100M rows into it.

scala> val poc3 = sqlContext.read.format("filodb.spark").option("dataset", "poc3").load()
poc3: org.apache.spark.sql.DataFrame = [RestrictedDomain: int, RestrictedAddress: int, Email: string, GlobalStop: int, RoleAddress: int, Domain: string, Active: int]
scala> poc3.count
res2: Long = 99981408
So this way, I lost some rows while ingesting the 100M rows. Any hints?

  4. Losing data while ingesting is a big concern for me right now. From the documentation, I understand there can be only one writer per partition during ingestion, correct? I need to write some test code to check how reliably FiloDB ingests data. I will update you later with what I find out.

  5. I am still trying to understand all the tuning options I have for FiloDB, including on the C* side. Currently everything is at the defaults from FiloDB 0.2. I would really like to push its limits, if I can.

@velvia
Member

velvia commented Apr 5, 2016

Hi Yong,

I'm very close to a fix for the Tungsten sort issue, btw, but here is a quick answer inline before I get my kids up.

My first question is that is the partition key still affect how the data stored and sorted by the segment keys? For example, in the above example, if I use the first 2 characters of email as the partition key, but use the domain as segment keys, in the data of '8i3999@other1.com' and '8i255@gmail.com', they will belong to same partition, but will they be stored in the segment or not? Or ask this way, is the data stored/sorted by segment keys ONLY, or really partition keys + segment keys? Understanding this help me to design the correct modeling for our case.

This is the Cassandra table layout (you can inspect it yourself using cqlsh):

CREATE TABLE filodb.gdelt_chunks (
partition blob,
version int,
columnname text,
segmentid blob,
chunkid int,
data blob,
PRIMARY KEY ((partition, version), columnname, segmentid, chunkid)
) WITH COMPACT STORAGE

Thus, as you can see, both the partition key and the segment ID (key) are part of the primary key. I think your design is correct: prefix 2 chars of the email, then segment by domain. If you add the domain to your filter/predicates (right now, unfortunately, you have to write domain >= DOMAIN1 && domain <= DOMAIN1 for the pushdown to work), then you will be able to look up not only the exact partition but also the exact segment within that partition.
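As a rough sketch of that query shape against the poc2 dataset from earlier in the thread (whether the predicates are actually pushed down should be confirmed in the filodb.spark INFO logs, as discussed further down):

val poc2 = sqlContext.read.format("filodb.spark").option("dataset", "poc2").load()
// Email equality pins the partition (partition key = :stringPrefix Email 2);
// the closed Domain range pins the segment within that partition
val hit = poc2.filter(poc2("Email") === "8i3999@other1.com" &&
                      poc2("Domain") >= "other1.com" && poc2("Domain") <= "other1.com")
hit.explain(true)
hit.show()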

You can see, in this case, I can get consistently about 22 seconds, even I randomly join by 1k, 10k, and 100k emails (I have to use "broadcast", I think it makes sense this case).

But I want to know if there is anything I can do to push the limits of FiloDB, I want to see what is the smallest latency I can get for the above 3 queries. What suggestions you can think of?

Have a look at this blog post:
http://www.planetcassandra.org/blog/achieving-sub-second-sql-joins-and-building-a-data-warehouse-using-spark-cassandra-and-filodb/

TL;DR: the key is to aim for single-partition queries (such as your email search above) for all tables, or at least to achieve pushdowns. Broadcast doesn't help too much if the table is big, because of the cost of broadcasting itself; it avoids shuffles, but there is also a high cost to the broadcast in the first place (unless you can keep broadcast tables in memory - we can discuss this more).

  1. Then I changed my modeling, as following:
    val csvDF = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("/home/yzhang/data/poc2.csv")
    val df = csvDF.withColumn("PartId", substring($"Email", 0, 2)).sort($"PartId").drop("PartId")
    df.write.format("filodb.spark").option("dataset", "poc3").option("row_keys", "Email").option("segment_key", ":stringPrefix Email 3").option("partition_keys", ":stringPrefix Email 2").mode(SaveMode.Overwrite).save()

So instead of using domain as segment keys, I use the first 3 characters as the segment keys.
To my surprise, I lost data while ingesting the same 100M data into it.

Hmm. Run ./filo-cli --command analyze --dataset poc3 and compare it to your other data model. You might see that the average segment size is not that big. Feel free to post the output here; that will tell you whether it's a good idea or not (for my NYC Taxi dataset, this was NOT a good idea - although a great stress test).

scala> val poc3 = sqlContext.read.format("filodb.spark").option("dataset", "poc3").load()
poc3: org.apache.spark.sql.DataFrame = [RestrictedDomain: int, RestrictedAddress: int, Email: string, GlobalStop: int, RoleAddress: int, Domain: string, Active: int]
scala> poc3.count
res2: Long = 99981408
So in this way, I lost some rows while ingesting 100M data. Any hints?

  1. Losting update while ingesting data is a big concern for me as right now. From the document, I understand there COULD be only one writer per partition in ingestion, correct? I need to write some testing code to check how reliable FiloDB of ingesting data. Will update with you later what I find out.

Thanks, this is very helpful for stabilizing and productionizing FiloDB. You are running different tests than my client is.

  1. I am still trying to understand all the tuning options I have for FiloDB, that including on the C* side. Currently everything is default as in FiloDB 0.2. I really like to push the limits of it, if I can.



@java8964
Author

java8964 commented Apr 5, 2016

I created a separate issue about ingesting data (#86); we can discuss the ingestion problems I faced there. Here, I would like to discuss query optimization with you.

Right now, I understand better how FiloDB stores the data. The question now is how pushdowns can be sent to FiloDB reliably from Spark.
For example, I still use the first 2 characters of the email as the partition key and the domain as the segment key. So an Email equality filter plus Domain <= and >= bounds gives the best query performance I can get. That is my goal, but this pushdown doesn't work in Spark for the join case. I show examples below, running them against the in-memory store on my local laptop. I have 2 DataFrames: poc2 with 1M emails + attributes, and r_1k with 1k emails + domains ONLY:

poc2.filter(poc2("Email") === "vgd999@yahoo.com").explain(true)
== Physical Plan ==
Filter (Email#10 = vgd999@yahoo.com)
Scan FiloRelation(poc2,0,4)[RestrictedDomain#8,RestrictedAddress#9,Email#10,GlobalStop#11,RoleAddress#12,Domain#13,Active#14]

This is good, as the filter is pushed down to FiloDB, so on the cluster I also get sub-second latency.
Same with this:

scala> poc2.filter(poc2("Email") === "vgd999@yahoo.com" && poc2("Domain") >= "yahoo.com" && poc2("Domain") <= "yahoo.com").explain(true)
== Physical Plan ==
Filter (((Email#10 = vgd999@yahoo.com) && (Domain#13 >= yahoo.com)) && (Domain#13 <= yahoo.com))
Scan FiloRelation(poc2,0,4)[RestrictedDomain#8,RestrictedAddress#9,Email#10,GlobalStop#11,RoleAddress#12,Domain#13,Active#14]

So the above query plan is my goal. But suppose I have the 1k emails in another Spark DataFrame (not in FiloDB), say in HDFS or other storage:

scala> r_1k.count
res3: Long = 1001

scala> r_1k.printSchema
root
|-- Email: string (nullable = true)
|-- Domain: string (nullable = true)

First, without broadcast:
scala> poc2.join(r_1k,poc2("Email") === r_1k("Email") && poc2("Domain") >= r_1k("Domain") && poc2("Domain") <= r_1k("Domain"),"inner").explain(true)
== Physical Plan ==
Filter ((Domain#13 >= Domain#21) && (Domain#13 <= Domain#21))
SortMergeJoin [Email#10], [Email#20]
ExternalSort [Email#10 ASC], false
Exchange hashpartitioning(Email#10)
Scan FiloRelation(poc2,0,4)[RestrictedDomain#8,RestrictedAddress#9,Email#10,GlobalStop#11,RoleAddress#12,Domain#13,Active#14]
ExternalSort [Email#20 ASC], false
Exchange hashpartitioning(Email#20)
Scan CsvRelation(,Some(/home/yzhang/data/list_1k.csv),true,,,",null,#,PERMISSIVE,COMMONS,false,false,false,null,true,null,,null)[Email#20,Domain#21]

You can see Spark uses "SortMergeJoin", which is a VERY expensive way to do a 100M + 1k join. That is what almost killed me when I tested this on the cluster. More importantly, you can see the Email (partition key) is NOT part of the filter any more. In this case, all the C* nodes will be queried for these 1k emails, even though possibly only a small subset of the nodes actually contains their data.

With broadcast, the plan is the following:
scala> poc2.join(broadcast(r_1k),poc2("Email") === r_1k("Email") && poc2("Domain") >= r_1k("Domain") && poc2("Domain") <= r_1k("Domain"),"inner").explain(true)
== Physical Plan ==
Filter ((Domain#13 >= Domain#21) && (Domain#13 <= Domain#21))
BroadcastHashJoin [Email#10], [Email#20], BuildRight
Scan FiloRelation(poc2,0,4)[RestrictedDomain#8,RestrictedAddress#9,Email#10,GlobalStop#11,RoleAddress#12,Domain#13,Active#14]
Scan CsvRelation(,Some(/home/yzhang/data/list_1k.csv),true,,,",null,#,PERMISSIVE,COMMONS,false,false,false,null,true,null,,null)[Email#20,Domain#21]

So Spark goes with BroadcastHashJoin. Even though it comes with the cost of broadcasting, it is much better than the SortMergeJoin in this case. That is why I originally found I have to use "broadcast".

It is very difficult to keep filtering by partition keys in the join, even though that is the best access path for FiloDB. To get the best performance, I have to come up with SQL like the following:
sqlContext.sql("select * from poc2 where Email in ('vgd999@yahoo.com','zui1000@gmail.com')").explain(true)

but in Spark, I CANNOT do the subquery with IN:
sqlContext.sql("select * from poc2 where Email in (select Email from r_1k)").explain(true)

Yong

velvia added a commit that referenced this issue Apr 6, 2016
Keep the iterator reader in the original Spark worker thread, use a Queue
@velvia
Member

velvia commented Apr 7, 2016

Yong,

I actually cannot tell from the physical plan alone whether there is a pushdown, can you? Instead I rely on the logs. If you set the log level to INFO, at least for the filodb.spark logger in your log4j.properties, you will see the pushdown like this:

16/04/06 14:02:24 INFO FiloRelation$: Incoming filters = List()
16/04/06 14:02:24 INFO FiloRelation$: Matching partition key col name / pos / keyType: Map()
16/04/06 14:02:24 INFO FiloRelation$: Filters by position: List()
16/04/06 14:02:24 INFO FiloRelation$: Using default filtering function

(The above is for a query with no predicates. If there are working predicates, the Incoming filters will be filled out, and Filters by position will show them being pushed down.)

So what are you trying to accomplish, and can you show me the raw SQL instead of the Spark DF DSL? I.e., you want to look up all the emails that have some attribute in the r_1k table? I guess that would be expensive, as there is no predicate pushdown that would really work. You would be better off doing a two-step query, like:

val emails = r1kDF.select(...)
val poc = sql.sql(s"SELECT email, blah, blah FROM poc2 WHERE email IN (${emails.toList.mkString(", ")})")

The kind of sub-second JOIN would be of this form:

     SELECT t1.col1, t2.col2
     FROM small_table t1, big_table t2
     WHERE t2.a = t1.a AND (t1.a = 'XYZ' AND t2.a = 'XYZ')

where a is a partition key.
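As a hedged sketch of that join shape using the tables from this thread (it assumes r_1k and poc2 are registered as temp tables, and uses 'vgd999@yahoo.com' in place of 'XYZ'; in the poc2 model the partition is derived from the Email prefix, so an Email equality literal still pins the partition):

r_1k.registerTempTable("r_1k")
poc2.registerTempTable("poc2")
// the literal partition-key predicate on BOTH sides is what lets the big-table
// scan be narrowed to a single partition
val joined = sqlContext.sql(
  """SELECT t2.Email, t2.GlobalStop
    |FROM r_1k t1, poc2 t2
    |WHERE t2.Email = t1.Email AND t1.Email = 'vgd999@yahoo.com' AND t2.Email = 'vgd999@yahoo.com'""".stripMargin)
joined.show()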


velvia pushed a commit that referenced this issue Feb 14, 2018
* feat(coordinator): singleton ShardMapper recovery infrastructure; remove shardsPerCoord state from AssignmentStrategy
* feat(coordinator): make ShardAssignmentStrategy truly stateless/idempotent; add specific unit test
* fix(coordinator): fix bug when adding coordinator which already has shards assigned
* Silence console logging for multi-JVM tests.  Can be easily re-enabled.
* feat(coordinator): Incorporate Guardian-based shard/subscription state recovery for ClusterSingleton

Changes from Helena's previous implementation:
- Instead of the NodeClusterActor telling its local Guardian to subscribe to everything, now each Guardian,
  on every node, subscribes to everything, thus guaranteeing every node will have the backup
- The ShardCoordinatorActor sends an initial ShardMapper snapshot to every subscriber
- API/mechanism to recover the current snapshots and subscriptions from the Guardian
- unit test to test recovery mechanism

* Update doc on sharding and state recovery
* Do not start ingestion stream if it is already running (IngestionActor)
* CR from Helena

Turn on env var LOG_AKKA_TO_CONSOLE if you want detailed test output for multi-jvm and other coordinator tests.