Skip to content

Commit

Permalink
πŸ”… βœ… ℹ️
Browse files Browse the repository at this point in the history
βœ… Pkt2Dict class added to internal.functions
βœ… pcap_to_dict method added
βœ… get_every_n method added
  • Loading branch information
securisecctf committed Dec 3, 2019
1 parent 2d53134 commit b39a083
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/tests_multi_os.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ jobs:
restore-keys: |
${{ runner.os }}-pip-dev-
- name: Install
if: steps.cache.outputs.cache-hit != 'true'
run: |
pip install -r dev_requirements.txt
python -m pip install --upgrade pip
Expand Down Expand Up @@ -81,6 +82,7 @@ jobs:
restore-keys: |
${{ runner.os }}-pip-dev-
- name: Install
if: steps.cache.outputs.cache-hit != 'true'
run: |
pip install -r dev_requirements.txt
python -m pip install --upgrade pip
Expand Down Expand Up @@ -125,6 +127,7 @@ jobs:
restore-keys: |
${{ runner.os }}-pip-dev-
- name: Install
if: steps.cache.outputs.cache-hit != 'true'
run: |
pip install -r dev_requirements.txt
python -m pip install --upgrade pip
Expand Down
9 changes: 5 additions & 4 deletions chepy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import ujson
import jsonpickle
import regex as re
import scapy.utils as scapy_utils
import scapy.all as scapy
from decorator import decorator

from .modules.exceptions import PrintException
Expand Down Expand Up @@ -145,7 +145,7 @@ def _abs_path(self, path: str) -> str:
path (str): Path to expand
Returns:
str: Expanded absolute path
object: Path object
"""
return pathlib.Path(path).expanduser().absolute()

Expand Down Expand Up @@ -938,8 +938,9 @@ def read_pcap(self):
Returns:
Chepy: The Chepy object.
"""
pcap_file = scapy_utils.rdpcap(self.state)
self._pcap_read = scapy_utils.PcapReader(self.state)
pcap_path = str(self._abs_path(self.state))
self._pcap_read = scapy.PcapReader(pcap_path)
pcap_file = scapy.rdpcap(pcap_path)
self.state = GREEN("Pcap loaded")
self._pcap_sessions = pcap_file.sessions(full_duplex)
return self
Expand Down
41 changes: 41 additions & 0 deletions chepy/modules/internal/functions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections import ChainMap
import scapy.all as scapy


Expand Down Expand Up @@ -60,3 +61,43 @@ def full_duplex(p): # pragma: no cover
# else:
# sess = p.sprintf("Ethernet type=%04xr,Ether.type%")
return sess


def _layer2dict(obj):
d = {}

if not getattr(obj, "fields_desc", None):
return
for f in obj.fields_desc:
value = getattr(obj, f.name)
if value is type(None): # pragma: no cover
value = None

if not isinstance(
value, (int, float, str, bytes, bool, list, tuple, set, dict, type(None))
):
value = _layer2dict(value)
d[f.name] = value
return {obj.name: d}


class Pkt2Dict:
def __init__(self, pkt):
self.pkt = pkt

def to_dict(self):
"""
Turn every layer to dict, store in ChainMap type.
:return: ChainMaq
"""
d = list()
count = 0

while True:
layer = self.pkt.getlayer(count)
if not layer:
break
d.append(_layer2dict(layer))

count += 1
return dict(**ChainMap(*list(filter(lambda x: x is not None, d))))
37 changes: 25 additions & 12 deletions chepy/modules/pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from scapy.utils import PcapNgReader, PcapReader

from ..core import ChepyCore, ChepyDecorators
from .internal.functions import Pkt2Dict


