
Add StructType and DDL extraction from Pandera schemas #1570

Merged

Conversation

filipeo2-mck
Contributor

@filipeo2-mck filipeo2-mck commented Apr 12, 2024

Using Pandera schemas for data quality checks is great, but we first need the PySpark DataFrame loaded correctly, with the column types we expect to validate later.
Relying on Spark's automatic `inferSchema = True` when loading data files (CSV and Parquet, for example) is not reliable, so this PR addresses that by allowing the extraction of a PySpark schema from existing Pandera schemas/models, in two ways:

  • A StructType object

  • A more compact/simple DDL-like schema:

    binary BINARY,byte TINYINT,text STRING

Both extractions above can be used to create or read files in Spark, as in these examples:

  1. Creating a dataframe:

    spark.createDataFrame([], schema)  # where `schema` is a StructType or a DDL-like string
  2. Reading an existing file:

    customSchema = StructType([
        StructField("IDGC", StringType(), True),        
        StructField("SEARCHNAME", StringType(), True),
        StructField("PRICE", DoubleType(), True)
    ])
    df = spark.read.load('/file.csv', format="csv", schema=customSchema)
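
As a rough, hypothetical sketch (not the PR's actual implementation or API), the compact DDL form above is just comma-joined "name TYPE" pairs; the function name and input shape here are illustrative only:

```python
# Hypothetical sketch: serialize column-name -> Spark SQL type-name pairs
# into the compact DDL form shown above. Illustrative only; the real
# extraction in this PR works from Pandera schema/model annotations.
def to_ddl(columns: dict) -> str:
    """Join "name TYPE" pairs with commas, mirroring Spark's DDL strings."""
    return ",".join(f"{name} {dtype}" for name, dtype in columns.items())

print(to_ddl({"binary": "BINARY", "byte": "TINYINT", "text": "STRING"}))
# -> binary BINARY,byte TINYINT,text STRING
```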

Specific tests were added for these, covering the most common scenarios/datatypes. The output of the unit test test_pyspark_read shows the difference between reading a sample CSV file with schema inference (non-deterministic) and using the approach enabled by this PR (deterministic):
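As a toy illustration (not Spark's actual inference logic) of why inferred types are not deterministic across files: the guessed type of a column flips as soon as the sampled data happens to contain one non-numeric value.

```python
# Toy type inference, illustrative only: the same logical column gets a
# different guessed type depending on the values present in the file,
# which is why declaring the schema up front is more reliable.
def infer_type(values):
    try:
        for v in values:
            int(v)
        return "INT"
    except ValueError:
        return "STRING"

print(infer_type(["1", "2", "3"]))    # -> INT
print(infer_type(["1", "2", "n/a"]))  # -> STRING: same column, new guess
```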

This PR tries to address both open issues: #1327 and #1434.

Signed-off-by: Filipe Oliveira <filipe_oliveira@mckinsey.com>

codecov bot commented Apr 12, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 83.15%. Comparing base (4df61da) to head (f011ea7).
Report is 73 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff             @@
##             main    #1570       +/-   ##
===========================================
- Coverage   94.29%   83.15%   -11.14%     
===========================================
  Files          91      114       +23     
  Lines        7024     8505     +1481     
===========================================
+ Hits         6623     7072      +449     
- Misses        401     1433     +1032     


@filipeo2-mck
Contributor Author

filipeo2-mck commented Apr 12, 2024

Hey @cosmicBboy, not sure why the CI broke. I don't have permission to restart it from the failed run:

@filipeo2-mck filipeo2-mck marked this pull request as ready for review April 12, 2024 18:32
@cosmicBboy
Collaborator

@filipeo2-mck yeah I can manually restart these. Need to figure out why the hashes don't match... I see this fairly often with other PRs.

@cosmicBboy
Collaborator

@NeerajMalhotra-QB @jaskaransinghsidana would appreciate your review on this PR!

:returns: StructType object with current schema fields.
"""
fields = [
    StructField(column, self.columns[column]._dtype.type, True)
Contributor

Would this be able to handle a complex/nested struct type?

Contributor Author

@filipeo2-mck filipeo2-mck Apr 15, 2024

Yes, it does and I have just added a test for it.
When we get the annotation from the schema/model, the entire set of annotated pyspark.sql.types is copied to the StructType object output.

@AbhishekBhatiaQB AbhishekBhatiaQB Apr 22, 2024

Very minor comment 🙂

@filipeo2-mck I think it's better to access the public attribute dtype, since _dtype is a protected attribute and is meant to be used only inside the ColumnSchema class

Suggested change
StructField(column, self.columns[column]._dtype.type, True)
StructField(column, self.columns[column].dtype.type, True)

Contributor Author

Thanks for noting that, I just changed it :)

@jaskaransinghsidana
Contributor

> @NeerajMalhotra-QB @jaskaransinghsidana would appreciate your review on this PR!

LGTM!

@NeerajMalhotra-QB
Collaborator

> @NeerajMalhotra-QB @jaskaransinghsidana would appreciate your review on this PR!

Sorry @cosmicBboy, I missed the GitHub notification about this message. I will try to review it soon. Thanks

@NeerajMalhotra-QB
Collaborator

This looks great, @filipeo2-mck!
As discussed, please add negative and positive tests with dummy data to explain the situation you are fixing.

@filipeo2-mck
Contributor Author

Hi @NeerajMalhotra-QB!
The suggested test cases were added, showing the Pandera usage I'm trying to enable with this PR, along with negative test cases. A screenshot was also added to the PR description.
Thanks for your suggestions 👍

Collaborator

@NeerajMalhotra-QB NeerajMalhotra-QB left a comment

LGTM

@filipeo2-mck
Contributor Author

Hello @cosmicBboy! Approvals have been granted; I'd be happy if you could evaluate and/or merge it :)
Thank you!

@cosmicBboy
Collaborator

hey @filipeo2-mck would you mind rebasing on main? It should address the failing unit test

@filipeo2-mck
Contributor Author

> hey @filipeo2-mck would you mind rebasing on main? It should address the failing unit test

Done, @cosmicBboy, I hope everything is OK now :)
Thank you!

@cosmicBboy
Collaborator

Looks like a test is failing:
https://github.com/unionai-oss/pandera/actions/runs/8815578745/job/24197845359?pr=1570#step:15:1540

You can test this locally by running the nox test:

nox -db mamba --envdir .nox-mamba -s "tests(extra='pyspark', pydantic='2.3.0', python='3.10', pandas='2.2.0')"

(You need nox and mamba installed)

@filipeo2-mck
Contributor Author

> Looks like test is failing: https://github.com/unionai-oss/pandera/actions/runs/8815578745/job/24197845359?pr=1570#step:15:1540
>
> You can test this locally by running the nox test:
>
>     nox -db mamba --envdir .nox-mamba -s "tests(extra='pyspark', pydantic='2.3.0', python='3.10', pandas='2.2.0')"
>
> (You need nox and mamba installed)

Hi @cosmicBboy! Sorry for the delay.
I took a look at the CI run:

  • It looks like it's happening only on the Windows runners (the Linux and macOS jobs ran fine with this config).

  • The Windows task hung for over an hour and ended with a HADOOP_HOME unset error, probably an issue with the Spark installation.

  • I don't have a Windows machine to test it locally and, as I'm using pytest's tmp_path functionality to save the temporary file, I don't see what could be wrong with the PR code.

Would you mind rerunning the CI from the start once, just to make sure it's not a transient issue with the GitHub Windows runners, please?

return schema


def test_pyspark_read(schema_with_simple_datatypes, tmp_path, spark):
Collaborator

@filipeo2-mck can we mark this as skipped if the os is windows? Looks like we do this elsewhere in the tests

@pytest.mark.skipif(
    platform.system() == "Windows",
    reason="skipping due to issues with opening file names for temp files.",
)

Contributor Author

Done! Thank you for checking it. The Windows jobs are running now.

@cosmicBboy
Collaborator

Thanks for the contribution @filipeo2-mck !

@cosmicBboy cosmicBboy merged commit cf09ae2 into unionai-oss:main Apr 27, 2024
67 of 68 checks passed