Coverage of pyspark user defined function #658

Open
nedbat opened this issue May 2, 2018 · 25 comments
Labels
exotic Unusual execution environment

Comments

nedbat commented May 2, 2018

Originally reported by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I have a case where I have some PySpark code in my code base and I am trying to test it.
When doing that, I find that any Python UDF I run with Spark does not get covered, even though it does run. Note that I am running in local Spark mode.

Reproducible example:

#!python

def get_new_col(spark, df):
    def myadd(x, y):
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

    spark.udf.register('myadd', myadd)
    return df.selectExpr(['*', 'myadd(x, y) as newcol'])


def run():
    try:
        import findspark
        findspark.init()
    except ImportError:
        pass
    import pyspark
    spark = pyspark.sql.SparkSession.Builder().master("local[2]").getOrCreate()
    df = spark.createDataFrame([
        [1.0, 1.0],
        [1.0, 2.0],
        [1.0, 2.0]
    ], ['x', 'y'])

    outdf = get_new_col(spark, df)
    outdf.show()
    outdf.printSchema()
    assert outdf.columns == (df.columns + ['newcol'])

    spark.stop()


if __name__ == '__main__':
    run()

This says the UDF was not covered even though it did run.

Here are the logs when I run it:

#!bash
$ coverage run example.py
2018-05-04 14:58:29 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-05-04 14:58:30 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
[Stage 0:>                                                          (0 + 1) / 1]sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
sys.version_info = sys.version_info(major=3, minor=6, micro=4, releaselevel='final', serial=0)
{'COVERAGE_PROCESS_START': ''}
+---+---+------+
|  x|  y|newcol|
+---+---+------+
|1.0|1.0|   2.0|
|1.0|2.0|   3.0|
|1.0|2.0|   3.0|
+---+---+------+

root
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- newcol: string (nullable = true)

Relevant packages:
Python 3.6.4 :: Anaconda, Inc.
coverage (4.5.1)

Edit 1: Simplified the reproducible example to remove unittest and pytest.


nedbat commented May 2, 2018

Thanks for the report. I've never used PySpark. Before I try to reproduce this, what packages do I need to install to be able to run this code? I'm on a Mac, with Python 3.6. Give me the complete details of what I need to do to see the problem.

nedbat commented May 2, 2018

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Hm, it may be a bit complicated to set up (Spark can get messy to install).

To reproduce, install:

  • Apache Spark
    • Java
    • Scala
    • xcode dev installations
  • Python 3.x testing stuff
    • pytest
    • pytest-cov
    • coverage

For Spark you could try: https://medium.freecodecamp.org/installing-scala-and-apache-spark-on-mac-os-837ae57d283f

I have not had much luck getting it to work with brew, though; my setup is a little more complicated than just Spark, and it's never worked for me in one shot :P We can talk on Gitter or IRC if you run into issues trying to reproduce.

A quick note if you're not familiar with this system: PySpark uses Py4J, which calls internal Java routines. So the df.selectExpr you see is actually calling a Java function internally.
And that Java function calls back into the UDF registered with spark.udf.register().

Hence I believe the function is definitely running in a different process, spawned from that JVM.

It's Python Shell > JVM > Python Shell
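
To make that point concrete, here is a minimal sketch (not part of the original report) that prints the process id on the driver and inside a UDF; with a local SparkSession the two pids differ, which is the separate-worker behaviour described above:

#!python

import os

import pyspark


def whoami(x):
    # This body runs inside the Spark Python worker, not in the driver process.
    return f"pid={os.getpid()} x={x}"


spark = pyspark.sql.SparkSession.Builder().master("local[2]").getOrCreate()
print("driver pid:", os.getpid())

spark.udf.register('whoami', whoami)
spark.createDataFrame([[1.0]], ['x']).selectExpr('whoami(x) as who').show(truncate=False)
spark.stop()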

nedbat commented May 3, 2018

Issue #657 is also about PySpark.

nedbat commented May 4, 2018

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Ned, I am trying to see if I can understand exactly what Spark does, so we can figure this out. Here are the steps:

  • I open a Python shell
  • I import pyspark and create a session/context
  • Spark now calls Popen() on a bash script.
  • The bash script sets up some environment variables
  • Then it calls a Java jar
  • After this, all communication between the Python shell and the Java jar is done over sockets.
  • Over that socket connection, the registered Python UDF (myadd in the example above) is sent (serialized by cloudpickle, I think) and the serialized Python function is saved on the Java side (see the sketch after this list).
  • To execute this function, the Java process uses a ProcessBuilder to create a new Python process, and runs the function in that second Python process.

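As a rough sketch of that serialization step (assuming the standalone cloudpickle package; PySpark actually bundles its own copy), the UDF travels to the worker as a pickled byte string, so nothing about the driver's coverage setup travels with it:

#!python

import pickle

import cloudpickle


def myadd(x, y):
    return str(float(x) + float(y))


# Roughly what the driver does: serialize the function to bytes...
blob = cloudpickle.dumps(myadd)

# ...and roughly what the worker does: rebuild the function from those bytes and call it.
rebuilt = pickle.loads(blob)
print(rebuilt(1.0, 2.0))  # "3.0"
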
I ran the following in the background:

#!bash

while sleep 0.1; do echo date=$(date) py=$(ps aux | grep pytho[n] | wc -l) java=$(ps aux | grep jav[a] | wc -l) cov=$(ps aux | grep coverag[e] | wc -l); done

And verified that the sequence is:

  • Python process created + Coverage process created
  • Java process created
  • Python process created (second)
  • Python process killed (second)
  • Java process killed
  • Python process killed + Coverage process killed

So I think the question boils down to how to make all of these Python processes run under coverage.

EDIT: The processes are:

/Users/abdealijk/anaconda3/bin/python /Users/abdealijk/anaconda3/bin/coverage run example.py
/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home/bin/java -cp /usr/local/hadoop/spark/conf/:/usr/local/hadoop/spark/jars/*:/usr/local/hadoop/hadoop/etc/hadoop/ -Xmx1g org.apache.spark.deploy.SparkSubmit --conf spark.master=local[1] pyspark-shell
/Users/abdealijk/anaconda3/bin/python -m pyspark.daemon

nedbat commented May 4, 2018

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I read over http://coverage.readthedocs.io/en/latest/subprocess.html and tried:

EDIT: Realized that setting the env variable to 1 was causing issues, since the value is taken as the path of the coverage configuration file to use.
export COVERAGE_PROCESS_START= fixed that error, but it didn't cover the UDF :(

What I did:

  • Set the environment variable export COVERAGE_PROCESS_START=
  • Then added /Users/abdealijk/anaconda3/lib/python3.6/site-packages/sitecustomize.py with import coverage; coverage.process_startup()

But this didn't increase my coverage.
Inside the function, when I do print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')}) I can see {'COVERAGE_PROCESS_START': ''} which does seem correct.

For debugging I even tried:

    def myadd(x, y):
        import coverage
        cov = coverage.Coverage(config_file=None)
        cov.start()
        import sys, os
        print("sys.version_info =", sys.version_info)
        print({k: v for k, v in os.environ.items() if k.lower().startswith('cov')})
        x1 = x
        y1 = y
        return str(float(x1) + float(y1))

but the coverage did not increase.

nedbat commented May 4, 2018

@AbdealiJK Thanks for doing all this! One thing that looks wrong to me: the COVERAGE_PROCESS_START environment variable needs to refer to the location of the .coveragerc file to use:

export COVERAGE_PROCESS_START=/path/to/.coveragerc

And you need to create a .pth file in the Python environment that is running the subprocesses.
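
To spell that out, here is a minimal sketch of that setup written as a Python helper (paths and file names are illustrative, following the coverage.py subprocess docs, not taken from this thread):

import os
import site
import subprocess

# 1) A .pth file in the site-packages of the Python that runs the worker
#    processes, so every new interpreter calls coverage.process_startup().
pth_path = os.path.join(site.getsitepackages()[0], "00coverage.pth")
with open(pth_path, "w") as f:
    f.write("import coverage; coverage.process_startup()\n")

# 2) A .coveragerc with parallel = true, so each process writes its own data file.
with open(".coveragerc", "w") as f:
    f.write("[run]\nparallel = true\n")

# 3) COVERAGE_PROCESS_START must point at that config file.
env = dict(os.environ, COVERAGE_PROCESS_START=os.path.abspath(".coveragerc"))
subprocess.run(["coverage", "run", "example.py"], env=env)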

nedbat commented May 4, 2018

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


I am running it with my root Anaconda environment, so I think this is the right one. (Considering it was giving me an "invalid config path '1'" error when I gave COVERAGE_PROCESS_START=1, I believe it is the right one.)

And I do not have any .coveragerc file (just default configs)

nedbat commented May 8, 2018

Original comment by Abdeali Kothari (Bitbucket: AbdealiJK, GitHub: AbdealiJK)


Any thoughts on this, Ned?

I'm not sure if I'm doing something wrong with the subprocess setup. Or does subprocess measurement only work if the coverage run Python process is the one that creates the subprocess?

@nedbat nedbat removed the 4.5 label Aug 17, 2018

ketgo commented Dec 17, 2019

Hi,

Any update on this issue? I am facing the same problem when I run pytest-cov to test Python methods that use @udf-decorated nested functions.

Thanks!

@nedbat nedbat added exotic Unusual execution environment and removed task labels Jan 15, 2020

nedbat commented Apr 21, 2021

@kristen-cape

I'm running into the same problem. I tried your two suggestions, @nedbat, and had no luck.

I'm invoking coverage.py via pytest-cov. I'm using Python 3.8 and have the following pyspark, pytest-cov, and coverage versions:

Name: pyspark
Version: 2.4.7

Name: pytest-cov
Version: 2.11.1

Name: coverage
Version: 5.4

@RaccoonForever

Hello, running into the same problem too:

pyspark==3.2.1
pytest==7.1.1
pytest-cov==3.0.0

nedbat commented May 6, 2022

If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)

AndrewLane pushed a commit to AndrewLane/repro-coverage-issue-with-pyspark-udf that referenced this issue May 6, 2022
@AndrewLane

> If someone could provide very specific step-by-step instructions to reproduce the failure, that would help move this forward (though no guarantees...)

@nedbat I tried to provide that here: https://github.com/AndrewLane/repro-coverage-issue-with-pyspark-udf

nedbat commented May 7, 2022

@AndrewLane experimenting a bit with this, my guess is that the code is running in a subprocess, but that process is started in a way that doesn't get coverage started on it, perhaps because it's started from Java.

nedbat commented May 7, 2022

Here is what I have tried. I created a doit.sh file in my copy of the repo to configure subprocess measurement, and to simplify re-running it:

cd /home
rm -f debug.txt
pip install pytest==7.0.1 coverage==6.2
echo "import coverage; coverage.process_startup()" > /usr/local/lib/python3.6/site-packages/00coverage.pth
export COVERAGE_PROCESS_START=$(pwd)/.coveragerc
coverage run -m pytest tests/test.py --disable-pytest-warnings

I created a .coveragerc file in /home:

[run]
parallel = true
source = ./src

I changed code.py to add some debugging like this:

import os, sys
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

with open("debug.txt", "a") as f:
    print(f"outside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)


def translate(english):
    with open("debug.txt", "a") as f:
        print(f"inside: {os.getpid()}: trace: {sys.gettrace()}\n", file=f)

    if english == "1":
        return "uno"
    elif english == "2":
        return "dos"
    else:
        return f"Cannot translate {english}"


translation_udf = udf(lambda english: translate(english), StringType())

def transform_data(df):
    return df.withColumn("spanish", translation_udf(col("english")))

When I run (with source /home/doit.sh), I have a debug.txt that looks like this:

outside: 14: trace: <coverage.CTracer object at 0x7fb34c1483f0>

outside: 172: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 183: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 180: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 177: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 188: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 177: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 188: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 183: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 186: trace: <coverage.CTracer object at 0x7fc301112030>

outside: 136: trace: <coverage.CTracer object at 0x7fc301112030>

inside: 136: trace: <coverage.CTracer object at 0x7fc301112030>

and the data files are:

root@5fcaa35b647e:/home# ls -al .coverage*
-rw-r--r-- 1 root root 53248 May  7 12:50 .coverage
-rw-r--r-- 1 root root 53248 May  7 13:19 .coverage.5fcaa35b647e.118.982518
-rw-r--r-- 1 root root 53248 May  7 13:19 .coverage.5fcaa35b647e.14.449311
-rw-r--r-- 1 root root    37 May  7 12:46 .coveragerc

That should mean that process ids 118 and 14 recorded data, but the debug output doesn't show those ids. Also, the id of the CTracer object is the same for many different processes, so maybe we are dealing with something similar to #310?

@RaccoonForever

Hi Ned, sorry for the lack of info.

I quickly made a repo to reproduce the lack of coverage on UDFs:

https://github.com/RaccoonForever/py-cov-potential-issue

Don't mind the code quality, it was just to reproduce :)
Thanks for your help!

@RaccoonForever

Does anyone have a workaround? :'(

@shsab

shsab commented Jun 9, 2022

I have the same issue when testing PySpark UDFs using pytest. But as a workaround, since the UDFs are plain Python functions, I can create tests that exercise the underlying function directly (see the sketch below).
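
For example, a minimal sketch of that workaround against the translate function from the code.py shown earlier in this thread (the import path src.code is an assumption about that repo's layout): the Spark UDF wrapper is bypassed entirely, so ordinary coverage measurement applies.

# test_translate.py
from src.code import translate  # assumed module path; adjust to your project layout


def test_translate_known_values():
    assert translate("1") == "uno"
    assert translate("2") == "dos"


def test_translate_unknown_value():
    assert translate("3") == "Cannot translate 3"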

@RechaviaAmit

Python 3.8
pyspark==3.2.3
coverage==6.5.0

Although this is not fully operational, I've uncovered something that might provide insight into how to make it work.
example:

file: module1.py

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

spark = SparkSession.builder \
    .appName("SimpleSparkUDF") \
    .getOrCreate()


data = [("John", 25)]
columns = ["Name", "Age"]


df = spark.createDataFrame(data, columns)

def add_one_udf(age):
    import os
    import subprocess
    import textwrap
    import coverage
    import module2

    # set up a coverage config file for this worker process
    content = textwrap.dedent(f"""
        [run]
        branch = True
        cover_pylib = False
        concurrency = multiprocessing,thread
        parallel = True
        data_file = {os.getcwd()}/.coverage
        """)

    with open("coveragerc_temp", "w") as file:
        file.write(content.strip())

    os.environ["COVERAGE_PROCESS_START"] = "coveragerc_temp"

    cov = coverage.process_startup()

    print("This line isn't covered by coverage.py")
    module2.foo()

    cov.stop()
    cov.save()

    current_directory = os.getcwd()
    coverage_data_files = [
        f"{current_directory}/{name}" for name in os.listdir(current_directory) if name.startswith(".coverage")
    ]

    # send the .coverage files back to my local machine
    subprocess.run(["scp", "-o", "StrictHostKeyChecking=no", *coverage_data_files, os.environ["localhost"]])

    return age + 1

add_one_udf_spark = udf(add_one_udf, IntegerType())

result = df.withColumn("AgePlusOne", add_one_udf_spark(df["Age"]))

result.show()

file: module2.py

def foo():
    print("This line is covered by coverage.py")
    print("This line is covered by coverage.py")
    print("This line is covered by coverage.py")

On my local host, I received 1 .coverage file. After mapping the paths to my local machine and executing coverage combine + coverage report, I can easily see that the lines from module2.py are covered (75% of the lines, excluding the function signature). However, it seems like module1.py isn't covered at all. Additionally, I tried to debug it with the trace flag in the debug section, and module1.py isn't mentioned at all.

Does anyone have insights into why module1.py isn't covered at all, unlike module2.py?

nedbat commented Mar 13, 2024

> Does anyone have insights into why module1.py isn't covered at all, unlike module2.py?

You have to show us the steps you used to run the code, and how you used coverage on it. module1 starts coverage then calls foo(). This is why the three lines in the body of foo() are covered. Are you using coverage in any other way? If not, then module1 would not be measured.

@RechaviaAmit

RechaviaAmit commented Mar 13, 2024

@nedbat thanks for the fast response!
No, in this example, I don't use coverage in any other way.

> This is why the three lines in the body of foo() are covered.

I totally understand why these 3 lines are covered.

> If not, then module1 would not be measured.

I'm not sure I get it: why isn't module1 covered, but module2 is?
I'm calling coverage.process_startup() in module1, within the PySpark UDF.

nedbat commented Mar 13, 2024

Coverage can't measure code that ran before it was started. This is why the def line in module2 isn't covered. It will only measure code executed after coverage has been started, and additionally, only in functions called, not in the current function.

You are using the API in a really unusual way: process_startup() isn't meant for you to call from your code: it's for something that runs very early in the process, like sitecustomize or a .pth file.
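
To illustrate the "only in functions called, not in the current function" point outside of Spark, here is a small sketch using the plain coverage API (nothing PySpark-specific):

import coverage


def callee():
    return 42  # measured: this frame is entered after tracing has started


def caller():
    cov = coverage.Coverage()
    cov.start()       # tracing starts while this frame is already running
    value = callee()  # the call into callee() is measured...
    value += 1        # ...but, per the explanation above, lines in caller() itself are not
    cov.stop()
    cov.save()
    return value


caller()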

nedbat commented Mar 13, 2024

The best way to help get pyspark support is to provide a complete runnable example, with detailed explicit instructions.

@RechaviaAmit

> and additionally, only in functions called, not in the current function.

You are correct; this was my issue. Thank you very much!

For those attempting to edit sitecustomize within the Spark executor: it didn't work for me. Please share if you manage to collect coverage from the UDF itself using other methods.

> The best way to help get pyspark support is to provide a complete runnable example, with detailed explicit instructions.

You are correct; unfortunately, it can be challenging to reproduce the Spark environment, as it runs on distributed machines.
