# (GitHub web-UI residue from the page this file was copied from; commented
# out so the file remains a valid Python script.)
# Permalink
# Fetching contributors…
# Cannot retrieve contributors at this time
# executable file 470 lines (403 sloc) 16.7 KB
#!/usr/bin/env python3
# Transaction Serialization Sample Code (Python3 version)
# Author: rome@ripple.com
# Copyright Ripple 2018
# Requires Python 3.5+ because of bytes.hex()
import argparse
import json
import logging
import re
import sys
from address import decode_address
from xrpl_num import IssuedAmount
# Module-level logger. A StreamHandler is attached here so that debug output
# (enabled via -v on the command line) is visible even without other logging
# configuration by the importing application.
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
def load_defs(fname="definitions.json"):
    """
    Read the field/type definitions file and reshape it for fast lookups.

    (The definitions file should be drop-in compatible with the one from the
    ripple-binary-codec JavaScript package.)
    """
    with open(fname) as defs_file:
        raw = json.load(defs_file)
    # FIELDS arrives as a list of [field_name, metadata] pairs; convert it to
    # a dict keyed by field name. Each metadata value looks like:
    #   {
    #       nth: field_sort_key int,
    #       isVLEncoded: bool,
    #       isSerialized: bool,
    #       isSigningField: bool,
    #       type: type_name str
    #   }
    return {
        "TYPES": raw["TYPES"],  # type_name str: type_sort_key int
        "FIELDS": dict(raw["FIELDS"]),
        "LEDGER_ENTRY_TYPES": raw["LEDGER_ENTRY_TYPES"],
        "TRANSACTION_RESULTS": raw["TRANSACTION_RESULTS"],
        "TRANSACTION_TYPES": raw["TRANSACTION_TYPES"],
    }
def field_sort_key(field_name):
    """Return a (type_sort_key, field_sort_key) tuple for canonical ordering."""
    info = DEFINITIONS["FIELDS"][field_name]
    return (DEFINITIONS["TYPES"][info["type"]], info["nth"])
def field_id(field_name):
    """
    Return the unique binary field ID for a given field name.

    The ID combines the type code and the field code into 1 to 3 bytes,
    depending on whether each value is "common" (<16) or "uncommon" (>=16).
    """
    info = DEFINITIONS["FIELDS"][field_name]
    type_code = DEFINITIONS["TYPES"][info["type"]]
    field_code = info["nth"]
    # Codes must be nonzero and fit in 1 byte
    assert 0 < field_code <= 255
    assert 0 < type_code <= 255

    if type_code < 16:
        if field_code < 16:
            # 1 byte: high 4 bits are the type code, low 4 bits the field code
            return bytes([(type_code << 4) | field_code])
        # 2 bytes: type code in the high nibble of byte 1 (low nibble zero),
        # then the field code
        return bytes([type_code << 4, field_code])
    if field_code < 16:
        # 2 bytes: field code (high nibble zero), then the type code
        return bytes([field_code, type_code])
    # 3 bytes: an all-zero byte, then the type code, then the field code
    return bytes([0, type_code, field_code])
def vl_encode(vl_contents):
    """
    Prepend an XRPL length prefix to arbitrary binary data.

    Helper for length-prefixed fields including Blob types and some AccountID
    types. The prefix is 1-3 bytes depending on the content length:
        Content length <= 192 bytes: prefix is 1 byte
        192 bytes < Content length <= 12480 bytes: prefix is 2 bytes
        12480 bytes < Content length <= 918744 bytes: prefix is 3 bytes

    Raises ValueError when the contents exceed 918744 bytes.
    """
    length = len(vl_contents)
    if length <= 192:
        prefix = bytes([length])
    elif length <= 12480:
        adjusted = length - 193
        prefix = bytes([193 + (adjusted >> 8), adjusted & 0xff])
    elif length <= 918744:
        adjusted = length - 12481
        prefix = bytes([241 + (adjusted >> 16),
                        (adjusted >> 8) & 0xff,
                        adjusted & 0xff])
    else:
        raise ValueError("VariableLength field must be <= 918744 bytes long")
    return prefix + vl_contents
# Individual field type serialization routines ---------------------------------
def accountid_to_bytes(address):
    """
    Serialize an AccountID field type (length-prefixed).

    Some fields contain nested non-length-prefixed AccountIDs directly; those
    call decode_address() instead of this function.
    """
    raw_account_id = decode_address(address)
    return vl_encode(raw_account_id)
