In [0]:
%run ./unzip_file_notebook

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


mkdir: cannot create directory ‘/tmp/logs/’: File exists
mkdir: cannot create directory ‘/tmp/logs/packed’: File exists
mkdir: cannot create directory ‘/tmp/logs/unpacked’: File exists


--2024-10-18 14:27:43--  https://raw.githubusercontent.com/seriallink/assignments/refs/heads/main/access_log.txt.7z.001
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15728640 (15M) [application/octet-stream]
Saving to: ‘/tmp/logs/packed/access_log.txt.7z.001’

     0K .......... .......... .......... .......... ..........  0% 6.13M 2s
    50K .......... .......... .......... .......... ..........  0% 7.33M 2s
   100K .......... .......... .......... .......... ..........  0% 5.14M 2s
   150K .......... .......... .......... .......... ..........  1% 12.6M 2s
   200K .......... .......... .......... .......... ..........  1% 7.35M 2s
   250K .......... .......... .......... .......... ..........  1% 35.3M 2s
   300K .......... .......... .......... ........

In [0]:
import pytz
from datetime import datetime
from pyspark.sql.functions import regexp_extract, desc, count, col, lit, date_format, to_date, sum, min, max, avg, when

In [0]:
def write_delta(df, partition, mode, location, table_name):
    """
        @Description: Write dataframe as delta table.
        @Param df: Dataframe containing data to save.
        @Param partition: Set partition ref column name.
        @Param mode: Set if this operation will overwrite or append data.
        @Param location: Set location of delta files.
        @Param table_name: Name of the table created.
    """

    if spark.catalog.tableExists(tableName=table_name):
        print("Table already exists.")
    else:
        try:
            df.write \
                .partitionBy(partition) \
                .format('delta') \
                .mode(mode) \
                .option('path', location) \
                .saveAsTable(table_name)
        except:
            print('Something went wrong trying to write delta table!')            

In [0]:
def split_columns(df, regx_pattern):
    """
        @Description: Read a dataframe and apply regex pattern to format web server logs columns
        @Param regx_pattern: String value containing a regex pattern to format web server logs columns
    """

    return df.select(
        regexp_extract(col('value'), regx_pattern, 1).alias('ip_address'),
        regexp_extract(col('value'), regx_pattern, 2).alias('client_id'),
        regexp_extract(col('value'), regx_pattern, 3).alias('user_id'),
        regexp_extract(col('value'), regx_pattern, 4).alias('datetime'),
        regexp_extract(col('value'), regx_pattern, 5).alias('http_method'),
        regexp_extract(col('value'), regx_pattern, 6).alias('resource'),
        regexp_extract(col('value'), regx_pattern, 7).alias('deployed_version'),
        regexp_extract(col('value'), regx_pattern, 8).alias('status_code'),
        regexp_extract(col('value'), regx_pattern, 9).alias('content_size'),
        col('ingest_date')
    )


In [0]:
def group_by_filter_column(df, column, condition):
    """
        @Description: Read a dataframe do a group by and filter a dataframe and return count of occurrences at the column. 
        @Param df: Dataframe to be grouped
        @Param column: Column name
        @Param condition: Condition to apply in filter
    """

    return df.select(col(column))\
                 .filter(condition)\
                 .groupby(col(column))\
                 .agg(count(column).alias('count'))\
                 .orderBy(desc(col('count')))

#### Realiza a leitura do arquivo expandido em um dataframe, aplicando uma coluna com a data corrente para o particionamento da tabela. Após a operação, é escrita a tabela no modo delta chamada de tb_raw_access_log, contendo o conteúdo do arquivo em sua forma bruta.

In [0]:
timezone = pytz.timezone('America/Sao_Paulo')
current_date = datetime.now(timezone).strftime('%Y-%m-%d')

df_file = spark.read.option('header', 'false').text('/tmp/logs/unpacked/access_log.txt')
df_file = df_file.withColumn('ingest_date', lit(current_date))

partition = 'ingest_date'
mode = 'append'
location = '/mnt/delta/raw/'
table_name = 'tb_raw_access_log'

write_delta(df_file, partition, mode, location, table_name)

process finished!


#### Realiza a leitura da tabela tb_raw_access_log e aplica a formatação das colunas para também salvar em uma tabela, tb_trusted_access_log, contendo os dados transformados.

