
raydp.init_spark fails #89

Closed · yanivg10 opened this issue Feb 20, 2021 · 33 comments

@yanivg10

This line of code fails:
spark = raydp.init_spark(app_name="RayDP example",
num_executors=2,
executor_cores=2,
executor_memory="4GB")

I am getting the following errors (linux server with a standalone Spark cluster):

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.deploy.raydp.AppMasterEntryPoint.main(AppMasterEntryPoint.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.internal.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 13 more
Traceback (most recent call last):
File "/home/guryaniv/try_raydp_simple.py", line 4, in
spark = raydp.init_spark(app_name="RayDP example", num_executors=2, executor_cores=2, executor_memory="4GB")
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py", line 122, in init_spark
return _global_spark_context.get_or_create_session()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py", line 68, in get_or_create_session
spark_cluster = self._get_or_create_spark_cluster()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
self._spark_cluster = SparkCluster()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster.py", line 31, in init
self._set_up_master(None, None)
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster.py", line 37, in _set_up_master
self._app_master_bridge.start_up()
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster_master.py", line 52, in start_up
self._gateway = self._launch_gateway(extra_classpath, popen_kwargs)
File "/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster_master.py", line 115, in _launch_gateway
raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

To run the code I am using spark-submit with the spark master:
spark-submit --master spark://:7077 raydp_example.py

@ConeyLiu
Collaborator

Hi @yanivg10. In RayDP, we run Spark on Ray, which means you do not need another resource manager (e.g. standalone or YARN).

# first step, connect to or init a local ray cluster
ray.init(...)
# second step, start up a spark cluster on top of the ray cluster
spark = raydp.init_spark(...)
# third step, use spark as normal
spark....
# stop spark
raydp.stop_spark(...)

Also, if you do set SPARK_HOME, it must point to a Spark version compatible with what RayDP requires. In fact, you do not need to set SPARK_HOME at all, because we use pyspark.
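
For reference, a minimal runnable sketch of those steps (the resource sizes below simply reuse the values from the original report; they are illustrative, not requirements):

import ray
import raydp

# connect to an existing Ray cluster, or start a local one
ray.init()

# start a Spark cluster on top of Ray and get a SparkSession
spark = raydp.init_spark(app_name="RayDP example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# use the SparkSession as usual
print(spark.range(0, 10).count())

# tear down the Spark cluster and shut down Ray
raydp.stop_spark()
ray.shutdown()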

@yanivg10
Author

I already tried these steps using the spark on ray setup and could not get init_spark to work. ray.init() works fine.
raydp.init_spark() throws the following exception (I am running the code within a conda "raydp" virtual environment I created):

Exception Traceback (most recent call last)
in
4 cores_per_executor = 1
5 memory_per_executor = "2GB"
----> 6 spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py in init_spark(app_name, num_executors, executor_cores, executor_memory, configs)
120 _global_spark_context = _SparkContext(
121 app_name, num_executors, executor_cores, executor_memory, configs)
--> 122 return _global_spark_context.get_or_create_session()
123 except:
124 _global_spark_context = None

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py in get_or_create_session(self)
66 if self._spark_session is not None:
67 return self._spark_session
---> 68 spark_cluster = self._get_or_create_spark_cluster()
69 self._spark_session = spark_cluster.get_spark_session(
70 self._app_name,

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/context.py in _get_or_create_spark_cluster(self)
60 if self._spark_cluster is not None:
61 return self._spark_cluster
---> 62 self._spark_cluster = SparkCluster()
63 return self._spark_cluster
64

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster.py in __init__(self)
29 super().__init__(None)
30 self._app_master_bridge = None
---> 31 self._set_up_master(None, None)
32 self._spark_session: SparkSession = None
33

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster.py in _set_up_master(self, resources, kwargs)
35 # TODO: specify the app master resource
36 self._app_master_bridge = RayClusterMaster()
---> 37 self._app_master_bridge.start_up()
38
39 def _set_up_worker(self, resources: Dict[str, float], kwargs: Dict[str, str]):

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster_master.py in start_up(self, popen_kwargs)
50 return
51 extra_classpath = os.pathsep.join(self._prepare_jvm_classpath())
---> 52 self._gateway = self._launch_gateway(extra_classpath, popen_kwargs)
53 self._app_master_java_bridge = self._gateway.entry_point.getAppMasterBridge()
54 self._set_properties()

/opt/anaconda3/envs/raydp/lib/python3.6/site-packages/raydp/spark/ray_cluster_master.py in _launch_gateway(self, class_path, popen_kwargs)
113
114 if not os.path.isfile(conn_info_file):
--> 115 raise Exception("Java gateway process exited before sending its port number")
116
117 with open(conn_info_file, "rb") as info:

Exception: Java gateway process exited before sending its port number

@ConeyLiu
Collaborator

@yanivg10 could you check whether there are any other pyspark programs running? And check whether port 25333 is occupied (with lsof -i:25333).
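
If lsof is not available, a quick Python check works too (the helper below is illustrative and not part of RayDP; 25333 is py4j's default gateway port):

import socket

def port_in_use(port, host="127.0.0.1"):
    # True if something is already listening on host:port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        return sock.connect_ex((host, port)) == 0

print(port_in_use(25333))  # False means the port is free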

@yanivg10
Author

This port is not occupied and there are no pyspark programs running.

@ConeyLiu
Collaborator

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging

It seems like your Spark version is incompatible with RayDP. RayDP requires Spark 3.0.0 or 3.0.1.

@yanivg10
Author

I am using Pyspark 3.0.1

pyspark.__version__
'3.0.1'

@ConeyLiu
Collaborator

Could you provide more logs or messages? I tested locally on Ubuntu and it works fine.

In [1]: import raydp

In [2]: raydp.__version__
Out[2]: '0.1.1'

In [3]: import pyspark

In [4]: pyspark.__version__
Out[4]: '3.0.1'

In [5]: spark = raydp.init_spark("test", 1, 1, "2g")
2021-02-20 11:22:03,084 INFO services.py:1195 -- View the Ray dashboard at http://127.0.0.1:8265
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/lxy/miniconda3/envs/ray/lib/python3.7/site-packages/ray-2.0.0.dev0-py3.7-linux-x86_64.egg/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/lxy/miniconda3/envs/ray/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-02-20 11:22:06 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/02/20 11:22:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
(raylet) SLF4J: Class path contains multiple SLF4J bindings.
(raylet) SLF4J: Found binding in [jar:file:/home/lxy/miniconda3/envs/ray/lib/python3.7/site-packages/ray-2.0.0.dev0-py3.7-linux-x86_64.egg/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
(raylet) SLF4J: Found binding in [jar:file:/home/lxy/miniconda3/envs/ray/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
(raylet) SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
(raylet) SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

In [6]: spark.range(0, 10).count()
Out[6]: 10

In [7]:

@yanivg10
Author

I tried your example and here is what I get:

spark = raydp.init_spark("test", 1, 1, "2g")
2021-02-19 21:34:36,619 INFO services.py:1174 -- View the Ray dashboard at http://127.0.0.1:8266
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/ray/jars/ray_dist.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2021-02-19 21:34:42 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Error: Missing application resource.

Usage: spark-submit [options] <app jar | python file | R file> [app arguments]
Usage: spark-submit --kill [submission ID] --master [spark://...]
Usage: spark-submit --status [submission ID] --master [spark://...]
Usage: spark-submit run-example [options] example-class [example args]

Options:
--master MASTER_URL spark://host:port, mesos://host:port, yarn,
k8s://https://host:port, or local (Default: local[*]).
--deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
on one of the worker machines inside the cluster ("cluster")
(Default: client).
--class CLASS_NAME Your application's main class (for Java / Scala apps).
--name NAME A name of your application.
--jars JARS Comma-separated list of jars to include on the driver
and executor classpaths.
--packages Comma-separated list of maven coordinates of jars to include
on the driver and executor classpaths. Will search the local
maven repo, then maven central and any additional remote
repositories given by --repositories. The format for the
coordinates should be groupId:artifactId:version.
--exclude-packages Comma-separated list of groupId:artifactId, to exclude while
resolving the dependencies provided in --packages to avoid
dependency conflicts.
--repositories Comma-separated list of additional remote repositories to
search for the maven coordinates given with --packages.
--py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
on the PYTHONPATH for Python apps.
--files FILES Comma-separated list of files to be placed in the working
directory of each executor. File paths of these files
in executors can be accessed via SparkFiles.get(fileName).

--conf, -c PROP=VALUE Arbitrary Spark configuration property.
--properties-file FILE Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.

--driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
--driver-java-options Extra Java options to pass to the driver.
--driver-library-path Extra library path entries to pass to the driver.
--driver-class-path Extra class path entries to pass to the driver. Note that
jars added with --jars are automatically included in the
classpath.

--executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).

--proxy-user NAME User to impersonate when submitting the application.
This argument does not work with --principal / --keytab.

--help, -h Show this help message and exit.
--verbose, -v Print additional debug output.
--version, Print the version of current Spark.

Cluster deploy mode only:
--driver-cores NUM Number of cores used by the driver, only in cluster mode
(Default: 1).

Spark standalone or Mesos with cluster deploy mode only:
--supervise If given, restarts the driver on failure.
--kill SUBMISSION_ID If given, kills the driver specified.
--status SUBMISSION_ID If given, requests the status of the driver specified.

Spark standalone and Mesos only:
--total-executor-cores NUM Total cores for all executors.

Spark standalone and YARN only:
--executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
or all available cores on the worker in standalone mode)

YARN-only:
--queue QUEUE_NAME The YARN queue to submit to (Default: "default").
--num-executors NUM Number of executors to launch (Default: 2).
If dynamic allocation is enabled, the initial number of
executors will be at least NUM.
--archives ARCHIVES Comma separated list of archives to be extracted into the
working directory of each executor.
--principal PRINCIPAL Principal to be used to login to KDC, while running on
secure HDFS.
--keytab KEYTAB The full path to the file that contains the keytab for the
principal specified above. This keytab will be copied to
the node running the Application Master via the Secure
Distributed Cache, for renewing the login tickets and the
delegation tokens periodically.

Traceback (most recent call last):
File "", line 1, in
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/raydp/context.py", line 122, in init_spark
return _global_spark_context.get_or_create_session()
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/raydp/context.py", line 74, in get_or_create_session
self._configs)
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 69, in get_spark_session
spark_builder.appName(app_name).master(self.get_cluster_url()).getOrCreate()
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/pyspark/sql/session.py", line 186, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/pyspark/context.py", line 378, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/pyspark/context.py", line 133, in init
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/pyspark/context.py", line 327, in _ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
File "/home/guryaniv/.conda/envs/raydp2/lib/python3.7/site-packages/pyspark/java_gateway.py", line 105, in launch_gateway
raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number

@ConeyLiu
Collaborator

Oh, I see. You cannot submit the job with the spark-submit script. You just need to run the Python file directly.

@yanivg10
Author

I am running the Python script directly, not using spark-submit. Could it be related to the Ray version? Which Ray version do you recommend? I'm using 1.2.0, but it looks like you are using 2.0.0.dev0.

@ConeyLiu
Collaborator

Did you set the SPARK_HOME environment variable?

@carsonwang
Collaborator

@yanivg10, it seems there is a problem starting the Java process in your environment. Do you have a JDK installed and the environment variable JAVA_HOME set properly? Please try running "java -version" and then "pyspark" to see if pyspark itself works properly.
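
The same checks, expressed as a purely illustrative Python snippet (it only mirrors the shell commands above and is not RayDP-specific):

import os
import shutil
import subprocess

print("JAVA_HOME =", os.environ.get("JAVA_HOME"))  # unset, or pointing at a valid JDK
print("java on PATH:", shutil.which("java"))       # should not be None
subprocess.run(["java", "-version"], check=True)   # prints the installed JDK version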

@yanivg10
Author

Thanks. This was indeed related to the JAVA_HOME variable: the code only runs when it is not set to any path.

I am able to run your taxi fare prediction example, but Torch is throwing errors while the training session runs. It seems related to issue #74 and I added a comment there with the output log. Can you take a look at that?

@ConeyLiu
Collaborator

Closing this. You can reopen if you have further problems, @yanivg10.

@dalgual

dalgual commented Mar 24, 2022

@carsonwang

I have a similar error at: spark = raydp.init_spark("test", 1, 1, "2g").
Do you have any step-by-step tutorial page or link to reproduce your code? Do you also have any yaml example to share for launching raydp on AWS?

(1) If I run it in ipython3, I get the following error:
Py4JJavaError: An error occurred while calling o0.setProperties.
: java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:460)
at java.util.Properties.setProperty(Properties.java:166)
at java.lang.System.setProperty(System.java:798)
at org.apache.spark.deploy.raydp.AppMasterJavaBridge.$anonfun$setProperties$1(AppM
(2) If I run "python test_ray_init.py", I have the following error:
Traceback (most recent call last):
File "test_ray_init.py", line 14, in
spark = raydp.init_spark("test", 1, 1, "2g")
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 126, in init_spark
return _global_spark_context.get_or_create_session()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 70, in get_or_create_session
spark_cluster = self._get_or_create_spark_cluster()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 63, in _get_or_create_spark_cluster
self._spark_cluster = SparkCluster(self._configs)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 34, in init
self._set_up_master(None, None)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 40, in _set_up_master
self._app_master_bridge.start_up()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 56, in start_up
self._set_properties()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 158, in _set_properties
self._app_master_java_bridge.setProperties(jvm_properties)
...

@yanivg10 , it seems there is a problem to start the java process in your environment. Do you have JDK installed and have the environment variable JAVA_HOME set properly? Please try to run "java -version" and then run "pyspark" to see if pyspark itself can work properly.

@carsonwang
Collaborator

@dalgual, we don't have a yaml or docker image published right now. Did you create your own image based on Ray's docker image, with RayDP and Java installed properly? There is an example RayDP Dockerfile for reference: https://github.com/oap-project/raydp/tree/master/docker

@dalgual

dalgual commented Mar 24, 2022

@carsonwang
I launch EC2 instances as follows. Am I missing anything in the following?
ray up -y config.raydp.yaml

------ config.raydp.yaml ----------------
cluster_name: ray-dp-bigdai

max_workers: 2
upscaling_speed: 1.0

provider:
    type: aws
    region: us-west-2
    availability_zone: us-west-2a,us-west-2b
    # Whether to allow node reuse. If set to False, nodes will be terminated
    # instead of stopped.
    cache_stopped_nodes: True # If not present, the default is True.

autoscaling_mode: default

auth:
    ssh_user: ubuntu

available_node_types:
    ray.head.default:
        resources: {}
        node_config:
            InstanceType: m5.xlarge
            ImageId: ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30
            # You can provision additional disk space with a conf as follows
            BlockDeviceMappings:
                - DeviceName: /dev/sda1
                  Ebs:
                      VolumeSize: 100
    # g4dn.2xlarge, 1 GPU, 8 vCPUs, 32 GiB of memory, 225 NVMe SSD, up to 25 Gbps network performance
    ray.worker.default:
        min_workers: 2
        max_workers: 2
        resources: {}
        node_config:
            InstanceType: g4dn.2xlarge
            ImageId: ami-0050625d58fa27b6d # ami-0a2363a9cff180a64 # Deep Learning AMI (Ubuntu) Version 30

head_node_type: ray.head.default

setup_commands:
    - pip install "raydp"

@kira-lin
Collaborator

Unfortunately, we don't have such a yaml now.
Are you using raydp 0.4.1 and ray 1.11.0? Also, can you provide the complete error message when running the python script? Is it the same as in the ipython3 case?

@dalgual

dalgual commented Mar 24, 2022

@kira-lin
yes, here it is:
ray, version 1.11.0

ubuntu@ip-172-30-0-170:~/examples$ cat test_ray_init.py
import raydp
import ray

raydp.__version__

import pyspark

pyspark.__version__

ray.init(address='auto', _redis_password='5241590000000000')
spark = raydp.init_spark("test", 1, 1, "2g")

ubuntu@ip-172-30-0-170:~/examples$ python test_ray_init.py
Traceback (most recent call last):
File "test_ray_init.py", line 14, in
spark = raydp.init_spark("test", 1, 1, "2g")
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 126, in init_spark
return _global_spark_context.get_or_create_session()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 70, in get_or_create_session
spark_cluster = self._get_or_create_spark_cluster()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 63, in _get_or_create_spark_cluster
self._spark_cluster = SparkCluster(self._configs)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 34, in init
self._set_up_master(None, None)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 40, in _set_up_master
self._app_master_bridge.start_up()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 56, in start_up
self._set_properties()
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 158, in _set_properties
self._app_master_java_bridge.setProperties(jvm_properties)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in call
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o0.setProperties.
: java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:460)
at java.util.Properties.setProperty(Properties.java:166)
at java.lang.System.setProperty(System.java:798)
at org.apache.spark.deploy.raydp.AppMasterJavaBridge.$anonfun$setProperties$1(AppMasterJavaBridge.scala:32)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at org.apache.spark.deploy.raydp.AppMasterJavaBridge.setProperties(AppMasterJavaBridge.scala:31)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

@kira-lin
Collaborator

kira-lin commented Mar 24, 2022

I've verified that raydp-0.4.1 indeed does not work with ray 1.11.0. Can you please try using a previous version of ray? Or you can use raydp-nightly, too.
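
As a quick sanity check before calling init_spark, you can print the installed combination (illustrative only):

import ray
import raydp

print("ray:", ray.__version__)      # 1.11.0 in this report
print("raydp:", raydp.__version__)  # 0.4.1 here, which does not work with ray 1.11.0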

@dalgual

dalgual commented Mar 24, 2022

@kira-lin Thanks, it works ;-)

@dalgual

dalgual commented Mar 25, 2022

@kira-lin @carsonwang I ran XGBoost using RayDP, reading a dataset from S3. Do you have any example code for reading S3 using Ray? I got the following error:

Traceback (most recent call last):
File "xgboost_ray_nyctaxi.py", line 25, in
.load(NYC_TRAIN_CSV)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/sql/readwriter.py", line 158, in load
return self._df(self._jreader.load(path))
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1322, in call
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py", line 111, in deco
return f(*a, **kw)
File "/home/ubuntu/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o61.load.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:747)
at scala.collection.immutable.List.map(List.scala:293)
at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:745)
at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:577)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:408)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:274)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:245)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:245)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)

(RayDPSparkMaster pid=6477) 2022-03-25 01:52:02,284 INFO RayAppMaster [Thread-2]: Stopping RayAppMaster

@kira-lin
Collaborator

Are you able to read it using pyspark? It seems like reading/writing S3 requires some config and an extra package, like hadoop-aws. Maybe you can refer to this.
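
A hedged sketch of what such a configuration could look like with RayDP (the bucket, credentials, and hadoop-aws version below are placeholders; the hadoop-aws version must match the Hadoop build bundled with your pyspark):

import ray
import raydp

ray.init(address="auto")

spark = raydp.init_spark(
    app_name="read-s3-example",
    num_executors=1,
    executor_cores=1,
    executor_memory="2GB",
    configs={
        # pulls in the S3A filesystem implementation
        "spark.jars.packages": "org.apache.hadoop:hadoop-aws:3.2.0",
        "spark.hadoop.fs.s3a.access.key": "<AWS_ACCESS_KEY_ID>",
        "spark.hadoop.fs.s3a.secret.key": "<AWS_SECRET_ACCESS_KEY>",
    })

# note the s3a:// scheme rather than s3://
df = spark.read.format("csv").option("header", "true").load("s3a://my_bucket/test.csv")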

@dalgual

dalgual commented Mar 25, 2022

Do ray and raydp access data on the same node/server?
Is there any way to read files from other data sources such as S3 or the object storage of Azure & GCP?

@kira-lin
Collaborator

raydp is Spark on Ray. Anything you can do on Spark should also be possible with raydp. Reading S3 should be possible with the library I mentioned above. This is another compact tutorial; hope it helps.

@dalgual

dalgual commented Mar 25, 2022

@kira-lin Thanks. I found out that the following works:

data = ray.data.read_csv("s3://my_bucket/test.csv")

@dalgual

dalgual commented Mar 25, 2022

@kira-lin

I cannot open the Ray dashboard when I launch a cluster with raydp and xgboost_ray together using "pip install xgboost_ray raydp-nightly" - I can with raydp only.

Do you have any idea how I can open the Ray dashboard or make it run?

@kira-lin
Collaborator

That's strange. Do you mean the Ray dashboard or the Spark dashboard? The Ray dashboard should be available even without raydp. It should start once you create the cluster.

@dalgual

dalgual commented Mar 28, 2022

@kira-lin I can run the PySpark xgboost-ray code now with CPU. However, even though I have the following code with "gpu_hist", I don't have any clue whether the code actually runs on GPU - it works well with CPU but not with GPU, and the dashboard only shows CPU resources. How can I tell if the code runs on GPU? Do you have any example code or link for GPU?
I also list the result of ray status below:

#g4dn.2xlarge, 1 GPU, 8 vCPUs, 32 GiB of memory, 225 NVMe SSD, up to 25 Gbps network performance
app_name = "Spark with RayDP"
num_executors = 1 #1
cores_per_executor = 1 #4, 2, 1
memory_per_executor = "2GB" # "2GB" "1GB" "500M"

spark = raydp.init_spark(app_name, num_executors, cores_per_executor, memory_per_executor)
...
"tree_method": "gpu_hist",
...
ray_params=RayParams(
    num_actors=3,
    gpus_per_actor=1))

ubuntu@ip-172-30-0-247:~/tutorials$ ray status
======== Autoscaler status: 2022-03-28 06:19:15.800303 ========
Node status

Healthy:
1 ray.head.default
Pending:
172.30.0.37: ray.worker.default, uninitialized
172.30.0.67: ray.worker.default, uninitialized
172.30.0.50: ray.worker.default, uninitialized
Recent failures:
(no failures)

Resources

Usage:
1.0/4.0 CPU
2.00/8.886 GiB memory
0.00/4.443 GiB object_store_memory

Demands:
{'CPU': 1.0, 'GPU': 1.0} * 3 (SPREAD): 1+ pending placement groups

@kira-lin
Collaborator

@dalgual
It seems like your cluster is not ready. Only the head node is up; the others are pending. Is this the status when you run the code? If so, the resources you require (3x GPU) cannot be met. I guess you should specify GPUs in the resources when you configure the cluster. We don't have much experience using GPUs in Ray; maybe you should ask people in the xgboost_ray repo.
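
One illustrative way to confirm that the GPUs have actually joined the Ray cluster before xgboost_ray tries to schedule GPU actors:

import ray

ray.init(address="auto")

# the worker nodes must contribute a non-zero "GPU" entry here before
# RayParams(num_actors=3, gpus_per_actor=1) can be satisfied
print(ray.cluster_resources())    # e.g. {'CPU': 4.0, 'GPU': 0.0, ...}
print(ray.available_resources())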

@yunju63

yunju63 commented Jun 26, 2023

@kira-lin

I got a similar issue; I would appreciate it if you could check:
(I downgraded raydp and pyspark; should I also downgrade ray?)

Environment

$java --version

openjdk 11.0.19 2023-04-18
OpenJDK Runtime Environment (build 11.0.19+7-post-Ubuntu-0ubuntu118.04.1)
OpenJDK 64-Bit Server VM (build 11.0.19+7-post-Ubuntu-0ubuntu118.04.1, mixed mode, sharing)

$echo $JAVA_HOME

/usr/lib/jvm/java-11-openjdk-amd64

$pip freeze | grep ray

ray==2.4.0
raydp==0.6.0

$pip freeze | grep spark

pyspark==3.1.3

Error messages

when running:

raydp.init_spark(app_name='RayDP Example',
                 num_executors=4,
                 executor_cores=2,
                 executor_memory='7GB')

File "python/ray/_raylet.pyx", line 870, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 921, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 877, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 881, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 821, in ray._raylet.execute_task.function_executor
  File "/usr/local/lib/python3.6/dist-packages/ray/_private/function_manager.py", line 670, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/ray/util/tracing/tracing_helper.py", line 460, in _resume_span
    return method(self, *_args, **_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/raydp/spark/ray_cluster_master.py", line 55, in start_up
    self._set_properties()
  File "/usr/local/lib/python3.6/dist-packages/ray/util/tracing/tracing_helper.py", line 460, in _resume_span
    return method(self, *_args, **_kwargs)
  File "/usr/local/lib/python3.6/dist-packages/raydp/spark/ray_cluster_master.py", line 170, in _set_properties
    self._app_master_java_bridge.setProperties(jvm_properties)
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.6/dist-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o0.setProperties.
: java.lang.NullPointerException
	at java.base/java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
	at java.base/java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
	at java.base/java.util.Properties.put(Properties.java:1340)
	at java.base/java.util.Properties.setProperty(Properties.java:228)
	at java.base/java.lang.System.setProperty(System.java:898)
	at org.apache.spark.deploy.raydp.AppMasterJavaBridge.$anonfun$setProperties$1(AppMasterJavaBridge.scala:32)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:234)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:468)
	at org.apache.spark.deploy.raydp.AppMasterJavaBridge.setProperties(AppMasterJavaBridge.scala:31)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)

@kira-lin
Collaborator

@yunju63 You should use raydp-nightly to work with Ray 2.4.0. Spark 3.1.3 is fine. Run pip install --pre -U raydp and try again.

@yunju63

yunju63 commented Jun 26, 2023

@kira-lin It works, thank you :)