def amount_to_bytes(a):
    """
    Serializes an "Amount" type, which can be either XRP or an issued currency:
    - XRP: 64 bits; 0, followed by 1 ("is positive"), followed by 62 bit UInt amount
    - Issued Currency: 64 bits of amount, followed by 160 bit currency code and
      160 bit issuer AccountID.

    Raises ValueError if the input is neither a drops string nor a
    {currency, value, issuer} dict.
    """
    # Fix: use isinstance() instead of the type(a) == str anti-pattern
    # (also accepts str/dict subclasses, e.g. from custom JSON decoders).
    if isinstance(a, str):
        # is XRP, given as a string of drops
        xrp_amt = int(a)
        if xrp_amt >= 0:
            assert xrp_amt <= 10**17
            # set the "is positive" bit -- this is backwards from usual two's complement!
            xrp_amt = xrp_amt | 0x4000000000000000
        else:
            assert xrp_amt >= -(10**17)
            # convert to absolute value, leaving the "is positive" bit unset
            xrp_amt = -xrp_amt
        return xrp_amt.to_bytes(8, byteorder="big", signed=False)
    elif isinstance(a, dict):
        if sorted(a.keys()) != ["currency", "issuer", "value"]:
            raise ValueError("amount must have currency, value, issuer only (actually had: %s)" %
                    sorted(a.keys()))
        issued_amt = IssuedAmount(a["value"]).to_bytes()
        logger.debug("Issued amount: %s", issued_amt.hex())  # lazy formatting
        currency_code = currency_code_to_bytes(a["currency"])
        return issued_amt + currency_code + decode_address(a["issuer"])
    else:
        raise ValueError("amount must be XRP string or {currency, value, issuer}")
def array_to_bytes(array):
    """
    Serialize an array of objects from decoded JSON.
    Each member object must have a type wrapper and an inner object.
    For example:
    [
        {
            // wrapper object
            "Memo": {
                // inner object
                "MemoType": "687474703a2f2f6578616d706c652e636f6d2f6d656d6f2f67656e65726963",
                "MemoData": "72656e74"
            }
        }
    ]
    """
    members_as_bytes = []
    for el in array:
        # The wrapper key names the field; the entire wrapper object is passed
        # along so the STObject serializer can unwrap it itself.
        # (Fix: removed the unused local that extracted the inner object.)
        wrapper_key = list(el.keys())[0]
        members_as_bytes.append(field_to_bytes(field_name=wrapper_key, field_val=el))
    members_as_bytes.append(field_id("ArrayEndMarker"))
    return b''.join(members_as_bytes)
def blob_to_bytes(field_val):
    """
    Serialize a string of hex as binary data with a length prefix.
    """
    return vl_encode(bytes.fromhex(field_val))
def currency_code_to_bytes(code_string, xrp_ok=False):
    """
    Serialize a currency code to its 160-bit (20-byte) binary form.

    code_string may be either a 3-character ISO 4217-like code or a
    40-character hexadecimal string. The code "XRP" is only legal when
    xrp_ok=True (a rare special case) and serializes to all zeroes.
    Raises ValueError for anything else.
    """
    if re.match(r"^[A-Za-z0-9?!@#$%^&*<>(){}\[\]|]{3}$", code_string):
        # ISO 4217-like code
        if code_string == "XRP":
            if not xrp_ok:
                raise ValueError("issued currency can't be XRP")
            # Rare, but when the currency code "XRP" is serialized, it's
            # a special-case all zeroes.
            logger.debug("Currency code(XRP): "+("0"*40))
            return bytes(20)
        code_ascii = code_string.encode("ASCII")
        logger.debug("Currency code ASCII: %s"%code_ascii.hex())
        # standard currency codes: https://developers.ripple.com/currency-formats.html#standard-currency-codes
        # layout: 8 bits type code (0x00), 88 bits reserved (0's),
        #         24 bits ASCII, 16 bits version (0x00), 24 bits reserved (0's)
        return bytes(12) + code_ascii + bytes(5)
    if re.match(r"^[0-9a-fA-F]{40}$", code_string):
        # raw hex code
        return bytes.fromhex(code_string)  # requires Python 3.5+
    raise ValueError("invalid currency code")
def hash128_to_bytes(contents):
    """
    Serialize a hexadecimal string as binary and confirm it is 128 bits long.
    """
    result = hash_to_bytes(contents)
    if len(result) != 16:  # 16 bytes = 128 bits
        raise ValueError("Hash128 is not 128 bits long")
    return result
