In [None]:
#!/usr/bin/env python3

"""
Reference collector script for NetFlow v1, v5, and v9 Python package.
This file belongs to https://github.com/bitkeks/python-netflow-v9-softflowd.
Copyright 2016-2020 Dominik Pataky <software+pynetflow@dpataky.eu>
Licensed under MIT License. See LICENSE.
"""
import argparse
import gzip
import json
import logging
import queue
import socket
import socketserver
import threading
import time
import asyncio
import nest_asyncio
nest_asyncio.apply()
from collections import namedtuple


from package.ipfix import IPFIXTemplateNotRecognized
#from package.utils import *
from package.utils import UnknownExportVersion, parse_packet, flow_filter_v4, flow_filter_v6
from package.v9 import V9TemplateNotRecognized
from package.mysql_os import MysqlOperation
from package.influxdb_os import InsertRecords


RawPacket = namedtuple('RawPacket', ['ts', 'client', 'data'])
ParsedPacket = namedtuple('ParsedPacket', ['ts', 'client', 'export'])

# Amount of time to wait before dropping an undecodable ExportPacket
PACKET_TIMEOUT = 60 * 60

logger = logging.getLogger("netflow-collector")
ch = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)


class QueuingRequestHandler(socketserver.BaseRequestHandler):
    def handle(self):
        data = self.request[0]  # get content, [1] would be the socket
        self.server.queue.put(RawPacket(time.time(), self.client_address, data))
        logger.debug(
            "Received %d bytes of data from %s", len(data), self.client_address
        )


class QueuingUDPListener(socketserver.ThreadingUDPServer):
    """A threaded UDP server that adds a (time, data) tuple to a queue for
    every request it sees
    """

    def __init__(self, interface, queue):
        self.queue = queue

        # If IPv6 interface addresses are used, override the default AF_INET family
        if ":" in interface[0]:
            self.address_family = socket.AF_INET6

        super().__init__(interface, QueuingRequestHandler)


class ThreadedNetFlowListener(threading.Thread):
    """A thread that listens for incoming NetFlow packets, processes them, and
    makes them available to consumers.
    - When initialized, will start listening for NetFlow packets on the provided
      host and port and queuing them for processing.
    - When started, will start processing and parsing queued packets.
    - When stopped, will shut down the listener and stop processing.
    - When joined, will wait for the listener to exit
    For example, a simple script that outputs data until killed with CTRL+C:
    >>> listener = ThreadedNetFlowListener('0.0.0.0', 2055)
    >>> print("Listening for NetFlow packets")
    >>> listener.start() # start processing packets
    >>> try:
    ...     while True:
    ...         ts, export = listener.get()
    ...         print("Time: {}".format(ts))
    ...         for f in export.flows:
    ...             print(" - {IPV4_SRC_ADDR} sent data to {IPV4_DST_ADDR}"
    ...                   "".format(**f))
    ... finally:
    ...     print("Stopping...")
    ...     listener.stop()
    ...     listener.join()
    ...     print("Stopped!")
    """

    def __init__(self, host: str, port: int):
        logger.info("Starting the NetFlow listener on {}:{}".format(host, port))
        self.output = queue.Queue()
        self.input = queue.Queue()
        self.server = QueuingUDPListener((host, port), self.input)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.start()
        self._shutdown = threading.Event()
        super().__init__()

    def get(self, block=True, timeout=None) -> ParsedPacket:
        """Get a processed flow.
        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a flow is available. If 'timeout' is
        a non-negative number, it blocks at most 'timeout' seconds and raises
        the queue.Empty exception if no flow was available within that time.
        Otherwise ('block' is false), return a flow if one is immediately
        available, else raise the queue.Empty exception ('timeout' is ignored
        in that case).
        """
        return self.output.get(block, timeout)

    def run(self):
        # Process packets from the queue
        try:
            templates = {"netflow": {}, "ipfix": {}}
            to_retry = []
            while not self._shutdown.is_set():
                try:
                    # 0.5s delay to limit CPU usage while waiting for new packets
                    pkt = self.input.get(block=True, timeout=0.5)  # type: RawPacket
                except queue.Empty:
                    continue

                try:
                    # templates is passed as reference, updated in V9ExportPacket
                    export = parse_packet(pkt.data, templates)
                except UnknownExportVersion as e:
                    logger.error("%s, ignoring the packet", e)
                    continue
                except (V9TemplateNotRecognized, IPFIXTemplateNotRecognized):
                    # TODO: differentiate between v9 and IPFIX, use separate to_retry lists
                    if time.time() - pkt.ts > PACKET_TIMEOUT:
                        logger.warning("Dropping an old and undecodable v9/IPFIX ExportPacket")
                    else:
                        to_retry.append(pkt)
                        logger.debug("Failed to decode a v9/IPFIX ExportPacket - will "
                                     "re-attempt when a new template is discovered")
                    continue

                if export.header.version == 10:
                    logger.debug("Processed an IPFIX ExportPacket with length %d.", export.header.length)
                else:
                    logger.debug("Processed a v%d ExportPacket with %d flows.",
                                 export.header.version, export.header.count)

                # If any new templates were discovered, dump the unprocessable
                # data back into the queue and try to decode them again
                if export.header.version in [9, 10] and export.contains_new_templates and to_retry:
                    logger.debug("Received new template(s)")
                    logger.debug("Will re-attempt to decode %d old v9/IPFIX ExportPackets", len(to_retry))
                    for p in to_retry:
                        self.input.put(p)
                    to_retry.clear()

                self.output.put(ParsedPacket(pkt.ts, pkt.client, export))
        finally:
            # Only reached when while loop ends
            self.server.shutdown()
            self.server.server_close()

    def stop(self):
        logger.info("Shutting down the NetFlow listener")
        self._shutdown.set()

    def join(self, timeout=None):
        self.thread.join(timeout=timeout)
        super().join(timeout=timeout)


