
Unable to configure the spark session #3545

Closed
gpierard opened this issue Jan 23, 2024 · 4 comments
Labels
Community: Issue/PR opened by the open-source community

Comments


gpierard commented Jan 23, 2024

Description

I am unable to set the Spark session config for my Kedro project via spark.yml.

Context / steps to reproduce

On Kedro 0.17.6, I am trying to set up a Spark session with the following config, but the config does not seem to be applied to the Spark session, which causes out-of-memory errors.

# conf/base/spark.yml
spark.scheduler.mode: FAIR
spark.driver.memory: 100g
spark.executor.memory: 20g
spark.executor.cores: 10
spark.driver.maxResultSize: 20g
spark.ui.port: xxxx
spark.ui.enabled: true

# src/mypackage/hooks.py
from kedro.framework.hooks import hook_impl
from pyspark import SparkConf
from pyspark.sql import SparkSession


class SparkHooks:
    @hook_impl
    def after_context_created(self, context) -> None:
        """Initialises a SparkSession using the config
        defined in the project's conf folder.
        """
        # Load the spark configuration in spark.yml using the config loader
        parameters = context.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the spark session
        spark_session_conf = (
            SparkSession.builder.appName(context.project_path.name)
            .master("local[50]")  # local mode with 50 worker threads
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")
# src/mypackage/settings.py
"""Project settings."""
from mypackage.hooks import PipelineMonitoringHooks, ProjectHooks, SparkHooks

# Instantiate and list your project hooks here
HOOKS = (ProjectHooks(), PipelineMonitoringHooks(), SparkHooks(),)
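For reference, my understanding is that the Kedro 0.17.x PySpark docs initialised the session in a custom context class rather than in a hook. A rough sketch from memory (file, class, and app names here are illustrative, not from my project; verify against the 0.17.x docs):

# src/mypackage/context.py (illustrative sketch)
from kedro.framework.context import KedroContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

class ProjectContext(KedroContext):
    def __init__(self, package_name, project_path, env=None, extra_params=None):
        super().__init__(package_name, project_path, env=env, extra_params=extra_params)
        self.init_spark_session()

    def init_spark_session(self) -> None:
        # Build the session from spark.yml before anything else can create one
        parameters = self.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())
        spark = (
            SparkSession.builder.appName("mypackage")  # placeholder app name
            .config(conf=spark_conf)
            .getOrCreate()
        )
        spark.sparkContext.setLogLevel("WARN")

This would be registered via CONTEXT_CLASS = ProjectContext in settings.py.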

Steps to Reproduce

kedro ipython
spark = catalog.datasets.mydataset._get_spark()
spark.conf.get('spark.driver.maxResultSize')
# java.util.NoSuchElementException: spark.driver.maxResultSize
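One way to see which keys the running session was actually created with is to dump its underlying SparkConf; a minimal sketch (getConf() and toDebugString() are standard PySpark APIs):

# Inspect the conf the active session was actually created with
sc = spark.sparkContext
print(sc.getConf().toDebugString())  # every key set on this session's SparkConf
print(sc.getConf().get("spark.driver.maxResultSize", "not set"))  # safe lookup with a default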

Note that context.config_loader.get("spark*", "spark*/**") returns the expected configuration:

{'spark.driver.maxResultSize': '20g',
 'spark.scheduler.mode': 'FAIR',
 'spark.driver.memory': '100g',
 'spark.executor.memory': '20g',
 'spark.executor.cores': 10,
 'spark.ui.port': xxxx,
 'spark.ui.enabled': True}

How can I correctly apply this config to the Spark session?
Related: Stack Overflow question
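One workaround I can think of (a sketch, untested here; SparkSession.getActiveSession() assumes PySpark 3.x): since getOrCreate() silently reuses any session that already exists, stop the existing session before building the configured one. Is that the idiomatic way?

from pyspark.sql import SparkSession

# getOrCreate() returns a pre-existing session untouched, so a session created
# before the hook runs would keep its default conf. Stop it first (sketch only).
active = SparkSession.getActiveSession()  # None if no session exists yet
if active is not None:
    active.stop()

spark = (
    SparkSession.builder.appName("mypackage")  # "mypackage" is a placeholder
    .config(conf=spark_conf)  # spark_conf built from spark.yml as in the hook above
    .getOrCreate()
)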

Your Environment

  • Kedro version used (pip show kedro or kedro -V): 0.17.6
  • Python version used (python -V): 3.8.5
  • Operating system and version: Ubuntu 20.04.6 LTS
@astrojuanlu (Member) commented Jan 23, 2024

Hi @gpierard, is this related to kedro-org/kedro-starters#210, https://stackoverflow.com/q/77859763/554319 and https://stackoverflow.com/q/77846687/554319?

@gpierard (Author) commented Jan 23, 2024

Hi, it is related to both Stack Overflow questions, but the kedro-starters issue is entirely separate (and now solved). This issue is purely about Spark on Kedro 0.17.6, and yes, I agree it is very old, but that's the version we currently run at work.
One other funny thing: if I unregister the Spark hooks and remove all Spark-related config from settings.py and hooks.py, I still get the same behaviour! A Spark session is started somewhere; it (obviously) doesn't have the spark.yml configuration applied, and I have no idea what creates it inside kedro ipython. I have removed references to any custom context as well.
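A quick check right after kedro ipython starts, before touching the catalog, shows whether a session already exists; a minimal sketch (SparkSession.getActiveSession() assumes PySpark 3.x):

from pyspark.sql import SparkSession

# If this prints None, no session exists yet, and something later
# (e.g. the first catalog access) must be creating it.
print(SparkSession.getActiveSession())

The stripped files: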

# src/packagename/hooks.py
"""Project hooks."""
import sys
from glob import glob
from pathlib import Path
from typing import Any, Dict, Iterable, Optional

import statsd
from kedro.config import ConfigLoader, TemplatedConfigLoader
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog
from kedro.pipeline.node import Node
from kedro.versioning import Journal
from pyspark import SparkConf
from pyspark.sql import SparkSession

class ProjectHooks:
    @hook_impl
    def register_config_loader(
        self, conf_paths: Iterable[str], env: str, extra_params: Dict[str, Any]
    ) -> ConfigLoader:
        return TemplatedConfigLoader(
            conf_paths + glob(str(Path(__file__).parent) + "/**/conf", recursive=True),
            globals_pattern="*globals.yml",  # read the globals dictionary from project config
        )

    @hook_impl
    def register_catalog(
        self,
        catalog: Optional[Dict[str, Dict[str, Any]]],
        credentials: Dict[str, Dict[str, Any]],
        load_versions: Dict[str, str],
        save_version: str,
        journal: Journal,
    ) -> DataCatalog:
        return DataCatalog.from_config(
            catalog, credentials, load_versions, save_version, journal
        )

class PipelineMonitoringHooks:
    def __init__(self):
        self._timers = {}
        self._client = statsd.StatsClient(prefix="kedro")

    @hook_impl
    def before_node_run(self, node: Node) -> None:
        node_timer = self._client.timer(node.name)
        node_timer.start()
        self._timers[node.short_name] = node_timer

    @hook_impl
    def after_node_run(
        self, node: Node, inputs: Dict[str, Any], outputs: Dict[str, Any]
    ) -> None:
        self._timers[node.short_name].stop()
        for dataset_name, dataset_value in inputs.items():
            self._client.gauge(dataset_name + "_size", sys.getsizeof(dataset_value))

    @hook_impl
    def after_pipeline_run(self):
        self._client.incr("run")

# REMOVED ALL SPARK CONFIG
# class SparkHooks:
#     @hook_impl
#     def after_context_created(self, context) -> None:
#         """Initialises a SparkSession using the config
#         defined in project's conf folder.
#         """

#         # Load the spark configuration in spark.yaml using the config loader
#         parameters = context.config_loader.get("spark*", "spark*/**")

#         spark_conf = SparkConf().setAll(parameters.items())
#         # print("spark_conf" + spark_conf.toDebugString())
#         # Initialise the spark session
#         spark_session_conf = (
#             SparkSession.builder.appName(context.project_path.name)
#             .master("local[50]")  # Set local mode with 6 CPU cores
#             .enableHiveSupport()
#             .config(conf=spark_conf)
#         )
#         _spark_session = spark_session_conf.getOrCreate()
#         _spark_session.sparkContext.setLogLevel("INFO")

"""Project settings."""
from <packagename>.hooks import PipelineMonitoringHooks, ProjectHooks #, SparkHooks # REMOVED SPARKHOOKS REFERENCE
# from <packagename>.custom_context import KedroContext # NO CUSTOM CONTEXT

# Instantiate and list your project hooks here
HOOKS = (ProjectHooks(), PipelineMonitoringHooks())  # , SparkHooks(),  # REMOVED SPARKHOOKS REFERENCE

# List the installed plugins for which to disable auto-registry
# DISABLE_HOOKS_FOR_PLUGINS = ("kedro-viz",)

# Define where to store data from a KedroSession. Defaults to BaseSessionStore.
# from kedro.framework.session.store import ShelveStore
# SESSION_STORE_CLASS = ShelveStore

# Define keyword arguments to be passed to `SESSION_STORE_CLASS` constructor
# SESSION_STORE_ARGS = {
#     "path": "./sessions"
# }

# Define custom context class. Defaults to `KedroContext`
# CONTEXT_CLASS = KedroContext

# Define the configuration folder. Defaults to `conf`
# CONF_ROOT = "conf"
# src/spark.yml
spark.scheduler.mode: FAIR
spark.driver.memory: 100g
spark.executor.memory: 20g
spark.executor.cores: 10
spark.driver.maxResultSize: 20g
spark.ui.port: xxxx
spark.ui.enabled: true

I have no idea what creates the Spark session, but it seems to be initiated by catalog.load or catalog.datasets.mydataset._get_spark(). Any help welcome.

kedro ipython
spark = catalog.datasets.mydataset._get_spark()
spark.conf.get('spark.driver.maxResultSize')
# java.util.NoSuchElementException: spark.driver.maxResultSize
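For what it's worth, my understanding (worth verifying against the installed version) is that Kedro's SparkDataSet obtains its session with a bare builder, which would create a default, unconfigured session if nothing else has made one yet; roughly:

# Paraphrase from memory; verify in kedro/extras/datasets/spark/spark_dataset.py
from pyspark.sql import SparkSession

class SparkDataSetSketch:
    @staticmethod
    def _get_spark():
        # No .config(...) here: if no session exists yet, this creates one with
        # plain defaults, which would explain the missing spark.yml settings.
        return SparkSession.builder.getOrCreate()

If that is right, the first catalog.load of a Spark dataset in kedro ipython is enough to create the mystery session.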

@merelcht (Member) commented
Hi @gpierard , do you still need help solving this issue?

@merelcht (Member) commented
I'm closing this issue for now. Feel free to re-open if this is still an issue you need help with.

@merelcht closed this as not planned on Mar 28, 2024.