Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SS]《1.1 Structured Streaming 实现思路与实现概述》讨论区 #29

Open
lw-lin opened this issue Jan 1, 2017 · 9 comments
Open

Comments

@lw-lin
Copy link
Owner

lw-lin commented Jan 1, 2017

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

@ar5art
Copy link

ar5art commented Mar 22, 2017

@lw-lin 你好,请教一个问题。
structured streaming的数据源为kafka,并设置任务triggers为1小时。streaming会每间隔一小时确认一次新到达的数据,这里是每间隔一小时去kafka中取新到达的数据,还是隔一小时确认一次streaming的input table中新追加的数据?kafka中的数据会实时追加到input table中吗?如果我想设置任务每一小时跑一次,但是要实时把kafka拉取到input table中并做一次预处理,可以做到吗?
谢谢!

@lw-lin
Copy link
Owner Author

lw-lin commented Mar 27, 2017

@ar5art

是每间隔一小时去kafka中取新到达的数据,还是隔一小时确认一次streaming的input table中新追加的数据?

是前者

kafka中的数据会实时追加到input table中吗?

呃,不会

如果我想设置任务每一小时跑一次,但是要实时把kafka拉取到input table中并做一次预处理,可以做到吗?

用 1min 做 trigger,实时从 kafka 中拉数据、并写出到 HDFS dir x
另起一个任务,用 1hour 做 trigger,从 HDFS dir x 读入,并另外写出?

@zhouyan8603
Copy link

您好,我有个问题想请教下,我当前用的是spark streaming处理每个页面每小时的新增用户数问题的,具体逻辑是:批次读取stream,然后解析stream中的日志事件,用每个事件的pageid和uid拼接为key到数据库表user里查找用户是否存在,如果不存在(说明是该pageid的新用户),则更新user.new表,将对应的pageid下的时间段下的用户数+1。这个流程会有个严重的缺点,就是每条log解析出来的事件和uid都需要去到表里查找,所以表的请求量很大。现在想用structured streaming优化这个方案,目前的想法是先group by pageid,uid agg: floor(min(time)/3600)*3600 as event_time as min_table创建一个memory 表,然后再在这个表的基础上group by pageid,event_time,agg: count(distinct uid)创建一个writestream,然后输出到外部表hbase。但是感觉这种方案不可行,起码memory表会越来越大,另外如何保证尽可能少的对hbase的输出也是个问题?谢谢

@lw-lin
Copy link
Owner Author

lw-lin commented Sep 8, 2017

@zhouyan8603

不管是 spark streaming, 还是 structured streaming, 都可以先做一步 pageid, uid 的聚合,再往外写。structured streaming 的 memory table 确实需要有个过期机制(比如只记录最近三天、或一周的所有用户),否则 oom。

上面说的是精确记录 distinct(id) 的做法。如果不需要精确记录(比如可以接受误差在 5% 以内),那么可以考虑用基于概率的算法,占用空间非常小。比如一些概率算法的索引:http://blog.csdn.net/bagba/article/details/51822189。

@zhouyan8603
Copy link

@lw-lin
感谢回复,另外想了解下structured streaming的应用场景,比如我的问题里提到用memory table这种做法是否适合生产环境?structured streaming的面世是不是不需要以前的那种ETL的批处理了呢?还是说只是实时消费的一种方案?

@lw-lin
Copy link
Owner Author

lw-lin commented Sep 9, 2017

@zhouyan8603
(1) Structured Steaming 基于 Dataset/DataFrame API, Spark Streaming 基于 RDD API,所以 Structured Steaming 能用 SQL 操作实时的 streaming 数据,而且性能会高些,这些都是 Dataset/DataFrame 带来的收益。
(2) 所以应用场景的话,如果需要 SQL 支持,或者比现有 Spark Streaming 更高的性能,可尝试 Structured Streaming。
(3) 你这个场景,Structured Streaming 当然可以用于生产环境,但是如前面我所说,一定要设置一个 memory table 的过期清除机制;额外多说一句,你这个场景,Spark Streaming 也都能做。
(4) Spark 1.x 的批处理是基于 RDD 的,现在 Spark 2.x 的批处理是基于 Dataset/DataFrame 的;Spark
1.x 的 streaming 是基于 RDD 的,Spark 2.x 的批处理是基于 Dataset/DataFrame 的。Spark 2.x 还是需要批处理 + streaming,只不过都是基于 Dataset/DataFrame 的。

@zhouyan8603
Copy link

@lw-lin 万分感谢!我自己再好好考虑下

@cxzdy
Copy link

cxzdy commented May 6, 2019

博主,好想看你出个CoolPlayFlink系列,讲解方式解决了我很多思考,非常受用。

@kuncle
Copy link

kuncle commented Jul 10, 2019

博主你好,
这里第 (5) 步需要分两种情况讨论
(i) 如果上次执行在 (5) 结束前即失效,那么本次执行里 sink 应该完整写出计算结果
(ii) 如果上次执行在 (5) 结束后才失效,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了)

需要如何判断第五步是否执行成功?这个需要下游系统支持还是?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants