# Log File - Data Exploration

## Loading Libraries

In [None]:
from pyspark.sql import SparkSession

## Spark Session Object Creation


In [None]:
# Obtain the notebook's Spark session, with the LEGACY time parser policy
# set in the session configuration.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)

## Loading

In [None]:
# Path of the raw log file read by the loading cell below.
log_file_path = "actual_log.txt"

In [None]:
# Load the log as plain text: each line of the file becomes one row.
base_df = spark.read.text(paths=log_file_path)

# Inspect the schema, then print the rows without truncating long lines.
base_df.printSchema()
base_df.show(truncate=False)

root
 |-- value: string (nullable = true)

+-------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------------+
|in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839     |
|uplherc.upl.com - - [01/Aug/1995:00:00:07 -0400] "GET / HTTP/1.0" 304                                                          |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0                               |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 304                               |
|uplherc.upl.com - - [01/Aug/1995:00:00:08 -040

## Parsing

 Common Log Format:

<b>remotehost rfc931 authuser date "request" status bytes</b>

<table>
    <tr>
        <th>field</th>
        <th>meaning</th>
    </tr>
    <tr>
        <td>remotehost</td>
        <td>Remote hostname (or IP number if DNS hostname is not available).</td>
    </tr>
    <tr>
        <td>rfc931</td>
        <td>The remote logname of the user. We don't really care about this field.</td>
    </tr>
    <tr>
        <td>authuser</td>
        <td>The username of the remote user, as authenticated by the HTTP server.</td>
    </tr>
    <tr>
        <td>date</td>
        <td>The date and time of the request.</td>
    </tr>
    <tr>
        <td>request</td>
        <td>The request, exactly as it came from the browser or client.</td>
    </tr>
    <tr>
        <td>status</td>
        <td>The HTTP status code the server sent back to the client.</td>
    </tr>
    <tr>
        <td>bytes</td>
        <td>The number of bytes (Content-Length) transferred to the client.</td>
    </tr>
</table>
Next, parse each log line into individual columns using the built-in regexp_extract() function. This function matches a column against a regular expression with one or more capture groups and lets you extract one of the matched groups. We'll use one regular expression for each field that we'd like to extract.
<br><br>
Regular expression reference: <a href="https://regexone.com/">RegexOne web site</a>.

In [None]:
from pyspark.sql.functions import split, regexp_extract
# Extract one Common Log Format field per regular expression.
# regexp_extract() returns an empty string when a pattern does not match;
# for `status` and `content_size` the cast to integer turns that into null.
split_df = base_df.select(
    # Host: the leading run of non-whitespace characters. The separator that
    # follows is deliberately left outside the capture group so the host value
    # carries no trailing whitespace.
    regexp_extract('value', r'^(\S+)', 1).alias('host'),
    # Timestamp: the bracketed date/time; accept both "+" and "-" UTC offsets.
    regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\]', 1).alias('timestamp'),
    # Path: the second token of the quoted request (method, path, protocol).
    regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),
    # Status: the first token after the closing quote of the request.
    regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),
    # Content size: the digits that follow the three-digit status code at the
    # end of the line. Anchoring on the status prevents a line that ends right
    # after the status code from reporting the status itself as the size.
    regexp_extract('value', r'"\s+\d{3}\s+(\d+)\s*$', 1).cast('integer').alias('content_size'),
)
split_df.show(truncate=False)

+----------------------------+--------------------------+---------------------------------------------------+------+------------+
|host                        |timestamp                 |path                                               |status|content_size|
+----------------------------+--------------------------+---------------------------------------------------+------+------------+
|in24.inetnebr.com           |01/Aug/1995:00:00:01 -0400|/shuttle/missions/sts-68/news/sts-68-mcc-05.txt    |200   |1839        |
|uplherc.upl.com             |01/Aug/1995:00:00:07 -0400|/                                                  |304   |null        |
|uplherc.upl.com             |01/Aug/1995:00:00:08 -0400|/images/ksclogo-medium.gif                         |304   |0           |
|uplherc.upl.com             |01/Aug/1995:00:00:08 -0400|/images/MOSAIC-logosmall.gif                       |304   |null        |
|uplherc.upl.com             |01/Aug/1995:00:00:08 -0400|/images/USA-logosmall.gif        

