
SparkDataSet with a relative local file path doesn't work on jupyter notebook/lab #47

Closed
gotin opened this issue Jul 8, 2019 · 9 comments

Comments


gotin commented Jul 8, 2019

Description

SparkDataSet with a relative local file path doesn't work on jupyter notebook/lab

Context

We can have a SparkDataSet entry in catalog.yml whose filepath is a relative local file path, like this:

something:
  type: kedro.contrib.io.pyspark.SparkDataSet
  file_format: parquet
  filepath: data/01_intermediate/something.parquet
  save_args:
    mode: overwrite

When something.parquet is placed at <project_directory>/data/01_intermediate/something.parquet, kedro run loads this parquet successfully whenever the pipeline uses the 'something' dataset.

But in a Jupyter notebook started with the kedro jupyter notebook command, the following call doesn't load 'something' as expected:

io.load('something')

Instead, it raises an exception like the following:

Py4JJavaError: An error occurred while calling o25.load.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/go_kojima/sample_kedro_project/notebooks/data/01_intermediate/something.parquet

Reading spark_data_set.py (in kedro) and readwriter.py (in pyspark), I think this is caused by the spark.read.load implementation. spark.read.load mistakenly tries to read data located at /Users/go_kojima/sample_kedro_project/notebooks/data/01_intermediate/something.parquet; somehow it resolves the given relative filepath against the notebook's directory.
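
For illustration, here is a minimal sketch of the behaviour (the path is taken from the error above; the snippet assumes a local pyspark installation):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# spark.read.load resolves a relative path against the JVM's working
# directory; if the JVM was launched from <project>/notebooks, this raises
# AnalysisException: Path does not exist:
# file:<project>/notebooks/data/01_intermediate/something.parquet
df = spark.read.load("data/01_intermediate/something.parquet", "parquet")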

Steps to Reproduce

As I showed above,

  1. put some Spark-readable data on your local file system
  2. add a corresponding data entry with a relative file path in catalog.yml
  3. start Jupyter with the kedro jupyter notebook command from the root directory of the kedro project
  4. execute io.load('something') in a notebook cell

Expected Result

The 'something' DataFrame is loaded.

Actual Result

An exception is raised:

Py4JJavaError: An error occurred while calling o25.load.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/go_kojima/sample_kedro_project/notebooks/data/01_intermediate/something.parquet

Your Environment

Include as many relevant details about the environment in which you experienced the bug:

  • Kedro version used (pip show kedro or kedro -V):
    kedro, version 0.14.1
    (anaconda3-2019.03)

  • Python version used (python -V):
    Python 3.7.3
    (anaconda3-2019.03)

  • Operating system and version:
    macOS Mojave version 10.14.5

My personal solution

Convert the filepath to an absolute file URL when a relative local file path is given, in SparkDataSet's init function, like the following:

class SparkDataSet(AbstractDataSet, ExistsMixin):

    def __init__(
        self,
        filepath: str,
        file_format: str = "parquet",
        load_args: Optional[Dict[str, Any]] = None,
        save_args: Optional[Dict[str, Any]] = None,
    ) -> None:
        import re
        from os.path import abspath, curdir

        ### original version
        # self._filepath = filepath

        ### modified version
        def is_relative_path(path):
            # a path is relative if it is neither a URL (s3://, hdfs://,
            # file://, ...) nor an absolute local path
            def is_url():
                return bool(re.match(r'^\S+://', path))

            def is_abspath():
                return bool(re.match(r'^/', path))

            return not (is_url() or is_abspath())

        def file_url(rpath):
            # anchor the relative path to the current working directory
            return 'file://%s/%s' % (abspath(curdir), rpath)

        self._filepath = file_url(filepath) if is_relative_path(filepath) else filepath

        self._file_format = file_format
        self._load_args = load_args if load_args is not None else {}
        self._save_args = save_args if save_args is not None else {}
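
For comparison, the same check can be written more compactly with the standard library. This is just a sketch of the same idea (to_spark_path is a hypothetical helper name, not part of kedro); urlparse treats anything with a scheme, such as s3:// or hdfs://, as a URL:

from os.path import abspath, isabs
from urllib.parse import urlparse

def to_spark_path(path: str) -> str:
    if urlparse(path).scheme:  # already a URL such as s3://..., hdfs://..., file://...
        return path
    if isabs(path):            # absolute local path: leave as-is
        return path
    return 'file://' + abspath(path)  # pin the relative path to the current cwd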
@gotin gotin added the Issue: Bug Report 🐞 Bug that needs to be fixed label Jul 8, 2019
@DmitriiDeriabinQB (Contributor)

Hi @gotin, thank you for submitting this issue. The reason for this behaviour is that the Jupyter Notebook/Lab server sets the current working directory to the directory where the notebook is saved, so relative paths in the catalog config now point to non-existing locations. Here are several options to mitigate this:

Option 1: You can run kedro jupyter notebook or kedro jupyter lab from your terminal. You need to be at the root of your project for these commands to run successfully. kedro will set your current working directory to the project root regardless of your notebook location, so relative paths should continue working.

Option 2: Alternatively, if you need to use a vanilla Jupyter Notebook/Lab server, you can have a look at kedro_project_loader.py here. Using this script should mitigate your issue for all notebooks inside your project.

Option 3: You can also fix this manually by changing the current working directory to the root of your project at the top of your notebook, as sketched below.
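
For Option 3, that would look something like this at the top of the notebook (the project path is just an example based on the error message above):

import os

# change to the project root so relative catalog paths resolve correctly;
# substitute your own project path here
os.chdir("/Users/go_kojima/sample_kedro_project")
print(os.getcwd())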

@DmitriiDeriabinQB DmitriiDeriabinQB added Type: Discussion and removed Issue: Bug Report 🐞 Bug that needs to be fixed labels Jul 8, 2019

gotin commented Jul 8, 2019

@DmitriiDeriabinQB, thanks for replying!

As for Option 1, I did run kedro jupyter (notebook|lab) from the root directory of my project, not from the 'notebooks' directory, and the result was what I wrote above. I also ran '!pwd' in a notebook cell and it showed the root directory of my project.

So Options 1 and 3 didn't work, and Option 2 isn't what I need so far.


gotin commented Jul 8, 2019

By the way, this happened only when I used SparkDataSet with a relative local filepath. pandas-based DataSets didn't have this issue even when relative local filepaths were given.
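
(For what it's worth, here is a sketch of why the behaviours can differ; the mechanism is my reading, not something verified in this thread. pandas resolves a relative path against os.getcwd() at the moment of the call, whereas Spark resolves it against the working directory its JVM captured at launch.)

import os
import pandas as pd

os.chdir("/path/to/project")  # hypothetical project root
# pandas resolves this against os.getcwd() at call time:
df = pd.read_parquet("data/01_intermediate/something.parquet")
# An already-running SparkSession, by contrast, keeps resolving relative
# paths against the working directory its JVM was launched with, so a
# later os.chdir() no longer affects spark.read.load().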

@DmitriiDeriabinQB (Contributor)

So far I haven't been able to reproduce this issue: io.load('something') works just fine with the following dataset definition:

something:
  type: kedro.contrib.io.pyspark.SparkDataSet
  file_format: parquet
  filepath: data/01_raw/something.parquet
  save_args:
    mode: overwrite

Please note that I've changed the folder from 01_intermediate to 01_raw as per the default project structure, but that shouldn't affect anything.

This is somehow related to the dataset path definition for something, since the error message you've posted contains a notebooks folder, which shouldn't be there unless your current working directory is /Users/go_kojima/sample_kedro_project/notebooks. So can you please start your notebook and run:

import os
print(os.getcwd())
print(io._data_sets['something']._filepath)
io.load('something')

and then paste the output here, please?


gotin commented Jul 8, 2019

The output was as follows:

/Users/go_kojima/dev/projects/AZ/az_pipeline
data/01_intermediate/something.parquet
2019-07-08 22:41:09,790 - kedro.io.data_catalog - INFO - Loading data from `something` (SparkDataSet)...
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o23.load.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/go_kojima/dev/projects/AZ/az_pipeline/notebooks/data/01_intermediate/something.parquet;
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/kedro/io/core.py in load(self)
    204             logging.getLogger(__name__).debug("Loading %s", str(self))
--> 205             return self._load()
    206         except DataSetError:

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/kedro/contrib/io/pyspark/spark_data_set.py in _load(self)
    132         return self._get_spark().read.load(
--> 133             self._filepath, self._file_format, **self._load_args
    134         )

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    165         if isinstance(path, basestring):
--> 166             return self._df(self._jreader.load(path))
    167         elif path is not None:

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):

AnalysisException: 'Path does not exist: file:/Users/go_kojima/dev/projects/AZ/az_pipeline/notebooks/data/01_intermediate/something.parquet;'

The above exception was the direct cause of the following exception:

DataSetError                              Traceback (most recent call last)
<ipython-input-1-735b34e1a33c> in <module>
      2 print(os.getcwd())
      3 print(io._data_sets['something']._filepath)
----> 4 io.load('something')

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/kedro/io/data_catalog.py in load(self, name)
    244                 type(self._data_sets[name]).__name__,
    245             )
--> 246             return self._data_sets[name].load()
    247 
    248         raise DataSetNotFoundError("DataSet '{}' not found in the catalog".format(name))

~/.pyenv/versions/anaconda3-2019.03/lib/python3.7/site-packages/kedro/io/core.py in load(self)
    212                 str(self), str(exc)
    213             )
--> 214             raise DataSetError(message) from exc
    215 
    216     def save(self, data: Any) -> None:

DataSetError: Failed while loading data from data set SparkDataSet(file_format=parquet, filepath=data/01_intermediate/something.parquet, save_args={'mode': overwrite}).
'Path does not exist: file:/Users/go_kojima/dev/projects/AZ/az_pipeline/notebooks/data/01_intermediate/something.parquet;'


gotin commented Jul 8, 2019

I'm going to try to find a minimal way to reproduce this issue.


gotin commented Jul 9, 2019

I just realized that this issue doesn't happen in other projects I made from scratch. Something wrong may be hiding in the project where this issue occurs, which I haven't found yet.
Anyway, I think I should close this issue. Once I find the cause, and if it can happen in other projects too, I'll raise it as a new issue here.

@gotin gotin closed this as completed Jul 9, 2019
@DmitriiDeriabinQB (Contributor)

@gotin, thank you for looking into it. Please keep us updated if you manage to reproduce the issue.


gotin commented Jul 10, 2019

I just found the cause of this issue.

In a Python script (xxx.py) under [project dir]/src/[project name]/nodes/, there was a SparkSession initialization fragment that looked like the following:

# in xxx.py (module level: this runs when the module is imported,
# i.e. during notebook initialization, before the working directory is fixed)
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

This was the cause.

In [project dir]/src/[project name]/run.py, the SparkSession initialization code was placed inside a function, so kedro run command execution didn't suffer from this issue.

# in run.py
# (get_config, create_catalog and create_pipeline are defined elsewhere
# in the project's run.py template)
from pathlib import Path
from typing import Iterable

from pyspark.sql import SparkSession

def init_spark_session(aws_access_key=None, aws_secret_key=None):
    spark = (SparkSession.builder.master("local[*]")
             .appName("kedro")
             .config("spark.executor.memory", "24G")
             .config("spark.executor.cores", "10")
             .config("spark.driver.memory", "4G")
             .config("spark.sql.execution.arrow.enabled", "true")
             .config("spark.driver.maxResultSize", "3g")
             .getOrCreate())
    return spark

def main(
    tags: Iterable[str] = None,
    env: str = None,
    runner: str = None,
):

    # Load Catalog
    conf = get_config(project_path=str(Path.cwd()), env=env)
    catalog = create_catalog(config=conf)

    spark = init_spark_session()
    # Load the pipeline
    pipeline = create_pipeline()
    pipeline = pipeline.only_nodes_with_tags(*tags) if tags else pipeline

But for the notebook, the node-defining Python script containing the SparkSession initialization shown above was loaded during the notebook initialization process, so the SparkSession was initialized with the notebook's directory as the working directory.

To avoid this issue, I moved the SparkSession initialization inside the function definitions of the node-defining script, like the following:

from .. import run

def func1(df, params):
  spark = run.init_spark_session()
  # some scripts using spark session

This solved the issue I had been facing.
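
The same idea can also be packaged as a small helper (a sketch, not the exact code in my project), relying on the fact that getOrCreate() returns the already-active session if one exists:

from pyspark.sql import SparkSession

def get_spark() -> SparkSession:
    # idempotent: returns the active session if one exists, otherwise
    # builds a new one at call time (after kedro has set the cwd)
    return SparkSession.builder.appName("kedro").getOrCreate()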

I hope this comment helps someone facing the same issue in the future.
