Skip to content

Commit

Permalink
Namecoin: Parse domain name JSON
Browse files Browse the repository at this point in the history
Currently only handles IPv4/IPv6/Tor/TXT records.
  • Loading branch information
JeremyRand committed Dec 1, 2019
1 parent d282ba8 commit 8693875
Showing 1 changed file with 175 additions and 0 deletions.
175 changes: 175 additions & 0 deletions electrum_nmc/electrum/names.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,184 @@ def name_expiration_datetime_estimate(name_height, chain_height, chain_unixtime)
chain_datetime = datetime.fromtimestamp(chain_unixtime)
return expiration_blocks, chain_datetime + expiration_timedelta

def get_domain_records(domain, value):
if type(value) == bytes:
try:
value = value.decode("ascii")
except UnicodeDecodeError:
return [], value

if type(value) == str:
if value == "":
value = "{}"

try:
value = json.loads(value)
except json.decoder.JSONDecodeError:
return [], value

if type(value) != dict:
return [], value

records = []

new_records, value = get_domain_records_address(domain, value)
records.extend(new_records)

if "txt" in value:
new_records, value["txt"] = get_domain_records_txt(domain, value["txt"])
records.extend(new_records)
if value["txt"] == []:
del value["txt"]

return records, value

def get_domain_records_address(domain, value):
records = []

if "ip" in value:
new_records, value["ip"] = get_domain_records_address_ip4(domain, value["ip"])
records.extend(new_records)
if value["ip"] == []:
del value["ip"]

if "ip6" in value:
new_records, value["ip6"] = get_domain_records_address_ip6(domain, value["ip6"])
records.extend(new_records)
if value["ip6"] == []:
del value["ip6"]

if "tor" in value:
new_records, value["tor"] = get_domain_records_address_tor(domain, value["tor"])
records.extend(new_records)
if value["tor"] == []:
del value["tor"]

return records, value

def get_domain_records_address_ip4(domain, value):
# Convert string to array (only 1 A record exists)
if type(value) == str:
value = [value]

# Must be array
if type(value) != list:
return [], value

# Parse each array item
records = []
remaining = []
for raw_address in value:
single_record, single_remaining = get_domain_records_address_ip4_single(domain, raw_address)
if single_record is not None:
records.append(single_record)
if single_remaining is not None:
remaining.append(single_remaining)

return records, remaining

def get_domain_records_address_ip4_single(domain, value):
# Must be string
if type(value) != str:
return None, value

return [domain, "address", ["ip4", value]], None

def get_domain_records_address_ip6(domain, value):
# Convert string to array (only 1 AAAA record exists)
if type(value) == str:
value = [value]

# Must be array
if type(value) != list:
return [], value

# Parse each array item
records = []
remaining = []
for raw_address in value:
single_record, single_remaining = get_domain_records_address_ip6_single(domain, raw_address)
if single_record is not None:
records.append(single_record)
if single_remaining is not None:
remaining.append(single_remaining)

return records, remaining

def get_domain_records_address_ip6_single(domain, value):
# Must be string
if type(value) != str:
return None, value

return [domain, "address", ["ip6", value]], None

def get_domain_records_address_tor(domain, value):
# Convert string to array (only 1 Tor record exists)
if type(value) == str:
value = [value]

# Must be array
if type(value) != list:
return [], value

# Parse each array item
records = []
remaining = []
for raw_address in value:
single_record, single_remaining = get_domain_records_address_tor_single(domain, raw_address)
if single_record is not None:
records.append(single_record)
if single_remaining is not None:
remaining.append(single_remaining)

return records, remaining

def get_domain_records_address_tor_single(domain, value):
# Must be string
if type(value) != str:
return None, value

return [domain, "address", ["tor", value]], None

def get_domain_records_txt(domain, value):
# Process Tor specially
if domain.startswith("_tor."):
domain = domain[len("_tor."):]
return get_domain_records_address_tor(domain, value)

# Convert string to array (only 1 TXT record exists)
if type(value) == str:
value = [value]

# Must be array
if type(value) != list:
return [], value

# Parse each array item
records = []
remaining = []
for raw_address in value:
single_record, single_remaining = get_domain_records_txt_single(domain, raw_address)
if single_record is not None:
records.append(single_record)
if single_remaining is not None:
remaining.append(single_remaining)

return records, remaining

def get_domain_records_txt_single(domain, value):
# Must be string
if type(value) != str:
return None, value

# TODO: Handle TXT records that are an array.

return [domain, "txt", value], None


import binascii
from datetime import datetime, timedelta
import json
import os
import re

Expand Down

0 comments on commit 8693875

Please sign in to comment.