In [7]:
import dpkt
import json
import socket
import argparse
import ipaddress
from typing import NamedTuple,Dict, Union
from abc import ABC

class AbstractConfig(ABC):
	def verify(self, ip_packet: dpkt.ip.IP) -> bool:
		pass

class PacketTracker(NamedTuple):
	size: int
	packets: int
	connections: int


def get_ip_addr(ip_section) -> tuple:
	if 'source-ipv4-network' in ip_section:
		return ip_section['source-ipv4-network'], None
	if 'ietf-acldns:src-dnsname' in ip_section:
		return socket.gethostbyname(ip_section['ietf-acldns:src-dnsname']) + '/32', None
	if 'ietf-acldns:dst-dnsname' in ip_section:
		return None, socket.gethostbyname(ip_section['ietf-acldns:dst-dnsname']) + '/32'
	if 'destination-ipv4-network' in ip_section:
		return None, ip_section['destination-ipv4-network']
	return None, None


def parse_config(config_file):
	configs:Dict[AbstractConfig , PacketTracker] = {}
	with open(config_file) as f:
		config_json = json.loads(f.read())

	for entry in config_json['ietf-access-control-list:access-lists']['acl']:
		for c in entry['aces']['ace']:
			if c['matches'].get('ipv4') is None:
				continue
			src_ip_addr, dst_ip_addr = get_ip_addr(c['matches']['ipv4'])
			c_ip = ConfigIP(c['matches']['ipv4']['protocol'], src_ip_addr, dst_ip_addr)
			if c['matches']['ipv4']['protocol'] == dpkt.ip.IP_PROTO_ICMP:
				configs[ConfigICMP(c['matches']['icmp']['type'], c['matches']['icmp']['code'], c_ip)] = PacketTracker(0,0,0)
			elif c['matches']['ipv4']['protocol'] == dpkt.ip.IP_PROTO_UDP:
				configs[
					ConfigUDP(
						c_ip,
						c['matches']['udp'].get('destination-port', dict()).get('port'),
						c['matches']['udp'].get('source-port', dict()).get('port'),
					)
				] = PacketTracker(0,0,0)
			elif c['matches']['ipv4']['protocol'] == dpkt.ip.IP_PROTO_TCP:
				configs[
					ConfigTCP(
						c_ip,
						c['matches']['tcp'].get('destination-port', dict()).get('port'),
						c['matches']['tcp'].get('source-port', dict()).get('port'),
					)
				] = PacketTracker(0,0,0)

	return configs


def iterate_pcap(filename, configs: Dict[AbstractConfig, PacketTracker]):
	with open(filename, 'rb') as f:
		total_bandwidth = 0
		for timestamp, buffer in dpkt.pcapng.Reader(f):
			eth_packet = dpkt.ethernet.Ethernet(buffer)
			if eth_packet.type != dpkt.ethernet.ETH_TYPE_IP:
				continue
			total_bandwidth += eth_packet.data.len
			for config, data in configs.items():
				if config.verify(eth_packet.data):
					data.packets += 1
					data.size += eth_packet.data.len
	return configs, total_bandwidth

def main(config: str, pcap):
	configs = parse_config(config)
	return iterate_pcap(pcap, configs)

In [12]:
usa_pcap_usa_rule, tota_bw_usa = main('xiaomi_hub_merged_usMud.json','xiaomi-hub_merged_us.pcapng')

In [61]:
usa_pcap_uk_rule, tota_bw_usa = main('xiaomi_hub_merged_ukMud.json','xiaomi-hub_merged_us.pcap')

In [62]:
uk_pcap_usa_rule, tota_bw_uk = main('xiaomi_hub_merged_usMud.json','xiaomi-hub_merged_us.pcap')

In [63]:
uk_pcap_uk_rule, tota_bw_uk = main('xiaomi_hub_merged_ukMud.json','xiaomi-hub_merged_us.pcap')

In [11]:
usa_pcap_usa_rule

{UDP: filtered for dest_port: 67, src_port:None filtered for dest_ip: None, src_ip:None protocol: 17: [28,
  9408],
 UDP: filtered for dest_port: 8053, src_port:None filtered for dest_ip: 161.117.52.228/32, src_ip:None protocol: 17: [0,
  0],
 UDP: filtered for dest_port: None, src_port:54321 filtered for dest_ip: None, src_ip:None protocol: 17: [2045,
  240908],
 UDP: filtered for dest_port: 5353, src_port:None filtered for dest_ip: 224.0.0.251/32, src_ip:None protocol: 17: [770,
  267986],
 UDP: filtered for dest_port: None, src_port:8053 filtered for dest_ip: None, src_ip:161.117.52.228/32 protocol: 17: [0,
  0],
 UDP: filtered for dest_port: 54321, src_port:None filtered for dest_ip: None, src_ip:None protocol: 17: [1816,
  181520],
 UDP: filtered for dest_port: None, src_port:67 filtered for dest_ip: None, src_ip:None protocol: 17: [28,
  9272]}

In [None]:

# def get_args():
# 	parser = argparse.ArgumentParser(description='Pcap parser')
# 	parser.add_argument('-f', '--file', dest='filename', required=True, help='pcap to parse')
# 	parser.add_argument('-c', '--config', dest='config', required=True, help='config file')
#
# 	return parser.parse_args()