cannot set globals_dict or globals.yml for template configloader when deploying on AWS step functions #2860

Closed
Harsh-Maheshwari opened this issue Jul 29, 2023 · 8 comments
Labels
Issue: Bug Report 🐞 Bug that needs to be fixed

Comments

@Harsh-Maheshwari

Description

Below is the configuration in my settings file:

import os

from kedro.config import TemplatedConfigLoader

CONFIG_LOADER_CLASS = TemplatedConfigLoader
CONFIG_LOADER_ARGS = {
    "globals_dict": {
        "s3_bucket_name": "s3://ABC/",
        "report_to_generate": os.getenv("REPORT_NAME")
    }
}

Note: I need the REPORT_NAME variable to be different every time the AWS Step Function is invoked, because it affects my catalog.

My Catalog file looks like this:

report_df:
  <<: *csv
  filepath: ${s3_bucket_name}/${report_to_generate}_input.csv
  credentials: dev_s3

I cannot use this setup when deploying the project on AWS Step Functions, because I am not able to set the environment variable on every new invocation of the Step Function, for all the Lambdas involved in it.

Expected Result

I should be able to supply a new globals dict every time I invoke the AWS Step Function.

Your Environment

  • Kedro version : kedro==0.18.7
  • Python version : 3.10
  • Operating system and version: Ubuntu 22.04 on EC2
@noklam
Contributor

noklam commented Jul 30, 2023

@Harsh-Maheshwari This should solve your problem. #1782 (comment)

import os

from kedro.config import TemplatedConfigLoader


class MyTemplatedConfigLoader(TemplatedConfigLoader):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Let runtime params (--params / extra_params) override the globals.
        if self.runtime_params:
            self._config_mapping.update(self.runtime_params)


CONFIG_LOADER_CLASS = MyTemplatedConfigLoader  # instead of TemplatedConfigLoader
CONFIG_LOADER_ARGS = {
    "globals_dict": {
        "s3_bucket_name": "s3://ABC/",
        "report_to_generate": os.getenv("REPORT_NAME")
    }
}
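
To illustrate what the subclass above is meant to achieve, here is a minimal stdlib-only sketch of the precedence rule: runtime params are merged over the globals before `${...}` placeholders are filled in. This is not Kedro's implementation; `merge_globals`, `render`, and the report name "sales" are hypothetical, and the bucket value drops the trailing slash so the joined path has a single separator.

```python
from string import Template


def merge_globals(globals_dict, runtime_params):
    """Return a copy of the globals with runtime params taking precedence."""
    merged = dict(globals_dict)
    if runtime_params:
        merged.update(runtime_params)
    return merged


def render(template, values):
    """Fill ${name} placeholders from a mapping (raises KeyError if one is missing)."""
    return Template(template).substitute(values)


# os.getenv("REPORT_NAME") returns None when the variable is unset,
# which is the situation described for the Lambda environment.
globals_dict = {"s3_bucket_name": "s3://ABC", "report_to_generate": None}
runtime_params = {"report_to_generate": "sales"}  # hypothetical value

values = merge_globals(globals_dict, runtime_params)
filepath = render("${s3_bucket_name}/${report_to_generate}_input.csv", values)
print(filepath)
```

With the merge in place, the value passed at run time replaces the unset environment default in the catalog path.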

noklam added the Issue: Bug Report 🐞 label Jul 30, 2023
@noklam
Contributor

noklam commented Jul 30, 2023

Note for team discussion:

I cannot find the issue (maybe it's not created)? We should merge the runtime_params for TemplatedConfigLoader, as we are doing it for OmegaConfigLoader. #2531

@Harsh-Maheshwari
Author

@noklam
Thanks for the response. I think this would let me overwrite "report_to_generate": os.getenv("REPORT_NAME") with the value provided in --params via the CLI, or via extra_params on the session object.

But this would still not solve the issue for AWS Step functions. From the documentation I gather the lambda handler should be defined as :

from unittest.mock import patch
def handler(event, context):
    from kedro.framework.project import configure_project
    configure_project("spaceflights_step_functions")
    node_to_run = event["node_name"]
    with patch("multiprocessing.Lock"):
        from kedro.framework.session import KedroSession
        with KedroSession.create(env="aws") as session:
            session.run(node_names=[node_to_run])

Now, if I want to add runtime parameters to the session object, I need to pass them as input to the Step Function (and to each Lambda in turn), but the documentation gives no such option.

Please note this piece of code from the documentation:

# I cannot add any input variable taken from, say, the AWS console for Step Functions
        sfn.StateMachine(
            self,
            self.project_name,
            definition=definition,
            timeout=core.Duration.seconds(5 * 60),
        )

The complete deploy.py, as given in the Kedro documentation:

import re
from pathlib import Path

from aws_cdk import aws_stepfunctions as sfn
from aws_cdk import aws_s3 as s3
from aws_cdk import core, aws_lambda, aws_ecr
from aws_cdk.aws_lambda import IFunction
from aws_cdk.aws_stepfunctions_tasks import LambdaInvoke
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.pipeline.node import Node


def _clean_name(name: str) -> str:
    """Reformat a name to be compliant with AWS requirements for their resources.

    Returns:
        name: formatted name.
    """
    return re.sub(r"[\W_]+", "-", name).strip("-")[:63]


class KedroStepFunctionsStack(core.Stack):
    """A CDK Stack to deploy a Kedro pipeline to AWS Step Functions."""

    env = "aws"
    project_path = Path.cwd()
    erc_repository_name = project_path.name
    s3_data_bucket_name = (
        "spaceflights-step-functions"  # this is where the raw data is located
    )

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        self._parse_kedro_pipeline()
        self._set_ecr_repository()
        self._set_ecr_image()
        self._set_s3_data_bucket()
        self._convert_kedro_pipeline_to_step_functions_state_machine()

    def _parse_kedro_pipeline(self) -> None:
        """Extract the Kedro pipeline from the project"""
        metadata = bootstrap_project(self.project_path)

        self.project_name = metadata.project_name
        self.pipeline = pipelines.get("__default__")

    def _set_ecr_repository(self) -> None:
        """Set the ECR repository for the Lambda base image"""
        self.ecr_repository = aws_ecr.Repository.from_repository_name(
            self, id="ECR", repository_name=self.erc_repository_name
        )

    def _set_ecr_image(self) -> None:
        """Set the Lambda base image"""
        self.ecr_image = aws_lambda.EcrImageCode(repository=self.ecr_repository)

    def _set_s3_data_bucket(self) -> None:
        """Set the S3 bucket containing the raw data"""
        self.s3_bucket = s3.Bucket(
            self, "RawDataBucket", bucket_name=self.s3_data_bucket_name
        )

    def _convert_kedro_node_to_lambda_function(self, node: Node) -> IFunction:
        """Convert a Kedro node into an AWS Lambda function"""
        func = aws_lambda.Function(
            self,
            id=_clean_name(f"{node.name}_fn"),
            description=str(node),
            code=self.ecr_image,
            handler=aws_lambda.Handler.FROM_IMAGE,
            runtime=aws_lambda.Runtime.FROM_IMAGE,
            environment={},
            function_name=_clean_name(node.name),
            memory_size=256,
            reserved_concurrent_executions=10,
            timeout=core.Duration.seconds(15 * 60),
        )
        self.s3_bucket.grant_read_write(func)
        return func

    def _convert_kedro_node_to_sfn_task(self, node: Node) -> LambdaInvoke:
        """Convert a Kedro node into an AWS Step Functions Task"""
        return LambdaInvoke(
            self,
            _clean_name(node.name),
            lambda_function=self._convert_kedro_node_to_lambda_function(node),
            payload=sfn.TaskInput.from_object({"node_name": node.name}), 
        )

    def _convert_kedro_pipeline_to_step_functions_state_machine(self) -> None:
        """Convert Kedro pipeline into an AWS Step Functions State Machine"""
        definition = sfn.Pass(self, "Start")

        for i, group in enumerate(self.pipeline.grouped_nodes, 1):
            group_name = f"Group {i}"
            sfn_state = sfn.Parallel(self, group_name)
            for node in group:
                sfn_task = self._convert_kedro_node_to_sfn_task(node)
                sfn_state.branch(sfn_task)

            definition = definition.next(sfn_state)

        sfn.StateMachine(
            self,
            self.project_name,
            definition=definition,
            timeout=core.Duration.seconds(5 * 60),
        )


app = core.App()
KedroStepFunctionsStack(app, "KedroStepFunctionsStack")
app.synth()

@noklam
Contributor

noklam commented Jul 30, 2023

I suspect the multiprocessing.Lock patch is no longer needed, could you try and report back?

Could you simply update the handler definition?

with KedroSession.create(env="aws", extra_params=<your_params>) as session:
    session.run(node_names=[node_to_run])

If this is not an option, I suppose you can always add this into the event object

    def _convert_kedro_node_to_sfn_task(self, node: Node) -> LambdaInvoke:
        """Convert a Kedro node into an AWS Step Functions Task"""
        return LambdaInvoke(
            self,
            _clean_name(node.name),
            lambda_function=self._convert_kedro_node_to_lambda_function(node),
            payload=sfn.TaskInput.from_object({"node_name": node.name}),  # Insert the extra information you need and retrieve it from `handler`?
        )
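
A minimal sketch of how the handler might pick such extra information out of the event and forward it to the session, combining the two suggestions above. The `extra_params` event key and the `parse_event` helper are assumptions made for illustration; `KedroSession.create(..., extra_params=...)` is the call already shown in this comment.

```python
def parse_event(event):
    """Split a Lambda event into the node name and optional runtime params.

    Assumes the payload looks like
    {"node_name": "...", "extra_params": {...}}; the "extra_params" key
    is a hypothetical convention, not part of the Kedro docs.
    """
    node_name = event["node_name"]
    extra_params = dict(event.get("extra_params") or {})
    return node_name, extra_params


def handler(event, context):
    # Kedro imports stay inside the handler, as in the documented handler.
    from kedro.framework.project import configure_project
    from kedro.framework.session import KedroSession

    configure_project("spaceflights_step_functions")
    node_name, extra_params = parse_event(event)

    # extra_params become the session's runtime params, which a config
    # loader can then merge over its globals.
    with KedroSession.create(env="aws", extra_params=extra_params) as session:
        session.run(node_names=[node_name])
```

Keeping the parsing in a small pure function makes it possible to check the payload handling without a Kedro project or a Lambda runtime.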

@Harsh-Maheshwari
Author

With that approach I would need to redeploy the Step Function for every new report I want to generate.
I was hoping to use the Step Function's input JSON to simply switch to whichever report I want to generate.
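
One way this could work without redeploying, sketched as a plain Amazon States Language task definition built in Python: a payload key ending in `.$` is resolved from the state's input at execution time, so the report can be chosen in the input JSON. The `extra_params` key, the helper name, and the function and node names in the usage lines are hypothetical; this is an illustration of the state language, not the Kedro deployment script.

```python
import json


def lambda_task_state(function_name, node_name):
    """Build a Task state that forwards part of the execution input to a Lambda."""
    return {
        "Type": "Task",
        "Resource": "arn:aws:states:::lambda:invoke",
        "Parameters": {
            "FunctionName": function_name,
            "Payload": {
                # Fixed at deploy time.
                "node_name": node_name,
                # Resolved at run time from the state's input JSON.
                "extra_params.$": "$.extra_params",
            },
        },
        # null ResultPath: discard the Lambda result and pass the input through.
        "ResultPath": None,
        "End": True,
    }


state = lambda_task_state("my-node-fn", "my_node")  # hypothetical names
print(json.dumps(state, indent=2))
```

An execution started with input such as `{"extra_params": {"report_to_generate": "sales"}}` would then deliver that dict to the Lambda event. In a multi-group machine like the one in deploy.py, the Parallel states would presumably also need to discard their results (in CDK, `result_path=sfn.JsonPath.DISCARD`) so that later groups still see the original input.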

@merelcht
Member

Note for team discussion:

I cannot find the issue (maybe it's not created)? We should merge the runtime_params for TemplatedConfigLoader, as we are doing it for OmegaConfigLoader. #2531

@noklam It's this issue I think #2640

@Harsh-Maheshwari
Author

@noklam
The multiprocessing.Lock patch is no longer needed; I ran a Lambda function without it.
I was also able to add the parameters to the event object and pass them to each Lambda function in the Step Function.
Thanks for that.

@noklam @merelcht

We should merge the runtime_params for TemplatedConfigLoader, as we are doing it for OmegaConfigLoader

Since Kedro is moving to OmegaConfigLoader from 0.19, I decided to use it in my project.
With OmegaConfigLoader, I don't think I can use --params to overwrite catalog variables like "_report_name".
It only works for parameters.yml-type files, not for catalog.yml with variables such as "_report_name" defined inside it.

@merelcht
Member

merelcht commented Sep 6, 2023

The TemplatedConfigLoader will be removed in Kedro 0.19.0. We are currently focussed on making the OmegaConfigLoader fully fit for purpose, so please report any issues you have with it so we can fix it before the 0.19.0 release. I'll close this issue as not planned.

merelcht closed this as not planned Sep 6, 2023