Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CAT之Server端源码分析(1) #2

Open
lytofb opened this issue Feb 10, 2017 · 0 comments
Open

CAT之Server端源码分析(1) #2

lytofb opened this issue Feb 10, 2017 · 0 comments

Comments

@lytofb
Copy link
Owner

lytofb commented Feb 10, 2017

Cat Server 主要类介绍

Server的主要入口是cat-core包中的RealTimeConsumer类

RealTimeConsumer类以及RealTimeConsumer的依赖类层级结构如下

cat_analyze_server

Cat Server功能为解码消息,解码后按照固定时间间隔分片,将消息分发到各个Analyzer的消费队列中,然后由各自的Analyzer进行消费

TCPSocketReceiver,DefaultMessageHandler

TCPSocketReceiver主要负责使用netty建立server端,接受到tcp请求后将其解码,通过DefaultMessageHandler将Message交由RealTimeConsumer消费

RealTimeConsumer

在内部初始化PeriodManager,并启动periodManager的线程,该线程会不断根据时间间隔生成新的Period对象,并启动Period对象内的多个PeriodTask线程,PeriodTask线程会根据持有的Anaylyzer和MessageQueue进行消费

当RealTimeConsumer终止时会调用doCheckPoint方法

PeriodManager,PeriodStrategy

PeriodManager主要是以时间切片作为策略来拆分整体数据的,所以PeriodManager中包含的List类型是根据PeriodStrategy中的时间策略获得的。PeriodManager实现Task接口,他的主要任务是在规定的存活期内,每隔一段固定的时间都会创建新的Period对象,并启动Period对象内的多个消费线程

Period

Period中主要包含了一个类型为Map < String, List < PeriodTask > >的属性,该属性根据MessageAnalyzerManager构建。Map < String, List < PeriodTask > >属性是一个在该Period时间片内,不同类型的Analyzer与各个PeriodTask之间的对应关系,因为偶尔有同一个Analyzer会有多个PeriodTask一同消费,根据Hash进行分配的情况,所以value的类型为List。

PeriodTask是消费Message的消费单元,每个PeriodTask中包含了一个queue,一个analyzer,PeriodTask会一直从queue中取出Message让analyzer进行消费

distribute方法实现了将Message分发到该Period中所有PeriodTask中的功能

start方法启动各个PeriodTask线程,对各个PeriodTask的queue中的Message开始消费

finish方法调用各个PeriodTask的finish方法

MessageAnalyzerManager

持有Map < Long, Map < String, List < MessageAnalyzer > > >,该属性包含了各个Analyzer的实例,每个实例可以通过——时间片——analyzer类型/名字来获得,analyzer的数量由各个MessageAnalyzer中getAnalyzerCount获得

PeriodTask

PeriodTask实现Task接口,每个PeriodTask会持有自己专属的analyzer和queue,在线程启动后会调用analyzer的consume方法来消费queue。在调用finish时会调用checkPoint方法,执行analyzer实现的检查点方法

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant