Skip to content

Commit

Permalink
Make events as records
Browse files Browse the repository at this point in the history
  • Loading branch information
rusuly committed Oct 18, 2023
1 parent c010ad7 commit 85a3bec
Show file tree
Hide file tree
Showing 28 changed files with 215 additions and 373 deletions.
2 changes: 1 addition & 1 deletion src/MySqlCdc/BinlogReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private EventHeader GetEventHeader(ReadOnlySequence<byte> buffer)
{
using var memoryOwner = new MemoryOwner(buffer.Slice(0, EventConstants.HeaderSize));
var reader = new PacketReader(memoryOwner.Memory.Span);
return new EventHeader(ref reader);
return EventHeader.Read(ref reader);
}

private HeaderWithEvent Deserialize(ReadOnlySequence<byte> packet)
Expand Down
37 changes: 14 additions & 23 deletions src/MySqlCdc/Events/DeleteRowsEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,38 @@ namespace MySqlCdc.Events;
/// Represents one or many deleted rows in row based replication.
/// <a href="https://mariadb.com/kb/en/library/rows_event_v1/">See more</a>
/// </summary>
public class DeleteRowsEvent : IBinlogEvent
/// <remarks>
/// Creates a new <see cref="DeleteRowsEvent"/>.
/// </remarks>
public record DeleteRowsEvent(
long TableId,
int Flags,
int ColumnsNumber,
bool[] ColumnsPresent,
IReadOnlyList<RowData> Rows) : IBinlogEvent
{
/// <summary>
/// Gets id of the table where rows were deleted
/// </summary>
public long TableId { get; }
public long TableId { get; } = TableId;

/// <summary>
/// Gets <a href="https://mariadb.com/kb/en/rows_event_v1/#flags">flags</a>
/// </summary>
public int Flags { get; }
public int Flags { get; } = Flags;

/// <summary>
/// Gets number of columns in the table
/// </summary>
public int ColumnsNumber { get; }
public int ColumnsNumber { get; } = ColumnsNumber;

/// <summary>
/// Gets bitmap of columns present in row event. See binlog_row_image parameter.
/// </summary>
public bool[] ColumnsPresent { get; }
public bool[] ColumnsPresent { get; } = ColumnsPresent;

/// <summary>
/// Gets deleted rows
/// </summary>
public IReadOnlyList<RowData> Rows { get; }

/// <summary>
/// Creates a new <see cref="DeleteRowsEvent"/>.
/// </summary>
public DeleteRowsEvent(
long tableId,
int flags,
int columnsNumber,
bool[] columnsPresent,
IReadOnlyList<RowData> rows)
{
TableId = tableId;
Flags = flags;
ColumnsNumber = columnsNumber;
ColumnsPresent = columnsPresent;
Rows = rows;
}
public IReadOnlyList<RowData> Rows { get; } = Rows;
}
2 changes: 1 addition & 1 deletion src/MySqlCdc/Events/EventDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ protected EventDeserializer()

