This repository has been archived by the owner on Jul 23, 2023. It is now read-only.
/
werkzeug_connector.py
150 lines (120 loc) · 5.18 KB
/
werkzeug_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Shadow Daemon -- Web Application Firewall
#
# Copyright (C) 2014-2021 Hendrik Buchwald <hb@zecure.org>
#
# This file is part of Shadow Daemon. Shadow Daemon is free software: you can
# redistribute it and/or modify it under the terms of the GNU General Public
# License as published by the Free Software Foundation, version 2.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from .connector import Input, Output, Connector
from werkzeug.datastructures import ImmutableMultiDict
class InputWerkzeug(Input):
    """Input adapter that reads and rewrites a Werkzeug request object.

    The methods rely on ``self.config``, ``self.escape_key``,
    ``self.unescape_key`` and ``self.split_path``; none of them are defined in
    this class, so they are presumably provided by the ``Input`` base class or
    set by the caller -- verify against ``connector.py``.
    """

    def __init__(self, request):
        """Store the request object that all other methods operate on."""
        self.request = request

    def get_client_ip(self):
        """Return the environ entry named by the ``client_ip`` config option.

        Falls back to the ``REMOTE_ADDR`` environ key when the option is unset.
        """
        environ_key = self.config.get('client_ip', default='REMOTE_ADDR')
        return self.request.environ.get(environ_key)

    def get_caller(self):
        """Return the environ entry named by the ``caller`` config option.

        Falls back to the ``PATH_INFO`` environ key when the option is unset.
        """
        environ_key = self.config.get('caller', default='PATH_INFO')
        return self.request.environ.get(environ_key)

    def get_resource(self):
        """Return the part of the request URL that follows the host.

        If the host string cannot be located inside the URL, the complete URL
        is returned instead of slicing at a meaningless offset.
        """
        host = self.request.host
        url = self.request.url

        position = url.find(host)
        if position < 0:
            # str.find() signals "not found" with -1; slicing with that value
            # would silently return an arbitrary tail of the URL.
            return url

        return url[position + len(host):]

    def _store_multidict(self, prefix, params, convert=None):
        """Flatten a multi-value mapping into ``self.input``.

        A key with one value is stored at ``prefix|key``; a key with several
        values is stored at ``prefix|key|index`` for each value. ``convert``,
        if given, is applied to every value before it is stored.
        """
        for key in params:
            path = prefix + '|' + self.escape_key(key)
            values = params.getlist(key)

            if convert is not None:
                values = [convert(value) for value in values]

            if len(values) > 1:
                for index, value in enumerate(values):
                    self.input[path + '|' + str(index)] = value
            else:
                self.input[path] = values[0]

    def gather_input(self):
        """Collect all user input of the request into ``self.input``.

        The resulting dictionary maps paths such as ``GET|name``,
        ``POST|name|0``, ``DATA|raw``, ``COOKIE|name``, ``SERVER|HTTP_...``
        and ``FILES|name`` to the corresponding values.
        """
        # Reset input.
        self.input = {}

        # Save GET parameters in input.
        self._store_multidict('GET', self.request.args)

        # Save POST parameters in input.
        self._store_multidict('POST', self.request.form)

        # Save raw data in input. Has to be done AFTER post_input!
        data_raw = self.request.data
        if data_raw:
            self.input['DATA|raw'] = data_raw

        # Save cookies in input.
        cookies = self.request.cookies
        for key in cookies:
            self.input['COOKIE|' + self.escape_key(key)] = cookies[key]

        # Save headers in input.
        environ = self.request.environ
        for key in environ:
            if key.startswith('HTTP_'):
                self.input['SERVER|' + self.escape_key(key)] = environ[key]

        # Save the file names of uploads.
        self._store_multidict(
            'FILES',
            self.request.files,
            convert=lambda upload: upload.filename
        )

    @staticmethod
    def _blank_parameter(params, key, path_split):
        """Replace one GET/POST value in ``params`` with an empty string.

        A three-part path addresses a single element of a multi-value
        parameter by index; any other path replaces the parameter as a whole.
        """
        if len(path_split) == 3:
            values = params.getlist(key)
            values[int(path_split[2])] = u''
            params.setlist(key, values)
        else:
            params[key] = u''

    def defuse_input(self, threats):
        """Neutralize the given threat paths inside the request.

        ``threats`` is an iterable of paths in the format produced by
        ``gather_input``. Headers, cookies, GET/POST values and the raw body
        are replaced by empty strings, uploads are removed. Paths with fewer
        than two components and unknown path types are ignored.

        Always returns True, i.e. the request itself is not stopped.
        """
        # Get the input and create copy to make it mutable.
        get_input = self.request.args.copy()
        post_input = self.request.form.copy()
        cookies_input = self.request.cookies.copy()
        files_input = self.request.files.copy()

        # Remove threats.
        for path in threats:
            path_split = self.split_path(path)

            if len(path_split) < 2:
                continue

            kind = path_split[0]
            key = self.unescape_key(path_split[1])

            if kind == 'SERVER':
                self.request.environ[key] = u''
            elif kind == 'COOKIE':
                cookies_input[key] = u''
            elif kind == 'GET':
                self._blank_parameter(get_input, key, path_split)
            elif kind == 'POST':
                self._blank_parameter(post_input, key, path_split)
            elif kind == 'FILES':
                # GET/POST approach for arrays does not work, because the
                # upload is deleted. Several threats can name the same key
                # (FILES|key|0, FILES|key|1, ...), so the removal must
                # tolerate a key that is already gone.
                files_input.pop(key, None)
            elif kind == 'DATA':
                self.request.data = u''

        # Update the GET data.
        self.request.args = ImmutableMultiDict(get_input)

        # Update the POST data.
        self.request.form = ImmutableMultiDict(post_input)

        # Update the cookies.
        self.request.cookies = cookies_input

        # Update the file uploads.
        self.request.files = ImmutableMultiDict(files_input)

        # Don't stop the complete request.
        return True

    def gather_hashes(self):
        """Set ``self.hashes`` to an empty dictionary."""
        # Integrity check not supported, because everything is routed through one file.
        self.hashes = {}