
[SPARK-24958] [WIP] Report executors' process tree total memory information to heartbeat signals #1

Closed · wants to merge 7 commits

Conversation

rezasafi (Owner):

This is just a preview of my changes for SPARK-24958 on top of the open PR for SPARK-23429:
apache#21221

Commits:
- Some improvements in integration
- Integration with the unit tests of the upstream open PR
- Fix an issue with memory info computation
- Fix scalastyle errors
- Some changes to address comments
```
return -1;
}
try {
val cmd = Array("bash", "-c", "echo $PPID")
```
Reviewer: you might want to at least drop a comment in here noting that this can be simplified in Java 9:
https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html

rezasafi (Owner Author): will add a comment.
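For context, on Java 9+ the parent PID can be read without forking a shell at all. A minimal sketch of the ProcessHandle alternative the reviewer links to (illustrative only; Spark still targeted Java 8 at the time, so this is not the PR's code):

```scala
object ProcessHandleExample {
  def main(args: Array[String]): Unit = {
    // ProcessHandle (Java 9+) exposes PIDs directly, so no
    // Array("bash", "-c", "echo $PPID") subprocess is needed.
    val current = ProcessHandle.current()
    val parentPid: Long =
      if (current.parent().isPresent) current.parent().get().pid()
      else -1L // mirror the -1 "not available" convention used in this PR
    println(s"pid=${current.pid()} ppid=$parentPid")
  }
}
```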

```
 * If the system isn't procfsBased the process tree metrics' values will be -1,
 * meaning not available
 */
final val pTreeInfo: ProcessTreeMetrics = new ProcfsBasedSystems
```

Reviewer: the MemoryManager is not a very good spot for this. You could put it in ExecutorMetricType, or just have an object ProcfsBasedSystems.

rezasafi (Owner Author): will find a better place for it. I just put it there since this is also gathering memory info.

```
 * Hadoop ProcfsBasedProcessTree class used regex and pattern matching to retrieve the memory
 * info. I tried that but found it not correct during tests, so I used normal string analysis
 * instead. The computation of RSS and Vmem are based on proc(5):
 * http://man7.org/linux/man-pages/man5/proc.5.html
```

Reviewer: if this changes at all, would we know? Or would we just start reporting wrong values?

rezasafi (Owner Author): yeah, if it changes we'd need to fix this; it probably won't change, though. I didn't find another way to retrieve this info.
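For reference, a minimal sketch of the plain string analysis described above, reading vsize (bytes) and rss (pages) from /proc/&lt;pid&gt;/stat as documented in proc(5). The helper name is hypothetical, not the PR's code:

```scala
import scala.io.Source

// Hypothetical helper: parse /proc/<pid>/stat per proc(5).
// Returns (vsize in bytes, rss in pages), or (-1, -1) if unavailable.
def readStatInfo(pid: Int): (Long, Long) = {
  try {
    val source = Source.fromFile(s"/proc/$pid/stat")
    val stat = try source.mkString finally source.close()
    // comm (field 2) may contain spaces, so split after the closing ')'.
    val fields = stat.substring(stat.lastIndexOf(')') + 2).split(" ")
    // proc(5): vsize is field 23 and rss is field 24 of the full line,
    // i.e. indices 20 and 21 once pid and comm are stripped.
    (fields(20).toLong, fields(21).toLong)
  } catch {
    case _: Exception => (-1L, -1L)
  }
}
```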

```
createProcessTree

def isItProcfsBased: Boolean = {
val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
```

Reviewer: will this cause problems when someone tries to run tests on Windows?

rezasafi (Owner Author): I didn't test this on a Windows machine, but since I catch exceptions during process tree creation/update and set isAvailable to false when they occur, I think there won't be a problem.
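A sketch of the guard described in that reply; the member names follow the discussion, and the exact code may differ:

```scala
import java.io.File

// Illustrative availability guard: no /proc (e.g. Windows) means the
// metrics simply stay at their -1 defaults instead of throwing.
var isAvailable: Boolean = new File("/proc").exists()

def updateProcessTree(): Unit = {
  if (isAvailable) {
    try {
      // ... walk /proc and refresh the process tree here ...
    } catch {
      case _: Exception =>
        // Any failure disables further reporting rather than
        // crashing the executor's heartbeat thread.
        isAvailable = false
    }
  }
}
```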

```
@@ -59,6 +59,18 @@ case object JVMOffHeapMemory extends ExecutorMetricType {
}
}

case object ProcessTreeRSSMemory extends ExecutorMetricType {
```

Reviewer: could you separate out memory from Python, the JVM, and other processes here as well? I think that breakdown is really important for most users. (This can also be a follow-up.)

rezasafi (Owner Author): I will try to see what I can do.
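One plausible way to get that breakdown, sketched here as an assumption rather than this PR's implementation: classify each process in the tree by its command line before summing per category.

```scala
import scala.io.Source

// Hypothetical classifier: bucket a process as jvm / python / other.
// /proc/<pid>/cmdline is NUL-separated, but a substring check suffices here.
def classify(pid: Int): String = {
  val source = Source.fromFile(s"/proc/$pid/cmdline")
  val cmdline = try source.mkString finally source.close()
  if (cmdline.contains("java")) "jvm"
  else if (cmdline.contains("python")) "python"
  else "other"
}
// Summing RSS/vmem per bucket would then back separate ExecutorMetricTypes,
// e.g. ProcessTreeJVMRSSMemory and ProcessTreePythonRSSMemory.
```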

```
}


def getRSSInfo(): Long = {
```

Reviewer: do you know how expensive all this is to do? Should it be done less often? Or maybe doing it at all should be configurable? The other metrics query variables in the JVM, which should be significantly cheaper than this.

rezasafi (Owner Author): I am also worried about overhead. In general the depth of the tree isn't that large, so we will probably call bash and read the output at most two or three times per heartbeat. My tests didn't show a significant increase in time, but they weren't very large tests. Making it configurable is a good idea; I will try to do that.

rezasafi (Owner Author) commented Aug 3, 2018:

Thank you very much, @squito, for the review. I will hopefully update this with your comments ASAP.

@squito left a comment:

I'd like to see some examples of the metrics you can get with this, e.g. showing the Python memory in the executor in a PySpark job, or the Java memory going up during a shuffle, etc.


```
package org.apache.spark.executor

private[spark] trait ProcessTreeMetrics {
```

Reviewer: IMO, it's not worth creating the trait now when there is only one implementation. We can add that abstraction later if it's useful. (This isn't exposed to users at all anyway.)

If we are keeping this, some of these members should be private.

rezasafi (Owner Author): I was thinking about the future, when someone wants to add a new implementation, so that they use the same methods and there are no differences across platforms.

Reviewer: yes, I understand that goal, but who knows when / if that will ever happen. In the meantime, it just makes this slightly harder to follow. Whenever someone does want to put in another implementation, it's pretty easy for them to add the interface at that time.

```
var latestOtherVmemTotal: Long = 0
var latestOtherRSSTotal: Long = 0

createProcessTree
```

Reviewer: this method has a side effect, so you should call it with parens: createProcessTree().
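For readers unfamiliar with the convention: Scala style declares side-effecting methods with empty parens and calls them that way, reserving the paren-less form for pure accessors. A generic illustration, not the PR's code:

```scala
class ProcessTreeExample {
  private var tree: Map[Int, Set[Int]] = Map.empty

  // Side-effecting: declared and called with ()
  def createProcessTree(): Unit = {
    tree = Map(1 -> Set.empty[Int]) // mutates state
  }

  // Pure accessor: no parens
  def treeSize: Int = tree.size
}

val metrics = new ProcessTreeExample
metrics.createProcessTree() // parens signal "this does something"
val size = metrics.treeSize // no parens: just reads a value
```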

```
}

val shouldLogStageExecutorProcessTreeMetrics = org.apache.spark.SparkEnv.get.conf.
getBoolean("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled", true)
```

Reviewer: better to use the config entry EVENT_LOG_STAGE_EXECUTOR_METRICS.

rezasafi (Owner Author): I use this so that the user can disable process tree metrics even when the normal executor metrics are enabled, giving them more freedom in case they feel reporting process tree metrics introduces a lot of overhead.

Reviewer: OK, I can see having an extra config here, but then you should create a config entry for it.

rezasafi (Owner Author): I saw in other places that we didn't do this. It would be a headache to pass down a config object, and I'd need a lot of changes.

rezasafi (Owner Author): I added a config entry without the need to pass a config object. Thanks for the comment.
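For context, a typed config entry built with Spark's internal ConfigBuilder looks roughly like this (the key matches the snippet above; the wrapper object, doc text, and default are assumptions):

```scala
import org.apache.spark.internal.config.ConfigBuilder

object ProcessTreeConfig {
  // Sketch of a config entry for the key used above.
  val EVENT_LOG_PROCESS_TREE_METRICS =
    ConfigBuilder("spark.eventLog.logStageExecutorProcessTreeMetrics.enabled")
      .doc("Whether to also log process tree metrics in the event log, " +
        "on top of the regular executor metrics.")
      .booleanConf
      .createWithDefault(true)
}

// Usage: conf.get(ProcessTreeConfig.EVENT_LOG_PROCESS_TREE_METRICS)
// replaces the raw string key + getBoolean above.
```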

@wypoon left a comment:

The RSS numbers are number of pages, while the vmem numbers are number of bytes. For consistency, I think it would be better to convert the RSS numbers to number of bytes. We would need to know the page size.
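A hedged sketch of one way to get the page size on a procfs-based system, so that rssBytes = rssPages * pageSize; the helper name is hypothetical, not this PR's code:

```scala
import scala.sys.process._

// Hypothetical helper: ask the OS for its page size.
def computePageSize(): Long = {
  try {
    Seq("getconf", "PAGESIZE").!!.trim.toLong // e.g. 4096 on typical Linux
  } catch {
    case _: Exception => -1L // unavailable (e.g. Windows): stop reporting
  }
}
```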

" As a result reporting of ProcessTree metrics is stopped")
isAvailable = false
return -1
case _ => logDebug("Some exception occurred when trying to compute process tree. " +

Reviewer: maybe catch NonFatal instead?

rezasafi (Owner Author): Thank you very much, @wypoon, for the reviews. I thought it might be better not to have an exception in the logs, to avoid frightening the user.
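For reference, the pattern the reviewer is suggesting: scala.util.control.NonFatal matches ordinary exceptions while letting fatal ones propagate. A minimal sketch, not the PR's code:

```scala
import scala.util.control.NonFatal

var isAvailable = true

def safelyCompute(body: => Unit): Unit = {
  try {
    body // e.g. read and parse /proc files
  } catch {
    case NonFatal(e) =>
      // Unlike a bare `case _`, this rethrows fatal throwables such as
      // OutOfMemoryError and InterruptedException instead of swallowing them.
      isAvailable = false
  }
}
```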

A later commenter: Is this an exception I should be worried about? I am running Spark Core 3.0.1 and saw this warning (at least now it is a warning) on Windows 10, and spent hours trying to find out why it was happening:

WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped

" As a result reporting of ProcessTree metrics is stopped")
isAvailable = false
return new mutable.ArrayBuffer()
case _ => logDebug("Some exception occurred when trying to compute process tree. As a result" +

Reviewer: NonFatal, maybe?

@squito left a comment:

As the PR you're building upon has been merged upstream, it probably makes more sense to close this one and take the discussion upstream now.



rezasafi (Owner Author) commented Oct 2, 2018:

I will open the actual PR shortly. Thank you, everyone, for the great reviews.

rezasafi closed this Dec 13, 2018
rezasafi pushed a commit that referenced this pull request May 22, 2019
## What changes were proposed in this pull request?

There were two related fixes regarding `from_json`, `get_json_object` and `json_tuple` ([Fix #1](apache@c8803c0), [Fix #2](apache@86174ea)), but it seems they weren't comprehensive. I wanted to extend those fixes to all the parsers, and add tests for each case.

## How was this patch tested?

Regression tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#20302 from brkyvz/json-invfix.

(cherry picked from commit e01919e)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
rezasafi pushed a commit that referenced this pull request May 22, 2019
This is a backport of apache#20598.

## What changes were proposed in this pull request?

Solved two bugs to enable stream-stream self joins.

### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see JIRA for details).

### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionPlan), we were rewriting the attribute reference in the streaming plan with the new attribute references from the batch plan. This was incorrectly handling the scenario when multiple StreamingExecutionRelation point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query, and its corresponding plan transformations.
```
val df = input.toDF
val join =
      df.select('value % 5 as "key", 'value).join(
        df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to same source
```
Batch logical plan after splicing the batch plan and before rewriting
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#1
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, in the spliced plan, the new output attribute (value#66) replaces the earlier output attributes (value#12 and value#1, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]       // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9, value#66]
      +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join.
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#66 % 5) AS key#6, value#66]
   :  +- LocalRelation [value#66]
   +- Project [(value#66 % 5) AS key#9]   // this does not generate value, incorrect join results
      +- LocalRelation [value#66]
```

**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the output attribute references and the new reference generated by the spliced plans. The analyzer and optimizer will take care of the rest.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
   :- Project [(value#1 % 5) AS key#6, value#1]
   :  +- Project [value#66 AS value#1]   // solution: project with aliases
   :     +- LocalRelation [value#66]
   +- Project [(value#12 % 5) AS key#9, value#12]
      +- Project [value#66 AS value#12]    // solution: project with aliases
         +- LocalRelation [value#66]
```

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#20765 from tdas/SPARK-23406-2.3.
rezasafi pushed a commit that referenced this pull request Oct 10, 2019
…comparison assertions

## What changes were proposed in this pull request?

This PR removes a few hardware-dependent assertions which can cause a failure in `aarch64`.

**x86_64**
```
root@donotdel-openlab-allinone-l00242678:/home/ubuntu# uname -a
Linux donotdel-openlab-allinone-l00242678 4.4.0-154-generic #181-Ubuntu SMP Tue Jun 25 05:29:03 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

scala> import java.lang.Float.floatToRawIntBits
import java.lang.Float.floatToRawIntBits
scala> floatToRawIntBits(0.0f/0.0f)
res0: Int = -4194304
scala> floatToRawIntBits(Float.NaN)
res1: Int = 2143289344
```

**aarch64**
```
[root@arm-huangtianhua spark]# uname -a
Linux arm-huangtianhua 4.14.0-49.el7a.aarch64 #1 SMP Tue Apr 10 17:22:26 UTC 2018 aarch64 aarch64 aarch64 GNU/Linux

scala> import java.lang.Float.floatToRawIntBits
import java.lang.Float.floatToRawIntBits
scala> floatToRawIntBits(0.0f/0.0f)
res1: Int = 2143289344
scala> floatToRawIntBits(Float.NaN)
res2: Int = 2143289344
```

## How was this patch tested?

Pass the Jenkins (This removes the test coverage).

Closes apache#25186 from huangtianhua/special-test-case-for-aarch64.

Authored-by: huangtianhua <huangtianhua@huawei.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>