# 簡介

使用 Change Tracking 同步數據: http://www.tracefact.net/tech/102.html<br>
Kafka Connect 實時讀取 MSSQL 數據到 Kafka: http://www.tracefact.net/tech/087.html

# Change Tracking

In [None]:
-- Work inside the DataSync database for the rest of the walkthrough.
USE DataSync;
GO

-- users: auto-incrementing integer key plus a required name; the key is the clustered primary key.
-- NOTE(review): VARCHAR stores non-ASCII names faithfully only under a matching
-- collation; confirm the database collation or consider NVARCHAR.
CREATE TABLE dbo.users
(
    user_id   INT IDENTITY(1, 1) NOT NULL,
    user_name VARCHAR(50)        NOT NULL,
    CONSTRAINT PK_users PRIMARY KEY CLUSTERED (user_id ASC)
);
GO

-- user_play: one row per (user_name, room) pair, which forms the clustered primary key.
-- The identity column id is not part of the primary key.
CREATE TABLE dbo.user_play
(
    id        INT IDENTITY(1, 1) NOT NULL,
    user_name VARCHAR(50)        NOT NULL,
    room      VARCHAR(50)        NOT NULL,
    score     INT                NOT NULL,
    remark    VARCHAR(500)       NULL,
    CONSTRAINT PK_user_play PRIMARY KEY CLUSTERED (user_name ASC, room ASC)
);
GO

### 開啟 Change Tracking

In [None]:
-- Enable change tracking at the database level:
-- change information is retained for 2 days and cleaned up automatically.
ALTER DATABASE [DataSync]
    SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
GO

In [None]:
-- Turn on change tracking for the users table; column-level update tracking stays off.
ALTER TABLE [DataSync].[dbo].[users]
    ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = OFF);
GO

### Change Tracking 的常用 SQL 語句

##### 獲取數據庫中的哪些表啟用了 Change Tracking

In [None]:
-- List every entry in sys.change_tracking_tables, with the table name resolved first.
SELECT
    OBJECT_NAME(ctt.object_id) AS table_name,
    ctt.*
FROM sys.change_tracking_tables AS ctt;
GO

<img src='./img/6.png'>

##### 獲取表的最小可用版本和當前版本

In [None]:
-- Minimum valid version for users, shown next to the current change tracking version.
SELECT
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('users')) AS MinValidVersion,
    CHANGE_TRACKING_CURRENT_VERSION()                     AS CurrentVersion;
GO

<img src='./img/7.png'>

### 測試 Change Tracking

##### 插入數據

In [None]:
-- Add one user so that change tracking has something to record.
INSERT INTO users (user_name)
VALUES ('子陽');
GO

In [None]:
-- Re-read both versions after the insert to see how they moved.
SELECT
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('users')) AS MinValidVersion,
    CHANGE_TRACKING_CURRENT_VERSION()                     AS CurrentVersion;
GO

<img src='./img/8.png'>

In [None]:
-- Changes recorded for users, using 0 as the last synchronized version.
SELECT ct.*
FROM CHANGETABLE(CHANGES users, 0) AS ct;
GO

<img src='./img/9.png'>

In [None]:
-- Same query with 1 as the last synchronized version, for comparison with the previous result.
SELECT ct.*
FROM CHANGETABLE(CHANGES users, 1) AS ct;
GO

<img src='./img/10.png'>

In [None]:
// Guard before syncing: if the saved lastVersion is older than the table's minimum
// valid version, move it to just below that minimum.
// NOTE(review): Microsoft's change tracking guidance is to re-initialize the client
// when lastVersion < minValidVersion, because older changes may already have been
// cleaned up — confirm that clamping is acceptable here.
if(lastVersion < minValidVersion){
    lastVersion = minValidVersion - 1;
}

// Next:
// 1. Run select CHANGE_TRACKING_CURRENT_VERSION() to get the current version currentVersion and save it; it also serves as lastVersion for the next sync
// 2. Run select * from CHANGETABLE(CHANGES [table_name], lastVersion) c to fetch the changes from lastVersion to currentVersion

In [None]:
-- Join the tracked changes back to the live users rows.
-- The key is taken from the change table rather than from users: for a deleted row
-- the LEFT JOIN finds no match, so every users column is NULL and the deleted
-- user_id would otherwise be lost. Column names and order match the previous
-- t.* output (user_id, user_name, SYS_CHANGE_OPERATION).
SELECT
    c.user_id,
    t.user_name,
    c.SYS_CHANGE_OPERATION
FROM CHANGETABLE(CHANGES [users], 0) AS c
LEFT JOIN users AS t
    ON c.user_id = t.user_id;
GO

##### 更改數據

In [None]:
-- Read the minimum valid version for users and the current version at this step.
SELECT
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('users')) AS MinValidVersion,
    CHANGE_TRACKING_CURRENT_VERSION()                     AS CurrentVersion;
GO

<img src='./img/11.png'>

In [None]:
-- Changes recorded for users, using 0 as the last synchronized version.
SELECT ct.*
FROM CHANGETABLE(CHANGES users, 0) AS ct;
GO

<img src='./img/12.png'>

In [None]:
-- Changes recorded for users, using 1 as the last synchronized version.
SELECT ct.*
FROM CHANGETABLE(CHANGES users, 1) AS ct;
GO

<img src='./img/13.png'>

##### 刪除數據

In [None]:
-- Remove the user whose user_id is 1.
DELETE FROM users
WHERE user_id = 1;

### 其他注意事項

##### 當有多個表時，當前版本是共用的

In [None]:
-- Insert one play record into user_play.
INSERT INTO user_play (user_name, room, score)
VALUES ('子陽', '標準場', 100);
GO

In [None]:
-- Turn on change tracking for user_play, this time also recording which columns were updated.
ALTER TABLE [DataSync].[dbo].[user_play]
    ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON);

In [None]:
-- Per-table minimum valid versions for user_play and users, next to the single current version.
SELECT
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('user_play')) AS user_play_MinValidVersion,
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('users'))     AS users_MinValidVersion,
    CHANGE_TRACKING_CURRENT_VERSION()                         AS CurrentVersion;
GO

# 通過 TRACK_COLUMNS_UPDATED 獲取更新的列

In [None]:
-- Change only the score column of the row with id 1, so the column mask has something to report.
UPDATE user_play
    SET score = 150
WHERE id = 1;
GO

In [None]:
-- Changes recorded for user_play, using 4 as the last synchronized version.
-- NOTE(review): 4 is hard-coded; presumably it matches the version reached in this
-- walkthrough — adjust it when replaying the steps on another database.
SELECT ct.*
FROM CHANGETABLE(CHANGES user_play, 4) AS ct;
GO

<img src='./img/14.png'>

In [None]:
-- Resolve the column ids of score and id once, then test each against the
-- change-column mask of every user_play change row.
-- NOTE(review): 4 is hard-coded as the last synchronized version; presumably it
-- matches the version reached in this walkthrough.
DECLARE @score_column_id INT = COLUMNPROPERTY(OBJECT_ID('user_play'), 'score', 'ColumnId');
DECLARE @id_column_id    INT = COLUMNPROPERTY(OBJECT_ID('user_play'), 'id', 'ColumnId');

SELECT
    CHANGE_TRACKING_IS_COLUMN_IN_MASK(@score_column_id, c.sys_change_columns) AS score_changed,
    CHANGE_TRACKING_IS_COLUMN_IN_MASK(@id_column_id, c.sys_change_columns)    AS id_changed,
    c.*
FROM CHANGETABLE(CHANGES [user_play], 4) AS c;

<img src='./img/15.png'>

# 總結