In [1]:
import socket, struct, secrets, json
# 필요 패키지 import 

In [3]:
# 기본 데이터 설정하기
DNS_BASE_PORT = 53
TARGET_URL = 'example.com'
ROOT_DATA_PATH = '../root-dns/dns.json'

In [4]:
# 루트 서버 데이터 호출
with open(ROOT_DATA_PATH, 'r') as dns_list:
    root_dns = json.load(dns_list)
dns_list.close()
root_servers = root_dns['Root-servers'][0]['Ipv4']

In [5]:
root_servers

'198.41.0.4'

In [6]:
# 루트 서버 (0)번 요청 패킷 생성
transecID = secrets.randbits(16)
headers_section = {
        # transecID는 2Bytes == 16bit
        'Transaction ID' : transecID,
        'Flags': 0x0000,
        # 모든 값이 루트 질의 NS로 0적용
        'QDCOUNT': 1,
        'ANCOUNT': 0,
        'NSCOUNT': 0,
        'ARCOUNT': 0,
    }
header_pack = struct.pack('!HHHHHH', \
        headers_section['Transaction ID'],
        headers_section['Flags'],
        headers_section['QDCOUNT'],
        headers_section['ANCOUNT'],
        headers_section['NSCOUNT'],
        headers_section['ARCOUNT'])
    
Question_section = {
        'QNAME': TARGET_URL,
        'QTYPE': 1,
        # A  레코드 조회
        'QCLASS': 1
    }
    # Qname 처리
qname_bytes = b''
for domain in str(Question_section['QNAME']).split('.'):
        if not domain:
            continue
        qname_bytes += struct.pack('!B', len(domain))
        qname_bytes += domain.encode('ascii')
qname_bytes += b'\x00'
    
    # QS 섹션 패킹
qs_pack = struct.pack('!HH', 
        Question_section['QTYPE'],
        Question_section['QCLASS'])

packet = (header_pack + qname_bytes + qs_pack), transecID

In [9]:
packet[0]

b'\x1f{\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\x03com\x00\x00\x01\x00\x01'

In [14]:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
bytes_sent = client_socket.sendto(packet[0], (root_servers, 53))
response_data, _ = client_socket.recvfrom(4096)
print(response_data)

b'\x1f{\x82\x00\x00\x01\x00\x00\x00\r\x00\x0b\x07example\x03com\x00\x00\x01\x00\x01\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x14\x01l\x0cgtld-servers\x03net\x00\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01j\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01h\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01d\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01b\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01f\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01k\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01m\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01i\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01g\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01a\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01c\xc0+\xc0\x14\x00\x02\x00\x01\x00\x02\xa3\x00\x00\x04\x01e\xc0+\xc0)\x00\x01\x00\x01\x00\x02\xa3\x00\x00\x04\xc0)\xa2\x1e\xc0)\x00\x1c\x00\x01\x00\x02\xa3\x00\x00\x10 \x01\x05\x00\xd97\x00\x00\x00\x00\x00

In [33]:
# 수신받은 데이터 파싱
current_offset = 0 

# 헤더 파싱
transecID, Flags, QDCOUNT, ANCOUNT, NSCOUNT, ARCOUNT = struct.unpack('!HHHHHH', response_data[0:12])
current_offset += 12
# 헤더 12 바이트 처리 후 오프셋 업데이트

print(transecID, Flags, QDCOUNT, ANCOUNT, NSCOUNT)


8059 33280 1 0 13


In [None]:
dns_parts = []
# 이 코드는 POC이므로 개념 증명 패싱. 이 코드는 재귀적 리졸빙을 순서대로 오차없이 구현이 목표
while True:
    label_length = response_data[current_offset]
    current_offset += 1
    
    if label_length == 0:
        break
    
    # 압축 포인터 무시(POC)
    label = response_data[current_offset:current_offset + label_length].decode('ascii')
    dns_parts.append(label)
    current_offset += label_length
    
qtype, qclass = struct.unpack('!HH', response_data[current_offset:current_offset + 4])
current_offset += 4

print(dns_parts)
print(qtype, qclass)

['example', 'com']
1 1


In [36]:
current_offset
print('current_offset', current_offset)
# 12(headers) + 13(example.com) + 4(QS) 

current_offset 29
