-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
229 lines (189 loc) · 6.92 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#################################################################################
# Hydra TestManager main script
#
# File: main.py
# Name: James Aiken
# Date: 25/03/2019
# Course: CSC4006 - Research and Development Project
# Desc: Script to handle the execution of a submitted adversarial test from the
# Hydra web application. Handles launching of the NIDS Neptune, as well
# as the Mininet SDN. Calls the TestSuite class to perform the requested
# adversarial attack.
#
#
# Usage: Program is executed from a terminal using 'sudo python TestManager/main.py'
# with appropriate arguments
#
# Args:
# target_classifier: machine learning clasifier to be used in test
# network_attack: network attack to be performed in test
# adversarial_attack: adversarial technique applied to network attack
#
# Requirements: test_suite.py and mininet Python API directory
#
#################################################################################
from __future__ import division
import os
import time
import sys
import logging
from subprocess import Popen, PIPE
from mininet.topo import Topo
from mininet.net import Mininet
from mininet.util import dumpNodeConnections
from mininet.log import setLogLevel
from mininet.cli import CLI
from mininet.node import OVSSwitch, RemoteController
from test_suite import TestSuite
#################################################################################
# main()
#
# Main method for the TestManager package to handle an adversarial test
# Launches Neptune and Mininet before calling the TestSuite to perform the test
# Outputs results to file
#
# Args:
# target_classifier: machine learning clasifier to be used in test
# network_attack: network attack to be performed in test
# adversarial_attack: adversarial technique applied to network attack
#
# Terminates Neptune and Mininet when finished
#
def main():
dir = "/home/james/Documents/University/ResearchProject/HydraWebApp/App/"
net_assigned = False
try:
# Read testing arguments
target_classifier = sys.argv[1]
network_attack = sys.argv[2]
adversarial_attack = sys.argv[3]
training_dir = dir + "nids_config/training_status.txt"
# Configure and launch Mininet
setLogLevel('info')
net = launch_network()
net_assigned = True
#Launch NIDS
try:
with open(training_dir,'w') as training_status:
training_status.write("")
except:
logging.error('Unable to open training status directory')
launch_nids(dir, target_classifier)
print("\n")
# Initialise test suite and execute attack
ts = TestSuite(dir, network_attack, adversarial_attack, net)
result = ts.run_test()
time.sleep(5)
results_dir = dir + "TestManager/test_results/results.txt"
try:
with open(results_dir,"a+") as results:
# Write results based on what the test has returned
output = result
results.write(str(output))
results.write("\n")
except:
logging.error('Unable to open and write to results file')
except KeyboardInterrupt:
print("Stopping Neptune..")
os.system('sudo pkill -f Neptune/main.py')
print("\nStopping network..\n")
if net_assigned:
net.stop()
sys.exit(0)
finally:
print("Stopping Neptune..")
os.system('sudo pkill -f Neptune/main.py')
if net_assigned:
print("\nStopping network..\n")
net.stop()
time.sleep(5)
with open(training_dir,'w') as training_status:
training_status.write("")
#################################################################################
# launch_nids(dir, target_classifier)
#
# Function to launch the intrusion detection system Neptune in a child terminal
# with appropriate congiguration
# Sleeps until Neptune has trained its classifier
#
# Args:
# dir: base directory of application
# target_classifier: classifier to be used by Neptune
#
def launch_nids(dir, target_classifier):
print("Launching Neptune..")
classifier_dir = dir + "nids_config/classifier_type.txt"
try:
with open(classifier_dir,"w") as classifier_config:
# Write classifier type to Neptune configuration file
if str(target_classifier) == "Random Forest Model":
classifier_config.write("1")
elif str(target_classifier) == "KNN":
classifier_config.write("2")
elif str(target_classifier) == "SVM":
classifier_config.write("3")
elif str(target_classifier) == "Neural Network":
classifier_config.write("4")
elif str(target_classifier) == "Logistic Regression":
classifier_config.write("5")
else:
classifier_config.write("-1")
print("Invalid classifier")
except:
logging.error('Unable to open and write to classifier config directory')
time.sleep(2)
# Launch Neptune in child terminal
neptune_cmd = "sudo python -W ignore " + dir + "Neptune/"+ "main.py"
Popen(['gnome-terminal', '-e', neptune_cmd], stdout=PIPE)
training_status_dir = dir + "nids_config/training_status.txt"
training_wait = True
# Wait for Neptune to train classifier
while(training_wait):
time.sleep(1)
with open(training_status_dir, "r") as training_status:
for line in training_status:
if str(line) == '1':
training_wait = False
time.sleep(2)
#################################################################################
# launch_network()
#
# Launches the Mininet network, connecting to the remote controller
#
# Returns:
# net: the Mininet class network object
#
def launch_network():
net = Mininet(autoSetMacs = True, cleanup = True)
s1 = net.addSwitch('s1')
for n in range(1,15):
h = net.addHost('h%s' % n)
net.addLink(h, s1)
net.addController('c0', controller=RemoteController, ip='127.0.0.1', port=6653)
net.start()
time.sleep(5)
print("Network started..")
return net
#################################################################################
# logo()
#
# Display Hydra logo on app launch using print()
#
def logo():
"""
Prints the logo of Hydra
"""
logo = '''
---------------------------------------------------
_ _ _ _ _ _ _ _ ____ ___
| | | || || || || _ | | _ / / _ /
| |_| || /| |/ || | | | | |_) )| |_| |
| _ | /_ _/ | | | | | __/ | _ |
| | | | | | | |_/ / | | | | | |
|_| |_| |_| |_____/ |_| |_| |_|
---------------------------------------------------
'''
print(logo)
if __name__ == '__main__':
logo()
main()