# This script serves as an ETL pipeline for cybersecurity_attacks.csv

## Loading the required libraries

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.functions import col, mean, dayofweek, hour, to_timestamp
import logging

## Initializing logging and the Spark session

In [8]:
# Configure the root logger with a level of INFO.
logging.basicConfig(level=logging.INFO)


In [9]:
# Obtain the Spark session for this pipeline (an already-active session is
# reused; otherwise a new one is started under the given application name).
spark = (
    SparkSession.builder
    .appName("CyberSecurityETL_CSV_Parquet")
    .getOrCreate()
)

## Loading the CSV into a Spark DataFrame using PySpark

In [11]:
# Explicit schema: one (column name, Spark type) pair per column, in file
# order. Every field is declared nullable.
schema = StructType([
    StructField(column_name, spark_type(), True)
    for column_name, spark_type in (
        ("Timestamp", StringType),
        ("Source_IP_Address", StringType),
        ("Destination_IP_Address", StringType),
        ("Source_Port", IntegerType),
        ("Destination_Port", IntegerType),
        ("Protocol", StringType),
        ("Packet_Length", IntegerType),
        ("Packet_Type", StringType),
        ("Traffic_Type", StringType),
        ("Payload_Data", StringType),
        ("Malware_Indicators", StringType),
        ("Anomaly_Score", FloatType),
        ("Alerts/Warnings", StringType),
        ("Attack_Type", StringType),
        ("Attack_Signature", StringType),
        ("Action_Taken", StringType),
        ("Severity_Level", StringType),
        ("User_Information", StringType),
        ("Device_Information", StringType),
        ("Network_Segment", StringType),
        ("Geo-location_Data", StringType),
        ("Proxy_Information", StringType),
        ("Firewall_Logs", StringType),
        ("IDS/IPS_Alerts", StringType),
        ("Log_Source", StringType),
    )
])

## Creating the Spark DataFrame


In [18]:
# Read the attack-log CSV: the first row supplies the column names, fields are
# comma-separated, and Spark infers each column's type from the data.
# NOTE(review): the explicit `schema` built earlier in this file is not passed
# to the reader here — confirm whether type inference is intended.
df = spark.read.options(header=True, inferSchema=True, sep=",").csv("cybersecurity_attacks.csv")

In [19]:
# Print the first 10 rows of the DataFrame as a quick sanity check.
df.show(n=10)

+--------------------+-----------------+----------------------+---------------+----------------+---------------+-------------+-----------+-------------+--------------------+------------------+------------------+---------------+-----------+----------------+------------+--------------+-------------------+--------------------+---------------+--------------------+-----------------+-------------+--------------+----------+
+--------------------+-----------------+----------------------+---------------+----------------+---------------+-------------+-----------+-------------+--------------------+------------------+------------------+---------------+-----------+----------------+------------+--------------+-------------------+--------------------+---------------+--------------------+-----------------+-------------+--------------+----------+
| 2023-05-30 06:33:58|    103.216.15.12|          84.9.164.252|          31225|           17616|           ICMP|          503|       Data|         HTTP|Qui na