def hash160_to_bytes(contents):
    """
    Serialize a hexadecimal string as binary and confirm it is 160 bits long.
    """
    result = hash_to_bytes(contents)
    if len(result) != 20:  # 20 bytes = 160 bits
        raise ValueError("Hash160 is not 160 bits long")
    return result
def hash256_to_bytes(contents):
    """
    Serialize a hexadecimal string as binary and confirm it is 256 bits long.
    """
    result = hash_to_bytes(contents)
    if len(result) != 32:  # 32 bytes = 256 bits
        raise ValueError("Hash256 is not 256 bits long")
    return result
def hash_to_bytes(contents):
    """
    Helper function; serializes a hash value from a hexadecimal string
    of any length.

    Fix: the original body referenced an undefined name ``field_val``,
    raising NameError on every call; it must convert its own ``contents``
    parameter instead.
    """
    return bytes.fromhex(contents)
def object_to_bytes(obj):
    """
    Serialize an object from decoded JSON.
    Each object must have a type wrapper and an inner object. For example:

    {
        // type wrapper
        "SignerEntry": {
            // inner object
            "Account": "rUpy3eEg8rqjqfUoLeBnZkscbKbFsKXC3v",
            "SignerWeight": 1
        }
    }

    Puts the child fields (e.g. Account, SignerWeight) in canonical order
    and appends an object end marker.
    """
    wrapper_key = list(obj.keys())[0]
    inner_obj = obj[wrapper_key]
    serialized = []
    for child_name in sorted(inner_obj.keys(), key=field_sort_key):
        # Skip fields the definitions mark as not serialized.
        if not DEFINITIONS["FIELDS"][child_name]["isSerialized"]:
            continue
        child_bytes = field_to_bytes(child_name, inner_obj[child_name])
        logger.debug("{n}: {h}".format(n=child_name, h=child_bytes.hex()))
        serialized.append(child_bytes)
    serialized.append(field_id("ObjectEndMarker"))
    return b''.join(serialized)
def pathset_to_bytes(pathset):
    """
    Serialize a PathSet, which is an array of arrays,
    where each inner array represents one possible payment path.
    A path consists of "path step" objects in sequence, each with one or
    more of "account", "currency", and "issuer" fields, plus (ignored) "type"
    and "type_hex" fields which indicate which fields are present.
    (We re-create the type field for serialization based on which of the core
    3 fields are present.)

    Raises ValueError for an empty PathSet.
    """
    if not pathset:
        raise ValueError("PathSet type must not be empty")
    paths_as_bytes = []
    last_index = len(pathset) - 1
    # Fix: enumerate() instead of the range(len(...)) anti-idiom.
    for n, path_json in enumerate(pathset):
        path = path_as_bytes(path_json)
        logger.debug("Path %d: %s"%(n, path.hex()))
        paths_as_bytes.append(path)
        if n == last_index:
            # last path; add an end byte
            paths_as_bytes.append(b"\x00")
        else:
            # add a path separator byte
            paths_as_bytes.append(b"\xff")
    return b''.join(paths_as_bytes)
def path_as_bytes(path):
    """
    Helper function for representing one member of a pathset as a bytes object.

    Each step is preceded by a type byte whose bits record which of the
    account (0x01), currency (0x10), and issuer (0x20) fields are present.
    Raises ValueError for an empty path.
    """
    if not path:
        raise ValueError("Path must not be empty")
    path_contents = []
    for step in path:
        step_data = []
        type_byte = 0
        # Fix: test membership on the dict directly instead of `.keys()`.
        if "account" in step:
            type_byte |= 0x01
            step_data.append(decode_address(step["account"]))
        if "currency" in step:
            type_byte |= 0x10
            step_data.append(currency_code_to_bytes(step["currency"], xrp_ok=True))
        if "issuer" in step:
            type_byte |= 0x20
            step_data.append(decode_address(step["issuer"]))
        path_contents.append(uint8_to_bytes(type_byte))
        path_contents.extend(step_data)
    return b''.join(path_contents)
def tx_type_to_bytes(txtype):
    """
    TransactionType field is a special case that is written in JSON
    as a string name but in binary as a UInt16.
    """
    return uint16_to_bytes(DEFINITIONS["TRANSACTION_TYPES"][txtype])
def uint8_to_bytes(i):
    """Serialize an unsigned 8-bit integer as 1 big-endian byte."""
    return i.to_bytes(1, "big", signed=False)

def uint16_to_bytes(i):
    """Serialize an unsigned 16-bit integer as 2 big-endian bytes."""
    return i.to_bytes(2, "big", signed=False)

