
Do we have support for foreachBatch sinks? #58

Closed
HariprasadAllaka1612 opened this issue Nov 1, 2019 · 1 comment
HariprasadAllaka1612 commented Nov 1, 2019

Hi,

I am using this repo to build a Structured Streaming application over Kinesis data streams with Spark 2.4.0.

I wanted to understand whether there is support for foreachBatch sinks.

https://spark.apache.org/docs/2.4.0/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
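For context, a minimal sketch of the foreachBatch API shape in Spark 2.4 (the "rate" source, bucket names, and paths here are placeholders, not taken from my job):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("foreachBatchSketch").getOrCreate()

// Any streaming source works here; "rate" is just a built-in test source.
val stream = spark.readStream.format("rate").load()

stream.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // Each micro-batch arrives as an ordinary DataFrame, so any batch writer applies.
    batchDf.write.mode("append").parquet(s"s3a://my-bucket/out/batch=$batchId")
  }
  .option("checkpointLocation", "s3a://my-bucket/checkpoint/")
  .start()
  .awaitTermination()
```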

To explain further, my use case is:

  1. Read the stream from Kinesis.
  2. Batch the data using foreachBatch and then write the data into S3.
  3. Use S3 as the checkpoint location, though HDFS is also possible.

The problem I am facing is that when I use foreachBatch, the shard-commit folder is not updated for each batch; nothing is written beyond shard-commit/0.

Attached is the checkpoint dump generated when writing locally.
The behaviour is the same when I write the checkpoint to S3 as well.

checkpoint.zip

Sample code is below:

```scala
val roundStreamDf = sc.readStream
  .format("kinesis")
  .option("streamName", streamName)
  .option("endpointUrl", s"https://kinesis.${region}.amazonaws.com")
  .option("awsAccessKeyId", acceskeyId)
  .option("awsSecretKey", secretAccessKey)
  .option("startingposition", START_POS)
  .option("kinesis.client.describeShardInterval", "3600")
  .option("kinesis.client.avoidEmptyBatches", true)
  .load()

val roundStreamIntermediateDf = roundStreamDf
  .selectExpr("cast (data as STRING) jsonData")
  .select(from_json(col("jsonData"), ScalaReflection.schemaFor[Round].dataType.asInstanceOf[StructType]).as("round"))
  .select("round.*")

roundStreamIntermediateDf
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    val RoundDf = commonRoundDataProcess(batchDf)
    RoundDf.persist()
    // opNameIterator is defined elsewhere in the application
    while (opNameIterator.hasNext) {
      val opName = opNameIterator.next()
      val finalRoundDf = RoundDf.filter(col("OperatorShortName") === opName)
      if (!finalRoundDf.head(1).isEmpty) {
        accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
      }
    }
    RoundDf.unpersist()
  }
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", s"${checkPointBucket}/checkpoint/${ROUND_TYPE_OF_DATA}/")
  .start()
  .awaitTermination()
```
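One thing worth double-checking in the snippet above: if opNameIterator is created once outside the foreachBatch closure, it will be exhausted after the first micro-batch, so later batches would skip the loop entirely. A minimal sketch of a per-batch rebuild, reusing the names from the snippet (opNames is a hypothetical Seq[String] holding the operator short names):

```scala
.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
  val roundDf = commonRoundDataProcess(batchDf)
  roundDf.persist()
  // Iterate the underlying collection on every batch; a single Iterator
  // created outside this closure is consumed by the first batch and then stays empty.
  opNames.foreach { opName =>
    val finalRoundDf = roundDf.filter(col("OperatorShortName") === opName)
    if (!finalRoundDf.head(1).isEmpty) {
      accessDataS3.writeDataToRefinedHudiS3(sc, finalRoundDf, extraOptions, opName, ROUND_TYPE_OF_DATA)
    }
  }
  roundDf.unpersist()
}
```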

The code basically reads the stream, carries out some processing inside the foreachBatch sink, and then pushes the results to S3.

Attached is the error log.
error.log

I have tried increasing the number of attempts to 20, thinking it might be a problem with the eventual consistency (EC) of S3, but I see the same problem on my local filesystem and on HDFS.

Quick help would be much appreciated.

@itsvikramagr
Contributor

This does not look like a problem with EC (S3 eventual consistency).

foreachBatch might have added some new construct which is impacting the job. You would want to look at the executor logs to debug it.

This issue is a duplicate of #57. Closing this.
