In [103]:
# Create a SparkSession
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("pyspark-notebook-log-analysis")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate()
)

In [104]:
spark

In [105]:
# Read the access.log file
df = (
    spark
    .read
    .text("hdfs://namenode:9000/access.log")
)
# Print the schema
df.printSchema()

root
 |-- value: string (nullable = true)



In [106]:
df.show(10, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                         |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|185.191.171.19 - - [08/Nov/2020:04:00:09 +0100] "GET /pl/pracownicy/pracownik.php?cID=250&print=1&s=publikacje HTTP/1.1" 200 1127 "-

In [107]:
print((df.count(), len(df.columns)))

(14946, 1)


In [108]:
sample_logs = [item['value'] for item in df.take(15)]
sample_logs

['185.191.171.19 - - [08/Nov/2020:04:00:09 +0100] "GET /pl/pracownicy/pracownik.php?cID=250&print=1&s=publikacje HTTP/1.1" 200 1127 "-" "Mozilla/5.0 (compatible; SemrushBot/6~bl; +http://www.semrush.com/bot.html)"',
 '185.191.171.43 - - [08/Nov/2020:04:01:08 +0100] "GET /pl/odnosniki/ HTTP/1.1" 200 17158 "-" "Mozilla/5.0 (compatible; SemrushBot/6~bl; +http://www.semrush.com/bot.html)"',
 '185.191.171.17 - - [08/Nov/2020:04:01:21 +0100] "GET /en/pracownicy/pracownik.php?cID=66&print=1&s=pracownik HTTP/1.1" 200 1127 "-" "Mozilla/5.0 (compatible; SemrushBot/6~bl; +http://www.semrush.com/bot.html)"',
 '13.66.139.2 - - [08/Nov/2020:04:01:26 +0100] "GET /pl/jednostki/jednostka.php?cID=22 HTTP/1.1" 200 11652 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"',
 '114.119.135.32 - - [08/Nov/2020:04:01:35 +0100] "GET /en/pracownicy/pracownik.php?cID=243&s=seminaria HTTP/1.1" 200 6224 "-" "Mozilla/5.0 (Linux; Android 7.0;) AppleWebKit/537.36 (KHTML, like Gecko) Mobile S

In [109]:
import re

host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'
hosts = [re.search(host_pattern, item).group(1)
           if re.search(host_pattern, item)
           else 'no match'
           for item in sample_logs]
hosts

['185.191.171.19',
 '185.191.171.43',
 '185.191.171.17',
 '13.66.139.2',
 '114.119.135.32',
 '185.191.171.18',
 '114.119.138.170',
 '114.119.141.67',
 '185.191.171.7',
 '185.191.171.38',
 '185.191.171.7',
 '185.191.171.24',
 '185.191.171.26',
 '54.36.148.93',
 '54.36.148.11']

In [110]:
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} \+\d{4})\]'
timestamps = [
    re.search(ts_pattern, item).group(1)
    if re.search(ts_pattern, item)
    else 'no match' for item in sample_logs
]
timestamps

['08/Nov/2020:04:00:09 +0100',
 '08/Nov/2020:04:01:08 +0100',
 '08/Nov/2020:04:01:21 +0100',
 '08/Nov/2020:04:01:26 +0100',
 '08/Nov/2020:04:01:35 +0100',
 '08/Nov/2020:04:01:38 +0100',
 '08/Nov/2020:04:01:59 +0100',
 '08/Nov/2020:04:02:45 +0100',
 '08/Nov/2020:04:02:49 +0100',
 '08/Nov/2020:04:02:56 +0100',
 '08/Nov/2020:04:03:39 +0100',
 '08/Nov/2020:04:03:44 +0100',
 '08/Nov/2020:04:04:11 +0100',
 '08/Nov/2020:04:04:24 +0100',
 '08/Nov/2020:04:04:25 +0100']

In [111]:
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
method_uri_protocol = [
    re.search(method_uri_protocol_pattern, item).groups()
    if re.search(method_uri_protocol_pattern, item)
    else 'no match'
    for item in sample_logs
]
method_uri_protocol

