
Encounter error "Unable to activate object" when there are multiple threads / concurrent tasks in Spark #5784

Closed
sparkle-apt opened this issue Dec 15, 2023 Discussed in #5783 · 22 comments
Labels
type/question Type: question about the product

Comments

@sparkle-apt commented Dec 15, 2023

Settings

Hardware and software overview

NebulaGraph Database deployment

  • Deployment: installed in the single-node mode on an EC2 machine
  • Nebula version: 3.6.0
  • Nebula Spark Connector version: nebula-spark-connector_3.0-3.0-SNAPSHOT
  • Spark version: 3.3.3
  • Installation method: via RPM
  • Storage: 600G
  • vCPU: 8
  • Memory: 64G
  • OS: Linux

Computation cluster

  • Computation runs on an EMR cluster
  • The EC2 machine and the EMR cluster can connect to each other
  • Spark version: 3.1.2

Graph data

  • Number of nodes: 289418552
  • Number of edges: 303938330
  • Partition number: 20

Others

  • The public IP of the EC2 machine has already been added to hosts

Issues

When trying to scan the full graph data, e.g., running count() as shown in the snippet below on the EMR machine, we encountered the Unable to activate object error.
Snippet:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress(s"${ec2_public_ip}:9559")
  .withConnectionRetry(2)
  .build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("acct2asset_20231130")
  .withLabel("USES")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(10)
  .build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()

dataset.count()

Error log:

23/12/15 07:59:57 WARN TaskSetManager: Lost task 1.0 in stage 10.0 (TID 103) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
	at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
	at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
	at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
	at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
	at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

23/12/15 07:59:57 ERROR TaskSetManager: Task 1 in stage 10.0 failed 4 times; aborting job
23/12/15 07:59:57 WARN TaskSetManager: Lost task 2.3 in stage 10.0 (TID 141) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 7.3 in stage 10.0 (TID 136) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 5.3 in stage 10.0 (TID 133) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 3.3 in stage 10.0 (TID 135) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 4.3 in stage 10.0 (TID 139) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 0.3 in stage 10.0 (TID 140) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 9.3 in stage 10.0 (TID 134) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 6.3 in stage 10.0 (TID 137) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
23/12/15 07:59:57 WARN TaskSetManager: Lost task 8.3 in stage 10.0 (TID 138) (xx.xx.xx.xx executor 23): TaskKilled (Stage cancelled)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 10.0 failed 4 times, most recent failure: Lost task 1.3 in stage 10.0 (TID 132) (xx.xx.xx.xx executor 23): com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
	at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
	at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
	at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
	at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
	at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
	at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2470)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2419)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2418)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2418)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1125)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1125)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1125)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2684)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2626)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2615)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:147)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:204)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:203)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:425)
  at org.apache.spark.sql.Dataset.$anonfun$count$1(Dataset.scala:3047)
  at org.apache.spark.sql.Dataset.$anonfun$count$1$adapted(Dataset.scala:3046)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3751)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3749)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:3046)
  ... 49 elided
Caused by: com.vesoft.nebula.client.meta.exception.ExecuteFailedException: Execute failed: no parts succeed, error message: Unable to activate object
  at com.vesoft.nebula.client.storage.scan.ScanResultIterator.throwExceptions(ScanResultIterator.java:100)
  at com.vesoft.nebula.client.storage.scan.ScanEdgeResultIterator.next(ScanEdgeResultIterator.java:142)
  at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow(NebulaReader.scala:263)
  at com.vesoft.nebula.connector.reader.NebulaReader.hasNextEdgeRow$(NebulaReader.scala:221)
  at com.vesoft.nebula.connector.reader.NebulaPartitionReader.hasNextEdgeRow(NebulaPartitionReader.scala:17)
  at com.vesoft.nebula.connector.reader.NebulaEdgePartitionReader.next(NebulaEdgePartitionReader.scala:14)
  at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
  at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
  at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:35)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:907)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:133)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

However, we can successfully run the following on the EMR machine and get results.

dataset.show()

We also tested scripts involving different volumes of graph data. With val n_limit = 1000000, we can successfully run the following (a modified snippet from the nebula-algorithm package):

val ORIGIN_ID_COL = "id"
val fieldNames         = dataset.schema.fieldNames
val n_limit = 1000000
val (srcName, dstName) = (fieldNames(0), fieldNames(1))
val srcIdDF: DataFrame = dataset.select(srcName).withColumnRenamed(srcName, ORIGIN_ID_COL).limit(n_limit)
val dstIdDF: DataFrame = dataset.select(dstName).withColumnRenamed(dstName, ORIGIN_ID_COL).limit(n_limit)
val idDF               = srcIdDF.union(dstIdDF).distinct()
idDF.show()

However, when we increase it to val n_limit = 10000000, the job fails with the same Unable to activate object error.

What we found so far

With more tests, we found that when the number of threads / concurrent tasks is 1, the error does not occur, whereas when the number of threads is greater than 1, it does. We suspect there is some constraint in the NebulaGraph database and wonder whether proper configuration tuning could help.
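
For reference, the two behaviors can be reproduced by changing only the Spark resources while keeping the snippet above unchanged (a sketch; the resource numbers are illustrative, not a recommendation):

# single concurrent task: the count completes without the error
spark-shell --master yarn --deploy-mode client --num-executors 1 --executor-cores 1 --conf spark.dynamicAllocation.enabled=false --jars nebula-spark-connector_3.0-3.0-SNAPSHOT-jar-with-dependencies.jar

# multiple concurrent tasks (total cores > 4): the "Unable to activate object" error appears
spark-shell --master yarn --deploy-mode client --num-executors 2 --executor-cores 4 --conf spark.dynamicAllocation.enabled=false --jars nebula-spark-connector_3.0-3.0-SNAPSHOT-jar-with-dependencies.jar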

Could you please help with this issue? Feel free to let me know if I need to provide more information. Thanks a lot!

@sparkle-apt sparkle-apt changed the title Encounter error "Unable to activate object" when scanning full graph data Encounter error "Unable to activate object" when there are multiple threads / concurrent tasks Dec 15, 2023
@sparkle-apt sparkle-apt changed the title Encounter error "Unable to activate object" when there are multiple threads / concurrent tasks Encounter error "Unable to activate object" when there are multiple threads / concurrent tasks in Spark Dec 15, 2023
@QingZ11 QingZ11 added the type/question Type: question about the product label Dec 18, 2023
@QingZ11 (Contributor) commented Dec 18, 2023

In the previous post mentioned here: https://discuss.nebula-graph.com.cn/t/topic/9726, zhang_hytc encountered the same issue as you did. You can try the following steps:

First, execute the show hosts command in the nebula-console. This command displays the addresses of the storaged services exposed by the NebulaGraph metad service.

Next, confirm whether you can establish a connection from your local environment to the storaged addresses exposed by the metad service.
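
A minimal sketch of the two checks (host names, the nebula-console path, and credentials are placeholders; 9559 and 9779 are the default metad and storaged ports):

# 1. Ask the metad service which storaged addresses it exposes
./nebula-console -addr <metad_host> -port 9559 -u root -p <password> -e "SHOW HOSTS"

# 2. From every Spark worker / EMR node, confirm the exposed storaged address is reachable
telnet <storaged_host_from_show_hosts> 9779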

@sparkle-apt (Author)

Thanks @QingZ11 for your prompt response.
I confirm that the address of the storaged service exposed by the metad service is the public IP address of the storaged service as shown below (The IP of the storaged service is masked due to sensitivity).
show_hosts
And I confirm that I can establish a connection from my EMR cluster to the storaged addresses as shown below (The IP of the storaged service is masked due to sensitivity).
telnet

The issue we encounter is not that we cannot connect to the storaged service under any circumstances. Instead, the problem is that we encounter the error when total-executor-cores is greater than 4. This greatly limits our efficient usage of the graph database for our use cases.
Could you please help look into the issue and share insights that help us address it? Thank you!

@Nicole00 (Contributor)

Please make sure all the Spark workers can ping the storaged address.

@sparkle-apt (Author)

Thanks @Nicole00 for the reminder. I confirm that all Spark workers and the storaged service are within the same VPC network and that the ports are reachable between them.

@sparkle-apt (Author)

I've taken the initiative to do some preliminary checks, but so far they have not led to a resolution. To troubleshoot the issue further and more effectively, could you advise me on the following:

  • Log files: which specific log files, or which specific information in the logs, should I review for error messages or indicators related to this issue?
  • Configuration files: are there any configuration settings I should inspect or tweak that might be relevant to this problem?
  • Diagnostic tools/commands: are there tools or commands available to gather more diagnostic information?

If you require additional information or context from my end, please let me know, and I'll be sure to provide it. Thanks!

@sparkle-apt (Author)

In addition, we have observed odd behavior in another test, in which we connect to the database and count the edges via spark-shell.

spark-shell --master yarn --deploy-mode client --driver-memory=2G --executor-memory=2G  --num-executors=2 --executor-cores=2 --conf spark.dynamicAllocation.enabled=false --jars nebula-spark-connector_3.0-3.0-SNAPSHOT-jar-with-dependencies.jar

We ran the following snippet again:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

sc.setLogLevel("INFO")
val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress(s"${ec2_public_ip}:9559")
  .withConnectionRetry(2)
  .build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("acct2asset_20231130")
  .withLabel("USES")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(20)
  .build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
dataset.show()
dataset.count()

The first four tasks raised the "Unable to activate object" error while the following ones did not.
Screenshot 2023-12-20 at 11 10 21
Screenshot 2023-12-20 at 11 10 02
We are concerned about this unstable and unexpected behavior and look forward to your suggestions. Thanks!
@Nicole00 @QingZ11 @wey-gu

@Nicole00 (Contributor) commented Dec 21, 2023

So weird! Are the first four tasks located on different machines from the other tasks?
Can you make sure that telnet storaged_host_ip 9779 works from all the Spark workers?

@sparkle-apt (Author)

@Nicole00 Yes, these tasks all run on the same single machine where the storaged service is, and I confirm that telnet storaged_host_ip 9779 returns Connected to storaged_host_ip.

@Nicole00 (Contributor)

> @Nicole00 Yes, these tasks all run on the same single machine where the storaged service is, and I confirm that telnet storaged_host_ip 9779 returns Connected to storaged_host_ip.

Really weird. If the tasks all run on one and the same machine, it looks like the storaged server was not ready at 10:59:00 but was ready at 11:01:09.

@Nicole00 (Contributor)

Could you please provide some log information for nebula-storaged?

@sparkle-apt (Author)

> Could you please provide some log information for nebula-storaged?

Sure. Could you please let me know what minloglevel and v values are needed in the log settings so that I can provide logs that help?

@Nicole00 (Contributor)

You can set minloglevel to 0 and v to 3 for more detailed info.
https://docs.nebula-graph.io/3.6.0/5.configurations-and-logs/2.log-management/logs/#parameter_descriptions
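
For example (a sketch; the paths assume the default RPM layout under /usr/local/nebula and may differ on your machine):

# edit /usr/local/nebula/etc/nebula-storaged.conf and set
#   --minloglevel=0
#   --v=3
# then restart the storaged service so the new flags take effect
sudo /usr/local/nebula/scripts/nebula.service restart storaged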

@sparkle-apt (Author) commented Dec 21, 2023

Logging is now configured with minloglevel = 0 and v = 3. When rerunning the count snippet, however, we did not observe any error. Instead, the running tasks seemed to be stuck with no task finishing, while the log suggests data is being fetched slowly, which is abnormal. It seems to me that the intensive logging may affect the performance of fetching data. As the full log is large, we truncated it to the top and most representative information, which is attached.
nebula-storaged-v3.txt
When v is reset to 0, we observe the same error again. However, no warning or error appears in the storaged log. We are getting more confused and are not sure whether these pieces of information help. Please let me know if you need additional information or context from my end.

@sparkle-apt (Author)

@Nicole00 By the way, following a random thought, we found tons of TCP connections in the TIME_WAIT state when running the code below:

import org.apache.spark.sql.DataFrame
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

sc.setLogLevel("INFO")
val ec2_public_ip = "xx.xx.xx.xx"

val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress(s"${ec2_public_ip}:9559")
  .withConnectionRetry(2)
  .build()
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("acct2asset_20231130")
  .withLabel("USES")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(20)
  .build()
val dataset = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
dataset.count()

There are around 21k TCP connections:

(base) [ec2-user@xx.xx.xx.xx packages]$ netstat -a | grep -cE ':9779.*TIME_WAIT'
21494

Is this expected?

@Nicole00 (Contributor)

Sorry for the late reply.
Theoretically, the number of connections to storaged will be 20 (partitionNum) × (number of storaged instances).

I checked the connector for a connection leak: the storageClient is closed after a partition finishes its task, and the connectionPool inside it is also closed when the storageClient is closed.
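
If you want to double-check on your side, one option (a sketch; 9779 is the default storaged port) is to watch the connection states on the storaged machine while count() is running:

# counts of TCP connections on the storaged port, grouped by state (ESTABLISHED, TIME_WAIT, ...)
netstat -ant | grep ':9779' | awk '{print $6}' | sort | uniq -c

With 20 partitions and a single storaged instance we would expect on the order of 20 ESTABLISHED connections at a time; a TIME_WAIT count that keeps growing instead suggests connections are being opened and closed repeatedly rather than reused.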

How much data is in your USES label?

@sparkle-apt (Author)

@Nicole00 No worries! There are 303938330 USES edges in the space.

@Nicole00 (Contributor)

OK, I'll run a test to see if there is any connection leak. In the meantime, maybe you can update your nebula-spark-connector to the latest version.

@sparkle-apt (Author)

A brief summary of what has been observed so far:

  1. We generally encounter the Unable to activate object error when loading the graph with a total number of nodes greater than 4, observed on both the local and the EMR cluster;
  2. Loading succeeds when the total number of nodes is no greater than 4, observed on both the local and the EMR cluster;
  3. We observed once that, when loading the graph, the first four tasks raised the "Unable to activate object" error while the following ones did not and finished successfully;
  4. With logging configured at minloglevel = 0 and v = 3, i.e., a large amount of logs written to disk, loading the graph with a total number of nodes greater than 4 can succeed, though much more slowly;
  5. There are tons of TCP connections in the TIME_WAIT state when loading the graph locally.

@Nicole00 Do you have any other ideas taking these observations into consideration? Is there anything we could try to increase parallelism when reading the graph?

@Nicole00 (Contributor)

I really cannot reproduce your problem.
I ran the connector in a local Spark cluster with both 5 nodes and 1 node, and could read nebula's data successfully.

I still think it's a network problem.

@Nicole00 (Contributor) commented Jan 24, 2024

This came to my attention quite by accident; it is about the number of available ports.
Maybe you can try the approach described in this post: https://blog.csdn.net/gltncx11/article/details/122068479
@sparkle-apt
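
If the underlying problem is ephemeral port exhaustion caused by the large number of TIME_WAIT sockets you observed, the usual kernel knobs look like the following (a sketch; the values are illustrative, please check them against the post and your environment before applying):

# current range of local ports available for outgoing connections
cat /proc/sys/net/ipv4/ip_local_port_range

# allow reuse of sockets in TIME_WAIT for new outgoing connections
sudo sysctl -w net.ipv4.tcp_tw_reuse=1
# widen the local port range
sudo sysctl -w net.ipv4.ip_local_port_range="10240 65000"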

@QingZ11 (Contributor) commented Feb 18, 2024

@sparkle-apt Hi, I have noticed that the issue you created hasn't been updated for nearly a month. Has this issue been resolved? If not, could you provide some more information? If it has been solved, could you close this issue?

Thanks a lot for your contribution anyway 😊

@sparkle-apt (Author)

We have decided not to be blocked by this issue for the moment and to move forward with other projects and tests on larger clusters. We will get back to it when bandwidth allows, so we can close the issue. Thanks for the reminder.
