Can I use spark.createDataFrame() with a list of ObjectRef from various remote Ray workers? #164

Closed
klwuibm opened this issue Jul 7, 2021 · 19 comments


@klwuibm

klwuibm commented Jul 7, 2021

I am trying to see if I can create a Spark DataFrame from a list of ObjectRefs produced by several remote Ray tasks that have put large objects into their local plasma stores. I know that Spark can take a Pandas dataframe and turn it into a Spark dataframe. However, I don't want the Spark driver to collect all the large objects from the Ray workers on remote nodes, because that can easily cause the Spark driver to run out of memory. It turns out, though, that ObjectRef is not one of the supported data types in spark.createDataFrame(). Is there any workaround?

Here is the code I was trying to run:

import ray
import pandas as pd
import numpy as np

ray.shutdown()
ray.init()

@ray.remote
def create_small_dataframe(i):
    pdf = pd.DataFrame(data=np.random.randint(5*i, size=(3, 4)))
    pdf_ref = ray.put(pdf)
    return pdf_ref
  
# The driver program uses ray.remote to let the remote tasks
# use ray.put() to create/store partitions of the dataframe locally in
# their plasma stores.  The ObjectRefs of these partitions are returned
# to the driver.
obj_ref1 = ray.get(create_small_dataframe.remote(1))
obj_ref2 = ray.get(create_small_dataframe.remote(2))

# print(ray.get(obj_ref1))
# print(ray.get(obj_ref2))

# The driver then collects all obj_refs into a list on the driver
obj_ref_list = [obj_ref1, obj_ref2]


# Now, the driver can call SparkSession.createDataFrame

import raydp

raydp.stop_spark()

spark = raydp.init_spark('dataframe_with_obj_ref',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')

from pyspark.sql.types import BinaryType, StructType, StructField

from ray._raylet import ObjectRef

schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])

sdf = spark.createDataFrame(obj_ref_list, schema)

And the following are the error messages:

TypeError                                 Traceback (most recent call last)
<ipython-input-3-2ad94baefe18> in <module>
     30 schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
     31 
---> 32 sdf = spark.createDataFrame(obj_ref_list, schema)
     33 
     34 

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    603             return super(SparkSession, self).createDataFrame(
    604                 data, schema, samplingRatio, verifySchema)
--> 605         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    606 
    607     def _create_dataframe(self, data, schema, samplingRatio, verifySchema):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    628             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    629         else:
--> 630             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    631         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    632         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    446         # make sure data could consumed multiple times
    447         if not isinstance(data, list):
--> 448             data = list(data)
    449 
    450         if schema is None or isinstance(schema, (list, tuple)):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in prepare(obj)
    610 
    611             def prepare(obj):
--> 612                 verify_func(obj)
    613                 return obj
    614         elif isinstance(schema, DataType):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify(obj)
   1406     def verify(obj):
   1407         if not verify_nullability(obj):
-> 1408             verify_value(obj)
   1409 
   1410     return verify

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify_struct(obj)
   1393                     verifier(d.get(f))
   1394             else:
-> 1395                 raise TypeError(new_msg("StructType can not accept object %r in type %s"
   1396                                         % (obj, type(obj))))
   1397         verify_value = verify_struct

TypeError: StructType can not accept object ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000002000000) in type <class 'ray._raylet.ObjectRef'>
@ConeyLiu
Collaborator

ConeyLiu commented Jul 8, 2021

Hi @klwuibm, the feature is not supported yet. @kira-lin is working to support it.

@kira-lin
Collaborator

kira-lin commented Jul 9, 2021

You might want to take a look at ray.experimental.data, which we are currently adding support for. But it requires ray-nightly, so for now you can probably use our MLDataset. You need to use ray.cloudpickle.dumps(obj_ref) when you want to put it into a dataframe, and ray.cloudpickle.loads before you ray.get. Besides, the thing returned from create_small_dataframe.remote() is a ref to a ref to the data; you could just return pdf instead.
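
(A minimal sketch of the "ref to a ref" point above; illustrative only, not code from this thread:)

import ray
import pandas as pd
import numpy as np

ray.init()

@ray.remote
def create_small_dataframe(i):
    # Returning the dataframe directly: Ray stores the return value in the
    # object store, so .remote() already yields an ObjectRef to the dataframe.
    return pd.DataFrame(data=np.random.randint(5 * i, size=(3, 4)))

ref = create_small_dataframe.remote(1)   # ObjectRef pointing at the dataframe
print(type(ref))                         # <class 'ray._raylet.ObjectRef'>
print(ray.get(ref))                      # the pandas dataframe itself

# By contrast, returning ray.put(pdf) from inside the task would make
# ray.get(ref) return *another* ObjectRef, requiring a second ray.get.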

@klwuibm
Author

klwuibm commented Jul 9, 2021

@kira-lin thanks for the comments. I wonder if you could clarify two questions. (1) Is there any connection between MLDataset and ray.cloudpickle.dumps()? I assume they are not related. (2) Regarding your suggestion of serializing obj_ref with ray.cloudpickle.dumps() before calling spark.createDataFrame(): do you mean that I should store the Pandas dataframe created by a remote task in a file on disk? Namely, are you suggesting that I write the remotely created Pandas dataframes to disk, and then Spark can read them back to create a Spark dataframe?

@kira-lin
Collaborator

  1. Yes, they are not related. I was just suggesting that maybe you can try our MLDataset; or, if you want to solve the problem this way, you should use cloudpickle.
  2. No, cloudpickle.dumps just serializes the reference and returns a bytes object. Then you can store it in the dataframe, with a binary-type schema. cloudpickle.loads loads a reference back from those bytes; that's how you can use ray.get to get the data again (see the sketch below).
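
(To make point 2 concrete, here is a minimal round-trip sketch, not from the thread; it assumes a running Ray instance:)

import ray
import pandas as pd

ray.init()

# An ObjectRef pointing at a pandas dataframe in the object store.
ref = ray.put(pd.DataFrame({'a': [1, 2, 3]}))

# Serialize the *reference* (not the data) into plain bytes, which a Spark
# BinaryType column can hold.
ser = ray.cloudpickle.dumps(ref)
print(type(ser))  # <class 'bytes'>

# Later (e.g. inside a Spark task that is connected to the Ray cluster),
# turn the bytes back into an ObjectRef and fetch the dataframe.
restored = ray.cloudpickle.loads(ser)
print(ray.get(restored))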

@klwuibm
Author

klwuibm commented Jul 12, 2021

@kira-lin Thanks very much, the help is appreciated. Here is what I did, but it is still not working as I expected.

import cloudpickle

@ray.remote
def create_small_dataframe(i):
    pdf = pd.DataFrame(data=np.random.randint(5*i, size=(3, 4)))
    return ray.cloudpickle.dumps(pdf)
  
# The remote tasks create partitions of the dataframe and return them
# serialized with ray.cloudpickle.dumps(); the driver then calls ray.get()
# to retrieve the serialized bytes.

obj_ref1 = ray.get(create_small_dataframe.remote(1))
obj_ref2 = ray.get(create_small_dataframe.remote(2))

print(obj_ref1)
#print(ray.get(obj_ref2))

#obj_ref_hex1 = ray.cloudpickle.dumps(obj_ref1).hex()
#obj_ref_hex2 = ray.cloudpickle.dumps(obj_ref2).hex()

# The driver then collects all the returned objects into a list on the driver
obj_ref_list = [obj_ref1, obj_ref2]


# Now, the driver can call SparkSession.createDataFrame

import raydp

raydp.stop_spark()

spark = raydp.init_spark('dataframe_with_obj_ref',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')

from pyspark.sql.types import BinaryType, StructType, StructField


schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])

sdf = spark.createDataFrame(obj_ref_list, schema)

The error messages are:

b'\x80\x05\x95\x89\x02\x00\x00\x00\x00\x00\x00\x8c\x11pandas.core.frame\x94\x8c\tDataFrame\x94\x93\x94)\x81\x94}\x94(\x8c\x04_mgr\x94\x8c\x1epandas.core.internals.managers\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_Index\x94\x93\x94\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(\x8c\x04name\x94N\x8c\x05start\x94K\x00\x8c\x04stop\x94K\x04\x8c\x04step\x94K\x01u\x86\x94R\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x03h\x15K\x01u\x86\x94R\x94e]\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x96`\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x04K\x03\x86\x94\x8c\x01F\x94t\x94R\x94a]\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x04h\x15K\x01u\x86\x94R\x94a}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94h\n\x8c\x06blocks\x94]\x94}\x94(\x8c\x06values\x94h+\x8c\x08mgr_locs\x94\x8c\x08builtins\x94\x8c\x05slice\x94\x93\x94K\x00K\x04K\x01\x87\x94R\x94uaust\x94b\x8c\x04_typ\x94\x8c\tdataframe\x94\x8c\t_metadata\x94]\x94\x8c\x05attrs\x94}\x94ub.'

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-35-ff5408f1769a> in <module>
     33 schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
     34 
---> 35 sdf = spark.createDataFrame(obj_ref_list, schema)
     36 
     37 

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    603             return super(SparkSession, self).createDataFrame(
    604                 data, schema, samplingRatio, verifySchema)
--> 605         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    606 
    607     def _create_dataframe(self, data, schema, samplingRatio, verifySchema):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    628             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    629         else:
--> 630             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    631         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    632         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _createFromLocal(self, data, schema)
    446         # make sure data could consumed multiple times
    447         if not isinstance(data, list):
--> 448             data = list(data)
    449 
    450         if schema is None or isinstance(schema, (list, tuple)):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in prepare(obj)
    610 
    611             def prepare(obj):
--> 612                 verify_func(obj)
    613                 return obj
    614         elif isinstance(schema, DataType):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify(obj)
   1406     def verify(obj):
   1407         if not verify_nullability(obj):
-> 1408             verify_value(obj)
   1409 
   1410     return verify

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in verify_struct(obj)
   1393                     verifier(d.get(f))
   1394             else:
-> 1395                 raise TypeError(new_msg("StructType can not accept object %r in type %s"
   1396                                         % (obj, type(obj))))
   1397         verify_value = verify_struct

TypeError: StructType can not accept object b'\x80\x05\x95\x89\x02\x00\x00\x00\x00\x00\x00\x8c\x11pandas.core.frame\x94\x8c\tDataFrame\x94\x93\x94)\x81\x94}\x94(\x8c\x04_mgr\x94\x8c\x1epandas.core.internals.managers\x94\x8c\x0cBlockManager\x94\x93\x94)\x81\x94(]\x94(\x8c\x18pandas.core.indexes.base\x94\x8c\n_new_Index\x94\x93\x94\x8c\x19pandas.core.indexes.range\x94\x8c\nRangeIndex\x94\x93\x94}\x94(\x8c\x04name\x94N\x8c\x05start\x94K\x00\x8c\x04stop\x94K\x04\x8c\x04step\x94K\x01u\x86\x94R\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x03h\x15K\x01u\x86\x94R\x94e]\x94\x8c\x12numpy.core.numeric\x94\x8c\x0b_frombuffer\x94\x93\x94(\x96`\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x94\x8c\x05numpy\x94\x8c\x05dtype\x94\x93\x94\x8c\x02i8\x94\x89\x88\x87\x94R\x94(K\x03\x8c\x01<\x94NNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00t\x94bK\x04K\x03\x86\x94\x8c\x01F\x94t\x94R\x94a]\x94h\rh\x10}\x94(h\x12Nh\x13K\x00h\x14K\x04h\x15K\x01u\x86\x94R\x94a}\x94\x8c\x060.14.1\x94}\x94(\x8c\x04axes\x94h\n\x8c\x06blocks\x94]\x94}\x94(\x8c\x06values\x94h+\x8c\x08mgr_locs\x94\x8c\x08builtins\x94\x8c\x05slice\x94\x93\x94K\x00K\x04K\x01\x87\x94R\x94uaust\x94b\x8c\x04_typ\x94\x8c\tdataframe\x94\x8c\t_metadata\x94]\x94\x8c\x05attrs\x94}\x94ub.' in type <class 'bytes'>

@kira-lin
Collaborator

What you want to do is probably this:

import ray
import pandas as pd
import numpy as np
@ray.remote
def create_small_dataframe(i):
    return pd.DataFrame(data=np.random.randint(5*i, size=(3, 4)))

# these are ObjectRef[pd.Dataframe]
obj_ref1 = create_small_dataframe.remote(1)
obj_ref2 = create_small_dataframe.remote(2)
# use cloudpickle to serialize them
ser_obj_ref1 = ray.cloudpickle.dumps(obj_ref1)
ser_obj_ref2 = ray.cloudpickle.dumps(obj_ref2)
obj_refs = [[ser_obj_ref1], [ser_obj_ref2]]
# start spark
import raydp
raydp.stop_spark()
spark = raydp.init_spark('dataframe_with_obj_ref',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')
# create the dataframe
from pyspark.sql.types import BinaryType, StructType, StructField
schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(obj_refs, schema)
raydp.stop_spark()
ray.shutdown()

What do you want to do with this dataframe? You can check out this PR #166 since it's very similar.

@klwuibm
Author

klwuibm commented Jul 13, 2021

@kira-lin thanks very much. I will take a look at PR #166. The idea is to pass the various Pandas dataframes (created by remote functions) to a Spark dataframe via ObjectRefs, without moving large chunks of data from the worker nodes to the driver node just so the driver can execute spark.createDataFrame(). If the data is large, the driver node can run out of memory. My understanding is that only the Spark driver can execute spark.createDataFrame(), and the data must be in the driver's main memory.

@kira-lin
Collaborator

Oh yes, I see. Just FYI, in ray-nightly, a similar feature is under development in ray.experimental.data. If you don't need it to be a Spark dataframe, you might just use their Ray Dataset, which has a from_pandas function. You can also use to_spark after from_pandas, but to_spark has not been merged yet.
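
(For reference, a rough sketch of the Ray Dataset path described above, written against the ray.data API names as they later stabilized, from_pandas and to_spark; the exact module path and signatures in ray-nightly at the time may differ, so treat this as an assumption:)

import ray
import raydp
import pandas as pd

ray.init()
spark = raydp.init_spark('dataset_to_spark',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='1G')

# Build a Ray Dataset from pandas dataframes (which may originate on
# different nodes).
pdfs = [pd.DataFrame({'a': [1, 2]}), pd.DataFrame({'a': [3, 4]})]
ds = ray.data.from_pandas(pdfs)

# Convert the Dataset into a Spark dataframe; this relies on the to_spark
# support mentioned above, which had not been merged at the time.
sdf = ds.to_spark(spark)
sdf.show()

raydp.stop_spark()
ray.shutdown()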

@klwuibm
Author

klwuibm commented Jul 20, 2021

@kira-lin Thanks for your help in getting Spark to successfully create a dataframe from a list of serialized ObjectRefs. After that, on the Spark side, I would like to get the original dataframes back with ray.get(ray.cloudpickle.loads()). Here is what I am trying to do after sdf = spark.createDataFrame(obj_refs, schema), but I ran into some difficulties. Any help would be greatly appreciated.

# create the dataframe
from pyspark.sql.types import BinaryType, StructType, StructField
schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])
sdf = spark.createDataFrame(obj_refs, schema)
sdf.show()

# converting a spark dataframe into an RDD, and then use flatMap to expand from one row of 
# serialized ObjectRef to multiple rows of original dataframe. 
myrdd = sdf.rdd.flatMap(lambda x: ray.get(ray.cloudpickle.loads(x['Pandas_df_ref'])))
mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])
mydf.show()

raydp.stop_spark()
ray.shutdown()

I got an error that puzzles me: it seems to indicate that Ray has not been started. I don't think I have stopped Ray at that point. Here are some of the error messages.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, 192.168.1.6, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 605, in main
    process()
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/rdd.py", line 1440, in takeUpToNumLeft
    yield next(iterator)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-8-d07d992c915a>", line 46, in <lambda>
  File "/opt/anaconda3/lib/python3.8/site-packages/ray/serialization.py", line 45, in _object_ref_deserializer
    worker.check_connected()
  File "/opt/anaconda3/lib/python3.8/site-packages/ray/worker.py", line 199, in check_connected
    raise RaySystemError("Ray has not been started yet. You can "
ray.exceptions.RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.

@kira-lin
Collaborator

The Python function in flatMap is executed by PySpark workers, and these are not controlled by RayDP. Therefore, those processes are not connected to Ray, hence the exception. You need to connect to Ray with ray.client().connect() or ray.init(address='auto') before you use ray.get in the function.

Besides, if you see many "Connecting to existing Ray cluster" messages after you do so, you can silence them by setting the log level to warn. I am also investigating this problem.

@klwuibm
Author

klwuibm commented Jul 21, 2021

@kira-lin I think the problem is that a PySpark executor doesn't have access to Ray, even if I do a ray.init(address='auto', ignore_reinit_error=True) before

myrdd = sdf.rdd.flatMap(lambda x: ray.get(ray.cloudpickle.loads(x['Pandas_df_ref']))).

It fails again on the next step:

mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])

I think that, due to lazy evaluation in Spark, the execution of flatMap() is not done until myrdd.toDF(). However, even if I add another ray.init(address='auto', ignore_reinit_error=True) before myrdd.toDF(), it still fails in the same way.

My question to you is: Is there a way in RayDP to pass Ray access from the PySpark driver to the PySpark workers during Spark initialization? Or does only the Spark driver have access to Ray?

I think that in order to support good integration between Ray and Spark, the PySpark workers need to be able to access Ray, especially for accessing data in the local plasma store. This way we can pass ObjectRefs efficiently and seamlessly between Ray and Spark. Thoughts?

@kira-lin
Collaborator

Have you tried this:

def map_func(x):
    # command for executors to connect to ray cluster
    # ray.init will also work
    ray.client().connect()
    # actual work using ray
    ray.get(ray.cloudpickle.loads(x['Pandas_df_ref']))
myrdd = sdf.rdd.flatMap(map_func)

A normal process can access Ray if it connects to the Ray cluster via ray.init or ray.client().connect(). In RayDP, the Spark executor in Java is spawned as a Ray actor process, so it can access Ray. But for the Spark Python workers, we did not do the same thing. We'll consider whether to add this feature.

@klwuibm
Author

klwuibm commented Jul 22, 2021

@kira-lin Many thanks again. I tried your suggestion of initializing Ray inside map_func(). I had to add a return to map_func(), however. Here is what I did:

obj_ref_schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])

sdf = spark.createDataFrame(data = obj_ref_list, schema = obj_ref_schema)
sdf.show()

def map_func(x):
    # command for executors to connect to ray cluster
    # ray.init will also work
    import pandas as pd
    ray.init(address='auto', ignore_reinit_error = True)
    # actual work using ray
    return(ray.get(ray.cloudpickle.loads(x['Pandas_df_ref'])))
    
myrdd = sdf.rdd.flatMap(lambda x: map_func(x))


for x in myrdd.collect():
    print(x)

mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])

mydf.show()

However, I found that the original Pandas dataframes didn't seem to be returned to the RDD by the flatMap() function. I tried to print out what is in myrdd, and it seems weird. Here is what I got.

ObjectRef(a67dc375e60ddd1affffffffffffffffffffffff0100000001000000)
type of obj_ref1: <class 'ray._raylet.ObjectRef'>
type of obj_ref_list:  <class 'list'>
+--------------------+
|       Pandas_df_ref|
+--------------------+
|[80 05 95 A5 00 0...|
|[80 05 95 A5 00 0...|
+--------------------+

(raylet) 2021-07-22 10:58:54,399	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) 2021-07-22 10:58:54,400	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600

column-1
column-2
column-3
column-4
column-1
column-2
column-3
column-4

(raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600
(raylet) (raylet) (raylet) 2021-07-22 10:58:55,492	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 9.74.11.232:43600

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-d0cf4dc6d7da> in <module>
     58     print(x)
     59 
---> 60 mydf = myrdd.toDF(['column-1', 'column-2', 'column-3', 'column-4'])
     61 
     62 mydf.show()

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in toDF(self, schema, sampleRatio)
     59         [Row(name=u'Alice', age=1)]
     60         """
---> 61         return sparkSession.createDataFrame(self, schema, sampleRatio)
     62 
     63     RDD.toDF = toDF

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in createDataFrame(self, data, schema, samplingRatio, verifySchema)
    603             return super(SparkSession, self).createDataFrame(
    604                 data, schema, samplingRatio, verifySchema)
--> 605         return self._create_dataframe(data, schema, samplingRatio, verifySchema)
    606 
    607     def _create_dataframe(self, data, schema, samplingRatio, verifySchema):

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _create_dataframe(self, data, schema, samplingRatio, verifySchema)
    626 
    627         if isinstance(data, RDD):
--> 628             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    629         else:
    630             rdd, schema = self._createFromLocal(map(prepare, data), schema)

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _createFromRDD(self, rdd, schema, samplingRatio)
    423         """
    424         if schema is None or isinstance(schema, (list, tuple)):
--> 425             struct = self._inferSchema(rdd, samplingRatio, names=schema)
    426             converter = _create_converter(struct)
    427             rdd = rdd.map(converter)

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/session.py in _inferSchema(self, rdd, samplingRatio, names)
    403 
    404         if samplingRatio is None:
--> 405             schema = _infer_schema(first, names=names)
    406             if _has_nulltype(schema):
    407                 for row in rdd.take(100)[1:]:

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/types.py in _infer_schema(row, names)
   1065 
   1066     else:
-> 1067         raise TypeError("Can not infer schema for type: %s" % type(row))
   1068 
   1069     fields = [StructField(k, _infer_type(v), True) for k, v in items]

TypeError: Can not infer schema for type: <class 'str'>



@kira-lin
Collaborator

You can try to use sdf.mapInPandas instead of the RDD flatMap. Here is a doc. This step is quite similar to the to_spark function in the previously mentioned PR (an Arrow table and a pandas dataframe are convertible), so you can refer to it.
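
(For context, mapInPandas passes an iterator of pandas DataFrames to the mapping function, and the function must yield pandas DataFrames matching the declared output schema. A minimal, generic sketch of that contract, illustrative only and not tied to RayDP:)

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1,), (2,), (3,)], ['value'])

def double(batches):
    # 'batches' is an iterator of pandas DataFrames (one per Arrow batch);
    # yield pandas DataFrames that match the declared output schema.
    for pdf in batches:
        yield pd.DataFrame({'value': pdf['value'] * 2})

out = sdf.mapInPandas(double, schema='value long')
out.show()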

@klwuibm
Author

klwuibm commented Jul 23, 2021

Here is what I tried:

obj_ref_schema = StructType([StructField('Pandas_df_ref', BinaryType(), True)])

sdf = spark.createDataFrame(data = obj_ref_list, schema = obj_ref_schema)
sdf.show()

def map_func(x):
    # command for executors to connect to ray cluster
    # ray.init will also work
    import pandas as pd
    ray.init(address='auto', ignore_reinit_error = True)
    # actual work using ray
    yield(ray.get(ray.cloudpickle.loads(x['Pandas_df_ref'])))

mySchema = StructType([StructField('column-1', IntegerType(), True),
                       StructField('column-2', IntegerType(), True),
                       StructField('column-3', IntegerType(), True),
                       StructField('colum-4', IntegerType(), True)])
    
mydf = sdf.mapInPandas(map_func, mySchema)

mydf.show()


And here is what I got:

ObjectRef(753565f917242c11ffffffffffffffffffffffff0100000001000000)
type of obj_ref1: <class 'ray._raylet.ObjectRef'>
type of obj_ref_list:  <class 'list'>
+--------------------+
|       Pandas_df_ref|
+--------------------+
|[80 05 95 A5 00 0...|
|[80 05 95 A5 00 0...|
+--------------------+

(raylet) 2021-07-23 10:43:43,660	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:43,660	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:44,537	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:44,537	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:45,638	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:45,638	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:46,504	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551
(raylet) 2021-07-23 10:43:46,504	INFO worker.py:640 -- Connecting to existing Ray cluster at address: 192.168.1.6:39551

---------------------------------------------------------------------------
PythonException                           Traceback (most recent call last)
<ipython-input-12-86ea798eccd3> in <module>
     61 mydf = sdf.mapInPandas(map_func, mySchema)
     62 
---> 63 mydf.show()
     64 
     65 

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    132                 # Hide where the exception came from that shows a non-Pythonic
    133                 # JVM exception message.
--> 134                 raise_from(converted)
    135             else:
    136                 raise

/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/utils.py in raise_from(e)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 605, in main
    process()
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/pandas/serializers.py", line 258, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/pandas/serializers.py", line 88, in dump_stream
    for batch in iterator:
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/sql/pandas/serializers.py", line 251, in init_stream_yield_batches
    for series in iterator:
  File "/opt/anaconda3/lib/python3.8/site-packages/pyspark/worker.py", line 359, in func
    for result_batch, result_type in result_iter:
  File "<ipython-input-12-86ea798eccd3>", line 54, in map_func
TypeError: 'map' object is not subscriptable




@kira-lin
Collaborator

Notice that the x passed to map_func would be an iterator of pandas dataframes. If that is not clear, please search for mapInPandas in the doc. The function should look like this one in the PR:

def _convert_blocks_to_dataframe(blocks):
    # connect to ray
    if not ray.is_initialized():
        ray.client().connect()
    for block in blocks:
        dfs = []
        for b in block["ref"]:
            ref = ray.cloudpickle.loads(b)
            data = ray.get(ref)
            dfs.append(data.to_pandas())
        yield pd.concat(dfs)

@klwuibm
Author

klwuibm commented Jul 26, 2021

@kira-lin thanks. I think I got it to work in my map_func() with the following:

def map_func(blocks):
    # command for executors to connect to ray cluster
    # ray.init will also work
    import pandas as pd
    if not ray.is_initialized():
        ray.init(address='auto', ignore_reinit_error = True)
    # actual work using ray
    for block in blocks:
        dfs =[]
        for b in block['Pandas_df_ref']:
            data = ray.get(ray.cloudpickle.loads(b))
            dfs.append(data)
        yield pd.concat(dfs)

The only difference is that I didn't do data.to_pandas() as you did. I think that is because you use a Dataset as an intermediary, while I put the Pandas dataframe into Ray directly from the remote task. I wonder what implications I might encounter without using Dataset as the intermediary. One potential issue would be garbage collection - namely, the Pandas dataframe stored in the local plasma store by a remote task might not be available by the time the Spark mapInPandas() function is executed. Are there any others?

More importantly, in your pull request, do you know if data locality is observed when your _convert_blocks_to_dataframe(blocks) is executed by mapInPandas()? Namely, is data = ray.get(ref) inside _convert_blocks_to_dataframe(blocks) getting all its data from the local plasma store? Or is it possible that this ray.get() might need to go to many remote plasma stores?

My original motivation is to avoid moving big data from one node to another when converting a Pandas dataframe into a Spark dataframe, by only passing the ray.ObjectRef to the Spark driver. This matters especially when all the Pandas dataframes are large and have been stored in the local plasma stores of different nodes. I certainly would like the execution of sdf.mapInPandas() to mostly involve local plasma data when map_func() is executed. I think you probably want the same for your create_spark_dataframe_from_ray().

@kira-lin
Collaborator

I wonder what implications I might encounter without using Dataset as the intermediary.

I think there is no big difference. We need to use to_pandas because the data stored in a Ray Dataset is an Arrow table; if you store a pandas dataframe, that is also fine. As long as the owner of your dataframe in plasma is alive and the objects have not gone out of scope, they should be available. For this part you can refer to the Ray docs.

Namely, is data = ray.get(ref) inside the _convert_blocks_to_dataframe(blocks) getting all its data in the local plasma store? Or is it possible that this ray.get() might need to go to many remote plasma stores?

You are right, it is possible that data would be fetched from remote nodes in this implementation. If we instead used remote tasks with ObjectRefs as their arguments, Ray would try to schedule based on locality. But PySpark workers are not aware of this information, so there is no locality. This is not ready for huge datasets, and we are also looking into better ways to implement this, maybe a Spark DataSource.
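
(As an illustration of the locality point, a sketch of standard Ray behavior rather than RayDP code: when an ObjectRef is passed as an argument to another remote task, Ray dereferences it and prefers to schedule that task on a node that already holds the object.)

import ray
import pandas as pd

ray.init()

@ray.remote
def produce():
    return pd.DataFrame({'a': range(1000)})

@ray.remote
def consume(df):
    # 'df' arrives already dereferenced; Ray tries to run this task on a
    # node whose plasma store already contains the object.
    return len(df)

ref = produce.remote()
print(ray.get(consume.remote(ref)))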

@kira-lin
Collaborator

Closing as stale. This has been fixed by implementing getPreferredLocations.
