In [4]:
import pandas as pd
import numpy as np
import os
import functools

def extract_mud_info(fileDir):
    features = ['action','direction','type','protocol',
                'source_IP','destination_IP','source_port','destination_port',
                'source_lower_port', 'source_upper_port', 'destination_lower_port', 'destination_upper_port',
                'icmp_Type','icmp_Code']
    rules = pd.DataFrame(columns = features)
    
    for file in os.listdir(fileDir):
        try:
            MUDfile = pd.read_json(fileDir+file,  orient='index')
        except Exception as e:
            continue
#         device_name = MUDfile['systeminfo']
        ACL_FROM=MUDfile['from-device-policy'][0]['access-lists']['access-list']
        ACL_TO = MUDfile['to-device-policy'][0]['access-lists']['access-list']
        From_list = [f['name'] for f in ACL_FROM]
        To_list = [t['name'] for t in ACL_TO]
        ACL = MUDfile['acl'][1]
        
        for acl in ACL:
            direction = 'from' if acl['name'] in From_list else 'to'
            
            ace_list = acl['aces']['ace']
            if acl['type'] == 'ipv4-acl-type':
                aceType = 'ipv4'
            elif acl['type'] == 'ipv6-acl-type':
                aceType = 'ipv6'
            else:
                continue
            
            for ace in ace_list:
                aceProtocol = None
                aceSrcIP = None
                aceDstIP = None
                aceSrcPort = None
                aceDstPort = None
                icmpType = None
                icmpCode = None
                aceSrcLPort = None
                aceSrcUPort = None
                aceDstLPort = None
                aceDstUPort = None
                
                aceName = ace['name']
                aceAction = ace['actions']['forwarding']
                aceMatch = ace['matches']
                
                if aceMatch.get(aceType):
                    aceProtocol = aceMatch[aceType].get('protocol')
                    aceSrcIP = aceMatch[aceType].get('source-'+aceType+'-network') or aceMatch[aceType].get('ietf-acldns:src-dnsname')
                    aceDstIP = aceMatch[aceType].get('destination-'+aceType+'-network') or aceMatch[aceType].get('ietf-acldns:dst-dnsname')
                    
                    if aceMatch.get('ietf-mud:mud'):
                        matchController = aceMatch['ietf-mud:mud'].get('controller')
                        matchManufacter = aceMatch['ietf-mud:mud'].get('manufacturer')
                        if direction == 'to' and aceSrcIP == None: aceSrcIP = matchController or matchManufacter
                        if direction == 'from' and aceDstIP == None: aceDstIP = matchController or matchManufacter
                    
                    
                    if aceProtocol == 6 and aceMatch.get('tcp'):
                        if aceMatch['tcp'].get('source-port'):
                            aceSrcPort = aceMatch['tcp']['source-port'].get('port')
                            aceSrcLPort = aceMatch['tcp']['source-port'].get('lower-port')
                            aceSrcUPort = aceMatch['tcp']['source-port'].get('upper-port')
            
                        if aceMatch['tcp'].get('destination-port'):
                            aceDstPort = aceMatch['tcp']['destination-port'].get('port')
                            aceDstLPort = aceMatch['tcp']['destination-port'].get('lower-port')
                            aceDstUPort = aceMatch['tcp']['destination-port'].get('upper-port')
                            
                    elif aceProtocol == 17 and aceMatch.get('udp'):
                        if aceMatch['udp'].get('source-port'):
                            aceSrcPort = aceMatch['udp']['source-port'].get('port')
                            aceSrcLPort = aceMatch['udp']['source-port'].get('lower-port')
                            aceSrcUPort = aceMatch['udp']['source-port'].get('upper-port')

                        if aceMatch['udp'].get('destination-port'):
                            aceDstPort = aceMatch['udp']['destination-port'].get('port')
                            aceDstLPort = aceMatch['udp']['destination-port'].get('lower-port')
                            aceDstUPort = aceMatch['udp']['destination-port'].get('upper-port')
                            
                    elif aceProtocol == 1 and aceMatch.get('icmp'):
                        icmpType = aceMatch['icmp']['type']
                        icmpCode = aceMatch['icmp']['code']
                    
                    
                    thisACE = [aceAction,direction,aceType,aceProtocol,
                               aceSrcIP,aceDstIP,aceSrcPort,aceDstPort,
                               aceSrcLPort,aceSrcUPort,aceDstLPort,aceDstUPort,
                               icmpType,icmpCode]
                    rules.loc[len(rules)] = thisACE
    
    rules = rules.where(pd.notnull(rules),None)
    rules = rules.drop_duplicates()
    
    print('MUD file information done!')
    return rules
                

In [5]:
fileDir = 'C:/Users/sonya/Desktop/MasterThesisProject/test/'
Therules = extract_mud_info(fileDir)
Therules

MUD file information done!


Unnamed: 0,action,direction,type,protocol,source_IP,destination_IP,source_port,destination_port,source_lower_port,source_upper_port,destination_lower_port,destination_upper_port,icmp_Type,icmp_Code
0,accept,from,ipv4,17,,ru.pool.ntp.org,,123.0,,,,,,
1,accept,from,ipv4,17,,urn:ietf:params:mud:gateway,,67.0,,,,,,
2,accept,from,ipv4,6,,devs.tplinkcloud.com,,50443.0,,,,,,
3,accept,from,ipv4,17,,de.pool.ntp.org,,123.0,,,,,,
4,accept,from,ipv4,17,,s1b.time.edu.cn,,123.0,,,,,,
5,accept,from,ipv4,17,,255.255.255.255/32,,67.0,,,,,,
6,accept,from,ipv4,17,,time-b.nist.gov,,123.0,,,,,,
7,accept,from,ipv4,17,,0.cn.pool.ntp.org,,123.0,,,,,,
8,accept,from,ipv4,17,,,9999.0,,,,,,,
9,accept,from,ipv4,17,,1.asia.pool.ntp.org,,123.0,,,,,,


In [6]:
Therules.to_csv('C:/Users/sonya/Desktop/MasterThesisProject/MUDruls.csv')

In [11]:
import re

def findPolicyNumber(policies,policyCode):
    pattern = r'POLICY ' + re.escape(policyCode) + r'(\d+)'
    regex = re.compile(pattern,re.IGNORECASE)
    match = regex.findall(policies)
    if match:
        return int(match[-1])
    else:
        return 0
    
def PolicyGenerator(policyCode,policyFile,rules):
    
    with open(policyFile,'r+') as policy:
        if policyCode == 'M':
            MUDFeatures = rules.columns
            policies = policy.read()
            policyNumber = findPolicyNumber(policies,policyCode)
            
            for index, rule in rules.iterrows():
                policyNumber += 1
                newPolicy = '\n\nPOLICY M' + str(policyNumber) + ':\n'
                for index, feature in enumerate(MUDFeatures):
                    if rule[feature]:
                        if feature.endswith('_lower_port') and rule[index+1]:
                            newPolicy += ' AND\n'
                            newPolicy += '\t\t' + re.match('(.*?)_',feature).group()+ 'port = [' + str(rule[feature]) + ', ' + str(rule[index+1]) + ']'
                        elif feature.endswith('_upper_port'):
                            continue
                        else:
                            target = feature.lower() + ' = ' + str(rule[feature])
                            if feature == 'action':
                                newPolicy += rule[feature].upper() + '\t'
                            elif feature == 'direction':
                                newPolicy += target
                            else:
                                newPolicy += ' AND\n'
                                newPolicy += '\t\t' + target
    
                policy.write(newPolicy)
        
        if policyCode == 'P':
            policies = policy.read()
            policyNumber = findPolicyNumber(policies,policyCode)
            for rule in rules:
                policyNumber += 1
                newPolicy = '\n\nPOLICY P' + str(policyNumber) + ':\n'
                newPolicy += rule['permission'].upper() + '\t'
                rule_action = rule['action']
                for feature in rule_action:
                    if len(rule_action[feature]) > 1:
                        for action in rule_action[feature]:
                            target = feature.lower() + ' = ' + str(action)
                            if list(rule_action[feature]).index(action) > 0:
                                newPolicy += ' OR\n'
                                newPolicy += '\t\t' + target
                            else:
                                newPolicy += target
                    else:
                        target = feature.lower() + ' = ' + str(rule_action[feature][0])
                        if list(rule_action).index(feature) > 0:
                            newPolicy += ' AND\n'
                            newPolicy += '\t\t' + target
                        else:
                            newPolicy += target

                rule_condition = rule['condition']
                newPolicy += '\n' + rule['condition_key'].upper() + '\t'
                for feature in rule_condition:
                    if len(rule_condition[feature]) > 1:
                        for condition in rule_condition[feature]:
                            target = feature.lower() + ' = ' + str(condition)
                            if list(rule_condition[feature]).index(condition) > 0:
                                newPolicy += ' OR\n'
                                newPolicy += '\t\t' + target
                            else:
                                newPolicy += target
                    else:
                        target = feature.lower() + ' = ' + str(rule_condition[feature][0])
                        if list(rule_condition).index(feature) > 0:
                            newPolicy += ' AND\n'
                            newPolicy += '\t\t' + target
                        else:
                            newPolicy += target
                policy.write(newPolicy)

In [12]:
policyFile = 'C:/Users/sonya/Desktop/MasterThesisProject/mypolicytest.pat'
PolicyGenerator('M',policyFile,Therules)

In [None]:
import csv
import json
 
 
# Function to convert a CSV to JSON
# Takes the file paths as arguments
def make_json(csvFilePath, jsonFilePath):
     
    # create a dictionary
    data = {}
     
    # Open a csv reader called DictReader
    with open(csvFilePath, encoding='utf-8') as csvf:
        csvReader = csv.DictReader(csvf)
         
        # Convert each row into a dictionary
        # and add it to data
        for rows in csvReader:
             
            # Assuming a column named 'No' to
            # be the primary key
            key = rows['No']
            data[key] = rows
 
    # Open a json writer, and use the json.dumps()
    # function to dump data
    with open(jsonFilePath, 'w', encoding='utf-8') as jsonf:
        jsonf.write(json.dumps(data, indent=4))
         
# Driver Code
 
# Decide the two file paths according to your
# computer system
csvFilePath = r'safty_conditions.csv'
jsonFilePath = r'Names.json'
 
# Call the make_json function
make_json(csvFilePath, jsonFilePath)