
[SS] Discussion thread for《2.2 Structured Streaming 之 Sink 解析》(Structured Streaming Sink Internals) #32

Open
lw-lin opened this issue Jan 1, 2017 · 3 comments

Comments

@lw-lin
Owner

lw-lin commented Jan 1, 2017

If you need to paste code, please copy the template below and modify it:

public static final thisIsJavaCode;
val thisIsScalaCode

Thanks!

@ShiyuChai

ShiyuChai commented Sep 11, 2017

When using Structured Streaming's foreach sink, is there a good way to write out the values of a partition as a single batch?

import java.sql.Connection
import org.apache.spark.sql.ForeachWriter

val query = words.writeStream.foreach(new ForeachWriter[String] {

      // buffer all records of a partition so they can be written in one batch
      val arr = scala.collection.mutable.ArrayBuffer[String]()
      var conn: Connection = _

      override def open(partitionId: Long, version: Long): Boolean = {
        arr.clear()           // writer instances may be reused across batches
        conn = DBUtil.getConn
        true
      }

      override def process(value: String): Unit = {
        arr += value          // accumulate only; defer the write to close()
      }

      override def close(errorOrNull: Throwable): Unit = {
        try {
          if (errorOrNull == null) {        // flush only on success
            DBUtil.insert(conn, arr.toArray)
          }
        } finally {
          conn.close()
        }
      }

    }).start()

Currently I append each record to a collection in `process`, then do the batch write in the `close` method. Is there a more elegant way to write this? Thanks.
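One refinement of the same buffer-and-flush pattern, sketched here under the same assumptions as the code above (`words` and the `DBUtil` helper are the asker's own): flush whenever the buffer reaches a fixed size, so memory stays bounded even for very large partitions.

```scala
import java.sql.Connection
import org.apache.spark.sql.ForeachWriter

// Sketch only: same pattern as above, but with a capped buffer.
// DBUtil and words are assumed from the asker's code, not a real API.
val query = words.writeStream.foreach(new ForeachWriter[String] {

      val batchSize = 1000   // flush threshold; tune to your sink
      val arr = scala.collection.mutable.ArrayBuffer[String]()
      var conn: Connection = _

      override def open(partitionId: Long, version: Long): Boolean = {
        arr.clear()
        conn = DBUtil.getConn
        true
      }

      override def process(value: String): Unit = {
        arr += value
        if (arr.size >= batchSize) flush()  // bound the buffer
      }

      override def close(errorOrNull: Throwable): Unit = {
        try {
          if (errorOrNull == null) flush()  // write any remaining records
        } finally {
          conn.close()
        }
      }

      private def flush(): Unit = {
        if (arr.nonEmpty) {
          DBUtil.insert(conn, arr.toArray)
          arr.clear()
        }
      }

    }).start()
```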

@lw-lin
Owner Author

lw-lin commented Sep 12, 2017

@Dreamtecher

A very good question. With the current version (as of 2017.09, Spark 2.2.0), though, I can't think of a better way to write it.
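A note for readers on later versions: Spark 2.4 added `foreachBatch` to `DataStreamWriter`, which exposes each micro-batch as a Dataset and makes partition-level batch writes straightforward. A sketch, again assuming the asker's `words` stream and hypothetical `DBUtil` helper:

```scala
import org.apache.spark.sql.Dataset

// Requires Spark 2.4+. foreachBatch hands us the whole micro-batch as a
// Dataset, so we can batch-insert each partition without buffering in a
// ForeachWriter. DBUtil and words are assumptions carried over from above.
val query = words.writeStream
  .foreachBatch { (batchDS: Dataset[String], batchId: Long) =>
    batchDS.foreachPartition { iter: Iterator[String] =>
      val conn = DBUtil.getConn
      try {
        DBUtil.insert(conn, iter.toArray)  // one batch insert per partition
      } finally {
        conn.close()
      }
    }
  }
  .start()
```

Note that `foreachBatch` provides at-least-once semantics by default, so the sink should be idempotent or deduplicate by `batchId`.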

@ShiyuChai

@lw-lin
OK, thank you for your reply!

2 participants