Failure Writing 20GB+ Files #51

Closed
wessankey opened this issue Jul 17, 2019 · 16 comments

Comments

@wessankey

wessankey commented Jul 17, 2019

I've been running into failures with certain files that are larger than 20 GB in size. Specifically, these errors come in two varieties:

  1. When writing the data as a CSV, a FileAlreadyExists exception will be thrown because it is attempting to write a file that has already been written.
  2. Sometimes, a java.io.IOException will be thrown with the message There are no available bytes in the input stream..

I'm loading data from a SAS file into a DataFrame, then writing it out as a CSV. A sample of what I'm doing is below:

val df = spark.read.format("com.github.saurfang.sas.spark")
    .option("header", true)
    .load("s3://bucket/file.sas7bdat")

df.write.csv("s3://bucket/output")
@thesuperzapper
Collaborator

Some questions:

  • Can you please provide logs?
  • Does this also happen when writing to other formats (e.g. Parquet)?

@Tagar
Contributor

Tagar commented Jul 17, 2019

On the 1st one -

add the overwrite option, like:

df.write.format('csv').mode('overwrite').save("s3://bucket/output")
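For reference, a rough Scala equivalent (untested sketch, matching the snippet in the original report) would be:

// Rough Scala equivalent of the PySpark line above (untested sketch).
// mode("overwrite") tells Spark to replace any existing output at the path.
df.write
  .mode("overwrite")
  .csv("s3://bucket/output")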

@wessankey
Author

On the 1st one -

add the overwrite option, like:

df.write.format('csv').mode('overwrite').save("s3://bucket/output")

Forgot to include that in my post, but I am using that option already.

@thesuperzapper
Collaborator

@westonsankey logs?

@wessankey
Author

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: File already exists:s3://BUCKET/LOCATION/part-00001-xyz.csv
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.checkExistenceIfNotOverwriting(RegularUploadPlanner.java:36)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.RegularUploadPlanner.plan(RegularUploadPlanner.java:30)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.plan.UploadPlannerChain.plan(UploadPlannerChain.java:37)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:601)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:212)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
        at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
        at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
        at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

@wessankey
Author

Some questions:

  • Can you please provide logs?
  • Does this also happen when writing to other formats (e.g. Parquet)?

Added logs. It happens when writing to other formats as well (tested with Parquet).

@thesuperzapper
Collaborator

@westonsankey can you provide the following:

  1. Information about your environment (Spark Version, EMR, etc.)
  2. The stack trace when you get the "There are no available bytes in the input stream." message. (And how you got it to happen)
  3. Does the "File already exists" error always happen for all files, every time you run it?
  4. Can you read/write smaller SAS files in your Spark environment (smaller, but big enough to cause a split)? [You can force a split with maxSplitSize option]
  5. For any of the files having issues, can SAS read and process all the rows? (E.g. are you confident it's not corrupt)
  6. Can you provide/create a non-confidential SAS file which also displays this issue? [Don't worry too much about this one yet]

@wessankey
Author

@thesuperzapper - I think the files might be corrupt, but I'm doing some more testing/research on my end. I'll provide you with that information if I determine that this is not the case. Appreciate the help thus far.

@thesuperzapper
Collaborator

@westonsankey even if the files are corrupt, perhaps we can emit a nicer error message for such files.

@wessankey
Author

wessankey commented Jul 23, 2019

@thesuperzapper

  1. EMR version 5.24.1 and Spark version 2.4.2. Cluster contains a master and one worker, both m3.xlarge (15 GB memory). Tested on a larger cluster as well with the same results.
  2. Stack trace below.
  3. It happens for certain files that are larger than 20 GB when I run it manually in the PySpark shell. When running the same code as an EMR step, I get the FileAlreadyExists exception.
  4. We have no issues with any files under 20 GB.
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2041)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2029)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2028)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2028)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2262)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2211)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2200)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.github.saurfang.sas.util.PrivateMethodCaller.apply(PrivateMethodExposer.scala:30)
        at com.github.saurfang.sas.parso.SasFileParserWrapper.readNext(ParsoWrapper.scala:81)
        at com.github.saurfang.sas.mapreduce.SasRecordReader.readNext$lzycompute$1(SasRecordReader.scala:159)
        at com.github.saurfang.sas.mapreduce.SasRecordReader.readNext$1(SasRecordReader.scala:153)
        at com.github.saurfang.sas.mapreduce.SasRecordReader.nextKeyValue(SasRecordReader.scala:175)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:244)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 10 more
Caused by: java.io.IOException: There are no available bytes in the input stream.
        at com.epam.parso.impl.SasFileParser.getBytesFromFile(SasFileParser.java:748)
        at com.epam.parso.impl.SasFileParser.readSubheaderSignature(SasFileParser.java:405)
        at com.epam.parso.impl.SasFileParser.processPageMetadata(SasFileParser.java:374)
        at com.epam.parso.impl.SasFileParser.processNextPage(SasFileParser.java:572)
        at com.epam.parso.impl.SasFileParser.readNextPage(SasFileParser.java:543)
        at com.epam.parso.impl.SasFileParser.readNext(SasFileParser.java:501)
        ... 30 more

@thesuperzapper
Collaborator

@westonsankey
I wonder if it's to do with Scala 2.12 (which is used in Spark 2.4.2 only), as we only compile for 2.11 right now. Can you try with Spark 2.4.3 or Spark 2.4.1?

This might become an issue, because we use a crazy hack to use the Parso library, and I doubt it works in Scala 2.12.

@wessankey
Author

@thesuperzapper
Tried running on a couple of older EMR/Spark versions, but had no success.

@thesuperzapper
Collaborator

@westonsankey

  1. Are you able to run it in local mode?
  2. Can you provide the worker stack trace, rather than just the driver one? (If not running in local mode)
  3. Just to confirm, are you using Scala or PySpark?
  4. I know it will be difficult, but could you create a ~20 GB file using a method similar to the one that produces your normal files, but filled with random data? (Unless the dataset can be made public.) [You don't have to do this right away, but it will be quite hard to debug without some kind of file.]

@thesuperzapper
Collaborator

@westonsankey could you also try setting maxSplitSize to larger than the file, and doing it with one server? I know it will take a while, but this will let us confirm whether it's an issue with splitting. (I wonder if it is something to do with page sizes larger than 1 MB.)
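A rough sketch of what that single-split read could look like, assuming maxSplitSize is passed as a read option and interpreted in bytes (both are assumptions, not confirmed against the docs):

// Untested sketch: set maxSplitSize larger than the ~20 GB file so the whole
// file ends up in a single split. The option name comes from this thread; how
// it is passed and the byte units are assumptions.
val df = spark.read.format("com.github.saurfang.sas.spark")
  .option("maxSplitSize", 30L * 1024 * 1024 * 1024) // ~30 GB
  .load("s3://bucket/file.sas7bdat")

df.write.mode("overwrite").csv("s3://bucket/output")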

@wessankey
Author

@thesuperzapper - Tried setting the maxSplitSize to force a single partition, but that resulted in the same error.

I wrote a small Java program using the Parso library to iterate over the rows in the SAS file to see if there were any errors parsing a single row. Turns out that is the issue - I get the IOException indicating that there are no available bytes in the input stream on a single row. Given that, it seems like this is not an issue with spark-sas7bdat.
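For reference, a minimal sketch of that kind of row-by-row check, written here in Scala against the public Parso API (the original was a small Java program; the file path is a placeholder):

import java.io.{FileInputStream, IOException}
import com.epam.parso.impl.SasFileReaderImpl

// Untested sketch: iterate every row with Parso and report where readNext() fails.
val in = new FileInputStream("/path/to/file.sas7bdat") // placeholder path
try {
  val reader = new SasFileReaderImpl(in)
  val rowCount = reader.getSasFileProperties.getRowCount
  var i = 0L
  while (i < rowCount) {
    try {
      reader.readNext() // returns the next row; throws IOException on the bad row
    } catch {
      case e: IOException =>
        println(s"Failed at row $i: ${e.getMessage}")
        throw e
    }
    i += 1
  }
  println(s"Read all $rowCount rows without errors")
} finally {
  in.close()
}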

@thesuperzapper
Collaborator

@westonsankey can you raise an issue here: https://github.com/epam/parso
