
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String #130

Closed
xbmatrix opened this issue Apr 18, 2019 · 4 comments


xbmatrix commented Apr 18, 2019

I am running the tfrecord converter example using spark-connector_2.11-1.10.0.jar (pyspark --jars ~/spark-connector_2.11-1.10.0.jar), and it works. Then I added a string vector field called 'StrVectorCol' and got the exception below. The Spark version is 2.4.0-cdh6.1.1.

from pyspark.sql.types import *

path = "test-output.tfrecord"

fields = [StructField("id", IntegerType()), StructField("IntegerCol", IntegerType()),
          StructField("LongCol", LongType()), StructField("FloatCol", FloatType()),
          StructField("DoubleCol", DoubleType()), StructField("VectorCol", ArrayType(DoubleType(), True)),
          StructField("StringCol", StringType()), StructField("StrVectorCol", ArrayType(StringType(), True))]
schema = StructType(fields)
test_rows = [[11, 1, 23, 10.0, 14.0, [1.0, 2.0], "r1", ["a1", "b1"]], [21, 2, 24, 12.0, 15.0, [2.0, 2.0], "r2", ["a2"]]]
rdd = spark.sparkContext.parallelize(test_rows)
df = spark.createDataFrame(rdd, schema)
df.write.format("tfrecords").option("recordType", "Example").save(path)

The full stack trace:

hadoo-002, executor 1): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:155)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
        at org.apache.spark.sql.catalyst.util.GenericArrayData.getUTF8String(GenericArrayData.scala:75)
        at org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8.apply(InternalRow.scala:136)
        at org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8.apply(InternalRow.scala:136)
        at org.apache.spark.sql.catalyst.util.ArrayData.toArray(ArrayData.scala:178)
        at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$.org$tensorflow$spark$datasources$tfrecords$serde$DefaultTfRecordRowEncoder$$encodeFeature(DefaultTfRecordRowEncoder.scala:132)
        at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$$anonfun$encodeExample$1.apply(DefaultTfRecordRowEncoder.scala:64)
        at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$$anonfun$encodeExample$1.apply(DefaultTfRecordRowEncoder.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.tensorflow.spark.datasources.tfrecords.serde.DefaultTfRecordRowEncoder$.encodeExample(DefaultTfRecordRowEncoder.scala:61)
        at org.tensorflow.spark.datasources.tfrecords.DefaultSource$$anonfun$2.apply(DefaultSource.scala:59)
        at org.tensorflow.spark.datasources.tfrecords.DefaultSource$$anonfun$2.apply(DefaultSource.scala:56)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:129)
        at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139)
        ... 10 more
@manuzhang

What you pass in for StrVectorCol is a Python tuple, which Spark maps to a StructType rather than an ArrayType.
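A quick way to sanity-check this (just a suggestion, not from the original report) is to print the type Spark actually assigned to the column:

# If this prints ArrayType(StringType,true), the schema is what you intended
# and the failure is inside the writer; a StructType here would mean the rows
# were interpreted as tuples, as described above.
df.printSchema()
print(df.schema["StrVectorCol"].dataType)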

@MiladShahidi

Hi @xbmatrix. Did you figure this out?

@MiladShahidi

Here is the solution I found: you need to cast string columns to binary (BinaryType). I used this function before writing TFRecords:

from pyspark.sql import functions as fn
from pyspark.sql.types import BinaryType, StringType

def strings_to_binary(df):
    """Cast all StringType columns in a Spark DataFrame to BinaryType.

    The Spark-TensorFlow connector does not accept Array(StringType) columns
    when writing TFRecords; it expects Array(BinaryType). Hence the need to
    cast string columns to binary before collecting them into lists and
    writing to TFRecords.
    """
    for col_name in df.columns:
        if isinstance(df.schema[col_name].dataType, StringType):
            df = df.withColumn(col_name, fn.col(col_name).cast(BinaryType()))
    return df

I kept the columns as strings so I could manipulate them with functions that expected StringType and converted them to BinaryType just before writing.
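For completeness, here is roughly how it slots into the write path (a minimal sketch; `df` and `path` are the same names used in the report above):

# Cast string columns to binary just before writing; everything upstream
# keeps working with plain strings.
binary_df = strings_to_binary(df)
binary_df.write.format("tfrecords").option("recordType", "Example").save(path)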

@xbmatrix

It turns out this problem is caused by the version I was using.
