
Enhancement for MS Excel: Reading, Combining, Writing excel files #12

Closed
jkommeren opened this issue May 19, 2017 · 13 comments

Comments

@jkommeren

Hello,

Currently your library supports reading excel files and writing them, which is excellent!

What I would really like to do is:

Create 1 front page with calculations, referring to other sheets (250+) that contain just raw data (480 cells per sheet)

What I have:

250+ dfs of raw data (integers, strings), 1 df for each sheet to be output

According to the documentation, data can be written in two ways (a rough sketch follows the list):

  1. You can have a dataframe with columns of simple datatypes (no map, no list, no struct) that should be written in rows of an Excel sheet. You can define the sheetname by using the option "write.spark.defaultsheetname" (default is "Sheet1"). In this way, you can only write values, but no formulas, comments etc.
  2. You can have a dataframe with arrays where each element corresponds to the schema defined above. In this case you have full control over where the data ends up; you can use formulas, comments etc.
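
To check that I read this right, here is a rough, untested sketch of the two ways in spark-shell (sheet names, paths and data below are just placeholders; the cell schema for method 2 is [formattedValue, comment, formula, address, sheetName]):

import org.apache.spark.sql.SaveMode

// Method 1: columns of simple datatypes written as rows; the sheet name comes from the option
val rawDf = Seq((1, "a"), (2, "b")).toDF("id", "label")
rawDf.write
  .format("org.zuinnote.spark.office.excel")
  .option("write.spark.defaultsheetname", "RawData1")
  .mode(SaveMode.Overwrite)
  .save("/user/spark/raw1/")

// Method 2: one 5-element array per cell, so formulas and comments are possible
val cellDf = spark.sparkContext.parallelize(Seq(
  Seq("", "", "SUM('RawData1'!A1:A2)", "A1", "Frontpage"))).toDF()
cellDf.write
  .format("org.zuinnote.spark.office.excel")
  .mode(SaveMode.Overwrite)
  .save("/user/spark/front/")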

How I envisioned this could work with your current library, to avoid creating 480 individual cells for each of the 250+ source dfs (a rough sketch in code follows these bullets):

  • Use method 1 to write each of the 250+ dfs as a separate xlsx, with a unique name for each sheet using write.spark.defaultsheetname
  • Write the front page with formulas to a separate xlsx using method 2
  • Read these files as dfs, and use union to create 1 df (with multiple sheets)
  • Write them as a single xlsx
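
In (untested) code, with hypothetical paths and file count:

import org.apache.spark.sql.SaveMode

// one file per raw-data sheet (method 1), plus the front page written with method 2
val sheetDfs = (1 to 250).map(i =>
  spark.read.format("org.zuinnote.spark.office.excel").load(s"/user/spark/sheets/sheet_$i/"))
val frontDf = spark.read.format("org.zuinnote.spark.office.excel").load("/user/spark/front/")
val combined = (sheetDfs :+ frontDf).reduce(_ union _)
combined.write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/combined/")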

This did not work unfortunately, because reading an xlsx will return a df that cannot be directly written as an xlsx with this library.

My question:

Is there an easy way to do this currently? (I'm not that familiar with Spark yet, so there could be something I'm missing!) If not, do you think the library should support this?

To me it makes sense to be able to import data, make some adjustments, and export it again.

What do you think?

Keep up the good work!

Kind regards,

Joris

@jkommeren jkommeren changed the title Excel: Reading, Combining, Writing excel files Enhancement for MS Excel: Reading, Combining, Writing excel files May 19, 2017
@jornfranke
Member

I think your use case is very valid.

However, I do not understand the issue with the current solution.
I have not tested it, but you can read an .xlsx into a DF and this DF can again be written as .xlsx. In fact, it was designed to support this (I admit the documentation is a little bit sparse).
So your approach should work already. Did you try it?

Another alternative might be in the future the templating feature (cf. #3). Using this feature you can load an Excel file and modify selected cells. The idea behind this feature was more that you use Excel to create nice drawings, formats etc. and then use HadoopOffice to fill this "template" with data/formulas.

@jkommeren
Author

Thanks for validating the use case.

Yes, I've tried it. A DF obtained by reading any xlsx file cannot be output to an Excel file again (directly). Example:

import java.io.{PrintWriter, StringWriter}
import org.apache.spark.sql.SaveMode

val sRdd = spark.sparkContext.parallelize(Seq(
  Seq("","","1","A1","Sheet2"),
  Seq("","This is a comment","2","A2","Sheet2"),
  Seq("","","3","A3","Sheet2"),
  Seq("","","AVERAGE('Sheet2'!A2:A3)","B1","Sheet1"))).repartition(1)
val df = sRdd.toDF()

// writing the hand-built DF works
df.write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/output/")

// reading it back and writing the result again fails
val df2 = spark.read.format("org.zuinnote.spark.office.excel").load("/user/spark/output/")
df2.write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/output2/")

//df.show()
try {
    df.write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/output/")
} catch {
    case e: Exception => {
        val sw = new StringWriter
        e.printStackTrace(new PrintWriter(sw))
        println(sw.toString)
    }
}

results in an error similar to:

org.apache.spark.SparkException: Task failed while writing rows
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: scala.MatchError: WrappedArray([1,,1,A1,Sheet2]) (of class scala.collection.mutable.WrappedArray$ofRef)
	at org.zuinnote.spark.office.excel.ExcelOutputWriter$$anonfun$write$1.apply$mcVI$sp(ExcelOutputWriter.scala:77)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.zuinnote.spark.office.excel.ExcelOutputWriter.write(ExcelOutputWriter.scala:70)
	at org.apache.spark.sql.execution.datasources.OutputWriter.writeInternal(fileSourceInterfaces.scala:119)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:255)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
	at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
	... 8 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
  ... 79 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:86)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  ... 3 more
Caused by: scala.MatchError: WrappedArray([1,,1,A1,Sheet2]) (of class scala.collection.mutable.WrappedArray$ofRef)
  at org.zuinnote.spark.office.excel.ExcelOutputWriter$$anonfun$write$1.apply$mcVI$sp(ExcelOutputWriter.scala:77)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
  at org.zuinnote.spark.office.excel.ExcelOutputWriter.write(ExcelOutputWriter.scala:70)
  at org.apache.spark.sql.execution.datasources.OutputWriter.writeInternal(fileSourceInterfaces.scala:119)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:255)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1348)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
  ... 8 more

Probably because reading the xlsx returns a DF of WrappedArrays.

@jornfranke
Member

I see. I will look at the error later, might be an issue.

However, why do you need to write the file when you directly load it afterwards?

For example, it is possible to create one DF where you put all Excel cells inside (they can be part of different sheets, that does not matter) and then write it to one file. No need for different DFs or writing/reloading. Example:
val sRdd = spark.sparkContext.parallelize(Seq(
  Seq("","","1","A1","Sheet2"),
  Seq("","This is a comment","2","A2","Sheet3"),
  Seq("","","3","A3","Sheet4"),
  ...
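
For completeness, the remaining steps would look roughly like this (untested; the DF name and output path are only placeholders):

val allSheetsDf = sRdd.toDF()
allSheetsDf.write
  .format("org.zuinnote.spark.office.excel")
  .save("/user/spark/all_sheets/")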

@jornfranke
Member

Additionally, you could do a simple select after loading (explode comes from org.apache.spark.sql.functions):
df.select("rows").write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save
or
df.select(explode(df("rows")).alias("rows")).write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save

I would have to test it, but let me know if you have the chance to test it before me.

@jkommeren
Author

jkommeren commented May 19, 2017

However, why do you need to write the file when you directly load it afterwards?

There are two reasons for this:

  1. This way I don't have to split the df up into separate cells (I can write them as rows directly to a single sheet with a custom name)
  2. The first xlsx / csv (just 1 here, but in production there are 250 of them) is generated by a different process. Of course there are other ways to store it, but this way I figured all I needed to do was read it at the final step (where every cell / row does have different sheet names), do a union with the other xlsx files (each having a different sheet name), write it to a single xlsx, done.

(I'm not saying it should work like this, by the way! All I want is an easy / fast way to create a single xlsx based on multiple dfs which represent rows and columns of raw values, saved as separate sheets in the xlsx, plus one front page containing formulas that link to the sheets of raw data. I thought it would already be possible via this hacky way.)

So a way could be: an easy way to store a list of rows of data (a df) across multiple sheets in 1 xlsx, instead of only being able to specify the 1 "default" sheet name (for example by using the first/last column in the df as the sheet name; rough illustration below)
... and allow a mixture of cell-based entries (formulas) and simple row-based entries
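
Purely to illustrate the input shape I mean (this is not how the library works today; column names and values are made up):

// hypothetical: one df row per Excel row, the last column selects the target sheet
val rawRows = Seq(
  (1, "foo", "RawData1"),
  (2, "bar", "RawData1"),
  (3, "baz", "RawData2")
).toDF("colA", "colB", "sheetName")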

Update: But I think your explode method does what I need :D (= easily split it up into cells). I'll test it tomorrow!

@jornfranke
Member

I understand. It is fine then to store and load them again. Please let me know if explode works! I also have to correct my earlier statement - as you already found out - just loading and directly storing does not work, but explode (or a similar statement) should work.

@jkommeren
Author

Nope didn't work:

I added the line to the code above

scala> df2.select(explode(df2("rows")).alias("rows")).write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/output2/")
17/05/22 07:05:50 WARN AzureFileSystemThreadPoolExecutor: Disabling threads for Delete operation as thread count 0 is <= 1
17/05/22 07:05:51 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 13, 10.0.0.22, executor 1): org.apache.spark.SparkException: Task failed while writing rows
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [1,,1,A1,Sheet2] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
        at org.zuinnote.spark.office.excel.ExcelOutputWriter$$anonfun$write$1.apply$mcVI$sp(ExcelOutputWriter.scala:77)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.zuinnote.spark.office.excel.ExcelOutputWriter.write(ExcelOutputWriter.scala:70)
        at org.apache.spark.sql.execution.datasources.OutputWriter.writeInternal(OutputWriter.scala:93)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:245)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)

There's a small difference between the two dataframes

scala> df2.select(explode(df2("rows")).alias("rows")).show()
+--------------------+
|                rows|
+--------------------+
|    [1,,1,A1,Sheet2]|
|[2,This is a comm...|
|    [3,,3,A3,Sheet2]|
|              [,,,,]|
|[2.5,,AVERAGE('Sh...|
+--------------------+


scala> df.show()
+--------------------+
|               value|
+--------------------+
| [, , 1, A1, Sheet2]|
|[, This is a comm...|
| [, , 3, A3, Sheet2]|
|[, , AVERAGE('She...|
+--------------------+

One shows spaces and the other doesn't. I've seen this before somewhere, though I have no clue where. I think we're close though!

@jornfranke
Member

jornfranke commented May 22, 2017 via email

@jkommeren
Author

jkommeren commented May 22, 2017

Maybe this helps? I did some printSchemas:

df2's printschema before explode:

root
|-- rows: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- formattedValue: string (nullable = true)
|    |    |-- comment: string (nullable = true)
|    |    |-- formula: string (nullable = true)
|    |    |-- address: string (nullable = true)
|    |    |-- sheetName: string (nullable = true)

df2's printschema after explode:

root
|-- rows: struct (nullable = true)
|    |-- formattedValue: string (nullable = true)
|    |-- comment: string (nullable = true)
|    |-- formula: string (nullable = true)
|    |-- address: string (nullable = true)
|    |-- sheetName: string (nullable = true)

printschema of dataframe that is compatible ("df") in example:

root
 |-- value: array (nullable = true)
 |    |-- element: string (containsNull = true)

So it seems one is an array and the other a struct, which would explain the different show outputs. You probably knew this already though.

@jkommeren
Author

Okay so I got it working, but it is in no way pretty code.

val sRdd = spark.sparkContext.parallelize(Seq(
  Seq("","","1","A1","Sheet2"),
  Seq("","This is a comment","2","A2","Sheet2"),
  Seq("","","3","A3","Sheet2"),
  Seq("","","AVERAGE('Sheet2'!A2:A3)","B1","Sheet1"))).repartition(1)
val df = sRdd.toDF()
df.write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/output/")

// read
val df2 = spark.read.format("org.zuinnote.spark.office.excel").load("/user/spark/output/")

// explode the array of cell structs into one struct per row
val df2fixed = df2.select(explode(df2("rows")).alias("rows"))

// pull the struct fields up to the root level
val df2fixed2 = df2fixed.select(
  df2fixed.col("rows.formattedValue"),
  df2fixed.col("rows.comment"),
  df2fixed.col("rows.formula"),
  df2fixed.col("rows.address"),
  df2fixed.col("rows.sheetName"))

// stick 'em together into one comma-separated string per cell
// (note: this breaks if any cell value itself contains a comma)
val df3 = df2fixed2.withColumn("con",
  concat(df2fixed2("formattedValue"), lit(","),
         df2fixed2("comment"), lit(","),
         df2fixed2("formula"), lit(","),
         df2fixed2("address"), lit(","),
         df2fixed2("sheetName"))).select("con")

// filter out any empty cells, and split each string back into an array
val df5 = df3.filter(x => x.getAs[String](0) != ",,,,").map(x => x.getAs[String](0).split(','))

// output
df5.write.format("org.zuinnote.spark.office.excel").mode(SaveMode.Overwrite).save("/user/spark/output2/")

Maybe this gives you some direction to go with :)

@jornfranke
Member

jornfranke commented May 22, 2017 via email

@jkommeren
Author

jkommeren commented May 22, 2017

No prob! Thanks for looking into it! (and for your quick responses too!)

@jornfranke
Member

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.Row

// outputFile: an output path of your choice
val sRdd = sc.parallelize(Seq(
  Seq("","","1","A1","Sheet1"),
  Seq("","This is a comment","2","A2","Sheet1"),
  Seq("","","3","A3","Sheet1"),
  Seq("","","A2+A3","B1","Sheet1"))).repartition(1)
val df = sRdd.toDF()
df.show
df.printSchema
df.write
  .format("org.zuinnote.spark.office.excel")
  .option("write.locale.bcp47", "de")
  .save(outputFile)

val df2 = sqlContext.read
  .format("org.zuinnote.spark.office.excel")
  .option("read.locale.bcp47", "de")
  .load(outputFile)
df2.show
df2.printSchema

// convert each row's array of cell structs back into plain 5-element string arrays
val df3 = df2.flatMap(theRow => {
  val spreadSheetCellDAORow = theRow.getAs[WrappedArray[Row]](0)
  val returnArray = new Array[Array[String]](spreadSheetCellDAORow.size)
  var i = 0
  for (y <- spreadSheetCellDAORow) {
    val spreadSheetCellDAOArray = new Array[String](5)
    spreadSheetCellDAOArray(0) = y.getString(0)
    spreadSheetCellDAOArray(1) = y.getString(1)
    spreadSheetCellDAOArray(2) = y.getString(2)
    spreadSheetCellDAOArray(3) = y.getString(3)
    spreadSheetCellDAOArray(4) = y.getString(4)
    returnArray(i) = spreadSheetCellDAOArray
    i += 1
  }
  returnArray
})
df3.show
df3.printSchema
df3.write
  .format("org.zuinnote.spark.office.excel")
  .option("write.locale.bcp47", "de")
  .save(outputFile + ".2")
