In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract, col, count, desc
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName("ApacheLogAnalysis").getOrCreate()

In [3]:
log_file = "./access_log.txt"

logs_df = spark.read.text(log_file)

In [4]:
log_pattern = r'(\S+) - - \[(.*?)\] "(.*?)" (\S{3}) (\d+) "(.*?)"'

parsed_logs_df = logs_df.select(
    regexp_extract('value', log_pattern, 1).alias("ip_address"),
    regexp_extract('value', log_pattern, 2).alias("timestamp"),
    regexp_extract('value', log_pattern, 3).alias("request"),
    regexp_extract('value', log_pattern, 4).alias("status"),
    regexp_extract('value', log_pattern, 5).cast("integer").alias("bytes"),
    regexp_extract('value', log_pattern, 6).alias("user_agent")
)

In [5]:
cleaned_df = parsed_logs_df.filter(col("status").isNotNull() & (col("status") != ""))
cleaned_df = cleaned_df.withColumn("status", col("status").cast(IntegerType()))

In [6]:
parsed_logs_df = cleaned_df.withColumn("method", regexp_extract("request", r'(\S+)', 1)) \
                               .withColumn("endpoint", regexp_extract("request", r' (\S+) ', 1))

parsed_logs_df.show(10, truncate=False)

+--------------+--------------------------+----------------------------------+------+-----+----------------------------------+------+---------------------+
|ip_address    |timestamp                 |request                           |status|bytes|user_agent                        |method|endpoint             |
+--------------+--------------------------+----------------------------------+------+-----+----------------------------------+------+---------------------+
|66.249.75.159 |29/Nov/2015:03:50:05 +0000|GET /robots.txt HTTP/1.1          |200   |55   |-                                 |GET   |/robots.txt          |
|66.249.75.168 |29/Nov/2015:03:50:06 +0000|GET /blog/ HTTP/1.1               |200   |8083 |-                                 |GET   |/blog/               |
|185.71.216.232|29/Nov/2015:03:53:15 +0000|POST /wp-login.php HTTP/1.1       |200   |1691 |http://nohatenews.com/wp-login.php|POST  |/wp-login.php        |
|54.165.199.171|29/Nov/2015:04:32:27 +0000|GET /sitemap_index.xm

In [7]:
ip_count_df = parsed_logs_df.groupBy("ip_address").agg(count("*").alias("request_count")).orderBy(desc("request_count"))
ip_count_df.show(10)

+--------------+-------------+
|    ip_address|request_count|
+--------------+-------------+
| 46.166.139.20|        68487|
|54.165.199.171|         4381|
|195.154.250.88|         1723|
| 97.100.169.53|          240|
| 62.210.88.201|           81|
|  66.249.66.59|           69|
|  66.249.66.62|           54|
|   66.249.66.3|           47|
|   52.91.1.103|           39|
| 172.56.26.235|           24|
+--------------+-------------+
only showing top 10 rows



In [8]:
endpoint_count_df = parsed_logs_df.groupBy("endpoint").agg(count("*").alias("endpoint_count")).orderBy(desc("endpoint_count"))
endpoint_count_df.show(10)

+--------------------+--------------+
|            endpoint|endpoint_count|
+--------------------+--------------+
|         /xmlrpc.php|         68494|
|       /wp-login.php|          1923|
|                   /|           336|
|              /blog/|           138|
|         /robots.txt|           123|
|   /post-sitemap.xml|           118|
|  /sitemap_index.xml|           118|
|   /page-sitemap.xml|           117|
|/category-sitemap...|           117|
| /orlando-headlines/|            95|
+--------------------+--------------+
only showing top 10 rows



In [9]:
status_count_df = parsed_logs_df.groupBy("status").agg(count("*").alias("status_count")).orderBy(desc("status_count"))
status_count_df.show(10)

+------+------------+
|status|status_count|
+------+------------+
|   200|       64965|
|   500|       10714|
|   301|         159|
|   404|          24|
|   400|           2|
|   302|           2|
|   405|           1|
+------+------------+



In [10]:
spark.stop()