
chore: generate basic spark function tests #16409


Merged
10 commits merged into apache:main on Jun 17, 2025

Conversation

@shehabgamin (Contributor) commented Jun 14, 2025

Which issue does this PR close?

Rationale for this change

Following up on the discussion with @andygrove, @alamb, and @linhr (see #15914 (comment)), the goal here is to provide contributors with basic tests that can serve as a reference and starting point when implementing Spark functions.

The script I ran does not extract all the function tests from Sail’s gold data, as creating a comprehensive script to generate each .slt test would have taken too long. As a result, the tests added in this PR represent only a subset of the gold data tests. Future work can improve upon the script.

What changes are included in this PR?

Test files.

Are these changes tested?

N/A.

Are there any user-facing changes?

No.

The github-actions bot added the sqllogictest (SQL Logic Tests (.slt)) label on Jun 14, 2025.
@shehabgamin (Contributor, Author) commented Jun 14, 2025

Not sure if it makes sense to commit the script I used, so I'll paste it here for now:

# WARNING:
# - This script extracts only basic, straightforward tests.
# - It is not comprehensive and will not capture most function tests.
# - Intended as a quick-and-dirty tool for generating minimal Spark function tests.
# - Run this script from the root directory of the Sail project.
import glob
import json
import os
import re

from pyspark.sql import SparkSession

# From project root in Sail: https://github.com/lakehq/sail
FUNCTIONS_PATH = "crates/sail-spark-connect/tests/gold_data/function/"

LICENSE = """\
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at

#   http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# This file was originally created by a porting script from:
#   https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function
# This file is part of the implementation of the datafusion-spark function library.
# For more information, please see:
#   https://github.com/apache/datafusion/issues/15914
"""


def extract_simple_function_arguments(query):
    """
    Extract arguments from simple function calls of pattern:
    SELECT SOME_FUNC(ARG0, ARG1, .... ARGN);
    Only accepts basic literal arguments - no arrays, nested functions, etc.
    Example queries NOT accepted:
        - query = "SELECT any(col) FROM VALUES (NULL), (true), (false) AS tab(col);"
        - query = "SELECT array_append(CAST(null as Array<Int>), 2);"
        - query = "SELECT array_append(array('b', 'd', 'c', 'a'), 'd');"
        - query = "SELECT * FROM explode(collection => array(10, 20));"
        - query = "SELECT cast('10' as int);"
    Example queries accepted:
        - query = "SELECT ceil(5);"
        - query = "SELECT ceil(3.1411, -3);"
        - query = "SELECT now();"
    """
    if any(f in query.lower() for f in ["cast", "map", "from", "raise_error", "regexp", "rlike", " in "]):
        return None
    pattern = r'SELECT\s+\w+\s*\(([^)]*)\)\s*;\s*'
    match = re.search(pattern, query, re.IGNORECASE | re.DOTALL)
    if not match:
        return None
    args_string = match.group(1).strip()
    # Empty function call
    if not args_string:
        return []
    # Filter out complex arguments - reject if contains brackets, parens, etc...
    if any(char in args_string for char in ['[', ']', '(', ')']):
        return None
    arguments = re.split(r',(?=(?:[^"\']*["\'][^"\']*["\'])*[^"\']*$)', args_string)
    return [arg.strip() for arg in arguments if arg.strip()]


def extract_function_name(query):
    pattern = r'SELECT\s+(\w+)\s*\(([^)]*)\)\s*;\s*'
    match = re.search(pattern, query, re.IGNORECASE | re.DOTALL)
    if match:
        return match.group(1).strip()
    return None


def create_typed_query(query, func_name, results, type_results):
    if not type_results:
        return [f"# Original Query: {query}", f"# PySpark 3.5.5 Result: {results}", query]
    typed_args = []
    for key, spark_type in type_results.items():
        if key.startswith('typeof(') and key.endswith(')'):
            arg = key[7:-1]
            if spark_type.lower().strip() == "string":
                arg = f"'{arg}'"
            typed_args.append(f"{arg}::{spark_type}")
    typed_query = f"SELECT {func_name}({', '.join(typed_args)});"
    return [f"# Original Query: {query}", f"# PySpark 3.5.5 Result: {results}", typed_query]


def main():
    spark = SparkSession.builder.getOrCreate()
    function_dict = {}
    json_files = glob.glob(os.path.join(FUNCTIONS_PATH, "*.json"))
    num_queries = 0
    for file_path in json_files:
        with open(file_path) as f:
            data = json.load(f)
        directory_name = os.path.basename(file_path).removesuffix('.json')
        if directory_name not in function_dict:
            function_dict[directory_name] = {}
        for test in data["tests"]:
            if len(test["input"]["schema"]["fields"]) != 1:
                # Skip generator tests with multiple fields
                continue
            if "exception" in test:
                # Skip tests that are expected to raise exceptions
                continue
            query = test["input"]["query"].strip()
            arguments = extract_simple_function_arguments(query)
            if arguments is not None:
                func_name = extract_function_name(query)
                if func_name is not None:
                    func_call = re.sub('select', '', query, flags=re.IGNORECASE).strip().rstrip(';').strip()
                    if arguments:
                        typeof_parts = [f"typeof({arg})" for arg in arguments]
                        combined_query = f"SELECT {func_call}, typeof({func_call}), {', '.join(typeof_parts)};"
                    else:
                        combined_query = f"SELECT {func_call}, typeof({func_call});"
                    print(f"ORIGINAL QUERY: {query}\nRUNNING QUERY: {combined_query}")
                    try:
                        result = spark.sql(combined_query).collect()
                    except Exception as e:
                        if "CANNOT_PARSE_DATATYPE" in str(e):
                            print(f"Skipping query due to unsupported datatype: {e}")
                            continue
                        else:
                            raise
                    if len(result) != 1:
                        spark.stop()
                        error = f"Unexpected result length: {len(result)} for query: {combined_query}"
                        raise ValueError(error)
                    result_row = result[0]
                    result_dict = {}
                    type_results = {}
                    for i in range(len(result_row)):
                        col_name = result_row.__fields__[i]
                        result_dict[col_name] = result_row[i]
                        if i >= 2:
                            type_results[col_name] = result_row[i]
                    typed_query = create_typed_query(query, func_name, result_dict, type_results)
                    if func_name.lower() not in function_dict[directory_name]:
                        function_dict[directory_name][func_name.lower()] = []
                    function_dict[directory_name][func_name.lower()].append(typed_query)
                    num_queries += 1
    print(f"Processed {num_queries} queries from {len(json_files)} JSON files.")
    base_dir = os.path.join("tmp", "slt")
    for directory, functions in function_dict.items():
        dir_path = os.path.join(base_dir, directory)
        os.makedirs(dir_path, exist_ok=True)
        for func_name, queries in functions.items():
            file_path = os.path.join(dir_path, f"{func_name}.slt")
            with open(file_path, 'w') as f:
                f.write(LICENSE)
                f.write("\n")
                for query_data in queries:
                    f.write(f"#{query_data[0]}\n")
                    f.write(f"#{query_data[1]}\n")
                    f.write("#query\n")
                    f.write(f"#{query_data[2]}\n")
                    f.write("\n")
    spark.stop()
    return function_dict


if __name__ == "__main__":
    main()


@comphead (Contributor) left a comment

Thanks @shehabgamin, I'd appreciate it if you could provide more insight into the PR details. It looks like a lot of tests are commented out, mostly from the Spark domain, and the PR rationale didn't give enough detail. 🤔

@shehabgamin (Contributor, Author) replied:

> Thanks @shehabgamin, I'd appreciate it if you could provide more insight into the PR details. It looks like a lot of tests are commented out, mostly from the Spark domain, and the PR rationale didn't give enough detail. 🤔

@comphead Sorry about that, I just updated the PR description!

@comphead (Contributor) commented:

Got it @shehabgamin

I'm seeing a lot of slt tests like

#S


 #E
 #query
 #L

which is not very explanatory.
For testing Spark integration it would probably be interesting to do something like what we did in Comet: https://github.com/apache/datafusion-comet/blob/6bf80b107cc1574cb7f259719d0aa203e387efc4/spark/src/test/scala/org/apache/comet/CometExpressionCoverageSuite.scala#L48

That test takes recent samples from the internal Spark examples and runs them, comparing the actual (Spark-provided) and expected values.

@comphead (Contributor) commented:

https://github.com/apache/datafusion-comet/blob/6bf80b107cc1574cb7f259719d0aa203e387efc4/spark/src/test/scala/org/apache/comet/CometExpressionCoverageSuite.scala#L182

This is how to get Spark examples from their internal repo.

@shehabgamin (Contributor, Author) replied:

> Got it @shehabgamin
>
> I'm seeing a lot of slt tests like
>
> #S
>
>  #E
>  #query
>  #L
>
> which is not very explanatory. For testing Spark integration it would probably be interesting to do something like what we did in Comet: https://github.com/apache/datafusion-comet/blob/6bf80b107cc1574cb7f259719d0aa203e387efc4/spark/src/test/scala/org/apache/comet/CometExpressionCoverageSuite.scala#L48
>
> That test takes recent samples from the internal Spark examples and runs them, comparing the actual (Spark-provided) and expected values.

@comphead I fixed the script, nice catch!

> https://github.com/apache/datafusion-comet/blob/6bf80b107cc1574cb7f259719d0aa203e387efc4/spark/src/test/scala/org/apache/comet/CometExpressionCoverageSuite.scala#L182
>
> This is how to get Spark examples from their internal repo.

Sail generates Gold data tests using tests extracted directly from the Spark code base. However, we cannot directly port the exact SQL queries from Sail because DataFusion and Spark interpret SQL data types differently (Sail speaks Spark SQL). So, when creating the slt tests in DataFusion, explicit type casts are required. We found that this was not straightforward for contributors to do, which is why we created this script to generate the slt tests with the necessary type casts as a basic starting point.
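For illustration, an entry in one of the generated .slt files follows the format the script writes out: two comment lines recording the original query and the PySpark result, then a commented-out query directive and the typed query. The concrete values below are a sketch based on the ceil(5) example from the script's docstring, not a line copied from the actual gold data:

## Original Query: SELECT ceil(5);
## PySpark 3.5.5 Result: {'ceil(5)': 5, 'typeof(ceil(5))': 'bigint', 'typeof(5)': 'int'}
#query
#SELECT ceil(5::int);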

@comphead (Contributor) commented Jun 15, 2025

I'm personally intrigued, tbh, but I'd say the DF core should be agnostic of any specific data-driven architecture (like Spark), even if we do a lot of Spark integration in projects like Sail or Comet.

IMHO the data-driven architecture concerns should live, and be addressed, in some bridge project that takes care of the Apache Spark specifics relative to DF, including INT96, decimals, nested types, some null handling, etc.

@alamb @andygrove, as initiators, WDYT?

@alamb (Contributor) commented Jun 15, 2025

> I'm personally intrigued, tbh, but I'd say the DF core should be agnostic of any specific data-driven architecture (like Spark), even if we do a lot of Spark integration in projects like Sail or Comet.
>
> IMHO the data-driven architecture concerns should live, and be addressed, in some bridge project that takes care of the Apache Spark specifics relative to DF, including INT96, decimals, nested types, some null handling, etc.
>
> @alamb @andygrove, as initiators, WDYT?

It is my opinion that both standalone .slt tests (such as those in this PR) and more substantial "run queries in both systems and compare" style integration tests (such as those in the Comet repo) are needed.

The value of standalone .slt tests is that they keep the barrier to contribution low(er) -- the hope is that we'll get the function library moved / ported over with community help, and having the tests waiting will keep the context required of new contributors low.

As valuable as tests that actually start Spark, etc. (as done in Comet) are, putting them in the main DataFusion repo would make it even harder to contribute to DataFusion due to having to set up the dependencies, understand Spark errors, etc.

@comphead if you are suggesting creating a new repository / project for running the integration tests, I think that is quite an interesting idea, and maybe we can make a separate ticket.

BTW, there is quite a bit more discussion about the Spark function testing strategy here for anyone else following along.

Suggestion

What I suggest we do is update the README.md file https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/spark/README.md, explaining what is going on.

Maybe we can add a section, something like the following:

## Implementation Status

Implementing the `datafusion-spark` compatible functions project is still a work in progress. Many of the tests in this directory are commented out and are waiting for help with implementation.

For more information please see:
* [The `datafusion-spark` Epic](https://github.com/apache/datafusion/issues/15914)
* The porting script (see here for script)

@alamb (Contributor) left a comment

Thank you so much @shehabgamin

I think this is a great step forward -- I had a few suggestions (for more comments / readme) but we could also do that as a follow-on PR.

@comphead I hope the rationale for .slt-only tests makes sense. I agree it is not sufficient for complete testing but I feel it is enough to get the process started.

We should not merge this PR until we get consensus.

# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

Contributor:

I recommend adding another header to each of these files to make them more self-describing (and to direct people to the tickets where they can help).

Perhaps something like

# This file was originally created by <script_name> from <gold file>
# and is part of implementing the datafusion-spark function library
# please see https://github.com/apache/datafusion/issues/15914 
# for more information

@shehabgamin (Contributor, Author) replied:

Ahh great idea!

## Implementation Status

Implementing the `datafusion-spark` compatible functions project is still a work in progress.
Many of the tests in this directory are commented out and are waiting for help with implementation.
@comphead (Contributor) commented:

@shehabgamin On a separate note, if we are planning for people to help contribute to this crate, we probably need to attach documentation or a sample PR showing how to implement a function and test it with DF and Spark?

@shehabgamin (Contributor, Author) replied:

@comphead Yep! The next step is to make a PR that shows a function being ported over and the relevant .slt file being enabled.
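For anyone picking that up: enabling an entry roughly means uncommenting the generated query and adding the expected output in sqllogictest form once the function is implemented. A sketch of the before/after (the ceil example, its result, and the I type flag are illustrative, not taken from the repository):

# As generated by the script (commented out):
#query
#SELECT ceil(5::int);

# Enabled once a Spark-compatible ceil is available:
query I
SELECT ceil(5::int);
----
5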

shehabgamin and others added 3 commits June 16, 2025 23:47
Co-authored-by: Oleks V <comphead@users.noreply.github.com>
Co-authored-by: Oleks V <comphead@users.noreply.github.com>
@comphead (Contributor) left a comment

lgtm thanks @shehabgamin

@alamb merged commit 056f546 into apache:main on Jun 17, 2025
28 checks passed
@alamb (Contributor) commented Jun 17, 2025

Thanks again @shehabgamin and @comphead

Labels: sqllogictest (SQL Logic Tests (.slt))
3 participants