In [2]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession

In [3]:
# Create (or reuse) the Spark entry points.
# getOrCreate makes this cell safe to re-run: a bare SparkContext() raises
# "Cannot run multiple SparkContexts at once" on its second execution.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()
# Legacy entry point kept so existing code using `sqlContext` keeps working;
# it is deprecated since Spark 3.0 — prefer `spark` in new code. Passing `spark`
# binds it to the same session rather than creating another one.
sqlContext = SQLContext(sc, spark)

In [5]:
import re
import pandas as pd
import glob

In [8]:
# Read every gzip-compressed log in the working directory as text, one line per
# row in a single `value` column. Sorting makes the file order deterministic
# (glob order is arbitrary), and an empty match fails loudly instead of
# silently producing an empty DataFrame.
data_files = sorted(glob.glob('*.gz'))
if not data_files:
    raise FileNotFoundError("no '*.gz' log files found in the current working directory")
df = spark.read.text(data_files)

In [28]:
# Regexes for the fields of one access-log line. Each pattern's group 1
# (groups 1-3 for the request) is what the parsing cell extracts.

# Host: the first whitespace-delimited field of the line. The previous pattern
# also required a dot, so dotless hosts silently extracted as ''.
host_pattern = r'^(\S+)\s'
# Timestamp inside [...]; accepts both negative and positive UTC offsets
# (the previous pattern only matched '-hhmm').
ts_pattern = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} [+-]\d{4})\]'
# Quoted request: method, endpoint, and an optional protocol.
method_uri_protocol_pattern = r'\"(\S+)\s(\S+)\s*(\S*)\"'
# First 3-digit token surrounded by whitespace: the HTTP status code.
status_pattern = r'\s(\d{3})\s'
# Trailing integer; lines ending in a non-digit (e.g. '-') do not match.
content_size_pattern = r'\s(\d+)$'

In [24]:
from pyspark.sql.functions import regexp_extract

In [57]:
# Parse each raw line (column `value`) into named fields using the patterns above.
# status and content_size are cast to integer; failed casts surface as nulls in
# the null counts below.
log_fields = [
    regexp_extract('value', host_pattern, 1).alias('host'),
    regexp_extract('value', ts_pattern, 1).alias('timestamp'),
    # The request pattern captures method, endpoint and protocol as groups 1-3.
    regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
    regexp_extract('value', method_uri_protocol_pattern, 2).alias('endpoint'),
    regexp_extract('value', method_uri_protocol_pattern, 3).alias('protocol'),
    regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
    regexp_extract('value', content_size_pattern, 1).cast('integer').alias('content_size'),
]
df_logs = df.select(*log_fields)

In [67]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum

In [82]:
def count_null_values(col_name, include_empty=False):
    """Build an aggregate expression counting missing values in one column.

    Parameters
    ----------
    col_name : str
        Column to inspect; also used as the alias of the resulting count.
    include_empty : bool, default False
        When True, empty strings are counted as missing too. ``regexp_extract``
        returns ``''`` (not null) when its pattern does not match, so a failed
        string extraction is invisible to a pure null check. Only the columns
        cast to integer surface failures as nulls.

    Returns
    -------
    Column
        Sum of a 0/1 missing-indicator over the column, aliased to ``col_name``.
    """
    is_missing = col(col_name).isNull()
    if include_empty:
        # Cast to string so the '' comparison is also valid for integer columns;
        # for a null value, `True | null` still evaluates to True.
        is_missing = is_missing | (col(col_name).cast('string') == '')
    return spark_sum(is_missing.cast('integer')).alias(col_name)

exprs = [count_null_values(col_name) for col_name in df_logs.columns]
df_logs.agg(*exprs).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     1|       33905|
+----+---------+------+--------+--------+------+------------+



In [83]:
# Drop rows whose status failed to parse (the null counts above showed one such row).
df_logs = df_logs.filter(df_logs.status.isNotNull())

In [84]:
# Re-check missing values after filtering: status should now have no nulls.
exprs = list(map(count_null_values, df_logs.columns))
df_logs.agg(*exprs).show()

+----+---------+------+--------+--------+------+------------+
|host|timestamp|method|endpoint|protocol|status|content_size|
+----+---------+------+--------+--------+------+------------+
|   0|        0|     0|       0|       0|     0|       33904|
+----+---------+------+--------+--------+------+------------+

