Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka-value-parser #29

Merged
merged 1 commit into from
Nov 25, 2021
Merged

kafka-value-parser #29

merged 1 commit into from
Nov 25, 2021

Conversation

sworduo
Copy link
Contributor

@sworduo sworduo commented Nov 23, 2021

issue #8
import data from kafka to nebula

@codecov-commenter
Copy link

codecov-commenter commented Nov 24, 2021

Codecov Report

Merging #29 (393136f) into master (389bead) will decrease coverage by 0.20%.
The diff coverage is 11.76%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master      #29      +/-   ##
============================================
- Coverage     28.24%   28.04%   -0.21%     
  Complexity        6        6              
============================================
  Files            24       24              
  Lines          2078     2111      +33     
  Branches        388      396       +8     
============================================
+ Hits            587      592       +5     
- Misses         1389     1412      +23     
- Partials        102      107       +5     
Impacted Files Coverage Δ
...cala/com/vesoft/nebula/exchange/ErrorHandler.scala 0.00% <0.00%> (ø)
...in/scala/com/vesoft/nebula/exchange/Exchange.scala 2.07% <0.00%> (-0.05%) ⬇️
...t/nebula/exchange/reader/StreamingBaseReader.scala 0.00% <0.00%> (ø)
...la/com/vesoft/nebula/exchange/config/Configs.scala 63.65% <50.00%> (-0.54%) ⬇️
...cala/com/vesoft/nebula/exchange/MetaProvider.scala 30.15% <0.00%> (-1.33%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 389bead...393136f. Read the comment docs.

class KafkaReader(override val session: SparkSession, kafkaConfig: KafkaSourceConfigEntry)
class KafkaReader(override val session: SparkSession,
kafkaConfig: KafkaSourceConfigEntry,
fields: List[String])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please distinct the fields before using it.

.selectExpr("CAST(value AS STRING)")
.as[(String)]
.withColumn("value", from_json(col("value"), jsonSchema))
.select("value.*")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we need alias the dataframe's column name to name in fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had ever print the col name which is the name of fields. It's workable in my machine..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, i tested it and the schema is the same with fields. Great work~

@@ -172,7 +173,8 @@ object Exchange {
LOG.info(s"field keys: ${fieldKeys.mkString(", ")}")
val nebulaKeys = edgeConfig.nebulaFields
LOG.info(s"nebula keys: ${nebulaKeys.mkString(", ")}")
val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry)
val fields = edgeConfig.sourceField::edgeConfig.targetField::edgeConfig.fields
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

edgeConfig.rankField should also be added.

batch: 10
interval.seconds: 10
}
# {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please roll back these changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new check is added in the config parse that an exception would be throw if any other config define after kafka, see Config.scala. However, there is two kafka defined in the application.conf. If I don't comment this section, the test would not pass.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll split the config of Kafka out and use a single config file for Kafka later.

Copy link
Contributor

@Nicole00 Nicole00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great PR ~

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc affected PR: improvements or additions to documentation
Projects
Development

Successfully merging this pull request may close these issues.

None yet

3 participants