In [1]:
# Display the active SparkSession.
# NOTE(review): this assumes the kernel pre-defines `spark` (e.g. a pyspark
# shell). On a plain Python kernel `spark` is only created by the next cell,
# so this line raises NameError under Restart & Run All.
spark

In [2]:
# Obtain a SparkSession plus the legacy `sc` / `sqlContext` handles.
# `SparkSession.builder.getOrCreate()` returns the already-active session when
# the kernel provides one (e.g. a pyspark shell) and builds a new one otherwise.
# That makes this cell safe to re-run, and it never tries to start a second
# SparkContext. The previous guard only checked for a variable named `sc`.
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Legacy handle kept so existing code that uses it keeps working.
# SQLContext is deprecated since Spark 3.0; prefer `spark` in new code.
# Don't clobber one the kernel already supplied.
if 'sqlContext' not in globals():
    sqlContext = SQLContext(sc)

In [3]:
import re
import pandas as pd

In [4]:
import glob

# Collect the access-log files to parse (relative to the kernel's working
# directory). glob() returns matches in arbitrary, filesystem-dependent order,
# so sort them to make the read order reproducible across runs and machines.
# NOTE(review): an empty list here makes the later spark.read.text call fail.
raw_data_files = sorted(glob.glob('*.log'))
raw_data_files

['access.log']

In [5]:

# Load every matched log file as raw text. Each line becomes one row in a
# single string column named `value` (see the printed schema).
base_df = spark.read.text(paths=raw_data_files)
base_df.printSchema()

root
 |-- value: string (nullable = true)



In [6]:
# Confirm that spark.read.text produced a Spark (not pandas) DataFrame.
type(base_df)

pyspark.sql.dataframe.DataFrame

