In [None]:
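# Uncomment to install the dependencies into the running kernel's environment
# (%pip, unlike !pip, always targets the interpreter the notebook is using).
# %pip install scapy
# %pip install pandas
# %pip install matplotlib
# %pip install networkx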

In [None]:
from scapy.all import * # Packet manipulation (star import kept first so the explicit imports below win any name clash)
import pandas as pd # Pandas - Create and Manipulate DataFrames
from datetime import datetime, timezone # Datetime - Convert Epoch to (UTC) Datetime
import socket # Socket - Convert protocol numbers to protocol names
import networkx as nx # NetworkX - Create and Manipulate Graphs
import ipaddress # IPAddress - Check for multicast and broadcast addresses

In [None]:
class PCAPToDataFrame:
    """Flatten the IPv4 packets of a capture file into a pandas DataFrame.

    Each packet that carries an IP layer becomes one row (a dict appended to
    ``data_list``); packets without an IP layer are skipped.
    """

    # Column order of the resulting frame. Passing it to the DataFrame
    # constructor guarantees the columns exist even when no row was collected
    # or when no packet carried TCP/UDP ports.
    COLUMNS = (
        "time", "src_ip", "src_mac", "dst_ip", "dst_mac", "protocol",
        "payload_size", "multicast", "private_to_private", "dst_broadcast",
        "src_port", "dst_port",
    )

    # IPv4 limited-broadcast address and Ethernet broadcast MAC.
    _LIMITED_BROADCAST = ipaddress.IPv4Address("255.255.255.255")
    _ETHER_BROADCAST = "ff:ff:ff:ff:ff:ff"

    # Lazily built {protocol number: name} table shared by all instances.
    _PROTO_NAMES = None

    def __init__(self, capture_file):
        """Remember the capture to read; nothing is opened until it is read.

        Args:
            capture_file: value handed to ``sniff(offline=...)``.
        """
        self.capture_file = capture_file
        self.data_list = []

    def proto_name_by_num(self, proto_num):
        """Translate an IP protocol number into its ``socket.IPPROTO_*`` name.

        Args:
            proto_num: protocol number, e.g. 6.

        Returns:
            The constant's name without the ``IPPROTO_`` prefix (e.g. "TCP"),
            or "Unknown" when the socket module defines no such constant.
        """
        table = PCAPToDataFrame._PROTO_NAMES
        if table is None:
            # Build the table once instead of scanning the socket module for
            # every packet. setdefault keeps the first name found for a
            # number, matching a first-match linear scan.
            table = {}
            for name, num in vars(socket).items():
                if name.startswith("IPPROTO_"):
                    table.setdefault(num, name[8:])
            PCAPToDataFrame._PROTO_NAMES = table
        return table.get(proto_num, "Unknown")

    def data_extract(self, packet):
        """Append one row describing ``packet`` to ``data_list``.

        Packets without an IP layer (ARP, IPv6, ...) are ignored instead of
        raising. MAC columns are None when the packet has no Ether layer.
        ``src_port``/``dst_port`` are only set when a TCP or UDP layer exists.

        Args:
            packet: a scapy packet as delivered by ``sniff``.
        """
        if not packet.haslayer(IP):
            return

        ip_layer = packet[IP]
        if packet.haslayer(Ether):
            src_mac = packet[Ether].src
            dst_mac = packet[Ether].dst
        else:
            src_mac = dst_mac = None

        # Timezone-aware replacement for the deprecated utcfromtimestamp();
        # the formatted string is the same UTC wall-clock time.
        stamp = datetime.fromtimestamp(int(packet.time), tz=timezone.utc)

        src_ip = ipaddress.ip_address(ip_layer.src)
        dst_ip = ipaddress.ip_address(ip_layer.dst)

        # A destination counts as broadcast when it is the limited-broadcast
        # address or the frame went to the Ethernet broadcast MAC (which also
        # catches directed broadcasts). A directed broadcast cannot be
        # recognised from the IP address alone because the netmask is unknown.
        to_broadcast_mac = (
            dst_mac is not None and str(dst_mac).lower() == self._ETHER_BROADCAST
        )

        data = {
            "time": stamp.strftime('%Y-%m-%d %H:%M:%S'),
            "src_ip": ip_layer.src,
            "src_mac": src_mac,
            "dst_ip": ip_layer.dst,
            "dst_mac": dst_mac,
            "protocol": self.proto_name_by_num(int(ip_layer.proto)),
            "payload_size": len(ip_layer.payload),
            "multicast": src_ip.is_multicast or dst_ip.is_multicast,
            "private_to_private": src_ip.is_private and dst_ip.is_private,
            "dst_broadcast": dst_ip == self._LIMITED_BROADCAST or to_broadcast_mac,
        }

        # TCP first, then UDP: if both layers are present the UDP ports win,
        # as in the original sequence of checks.
        for transport in (TCP, UDP):
            if packet.haslayer(transport):
                data["src_port"] = int(packet[transport].sport)
                data["dst_port"] = int(packet[transport].dport)

        self.data_list.append(data)

    def read_pcap_to_dataframe(self):
        """Read ``capture_file`` and return its IP packets as a DataFrame.

        Returns:
            A DataFrame with the columns listed in ``COLUMNS``, one row per
            packet that has an IP layer. Ports are missing (NaN) for rows
            without a TCP/UDP layer.
        """
        # Start from an empty list so calling this twice does not duplicate rows.
        self.data_list = []

        # store=0: packets are handed to the callback and not kept by sniff.
        sniff(offline=self.capture_file, prn=self.data_extract, store=0)

        return pd.DataFrame(self.data_list, columns=list(self.COLUMNS))

In [None]:
# Capture to analyse; change this single value to point at another PCAP file
PCAP_PATH = "test.pcap"

# Create an instance of the PCAPToDataFrame class
pcap_to_df = PCAPToDataFrame(PCAP_PATH)

# Read the PCAP file and create a DataFrame
df = pcap_to_df.read_pcap_to_dataframe()

In [None]:
# Preview the first three extracted rows
df.head(n=3)

In [None]:
# Build an undirected graph with one node per IP address and one edge per
# (source, destination) pair found in the DataFrame.
df1 = df[['src_ip', 'src_mac', 'dst_ip', 'dst_mac']]
G = nx.from_pandas_edgelist(df1, 'src_ip', 'dst_ip', create_using=nx.Graph())

# draw_planar raises on a graph that has no planar embedding, so only use it
# when one exists; otherwise fall back to a seeded (reproducible) spring layout.
if nx.check_planarity(G)[0]:
    nx.draw_planar(G, with_labels=True)
else:
    nx.draw(G, pos=nx.spring_layout(G, seed=0), with_labels=True)


In [None]:
# Keep only the rows whose source and destination IP addresses are both private
filtered_df = df.loc[df["private_to_private"].eq(True)]