<a href="https://colab.research.google.com/github/priyaroy16/Web-Server-Logs-Pyspark/blob/main/Web_Server_Access_Logs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark
!pip install kaggle #Install Kaggle to upload dataset file directly

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=85ca4db4c390b912806863e486b37df7234a3c40fc4007ddd71edd8cc25d0f20
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
#Upload Kaggle API Key file (kaggle.json)
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"priyaroy1601","key":"565cac1700996162f4a687ef7051f782"}'}

In [3]:
#Download the datset directly using the below steps
!mkdir -p ~/.kaggle
!mv kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

!kaggle datasets download -d eliasdabbas/web-server-access-logs
!unzip web-server-access-logs.zip

Dataset URL: https://www.kaggle.com/datasets/eliasdabbas/web-server-access-logs
License(s): CC0-1.0
Downloading web-server-access-logs.zip to /content
 99% 264M/267M [00:03<00:00, 92.5MB/s]
100% 267M/267M [00:03<00:00, 88.8MB/s]
Archive:  web-server-access-logs.zip
  inflating: access.log              
  inflating: client_hostname.csv     


In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType
from pyspark.sql.functions import to_timestamp
import re

In [5]:
#Create Spark Session
spark = SparkSession.builder.appName("Log File Analysis").getOrCreate()

#Define Schema
schema = StructType([
    StructField("ip", dataType=StringType(), nullable=True),
    StructField("unknown_column1", dataType=StringType(), nullable=True),
    StructField("unknown_column2", dataType=StringType(), nullable=True),
    StructField("timestamp", dataType=StringType(), nullable=True),
    StructField("request", dataType=StringType(), nullable=True),
    StructField("status", dataType=StringType(), nullable=True),
    StructField("size", dataType=StringType(), nullable=True),
    StructField("referrer", dataType=StringType(), nullable=True),
    StructField("user_agent", dataType=StringType(), nullable=True),
    StructField("unknown_column3", dataType=StringType(), nullable=True)
])

In [6]:
def parse_log_file(logs):
  # Regex match for each column
  match = re.match(r'(\S+) (\S+) (\S+) \[(.*?)\] "(.*?)" (\S+) (\S+) "(.*?)" "(.*?)" "(.*?)"', logs)
  if match:
    return (match.group(1), match.group(2), match.group(3), match.group(4), match.group(5), match.group(6), match.group(7), match.group(8), match.group(9), match.group(10))
  else:
    return ("-","-","-","-","-","-","-","-","-","-")

logFile = spark.sparkContext.textFile("/content/access.log")
parsedLogs = logFile.map(parse_log_file)

df = spark.createDataFrame(parsedLogs, schema)

In [7]:
# Drop unknown columns
df = df.drop('unknown_column1', 'unknown_column2', 'unknown_column3')
df.show(10)
df.cache() #Caching the dataframe since it's used frequently for better performance