def get_export_packets(host: str, port: int) -> ParsedPacket:
    """A threaded generator that will yield ExportPacket objects until it is killed
    """
    listener = ThreadedNetFlowListener(host, port)
    listener.start()
    try:
        while True:
            yield listener.get()
    finally:
        listener.stop()
        listener.join()
        
async def main(*flows):
    myTool = MysqlOperation()
    await asyncio.gather(
        
        myTool.insertRecords(*flows)
    )

if __name__ == "netflow.collector":
    logger.error("The collector is currently meant to be used as a CLI tool only.")
    logger.error("Use 'python3 -m netflow.collector -h' in your console for additional help.")



try:
        # With every parsed flow a new line is appended to the output file. In previous versions, this was implemented
        # by storing the whole data dict in memory and dumping it regularly onto disk. This was extremely fragile, as
        # it a) consumed a lot of memory and CPU (dropping packets since storing one flow took longer than the arrival
        # of the next flow) and b) broke the exported JSON file, if the collector crashed during the write process,
        # rendering all collected flows during the runtime of the collector useless (the file contained one large JSON
        # dict which represented the 'data' dict).

        # In this new approach, each received flow is parsed as usual, but it gets appended to a gzipped file each time.
        # All in all, this improves in three aspects:InnodbOperation
        # 1. collected flow data is not stored in memory any more
        # 2. received and parsed flows are persisted reliably
        # 3. the disk usage of files with JSON and its full strings as keys is reduced by using gzipped files
        # This also means that the files have to be handled differently, because they are gzipped and not formatted as
        # one single big JSON dump, but rather many little JSON dumps, separated by line breaks.
        flows = []
        for ts, client, export in get_export_packets("0.0.0.0", 9996):
            try: 
                export.flows[0].IP_PROTOCOL_VERSION
            except AttributeError:
                flow = flow_filter_v4(client, export)
            else:
                if(export.flows[0].IP_PROTOCOL_VERSION == 4):
                    flow = flow_filter_v4(client, export)
                else:
                    flow = flow_filter_v6(client, export)
            print(flow)
            flows.append(flow)
            if(len(flows) == 10000): 
                myTool = MysqlOperation()
                #innoTool = InnodbOperation()
                asyncio.run(asyncio.gather(myTool.insertRecords(*flows), InsertRecords(*flows)))
                flows = []


            
                
# (srcaddr, srcport, first, last, protocol, flows, packets, bytes)
# TODO change the format of arcaddr to varbinary
                
                
            #entry = {ts: {
            #    "client": client,
            #    "header": export.header.to_dict(),
            #    "flows": [flow.data for flow in export.flows]}
            #}
        line = json.dumps(entry).encode() + b"\n"  # byte encoded line
        with gzip.open("{}.gz".format(int(time.time())), "ab") as fh:  # open as append, not reading the whole file
                fh.write(line)
except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt, passing through")
        pass

In [4]:
!pip install aioinflux
#!pip install aiomysql

Collecting aioinflux
  Using cached aioinflux-0.9.0-py3-none-any.whl (16 kB)
Collecting ciso8601
  Using cached ciso8601-2.1.3.tar.gz (15 kB)
Collecting aiohttp>=3.0
  Downloading aiohttp-3.6.2-cp36-cp36m-manylinux1_x86_64.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 3.0 MB/s eta 0:00:01
Collecting typing-extensions>=3.6.5; python_version < "3.7"
  Downloading typing_extensions-3.7.4.2-py3-none-any.whl (22 kB)
Collecting async-timeout<4.0,>=3.0
  Using cached async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Collecting multidict<5.0,>=4.5
  Downloading multidict-4.7.6-cp36-cp36m-manylinux1_x86_64.whl (148 kB)
[K     |████████████████████████████████| 148 kB 7.8 MB/s eta 0:00:01
[?25hCollecting yarl<2.0,>=1.0
  Downloading yarl-1.4.2-cp36-cp36m-manylinux1_x86_64.whl (252 kB)
[K     |████████████████████████████████| 252 kB 8.1 MB/s eta 0:00:01
Collecting idna-ssl>=1.0; python_version < "3.7"
  Downloading idna-ssl-1.1.0.tar.gz (3.4 kB)
Building wheels for collected pack

In [3]:
!python get-pip.py pip==19.3.1

python: can't open file 'get-pip.py': [Errno 2] No such file or directory


In [None]:
            print(export)
            for flow in export.flows:
                try:
                    flows.IPV4_SRC_ADDR
                except NameError:
                    srcaddr = 0
                
                 # convert string ip to int ip for save storage space   
                if isinstance(flow.IPV4_SRC_ADDR, str):
                    srcaddr = str_ip_to_int(flow.IPV4_SRC_ADDR)
                else:
                    srcaddr = flow.IPV4_SRC_ADDR
                
                srcport = flow.L4_SRC_PORT
                print(srcaddr)
                print(flow)
                print(type(flow))
            

In [6]:

import pandas as pd

l1 = [(2130706433, 7788, 4214989963, 4215020796, 6, 10, 208, 164207),(2130706433, 7788, 12108, 57372, 1, 3, 21, 1849)]
# new_record = pd.DataFrame(data=l1)
new_record = pd.DataFrame(data=l1, columns=['srcaddr','srcport','First','last', 'protocol', 'flows', 'packets', 'bytes'])
new_record = pd.DataFrame(data=new_record, columns=['srcaddr','srcport','First'])

new_record

Unnamed: 0,srcaddr,srcport,First
0,2130706433,7788,4214989963
1,2130706433,7788,12108
