Skip to content

Commit

Permalink
remove not needed replacement code
Browse files Browse the repository at this point in the history
  • Loading branch information
mplattner committed Jun 26, 2020
1 parent b608b0e commit 96e756e
Show file tree
Hide file tree
Showing 8 changed files with 0 additions and 207 deletions.
13 changes: 0 additions & 13 deletions mitmproxy/http.py
Expand Up @@ -185,19 +185,6 @@ def copy(self):
f.response = self.response.copy()
return f

def replace(self, pattern, repl, *args, **kwargs):
"""
Replaces a regular expression pattern with repl in both request and
response of the flow. Encoded content will be decoded before
replacement, and re-encoded afterwards.
Returns the number of replacements made.
"""
c = self.request.replace(pattern, repl, *args, **kwargs)
if self.response:
c += self.response.replace(pattern, repl, *args, **kwargs)
return c


def make_error_response(
status_code: int,
Expand Down
42 changes: 0 additions & 42 deletions mitmproxy/net/http/headers.py
@@ -1,5 +1,3 @@
import re

import collections
from mitmproxy.coretypes import multidict
from mitmproxy.utils import strutils
Expand Down Expand Up @@ -147,46 +145,6 @@ def items(self, multi=False):
else:
return super().items()

def replace(self, pattern, repl, flags=0, count=0):
"""
Replaces a regular expression pattern with repl in each "name: value"
header line.
Returns:
The number of replacements made.
"""
if isinstance(pattern, str):
pattern = strutils.escaped_str_to_bytes(pattern)
if isinstance(repl, str):
repl = strutils.escaped_str_to_bytes(repl)
pattern = re.compile(pattern, flags)
replacements = 0
flag_count = count > 0
count_reached = False
fields = []
for name, value in self.fields:
if count_reached:
fields.append((name, value))
continue
line, n = pattern.subn(repl, name + b": " + value, count=count)
try:
name, value = line.split(b": ", 1)
except ValueError:
# We get a ValueError if the replacement removed the ": "
# There's not much we can do about this, so we just keep the header as-is.
pass
else:
replacements += n
fields.append((name, value))
if flag_count:
count -= n
if count == 0:
count_reached = True
continue
fields.append((name, value))
self.fields = tuple(fields)
return replacements


def parse_content_type(c):
"""
Expand Down
21 changes: 0 additions & 21 deletions mitmproxy/net/http/message.py
Expand Up @@ -250,24 +250,3 @@ def encode(self, e):
self.content = self.raw_content
if "content-encoding" not in self.headers:
raise ValueError("Invalid content encoding {}".format(repr(e)))

def replace(self, pattern, repl, flags=0, count=0):
"""
Replaces a regular expression pattern with repl in both the headers
and the body of the message. Encoded body will be decoded
before replacement, and re-encoded afterwards.
Returns:
The number of replacements made.
"""
if isinstance(pattern, str):
pattern = strutils.escaped_str_to_bytes(pattern)
if isinstance(repl, str):
repl = strutils.escaped_str_to_bytes(repl)
replacements = 0
if self.content:
self.content, replacements = re.subn(
pattern, repl, self.content, flags=flags, count=count
)
replacements += self.headers.replace(pattern, repl, flags=flags, count=count)
return replacements
21 changes: 0 additions & 21 deletions mitmproxy/net/http/request.py
Expand Up @@ -128,27 +128,6 @@ def make(

return req

def replace(self, pattern, repl, flags=0, count=0):
"""
Replaces a regular expression pattern with repl in the headers, the
request path and the body of the request. Encoded content will be
decoded before replacement, and re-encoded afterwards.
Returns:
The number of replacements made.
"""
if isinstance(pattern, str):
pattern = strutils.escaped_str_to_bytes(pattern)
if isinstance(repl, str):
repl = strutils.escaped_str_to_bytes(repl)

c = super().replace(pattern, repl, flags, count)
self.path, pc = re.subn(
pattern, repl, self.data.path, flags=flags, count=count
)
c += pc
return c

@property
def first_line_format(self):
"""
Expand Down
26 changes: 0 additions & 26 deletions test/mitmproxy/net/http/test_headers.py
Expand Up @@ -65,32 +65,6 @@ def test_bytes(self):
headers = Headers()
assert bytes(headers) == b""

def test_replace_simple(self):
headers = Headers(Host="example.com", Accept="text/plain")
replacements = headers.replace("Host: ", "X-Host: ")
assert replacements == 1
assert headers["X-Host"] == "example.com"
assert "Host" not in headers
assert headers["Accept"] == "text/plain"

def test_replace_multi(self):
headers = self._2host()
headers.replace(r"Host: example.com", r"Host: example.de")
assert headers.get_all("Host") == ["example.de", "example.org"]

def test_replace_remove_spacer(self):
headers = Headers(Host="example.com")
replacements = headers.replace(r"Host: ", "X-Host ")
assert replacements == 0
assert headers["Host"] == "example.com"

def test_replace_with_count(self):
headers = Headers(Host="foobarfoo.com", Accept="foo/bar")
replacements = headers.replace("foo", "bar", count=1)
assert replacements == 1
assert headers["Host"] == "barbarfoo.com"
assert headers["Accept"] == "foo/bar"


def test_parse_content_type():
p = parse_content_type
Expand Down
10 changes: 0 additions & 10 deletions test/mitmproxy/net/http/test_message.py
Expand Up @@ -100,16 +100,6 @@ def test_timestamp_end(self):
def test_http_version(self):
_test_decoded_attr(tutils.tresp(), "http_version")

def test_replace(self):
r = tutils.tresp()
r.content = b"foofootoo"
r.replace(b"foo", "gg")
assert r.content == b"ggggtoo"

r.content = b"foofootoo"
r.replace(b"foo", "gg", count=1)
assert r.content == b"ggfootoo"


class TestMessageContentEncoding:
def test_simple(self):
Expand Down
14 changes: 0 additions & 14 deletions test/mitmproxy/net/http/test_request.py
Expand Up @@ -61,20 +61,6 @@ def test_make(self):
with pytest.raises(TypeError):
Request.make("GET", "https://example.com/", headers=42)

def test_replace(self):
r = treq()
r.path = b"foobarfoo"
r.replace(b"foo", "bar")
assert r.path == "barbarbar"

r.path = b"foobarfoo"
r.replace(b"foo", "bar", count=1)
assert r.path == "barbarfoo"

r.path = "foobarfoo"
r.replace("foo", "bar", count=1)
assert r.path == "barbarfoo"

def test_first_line_format(self):
_test_passthrough_attr(treq(), "first_line_format")

Expand Down
60 changes: 0 additions & 60 deletions test/mitmproxy/test_http.py
Expand Up @@ -44,16 +44,6 @@ def test_get_url(self):
assert r.url == "https://address:22/path"
assert r.pretty_url == "https://foo.com:22/path"

def test_replace(self):
r = http.HTTPRequest.wrap(mitmproxy.test.tutils.treq())
r.path = "path/foo"
r.headers["Foo"] = "fOo"
r.content = b"afoob"
assert r.replace("(?i)foo", "boo") == 4
assert r.path == "path/boo"
assert b"foo" not in r.content
assert r.headers["boo"] == "boo"

def test_constrain_encoding(self):
r = http.HTTPRequest.wrap(mitmproxy.test.tutils.treq())
r.headers["accept-encoding"] = "gzip, oink"
Expand All @@ -78,14 +68,6 @@ def test_simple(self):
resp2 = resp.copy()
assert resp2.get_state() == resp.get_state()

def test_replace(self):
r = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp())
r.headers["Foo"] = "fOo"
r.content = b"afoob"
assert r.replace("(?i)foo", "boo") == 3
assert b"foo" not in r.content
assert r.headers["boo"] == "boo"

