# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.

#### Import libraries and initialize Spark session

In [None]:
# AWS Glue interactive-session magics: idle timeout, Glue version, worker type
# and number of workers for the session this notebook runs in.
# NOTE(review): these presumably have to run before the first code statement
# starts the session — confirm against the Glue interactive sessions docs.
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, when, hour, date_format, to_timestamp
  
# Reuse the active SparkContext if one exists, otherwise create it.
sc = SparkContext.getOrCreate()
# GlueContext wraps the SparkContext; later cells use it for the catalog read
# and the S3 writes.
glueContext = GlueContext(sc)
# SparkSession handle exposed by the GlueContext.
spark = glueContext.spark_session
# Glue Job object bound to this context; not referenced again in the visible cells.
job = Job(glueContext)

#### Read the data from the Glue Catalog


In [None]:
# Location of the raw Apache access logs in the Glue Data Catalog.
# Replace table_name with the table that exists in your own catalog.
database_name = "security_logs_db"
table_name = "raw_apache_logs_txt"

# Load the catalog table as a Glue DynamicFrame.
catalog_source = {"database": database_name, "table_name": table_name}
datasource = glueContext.create_dynamic_frame.from_catalog(**catalog_source)

# Work with a Spark DataFrame from here on; the later cells build on `df`.
df = datasource.toDF()

# Inspect the column names/types, then peek at five untruncated rows.
df.printSchema()
df.show(5, truncate=False)

![output1](images/root.png)

#### Clean and transform the data


In [None]:
# Project the raw catalog columns onto analysis-friendly names and types.
# The source column names (clientip, verb, request, ...) are the ones printed
# by df.printSchema() in the previous cell.
processed_df = df.select(
    col("clientip").alias("ip"),
    col("timestamp"),
    col("verb").alias("method"),
    col("request").alias("endpoint"),
    col("httpversion").alias("http_version"),
    col("response").cast("integer").alias("status_code"),
    col("bytes").cast("integer"),
    col("referrer"),
    col("agent").alias("user_agent")
).withColumn(
    # Parse the log timestamp string into a real timestamp column.
    "timestamp_parsed",
    to_timestamp(col("timestamp"), "dd/MMM/yyyy:HH:mm:ss Z")
)

# Substrings that mark a request as a potential attack. They are combined into
# one case-insensitive regex so that e.g. "SELECT", "Union" or "<SCRIPT>" are
# flagged as well; the previous case-sensitive LIKE only matched lowercase.
ATTACK_PATTERNS = [
    r"\.\./\.\./\.\./",  # Path traversal attempts
    "exec",              # Command execution attempts
    "select",            # SQL injection attempts
    "union",             # SQL injection attempts
    "script",            # XSS attempts
]
attack_regex = "(?i)(?:" + "|".join(ATTACK_PATTERNS) + ")"

# Add security-relevant columns.
# Every is_* flag is 1 or 0; a NULL status_code or endpoint falls through to
# otherwise(0), so the flags are never NULL.
security_df = processed_df \
    .withColumn("hour_of_day", hour(col("timestamp_parsed"))) \
    .withColumn("date", date_format(col("timestamp_parsed"), "yyyy-MM-dd")) \
    .withColumn("is_error", when(col("status_code") >= 400, 1).otherwise(0)) \
    .withColumn("is_client_error", when((col("status_code") >= 400) & (col("status_code") < 500), 1).otherwise(0)) \
    .withColumn("is_server_error", when(col("status_code") >= 500, 1).otherwise(0)) \
    .withColumn("is_potential_attack", when(col("endpoint").rlike(attack_regex), 1).otherwise(0))

# Display sample of the enriched data
security_df.show(5, truncate=False)

![output2](images/image.png)

#### Get some summary statistics on the security data


In [None]:
# Compute every headline count in one Spark job. security_df is not cached, so
# separate count() actions would each recompute the whole pipeline.
# The is_* flags are always 0 or 1, so sum(flag) equals the number of rows
# where flag == 1.
summary_row = security_df.selectExpr(
    "count(*) AS total_logs",
    "sum(is_error) AS error_count",
    "sum(is_client_error) AS client_error_count",
    "sum(is_server_error) AS server_error_count",
    "sum(is_potential_attack) AS attack_count",
).first()

total_logs = summary_row["total_logs"]
# sum() yields NULL (None) over zero rows, so fall back to 0.
error_count = summary_row["error_count"] or 0
client_error_count = summary_row["client_error_count"] or 0
server_error_count = summary_row["server_error_count"] or 0
attack_count = summary_row["attack_count"] or 0


def _pct(part):
    """Return `part` as a percentage of total_logs, or 0.0 when there are no rows."""
    return part / total_logs * 100 if total_logs else 0.0


print(f"Total log entries: {total_logs}")
print(f"Error responses: {error_count} ({_pct(error_count):.2f}%)")
print(f"Client errors: {client_error_count} ({_pct(client_error_count):.2f}%)")
print(f"Server errors: {server_error_count} ({_pct(server_error_count):.2f}%)")
print(f"Potential attacks: {attack_count} ({_pct(attack_count):.2f}%)")

# Get top 5 IP addresses with most requests
print("\nTop 5 IP addresses by request count:")
security_df.groupBy("ip").count().orderBy(col("count").desc()).show(5)

# Get top 5 requested endpoints
print("\nTop 5 requested endpoints:")
security_df.groupBy("endpoint").count().orderBy(col("count").desc()).show(5)

# Count requests by hour of day
print("\nRequests by hour of day:")
security_df.groupBy("hour_of_day").count().orderBy("hour_of_day").show(24)

![output3](images/summary.png)

#### Select final columns and save the data


# Select the final columns for our security analysis
final_df = security_df.select(
    "ip", "timestamp", "timestamp_parsed", "date", "hour_of_day",
    "method", "endpoint", "http_version", "status_code", "bytes",
    "is_error", "is_client_error", "is_server_error", "is_potential_attack",
    "referrer", "user_agent"
)

# Base S3 prefix for the transformed data (ends with "/")
output_path = "s3://security-log-analysis-bucket/security-log-analysis-bucket-database/security-log-analysis-transformed/"

# Convert back to DynamicFrame
output_dyf = DynamicFrame.fromDF(final_df, glueContext, "output_dyf")

# Write the transformed data to S3 once per format. Each format goes to its own
# sub-prefix (<output_path>parquet/ and <output_path>json/): putting Parquet and
# JSON files in the same prefix leaves a directory that cannot be read back as
# either format.
for output_format in ("parquet", "json"):
    glueContext.write_dynamic_frame.from_options(
        frame=output_dyf,
        connection_type="s3",
        connection_options={"path": f"{output_path}{output_format}/"},
        format=output_format
    )

print("Transformation complete. Data written to:", output_path)

Output: Transformation complete. Data written to: s3://security-log-analysis-bucket/security-log-analysis-bucket-database/security-log-analysis-transformed/