-
Notifications
You must be signed in to change notification settings - Fork 19
/
oblog_access.h
121 lines (87 loc) · 2.49 KB
/
oblog_access.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
* Copyright (c) 2021 OceanBase
* OceanBase Migration Service LogProxy is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#pragma once
#include <string>
#include <map>
#include <stdint.h>
// these three includes MUST be kept in this order
#include "LogRecord.h"
// Select the log-fetching library header and expose its types under the
// unqualified names IObLog, ObLogError and ObLogFactory.
#ifdef USE_OBCDC_NS
// USE_OBCDC_NS: the library is provided through libobcdc.h.
#include "libobcdc.h"
#ifdef USE_LIBOBLOG_3
// USE_LIBOBLOG_3: the types are still taken from the oceanbase::liboblog
// namespace, so plain using-declarations are enough.
using oceanbase::liboblog::IObLog;
using oceanbase::liboblog::ObLogError;
using oceanbase::liboblog::ObLogFactory;
#else
// Otherwise the types come from oceanbase::libobcdc under different names;
// alias them to the IObLog / ObLogFactory / ObLogError spellings.
typedef oceanbase::libobcdc::IObCDCInstance IObLog;
typedef oceanbase::libobcdc::ObCDCFactory ObLogFactory;
typedef oceanbase::libobcdc::ObCDCError ObLogError;
// Every later use of the identifier construct_oblog is rewritten by the
// preprocessor to construct_obcdc.
#define construct_oblog construct_obcdc
#endif
#else
// No USE_OBCDC_NS: the library is provided through liboblog.h and the types
// live in oceanbase::liboblog.
#include "liboblog.h"
using oceanbase::liboblog::IObLog;
using oceanbase::liboblog::ObLogError;
using oceanbase::liboblog::ObLogFactory;
#endif
// Fallback result codes: each macro is defined here only when nothing
// included above has already defined it.
// NOTE(review): -4012 is presumably the library's timeout code - confirm
// against the liboblog/libobcdc headers.
#ifndef OB_TIMEOUT
#define OB_TIMEOUT -4012
#endif
#ifndef OB_SUCCESS
#define OB_SUCCESS 0
#endif
namespace oceanbase {
namespace logproxy {
// Unless NEED_MAPPING_CLASS is defined, make every name of
// oceanbase::logmessage visible inside oceanbase::logproxy. Because this is a
// header, the directive also applies to every file that includes it.
// NOTE(review): presumably this is what lets ILogRecord be used unqualified
// below - confirm where ILogRecord is declared.
#ifndef NEED_MAPPING_CLASS
using namespace oceanbase::logmessage;
#endif
/**
 * Access object that holds an IObLog handle together with an ObLogFactory
 * handle and declares the lifecycle (init / start / stop) and record access
 * (fetch / release) operations on them. Only the trivial accessors are defined
 * in this header; everything else is defined elsewhere.
 *
 * NOTE(review): the handles are raw pointers. If the destructor releases them,
 * copying an OblogAccess would be unsafe - confirm against the implementation.
 */
class OblogAccess {
public:
  OblogAccess();

  virtual ~OblogAccess();

  /**
   * Initialize from a key/value configuration and a start timestamp.
   * NOTE(review): the unit of start_timestamp is presumably seconds, given
   * the separate init_with_us() overload - confirm.
   * @return an int status code (see OB_SUCCESS).
   */
  int init(const std::map<std::string, std::string>& configs, uint64_t start_timestamp);

  /**
   * Same as init(), but the start timestamp parameter is named in microseconds.
   * @return an int status code (see OB_SUCCESS).
   */
  int init_with_us(const std::map<std::string, std::string>& configs, uint64_t start_timestamp_us);

  int start();

  void stop();

  /**
   * Fetch the next record into `record`.
   * @return an int status code (see OB_SUCCESS / OB_TIMEOUT).
   */
  int fetch(ILogRecord*& record);

  /**
   * Fetch the next record into `record`, with an explicit timeout in microseconds.
   * @return an int status code (see OB_SUCCESS / OB_TIMEOUT).
   */
  int fetch(ILogRecord*& record, uint64_t timeout_us);

  /** Hand a record obtained from fetch() back. */
  void release(ILogRecord* record);

  /** @return the stored start timestamp. */
  uint64_t start_timestamp() const
  {
    return _start_timestamp;
  }

  /** Overwrite the stored start timestamp. */
  void set_start_timestamp(uint64_t start_timestamp)
  {
    _start_timestamp = start_timestamp;
  }

  /** @return the stored wait timeout, in microseconds. */
  uint64_t wait_timeout_us() const
  {
    return _wait_timeout_us;
  }

  /** Overwrite the stored wait timeout, in microseconds. */
  void set_wait_timeout_us(uint64_t wait_timeout_us)
  {
    _wait_timeout_us = wait_timeout_us;
  }

private:
  /** Error hook taking the library's error type. */
  static void handle_error(const ObLogError& error);

  // state
  uint64_t _start_timestamp = 0;
  uint64_t _start_timestamp_us = 0;

  // params
  uint64_t _wait_timeout_us = 100;

  // liboblog
  // Default to nullptr so the handles are never indeterminate, matching the
  // in-class initializers used for the other members.
  IObLog* _oblog = nullptr;
  ObLogFactory* _oblog_factory = nullptr;
};
} // namespace logproxy
} // namespace oceanbase