def uint32_to_bytes(i):
    """Serialize an unsigned 32-bit integer as 4 big-endian bytes."""
    return i.to_bytes(4, "big", signed=False)
# Core serialization logic -----------------------------------------------------
def field_to_bytes(field_name, field_val):
    """
    Return a bytes object containing the serialized version of a field,
    including its field ID prefix.
    """
    field_type = DEFINITIONS["FIELDS"][field_name]["type"]
    logger.debug("Serializing field {f} of type {t}".format(f=field_name, t=field_type))
    id_prefix = field_id(field_name)
    logger.debug("id_prefix is: %s" % id_prefix.hex())
    if field_name == "TransactionType":
        # Special case: convert from string to UInt16
        return id_prefix + tx_type_to_bytes(field_val)
    serializers = {
        # TypeName: function(field) -> bytes object
        "AccountID": accountid_to_bytes,
        "Amount": amount_to_bytes,
        "Blob": blob_to_bytes,
        "Hash128": hash128_to_bytes,
        "Hash160": hash160_to_bytes,
        "Hash256": hash256_to_bytes,
        "PathSet": pathset_to_bytes,
        "STArray": array_to_bytes,
        "STObject": object_to_bytes,
        "UInt8" : uint8_to_bytes,
        "UInt16": uint16_to_bytes,
        "UInt32": uint32_to_bytes,
    }
    return id_prefix + serializers[field_type](field_val)
def serialize_tx(tx, for_signing=False):
    """
    Takes a transaction as decoded JSON and returns a bytes object representing
    the transaction in binary format.
    The input format should omit transaction metadata and the transaction
    should be formatted with the transaction instructions at the top level.
    ("hash" can be included, but will be ignored)
    If for_signing=True, then only signing fields are serialized, so you can use
    the output to sign the transaction.
    SigningPubKey and TxnSignature are optional, but the transaction can't
    be submitted without them.
    For example:
    {
        "TransactionType" : "Payment",
        "Account" : "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh",
        "Destination" : "ra5nK24KXen9AHvsdFTKHSANinZseWnPcX",
        "Amount" : {
            "currency" : "USD",
            "value" : "1",
            "issuer" : "rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn"
        },
        "Fee": "12",
        "Flags": 2147483648,
        "Sequence": 2
    }
    """
    canonical_order = sorted(tx.keys(), key=field_sort_key)
    logger.debug("Canonical field order: %s" % canonical_order)
    serialized_fields = []
    for field_name in canonical_order:
        definition = DEFINITIONS["FIELDS"][field_name]
        if not definition["isSerialized"]:
            continue
        if for_signing and not definition["isSigningField"]:
            # Skip non-signing fields in for_signing mode.
            continue
        field_bytes = field_to_bytes(field_name, tx[field_name])
        logger.debug("{n}: {h}".format(n=field_name, h=field_bytes.hex()))
        serialized_fields.append(field_bytes)
    all_serial = b''.join(serialized_fields)
    logger.debug(all_serial.hex().upper())
    return all_serial
# Startup stuff ----------------------------------------------------------------
# Default to warnings only; the -v command-line flag switches this to DEBUG.
logger.setLevel(logging.WARNING)
# Load the field/type definitions once at import time; the serialization
# helpers above all read this module-level constant.
DEFINITIONS = load_defs()
# Commandline utility ----------------------------------------------------------
# parses JSON from a file or commandline argument and prints the serialized
# form of the transaction as hex
if __name__ == "__main__":
    # Command-line utility: parse a transaction as JSON (from a file, an
    # argument, or stdin) and print its serialized form as hex.
    parser = argparse.ArgumentParser()
    source_group = parser.add_mutually_exclusive_group()
    source_group.add_argument("-f", "--filename", default="test-cases/tx1.json",
            help="Read input transaction from a JSON file. (Uses test-cases/tx1.json by default)")
    source_group.add_argument("-j", "--json",
            help="Read input transaction JSON from the command line")
    source_group.add_argument("--stdin", action="store_true", default=False,
            help="Read input transaction JSON from standard input (stdin)")
    parser.add_argument("-v", "--verbose", action="store_true", default=False,
            help="Display debug messages (such as individual field serializations)")
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Determine source of JSON transaction:
    if args.json:
        tx_json = json.loads(args.json)
    elif args.stdin:
        tx_json = json.load(sys.stdin)
    else:
        with open(args.filename) as tx_file:
            tx_json = json.load(tx_file)

    print(serialize_tx(tx_json).hex().upper())