+-------------+--------------------+--------------------+------+-----+--------------------+--------------------+
|           ip|           timestamp|             request|status| size|            referrer|          user_agent|
+-------------+--------------------+--------------------+------+-----+--------------------+--------------------+
| 54.36.149.41|22/Jan/2019:03:56...|GET /filter/27|13...|   200|30577|                   -|Mozilla/5.0 (comp...|
|  31.56.96.51|22/Jan/2019:03:56...|GET /image/60844/...|   200| 5667|https://www.zanbi...|Mozilla/5.0 (Linu...|
|  31.56.96.51|22/Jan/2019:03:56...|GET /image/61474/...|   200| 5379|https://www.zanbi...|Mozilla/5.0 (Linu...|
|40.77.167.129|22/Jan/2019:03:56...|GET /image/14925/...|   200| 1696|                   -|Mozilla/5.0 (comp...|
|  91.99.72.15|22/Jan/2019:03:56...|GET /product/3189...|   200|41483|                   -|Mozilla/5.0 (Wind...|
|40.77.167.129|22/Jan/2019:03:56...|GET /image/23488/...|   200| 2654|                   -|Mozil

DataFrame[ip: string, timestamp: string, request: string, status: string, size: string, referrer: string, user_agent: string]

In [8]:
# Knowing the number of cores in cluster helps in partitioning
num_cores = spark.sparkContext.defaultParallelism
print(num_cores)

# Each core can have 2-4 partitions. We'll make 2*3=6 partitions here
df = df.repartition(6)

2


In [9]:
# Perform data queries

# 1. Count the number of occurrences of each HTTP status code
# 2. Filter logs with codes not 200
# 3. Find the top 10 IP addresses making requests and whether they are malicious or not
# 4. Identify Potential Unauthorized Access Attempts (401)
# 5. Detect Multiple Failed Login Attempts
# 6. Find Most Frequently Accessed Sensitive Endpoints (/admin)
# 7. Identify Large File Downloads

In [10]:
# 1. Count the number of occurrences of each HTTP status code
statusCodes = df.groupby('status').count().sort("count", ascending=False)
statusCodes.show()

+------+-------+
|status|  count|
+------+-------+
|   200|9579825|
|   304| 340228|
|   302| 199835|
|   404| 105011|
|   301|  67553|
|   499|  50852|
|   500|  14266|
|   403|   5634|
|   502|    798|
|   400|    586|
|   401|    323|
|   408|    112|
|   504|    103|
|   414|     17|
|   405|      6|
|   206|      3|
+------+-------+



In [11]:
# 2. Show list of unsuccessful requests (Status code not 200)
unsuccessfulRequests = df.filter(df.status != '200')
unsuccessfulRequests.select('ip', 'request', 'status', 'size', 'referrer').show()

+--------------+--------------------+------+-----+--------------------+
|            ip|             request|status| size|            referrer|
+--------------+--------------------+------+-----+--------------------+
| 66.249.66.194|GET /m/filter?f=p...|   302|    0|                   -|
| 66.249.66.194|GET /m/filter/b12...|   302|    0|                   -|
|178.131.92.246|      GET / HTTP/1.1|   302|    0|                   -|
| 66.249.66.194|GET /product/2170...|   302|    0|                   -|
| 66.249.66.194|GET /product/4406...|   404|32461|                   -|
| 66.249.66.194|GET /m/filter?f=b...|   302|    0|                   -|
|  37.9.113.152|GET /filter/p4510...|   403|  134|                   -|
|  5.160.157.20|GET /filter?page=...|   301|  178|                   -|
|  5.78.195.160|GET /static/image...|   304|    0|https://www.zanbi...|
|37.153.179.172|GET /m/browse/mat...|   499|    0|                   -|
| 66.249.66.194|GET /filter/p6001...|   302|    0|              

In [12]:
# Check if an IP is malicious or not using Virustotal Public API
import requests
import json
import os
def check_ip_malicious(ip):
    apiurl="https://www.virustotal.com/api/v3/ip_addresses/"+ip
    header={
        "Accept":"application/json",
        "x-apikey":"cb1b72839deaff921f30c68577ec11c5110fa890eb79e1e7949543dda5eaf3ec"
        }
    response=requests.get(url=apiurl,headers=header)
    data = response.json()
    malicious = data['data']['attributes']['last_analysis_stats']['malicious']
    if malicious > 0:
        return True
    else:
        return False

In [13]:
# 3. Find the top 10 IP addresses making requests and whether they are malicious or not
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType
topIPs = df.groupby('ip').count().sort("count", ascending=False).limit(10)

# Convert the check_ip_malicious function to a UDF
check_ip_udf = udf(check_ip_malicious, BooleanType())

# Apply the UDF to create a new column 'isMalicious'
topIPs = topIPs.withColumn('isMalicious', check_ip_udf(topIPs['ip']))

topIPs.show()

+---------------+------+-----------+
|             ip| count|isMalicious|
+---------------+------+-----------+
|  66.249.66.194|353483|      false|
|   66.249.66.91|314522|      false|
|151.239.241.163| 92475|      false|
|   66.249.66.92| 88332|      false|
|    91.99.30.32| 45979|      false|
|  104.222.32.91| 42058|      false|
|    91.99.72.15| 38694|      false|
|    91.99.47.57| 38612|      false|
|   5.78.190.233| 37204|      false|
|195.181.168.181| 27979|       true|
+---------------+------+-----------+



In [14]:
# 4. Identify Potential Unauthorized Access Attempts (401) and if the IP is malicious or not
unauthorizedRequests = df.filter(df.status==401).groupby('ip').count().orderBy('count', ascending=False)

unauthorizedRequests = unauthorizedRequests.withColumn('isMalicious', check_ip_udf(unauthorizedRequests['ip'])).show()

+---------------+-----+-----------+
|             ip|count|isMalicious|
+---------------+-----+-----------+
|151.239.241.163|  220|      false|
|    91.99.47.57|   34|      false|
|   5.78.190.233|   23|      false|
|    91.99.30.32|   17|      false|
|  134.19.177.23|    6|      false|
|  162.223.91.56|    5|      false|
|  134.19.177.19|    5|      false|
|  104.222.32.84|    4|      false|
|  5.117.116.238|    3|      false|
|  134.19.177.22|    3|      false|
|  134.19.177.18|    1|      false|
|  5.114.144.207|    1|      false|
|  2.176.138.136|    1|       true|
+---------------+-----+-----------+



In [15]:
# 5. Detect Multiple Failed Login Attempts
from pyspark.sql.functions import count, col, max
multipleFailedLogins = df.filter(df.status==401).groupby('ip').agg(count('ip').alias('count')).filter(col('count')>1).show()

+---------------+-----+
|             ip|count|
+---------------+-----+
|    91.99.47.57|   34|
|    91.99.30.32|   17|
|  5.117.116.238|    3|
|  162.223.91.56|    5|
|151.239.241.163|  220|
|   5.78.190.233|   23|
|  134.19.177.22|    3|
|  134.19.177.19|    5|
|  104.222.32.84|    4|
|  134.19.177.23|    6|
+---------------+-----+



In [16]:
# 6. Find Most Frequently Accessed Sensitive Endpoints (/admin)
adminAccess = df.filter(df.referrer.like('%/admin%')).show(truncate=False)

+-------------+--------------------------+-------------------------------------------------------------------------+------+-----+---------------------------+-----------------------------------------------------------------------------------------------------------------------+
|ip           |timestamp                 |request                                                                  |status|size |referrer                   |user_agent                                                                                                             |
+-------------+--------------------------+-------------------------------------------------------------------------+------+-----+---------------------------+-----------------------------------------------------------------------------------------------------------------------+
|134.19.177.21|22/Jan/2019:07:22:25 +0330|GET /static/images/relashionship.png HTTP/1.1                            |200   |5946 |https://www.zanbil.ir/admin|Mozilla/5

In [17]:
# 7. Identify Large File Downloads
df = df.withColumn("size", df["size"].cast(IntegerType())) # Need to cast size column to Integer to do 'max' computations
maxSize = df.select(max(df.size).alias('maxSize'))

LargeFiles = df.filter(df.size >= 1100000).show()

+--------------+--------------------+--------------------+------+-------+--------------------+--------------------+
|            ip|           timestamp|             request|status|   size|            referrer|          user_agent|
+--------------+--------------------+--------------------+------+-------+--------------------+--------------------+
|   2.183.8.105|22/Jan/2019:13:38...|GET /image/5801?n...|   200|1126965|https://www.zanbi...|Mozilla/5.0 (Wind...|
|    5.75.13.81|24/Jan/2019:19:55...|GET /image/5801?n...|   200|1126965|https://www.zanbi...|Mozilla/5.0 (Wind...|
|  5.126.31.243|25/Jan/2019:15:31...|GET /image/5801?n...|   200|1126965|https://www.zanbi...|Mozilla/5.0 (Wind...|
| 151.239.64.91|25/Jan/2019:17:29...|GET /image/5801?n...|   200|1126703|https://www.zanbi...|Mozilla/5.0 (Linu...|
|85.133.194.178|25/Jan/2019:22:55...|GET /image/5801?n...|   200|1126965|https://www.zanbi...|Mozilla/5.0 (Maci...|
| 185.110.28.34|22/Jan/2019:11:26...|GET /image/5801?n...|   200|1126965

In [19]:
#8 Detect Unusual Activity Patterns. Show all the IPs which have made huge amount of requests in a 5 minute window
from pyspark.sql.functions import window
df = df.withColumn('timestamp', to_timestamp(df['timestamp'], 'dd/MMM/yyyy:HH:mm:ss Z'))

df.groupby('ip', window('timestamp', '5 minutes')).count().filter(col('count') > 100).orderBy(col('count').desc()).show(truncate=False)

+--------------+------------------------------------------+-----+
|ip            |window                                    |count|
+--------------+------------------------------------------+-----+
|46.100.209.231|{2019-01-23 09:10:00, 2019-01-23 09:15:00}|1178 |
|2.188.27.28   |{2019-01-26 09:55:00, 2019-01-26 10:00:00}|1115 |
|91.98.38.74   |{2019-01-23 06:15:00, 2019-01-23 06:20:00}|985  |
|94.183.36.205 |{2019-01-22 04:50:00, 2019-01-22 04:55:00}|951  |
|66.249.66.91  |{2019-01-23 18:45:00, 2019-01-23 18:50:00}|914  |
|104.222.32.91 |{2019-01-26 15:45:00, 2019-01-26 15:50:00}|855  |
|104.222.32.91 |{2019-01-26 16:00:00, 2019-01-26 16:05:00}|854  |
|104.222.32.91 |{2019-01-26 15:50:00, 2019-01-26 15:55:00}|846  |
|104.222.32.91 |{2019-01-26 15:55:00, 2019-01-26 16:00:00}|842  |
|104.222.32.91 |{2019-01-26 16:10:00, 2019-01-26 16:15:00}|819  |
|104.222.32.91 |{2019-01-26 15:35:00, 2019-01-26 15:40:00}|812  |
|104.222.32.91 |{2019-01-26 15:40:00, 2019-01-26 15:45:00}|797  |
|104.222.3