In [202]:
import json
import re
from awsglue.context import GlueContext
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructField, StructType, StringType, LongType


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [203]:
session = SparkSession.builder.appName("test").getOrCreate() 
spark = session.sparkContext
gc = GlueContext(spark)

file =  's3://wfercosta-spark/logs/*.gz'

schema = StructType([
    StructField("date", StringType(), True),
    StructField("time", StringType(), True),
    StructField("x-edge-location", StringType(), True),
    StructField("sc-bytes", StringType(), True),
    StructField("c-ip", StringType(), True),
    StructField("cs-method", StringType(), True),
    StructField("cs(Host)", StringType(), True),
    StructField("cs-uri-stem", StringType(), True),
    StructField("sc-status", StringType(), True),
    StructField("cs(Referer)", StringType(), True),
    StructField("cs(User-Agent)", StringType(), True),
    StructField("cs-uri-query", StringType(), True),
    StructField("cs(Cookie)", StringType(), True),
    StructField("x-edge-result-type", StringType(), True),
    StructField("x-edge-request-id", StringType(), True),
    StructField("x-host-header", StringType(), True),
    StructField("cs-protocol", StringType(), True),
    StructField("cs-bytes", StringType(), True),
    StructField("time-taken", StringType(), True),
    StructField("x-forwarded-for", StringType(), True),
    StructField("ssl-protocol", StringType(), True),
    StructField("ssl-cipher", StringType(), True),
    StructField("x-edge-response-result-type", StringType(), True),
    StructField("cs-protocol-version", StringType(), True),
    StructField("fle-status", StringType(), True),
    StructField("fle-encrypted-fields", StringType(), True),
    StructField("c-port", StringType(), True),
    StructField("time-to-first-byte", StringType(), True),
    StructField("x-edge-detailed-result-type", StringType(), True),
    StructField("sc-content-type", StringType(), True),
    StructField("sc-content-len", StringType(), True),
    StructField("sc-range-start", StringType(), True),
    StructField("sc-range-end", StringType(), True),
])


df = session.read.csv(file, sep=' ', header=False, schema=schema)
df = df.withColumn("filename", F.input_file_name())
df = df.where(F.col('date').startswith("#") == False)
df = df.select(['date', 'time', 'sc-status', 'cs-uri-stem', 'time-taken', 'filename'])

df = df.withColumnRenamed("sc-status","status")
df = df.withColumnRenamed("cs-uri-stem","resource")
df = df.withColumnRenamed("time-taken","response_time")
df = df.withColumn("timestamp", F.to_timestamp(F.concat(F.col('date'), F.lit(' '), F.col('time'))))
df = df.drop(F.col('time'))

df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+--------------------+-------------+--------------------+-------------------+
|      date|status|            resource|response_time|            filename|          timestamp|
+----------+------+--------------------+-------------+--------------------+-------------------+
|2021-05-11|   502|/open-banking/acc...|        0.112|s3://wfercosta-sp...|2021-05-11 03:00:25|
|2021-05-11|   502|                   /|        0.112|s3://wfercosta-sp...|2021-05-11 03:00:25|
|2021-05-11|   502|                   /|        0.112|s3://wfercosta-sp...|2021-05-11 03:00:25|
|2021-05-11|   502|                   /|        0.112|s3://wfercosta-sp...|2021-05-11 03:00:25|
+----------+------+--------------------+-------------+--------------------+-------------------+

In [204]:
# Remove not required resourcs
regex_valid_apis = r'^\/[a-z-_]+\/[a-z-_]+\/v[0-9]{1,2}\/.+'
df_rqd_res = df.filter(df.resource.rlike(regex_valid_apis))
df_rqd_res.show(1, False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+-------------------------------------------------------------------+-------------+-------------------------------+-------------------+
|date      |status|resource                                                           |response_time|filename                       |timestamp          |
+----------+------+-------------------------------------------------------------------+-------------+-------------------------------+-------------------+
|2021-05-11|502   |/open-banking/accounts/v1/accounts-urn/urn:isbn:0451450523/balances|0.112        |s3://wfercosta-spark/logs/f4.gz|2021-05-11 03:00:25|
+----------+------+-------------------------------------------------------------------+-------------+-------------------------------+-------------------+

In [205]:
#Normalize resoruce path variables with placeholders

regex_norm_placeholders = r'\/([0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12})|\/(urn:[a-z0-9A-z]*:[a-z0-9A-z]+)|\/([0-9]+)'

df_norm_placeholders = df_rqd_res.withColumn('resource', F.regexp_replace('resource', regex_norm_placeholders, '\/{param}'))

df_norm_placeholders.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+--------------------+-------------+--------------------+-------------------+
|      date|status|            resource|response_time|            filename|          timestamp|
+----------+------+--------------------+-------------+--------------------+-------------------+
|2021-05-11|   502|/open-banking/acc...|        0.112|s3://wfercosta-sp...|2021-05-11 03:00:25|
+----------+------+--------------------+-------------+--------------------+-------------------+

In [206]:
#Explodes additional fields

regex_extract_context = r'^\/([a-z_-]+)'
regex_extract_family = r'^\/[a-z_-]+\/([a-z_-]+)'
regex_extract_version = r'^\/[a-z_-]+\/[a-z_-]+\/(v[0-9]{1,2})'
regex_extract_resource = r'^\/[a-z_-]+\/[a-z_-]+\/v[0-9]{1,2}(\/.*)'

df_extra_cols = df_norm_placeholders
df_extra_cols = df_extra_cols.withColumn('context', F.regexp_extract('resource', regex_extract_context, 1))
df_extra_cols = df_extra_cols.withColumn('family', F.regexp_extract('resource', regex_extract_family, 1))
df_extra_cols = df_extra_cols.withColumn('version', F.regexp_extract('resource', regex_extract_version, 1))

df_extra_cols = df_extra_cols.withColumn('resource', F.regexp_extract('resource', regex_extract_resource, 1))

df_extra_cols.select(['date', 'context', 'family', 'version', 'resource', 'status', 'response_time', 'timestamp']).show(20, False)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+--------+-------+------------------------------+------+-------------+-------------------+
|date      |context     |family  |version|resource                      |status|response_time|timestamp          |
+----------+------------+--------+-------+------------------------------+------+-------------+-------------------+
|2021-05-11|open-banking|accounts|v1     |/accounts-urn/{param}/balances|502   |0.112        |2021-05-11 03:00:25|
+----------+------------+--------+-------+------------------------------+------+-------------+-------------------+

In [214]:
from datetime import datetime

df_finish = df_extra_cols

files_processed = [row.filename for row in df_finish.select('filename').distinct().collect()]
dates = [datetime.strptime(row.date, "%Y-%m-%d") for row in df_finish.select('date').distinct().collect()]

df_finish = df_extra_cols.select(['date', 'context', 'family', 'version', 'resource', 'status', 'response_time', 'timestamp'])

#df_finish.persist(StorageLevel.MEMORY_AND_DISK)

for date in dates:
    df_split = df_finish.filter(df_finish.date == date)
    df_split.coalesce(1).write\
     .format("csv")\
     .mode("append")\
     .save(f's3://wfercosta-spark/cleaned/{date.year}/{date.month}/{date.day}', header=True)




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…