In [7]:
# Peek at the first 10 raw lines without truncating the long log strings.
base_df.show(n=10, truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                       |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|51.77.140.110 - - [30/Aug/2020:06:25:42 -0400] "GET /wp-login.php HTTP/1.1" 200 5365 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"                                     |
|51.77.140.110 - - [30/Aug/2020:06:25:46 -0400] "POST /wp-login.php HTTP/1.1" 200 5496 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"   

In [8]:

# Bring a 15-line sample to the driver as plain Python strings, so the regexes
# can be prototyped with `re` before being run in Spark.
sample_logs = [row.value for row in base_df.take(15)]
sample_logs

['51.77.140.110 - - [30/Aug/2020:06:25:42 -0400] "GET /wp-login.php HTTP/1.1" 200 5365 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"',
 '51.77.140.110 - - [30/Aug/2020:06:25:46 -0400] "POST /wp-login.php HTTP/1.1" 200 5496 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"',
 '51.77.140.110 - - [30/Aug/2020:06:25:47 -0400] "POST /xmlrpc.php HTTP/1.1" 200 3591 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"',
 '5.135.159.189 - - [30/Aug/2020:06:48:47 -0400] "GET /wp-login.php HTTP/1.1" 200 5365 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"',
 '5.135.159.189 - - [30/Aug/2020:06:48:50 -0400] "POST /wp-login.php HTTP/1.1" 200 5496 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0) Gecko/20100101 Firefox/62.0"',
 '5.135.159.189 - - [30/Aug/2020:06:48:51 -0400] "POST /xmlrpc.php HTTP/1.1" 200 3591 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:62.0)

In [9]:
# Client host: the first whitespace-delimited token of each line (an IPv4
# address in the sample lines). The pattern is deliberately not restricted to
# dotted names: a character class like `[\S+\.]` is just "any non-space
# character", and requiring a literal dot rejects dot-less addresses such as
# IPv6 ('2001:db8::1'), which Spark's regexp_extract would turn into ''.
host_pattern = r'^(\S+)\s'

# Search each line once and reuse the match object.
hosts = [
    match.group(1) if match else 'no match'
    for match in (re.search(host_pattern, line) for line in sample_logs)
]
hosts

['51.77.140.110',
 '51.77.140.110',
 '51.77.140.110',
 '5.135.159.189',
 '5.135.159.189',
 '5.135.159.189',
 '192.241.225.120',
 '106.38.241.168',
 '138.197.174.39',
 '138.197.174.39',
 '138.197.174.39',
 '73.124.254.3',
 '35.204.152.99',
 '35.204.152.99',
 '35.204.152.99']

In [10]:
# Timestamp inside square brackets, e.g. [30/Aug/2020:06:25:42 -0400].
# The UTC offset can be positive or negative, so accept either sign
# (the old `-\d{4}` silently missed logs written at +0000, +0530, ...).
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\]'

# Report non-matching lines as 'no match' (like the other sample cells)
# instead of crashing with AttributeError on a None match.
timestamps = [
    match.group(1) if match else 'no match'
    for match in (re.search(ts_pattern, line) for line in sample_logs)
]
timestamps

['30/Aug/2020:06:25:42 -0400',
 '30/Aug/2020:06:25:46 -0400',
 '30/Aug/2020:06:25:47 -0400',
 '30/Aug/2020:06:48:47 -0400',
 '30/Aug/2020:06:48:50 -0400',
 '30/Aug/2020:06:48:51 -0400',
 '30/Aug/2020:06:58:31 -0400',
 '30/Aug/2020:07:11:09 -0400',
 '30/Aug/2020:07:11:37 -0400',
 '30/Aug/2020:07:11:37 -0400',
 '30/Aug/2020:07:11:38 -0400',
 '30/Aug/2020:07:12:15 -0400',
 '30/Aug/2020:07:12:15 -0400',
 '30/Aug/2020:07:12:16 -0400',
 '30/Aug/2020:07:12:17 -0400']

In [11]:
# Split the quoted request line into (method, URI, protocol).
# The protocol group `(\S*)` may match an empty string.
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'

# One search per line. NOTE(review): a non-matching line yields the string
# 'no match' rather than a 3-tuple, so the list elements are not uniform.
method_uri_protocol = [
    found.groups() if found is not None else 'no match'
    for found in (re.search(method_uri_protocol_pattern, line) for line in sample_logs)
]
method_uri_protocol

[('GET', '/wp-login.php', 'HTTP/1.1'),
 ('POST', '/wp-login.php', 'HTTP/1.1'),
 ('POST', '/xmlrpc.php', 'HTTP/1.1'),
 ('GET', '/wp-login.php', 'HTTP/1.1'),
 ('POST', '/wp-login.php', 'HTTP/1.1'),
 ('POST', '/xmlrpc.php', 'HTTP/1.1'),
 ('GET', '/', 'HTTP/1.1'),
 ('GET', '/', 'HTTP/1.1'),
 ('GET', '/', 'HTTP/1.1'),
 ('GET', '/favicon.ico', 'HTTP/1.1'),
 ('GET', '/', 'HTTP/1.1'),
 ('POST',
  '/wp-cron.php?doing_wp_cron=1598785935.1434218883514404296875',
  'HTTP/1.1'),
 ('GET', '/wp-login.php', 'HTTP/1.1'),
 ('POST', '/wp-login.php', 'HTTP/1.1'),
 ('POST', '/xmlrpc.php', 'HTTP/1.1')]

In [12]:
from pyspark.sql.functions import regexp_extract

# Parse each raw line into named columns using the regexes prototyped above.
# Each entry is (pattern, capture group, output column name).
logs_df = base_df.select(*[
    regexp_extract('value', pattern, group).alias(name)
    for pattern, group, name in (
        (host_pattern, 1, 'host'),
        (ts_pattern, 1, 'timestamp'),
        (method_uri_protocol_pattern, 1, 'method'),
        (method_uri_protocol_pattern, 2, 'endpoint'),
        (method_uri_protocol_pattern, 3, 'protocol'),
    )
])
logs_df.show(10, truncate=False)
print((logs_df.count(), len(logs_df.columns)))

+---------------+--------------------------+------+-------------+--------+
|host           |timestamp                 |method|endpoint     |protocol|
+---------------+--------------------------+------+-------------+--------+
|51.77.140.110  |30/Aug/2020:06:25:42 -0400|GET   |/wp-login.php|HTTP/1.1|
|51.77.140.110  |30/Aug/2020:06:25:46 -0400|POST  |/wp-login.php|HTTP/1.1|
|51.77.140.110  |30/Aug/2020:06:25:47 -0400|POST  |/xmlrpc.php  |HTTP/1.1|
|5.135.159.189  |30/Aug/2020:06:48:47 -0400|GET   |/wp-login.php|HTTP/1.1|
|5.135.159.189  |30/Aug/2020:06:48:50 -0400|POST  |/wp-login.php|HTTP/1.1|
|5.135.159.189  |30/Aug/2020:06:48:51 -0400|POST  |/xmlrpc.php  |HTTP/1.1|
|192.241.225.120|30/Aug/2020:06:58:31 -0400|GET   |/            |HTTP/1.1|
|106.38.241.168 |30/Aug/2020:07:11:09 -0400|GET   |/            |HTTP/1.1|
|138.197.174.39 |30/Aug/2020:07:11:37 -0400|GET   |/            |HTTP/1.1|
|138.197.174.39 |30/Aug/2020:07:11:37 -0400|GET   |/favicon.ico |HTTP/1.1|
+---------------+--------

In [13]:
# HTTP status: the three-digit field right after the closing quote of the
# request line (e.g. `HTTP/1.1" 200 5365`). Anchoring on the quote stops an
# earlier space-delimited three-digit token (for instance a numeric ident/user
# field) from being mistaken for the status.
status_pattern = r'"\s(\d{3})\s'

# Report non-matching lines as 'no match' instead of raising AttributeError.
status = [
    match.group(1) if match else 'no match'
    for match in (re.search(status_pattern, line) for line in sample_logs)
]
status

['200', '200', '200', '200', '200', '200', '200', '301', '200', '404', '200', '200', '200', '200', '200']


In [15]:
# Re-parse the raw lines, now also extracting the HTTP status as an integer.
logs_df = base_df.select(
    regexp_extract('value', host_pattern, 1).alias('host'),
    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
    # The request-line pattern yields method, endpoint, protocol as groups 1..3.
    *[
        regexp_extract('value', method_uri_protocol_pattern, group).alias(name)
        for group, name in enumerate(('method', 'endpoint', 'protocol'), start=1)
    ],
    # NOTE(review): a line with no status match extracts '' here; the integer
    # cast gives NULL by default, but raises under ANSI mode — confirm settings.
    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
)
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+---------------+--------------------+------+-------------+--------+------+
|           host|           timestamp|method|     endpoint|protocol|status|
+---------------+--------------------+------+-------------+--------+------+
|  51.77.140.110|30/Aug/2020:06:25...|   GET|/wp-login.php|HTTP/1.1|   200|
|  51.77.140.110|30/Aug/2020:06:25...|  POST|/wp-login.php|HTTP/1.1|   200|
|  51.77.140.110|30/Aug/2020:06:25...|  POST|  /xmlrpc.php|HTTP/1.1|   200|
|  5.135.159.189|30/Aug/2020:06:48...|   GET|/wp-login.php|HTTP/1.1|   200|
|  5.135.159.189|30/Aug/2020:06:48...|  POST|/wp-login.php|HTTP/1.1|   200|
|  5.135.159.189|30/Aug/2020:06:48...|  POST|  /xmlrpc.php|HTTP/1.1|   200|
|192.241.225.120|30/Aug/2020:06:58...|   GET|            /|HTTP/1.1|   200|
| 106.38.241.168|30/Aug/2020:07:11...|   GET|            /|HTTP/1.1|   301|
| 138.197.174.39|30/Aug/2020:07:11...|   GET|            /|HTTP/1.1|   200|
| 138.197.174.39|30/Aug/2020:07:11...|   GET| /favicon.ico|HTTP/1.1|   404|
+-----------

In [16]:
# Sanity check: count raw lines whose `value` is NULL (0 in the recorded run).
base_df.where(base_df.value.isNull()).count()

0

In [20]:

# Flag rows that failed to parse.
# regexp_extract returns '' (not NULL) when its pattern does not match, so
# NULL checks alone can never catch a failed match on the string columns.
# Those columns are therefore also tested for ''. `status` is cast to integer,
# so for it a failed match shows up as NULL.
# `protocol` is only NULL-checked: its group `(\S*)` may legitimately be empty,
# and a wholly unparsed request line is already caught via method/endpoint.
bad_rows_df = logs_df.filter(
    logs_df['host'].isNull() | (logs_df['host'] == '') |
    logs_df['timestamp'].isNull() | (logs_df['timestamp'] == '') |
    logs_df['method'].isNull() | (logs_df['method'] == '') |
    logs_df['endpoint'].isNull() | (logs_df['endpoint'] == '') |
    logs_df['status'].isNull() |
    logs_df['protocol'].isNull()
)
bad_rows_df.count()

0

In [21]:
# List the parsed column names.
logs_df.columns

['host', 'timestamp', 'method', 'endpoint', 'protocol', 'status']

In [23]:
# Build the parsed logs DataFrame (host, timestamp, method, endpoint,
# protocol, integer status).
# NOTE(review): this rebuilds exactly the same logs_df as the In [15] cell;
# one of the two copies can likely be removed.
logs_df = base_df.select(*[
    (
        regexp_extract('value', pattern, group).cast('integer')
        if name == 'status'
        else regexp_extract('value', pattern, group)
    ).alias(name)
    for pattern, group, name in (
        (host_pattern, 1, 'host'),
        (ts_pattern, 1, 'timestamp'),
        (method_uri_protocol_pattern, 1, 'method'),
        (method_uri_protocol_pattern, 2, 'endpoint'),
        (method_uri_protocol_pattern, 3, 'protocol'),
        (status_pattern, 1, 'status'),
    )
])
logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+---------------+--------------------+------+-------------+--------+------+
|           host|           timestamp|method|     endpoint|protocol|status|
+---------------+--------------------+------+-------------+--------+------+
|  51.77.140.110|30/Aug/2020:06:25...|   GET|/wp-login.php|HTTP/1.1|   200|
|  51.77.140.110|30/Aug/2020:06:25...|  POST|/wp-login.php|HTTP/1.1|   200|
|  51.77.140.110|30/Aug/2020:06:25...|  POST|  /xmlrpc.php|HTTP/1.1|   200|
|  5.135.159.189|30/Aug/2020:06:48...|   GET|/wp-login.php|HTTP/1.1|   200|
|  5.135.159.189|30/Aug/2020:06:48...|  POST|/wp-login.php|HTTP/1.1|   200|
|  5.135.159.189|30/Aug/2020:06:48...|  POST|  /xmlrpc.php|HTTP/1.1|   200|
|192.241.225.120|30/Aug/2020:06:58...|   GET|            /|HTTP/1.1|   200|
| 106.38.241.168|30/Aug/2020:07:11...|   GET|            /|HTTP/1.1|   301|
| 138.197.174.39|30/Aug/2020:07:11...|   GET|            /|HTTP/1.1|   200|
| 138.197.174.39|30/Aug/2020:07:11...|   GET| /favicon.ico|HTTP/1.1|   404|
+-----------