[WIP]Fix the error "Field “xxx” does not exist." when data is imported from Kafka to Nebula Graph via Nebula Exchange #8

Closed
jamieliu1023 opened this issue Sep 2, 2021 · 13 comments

@jamieliu1023

The bug was reported by a user on the Chinese forum: https://discuss.nebula-graph.com.cn/t/topic/2623/

For those who cannot understand Chinese well, please refer to the title of the issue for a basic background. The reason why the error occurs is that Nebula Exchange is not able to parse the "value" field of the data in Kafka.

Committer @guojun85 is working on this now. Thanks for his contribution in advance!

@guojun85

guojun85 commented Sep 9, 2021

We are working on this issue. The fix plan is:

  1. The value should be in JSON format:
    {
      "field1": "value1",
      "field2": "value2",
      "field3": "value3"
    }
  2. We will parse the value and match its fields against the fields in the configuration file.

@jamieliu1023
Author

@Nicole00

@Nicole00
Contributor

We are working on this issue, the fix plan is:

  1. value should be json format:
    {
    "items": [
    { "field1": "value1"},
    { "field2":"value2"},
    { "field3":"value3"}
    ]
    }
  2. We will parse value and get all the fields to match the fields in the configuration file.

Thanks very much for your solution. There are two small questions we want to discuss with you:

  1. How should the fields be configured in the configuration file under your plan?
  2. Can we still get Kafka's keyword fields (key, offset, topic, and so on)?

@guojun85

guojun85 commented Sep 10, 2021

  1. How to configure the fields in the configuration file under the plan:
    [guojun]: Change the configuration file: https://github.com/vesoft-inc/nebula-spark-utils/blob/master/nebula-exchange/src/main/resources/application.conf
    --KAFKA
    {
      name: tag-name-7
      type: {
        source: kafka
        sink: client
      }
      service: "kafka.service.address"
      topic: "topic-name"
      fields: [field1, field2, field3]
      nebula.fields: [field1, field2, field3]
      vertex: {
        field: field1
      }
      partition: 10
      batch: 10
      interval.seconds: 10
    }

  2. We don't need the other keyword fields (key, offset, topic, and so on). We only parse value and import the fields in value into the Nebula fields.

@Nicole00
Contributor

That's reasonable. Looking forward to your PR ~

@Nicole00 Nicole00 transferred this issue from vesoft-inc/nebula-spark-utils Oct 18, 2021
@sworduo
Contributor

sworduo commented Oct 18, 2021

I am sorry that I cannot reply in the PR vesoft-inc/nebula-spark-utils#157, and sorry about my poor English. I am not sure what "can we just modify the StreamingReader to parse the kafka's value to DataFrame" means. Does it mean that we modify only the input logic and transform the Kafka data into a DataFrame while keeping the processing logic as before, i.e. get a DataFrame from the Kafka source and parse it in the vertex/edge processors separately? If we want to implement that, the first question is how to switch between the vertex/edge processors.
For a Kafka source, according to the code, we cannot switch to another tag/edge once a tag config is chosen. In this case, we can only import one tag defined in the config into Nebula.
Let's assume that the number of tags and edges defined in the config is N. One possible solution is to create N SparkContext instances and N Kafka streaming readers. With multi-threading, each tag/edge is processed in its own thread with its own Kafka consumer. However, I don't think it's a good idea. Assuming that the partition number is P, there would be N * P partitions and N * P Nebula writers, which may throw a socket exception.
In addition, we found a bug in the PR. The last version supports only a string-type vid/source/target, while an int-type vid/source/target may throw an exception. This is because we call jsonObj.getString() by default for vid/source/target, whereas an int-type vid/source/target should call jsonObj.getLong(). It can easily be fixed in KafkaProcessor.scala wherever getAndCheckVID is called.
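
The getString/getLong distinction described above could be sketched as follows. This is only an illustration under stated assumptions: the parsed JSON value is stood in for by a plain Map[String, Any], and the names Vid, StringVid, IntVid and readVid are hypothetical, not taken from the PR.

```scala
// Hypothetical result types: a vid is either a string or a 64-bit integer.
sealed trait Vid
final case class StringVid(value: String) extends Vid
final case class IntVid(value: Long) extends Vid

// Reads the vid field from a parsed record (a Map stands in for the JSON object).
// vidIsInt plays the role of the vid type declared for the graph space.
def readVid(record: Map[String, Any], field: String, vidIsInt: Boolean): Either[String, Vid] =
  record.get(field) match {
    case None                   => Left(s"Field '$field' does not exist.")
    case Some(raw) if !vidIsInt => Right(StringVid(raw.toString))
    case Some(n: Int)           => Right(IntVid(n.toLong))
    case Some(n: Long)          => Right(IntVid(n))
    case Some(s: String) =>
      // Accept numeric text such as "42", reject anything else.
      scala.util.Try(s.trim.toLong).toOption
        .map(IntVid(_))
        .toRight(s"Field '$field' is not an integer vid: $s")
    case Some(other) => Left(s"Field '$field' is not an integer vid: $other")
  }
```

Returning an Either keeps a missing field and a wrongly typed vid as ordinary values, so the caller can decide whether to skip the record or fail the batch.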


@Nicole00
Contributor

Yes, my point is that we modify only the input logic and transform the Kafka data into a DataFrame while keeping the processing logic as before, i.e. get a DataFrame from the Kafka source and parse it in the vertex/edge processors separately.

  1. How to switch between vertex/edge? In one Exchange application, we support only one tag or edge type for a streaming data source. I understand your concern when the number of tags/edges and partitions is large.
    Multiple tags or edges configured in one configuration file are executed in order, with no parallelism between them. So we don't expect users to submit the applications for all tags/edges at the same time (if they do, the applications may wait for Spark or YARN resources, whose default scheduling policy is FIFO). More likely, N is less than 10.

@sworduo
Contributor

sworduo commented Oct 22, 2021

That's OK. However, since a Kafka producer typically produces data all the time, the Kafka consumer is expected to consume data all the time as well. Hence, when the data source of one of the tags/edges defined in the configuration is Kafka, the nebula-exchange application will process that tag/edge forever and will never switch to any other tag/edge defined in the same configuration.

Hence, in the new PR, we will impose the following restrictions:

  1. One application per Kafka source. If more than one tag/edge needs to be parsed from Kafka, each should be processed in a different application.
  2. To enforce 1., we will check the configuration at the beginning. If the data source of a tag/edge in the configuration is Kafka and the number of tags/edges is more than 1, an exception will be thrown. In other words, for Kafka, one configuration should define only one tag or edge.

Implementation summary:
We don't need to modify the input logic any more. All we need is to add new logic in vertices/edgeprocessor.scala that parses data from the value field if the data source is Kafka.

Expected effect:
If someone needs to parse several tags/edges from Kafka, they need to process them in different applications and re-consume the same Kafka data once per tag and edge. More Nebula writers are also needed at the same time. I still don't think it's a good idea. However, in order to maintain the architecture, I think it may be OK to do it this way.

@Nicole00
Contributor

Nicole00 commented Nov 5, 2021

So sorry for the late reply.
It looks good to me, with just two questions:

  1. Can we process all of Kafka's data in the Reader stage? The architecture requires that a DataFrame that has entered VertexProcess/EdgeProcess needs no further processing logic.
  2. It's a great idea to check the configuration at the beginning. However, if a user's tag config order is csv, kafka, is that not allowed? In fact, both the csv and kafka data sources can be imported successfully. Maybe we just need to ensure that there is no other tag/edge config after kafka?

@sworduo
Contributor

sworduo commented Nov 5, 2021

  1. I am not sure. Since the JSON string is stored in the value field of Kafka, we have to get the Kafka data before parsing the JSON. Maybe we can map the data in the reader, but I have no idea whether it will work. If not, we can parse the JSON in the tag/edge processor.
  2. Yes, you're right. We just need to ensure that there is no config after Kafka, which will be checked at the beginning.
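
A minimal sketch of that start-up check, assuming a simplified view of the configuration in which each tag/edge entry has only a name and a source type. SchemaConfig and validateKafkaOrder are hypothetical names used for illustration, not Exchange's actual classes.

```scala
// Hypothetical, simplified view of one tag/edge entry in the configuration.
final case class SchemaConfig(name: String, source: String)

// A Kafka entry streams forever, so any entry configured after it would never run.
// Returns Left with a message when something follows the first Kafka entry.
def validateKafkaOrder(configs: Seq[SchemaConfig]): Either[String, Unit] = {
  val firstKafka = configs.indexWhere(_.source.equalsIgnoreCase("kafka"))
  if (firstKafka >= 0 && firstKafka != configs.length - 1) {
    val blocked = configs.drop(firstKafka + 1).map(_.name).mkString(", ")
    Left(s"'${configs(firstKafka).name}' reads from Kafka and never finishes, " +
      s"so these entries would never run: $blocked")
  } else {
    Right(())
  }
}
```

With this rule, a configuration ordered csv, kafka passes, while kafka, csv or two Kafka entries in a row are rejected before any data is read.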

@Nicole00
Contributor

Nicole00 commented Nov 8, 2021

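In the Reader stage, we can use the configured fields to parse Kafka's JSON value, like this:

Assume the fields config is [id, name, age] (all fields must exist in Kafka's value data).

import org.apache.spark.sql.functions.get_json_object
import session.implicits._

val df = session.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConfig.server)
      .option("subscribe", kafkaConfig.topic)
      .load()
// value arrives as binary; cast it to a string explicitly before extracting the JSON fields
val json = $"value".cast("string")
// no .show() here: a streaming DataFrame has to be consumed through writeStream
val parsed = df.select(get_json_object(json, "$.id").alias("id"), get_json_object(json, "$.name").alias("name"), get_json_object(json, "$.age").alias("age"))

Then we get the needed DataFrame from Kafka, and the rest of the processing stays the same as the current logic.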
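
The hard-coded select above could be generalized to whatever field list is configured. The following is a minimal sketch, assuming Spark SQL is on the classpath. parseKafkaValue is a hypothetical helper name, and fields stands for the fields list from the configuration file.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, get_json_object}

// Builds one output column per configured field by extracting it from the
// JSON text held in the `value` column of a Kafka-sourced DataFrame.
def parseKafkaValue(df: DataFrame, fields: Seq[String]): DataFrame = {
  // Cast to string explicitly, since Kafka delivers value as binary.
  val json = col("value").cast("string")
  val columns: Seq[Column] = fields.map(f => get_json_object(json, "$." + f).alias(f))
  df.select(columns: _*)
}
```

get_json_object returns null for a path that is missing from the JSON text and returns every extracted value as a string, so the "all fields must exist" requirement and any numeric conversion would still need to be handled after this step.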

@sworduo
Contributor

sworduo commented Nov 8, 2021

I get your point. I will try it like df.select(value).map(v => parseJson)

@Sophie-Xie Sophie-Xie added this to the v3.0.0 milestone Nov 9, 2021
@Nicole00
Contributor

Nicole00 commented Dec 2, 2021

#29 resolved it.
