Skip to content

Commit

Permalink
βœ… πŸ”… ℹ️
Browse files Browse the repository at this point in the history
πŸ”… added new core attribute _pcap_read to hold PcapReader generator
ℹ️ minor updates
βœ… pcap_payload method added
βœ… split_by_n method added
  • Loading branch information
securisecctf committed Dec 3, 2019
1 parent 9eee62f commit d5c146a
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 11 deletions.
3 changes: 3 additions & 0 deletions chepy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def __init__(self, *data):
#: Holds all the methods that are called/chanined and their args
self._stack = list()
#: Holder for scapy pcap reader
self._pcap_read = None
#: Holder for scapy assembled sessions
self._pcap_sessions = None

#: Log level
Expand Down Expand Up @@ -937,6 +939,7 @@ def read_pcap(self):
Chepy: The Chepy object.
"""
pcap_file = scapy_utils.rdpcap(self.state)
self._pcap_read = scapy_utils.PcapReader(self.state)
self.state = GREEN("Pcap loaded")
self._pcap_sessions = pcap_file.sessions(full_duplex)
return self
Expand Down
21 changes: 21 additions & 0 deletions chepy/modules/pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,24 @@ def pcap_http_streams(self):

self.state = hold
return self

@ChepyDecorators.call_stack
def pcap_payload(self, layer: str):
"""Get an array of payloads based on provided layer
Args:
layer (str): A valid Scapy layer.
Returns:
Chepy: The Chepy object.
"""
assert hasattr(scapy, layer), "Not a valid Scapy layer"
hold = []
for packet in self._pcap_read:
if not packet.haslayer(layer):
continue
check_raw = packet.haslayer("Raw")
if check_raw:
hold.append(packet.getlayer(scapy.Raw).load)
self.state = hold
return self
13 changes: 13 additions & 0 deletions chepy/modules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,19 @@ def split_by(self, pattern: str = "\n", trim=True):
self.state = re.split(pattern, self._convert_to_str())
return self

@ChepyDecorators.call_stack
def split_by_n(self, n: int):
"""Split a string by n characters.
Args:
n (int): [description]
Returns:
Chepy: The Chepy object.
"""
self.state = re.findall(".{1," + str(n) + "}", self._convert_to_str())
return self

@ChepyDecorators.call_stack
def unique(self):
"""Get an array of unique list items
Expand Down
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ requests
requests-toolbelt
scapy
ua-parser
ujson
tqdm
ujson
Binary file modified tests/files/test.pcapng
Binary file not shown.
17 changes: 8 additions & 9 deletions tests/test_pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ def test_pcap_dns():
== 3
)


def test_pcap_http_streams():
assert (
len(
Chepy("tests/files/test.pcapng")
.read_pcap()
.pcap_http_streams()
.o
)
== 4
)
assert len(Chepy("tests/files/test.pcapng").read_pcap().pcap_http_streams().o) == 4


def test_pcap_payload():
assert Chepy("tests/files/test.pcapng").read_pcap().pcap_payload(
layer="ICMP"
).o == [b"secret", b"message"]

4 changes: 4 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ def test_split_by():
assert len(Chepy("some lol random lolol data").split_by("lo", trim=False).o) == 4


def test_split_by_n():
assert Chepy("some string").split_by_n(2).o[2] == " s"


def test_unique():
assert len(Chepy('["a", "a", 1]').str_list_to_list().unique().o) == 2

Expand Down

0 comments on commit d5c146a

Please sign in to comment.