Skip to content

Commit

Permalink
20191204 idc sync (#86)
Browse files Browse the repository at this point in the history
* fix temp push

* update version 5.2.1-SNAPSHOT

* fix test case

* fix jetty version,and fix rest api for dataInfoIds

* fix hashcode test

* fix working to init bug

* fix start task log

* fix Watcher can't get providate data,retry and finally return new

* add data server list api

* add server list api

* remove log

* fix isssue 21

* add query by id function

* fix issue 22

* delay client off process and sync data process to working status

* fix data connet meta error

* fix inject NotifyDataSyncHandler

* fix start log

* add send sub log

* fix subscriber to send log

* bugfix: #27

* bugfix: #27

* feature: Add monitoring logs #29

* feature: Add monitoring logs #29
(1) bugfix CommonResponse
(2) format

* bugfix: During meta startup, leader may not register itself #30

* bugfix: Sometimes receive "Not leader" response from leader in OnStartingFollowing() #31

* temp add

* add renew request

* data snapshot module

* add calculate digest service

* fix word cache clientid

* data renew module

* data renew/expired module

* add renew datuem request

* add WriteDataAcceptor

* session renew/expired module

* 1. bugfix ReNewDatumHandler: getByConnectId -> getOwnByConnectId
2. reactor DatumCache from static to instance

* add blacklist wrapper and filter

* upgrade jraft version to 1.2.5

* blacklist ut

* add clientoff delay time

* bugfix: The timing of snapshot construction is not right

* rename: ReNew -> Renew

* fix blacklist test case

* rename: unpub -> unPub

* add threadSize and queueSize limit

* bugfix: revert SessionRegistry

* fix sub fetch retry all error,and reset datainfoid version

* fix client fast chain breakage data can not be cleaned up”

* (1) remove logback.xml DEBUG level;
(2) dataServerBootstrapConfig rename;
(3) print conf when startup

* update log

* fix update zero version,and fix log

* add clientOffDelayMs default value

* fix clientOffDelayMs

* Task(DatumSnapshot/Pub/UnPub) add retry strategy

* bugfix DataNodeServiceImpl: retryTimes

* (1)cancelDataTaskListener duplicate
(2)bugfix DataNodeServiceImpl and SessionRegistry

* refactor datum version

* add hessian black list

* bugfix: log "retryTimes"

* bugfix DatumLeaseManager:  Consider the situation of connectId lose after data restart; ownConnectId should calculate dynamically

* add jvm blacklist api

* fix file name

* some code optimization

* data:refactor snapshot

* fix jetty version

* bugfix DatumLeaseManager: If in a non-working state, cannot clean up because the renew request cannot be received at this time.

* remove SessionSerialFilterResource

* WriteDataProcessor add TaskEvent log; Cache print task update

* data bugfix: snapshot must notify session

* fix SubscriberPushEmptyTask default implement

* merge new

* fix protect

* 1. When the pub of connectId is 0, no clearance action is triggered.
2. Print map. size regularly
3. Delete the log: "ConnectId (% s) expired, lastRenewTime is% s, pub. size is 0"

* DataNodeExchanger: print but ignore if from renew module, cause renew request is too much

* reduce log of renew

* data bugfix: Data coverage is also allowed when versions are equal. Consistent with session design.

* DatumCache bugfix: Index coverage should be updated after pubMap update

* DatumSnapshotHandler: limit print; do not call dataChangeEventCenter.onChange if no diff

* bugfix unpub npe (pub maybe already clean by DatumLeaseManager);LIMITED_LIST_SIZE_FOR_PRINT change to 30

* some code refactor

* add code comment

* fix data working to init,and fix empty push version

* consider unpub is isWriteRequest, Reduce Snapshot frequency

* RefreshUpdateTime is at the top, otherwise multiple snapshot can be issued concurrently

* update config: reduce retryTimes, increase delayTime, the purpose is to reduce performance consumption

* put resume() in finally code block, avoid lock leak

* modify renewDatumWheelTaskDelay and datumTimeToLiveSec

* When session receives a connection and generates renew tasks, it randomly delays different times to avoid everyone launching renew at the same time.

* data: add executor for handler
session: bugfix snapshot
session: refactor wheelTimer of renew to add executor

* add get data log

* snapshot and lastUpdateTimestamp: Specific to dataServerIP

* 1. DataServer: RenewDatumHandler must return GenericResponse but not CommonResponse, or else session will class cast exception
2. No need to update timestamp after renew
3. snapshot: Need to specify DataServerIP

* add logs

* 1. dataServer: reduce log of snapshotHandler
2. update logs

* dataServer: renew logic should delay for some time after status is WORKING, cause Data is processed asynchronously after synchronization from other DataServer

* bugfix bean; update log

* ignore renew request log

* fix UT

* fix .travis.yml

* fix version 5.3.0-SNAPSHOT

* fix online notify connect error

* fix push confirm error,and fix datum update version,pub threadpool config,add accesslimit service

* fix push confirm error,and fix datum update version,pub threadpool config,add accesslimit service (#45)

* add switch renew and expire

* implement renew enable/disable switch

* fix data client exechange log

* fix datum fetch connect error

* bugfix CacheService: set version zero when first sub and get datum error

* fix clean task for fetch

* bugfix DatumCache: Forget to clean up the index in datumCache.putSnapshot

* Session&Data increase WordCache use

* code optimize

* WordCache: registerId do not add WordCache

* fix fetch datum word cache

* fix NotifyFetchDatumHandler npe

* fix test case time

* fix test cast

* fix test case

* fix tast case

* fix ut case: StopPushDataSwitchTest

* ut case:renew module

* fix ut case:TempPublisherTest

* fix version,and merge new

* bugfix ut case: increase sleep time

* fix ut case:RenewTest

* fix version and fix callback executor,fix log error

* fix ut case:RenewTest format

* fix pom version

* fix ut case:do not run parallelly

* refactor providerdata process

* Memory optimization:Datum.processDatum

* add session notify test

* copy from mybank:
1. Update Subscriber: support for push context
2. increase queueSize of checkPushExecutor
3. fix the isolation function of Gzone and Rzone

* Modify the deny policy of accessDataExecutor of SessionServer

* remove useless code

* fix call back

* fix meta methodhandle cache

* fix push confirm success

* Change the communication between session and data to multi connection

* resolve compile error

* fix processor

* BoltClient: the creation of ConnectionEventAdapter should be inheritable

* fix currentTimeMillis product from source

* add client Invalid check task

* use multiple RpcClient instances instead of one RpcClient with multiple connections,and start a heartbeat thread to ensure connection pool because bolt does not maintain the number of connection pools

* refactor TaskListener and use map instead of list in DefaultTaskListenerManager; refactor getSingleTaskDispatcher()

* DataChangeRequestHandler:optimize performance

* refactor: Heartbeat between session and data

* fix: Synex-wh#20 (review)

* update

* BoltClient use just one RpcClient;
remove heartbeat between session and data;

* SyncDataCallback reduce ThreadSize for saving cpu

* reduce NOTIFY_SESSION_CALLBACK_EXECUTOR threadSize

* 1. filter out the unPubs of datum when first put, Otherwise, "syncData" or "fetchData" get Datum may contains unPubs, which will result something error
2. add jul-to-slf4j for some lib which use jul log, e.g. hessian

* update for idc sync:
1. add a interface DatumStorage and implemented by LocalDatumStorage
2. remove Sync from BackUpNotifier
3. add RemoteDataServerChangeEvent

* 1. NotifyProvideDataChange support multiple nodeTypes
2. refactor provideData code of DataServer, just like SessionServer
3. remove GetChangeListRequestHandler to enterprise version because it's about multiple data centers

* use getClientRegion() instead of getSessionServerRegion() for push

* bugfix LocalDatumStorage#getVersions

* bugfix DataDigestResource api

* bugfix DataDigestResource api

* fix BoltClient: remove unnecessary code

* give more thread for getOtherDataCenterNodeAndUpdate, because otherwise it would rejected if too much task

* grefresh for keep connect other dataServers: should use dataServerCache but not DataServerNodeFactory

* revert "delay cache invalid in DataChangeFetchTask&DataChangeFetchCloudTask",because if the old datum is not invalid, the new subscriber will get the old datum directly from the cache

* bugfix MetaStoreService&DataStoreService: "return" -> "continue"

* fix Memory waste of ServerDataBox

* revert MetaDigestResource api

* Request add method "getTimeout"

* bugfix: remove @ConditionalOnMissingBean for fetchDataHandler

* fix compile error

* RequestException: limit message size

* bugfix: empty dataServerList cause NPE because calculateOldConsistentHash return null

* trigger github ci

* trigger github ci

* fix ut

* update version to 5.4.0

Co-authored-by: Synex-wh <241809311@qq.com>
  • Loading branch information
atellwu and Synex-wh committed Feb 14, 2020
1 parent c3b99f3 commit f05811c
Show file tree
Hide file tree
Showing 109 changed files with 2,005 additions and 1,011 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -7,7 +7,7 @@

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。SOFARegistry 最早源自于淘宝的 ConfigServer,十年来,随着蚂蚁金服的业务发展,注册中心架构已经演进至第五代。目前 SOFARegistry 不仅全面服务于蚂蚁金服的自有业务,还随着蚂蚁金融科技服务众多合作伙伴,同时也兼容开源生态。SOFARegistry 采用 AP 架构,支持秒级时效性推送,同时采用分层架构支持无限水平扩展。

## 功能特性
## 功能特性

- 支持服务发布与服务订阅
- 支持服务变更时的主动推送
Expand Down
2 changes: 1 addition & 1 deletion client/all/pom.xml
Expand Up @@ -6,7 +6,7 @@

<groupId>com.alipay.sofa</groupId>
<artifactId>registry-client-all</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>

<name>${project.groupId}:${project.artifactId}</name>
<url>http://github.com/alipay/sofa-registry</url>
Expand Down
2 changes: 1 addition & 1 deletion client/api/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-client-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion client/impl/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-client-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion client/log/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-client-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion client/pom.xml
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<packaging>pom</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
2 changes: 1 addition & 1 deletion server/common/model/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-common</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Expand Up @@ -16,15 +16,15 @@
*/
package com.alipay.sofa.registry.common.model;

import com.fasterxml.jackson.annotation.JsonIgnore;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
*
* @author zhuoyu.sjw
Expand Down Expand Up @@ -93,7 +93,7 @@ public Object getObject() {
* @throws ClassNotFoundException the class not found exception
*/
public Object extract() throws IOException, ClassNotFoundException {
if (isInBytes()) {
if (object == null && isInBytes()) {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
if (serialization != SERIALIZED_BY_JAVA) {
throw new IOException("Unsupported serialization type: " + serialization);
Expand Down
Expand Up @@ -16,9 +16,11 @@
*/
package com.alipay.sofa.registry.common.model.metaserver;

import com.alipay.sofa.registry.common.model.Node.NodeType;

import java.io.Serializable;
import java.util.Set;

import com.alipay.sofa.registry.common.model.Node.NodeType;
import com.google.common.collect.Sets;

/**
*
Expand All @@ -27,13 +29,13 @@
*/
public class NotifyProvideDataChange implements Serializable {

private String dataInfoId;
private String dataInfoId;

private Long version;
private Long version;

private DataOperator dataOperator;
private DataOperator dataOperator;

private NodeType nodeType = NodeType.SESSION;
private Set<NodeType> nodeTypes;

/**
* constructor
Expand All @@ -42,9 +44,15 @@ public class NotifyProvideDataChange implements Serializable {
* @param dataOperator
*/
public NotifyProvideDataChange(String dataInfoId, Long version, DataOperator dataOperator) {
this(dataInfoId, version, dataOperator, Sets.newHashSet(NodeType.SESSION));
}

public NotifyProvideDataChange(String dataInfoId, Long version, DataOperator dataOperator,
Set<NodeType> nodeTypes) {
this.dataInfoId = dataInfoId;
this.version = version;
this.dataOperator = dataOperator;
this.nodeTypes = nodeTypes;
}

/**
Expand Down Expand Up @@ -106,17 +114,17 @@ public void setDataOperator(DataOperator dataOperator) {
*
* @return property value of nodeType
*/
public NodeType getNodeType() {
return nodeType;
public Set<NodeType> getNodeTypes() {
return nodeTypes;
}

/**
* Setter method for property <tt>nodeType</tt>.
*
* @param nodeType value to be assigned to property nodeType
* @param nodeTypes value to be assigned to property nodeType
*/
public void setNodeType(NodeType nodeType) {
this.nodeType = nodeType;
public void setNodeTypes(Set<NodeType> nodeTypes) {
this.nodeTypes = nodeTypes;
}

@Override
Expand All @@ -125,7 +133,7 @@ public String toString() {
sb.append("dataInfoId='").append(dataInfoId).append('\'');
sb.append(", version=").append(version);
sb.append(", dataOperator=").append(dataOperator);
sb.append(", nodeType=").append(nodeType);
sb.append(", nodeTypes=").append(nodeTypes);
sb.append('}');
return sb.toString();
}
Expand Down
2 changes: 1 addition & 1 deletion server/common/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-server-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/common/util/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-common</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/consistency/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-server-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/distribution/data/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-distribution</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/distribution/integration/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-distribution</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/distribution/meta/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-distribution</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/distribution/pom.xml
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-server-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/distribution/session/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-distribution</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion server/pom.xml
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-parent</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion server/remoting/api/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-remoting</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Expand Up @@ -16,11 +16,11 @@
*/
package com.alipay.sofa.registry.remoting;

import com.alipay.sofa.registry.common.model.store.URL;

import java.net.InetSocketAddress;
import java.util.Collection;

import com.alipay.sofa.registry.common.model.store.URL;

/**
*
* @author shangyu.wh
Expand Down
Expand Up @@ -25,7 +25,8 @@
*/
public class RequestException extends Exception {

private Request request;
private static final int MAX_BODY_SIZE = 512;
private Request request;

/**
* constructor
Expand Down Expand Up @@ -81,8 +82,13 @@ public RequestException(Throwable cause) {
public String getMessage() {
StringBuilder sb = new StringBuilder();
if (request != null) {
String requestBody = request.getRequestBody() != null ? request.getRequestBody()
.toString() : "null";
if (requestBody.length() > MAX_BODY_SIZE) {
requestBody = requestBody.substring(0, MAX_BODY_SIZE);
}
sb.append("request url: ").append(request.getRequestUrl()).append(", body: ")
.append(request.getRequestBody()).append(", ");
.append(requestBody).append(", ");
}
sb.append(super.getMessage());
return sb.toString();
Expand Down
Expand Up @@ -16,11 +16,11 @@
*/
package com.alipay.sofa.registry.remoting.exchange.message;

import java.util.concurrent.atomic.AtomicInteger;

import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.remoting.CallbackHandler;

import java.util.concurrent.atomic.AtomicInteger;

/**
* The interface Request.
*
Expand Down Expand Up @@ -60,4 +60,8 @@ default CallbackHandler getCallBackHandler() {
default AtomicInteger getRetryTimes() {
return new AtomicInteger();
}

default Integer getTimeout() {
return null;
}
}
2 changes: 1 addition & 1 deletion server/remoting/bolt/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-remoting</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Expand Up @@ -120,7 +120,6 @@ public Channel connect(URL url) {
throw new IllegalArgumentException("Create connection url can not be null!");
}
try {
getBoltConnection(rpcClient, url);
Connection connection = getBoltConnection(rpcClient, url);
BoltChannel channel = new BoltChannel();
channel.setConnection(connection);
Expand Down
2 changes: 1 addition & 1 deletion server/remoting/http/pom.xml
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>registry-remoting</artifactId>
<version>5.4.0-SNAPSHOT</version>
<version>5.4.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down

0 comments on commit f05811c

Please sign in to comment.