PySpark persist(StorageLevel.MEMORY_AND_DISK) using kedro #2765

Closed
javier-rosas opened this issue Jul 5, 2023 · 4 comments
Labels: Community, Issue: Feature Request

Comments


javier-rosas commented Jul 5, 2023

Description

I need to persist data in memory using PySpark's StorageLevel class (from pyspark import StorageLevel). I am aware of the MemoryDataSet type, but I am running a Databricks cluster with Spark, and the dataframes are too big for MemoryDataSet, so I was hoping I could use PySpark's StorageLevel class instead.

Context

Here is an example of a possible implementation. Is something like this supported? Notice the use of result.persist(StorageLevel.MEMORY_AND_DISK) in the node functions.

from pyspark import StorageLevel
from kedro.pipeline import node, Pipeline

# Define your functions
def node1_function(input_data):
    # Your function implementation goes here
    result = perform_some_operation(input_data)
    result.persist(StorageLevel.MEMORY_AND_DISK)
    return result

def node2_function(data_from_node1):
    # Your function implementation goes here
    result = perform_some_other_operation(data_from_node1)
    result.persist(StorageLevel.MEMORY_AND_DISK)
    return result

def node3_function(data_from_node2):
    # Your function implementation goes here (placeholder); because the node
    # below declares an output, this function must return a value
    result = perform_final_operation(data_from_node2)
    return result

# Define your pipeline
def create_pipeline():
    # Define your nodes
    node1 = node(
        func=node1_function,
        inputs="input_data",  # This should match the name in your DataCatalog
        outputs="node1_output"  # This will be in-memory
    )

    node2 = node(
        func=node2_function,
        inputs="node1_output",  # This should match the output of node1
        outputs="node2_output"  # This will be in-memory
    )

    node3 = node(
        func=node3_function,
        inputs="node2_output",  # This should match the output of node2
        outputs="node3_output"  # This should match the name in your DataCatalog
    )

    return Pipeline([node1, node2, node3])
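
For completeness, this is roughly how I imagine wiring it up and running it. Just a sketch on my side: it assumes the placeholder perform_* functions above are implemented, and the toy input data and the runner choice are mine, not from an existing project.

from pyspark.sql import SparkSession
from kedro.io import DataCatalog, MemoryDataSet
from kedro.runner import SequentialRunner

spark = SparkSession.builder.getOrCreate()
# Toy input just to keep the sketch self-contained; in practice "input_data"
# would be a real catalog entry (e.g. a spark.SparkDataSet on Databricks)
input_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

catalog = DataCatalog({
    "input_data": MemoryDataSet(input_df),
    "node3_output": MemoryDataSet(),
})

# Unregistered intermediate outputs (node1_output, node2_output) default to
# MemoryDataSet; for Spark DataFrames that should keep only the lazy
# DataFrame reference on the driver, not a copy of the data
SequentialRunner().run(create_pipeline(), catalog)
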
javier-rosas added the Issue: Feature Request label on Jul 5, 2023
AhdraMeraliQB added the Community label on Jul 6, 2023

astrojuanlu (Member) commented:

Hello @javier-rosas, sorry for the delay. In principle I don't see anything in your code that should fail, but I am not a PySpark expert. Could you please try it out and let us know if it works?


ruben-s commented Dec 7, 2023

Hi @javier-rosas,

I'm researching the use of Spark in combination with Kedro and came across your issue.

If I understand correctly, then "the dataframes are too big for MemoryDataSet" would indicate that the dataframe is retrieved from the Spark cluster by the driver program (which runs Kedro), and the memory available to the driver is insufficient to hold the dataset.

Using the PySpark StorageLevel class would result in the dataframe not being 'downloaded' to the driver program (running Kedro), but persisted on the Spark cluster itself.
Am I correct in this understanding?

If yes, then I suspect that using the catalog will always result in dataframes being 'sent to' and 'pulled from' the cluster by the driver program running Kedro? (Unless you use dummy/memory datasets, as documented in https://docs.kedro.org/en/stable/integrations/pyspark_integration.html#spark-and-delta-lake-interaction, i.e. data operations outside of the kedro DAG.)
It seems that the PySpark StorageLevel class's persist method is such a data operation outside of the kedro DAG?
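
To illustrate what I mean, a small, untested sketch (the Spark session and toy data here are made up by me):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10_000_000)  # stand-in for a big dataframe

df = df.persist(StorageLevel.MEMORY_AND_DISK)  # lazy: only marks the plan for caching
df.count()  # the first action actually materialises the cache on the executors

# The driver never receives the rows here; it only would through an explicit
# collect() / toPandas() call.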

Any feedback is welcome.

Best regards,
Ruben

merelcht (Member) commented Jul 8, 2024

Hi @javier-rosas, do you still need help resolving this issue?

merelcht (Member) commented:

I'm closing this issue now. Feel free to re-open if the problem persists!

merelcht closed this as not planned on Jul 15, 2024