[SPARK-47014][PYTHON][CONNECT] Implement methods dumpPerfProfiles and dumpMemoryProfiles of SparkSession

### What changes were proposed in this pull request?
Implement the methods `dumpPerfProfiles` and `dumpMemoryProfiles` of SparkSession.

### Why are the changes needed?
Complete support of (v2) SparkSession-based profiling.

### Does this PR introduce _any_ user-facing change?
Yes. `dumpPerfProfiles` and `dumpMemoryProfiles` of SparkSession are now supported.

An example of `dumpPerfProfiles` is shown below.

```py
>>> import os
>>> from pyspark.sql.functions import udf
>>> @udf("long")
... def add(x):
...     return x + 1
...
>>> spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
>>> spark.range(10).select(add("id")).collect()
...
>>> spark.dumpPerfProfiles("dummy_dir")
>>> os.listdir("dummy_dir")
['udf_2_perf.pstats']
```
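
For symmetry, a sketch of `dumpMemoryProfiles` under the same setup (the directory name and UDF id below are illustrative, and the "memory" profiler requires the `memory-profiler` package to be installed):

```py
>>> import os
>>> from pyspark.sql.functions import udf
>>> @udf("long")
... def add(x):
...     return x + 1
...
>>> spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
>>> spark.range(10).select(add("id")).collect()
...
>>> spark.dumpMemoryProfiles("dummy_dir")
>>> os.listdir("dummy_dir")
['udf_2_memory.txt']
```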

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#45073 from xinrong-meng/dump_profile.

Authored-by: Xinrong Meng <xinrong@apache.org>
Signed-off-by: Xinrong Meng <xinrong@apache.org>
xinrong-meng committed Feb 14, 2024
1 parent c1321c0 commit 4b9e9d7
Showing 5 changed files with 110 additions and 17 deletions.
10 changes: 10 additions & 0 deletions python/pyspark/sql/connect/session.py
@@ -958,6 +958,16 @@ def showMemoryProfiles(self, id: Optional[int] = None) -> None:

    showMemoryProfiles.__doc__ = PySparkSession.showMemoryProfiles.__doc__

    def dumpPerfProfiles(self, path: str, id: Optional[int] = None) -> None:
        self._profiler_collector.dump_perf_profiles(path, id)

    dumpPerfProfiles.__doc__ = PySparkSession.dumpPerfProfiles.__doc__

    def dumpMemoryProfiles(self, path: str, id: Optional[int] = None) -> None:
        self._profiler_collector.dump_memory_profiles(path, id)

    dumpMemoryProfiles.__doc__ = PySparkSession.dumpMemoryProfiles.__doc__


SparkSession.__doc__ = PySparkSession.__doc__

65 changes: 65 additions & 0 deletions python/pyspark/sql/profiler.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
from abc import ABC, abstractmethod
import os
import pstats
from threading import RLock
from typing import Dict, Optional, TYPE_CHECKING
@@ -158,6 +159,70 @@ def _profile_results(self) -> "ProfileResults":
"""
...

    def dump_perf_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the perf profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path : str
            A directory in which to dump the perf profile.
        id : int, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        """
        with self._lock:
            stats = self._perf_profile_results

        def dump(id: int) -> None:
            s = stats.get(id)

            if s is not None:
                if not os.path.exists(path):
                    os.makedirs(path)
                p = os.path.join(path, f"udf_{id}_perf.pstats")
                s.dump_stats(p)

        if id is not None:
            dump(id)
        else:
            for id in sorted(stats.keys()):
                dump(id)

    def dump_memory_profiles(self, path: str, id: Optional[int] = None) -> None:
        """
        Dump the memory profile results into directory `path`.

        .. versionadded:: 4.0.0

        Parameters
        ----------
        path : str
            A directory in which to dump the memory profile.
        id : int, optional
            A UDF ID to be dumped. If not specified, all the results will be dumped.
        """
        with self._lock:
            code_map = self._memory_profile_results

        def dump(id: int) -> None:
            cm = code_map.get(id)

            if cm is not None:
                if not os.path.exists(path):
                    os.makedirs(path)
                p = os.path.join(path, f"udf_{id}_memory.txt")

                with open(p, "w+") as f:
                    MemoryProfiler._show_results(cm, stream=f)

        if id is not None:
            dump(id)
        else:
            for id in sorted(code_map.keys()):
                dump(id)


class AccumulatorProfilerCollector(ProfilerCollector):
    def __init__(self) -> None:
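Since `dump_perf_profiles` writes standard `pstats` dumps (via `Stats.dump_stats`), the files can be reloaded for offline analysis with the stdlib `pstats` module. A minimal sketch, assuming a previously dumped file `dummy_dir/udf_2_perf.pstats` (directory and UDF id are illustrative):

```py
import pstats

# Load one dumped UDF profile and print the ten entries with the
# highest cumulative time; path and id are assumptions for the example.
stats = pstats.Stats("dummy_dir/udf_2_perf.pstats")
stats.sort_stats("cumulative").print_stats(10)
```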
10 changes: 10 additions & 0 deletions python/pyspark/sql/session.py
@@ -2145,6 +2145,16 @@ def showMemoryProfiles(self, id: Optional[int] = None) -> None:

    showMemoryProfiles.__doc__ = ProfilerCollector.show_memory_profiles.__doc__

    def dumpPerfProfiles(self, path: str, id: Optional[int] = None) -> None:
        self._profiler_collector.dump_perf_profiles(path, id)

    dumpPerfProfiles.__doc__ = ProfilerCollector.dump_perf_profiles.__doc__

    def dumpMemoryProfiles(self, path: str, id: Optional[int] = None) -> None:
        self._profiler_collector.dump_memory_profiles(path, id)

    dumpMemoryProfiles.__doc__ = ProfilerCollector.dump_memory_profiles.__doc__


def _test() -> None:
    import os
20 changes: 12 additions & 8 deletions python/pyspark/sql/tests/test_udf_profiler.py
@@ -185,16 +185,20 @@ def test_perf_profiler_udf(self):
         with self.trap_stdout() as io_all:
             self.spark.showPerfProfiles()

-        for id in self.profile_results:
-            self.assertIn(f"Profile of UDF<id={id}>", io_all.getvalue())
+        with tempfile.TemporaryDirectory() as d:
+            self.spark.dumpPerfProfiles(d)

-            with self.trap_stdout() as io:
-                self.spark.showPerfProfiles(id)
+            for id in self.profile_results:
+                self.assertIn(f"Profile of UDF<id={id}>", io_all.getvalue())

-            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
-            self.assertRegex(
-                io.getvalue(), f"10.*{os.path.basename(inspect.getfile(_do_computation))}"
-            )
+                with self.trap_stdout() as io:
+                    self.spark.showPerfProfiles(id)
+
+                self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+                self.assertRegex(
+                    io.getvalue(), f"10.*{os.path.basename(inspect.getfile(_do_computation))}"
+                )
+                self.assertTrue(f"udf_{id}_perf.pstats" in os.listdir(d))

     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
22 changes: 13 additions & 9 deletions python/pyspark/tests/test_memory_profiler.py
@@ -108,7 +108,7 @@ def test_memory_profiler(self):

         with tempfile.TemporaryDirectory() as d:
             self.sc.dump_profiles(d)
-            self.assertTrue("udf_%d_memory.txt" % id in os.listdir(d))
+            self.assertTrue(f"udf_{id}_memory.txt" in os.listdir(d))

     def test_profile_pandas_udf(self):
         udfs = [self.exec_pandas_udf_ser_to_ser, self.exec_pandas_udf_ser_to_scalar]
@@ -232,16 +232,20 @@ def test_memory_profiler_udf(self):
         with self.trap_stdout() as io_all:
             self.spark.showMemoryProfiles()

-        for id in self.profile_results:
-            self.assertIn(f"Profile of UDF<id={id}>", io_all.getvalue())
+        with tempfile.TemporaryDirectory() as d:
+            self.spark.dumpMemoryProfiles(d)

-            with self.trap_stdout() as io:
-                self.spark.showMemoryProfiles(id)
+            for id in self.profile_results:
+                self.assertIn(f"Profile of UDF<id={id}>", io_all.getvalue())

-            self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
-            self.assertRegex(
-                io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
-            )
+                with self.trap_stdout() as io:
+                    self.spark.showMemoryProfiles(id)
+
+                self.assertIn(f"Profile of UDF<id={id}>", io.getvalue())
+                self.assertRegex(
+                    io.getvalue(), f"Filename.*{os.path.basename(inspect.getfile(_do_computation))}"
+                )
+                self.assertTrue(f"udf_{id}_memory.txt" in os.listdir(d))

     @unittest.skipIf(
         not have_pandas or not have_pyarrow,
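The memory dump is plain text in `memory-profiler`'s per-line format (rendered by `MemoryProfiler._show_results`, the same renderer behind `showMemoryProfiles`), beginning with a `Filename:` header as the tests assert. Inspecting it offline is a simple file read; a minimal sketch, assuming a previously dumped `dummy_dir/udf_2_memory.txt` (path and id are illustrative):

```py
# Print the per-line memory report for one UDF; path and id are assumptions.
with open("dummy_dir/udf_2_memory.txt") as f:
    print(f.read())
```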
