In [2]:
import os
import dask.dataframe as dd
import pandas as pd
import numpy as np



In [42]:
# Convert every NetFlow CSV in csv_dir into three gzip-compressed parquet datasets
# under parquet_dir:
#   original/<name>_chunks    all columns, as read from the CSV
#   ip_port/<name>_chunks     selected flow features incl. src/dst ip/port, de-duplicated
#   no_ip_port/<name>_chunks  the same features without src/dst ip/port, de-duplicated
print("Loading .csv netflow files from 'data/csv/netflow/' and saving as chuncked .parquet files\n")

csv_dir = "data/csv/netflow/"
parquet_dir = "data/parquet/netflow/"

# Column selections used for the two reduced datasets.
endpoint_columns = ['IPV4_SRC_ADDR', 'L4_SRC_PORT', 'IPV4_DST_ADDR', 'L4_DST_PORT']
flow_columns = ['PROTOCOL', 'L7_PROTO', 'IN_BYTES', 'OUT_BYTES', 'IN_PKTS', 'OUT_PKTS',
                'TCP_FLAGS', 'FLOW_DURATION_MILLISECONDS', 'Label']

i = 1  # running number of the files actually converted in this run
for file in os.listdir(csv_dir):
    file_name, file_extension = os.path.splitext(file)

    # Only .csv entries are converted.
    if file_extension != ".csv":
        continue

    # The no_ip_port dataset is written last, so an existing directory is
    # treated as "this file has already been converted" and the file is skipped.
    if os.path.exists(f"{parquet_dir}no_ip_port/{file_name}_chunks"):
        continue

    # Load csv file (lazy dask dataframe)
    print(f"#{i}\nLoading: '{file}' ...")
    csv_path = f"{csv_dir}{file}"
    dd_df = dd.read_csv(csv_path)

    # 1) Original NetFlow data, only repartitioned to ~100MB chunks
    orig_parquet = f"original/{file_name}_chunks"
    print(f"Saving original to: '{parquet_dir}{orig_parquet}' ...")
    dd_df = dd_df.repartition(partition_size="100MB")
    dd_df.to_parquet(f"{parquet_dir}{orig_parquet}", compression="gzip")

    # 2) Column selection with src/dst ip/port, duplicates dropped
    parquet_ip_port = f"ip_port/{file_name}_chunks"
    print(f"Saving column selection with src/dst ip/port to: '{parquet_dir}{parquet_ip_port}' ...")
    dd_df = (
        dd_df[endpoint_columns + flow_columns]
        .drop_duplicates()
        .repartition(partition_size="100MB")
    )
    dd_df.to_parquet(f"{parquet_dir}{parquet_ip_port}", compression="gzip")

    # 3) Column selection without src/dst ip/port, taken from the frame of
    #    step 2 and de-duplicated again after the columns were removed
    parquet_no_ip_port = f"no_ip_port/{file_name}_chunks"
    print(f"Saving column selection wo src/dst ip/port to: '{parquet_dir}{parquet_no_ip_port}' ...")
    dd_df = (
        dd_df[flow_columns]
        .drop_duplicates()
        .repartition(partition_size="100MB")
    )
    dd_df.to_parquet(f"{parquet_dir}{parquet_no_ip_port}", compression="gzip")

    i += 1

Loading .csv netflow files from 'data/csv/netflow/' and saving as chuncked .parquet files

#1
Loading: NF-BoT-IoT-v2.csv ...
Saving original to: data/parquet/netflow/original/NF-BoT-IoT-v2_chunks ...
Saving column selection with src/dst ip/port to: data/parquet/netflow/ip_port/NF-BoT-IoT-v2_chunks ...
Saving column selection wo src/dst ip/port to: data/parquet/netflow/no_ip_port/NF-BoT-IoT-v2_chunks ...


In [None]:
# Convert every CICFlow CSV in csv_dir into a gzip-compressed parquet dataset
# under parquet_dir ("original/<name>_chunks"), after dropping rows with
# missing values.
print("Loading .csv cicflow files from 'data/csv/cic/' and saving as chuncked .parquet files\n")

csv_dir = "data/csv/cic/"
parquet_dir = "data/parquet/cic/"

i = 1  # running number of the files actually converted in this run
for file in os.listdir(csv_dir):
    
    file_name, file_extension = os.path.splitext(file)
    orig_parquet = "original/"+file_name+"_chunks"
    
    # Skip files whose output directory already exists, as the netflow cell
    # does. Without this guard every re-run repeats the full conversion and
    # writes into the existing dataset directory, which to_parquet does not
    # clear by default.
    # NOTE: an interrupted run can leave a partial directory behind; delete
    # it manually to force a re-conversion.
    if file_extension == ".csv" and not os.path.exists(parquet_dir+orig_parquet):
         
        # Load csv file (lazy dask dataframe) and drop rows containing NaN
        print("#"+str(i)+"\nLoading: "+file+" ...")
        csv_path = csv_dir+file
        dd_df = dd.read_csv(csv_path)
        dd_df = dd_df.dropna()
                
        # Save to parquet file original cicflow data in ~100MB partitions
        print("Saving original to: "+parquet_dir+orig_parquet+" ...")
        dd_df = dd_df.repartition(partition_size="100MB")
        dd_df.to_parquet(parquet_dir+orig_parquet, compression="gzip")
        i += 1

Loading .csv cicflow files from 'data/csv/cic/' and saving as chuncked .parquet files

#1
Loading: CIC-BoT-IoT.csv ...
Saving original to: data/parquet/cic/original/CIC-BoT-IoT_chunks ...