## Cleaning
Check that there are no null rows in the original data set.

In [None]:
# Number of raw rows whose single `value` column is null.
base_df.where(base_df['value'].isNull()).count()

0

Check the parsed DataFrame for rows that contain a null in any column.

In [None]:
# Collect the rows where at least one field failed to parse.
# regexp_extract() returns an empty string (not null) when its pattern does
# not match, so the string columns must be tested for '' as well as null.
# `status` and `content_size` are cast to integer, which turns an empty
# string into null, so a null check is sufficient for them.
bad_rows_df = split_df.filter(
    split_df['host'].isNull() | (split_df['host'] == '') |
    split_df['timestamp'].isNull() | (split_df['timestamp'] == '') |
    split_df['path'].isNull() | (split_df['path'] == '') |
    split_df['status'].isNull() |
    split_df['content_size'].isNull()
)
bad_rows_df.count()

2

Count the nulls in each column to figure out which fields are affected.

In [None]:
from pyspark.sql.functions import col, sum

def count_null(col_name):
    """Build an aggregate expression that counts the nulls in one column.

    Args:
        col_name: Name of the column to inspect.

    Returns:
        A Column expression, aliased to ``col_name``, for use in ``DataFrame.agg``.
    """
    # `sum` and `col` are the pyspark.sql.functions versions imported above,
    # not the Python builtins. isNull() is boolean, so cast it to integer
    # before summing.
    return sum(col(col_name).isNull().cast('integer')).alias(col_name)


# One null-count expression per column, built directly as a list rather than
# by appending inside a comprehension whose result is thrown away.
expression = [count_null(col_name) for col_name in split_df.columns]
split_df.agg(*expression).show()

+----+---------+----+------+------------+
|host|timestamp|path|status|content_size|
+----+---------+----+------+------------+
|   0|        0|   0|     0|           2|
+----+---------+----+------+------------+



## Fix the rows with null content_size

Use <code>na.fill</code> to replace null <code>content_size</code> values with 0.

In [None]:
# Replace null content_size values with 0; the other columns are left as they are.
cleaned_df = split_df.na.fill({'content_size': 0})

# Re-run the per-column null count on the cleaned data. The list is built
# directly rather than by appending inside a comprehension whose result is
# thrown away.
expression = [count_null(col_name) for col_name in cleaned_df.columns]

cleaned_df.agg(*expression).show()

+----+---------+----+------+------------+
|host|timestamp|path|status|content_size|
+----+---------+----+------+------------+
|   0|        0|   0|     0|           0|
+----+---------+----+------+------------+



In [None]:
from pyspark.sql.functions import *
# Parse the textual `timestamp` column into a new `time` column, then drop
# the original string column.
logs_df = (
    cleaned_df
    .withColumn(
        'time',
        to_timestamp(cleaned_df['timestamp'], "dd/MMM/yyyy:HH:mm:ss ZZZZ").cast('timestamp'),
    )
    .drop('timestamp')
)

# Report the total number of log entries, then print the rows untruncated.
total_log_entries = logs_df.count()
print(total_log_entries)
logs_df.show(truncate=False)

20
+----------------------------+---------------------------------------------------+------+------------+-------------------+
|host                        |path                                               |status|content_size|time               |
+----------------------------+---------------------------------------------------+------+------------+-------------------+
|in24.inetnebr.com           |/shuttle/missions/sts-68/news/sts-68-mcc-05.txt    |200   |1839        |1995-08-01 04:00:01|
|uplherc.upl.com             |/                                                  |304   |0           |1995-08-01 04:00:07|
|uplherc.upl.com             |/images/ksclogo-medium.gif                         |304   |0           |1995-08-01 04:00:08|
|uplherc.upl.com             |/images/MOSAIC-logosmall.gif                       |304   |0           |1995-08-01 04:00:08|
|uplherc.upl.com             |/images/USA-logosmall.gif                          |304   |0           |1995-08-01 04:00:08|
|ix-esc-ca2-0