-
Notifications
You must be signed in to change notification settings - Fork 2
/
mw_endpoint.py
214 lines (194 loc) · 7.88 KB
/
mw_endpoint.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# pylint: disable=unbalanced-tuple-unpacking
import logging
import json
import base64
from urllib.parse import urlparse
from flask import Blueprint, render_template, request, jsonify, flash, url_for
from flask_jsonrpc.exceptions import OtherError
from werkzeug.utils import redirect
from app_core import app, db, limiter
from models import WavesTx, WavesTxSig
import utils
import tx_utils
from web_utils import bad_request, get_json_params
logger = logging.getLogger(__name__)
mw = Blueprint('mw', __name__, template_folder='templates')
limiter.limit("100 per hour")(mw)
# wave specific config settings
NODE_BASE_URL = app.config["NODE_BASE_URL"]
SEED = app.config["WALLET_SEED"]
ADDRESS = app.config["WALLET_ADDRESS"]
ASSET_ID = app.config["ASSET_ID"]
ASSET_NAME = app.config["ASSET_NAME"]
TESTNET = app.config["TESTNET"]
TX_SIGNERS = app.config["TX_SIGNERS"]
ASSET_MASTER_PUBKEY = app.config["ASSET_MASTER_PUBKEY"]
#
# Jinja2 filters
#
@app.context_processor
def inject_config_qrcode_svg():
url_parts = urlparse(request.url)
url = url_parts._replace(scheme="premiomwlink", path="/config").geturl()
qrcode_svg = utils.qrcode_svg_create(url, box_size=6)
return dict(mw_config_url=url, mw_config_qrcode_svg=qrcode_svg)
#
# Flask views
#
def _tx_broadcast(txid):
dbtx = WavesTx.from_txid(db.session, txid)
if not dbtx:
return bad_request('tx not found', 404)
tx = dbtx.tx_with_sigs()
error = ""
# broadcast transaction
try:
dbtx = tx_utils.broadcast_transaction(db.session, dbtx.txid)
db.session.add(dbtx)
db.session.commit()
except OtherError as ex:
error = ex.message
if hasattr(ex, 'data'):
error = f'{ex.message} - {ex.data}'
return error, dbtx, tx
@app.route("/config")
def config():
return jsonify(dict(asset_id=ASSET_ID, asset_name=ASSET_NAME, testnet=TESTNET, tx_signers=TX_SIGNERS, tx_types=tx_utils.TYPES))
@app.route("/tx_link/<txid>")
def tx_link(txid):
url_parts = urlparse(request.url)
url = url_parts._replace(scheme="premiomwlink", path="/txid/" + txid).geturl()
qrcode_svg = utils.qrcode_svg_create(url)
return render_template("mw/tx_link.html", qrcode_svg=qrcode_svg, url=url, txid=txid)
@app.route("/tx_view/<txid>")
def tx_view(txid):
dbtx = WavesTx.from_txid(db.session, txid)
if not dbtx:
flash('tx not found', 'danger')
return redirect(url_for('tx_link', txid=txid, _external=True))
tx = dbtx.tx_with_sigs()
return render_template("mw/tx_show.html", tx=json.dumps(tx))
@app.route("/tx_broadcast_web/<txid>")
def tx_broadcast_web(txid):
error, dbtx, tx = _tx_broadcast(txid) # pylint: disable=unused-variable
if error:
flash(error, 'danger')
return redirect(url_for('tx_link', txid=txid, _external=True))
@app.route("/tx_create", methods=["POST"])
def tx_create():
tx_utils.tx_init_chain_id(TESTNET)
content = request.get_json(force=True)
if content is None:
return bad_request("failed to decode JSON object")
params, err_response = get_json_params(content, ["type", "timestamp"])
if err_response:
return err_response
type_, timestamp = params
if not type_ in tx_utils.TYPES:
return bad_request("'type' not valid")
pubkey = ASSET_MASTER_PUBKEY
address = tx_utils.generate_address(pubkey)
amount = 0
if type_ == "transfer":
fee = tx_utils.get_fee(NODE_BASE_URL, tx_utils.DEFAULT_TX_FEE, address, None)
params, err_response = get_json_params(content, ["recipient", "amount"])
if err_response:
return err_response
recipient, amount = params
tx = tx_utils.transfer_asset_payload(address, pubkey, None, recipient, ASSET_ID, amount, "", None, fee, timestamp)
elif type_ == "issue":
fee = tx_utils.get_fee(NODE_BASE_URL, tx_utils.DEFAULT_ASSET_FEE, address, None)
params, err_response = get_json_params(content, ["asset_name", "asset_description", "amount"])
if err_response:
return err_response
asset_name, asset_description, amount = params
tx = tx_utils.issue_asset_payload(address, pubkey, None, asset_name, asset_description, amount, None, 2, True, fee, timestamp)
elif type_ == "reissue":
fee = tx_utils.get_fee(NODE_BASE_URL, tx_utils.DEFAULT_ASSET_FEE, address, None)
params, err_response = get_json_params(content, ["amount"])
if err_response:
return err_response
amount, = params
tx = tx_utils.reissue_asset_payload(address, pubkey, None, ASSET_ID, amount, True, fee, timestamp)
elif type_ == "sponsor":
fee = tx_utils.get_fee(NODE_BASE_URL, tx_utils.DEFAULT_SPONSOR_FEE, address, None)
params, err_response = get_json_params(content, ["asset_fee"])
if err_response:
return err_response
asset_fee, = params
amount = asset_fee
tx = tx_utils.sponsor_payload(address, pubkey, None, ASSET_ID, asset_fee, fee, timestamp)
elif type_ == "setscript":
fee = tx_utils.get_fee(NODE_BASE_URL, tx_utils.DEFAULT_SCRIPT_FEE, address, None)
params, err_response = get_json_params(content, ["script"])
if err_response:
return err_response
script, = params
tx = tx_utils.set_script_payload(address, pubkey, None, script, fee, timestamp)
else:
return bad_request("invalid type")
txid = tx_utils.tx_to_txid(tx)
dbtx = WavesTx.from_txid(db.session, txid)
if dbtx:
return bad_request("txid already exists")
dbtx = WavesTx(txid, type_, tx_utils.CTX_CREATED, amount, False, json.dumps(tx))
db.session.add(dbtx)
db.session.commit()
return jsonify(dict(txid=txid, state=tx_utils.CTX_CREATED, tx=tx))
@app.route("/tx_status", methods=["POST"])
def tx_status():
content = request.get_json(force=True)
if content is None:
return bad_request("failed to decode JSON object")
params, err_response = get_json_params(content, ["txid"])
if err_response:
return err_response
txid, = params
dbtx = WavesTx.from_txid(db.session, txid)
if not dbtx:
return bad_request('tx not found', 404)
tx = dbtx.tx_with_sigs()
return jsonify(dict(txid=txid, state=dbtx.state, tx=tx))
@app.route("/tx_serialize", methods=["POST"])
def tx_serialize():
content = request.get_json(force=True)
if content is None:
return bad_request("failed to decode JSON object")
params, err_response = get_json_params(content, ["tx"])
if err_response:
return err_response
tx, = params
if not "type" in tx:
return bad_request("tx does not contain 'type' field")
tx_serialized = tx_utils.tx_serialize(tx)
res = {"bytes": base64.b64encode(tx_serialized).decode("utf-8", "ignore")}
return jsonify(res)
@app.route("/tx_signature", methods=["POST"])
def tx_signature():
content = request.get_json(force=True)
if content is None:
return bad_request("failed to decode JSON object")
params, err_response = get_json_params(content, ["txid", "signer_index", "signature"])
if err_response:
return err_response
txid, signer_index, signature = params
dbtx = WavesTx.from_txid(db.session, txid)
if not dbtx:
return bad_request('tx not found', 404)
logger.info(":: adding sig to tx - %s, %d, %s", txid, signer_index, signature)
sig = WavesTxSig(dbtx, signer_index, signature)
db.session.add(sig)
db.session.commit()
tx = dbtx.tx_with_sigs()
return jsonify(dict(txid=txid, state=dbtx.state, tx=tx))
@app.route("/tx_broadcast", methods=["POST"])
def tx_broadcast():
content = request.get_json(force=True)
if content is None:
return bad_request("failed to decode JSON object")
params, err_response = get_json_params(content, ["txid"])
if err_response:
return err_response
txid, = params
error, dbtx, tx = _tx_broadcast(txid)
return jsonify(dict(txid=txid, state=dbtx.state, tx=tx, error=error))