
StreamingLens Insights always showing "Streaming Query State: NONEWBATCHES" in Logs. #5

Open
rpatid10 opened this issue Sep 30, 2021 · 11 comments


@rpatid10

rpatid10 commented Sep 30, 2021

Hi All,

I am using StreamingLens in my Spark Structured Streaming application, but it always shows the same logs. The BatchId keeps updating, but "Streaming Query State: NONEWBATCHES" stays the same.
Can someone suggest why the state and recommendations are not updating in the logs?

|||||||||||||||||| StreamingLens Insights |||||||||||||||||||||||||
BatchId: 344
Analysis Time: 00s 000ms
Expected Micro Batch SLA: 120s 000ms
Batch Running Time: 00s 000ms
Critical Time: 00s 000ms
Streaming Query State: NONEWBATCHES
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

21/10/01 15:50:04 WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription(e68c3c2c-6d5f-469e-864a-)

Spark Submit Command:

spark-submit \
  --verbose \
  --name SparkStreamingLens \
  --num-executors 1 \
  --conf streamingLens.reporter.intervalMinutes=1 \
  --jars /home/abc/jars/spark-streaminglens_2.11-0.5.3.jar,/home/abc/jars/kafka-clients-0.10.2.1.jar \
  --master yarn \
  --deploy-mode cluster \
  --driver-cores 1 --driver-memory 2G --executor-cores 1 --executor-memory 2G \
  --supervise --class com.data.datalake.SparkStreamingLens \
  /home/abc/jar/SparkStreamingLens-spark-utility_2.11-1.0.jar

@abhishekd0907 @itsvikramagr @shubhamtagra @jsensarma @mjose007 @akumarb2010 @Indu-sharma
@iamrohit @beriaanirudh @mayurdb @michaelmior @rishitesh @emlyn @vrajat @fdemesmaeker @indit-qubole Kindly suggest.

Kindly guide me if anything here needs to change.

https://github.com/qubole/streaminglens/blob/master/src/main/scala/com/qubole/spark/streaminglens/common/results/AggregateStateResults.scala

https://github.com/qubole/streaminglens/blob/master/src/main/scala/com/qubole/spark/streaminglens/common/results/StreamingCriticalPathResults.scala

In the project, com.qubole.spark.streaminglens.QueryInsightsManager contains the following code, which prints the insights:

| |||||||||||||||||| StreamingLens Insights |||||||||||||||||||||||||
| BatchId: ${results.batchId}
| Analysis Time: ${pd(results.analysisTime)}
| Expected Micro Batch SLA: ${pd(results.streamingCriticalPathResults.expectedMicroBatchSLA)}
| Batch Running Time: ${pd(results.streamingCriticalPathResults.batchRunningTime)}
| Critical Time: ${pd(results.streamingCriticalPathResults.criticalTime)}
| Streaming Query State: ${results.streamingCriticalPathResults.streamingQueryState.toString}
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""".stripMargin)

All of these details are taken from streamingCriticalPathResults, and the only state-specific code there is for the NONEWBATCHES state:

case class StreamingCriticalPathResults(expectedMicroBatchSLA: Long = 0,
                                        batchRunningTime: Long = 0,
                                        criticalTime: Long = 0,
                                        streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

Also, AggregateStateResults.scala in com.qubole.spark.streaminglens.common.results contains the following code:

package com.qubole.spark.streaminglens.common.results

case class AggregateStateResults(state: String = "NO NEW BATCHES",
                                 recommendation: String = "Streaming Query State: NO NEW BATCHES<br>")

Kindly suggest.

@rpatid10
Author

rpatid10 commented Oct 2, 2021

Could someone kindly help? Or is there any other support channel available for this, e.g. a Slack channel?

@rpatid10
Author

rpatid10 commented Oct 3, 2021

@abhishekd0907 kindly help.

@abhishekd0907
Collaborator

Can you attach the full driver logs?

@rpatid10
Author

rpatid10 commented Oct 3, 2021

@abhishekd0907 I have attached the log. Kindly check.

@abhishekd0907 let me know if I need to share any other logs as well.

Thanks.

@abhishekd0907
Collaborator

Streaminglens was built and tested with Spark 2.4, but your application uses Spark 2.2. Some internal Spark APIs that Streaminglens relies on changed between Spark 2.2 and Spark 2.4, so the present code does not work with Spark 2.2 and leads to the following error:

21/10/01 15:50:04 WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription(e68c3c2c-6d5f-469e-864a-5d353d6e4bc2,2)
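The "key not found: ..." text in this warning matches the message of the java.util.NoSuchElementException that Scala's Map.apply throws when a key is missing. The sketch below is minimal and rests on an assumption. It supposes, hypothetically, that StreamingLens looks up per-batch data in a Scala Map keyed by a BatchDescription(queryId, batchId), and that the entry is absent because the expected Spark events were never recorded. It does not show where StreamingLens actually performs the lookup. The BatchDescription class, the map, and its contents are illustrative stand-ins.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the BatchDescription key seen in the warning;
// the real StreamingLens class may differ.
final case class BatchDescription(queryId: String, batchId: Long)

object KeyNotFoundDemo {
  // Hypothetical per-batch table: batch key -> running time in ms (illustrative values).
  val batchTimes: Map[BatchDescription, Long] =
    Map(BatchDescription("q-1", 1L) -> 1500L)

  /** Map.apply throws NoSuchElementException("key not found: ...") for a missing key. */
  def lookupUnsafe(key: BatchDescription): Try[Long] =
    Try(batchTimes(key))

  /** Map.get returns None instead of throwing, so a caller could skip the batch. */
  def lookupSafe(key: BatchDescription): Option[Long] =
    batchTimes.get(key)

  def main(args: Array[String]): Unit = {
    val missing = BatchDescription("q-1", 2L)
    lookupUnsafe(missing) match {
      // Same "prefix + getMessage" shape as the warning in the logs.
      case Failure(e) => println(s"Streaming Lens failed ${e.getMessage}")
      case Success(t) => println(s"found $t")
    }
    println(lookupSafe(missing)) // None
  }
}
```

Running main prints "Streaming Lens failed key not found: BatchDescription(q-1,2)", which has the same shape as the warning above.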

@rpatid10
Author

rpatid10 commented Oct 3, 2021

@abhishekd0907 But this warning does not appear when I remove the checkpoint location; all other details stay the same. I am attaching fresh logs with a new checkpoint location. Kindly suggest.

@abhishekd0907
Collaborator

In the new logs2.txt, without a checkpoint location, new batches are not even being created. I can see only:

21/10/03 12:44:24 INFO HoodieStreamingSink: Micro batch id=0 succeeded

but no logs for micro batches 1, 2, 3, and so on. This is because the checkpoint location was removed. Since new batches are not being created, Streaminglens has nothing to analyze, so there are no logs.

@rpatid10
Author

rpatid10 commented Oct 3, 2021

@abhishekd0907 Okay, thanks a lot. So there is no other way to use StreamingLens with Spark 2.2, right? Or is there any workaround to use StreamingLens with Spark 2.2? I am trying a few things and will share more logs by tomorrow morning if any new logs show a micro batch id value without this warning: "WARN QueryInsightsManager: Streaming Lens failed key not found: BatchDescription".

@rpatid10
Author

rpatid10 commented Oct 4, 2021

Hi @abhishekd0907,

I was able to remove this warning. I was getting it because my application's batch interval was 4 minutes, which is less than StreamingLens's default analysis interval of 5 minutes. As a result, the condition currentTime - lastAnalyzedTimeMills >= streamingLensConfig.analysisIntervalMinutes * 60 * 1000 evaluated to false, and the line 'logWarning(s"Streaming Lens failed " + e.getMessage)' produced the warning.
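The interval check quoted above can be sketched on its own. This is a simplified, hypothetical reimplementation. Only the condition comes from the thread; the AnalysisGate class, markAnalyzed, and the simulated 4-minute schedule are illustrative. With a 5-minute analysis interval and batches completing every 4 minutes, only some batches pass the check.

```scala
// Hypothetical sketch of the interval check quoted above; not the StreamingLens source.
final class AnalysisGate(analysisIntervalMinutes: Int) {
  private val intervalMillis: Long = analysisIntervalMinutes.toLong * 60 * 1000
  // Plays the role of lastAnalyzedTimeMills; 0 means "never analyzed".
  private var lastAnalyzedTimeMillis: Long = 0L

  /** Same condition as in the thread: enough time has passed since the last analysis. */
  def shouldTriggerAnalysis(currentTime: Long): Boolean =
    currentTime - lastAnalyzedTimeMillis >= intervalMillis

  /** Records the analysis time, as startStreamingAnalysis does after analyzing. */
  def markAnalyzed(currentTime: Long): Unit =
    lastAnalyzedTimeMillis = currentTime
}

object AnalysisGateDemo {
  /** Returns the batch end times (in minutes from an arbitrary origin) that would be analyzed. */
  def analyzedBatches(intervalMinutes: Int, batchEndMinutes: Seq[Int]): Seq[Int] = {
    val gate = new AnalysisGate(intervalMinutes)
    val t0 = 1633381200000L // arbitrary epoch-millis origin
    batchEndMinutes.filter { m =>
      val now = t0 + m.toLong * 60 * 1000
      val run = gate.shouldTriggerAnalysis(now)
      if (run) gate.markAnalyzed(now)
      run
    }
  }

  def main(args: Array[String]): Unit = {
    val batchEnds = Seq(4, 8, 12, 16, 20, 24) // one batch every 4 minutes
    println(analyzedBatches(5, batchEnds)) // List(4, 12, 20)
    println(analyzedBatches(4, batchEnds)) // List(4, 8, 12, 16, 20, 24)
  }
}
```

With the interval at or below the batch interval (4 minutes here), every batch in the same schedule passes the check.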

While debugging the issue, I observed the following points.

  1. In QueryInsightsManager.scala.

    if (insights.streamingCriticalPathResults.streamingQueryState.equals(StreamingState.ERROR)) {
      throw new SparkException("Unexpected Error or Timeout occurred during Analysis")
    }
    streamingLensResultsBuffer.enqueue(insights)
    eventsReporter.foreach(_.sendEvent())
  }
}

The value of insights.streamingCriticalPathResults.streamingQueryState will always be 'NONEWBATCHES', because StreamingCriticalPathResults only contains code specific to the 'NONEWBATCHES' state. It always reports no new batches and takes the default values from StreamingCriticalPathResults: expectedMicroBatchSLA: Long = 0, batchRunningTime: Long = 0, criticalTime: Long = 0, streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES.

case class StreamingCriticalPathResults(expectedMicroBatchSLA: Long = 0,
                                        batchRunningTime: Long = 0,
                                        criticalTime: Long = 0,
                                        streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

private def startStreamingAnalysis(queryProgress: QueryProgress): Unit = {
  val currentTime = System.currentTimeMillis()
  if (shouldTriggerAnalysis(currentTime)) {
    val insights = streamingQueryAnalyzer.analyze(queryProgress)
    logResultsIfNecessary(insights)
    println("Insides form startStreamingAnalysis method of QueryInsightManager" + insights)
    lastAnalyzedTimeMills = currentTime
    println("insights.streamingCriticalPathResults.streamingQueryState value : " + insights.streamingCriticalPathResults.streamingQueryState)
    if (insights.streamingCriticalPathResults.streamingQueryState.equals(StreamingState.ERROR)) {
      throw new SparkException("Unexpected Error or Timeout occurred during Analysis")
    }
    streamingLensResultsBuffer.enqueue(insights)
    eventsReporter.foreach(_.sendEvent())
  }
}

This block always takes its values from StreamingCriticalPathResults.streamingQueryState, which is NONEWBATCHES.
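One point about the reasoning above can be checked directly. In Scala, a default parameter value on a case class is used only when the caller omits that argument. So NONEWBATCHES being the default for streamingQueryState does not, by itself, force every result to be NONEWBATCHES. The minimal sketch below uses local stand-ins for StreamingState and StreamingCriticalPathResults. OPTIMUM is an assumed member name added for illustration; it is not taken from the StreamingLens source. The numbers are arbitrary except 13243, which is the running time logged for batch 22.

```scala
// Local stand-in for StreamingState. NONEWBATCHES and ERROR appear in the thread;
// OPTIMUM is an assumed extra member used only for illustration.
object StreamingState extends Enumeration {
  val NONEWBATCHES, OPTIMUM, ERROR = Value
}

// Same shape as the case class quoted in the thread.
case class StreamingCriticalPathResults(
    expectedMicroBatchSLA: Long = 0,
    batchRunningTime: Long = 0,
    criticalTime: Long = 0,
    streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

object DefaultsDemo {
  // No arguments: every field falls back to its default.
  val noBatch: StreamingCriticalPathResults = StreamingCriticalPathResults()

  // Only the SLA is supplied: the other fields still use their defaults.
  val slaOnly: StreamingCriticalPathResults =
    StreamingCriticalPathResults(expectedMicroBatchSLA = 120000)

  // All fields supplied by the caller (illustrative values): the defaults play no role.
  val analyzed: StreamingCriticalPathResults =
    StreamingCriticalPathResults(120000, 13243, 9000, StreamingState.OPTIMUM)

  def main(args: Array[String]): Unit = {
    println(noBatch)  // StreamingCriticalPathResults(0,0,0,NONEWBATCHES)
    println(slaOnly)  // StreamingCriticalPathResults(120000,0,0,NONEWBATCHES)
    println(analyzed) // StreamingCriticalPathResults(120000,13243,9000,OPTIMUM)
  }
}
```

Whether StreamingLens ever reports a state other than NONEWBATCHES therefore depends on what the analyzer passes in, not on the defaults alone.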

Kindly see the logs below and suggest.

QueryInsightsManager.analysis block results:

value of queryProgress : QueryProgress(22,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:07:43.483Z,3,0.22653477308766895)
lastProgress.batchId=22
lastProgress.id=12900529-7063-4446-a4b8-b58a94194e89
lastProgress.timestamp=2021-10-04T21:07:43.483Z
lastProgress.numInputRows=3
lastProgress.processedRowsPerSecond=0.22653477308766895
currentTime : 1633381676786
analysisIntervalMinutes time: 300000,
value of default interval in milliseconds this should be <= application batch interval time in milliseconds : 300000

StreamingQueryAnalyzer.scala Results :
queryProgress : QueryProgress(22,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:07:43.483Z,3,0.22653477308766895)
lastAnalyzedBatchId : -1
batchStartAndEndTimes : (1633381663483,1633381676726)
batchStartAndEndTimes._1 : 1633381663483
batchStartAndEndTimes._2 : 1633381676726
batchRunningTime : 13243
batchDescription : BatchDescription(12900529-7063-4446-a4b8-b58a94194e89,22)

QueryInsightsManager.analysisTask() results:

value of queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:10:00.000Z,0,0.0)
lastProgress.batchId=23
lastProgress.id=12900529-7063-4446-a4b8-b58a94194e89
lastProgress.timestamp=2021-10-04T21:10:00.000Z
lastProgress.numInputRows=0
lastProgress.processedRowsPerSecond=0.0
currentTime : 1633381800115
analysisIntervalMinutes time: 300000
value of default interval in milliseconds this should be <= application batch interval time in milliseconds : 300000


Note: here too, the insights are printed (as StreamingCriticalPathResults(300000,0,0,NONEWBATCHES)) and logged only when batchRunningTime is 0, i.e. when there is no data to process. When batchRunningTime is greater than 0 (data is available to process), no insights are produced.
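The logged numbers can be reproduced directly. For batch 22, the running time is the difference of the two timestamps in batchStartAndEndTimes: 1633381676726 - 1633381663483 = 13243 ms. For batch 23, the end time is still 0, so there is no completed batch to measure. The minimal sketch below shows that classification. It assumes, hypothetically, that an end time of 0 means "batch not finished". The BatchTiming object and runningTimeMillis are illustrative names, not the StreamingQueryAnalyzer API.

```scala
// Hypothetical helper reproducing the arithmetic seen in the debug logs;
// it is not the StreamingQueryAnalyzer implementation.
object BatchTiming {
  /**
   * Returns Some(running time in ms) for a completed batch, or None when the
   * end timestamp is 0, as the logs show for batches with no new data.
   */
  def runningTimeMillis(startAndEnd: (Long, Long)): Option[Long] = {
    val (start, end) = startAndEnd
    if (end == 0L) None else Some(end - start)
  }

  def main(args: Array[String]): Unit = {
    // Values copied from the logs for batches 22 and 23.
    println(runningTimeMillis((1633381663483L, 1633381676726L))) // Some(13243)
    println(runningTimeMillis((1633381800000L, 0L)))             // None
  }
}
```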

StreamingQueryAnalyzer.scala Results :
queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:10:00.000Z,0,0.0)
lastAnalyzedBatchId : -1
batchStartAndEndTimes : (1633381800000,0)
batchStartAndEndTimes._1 : 1633381800000
batchStartAndEndTimes._2 : 0
insightsStreamingLensResults(23,0,StreamingCriticalPathResults(300000,0,0,NONEWBATCHES))
Hello I am Inside QueryInsightsManager.scala Try Block
Streaming Query Analyzer Results :
queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:10:00.000Z,0,0.0)
lastAnalyzedBatchId : -1
batchStartAndEndTimes : (1633381800000,0)
batchStartAndEndTimes._1 : 1633381800000
batchStartAndEndTimes._2 : 0

Insides form startStreamingAnalysis method of QueryInsightManager StreamingLensResults(23,0,StreamingCriticalPathResults(120000,0,0,NONEWBATCHES))
insights.streamingCriticalPathResults.streamingQueryState value : NONEWBATCHES

value of queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:15:00.000Z,0,0.0)

lastProgress.batchId=23
lastProgress.id=12900529-7063-4446-a4b8-b58a94194e89
lastProgress.timestamp=2021-10-04T21:15:00.000Z
lastProgress.numInputRows=0
lastProgress.processedRowsPerSecond=0.0
currentTime : 1633382100114
lastAnalyzedTimeMills: 1633381800123
analysisIntervalMinutes time: 120000
value of default interval in milliseconds this should be <= application batch interval time in milliseconds : 300000

StreamingQueryAnalyzer.scala Results :
queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:15:00.000Z,0,0.0)
lastAnalyzedBatchId : -1
batchStartAndEndTimes : (1633382100000,0)
batchStartAndEndTimes._1 : 1633382100000
batchStartAndEndTimes._2 : 0
insightsStreamingLensResults(23,0,StreamingCriticalPathResults(300000,0,0,NONEWBATCHES))
Hello I am Inside Try Block
Streaming Query Analyzer Results :
queryProgress : QueryProgress(23,12900529-7063-4446-a4b8-b58a94194e89,2021-10-04T21:15:00.000Z,0,0.0)
lastAnalyzedBatchId : -1
batchStartAndEndTimes : (1633382100000,0)
batchStartAndEndTimes._1 : 1633382100000
batchStartAndEndTimes._2 : 0

Insides form startStreamingAnalysis method of QueryInsightsManager.scala
QueryInsightManagerStreamingLensResults(23,0,StreamingCriticalPathResults(300000,0,0,NONEWBATCHES))
insights.streamingCriticalPathResults.streamingQueryState value : NONEWBATCHES


  2. While printing the insights in QueryInsightsManager.scala:

| |||||||||||||||||| StreamingLens Insights |||||||||||||||||||||||||
|BatchId: ${results.batchId}
| Analysis Time: ${pd(results.analysisTime)}
| Expected Micro Batch SLA: ${pd(results.streamingCriticalPathResults.expectedMicroBatchSLA)}
| Batch Running Time: ${pd(results.streamingCriticalPathResults.batchRunningTime)}
| Critical Time: ${pd(results.streamingCriticalPathResults.criticalTime)}
| Streaming Query State: ${results.streamingCriticalPathResults.streamingQueryState.toString}
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
""".stripMargin)
logInfo(logOutput.toString())

expectedMicroBatchSLA, batchRunningTime, criticalTime and streamingQueryState are all taken from streamingCriticalPathResults, whose defaults are hardcoded for NONEWBATCHES.
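The log output at the top of this issue shows "120s 000ms" for an SLA of 120000 ms and "00s 000ms" for 0. Judging only from that, pd appears to render milliseconds as seconds plus a three-digit millisecond part. The sketch below is a hypothetical reimplementation of pd and of the printing block, fed with the same default-valued result that appears in the debug logs. It makes it easy to see which printed fields come from streamingCriticalPathResults.

A few assumptions apply:
- The case classes copy the field names printed in the logs.
- The exact behaviour of pd is an assumption.
- The block uses mkString rather than the original stripMargin template.

```scala
// Local stand-ins mirroring the names printed in the debug logs.
object StreamingState extends Enumeration {
  val NONEWBATCHES, ERROR = Value
}

case class StreamingCriticalPathResults(
    expectedMicroBatchSLA: Long = 0,
    batchRunningTime: Long = 0,
    criticalTime: Long = 0,
    streamingQueryState: StreamingState.Value = StreamingState.NONEWBATCHES)

// Field names follow the logged StreamingLensResults(23,0,StreamingCriticalPathResults(...)).
case class StreamingLensResults(
    batchId: Long,
    analysisTime: Long,
    streamingCriticalPathResults: StreamingCriticalPathResults)

object InsightsPrinter {
  /** Assumed behaviour of pd, inferred from the log output: "SSs mmmms". */
  def pd(millis: Long): String = f"${millis / 1000}%02ds ${millis % 1000}%03dms"

  /** Builds the same lines as the printing block, one field per line. */
  def render(results: StreamingLensResults): String = {
    val cp = results.streamingCriticalPathResults
    Seq(
      "|||||||||||||||||| StreamingLens Insights |||||||||||||||||||||||||",
      s"BatchId: ${results.batchId}",
      s"Analysis Time: ${pd(results.analysisTime)}",
      s"Expected Micro Batch SLA: ${pd(cp.expectedMicroBatchSLA)}",
      s"Batch Running Time: ${pd(cp.batchRunningTime)}",
      s"Critical Time: ${pd(cp.criticalTime)}",
      s"Streaming Query State: ${cp.streamingQueryState}",
      "||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||"
    ).mkString("\n")
  }

  def main(args: Array[String]): Unit = {
    // Same values as the logged StreamingCriticalPathResults(120000,0,0,NONEWBATCHES).
    val r = StreamingLensResults(23, 0, StreamingCriticalPathResults(expectedMicroBatchSLA = 120000))
    println(render(r))
  }
}
```

Only expectedMicroBatchSLA is set here. Every other field prints its default, which reproduces the "00s 000ms ... NONEWBATCHES" block seen in the logs.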

Kindly suggest if any changes are required.

@abhishekd0907
Collaborator

Have you made the changes I suggested?

@rpatid10
Author

rpatid10 commented Oct 7, 2021

@abhishekd0907 yes
