Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Print all output as raw bytes
Printing all output as raw bytes allows MaxScale to control the formatting
process. This also removes the need to convert the bytes to Python strings
and the need to parse the JSON.
  • Loading branch information
markus456 committed Mar 20, 2017
1 parent 7c20701 commit 7d7d8a0
Showing 1 changed file with 14 additions and 37 deletions.
51 changes: 14 additions & 37 deletions server/modules/protocol/examples/cdc.py
Expand Up @@ -12,52 +12,32 @@
# Public License.

import time
import json
import re
import sys
import socket
import hashlib
import argparse
import subprocess
import selectors
import binascii
import os

# Read data as JSON
def read_json():
decoder = json.JSONDecoder()
rbuf = bytes()
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
def read_data():
sel = selectors.DefaultSelector()
sel.register(sock, selectors.EVENT_READ)

while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
events = sel.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
buf = sock.recv(4096, socket.MSG_DONTWAIT)
rbuf += buf
while True:
rbuf = rbuf.lstrip()
data = decoder.raw_decode(rbuf.decode('ascii'))
rbuf = rbuf[data[1]:]
print(json.dumps(data[0]))
except ValueError as err:
sys.stdout.flush()
pass
except Exception:
break

# Read data as Avro
def read_avro():
ep = selectors.EpollSelector()
ep.register(sock, selectors.EVENT_READ)
if len(buf) > 0:
os.write(sys.stdout.fileno(), buf)
sys.stdout.flush()
else:
raise Exception('Socket was closed')

while True:
pollrc = ep.select(timeout=int(opts.read_timeout) if int(opts.read_timeout) > 0 else None)
try:
buf = sock.recv(4096, socket.MSG_DONTWAIT)
os.write(sys.stdout.fileno(), buf)
sys.stdout.flush()
except Exception:
except BlockingIOError:
break
except Exception as ex:
print(ex, file=sys.stderr)
break

parser = argparse.ArgumentParser(description = "CDC Binary consumer", conflict_handler="resolve")
Expand Down Expand Up @@ -91,7 +71,4 @@ def read_avro():
# Request a data stream
sock.send(bytes(("REQUEST-DATA " + opts.FILE + (" " + opts.GTID if opts.GTID else "")).encode()))

if opts.format == "JSON":
read_json()
elif opts.format == "AVRO":
read_avro()
read_data()

0 comments on commit 7d7d8a0

Please sign in to comment.