def test_get_content_type(self):
resp = http.HTTPResponse.wrap(mitmproxy.test.tutils.tresp())
resp.headers = Headers(content_type="text/plain")
Expand Down Expand Up @@ -212,48 +194,6 @@ def test_resume_duplicated(self):
f2.resume()
assert f.intercepted is f2.intercepted is False

def test_replace_unicode(self):
f = tflow.tflow(resp=True)
f.response.content = b"\xc2foo"
f.replace(b"foo", u"bar")

def test_replace_no_content(self):
f = tflow.tflow()
f.request.content = None
assert f.replace("foo", "bar") == 0

def test_replace(self):
f = tflow.tflow(resp=True)
f.request.headers["foo"] = "foo"
f.request.content = b"afoob"

f.response.headers["foo"] = "foo"
f.response.content = b"afoob"

assert f.replace("foo", "bar") == 6

assert f.request.headers["bar"] == "bar"
assert f.request.content == b"abarb"
assert f.response.headers["bar"] == "bar"
assert f.response.content == b"abarb"

def test_replace_encoded(self):
f = tflow.tflow(resp=True)
f.request.content = b"afoob"
f.request.encode("gzip")
f.response.content = b"afoob"
f.response.encode("gzip")

f.replace("foo", "bar")

assert f.request.raw_content != b"abarb"
f.request.decode()
assert f.request.raw_content == b"abarb"

assert f.response.raw_content != b"abarb"
f.response.decode()
assert f.response.raw_content == b"abarb"

def test_timestamp_start(self):
f = tflow.tflow()
assert f.timestamp_start == f.request.timestamp_start
Expand Down

0 comments on commit 96e756e

Please sign in to comment.