In [1]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession
    
sc = SparkContext()
sqlContext = SQLContext(sc)
spark = SparkSession(sc)

23/05/11 15:32:00 WARN Utils: Your hostname, dev-2.local resolves to a loopback address: 127.0.2.3; using fd01:db8:1111:0:0:0:0:3 instead (on interface lo0)
23/05/11 15:32:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/11 15:32:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/11 15:32:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
import re
from datetime import datetime

In [1]:
import glob

raw_data_files = glob.glob('data/*.gz')
raw_data_files

['data/NASA_access_log_Jul95.gz', 'data/NASA_access_log_Aug95.gz']

In [6]:
raw_logs_df = spark.read.text(raw_data_files)
raw_logs_df

DataFrame[value: string]

### Viewing sample data in our dataframe

In [6]:
raw_logs_df.take(10)


[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

[Row(value='199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245'),
 Row(value='unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985'),
 Row(value='199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0'),
 Row(value='199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0'),
 Row(value='burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0'),
 Row(value='205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985'),
 Row(value='d104.aa.net - - [01/Ju

In [7]:
print((raw_logs_df.count(), len(raw_logs_df.columns)))


[Stage 1:>                                                          (0 + 2) / 2]

(3461613, 1)



                                                                                

In [9]:
a_log = raw_logs_df.first()
a_log

                                                                                

Row(value='199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245')

In [10]:
re_host = '(^\S+\.[\S+\.]+\S+2)'
re_time = '\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]'
re_method_uri_protocol = '\"(\S+)\s(\S+)\s*(\S*)\"'
re_status = '\s(\d{3})\s'
re_content_size = '(\d+)$'
log_pattern = f'{re_host}\s-\s-\s{re_time}\s{re_method_uri_protocol}{re_status}{re_content_size}'


In [11]:
match = re.search(log_pattern, a_log.value)
print(match)

None


In [12]:
from pyspark.sql import functions as F
from pyspark.sql import types as SparkTypes

parsed_logs_df = raw_logs_df.select(F.regexp_extract('value', re_host, 1).alias('host'),
                        F.regexp_extract('value', re_time, 1).alias('timestamp'),
                        F.regexp_extract('value', re_method_uri_protocol, 1).alias('method'),
                        F.regexp_extract('value', re_method_uri_protocol, 2).alias('endpoint'),
                        F.regexp_extract('value', re_method_uri_protocol, 3).alias('protocol'),
                        F.regexp_extract('value', re_status, 1).alias('status'),
                        F.regexp_extract('value', re_content_size, 1).alias('content_size'),
                        )
parsed_logs_df.cache()

DataFrame[host: string, timestamp: string, method: string, endpoint: string, protocol: string, status: string, content_size: string]

In [13]:
normalized_logs_df = parsed_logs_df.filter(
        (F.col('host') != '') &
        (F.col('timestamp') != '') &
        (F.col('method') != '') &
        (F.col('endpoint') != '') &
        (F.col('protocol') != '') &
        (F.col('status') != '') &
        (F.col('content_size') != '')
    ) 
normalized_logs_df.count()

                                                                                

507334

In [18]:
# normalized_logs_df = parsed_logs_df.withColumn('timestamp', F.udf(lambda s: datetime.strptime(s, "%d/%b/%Y:%H:%M:%S %z").date())("timestamp"))
# normalized_logs_df = normalized_logs_df.withColumn('status', F.udf(lambda s: int(s))('status'))
# normalized_logs_df = normalized_logs_df.withColumn('content_size', F.udf(lambda s: int(s))('content_size'))
normalized_logs_df = parsed_logs_df.withColumns({
    "timestamp": F.udf(lambda s: datetime.strptime(s, "%d/%b/%Y:%H:%M:%S %z"), SparkTypes.TimestampType())("timestamp"),
    "status": F.udf(lambda s: int(s), SparkTypes.IntegerType())('status'),
    "content_size": F.udf(lambda s: int(s), SparkTypes.IntegerType())('content_size'),
})

In [19]:
normalized_logs_df.show(10)
normalized_logs_df.printSchema()

+--------------+-------------------+------+--------------------+--------+------+------------+
|          host|          timestamp|method|            endpoint|protocol|status|content_size|
+--------------+-------------------+------+--------------------+--------+------+------------+
|              |1995-07-01 11:00:01|   GET|    /history/apollo/|HTTP/1.0|   200|        6245|
|              |1995-07-01 11:00:06|   GET| /shuttle/countdown/|HTTP/1.0|   200|        3985|
| 199.120.110.2|1995-07-01 11:00:09|   GET|/shuttle/missions...|HTTP/1.0|   200|        4085|
|              |1995-07-01 11:00:11|   GET|/shuttle/countdow...|HTTP/1.0|   304|           0|
| 199.120.110.2|1995-07-01 11:00:11|   GET|/shuttle/missions...|HTTP/1.0|   200|        4179|
|              |1995-07-01 11:00:12|   GET|/images/NASA-logo...|HTTP/1.0|   304|           0|
|              |1995-07-01 11:00:12|   GET|/shuttle/countdow...|HTTP/1.0|   200|           0|
|       205.212|1995-07-01 11:00:12|   GET|/shuttle/countdow

In [81]:
bad_rows_df = parsed_logs_df.filter(parsed_logs_df['host'].isNull()| 
                                    parsed_logs_df['timestamp'].isNull() | 
                                    parsed_logs_df['method'].isNull() |
                                    parsed_logs_df['endpoint'].isNull() |
                                    parsed_logs_df['status'].isNull() |
                                    parsed_logs_df['content_size'].isNull()|
                                    parsed_logs_df['protocol'].isNull())
bad_rows_df.count()

0

In [15]:
good_rows_df = parsed_logs_df.filter(
        F.col('host').isNotNull() &
        F.col('timestamp').isNotNull() &
        F.col('method').isNotNull() &
        F.col('endpoint').isNotNull() &
        F.col('protocol').isNotNull() &
        F.col('status').isNotNull() &
        F.col('content_size').isNotNull()
    )
good_rows_df.count()

                                                                                

3461613

In [82]:
parsed_logs_df.printSchema()

root
 |-- host: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- method: string (nullable = true)
 |-- endpoint: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- status: string (nullable = true)
 |-- content_size: string (nullable = true)



In [None]:
s = "01/Jul/1995:00:00:01 -0400"
datetime.strptime(s, "%d/%b/%Y:%H:%M:%S %z")

In [170]:
parsed_logs_df.write.csv("parsed_logs")

                                                                                