A similar interface to write CSV #21

Closed
ms-tg opened this issue Oct 30, 2014 · 15 comments

@ms-tg

ms-tg commented Oct 30, 2014

We really wanted an interface which was like

case class Input(a: Int, b: BigDecimal, c: String, d: Option[String])
val p = mkParser[Input]
p.read(stream) //=> Stream[Input]

case class Output(...)
val w = mkWriter[Output]
stream.foreach(w.write)           // stream = Stream(Output(...), ...)
// OR, w.writeStream(stream)

Is there any interface in the works to make this library more like this? Or is there perhaps another library that already offers an interface like this?

@marklister
Owner

What about scala-pickling? It does serialization to JSON and some other formats, and one could probably write a CSV backend quite easily.

Writing something that converts a Product to CSV doesn't sound too difficult:

scala> def stringify (a:Any)= 
     |  a match{
     |   case s:String => "\""+s+"\""
     |   case a:Any => a.toString
     | }
stringify: (a: Any)String

scala> def toCsv(p:Product) = p.productIterator.map(stringify(_)).mkString(",")
toCsv: (p: Product)String

scala> case class T(a:Int, b:String)
defined class T

scala> T(1,"hello")
res0: T = T(1,hello)

scala> toCsv(res0)
res1: String = 1,"hello"

Of course the devil is in the detail. CSV quoting can get quite complicated.
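
For illustration, a slightly more careful stringify (just a sketch, not the library's code) would quote a field whenever it contains the separator, a quote, or a line break, and escape embedded quotes by doubling them:

def quoteField(s: String, sep: String = ","): String =
  if (s.contains(sep) || s.contains("\"") || s.contains("\n") || s.contains("\r"))
    "\"" + s.replace("\"", "\"\"") + "\""   // RFC 4180 style: double any embedded quotes
  else s

def rowToCsv(p: Product, sep: String = ","): String =
  p.productIterator.map(x => quoteField(x.toString, sep)).mkString(sep)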

I might do this for product-collections since it's so simple.

@marklister
Owner

I pushed a CsvWriter branch with support for CSV output:

scala> import com.github.marklister.collections.io.CsvOutputUtils._
import com.github.marklister.collections.io.CsvOutputUtils._

scala> CollSeq((1,2,3.5,"hello"),
     | (5,6,7.7,"\"dude\""))
res0: com.github.marklister.collections.immutable.CollSeq4[Int,Int,Double,String] =
CollSeq((1,2,3.5,hello),
        (5,6,7.7,"dude"))

scala> res0.csvIterator.toList
res2: List[String] = List(1,2,3.5,"hello", 5,6,7.7,"""dude""")

scala> res0.csvIterator.toStream
res3: scala.collection.immutable.Stream[String] = Stream(1,2,3.5,"hello", ?)

scala> res0.csvIterator
res4: Iterator[String] = non-empty iterator

scala> val w= new java.io.StringWriter
w: java.io.StringWriter =

scala> res0.writeCsv(w)

scala> w.close

scala> w.toString
res7: String =
"1,2,3.5,"hello"
5,6,7.7,"""dude"""
"

scala> CsvParser[Int,Int,Double,String].parse(new java.io.StringReader(res7))
res8: com.github.marklister.collections.immutable.CollSeq4[Int,Int,Double,String] =
CollSeq((1,2,3.5,hello),
        (5,6,7.7,"dude"))

Experimental for now, but it's only about 15 LOC and fairly non-controversial, so I can see it making it into the distribution.

@marklister
Owner

And it'll work on a Sequence of case classes too.
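
Presumably, usage on a plain Seq of case classes would look something like this (a sketch reusing the CsvOutputUtils import and writeCsv shown above, not verified against the branch):

import com.github.marklister.collections.io.CsvOutputUtils._

case class Row(id: Int, price: Double, label: String)

val rows = Seq(Row(1, 3.5, "hello"), Row(2, 7.7, "\"dude\""))
val w = new java.io.StringWriter
rows.writeCsv(w)   // case classes are Products, so a Seq of them is an Iterable[Product]
w.close()
println(w.toString)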

@ms-tg
Author

ms-tg commented Oct 31, 2014

@marklister Thanks for looking into this, @flicken and I will take a look and see how this works for us.

Did we miss it, or does the reader (and this new writer) API support lazy streams? In other words, can CollSeq and other APIs be made lazy, for dealing with multi-million-row CSV files as streams?

Thanks, and cheers!
-Marc

@marklister
Owner

@ms-tg,

Did we miss it, or does the reader (and this new writer) API support lazy streams?

I've added an experimental writer that supports Iterator, which can be turned into a Stream using toStream or even toSeq. You'll need to check out the CsvWriter branch of the repo. The reader is still not lazy.

dealing with multiple million row CSV

The project that demonstrates dealing with that scale of data is Saddle. It still reads everything into memory but uses some tricks (and arrays) to make this a little more efficient. I wrote product-collections after trying Saddle, primarily because I didn't like its I/O (too complicated and doesn't respect types). But lots of people use it and it looks really good.

The other project to look into might be framian. I don't know much about it, but I think it was written in response to some aspects of Saddle that didn't work for them. There are lots of folks working on it, and it looks like it's an open-sourced company component. I don't know how it scales. They don't have any documentation (well, I just added a link to their scaladoc), but there's a video around describing how it works.

Similarly, I don't know how product-collections scales -- I use it on thousands of rows of data without any difficulty... But I'm pretty sure it won't work well on millions of rows. Its underlying construction is a Scala WrappedArray:

scala> CollSeq((1,2,3)).underlying
res10: Seq[Product3[Int,Int,Int]] = WrappedArray((1,2,3))

Thanks for your interest! Please don't take my "out of scope" responses on the other issues as lack of interest. I just want to keep this product focused on what it does well: easy, strongly typed, medium to small data coupled with intuitive I/O.

If I can see a way to easily hive off the I/O I'll do that. As it is, the new Writer can serialize a Sequence of case classes to CSV because that's basically free.

@ms-tg
Author

ms-tg commented Oct 31, 2014

Hi @marklister thank you very much for your kind attention to our questions! :)

We need to process some 5-10 million row CSVs, and wanted something with the elegant API that you have.

Correct me if I am wrong, but it looks to me as though your previous "out of scope" response was perhaps not the final word, if I understand what you've posted under issue #20 correctly? 👍

Since opencsv supports streaming (an assumption we should verify), we were hoping that by simply changing some assumptions about types in this library we'd be able to get quickly to where we need to go. It seems that, by introducing a Scala Iterator, you are thinking along the same lines?

/cc @flicken @waseemtaj

@marklister
Owner

@ms-tg @flicken @waseemtaj

I don't really have the tooling or the bandwidth to test. Would you mind testing CsvOutput (just writeCsv on a Stream should be OK) for acceptable performance or bottlenecks? The changes are now merged into master. I'll try to push a snapshot later.

@ms-tg
Author

ms-tg commented Nov 5, 2014

Hi @marklister: the current CsvOutputUtils, unfortunately, causes the entire Stream to be materialized and held in memory. The only way to allow GC to collect the earlier parts of a Scala stream is to not hold a reference to the original Stream.

That's why folks use either @tailrec methods or while loops that look only at the head and tail, discarding references to what's already been seen.
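
A rough sketch of that pattern (a hypothetical helper, not part of the library; quoting omitted):

import scala.annotation.tailrec

@tailrec
def writeAll(rows: Stream[Product], w: java.io.Writer): Unit =
  if (rows.nonEmpty) {
    w.write(rows.head.productIterator.mkString(",") + "\r\n")
    writeAll(rows.tail, w)   // tail call: this frame no longer references the cell just written
  }

// The caller must also avoid keeping the Stream in a val, or the head stays reachable anyway.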

We'll try to get something that works and let you know.

@marklister
Owner

Streams sure are tricky. I had hoped that, since everything works through an Iterator, this problem would be avoided.

@marklister
Owner

@ms-tg,

I ran this simplistic test:

package com.github.marklister.collections.io

import scala.collection.AbstractIterator
import Utils.CsvOutput

object MemTest {

  def fakeData = new Iterable[Product]() {
    override def iterator: Iterator[Product] = new AbstractIterator[Product] {
      var n=0
      override def next() = {n+=1;(1,2,3,4,5)}
      override def hasNext: Boolean = n<100000000
    }
  }

  def main (args:Array[String]):Unit={
    System.gc()
    val start = Runtime.getRuntime.freeMemory()
    var f = new java.io.FileWriter("/dev/null")
    fakeData.writeCsv(f)
    println(fakeData.head)
    f.close
    f=null
    System.gc
    val end=Runtime.getRuntime.freeMemory()
    println ("Memory usage:"+((end-start)/1024/1024)+ "Mb")
  }
}

This completes (100 million rows) and indicates as little as 5 MB final usage. So the implicit class per se is not gobbling memory. Could it be holding a reference to the head of the underlying Iterable?

CsvOutputUtils is looking only for an Iterable[Product]:

implicit class CsvOutput(c: Iterable[Product]) {

and Iterable only has one method:

def iterator: Iterator[Product]

Here's where the head is accessed:

    def writeCsv(w: java.io.Writer,separator:String=","):Unit ={
      csvIterator(separator).map(_ + "\r\n")
        .foreach(w.write(_))
    }

So the head is only accessed once on the first call to next on the Iterator and then written to the Writer and discarded.

My research indicates that if one is holding a val pointing at a Stream, the memoized elements can't be garbage collected. Might this be the issue? Can you reference the Stream using a def?

I should also probably reference the underlying Iterable in the implicit class by name rather than by value, but I don't think this will make any difference in this case.

@marklister
Owner

From the scaladoc

Note that some operations, including drop, dropWhile, flatMap or collect may process a large number of intermediate elements before returning. These necessarily hold onto the head, since they are methods on Stream, and a stream holds its own head. For computations of this sort where memoization is not desired, use Iterator when possible.

And wrt def:

One must be cautious of memoization; you can very quickly eat up large amounts of memory if you're not careful. The reason for this is that the memoization of the Stream creates a structure much like scala.collection.immutable.List. So long as something is holding on to the head, the head holds on to the tail, and so it continues recursively. If, on the other hand, there is nothing holding on to the head (e.g. we used def to define the Stream) then once it is no longer being used directly, it disappears.
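
To illustrate the difference (a contrived sketch, not taken from the library):

// A val pins the head, so every memoized cell stays live while foreach walks the stream:
val held: Stream[Int] = Stream.from(1).take(10000000)
held.foreach(_ => ())

// Per the scaladoc above, a def means nothing outside the traversal retains the head,
// so already-consumed cells can be garbage collected as foreach advances:
def unheld: Stream[Int] = Stream.from(1).take(10000000)
unheld.foreach(_ => ())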

@ms-tg
Author

ms-tg commented Nov 6, 2014

Thanks, yup, that makes sense. What we found is that if a reference to the Stream is held in the implicit class then it doesn't work, so we re-implemented your implicit class to accept a TraversableOnce[Product] instead of a Seq. By doing it that way, we can pass it an Iterator (whether the one we get from CsvParser or one obtained from Stream#iterator), and processing it that way allows garbage collection and does not blow out memory.

If we don't do that, unfortunately, passing a Stream directly to the implicit class will prevent GC and blow out memory.
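
In rough outline, our variant looks like this (a sketch, not the exact code; quoting omitted for brevity):

implicit class CsvOutputOnce(rows: TraversableOnce[Product]) {
  def writeCsv(w: java.io.Writer, separator: String = ","): Unit =
    rows.foreach(p => w.write(p.productIterator.mkString(separator) + "\r\n"))
}

// Usage: pass an Iterator (e.g. the one from CsvParser, or stream.iterator) rather than
// the Stream itself, so nothing retains the already-written elements:
//   data.iterator.writeCsv(writer)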

@marklister
Owner

Reproduced with my test case using Stream.continually. I'll investigate. I'd like to get this working as is because it's simple and neat.

@marklister
Owner

The interface is written and OK, so I'm closing this enhancement request. Information on GC (which I'm pretty sure is fixed) should go on #24.

@ms-tg
Author

ms-tg commented Nov 7, 2014

@marklister Thank you 👍