class Pcap(ChepyCore):
Expand All @@ -17,19 +18,18 @@ def pcap_dns_queries(self):
Examples:
>>> Chepy("tests/files/test.pcapng").read_pcap().pcap_dns_queries().o
[
{'frame': 1, 'query': b'fcmconnection.googleapis.com.'},
b'fcmconnection.googleapis.com.',
...
{'frame': 9, 'query': b'google.com.'}
b'google.com.'
]
"""
hold = []
sessions = self._pcap_sessions
for session in sessions:
packets = sessions.get(session)
for packet in packets:
if not packet.haslayer(scapy.DNSRR):
if not scapy.DNSQR in packet:
continue
dns = packet.getlayer("DNS")
query = packet.getlayer("DNS").qd.qname
hold.append(query)
self.state = hold
Expand All @@ -51,23 +51,23 @@ def pcap_http_streams(self):
packets = sessions.get(session)
req_res = {"request": {}, "response": {}}
for packet in packets:
if not packet.haslayer(scapy_http.HTTP):
if not scapy_http.HTTP in packet:
continue
if packet.haslayer(scapy_http.HTTPRequest):
if scapy_http.HTTPRequest in packet:
req_res["request"]["headers"] = packet.getlayer(
scapy_http.HTTPRequest
).fields
if packet.haslayer(scapy_http.Raw):
if scapy_http.Raw in packet:
req_res["request"]["payload"] = packet.getlayer(
scapy_http.Raw
).load
else:
req_res["request"]["payload"] = {}
if packet.haslayer(scapy_http.HTTPResponse):
if scapy_http.HTTPResponse in packet:
req_res["response"]["headers"] = packet.getlayer(
scapy_http.HTTPResponse
).fields
if packet.haslayer(scapy_http.Raw):
if scapy_http.Raw in packet:
req_res["response"]["payload"] = packet.getlayer(
scapy_http.Raw
).load
Expand All @@ -84,18 +84,31 @@ def pcap_payload(self, layer: str):
"""Get an array of payloads based on provided layer
Args:
layer (str): A valid Scapy layer.
layer (str): Required. A valid Scapy layer.
Returns:
Chepy: The Chepy object.
"""
assert hasattr(scapy, layer), "Not a valid Scapy layer"
hold = []
for packet in self._pcap_read:
if not packet.haslayer(layer):
if not layer in packet:
continue
check_raw = packet.haslayer("Raw")
check_raw = scapy.Raw in packet
if check_raw:
hold.append(packet.getlayer(scapy.Raw).load)
self.state = hold
return self

@ChepyDecorators.call_stack
def pcap_to_dict(self):
"""Convert a pcap to a dict
Returns:
Chepy: The Chepy object.
"""
hold = []
for packet in self._pcap_read:
hold.append(Pkt2Dict(packet).to_dict())
self.state = hold
return self
25 changes: 24 additions & 1 deletion chepy/modules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,37 @@ def split_by_n(self, n: int):
"""Split a string by n characters.
Args:
n (int): [description]
n (int): n from 0
Returns:
Chepy: The Chepy object.
Examples:
>>> Chepy("some string").split_by_n(2).o[2]
" s"
"""
self.state = re.findall(".{1," + str(n) + "}", self._convert_to_str())
return self

@ChepyDecorators.call_stack
def get_every_n(self, n: int):
"""Get every nth item from a list or string.
Index starts at 0
Args:
n (int): n from 0
Returns:
Chepy: The Chepy object.
Examples:
>>> Chepy(["a", 1, "lol", "b", True]).get_every_n(3)
["a", "b"]
"""
self.state = self.state[0::n]
return self

@ChepyDecorators.call_stack
def unique(self):
"""Get an array of unique list items
Expand Down
8 changes: 8 additions & 0 deletions tests/test_pcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ def test_pcap_dns():
.debug(True)
.read_pcap()
.pcap_dns_queries()
.set()
.o
)
== 3
Expand All @@ -23,3 +24,10 @@ def test_pcap_payload():
layer="ICMP"
).o == [b"secret", b"message"]


def test_packet_to_dict():
assert (
Chepy("tests/files/test.pcapng").read_pcap().pcap_to_dict().o[0]["IP"]["src"]
== "10.10.10.11"
)

4 changes: 4 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def test_split_by_n():
assert Chepy("some string").split_by_n(2).o[2] == " s"


def test_get_n():
assert Chepy(["a", 1, "lol", "", True]).get_every_n(3).o == ["a", ""]


def test_unique():
assert len(Chepy('["a", "a", 1]').str_list_to_list().unique().o) == 2

Expand Down

0 comments on commit b39a083

Please sign in to comment.