[('GET',
  '/pl/pracownicy/pracownik.php?cID=250&print=1&s=publikacje',
  'HTTP/1.1'),
 ('GET', '/pl/odnosniki/', 'HTTP/1.1'),
 ('GET',
  '/en/pracownicy/pracownik.php?cID=66&print=1&s=pracownik',
  'HTTP/1.1'),
 ('GET', '/pl/jednostki/jednostka.php?cID=22', 'HTTP/1.1'),
 ('GET', '/en/pracownicy/pracownik.php?cID=243&s=seminaria', 'HTTP/1.1'),
 ('GET', '/en/pracownicy/pracownik.php?cID=257&s=redaktorstwo', 'HTTP/1.1'),
 ('GET',
  '/en/pracownicy/pracownik.php?cID=252&s=redaktorstwo&print=1',
  'HTTP/1.1'),
 ('GET', '/pl/jednostki/jednostka.php?cID=4&s=jednostka', 'HTTP/1.1'),
 ('GET',
  '/pl/jednostki/jednostka.php?cID=18&print=1&s=redaktorstwo',
  'HTTP/1.1'),
 ('GET',
  '/en/pracownicy/pracownik.php?cID=62&print=1&s=redaktorstwo',
  'HTTP/1.1'),
 ('GET',
  '/en/jednostki/jednostka.php?cID=12&print=1&s=konferencje',
  'HTTP/1.1'),
 ('GET', '/pl/ogloszenie.php?PKV=395', 'HTTP/1.1'),
 ('GET', '/en/pracownicy/pracownik.php?cID=115', 'HTTP/1.1'),
 ('GET', '/robots.txt', 'HTTP/1.1'),
 ('GE

In [112]:
status_pattern = r'\s(\d{3})\s'
status = [re.search(status_pattern, item).group(1) for item in sample_logs]
print(status)

['200', '200', '200', '200', '200', '200', '200', '200', '200', '200', '200', '200', '200', '200', '302']


In [113]:
content_size_pattern = r'(\d+) \"\-\"'
content_size = [re.search(content_size_pattern, item).group(1) if re.search(content_size_pattern, item) else "no-match" for item in sample_logs]
print(content_size)

['1127', '17158', '1127', '11652', '6224', '6462', '1127', '8158', '1127', '1127', '1127', '7064', '7984', '160', 'no-match']


In [114]:
from pyspark.sql.functions import regexp_extract

logs_df = df.select(
    regexp_extract('value', host_pattern, 1).alias('host'),
    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
    regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
    regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
    regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size')
)
logs_df.show(10, truncate=True)

+---------------+--------------------+------+--------------------+--------+------+------------+
|           host|           timestamp|method|            endpoint|protocol|status|content_size|
+---------------+--------------------+------+--------------------+--------+------+------------+
| 185.191.171.19|08/Nov/2020:04:00...|   GET|/pl/pracownicy/pr...|HTTP/1.1|   200|        1127|
| 185.191.171.43|08/Nov/2020:04:01...|   GET|      /pl/odnosniki/|HTTP/1.1|   200|       17158|
| 185.191.171.17|08/Nov/2020:04:01...|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        1127|
|    13.66.139.2|08/Nov/2020:04:01...|   GET|/pl/jednostki/jed...|HTTP/1.1|   200|       11652|
| 114.119.135.32|08/Nov/2020:04:01...|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        6224|
| 185.191.171.18|08/Nov/2020:04:01...|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        6462|
|114.119.138.170|08/Nov/2020:04:01...|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        1127|
| 114.119.141.67|08/Nov/2020:04:02...|  

In [115]:
logs_df.count()

14946

In [116]:
df.filter(df['value'].isNull()).count()

0

In [117]:
bad_rows_df = logs_df.filter(
    logs_df['host'].isNull()| 
    logs_df['timestamp'].isNull() | 
    logs_df['method'].isNull() |
    logs_df['endpoint'].isNull() |
    logs_df['status'].isNull() |
    logs_df['content_size'].isNull()|
    logs_df['protocol'].isNull()
)
bad_rows_df.count()

6071

In [118]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

def count_null(col_name):
    return spark_sum(col(col_name).isNull().cast('integer')).alias(col_name)

logs_df.agg(*(count_null(col_name) for col_name in logs_df.columns)).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|        6071|
+----+---------+------+--------+--------+------+------------+



In [80]:
logs_df = logs_df.na.fill({'content_size': 0})
logs_df.agg(*(count_null(col_name) for col_name in logs_df.columns)).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|           0|
+----+---------+------+--------+--------+------+------------+



In [119]:
from pyspark.sql.functions import udf

month_map = {
  'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
  'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12
}


@udf
def parse_clf_time(text):
    """
    Convert Common Log time format into a Python datetime object
    Args:
        text (str): date and time in Apache time format [dd/mmm/yyyy:hh:mm:ss (+/-)zzzz]
    Returns:
        a string suitable for passing to CAST('timestamp')
    """
    return "{}-{}-{} {}:{}:{} {}".format(
        text[7:11],
        month_map[text[3:6]],
        text[0:2],
        text[12:14],
        text[15:17],
        text[18:20],
        text[21:26]
    )

In [120]:
logs_df = (
    logs_df.select(
        '*',
        (
            parse_clf_time(
                logs_df['timestamp']
            )
            .cast('timestamp')
            .alias('time')
        )
    )
).drop('timestamp')

logs_df.show(10, truncate=True)

+---------------+------+--------------------+--------+------+------------+-------------------+
|           host|method|            endpoint|protocol|status|content_size|               time|
+---------------+------+--------------------+--------+------+------------+-------------------+
| 185.191.171.19|   GET|/pl/pracownicy/pr...|HTTP/1.1|   200|        1127|2020-11-08 03:00:09|
| 185.191.171.43|   GET|      /pl/odnosniki/|HTTP/1.1|   200|       17158|2020-11-08 03:01:08|
| 185.191.171.17|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        1127|2020-11-08 03:01:21|
|    13.66.139.2|   GET|/pl/jednostki/jed...|HTTP/1.1|   200|       11652|2020-11-08 03:01:26|
| 114.119.135.32|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        6224|2020-11-08 03:01:35|
| 185.191.171.18|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        6462|2020-11-08 03:01:38|
|114.119.138.170|   GET|/en/pracownicy/pr...|HTTP/1.1|   200|        1127|2020-11-08 03:01:59|
| 114.119.141.67|   GET|/pl/jednostki/jed...|HTTP/

In [121]:
logs_df.printSchema()

root
 |-- host: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- content_size: integer (nullable = true)
 |-- time: timestamp (nullable = true)



In [122]:
logs_df.cache()

DataFrame[host: string, method: string, endpoint: string, protocol: string, status: int, content_size: int, time: timestamp]

In [123]:
content_size_summary_df = logs_df.describe(['content_size'])
content_size_summary_df.toPandas()

Unnamed: 0,summary,content_size
0,count,8875.0
1,mean,61475.26591549296
2,stddev,682742.6532386552
3,min,1.0
4,max,25566028.0


In [124]:
status_freq_df = (
    logs_df
    .groupBy('status')
    .count()
    .sort('status')
    .cache()
)
print('Total distinct HTTP Status Codes:', status_freq_df.count()) 

Total distinct HTTP Status Codes: 10


In [125]:
status_freq_df.show()

+------+-----+
|status|count|
+------+-----+
|   200|12040|
|   206|  106|
|   301|   39|
|   302|  162|
|   304|  135|
|   400|   19|
|   401|    2|
|   403|    5|
|   404| 1259|
|   408| 1179|
+------+-----+



In [126]:
status_freq_pd_df = (
    status_freq_df
    .toPandas()
    .sort_values(
        by=['count'],
        ascending=False
    )
)
status_freq_pd_df

Unnamed: 0,status,count
0,200,12040
8,404,1259
9,408,1179
3,302,162
4,304,135
1,206,106
2,301,39
5,400,19
7,403,5
6,401,2


In [127]:
host_sum_df =(
    logs_df
    .groupBy('host')
    .count()
    .sort('count', ascending=False)
    .limit(10)
)

In [128]:
host_sum_df.show()

+--------------+-----+
|          host|count|
+--------------+-----+
|  89.64.71.249|  224|
|   5.45.207.83|  137|
|  54.36.149.17|  123|
| 95.216.96.242|   99|
|23.100.232.233|   95|
|216.244.66.244|   92|
| 185.191.171.4|   90|
|185.191.171.43|   89|
|185.191.171.24|   89|
|185.191.171.44|   86|
+--------------+-----+



In [129]:
paths_df = (
    logs_df
    .groupBy('endpoint')
    .count()
    .sort('count', ascending=False).limit(20)
)

paths_df.show(truncate=False)    

+-----------------------------------------------------+-----+
|endpoint                                             |count|
+-----------------------------------------------------+-----+
|"-                                                   |1185 |
|/favicon.ico                                         |874  |
|/                                                    |398  |
|/robots.txt                                          |369  |
|/~randrusz/alglin/macodwr.pdf                        |204  |
|/~randrusz/alglin/lzespolone.pdf                     |191  |
|/~mariusz/share/classes/md/matematyka_dyskretna-w.pdf|163  |
|/style/base.css                                      |122  |
|/script/messages-pl.js                               |116  |
|/style/layout-normal.css                             |112  |
|/style/common.css                                    |110  |
|/script/base.js                                      |108  |
|/assets/body-bg.png                                  |100  |
|/assets

In [130]:
not200_df = (
    logs_df
    .filter(logs_df['status'] != 200)
)

error_endpoints_freq_df = (
    not200_df
    .groupBy('endpoint')
    .count()
    .sort('count', ascending=False)
    .limit(10)
)
                          
error_endpoints_freq_df.show(truncate=False)           

+-----------------------------------------------------+-----+
|endpoint                                             |count|
+-----------------------------------------------------+-----+
|"-                                                   |1185 |
|/favicon.ico                                         |874  |
|/apple-touch-icon.png                                |74   |
|/apple-touch-icon-precomposed.png                    |74   |
|/script/dialog.js                                    |20   |
|/currentsetting.htm                                  |19   |
|/~mariusz/share/classes/md/matematyka_dyskretna-w.pdf|15   |
|/~randrusz/alglin/lzespolone.pdf                     |13   |
|/~uostasze/lista3_mf_iie_16.pdf                      |9    |
|/vendor/phpunit/phpunit/src/Util/PHP/eval-stdin.php  |8    |
+-----------------------------------------------------+-----+



In [131]:
import pyspark.sql.functions as F

host_day_df = (
    logs_df
    .select(
        logs_df.host, 
        F.dayofmonth('time').alias('day')
    )
)
host_day_df.show(10, truncate=False)

+---------------+---+
|host           |day|
+---------------+---+
|185.191.171.19 |8  |
|185.191.171.43 |8  |
|185.191.171.17 |8  |
|13.66.139.2    |8  |
|114.119.135.32 |8  |
|185.191.171.18 |8  |
|114.119.138.170|8  |
|114.119.141.67 |8  |
|185.191.171.7  |8  |
|185.191.171.38 |8  |
+---------------+---+
only showing top 10 rows



In [132]:
# Remove duplicate hosts on the same day
host_day_distinct_df = (
    host_day_df
    .dropDuplicates()
)
host_day_df.show(10, truncate=False)

+---------------+---+
|host           |day|
+---------------+---+
|185.191.171.19 |8  |
|185.191.171.43 |8  |
|185.191.171.17 |8  |
|13.66.139.2    |8  |
|114.119.135.32 |8  |
|185.191.171.18 |8  |
|114.119.138.170|8  |
|114.119.141.67 |8  |
|185.191.171.7  |8  |
|185.191.171.38 |8  |
+---------------+---+
only showing top 10 rows



In [133]:
daily_hosts_df = (
    host_day_distinct_df
    .groupBy('day')
    .count()
    .sort("day")
)

daily_hosts_df.show(10, truncate=False)

+---+-----+
|day|count|
+---+-----+
|8  |2024 |
|9  |859  |
+---+-----+