internal virtual HeaderWithEvent DeserializeEvent(ref PacketReader reader)
{
var eventHeader = new EventHeader(ref reader);
var eventHeader = EventHeader.Read(ref reader);

// Consider verifying checksum
// ChecksumType.Verify(eventBuffer, checksumBuffer);
Expand Down
36 changes: 22 additions & 14 deletions src/MySqlCdc/Events/EventHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,56 @@ namespace MySqlCdc.Events;
/// See <a href="https://mariadb.com/kb/en/library/2-binlog-event-header/">MariaDB docs</a>
/// See <a href="https://dev.mysql.com/doc/internals/en/binlog-version.html">MySQL docs</a>
/// </summary>
public class EventHeader
public record EventHeader(
long Timestamp,
EventType EventType,
long ServerId,
long EventLength,
long NextEventPosition,
int EventFlags)
{
/// <summary>
/// Provides creation time in seconds from Unix.
/// </summary>
public long Timestamp { get; }
public long Timestamp { get; } = Timestamp;

/// <summary>
/// Gets type of the binlog event.
/// </summary>
public EventType EventType { get; }
public EventType EventType { get; } = EventType;

/// <summary>
/// Gets id of the server that created the event.
/// </summary>
public long ServerId { get; }
public long ServerId { get; } = ServerId;

/// <summary>
/// Gets event length (header + event + checksum).
/// </summary>
public long EventLength { get; }
public long EventLength { get; } = EventLength;

/// <summary>
/// Gets file position of next event.
/// </summary>
public long NextEventPosition { get; }
public long NextEventPosition { get; } = NextEventPosition;

/// <summary>
/// Gets event flags. See <a href="https://mariadb.com/kb/en/2-binlog-event-header/#event-flag">documentation</a>.
/// </summary>
public int EventFlags { get; }
public int EventFlags { get; } = EventFlags;

/// <summary>
/// Creates a new <see cref="EventHeader"/>.
/// </summary>
public EventHeader(ref PacketReader reader)
public static EventHeader Read(ref PacketReader reader)
{
Timestamp = reader.ReadUInt32LittleEndian();
EventType = (EventType)reader.ReadByte();
ServerId = reader.ReadUInt32LittleEndian();
EventLength = reader.ReadUInt32LittleEndian();
NextEventPosition = reader.ReadUInt32LittleEndian();
EventFlags = reader.ReadUInt16LittleEndian();
return new EventHeader(
reader.ReadUInt32LittleEndian(),
(EventType)reader.ReadByte(),
reader.ReadUInt32LittleEndian(),
reader.ReadUInt32LittleEndian(),
reader.ReadUInt32LittleEndian(),
reader.ReadUInt16LittleEndian()
);
}
}
12 changes: 1 addition & 11 deletions src/MySqlCdc/Events/EventWithHeader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,4 @@

namespace MySqlCdc.Events;

internal class HeaderWithEvent : IPacket
{
public EventHeader Header { get; }
public IBinlogEvent Event { get; }

public HeaderWithEvent(EventHeader header, IBinlogEvent @event)
{
Header = header;
Event = @event;
}
}
internal record HeaderWithEvent(EventHeader Header, IBinlogEvent Event) : IPacket;
27 changes: 10 additions & 17 deletions src/MySqlCdc/Events/FormatDescriptionEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,26 @@ namespace MySqlCdc.Events;
/// See <a href="https://dev.mysql.com/doc/internals/en/format-description-event.html">MySQL docs</a>
/// See <a href="https://mariadb.com/kb/en/library/5-slave-registration/#events-transmission-after-com_binlog_dump">start events flow</a>
/// </summary>
public class FormatDescriptionEvent : IBinlogEvent
/// <remarks>
/// Creates a new <see cref="FormatDescriptionEvent"/>.
/// </remarks>
public record FormatDescriptionEvent(
int BinlogVersion,
string ServerVersion,
ChecksumType ChecksumType) : IBinlogEvent
{
/// <summary>
/// Gets binary log format version. This should always be 4.
/// </summary>
public int BinlogVersion { get; }
public int BinlogVersion { get; } = BinlogVersion;

/// <summary>
/// Gets MariaDB/MySQL server version name.
/// </summary>
public string ServerVersion { get; }
public string ServerVersion { get; } = ServerVersion;

/// <summary>
/// Gets checksum algorithm type.
/// </summary>
public ChecksumType ChecksumType { get; }

/// <summary>
/// Creates a new <see cref="FormatDescriptionEvent"/>.
/// </summary>
public FormatDescriptionEvent(
int binlogVersion,
string serverVersion,
ChecksumType checksumType)
{
BinlogVersion = binlogVersion;
ServerVersion = serverVersion;
ChecksumType = checksumType;
}
public ChecksumType ChecksumType { get; } = ChecksumType;
}
18 changes: 6 additions & 12 deletions src/MySqlCdc/Events/GtidEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,18 @@ public interface IGtidState
/// Marks start of a new event group(transaction).
/// <a href="https://mariadb.com/kb/en/gtid_event/">See more</a>
/// </summary>
public class GtidEvent : IBinlogEvent
/// <remarks>
/// Creates a new <see cref="GtidEvent"/>.
/// </remarks>
public record GtidEvent(IGtid Gtid, byte Flags) : IBinlogEvent
{
/// <summary>
/// Gets Global Transaction ID of the event group.
/// </summary>
public IGtid Gtid { get; }
public IGtid Gtid { get; } = Gtid;

/// <summary>
/// Gets flags.
/// </summary>
public byte Flags { get; }

/// <summary>
/// Creates a new <see cref="GtidEvent"/>.
/// </summary>
public GtidEvent(IGtid gtid, byte flags)
{
Gtid = gtid;
Flags = flags;
}
public byte Flags { get; } = Flags;
}
15 changes: 5 additions & 10 deletions src/MySqlCdc/Events/HeartbeatEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,13 @@ namespace MySqlCdc.Events;
/// The event is sent from master to the client for keep alive feature.
/// <a href="https://mariadb.com/kb/en/library/heartbeat_log_event/">See more</a>
/// </summary>
public class HeartbeatEvent : IBinlogEvent
/// <remarks>
/// Creates a new <see cref="HeartbeatEvent"/>.
/// </remarks>
public record HeartbeatEvent(string BinlogFilename) : IBinlogEvent
{
/// <summary>
/// Gets current master binlog filename
/// </summary>
public string BinlogFilename { get; }

/// <summary>
/// Creates a new <see cref="HeartbeatEvent"/>.
/// </summary>
public HeartbeatEvent(string binlogFilename)
{
BinlogFilename = binlogFilename;
}
public string BinlogFilename { get; } = BinlogFilename;
}
18 changes: 6 additions & 12 deletions src/MySqlCdc/Events/IntVarEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@ namespace MySqlCdc.Events;
/// Generated when an auto increment column or LAST_INSERT_ID() function are used.
/// <a href="https://mariadb.com/kb/en/library/intvar_event/">See more</a>
/// </summary>
public class IntVarEvent : IBinlogEvent
/// <remarks>
/// Creates a new <see cref="IntVarEvent"/>.
/// </remarks>
public record IntVarEvent(byte Type, long Value) : IBinlogEvent
{
/// <summary>
/// Gets type.
/// 0x00 - Invalid value.
/// 0x01 - LAST_INSERT_ID.
/// 0x02 - Insert id (auto_increment).
/// </summary>
public byte Type { get; }
public byte Type { get; } = Type;

/// <summary>
/// Gets value.
/// </summary>
public long Value { get; }

/// <summary>
/// Creates a new <see cref="IntVarEvent"/>.
/// </summary>
public IntVarEvent(byte type, long value)
{
Type = type;
Value = value;
}
public long Value { get; } = Value;
}
42 changes: 16 additions & 26 deletions src/MySqlCdc/Events/QueryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,44 @@ namespace MySqlCdc.Events;
/// Represents sql statement in binary log.
/// <a href="https://mariadb.com/kb/en/library/query_event/">See more</a>
/// </summary>
public class QueryEvent : IBinlogEvent
/// <remarks>
/// Creates a new <see cref="QueryEvent"/>.
/// </remarks>
public record QueryEvent(
long ThreadId,
long Duration,
int ErrorCode,
byte[] StatusVariables,
string DatabaseName,
string SqlStatement) : IBinlogEvent
{
/// <summary>
/// Gets id of the thread that issued the statement.
/// </summary>
public long ThreadId { get; }
public long ThreadId { get; } = ThreadId;

/// <summary>
/// Gets the execution time of the statement in seconds.
/// </summary>
public long Duration { get; }
public long Duration { get; } = Duration;

/// <summary>
/// Gets the error code of the executed statement.
/// </summary>
public int ErrorCode { get; }
public int ErrorCode { get; } = ErrorCode;

/// <summary>
/// Gets status variables.
/// </summary>
public byte[] StatusVariables { get; }
public byte[] StatusVariables { get; } = StatusVariables;

/// <summary>
/// Gets the default database name.
/// </summary>
public string DatabaseName { get; }
public string DatabaseName { get; } = DatabaseName;

/// <summary>
/// Gets the SQL statement.
/// </summary>
public string SqlStatement { get; }

/// <summary>
/// Creates a new <see cref="QueryEvent"/>.
/// </summary>
public QueryEvent(
long threadId,
long duration,
int errorCode,
byte[] statusVariables,
string databaseName,
string sqlStatement)
{
ThreadId = threadId;
Duration = duration;
ErrorCode = errorCode;
StatusVariables = statusVariables;
DatabaseName = databaseName;
SqlStatement = sqlStatement;
}
public string SqlStatement { get; } = SqlStatement;
}

0 comments on commit 85a3bec

Please sign in to comment.