
Unable to write relationship #325

Closed
masonedmison opened this issue Apr 13, 2021 · 4 comments
masonedmison commented Apr 13, 2021

I am following the documentation to write relationships to a Neo4j database. I am able to write nodes, but for relationships I get the following stack trace:

21/04/13 13:59:23 WARN SchemaService: Switching to query schema resolution
21/04/13 13:59:23 WARN SchemaService: For the following exception
org.neo4j.driver.exceptions.ClientException: Unable to convert scala.collection.immutable.Map$Map1 to Neo4j Value.
	at org.neo4j.driver.Values.value(Values.java:134)
	at org.neo4j.driver.internal.util.Extract.mapOfValues(Extract.java:203)
	at org.neo4j.driver.internal.AbstractQueryRunner.parameters(AbstractQueryRunner.java:69)
	at org.neo4j.driver.internal.AbstractQueryRunner.run(AbstractQueryRunner.java:43)
	at org.neo4j.spark.service.SchemaService.retrieveSchemaFromApoc(SchemaService.scala:68)
	at org.neo4j.spark.service.SchemaService.liftedTree2$1(SchemaService.scala:171)
	at org.neo4j.spark.service.SchemaService.structForRelationship(SchemaService.scala:155)
	at org.neo4j.spark.service.SchemaService.struct(SchemaService.scala:262)
	at org.neo4j.spark.DataSource.$anonfun$inferSchema$1(DataSource.scala:41)
	at org.neo4j.spark.DataSource.callSchemaService(DataSource.scala:29)
	at org.neo4j.spark.DataSource.inferSchema(DataSource.scala:41)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils$.getTableFromProvider(DataSourceV2Utils.scala:81)
	at org.apache.spark.sql.DataFrameWriter.getTable$1(DataFrameWriter.scala:339)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:401)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at AdeptxPipeline$.writeRel(AdeptxPipeline.scala:146)
	at AdeptxPipeline$.delayedEndpoint$AdeptxPipeline$1(AdeptxPipeline.scala:94)
	at AdeptxPipeline$delayedInit$body.apply(AdeptxPipeline.scala:5)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at AdeptxPipeline$.main(AdeptxPipeline.scala:5)
	at AdeptxPipeline.main(AdeptxPipeline.scala)
21/04/13 13:59:23 INFO Driver: Closing driver instance 1795925655
21/04/13 13:59:23 INFO ConnectionPool: Closing connection pool towards nlp.raiders-dev.awscloud.abbvienet.com:8092
Exception in thread "main" org.apache.spark.sql.AnalysisException: TableProvider implementation org.neo4j.spark.DataSource cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:402)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
	at AdeptxPipeline$.writeRel(AdeptxPipeline.scala:146)
	at AdeptxPipeline$.delayedEndpoint$AdeptxPipeline$1(AdeptxPipeline.scala:94)
	at AdeptxPipeline$delayedInit$body.apply(AdeptxPipeline.scala:5)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at AdeptxPipeline$.main(AdeptxPipeline.scala:5)
	at AdeptxPipeline.main(AdeptxPipeline.scala)

Here is the dataframe schema I am trying to write:

root
 |-- NCT_ID: string (nullable = true)
 |-- ID: string (nullable = true)

and my code:

  finalSiteStudyDF.write
    .format("org.neo4j.spark.DataSource")
    .option("url", NEO4JURL)
    .option("authentication.type", "basic")
    .option("batch.size", 2500)
    .option("transaction.retries", "10")
    .option("relationship.save.strategy", "keys")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "zeppelin")
    .option("relationship", "HAS_SITE")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Study")
    .option("relationship.source.node.keys", "NCT_ID")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Site")
    .option("relationship.target.node.keys", "ID")

I have also tried using "Overwrite" as the save.mode with the same result.

The expected behavior is of course for this to write my relationship edges.

Scala version = 2.12.10
Spark version = 3.1.1
Neo4j connector JAR is included in the Spark jars and in the SBT file as:
"neo4j-contrib" %% "neo4j-connector-apache-spark" % "4.0.1_for_spark_3"

Kind of stumped here; what could be the issue?

@masonedmison
Author

bump

@utnaf
Contributor

utnaf commented Apr 16, 2021

Hi @masonedmison

The error says

Exception in thread "main" org.apache.spark.sql.AnalysisException: TableProvider implementation org.neo4j.spark.DataSource cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead.

You can find more about this issue on the official docs

So I would try this:

  finalSiteStudyDF.write
    .mode("Append") // <-- added this
    .format("org.neo4j.spark.DataSource")
    .option("url", NEO4JURL)
    .option("authentication.type", "basic")
    .option("batch.size", 2500)
    .option("transaction.retries", "10")
    .option("relationship.save.strategy", "keys")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "zeppelin")
    .option("relationship", "HAS_SITE")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Study")
    .option("relationship.source.node.keys", "NCT_ID")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Site")
    .option("relationship.target.node.keys", "ID")
    .save()

Let me know!

@masonedmison
Author

Bah! I figured it would be something simple like this, ha! It works now, thanks so much! I would suggest adding this additional parameter to the documentation; at least I do not see it mentioned at https://neo4j.com/developer/spark/writing/#bookmark-write-rel

Thanks again for your time and for developing this very awesome module!

@utnaf
Contributor

utnaf commented Apr 16, 2021

All good! I think we need to make it clearer 😄 Our plan is actually to support ErrorIfExists... but we are having some trouble getting through that. Hopefully it will be done in the near future.

@utnaf utnaf closed this as completed Apr 16, 2021