<a href="https://colab.research.google.com/github/nuttnice187/crashes/blob/main/sparkudf_cpp_binding.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction

Binding C++ code to a Spark UDF can yield significant performance gains compared to a standard PySpark UDF, but the actual benefit depends on how you integrate it and the workload characteristics.


In [1]:
!pip install pybind11

Collecting pybind11
  Downloading pybind11-3.0.2-py3-none-any.whl.metadata (10 kB)
Downloading pybind11-3.0.2-py3-none-any.whl (310 kB)
Installing collected packages: pybind11
Successfully installed pybind11-3.0.2




`pybind11` is a lightweight, header-only library that exposes C++ types in Python and vice versa, mainly to create Python bindings of existing C++ code. Essentially, it allows you to call C++ functions and use C++ classes directly from Python, and also to call Python functions from C++.

Key features and benefits:

*   **Easy to Use:** It's designed to be very simple and intuitive, often requiring only a few lines of C++ code to expose an entire C++ class or function.
*   **Header-Only:** You don't need to compile `pybind11` itself; you just include its headers in your C++ project.
*   **Modern C++:** It leverages modern C++11 (or newer) features, making the binding code clean and efficient.
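*   **Automatic Type Conversion:** It automatically handles conversions between many common C++ and Python data types (e.g., `std::vector` to Python lists, `std::string` to Python strings, C++ numbers to Python numbers). Conversions for STL containers require including `pybind11/stl.h`.
*   **Low Overhead:** It's designed for high performance, with little overhead per call between Python and C++. Arguments are still converted on every call: with `pybind11/stl.h`, for example, a Python list is copied into a new `std::vector`.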

# Define a function
The Markdown cell below shows the C++ source with C++ syntax highlighting.

```cpp
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <vector>
#include <algorithm> // For std::max_element, std::find

bool hasPairSumMaxCpp(const std::vector<int>& elements) {
    // Create a mutable copy of the input elements vector.
    std::vector<int> mutable_elements = elements;

    // If the vector has fewer than 2 elements, no pair can be formed.
    if (mutable_elements.size() < 2) {
        return false;
    }

    // Find the maximum element in the copied vector.
    int max_val = *std::max_element(mutable_elements.begin(), mutable_elements.end());

    // Remove the first occurrence of this maximum element from the copied vector.
    auto it = std::find(mutable_elements.begin(), mutable_elements.end(), max_val);
    if (it != mutable_elements.end()) {
        mutable_elements.erase(it);
    }

    // After removing the max, check if we still have at least two elements to form a pair
    if (mutable_elements.size() < 2) {
        return false;
    }

    // Iterate through the remaining elements in the modified vector using nested loops.
    // Check if any two distinct elements sum up to the previously identified maximum element.
    for (size_t i = 0; i < mutable_elements.size(); ++i) {
        for (size_t j = i + 1; j < mutable_elements.size(); ++j) {
            // Widen to long long so that adding two large ints cannot overflow.
            if (static_cast<long long>(mutable_elements[i]) + mutable_elements[j] == max_val) {
                // If such a pair is found, return true immediately.
                return true;
            }
        }
    }

    // If no such pair is found after checking all possibilities, return false.
    return false;
}

namespace py = pybind11;

PYBIND11_MODULE(lib, m) {
    m.doc() = "pybind11 example plugin"; // optional module docstring

    // Expose hasPairSumMaxCpp to Python as has_pair_sum_max_cpp, with a docstring that help() displays.
    m.def("has_pair_sum_max_cpp", &hasPairSumMaxCpp, "Check if two distinct elements in the list sum up to the list's maximum, excluding one instance of the maximum itself.");
}
```
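Before compiling, it can help to have an independent reference to test the C++ build against. The sketch below is a hypothetical pure-Python port, `has_pair_sum_max_py` (a name introduced here, not part of the notebook), that follows the same steps as `hasPairSumMaxCpp`: copy the input, drop one occurrence of the maximum, then scan every pair of distinct positions.

```python
from typing import Sequence


def has_pair_sum_max_py(elements: Sequence[int]) -> bool:
    """Hypothetical pure-Python mirror of hasPairSumMaxCpp, for cross-checking the C++ build."""
    if len(elements) < 2:
        return False
    remaining = list(elements)  # copy, like mutable_elements in the C++ code
    max_val = max(remaining)
    remaining.remove(max_val)  # list.remove drops only the first occurrence
    if len(remaining) < 2:
        return False
    # Same O(n^2) scan over index pairs i < j as the C++ nested loops.
    for i in range(len(remaining)):
        for j in range(i + 1, len(remaining)):
            if remaining[i] + remaining[j] == max_val:
                return True
    return False
```

Once `lib` is compiled, comparing `has_pair_sum_max_py(s) == has_pair_sum_max_cpp(s)` over the sample lists (and a few edge cases such as `[]`, `[5, 5]`, or a repeated maximum like `[12, 12, 0]`) is a quick sanity check that the binding behaves as intended.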

Code cells in this notebook are highlighted as Python, which makes C++ hard to read; hence the Markdown copy above. The `%%writefile <file_name>` command is a Jupyter (and thus Colab) cell magic that writes the entire content of the cell, excluding the `%%writefile` line itself, to the specified file.

In [2]:
#@title Write C++ pybind plugin, `lib.cpp`
%%writefile lib.cpp
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <vector>
#include <algorithm> // For std::max_element, std::find

bool hasPairSumMaxCpp(const std::vector<int>& elements) {
    // Create a mutable copy of the input elements vector.
    std::vector<int> mutable_elements = elements;

    // If the vector has fewer than 2 elements, no pair can be formed.
    if (mutable_elements.size() < 2) {
        return false;
    }

    // Find the maximum element in the copied vector.
    int max_val = *std::max_element(mutable_elements.begin(), mutable_elements.end());

    // Remove the first occurrence of this maximum element from the copied vector.
    auto it = std::find(mutable_elements.begin(), mutable_elements.end(), max_val);
    if (it != mutable_elements.end()) {
        mutable_elements.erase(it);
    }

    // After removing the max, check if we still have at least two elements to form a pair
    if (mutable_elements.size() < 2) {
        return false;
    }

    // Iterate through the remaining elements in the modified vector using nested loops.
    // Check if any two distinct elements sum up to the previously identified maximum element.
    for (size_t i = 0; i < mutable_elements.size(); ++i) {
        for (size_t j = i + 1; j < mutable_elements.size(); ++j) {
            // Widen to long long so that adding two large ints cannot overflow.
            if (static_cast<long long>(mutable_elements[i]) + mutable_elements[j] == max_val) {
                // If such a pair is found, return true immediately.
                return true;
            }
        }
    }

    // If no such pair is found after checking all possibilities, return false.
    return false;
}

namespace py = pybind11;

PYBIND11_MODULE(lib, m) {
    m.doc() = "pybind11 example plugin"; // optional module docstring

    // Expose hasPairSumMaxCpp to Python as has_pair_sum_max_cpp, with a docstring that help() displays.
    m.def("has_pair_sum_max_cpp", &hasPairSumMaxCpp, "Check if two distinct elements in the list sum up to the list's maximum, excluding one instance of the maximum itself.");
}

Writing lib.cpp


Listing the working directory (`/content` in Colab) shows that `lib.cpp` was written.

In [3]:
%%bash
pwd
ls -a

/content
.
..
.config
lib.cpp
sample_data


### Compile the shared-object module

In [4]:
!g++ --version

g++ (Ubuntu 11.4.0-1ubuntu1~22.04.2) 11.4.0
Copyright (C) 2021 Free Software Foundation, Inc.
This is free software; see the source for copying conditions.  There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.



`g++` is the GNU C++ compiler driver. It turns C++ source files into executables or libraries, running preprocessing, compilation, assembly, and linking in one step unless told otherwise (for example, `-c` stops before linking).



In [5]:
%%bash
g++ -O3 -Wall -shared -std=c++17 -fPIC \
-I/usr/include/python3.12 \
-I/usr/local/lib/python3.12/dist-packages/pybind11/include lib.cpp \
-o lib.cpython-312-x86_64-linux-gnu.so

This cell compiles `lib.cpp` into a Python extension module, i.e. a shared library that Python can import. Here's a breakdown of the command:

*   `-O3`: This flag enables a high level of optimization, aiming for faster execution of the compiled code.
*   `-Wall`: Enables a broad set of common compiler warnings (despite the name, not all of them), which helps catch potential issues in the C++ code during compilation.
*   `-shared`: This is crucial for creating a shared library (also known as a shared object or `.so` file on Linux). Shared libraries can be dynamically linked by other programs, like Python in this case.
*   `-std=c++17`: Specifies that the C++ code should be compiled using the C++17 standard, which enables features introduced in that version of the language.
*   `-fPIC`: Stands for "Position-Independent Code." This is necessary when creating shared libraries, as it allows the code to be loaded at any memory address without modification.
*   `-I/usr/include/python3.12`: This flag adds the Python 3.12 include directory to the search path for header files. This is needed because `pybind11` relies on Python's C API.
*   `-I/usr/local/lib/python3.12/dist-packages/pybind11/include`: This adds the `pybind11` library's include directory to the search path, allowing the compiler to find `pybind11` headers.
*   `lib.cpp`: This is the input C++ source file that contains your `pybind11` module definition.
*   `-o lib.cpython-312-x86_64-linux-gnu.so`: Specifies the output file name for the compiled shared library. The naming convention `<module>.<abi_tag>.so` (here `lib.cpython-312-x86_64-linux-gnu.so`) is typical for Python extension modules and lets Python import the file as a module named `lib`. That base name must match the first argument of `PYBIND11_MODULE(lib, m)`; otherwise the import fails.
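The include paths and the `.cpython-312-x86_64-linux-gnu.so` suffix above are specific to this Colab image. pybind11's documentation builds the same command portably with `$(python3 -m pybind11 --includes)` and `$(python3-config --extension-suffix)`. As a sketch of the same idea in Python, the hypothetical helpers `pybind11_include` and `build_command` below derive those values from `sysconfig` and `pybind11.get_include()` instead of hard-coding them:

```python
from __future__ import annotations

import sysconfig


def pybind11_include() -> str:
    """Header directory of the installed pybind11 package (needs `pip install pybind11`)."""
    import pybind11  # imported lazily so build_command() itself has no pybind11 dependency

    return pybind11.get_include()


def build_command(source: str, module: str, pybind_include: str) -> list[str]:
    """Assemble the same g++ invocation as the cell above, without hard-coded paths."""
    python_include = sysconfig.get_paths()["include"]    # e.g. /usr/include/python3.12
    ext_suffix = sysconfig.get_config_var("EXT_SUFFIX")  # e.g. .cpython-312-x86_64-linux-gnu.so
    return [
        "g++", "-O3", "-Wall", "-shared", "-std=c++17", "-fPIC",
        f"-I{python_include}",
        f"-I{pybind_include}",
        source,
        # The base name must match the name given to PYBIND11_MODULE.
        "-o", f"{module}{ext_suffix}",
    ]
```

In the notebook, `subprocess.run(build_command("lib.cpp", "lib", pybind11_include()), check=True)` would run the compile and raise if `g++` fails.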

In [6]:
%%bash
pwd
ls -a

/content
.
..
.config
lib.cpp
lib.cpython-312-x86_64-linux-gnu.so
sample_data


The listing now also shows the compiled `.so` module.

# Import built-in function from module

In [7]:
from lib import has_pair_sum_max_cpp

help(has_pair_sum_max_cpp)

Help on built-in function has_pair_sum_max_cpp in module lib:

has_pair_sum_max_cpp(...) method of pybind11_builtins.pybind11_detail_function_record_v1_system_libstdcpp_gxx_abi_1xxx_use_cxx11_abi_1 instance
    has_pair_sum_max_cpp(arg0: collections.abc.Sequence[typing.SupportsInt | typing.SupportsIndex]) -> bool

    Check if two distinct elements in the list sum up to the list's maximum, excluding one instance of the maximum itself.



In [8]:
has_pair_sum_max_cpp([23, 11, 12, 1, 2])

True

# Create test cases with expected results

In [9]:
from enum import Enum

from pyspark.sql import Row, DataFrame, SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, ArrayType, BooleanType
)


class Stimpy(Enum):
    """
    Enumerate the test case attributes for Stimpy.
    """

    SAMPLE = [23, 11, 12, 1, 2]


class SpongeBob(Enum):
    """
    Enumerate the test case attributes for SpongeBob.
    """

    SAMPLE = [10, 8, 12, 1, 3]


class Zeldar(Enum):
    """
    Enumerate the test case attributes for Zeldar.
    """

    SAMPLE = [9, 7, 12, 1, 2]


class Generator:
    """
    Generate test data inputs and expectation for the test cases.
    """

    data: DataFrame

    def __init__(self, spark: SparkSession) -> None:
        """
        Initialize the Generator with a SparkSession.
        Args:
            spark (SparkSession): A SparkSession instance.
        """
        DATA = [
            Row(
                elements=employee.SAMPLE.value,
                expected=has_pair_sum_max_cpp(employee.SAMPLE.value)
            )
            for employee in (Stimpy, SpongeBob, Zeldar)
        ]

        SCHEMA = StructType([
            StructField("elements", ArrayType(IntegerType()), False),
            StructField("expected", BooleanType(), False),
        ])

        self.data = spark.createDataFrame(data=DATA, schema=SCHEMA)


In [10]:
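# Enables Arrow-optimized regular Python UDFs (Spark 3.5+); pandas UDFs such as checkMax use Arrow regardless.
spark = SparkSession.builder.config(
    key="spark.sql.execution.pythonUDF.arrow.enabled", value="true"
).getOrCreate()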

Generator(spark).data.show()

+------------------+--------+
|          elements|expected|
+------------------+--------+
|[23, 11, 12, 1, 2]|    true|
| [10, 8, 12, 1, 3]|   false|
|  [9, 7, 12, 1, 2]|   false|
+------------------+--------+



# Translate the C++ binding into a UDF

In [11]:
from pandas import Series

from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import BooleanType

@pandas_udf(BooleanType())
def checkMax(elements: Series) -> Series:
    """
    Applies the C++ function to each array of integers in the pandas Series.
    Args:
        elements (pd.Series): A pandas Series containing arrays of integers.
    Returns:
        pd.Series: A pandas Series containing the results of the C++ function.
    """
    return elements.apply(has_pair_sum_max_cpp)
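
The schema above declares `elements` as non-nullable, so every row reaches the C++ function. If the column could contain nulls, `Series.apply` would pass `None` through, and pybind11 cannot convert `None` to `std::vector<int>`, so the call would raise `TypeError`. A minimal guard, using a hypothetical helper `null_safe` that is not part of the notebook, maps null inputs to null outputs:

```python
from typing import Any, Callable, Optional


def null_safe(fn: Callable[[Any], Any]) -> Callable[[Any], Optional[Any]]:
    """Wrap a per-row function so that None (SQL NULL) maps to None instead of raising."""

    def wrapper(value: Any) -> Optional[Any]:
        # Skip the native call entirely for missing values.
        return None if value is None else fn(value)

    return wrapper
```

Inside `checkMax`, this would read `return elements.apply(null_safe(has_pair_sum_max_cpp))`; the resulting Series then holds `None` for null rows, which should surface as null in the `BooleanType` column.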

# Compare the UDF output with the C++ expectation

In [12]:
Generator(spark).data \
.withColumn("actual", checkMax("elements")) \
.show()

+------------------+--------+------+
|          elements|expected|actual|
+------------------+--------+------+
|[23, 11, 12, 1, 2]|    true|  true|
| [10, 8, 12, 1, 3]|   false| false|
|  [9, 7, 12, 1, 2]|   false| false|
+------------------+--------+------+



# Conclusion

Why bind C++ code?

## Avoids Python Overhead

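Standard PySpark UDFs run in a separate Python worker process, and rows must be serialized and deserialized between the JVM and Python for every batch.
C++ code can instead be bound directly to Spark via JNI (Java Native Interface) or Arrow-based native execution, avoiding the Python interpreter entirely. The pybind11 approach in this notebook does not go that far: `checkMax` still runs in the Python worker, so only the inner computation moves to C++ while the JVM ↔ Python transfer (via Arrow) remains.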



## Vectorized Processing

If you implement your C++ logic to process batches of data (e.g., via Apache Arrow columnar format), you can operate on entire arrays at once instead of row-by-row.
This reduces function call overhead and leverages CPU cache locality.
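A batch-oriented C++ function would typically receive one contiguous buffer of values plus offsets marking where each row starts, so the Python to C++ boundary is crossed once per batch rather than once per row. The sketch below, with a hypothetical helper `to_values_offsets`, builds that layout with NumPy from a batch of integer lists (such as the `elements` Series in a pandas UDF); it is the same idea as Arrow's list layout, not a call into Arrow itself:

```python
from typing import Sequence, Tuple

import numpy as np


def to_values_offsets(rows: Sequence[Sequence[int]]) -> Tuple[np.ndarray, np.ndarray]:
    """Flatten a batch of integer lists into one values buffer plus n + 1 offsets.

    Row k occupies values[offsets[k]:offsets[k + 1]].
    """
    lengths = np.fromiter((len(r) for r in rows), dtype=np.int64, count=len(rows))
    offsets = np.zeros(len(rows) + 1, dtype=np.int64)
    np.cumsum(lengths, out=offsets[1:])  # running row lengths give each row's end position
    parts = [np.asarray(r, dtype=np.int32) for r in rows]
    values = np.concatenate(parts) if parts else np.empty(0, dtype=np.int32)
    return values, offsets
```

A hypothetical C++ counterpart could accept both buffers as `py::array_t` arguments (from `pybind11/numpy.h`) and loop over rows internally, returning one boolean array per batch. This sketch copies through NumPy for simplicity; reading the buffers directly from a pyarrow `ListArray` (its `values` and `offsets`) could avoid that copy.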



## SIMD & Low-Level Optimizations

C++ allows use of SIMD intrinsics, memory pooling, and specialized algorithms that are not easily achievable in Python or even Java.
You can also use optimized libraries like Eigen, Boost, or Intel MKL.



## Better Memory Management

C++ gives you control over allocation and reuse of buffers, reducing GC pressure in the JVM.




## Typical Performance Gains

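The ranges below are rough estimates, not measurements from this notebook; benchmark your own workload.

**Standard Python UDF → C++-bound UDF:** roughly 5×–50× faster, depending on:

*   Size of data batches
*   Complexity of the computation
*   Whether vectorization is used

**Pandas UDF (Arrow) → C++-bound UDF:** gains are smaller (perhaps 2×–5×), because pandas UDFs already use Arrow for batch transfer. C++ still avoids Python's interpreter overhead for the computation itself, although a per-row `Series.apply`, as in `checkMax`, keeps one Python-level call per row.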


## Integration Approaches


### JNI-based UDF (Java ↔ C++ binding)

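Write a Java class that loads the compiled C++ library with `System.loadLibrary` and declares `native` methods implemented in C++.
Register a call to that method as a Spark SQL UDF.
Best for tight integration: the Python worker is bypassed entirely, although every JNI call still carries a fixed per-call cost.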



### Apache Arrow Flight / Gandiva

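Use Arrow's Gandiva, an LLVM-based expression compiler, to JIT-compile expressions that run over Arrow columnar batches; Gandiva provides Java bindings, so it can be invoked from the JVM.
Ideal for vectorized, columnar operations, but Spark has no built-in Gandiva integration, so this requires custom glue code or a third-party plugin.
Arrow Flight is a separate component: an RPC framework for moving Arrow data between processes, useful when the native code runs as its own service.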



### Spark Native Functions Plugin

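Implement a custom Catalyst `Expression` (in Scala) whose evaluation calls into C++ through JNI, and register it with `SparkSessionExtensions` (for example, `injectFunction`).
This can be the fastest option because the expression runs inside Spark's own execution engine, but it depends on Spark's internal APIs, which can change between releases.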




### Example:
#### JNI-bound C++ UDF
Here’s a minimal example of binding C++ to Spark via Java:
```cpp
#include <jni.h>
#include <cmath>

// myUdf is a static native method, so the second parameter is the jclass, not a jobject.
extern "C" JNIEXPORT jdouble JNICALL
Java_com_example_NativeLib_myUdf(JNIEnv* env, jclass clazz, jdouble value) {
    return std::sqrt(value) + 42.0; // Example computation
}
```

#### Java Wrapper
```java
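package com.example; // Must match the JNI symbol name Java_com_example_NativeLib_myUdf

public class NativeLib {
    static {
        System.loadLibrary("native"); // Loads libnative.so from java.library.path on each executor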
    }
    public static native double myUdf(double value);
}
```

#### Spark Registration
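```java
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
// Null-safe: SQL NULL arrives as a null Double, which must not be unboxed to double.
spark.udf().register("myUdf", (UDF1<Double, Double>) x -> x == null ? null : NativeLib.myUdf(x), DataTypes.DoubleType);
```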


## Best Practices for Maximum Speed

*   Batch process: Pass arrays/vectors to C++ instead of single values.

*   Minimize JNI calls: JNI overhead per call can be high; process in chunks.
*   Use Arrow columnar format for zero-copy data transfer.
*   Preallocate buffers to avoid repeated allocations.
*   Profile with realistic datasets before and after migration.
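
For the profiling step, a small timing harness run in the notebook can compare the pure-Python and C++ versions of a row function before touching Spark. The helpers below, `make_batch` and `seconds_per_batch`, are hypothetical; they use `timeit` and report the best of several runs, which reduces noise from other processes. They time the function in isolation only, so end-to-end Spark job time (for example, from the Spark UI) still needs to be measured separately.

```python
import random
import timeit
from typing import Callable, List, Sequence


def make_batch(n_rows: int, row_len: int, seed: int = 0) -> List[List[int]]:
    """Random integer rows; replace with a sample of real data for meaningful numbers."""
    rng = random.Random(seed)
    return [[rng.randint(-1000, 1000) for _ in range(row_len)] for _ in range(n_rows)]


def seconds_per_batch(fn: Callable[[Sequence[int]], object],
                      batch: Sequence[Sequence[int]], repeat: int = 5) -> float:
    """Best-of-`repeat` wall time for calling fn once per row across the batch."""
    timer = timeit.Timer(lambda: [fn(row) for row in batch])
    return min(timer.repeat(repeat=repeat, number=1))
```

In the notebook, `seconds_per_batch(has_pair_sum_max_cpp, make_batch(10_000, 50))` could then be compared against the same call with a pure-Python implementation of the check.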

If your Spark UDF is CPU-bound and currently implemented in Python, moving the computation to C++ via JNI or Arrow can yield large speedups, potentially an order of magnitude, especially when data is processed in vectorized batches. Measure before and after to confirm the gain on your own workload.
