# Apache Spark pipeline to analyze web server logs

## Extract

In [13]:
import os

from pyspark.sql import *
import pyspark.sql.types as t
import pyspark.sql.functions as f

In [14]:
# Local Spark session with 5 worker threads (local[5]).
# spark.driver.maxResultSize caps the total serialized size of results an action
# (e.g. collect) may send back to the driver. A bare number is interpreted as
# bytes, so an explicit unit is required.
# NOTE: getOrCreate() reuses an already-running session, in which case this
# driver setting will not be applied.
spark = SparkSession.builder \
    .appName("logsApp") \
    .config("spark.driver.maxResultSize", "4g") \
    .master("local[5]") \
    .getOrCreate()

In [15]:
# Raw access log, read as one row per line in a single "value" column.
# Set the ACCESS_LOG_PATH environment variable to point at the log on machines
# where the default (author-specific, absolute Windows) path does not exist.
ACCESS_LOG_PATH = os.environ.get(
    "ACCESS_LOG_PATH",
    r"D:\project\web-server-logs-analysis\data\calgary_access_log\access_log",
)
file_df = spark.read.text(ACCESS_LOG_PATH)

## Transform

### Extract columns from the raw text lines with a regex

In [16]:
log_reg = r'^(local|remote)( - - )\[([\w\s:/-]+)\] \"(\w{3,8}) (\w.+) ?([\w/.\d]+)?\" (\d{3}) ([\d|-]+)$'

logs_df = file_df.select(f.regexp_extract('value', log_reg, 1).alias('host'),
                         f.regexp_extract('value', log_reg, 3).alias('date'),
                         f.regexp_extract('value', log_reg, 4).alias('http_method'),
                         f.regexp_extract('value', log_reg, 5).alias('filename'),
                         f.regexp_extract('value', log_reg, 7).alias('status_code'),
                         f.regexp_extract('value', log_reg, 8).alias('reply_size'))

### Set schema

In [17]:
# Timestamp layout of the "date" column: day/abbreviated-month/year:time plus zone offset.
date_pattern = "dd/MMM/yyyy:HH:mm:ss Z"

# Target Spark SQL type for each column that is cast directly. "date" is handled
# separately because it has to be parsed with a format pattern.
column_types = {
    "host": "string",
    "http_method": "string",
    "filename": "string",
    "status_code": "int",
    "reply_size": "int",
}

# withColumn on an existing name replaces that column in place, so column order is kept.
typed_logs_df = logs_df.withColumn("date", f.to_timestamp(f.col("date"), date_pattern))
for column_name, spark_type in column_types.items():
    typed_logs_df = typed_logs_df.withColumn(column_name, f.col(column_name).cast(spark_type))

# Lines the regex did not match have "" as status_code, which casts to null
# (non-ANSI cast semantics); drop those rows.
logs_df = typed_logs_df.dropna(subset=["status_code"])

### Add new columns

In [18]:
@f.udf(returnType=t.StringType())
def status_category_udf(status: int):
    """Map an HTTP status code to its class label.

    Codes 200-599 become "2xx", "3xx", "4xx" or "5xx". Null, 0 and every other
    value become "-".
    """
    class_labels = {2: "2xx", 3: "3xx", 4: "4xx", 5: "5xx"}
    # Guard before dividing: None // 100 would raise.
    if not status:
        return "-"
    return class_labels.get(status // 100, "-")

@f.udf(returnType=t.StringType())
def size_category_udf(size: int):
    """Bucket a reply size into "small", "medium" or "large".

    Buckets (presumably bytes; the log's size field):
      0 .. 100_000        -> "small"
      100_001 .. 200_000  -> "medium"
      above 200_000       -> "large"
    A null size (e.g. a "-" size field that failed the int cast) or a negative
    value returns "-".
    """
    # Compare to None explicitly: a 0-byte reply is a real size, not a missing one.
    if size is None or size < 0:
        return "-"
    # Contiguous upper bounds, so no value falls between two buckets.
    if size <= 100_000:
        return "small"
    if size <= 200_000:
        return "medium"
    return "large"

In [19]:
# There can be multiple files in one request, so take the first ".ext" found in the path.
# `filename` may still carry a trailing protocol token (e.g. "index.html HTTP/1.0").
# Strip it first, otherwise an extension-less path would pick up "0" from "HTTP/1.0".
request_path = f.regexp_replace(f.col("filename"), r"\s+HTTP/[\d.]+$", "")

# Group 1 captures the extension without the leading "."; no match yields "".
logs_df = logs_df \
    .withColumn("file_extension", f.lower(f.regexp_extract(request_path, r"\.([\w-]+)", 1)))

In [20]:
# Derive calendar components from the parsed timestamp.
# Spark's dayofweek numbers days 1 (Sunday) through 7 (Saturday).
date_extractors = {
    "year": f.year,
    "month": f.month,
    "day_of_week": f.dayofweek,
}
for part_name, extractor in date_extractors.items():
    logs_df = logs_df.withColumn(part_name, extractor("date"))

In [21]:
# Categorical labels: the HTTP status class comes from status_code, and the
# size bucket comes from reply_size (not from the status code).
# f.col(...) resolves against the frame being transformed, so it does not depend
# on which object the logs_df variable currently points to.
logs_df = logs_df \
    .withColumn("status_category", status_category_udf(f.col("status_code"))) \
    .withColumn("size_category", size_category_udf(f.col("reply_size")))

In [22]:
# Print the transformed frame's column names and types as a tree.
logs_df.printSchema()

root
 |-- host: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- http_method: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- status_code: integer (nullable = true)
 |-- reply_size: integer (nullable = true)
 |-- file_extension: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- status_category: string (nullable = true)
 |-- size_category: string (nullable = true)



In [23]:
# Preview the first three transformed rows.
logs_df.show(n=3)

+-----+-------------------+-----------+-------------------+-----------+----------+--------------+----+-----+-----------+---------------+-------------+
| host|               date|http_method|           filename|status_code|reply_size|file_extension|year|month|day_of_week|status_category|size_category|
+-----+-------------------+-----------+-------------------+-----------+----------+--------------+----+-----+-----------+---------------+-------------+
|local|1994-10-24 15:41:41|        GET|index.html HTTP/1.0|        200|       150|          html|1994|   10|          2|            2xx|        small|
|local|1994-10-24 15:41:41|        GET|     1.gif HTTP/1.0|        200|      1210|           gif|1994|   10|          2|            2xx|        small|
|local|1994-10-24 15:43:13|        GET|index.html HTTP/1.0|        200|      3185|          html|1994|   10|          2|            2xx|        small|
+-----+-------------------+-----------+-------------------+-----------+----------+------------

## Load

In [24]:
# Persist as Parquet with one sub-directory per year; overwrite mode replaces
# the output of any previous run.
(logs_df.write
    .mode("overwrite")
    .partitionBy("year")
    .parquet("./output/logs_parquet/"))

In [25]:
# Shut down the Spark session now that the Parquet output has been written.
spark.stop()

## Analysis — continue in `log_analysis.ipynb`