In [0]:
df_raw = spark.read.table('tb_raw_access_log')
regex_pattern = '(\S+) (\S+) (\S+) \[(.*?)\] "(\S+) (.*?) (\S+)" (\d+) (\d+)'

df_raw = split_columns(df_raw, regex_pattern)
df_raw = df_raw.replace('','-')
df_raw = df_raw.fillna('-')
df_raw = df_raw.withColumn('date', date_format(to_date(col('datetime'), 'dd/MMM/yyyy:HH:mm:ss Z'), 'yyyy-MM-dd'))

partition = 'ingest_date'
mode = 'append'
location = '/mnt/delta/trusted/'
table_name = 'tb_trusted_access_log'

write_delta(df_raw, partition, mode, location, table_name)

#### 1. O trecho realiza a identificação das 10 maiores origens de acesso (Client IP) por quantidade de acessos.

In [0]:
column = 'ip_address'
condition = (
  col(column) != "-"
)

df_ips = group_by_filter_column(df_raw, column, condition)

df_top_ips = df_ips.select(*df_ips.columns).limit(10)
df_top_ips.display()

ip_address,count
10.216.113.172,109523
10.173.141.213,45836
10.220.112.1,43910
10.41.69.177,33991
10.169.128.121,22516
10.203.77.198,18754
10.96.173.111,17122
10.53.149.243,16706
10.31.77.18,16692
10.118.250.30,15779


#### 2. O trecho lista os 6 endpoints mais acessados, desconsiderando aqueles que representam arquivos.

In [0]:
column = 'resource'
condition = (
  ~(col(column).rlike(r'.*\..*')) &
  (col(column) != "-")
)

df_resource = group_by_filter_column(df_raw, column, condition)
df_resource = df_resource.select(*df_resource.columns).limit(6)
df_resource.display()

resource,count
/,98793
/release-schedule/,25920
/search/,22985
/release-schedule,18926
/release-schedule/?p=1&r=&l=&o=&rpp=10,8410
/news/,7488


#### 3. Trecho onde é identificada a quantidade de IPs distintos (Client IP's)

In [0]:
distinct_ips = df_ips.select(col('ip_address')).count()
print(f'Foram encontrados {distinct_ips} IP Address distintos!')

Foram encontrados 330322 IP Address distintos!


#### 4. Trecho onde identifica quantos dias de dados estão representados no arquivo

In [0]:
qtd_days = df_raw.select(col('date'))\
                  .dropDuplicates()\
                  .where(col('date') != '-')\
                  .count()

print(f'Foram encontrados {qtd_days} dias distintos!')                  

Foram encontrados 791 dias distintos!


#### 5. Trecho que identifica: 
- O volume total de dados retornado.
- O maior volume de dados em uma única resposta.
- O menor volume de dados em uma única resposta.
- O volume médio de dados retornado.

In [0]:
df_raw.select('content_size')\
      .filter(col('content_size') != '-')\
      .agg(
          sum("content_size").alias("total_volume"),
          max("content_size").alias("max_volume"),
          min("content_size").alias("min_volume"),
          avg("content_size").alias("avg_volume")
        )\
      .display()

total_volume,max_volume,min_volume,avg_volume
805219139580.0,99988,1,195015.7228488475


#### 6. Trecho que indica qual o dia da semana com o maior número de erros do tipo "HTTP Client Error"

In [0]:

column = 'week_day'
condition = (
  (col(column) != "-") & 
  (col('status_code').between('400', '499'))
)
df_day = df_raw.withColumn('week_day', date_format(col("date"), "EEEE"))
df_day = group_by_filter_column(df_day, column, condition)
week_day = df_day.select(col('week_day')).first()['week_day']

days_translation = {
    "Monday": "Segunda-feira",
    "Tuesday": "Terça-feira",
    "Wednesday": "Quarta-feira",
    "Thursday": "Quinta-feira",
    "Friday": "Sexta-feira",
    "Saturday": "Sábado",
    "Sunday": "Domingo"
}

print(f'O dia da semana com maior número de erros do tipo HTTP Client Error é: {days_translation.get(week_day)}')  

O dia da semana com maior número de erros do tipo HTTP Client Error é: Sexta-feira
