In [0]:
# Sanity check: show only the mounts whose mount point contains 'data/'.
list(filter(lambda mnt: 'data/' in mnt.mountPoint, dbutils.fs.mounts()))

In [0]:
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as f


In [0]:
# Pipeline flow selector:
#   'principal' -> reads bronze/access_log.txt and targets 'santander_log_analysis_task'
#   'escala'    -> reads every file under data-ingestion/ and targets 'log_analysis'
# `coalesce` is the partition count used when caching the silver DataFrame.
fluxo = 'principal'
# fluxo = 'escala'

if fluxo == 'principal':
    source_file = spark.read.text('dbfs:/mnt/data/bronze/access_log.txt')
    target_table = 'santander_log_analysis_task'
    coalesce = 4
elif fluxo == 'escala':
    source_file = spark.read.text('dbfs:/mnt/data/data-ingestion/*')
    target_table = 'log_analysis'
    coalesce = 132
else:
    # Fail fast: every later cell depends on source_file / target_table / coalesce.
    raise ValueError(f"Unknown fluxo {fluxo!r}; expected 'principal' or 'escala'")


In [0]:
# Silver layer: reuse the parsed Delta table when it exists. Otherwise parse the raw access
# log with regular expressions, write it as Delta, Z-order it, and cache the result.
SILVER_PATH = f'dbfs:/mnt/data/silver/{target_table}'

# "<status> <size>" pair that follows the quoted request line.
STATUS_AND_SIZE_RE = r'\s(\d{3})\s([-\d]*)'
# Timestamp such as 21/Jul/2009:02:48:13 -0700; the UTC offset may be positive or negative.
LOG_DATE_RE = r'[0-9]{2}\/[a-zA-Z]{3}\/\d{4}:\d{2}:\d{2}:\d{2}\s[+-]\d{4}'


def parse_access_log(raw_logs):
    """Parse raw access-log lines (a DataFrame with a single ``value`` column) into the silver schema.

    Regex-extracted columns are an empty string when the pattern does not match
    (``regexp_extract`` behaviour). ``http_response_size`` is numeric (long); '-' and any
    non-numeric value become 0. Lines without a parsable status code get
    ``http_response_type = 'unknown'``.
    """
    value = f.col('value')
    size_raw = f.regexp_extract(value, STATUS_AND_SIZE_RE, 2)

    parsed = raw_logs.select(
        f.regexp_extract(value, r'^([0-9]{1,3}\.){3}[0-9]{1,3}', 0).alias('ip_address'),
        f.regexp_extract(value, LOG_DATE_RE, 0).alias('log_date'),
        f.to_timestamp(f.regexp_extract(value, LOG_DATE_RE, 0), 'dd/MMM/yyyy:HH:mm:ss Z').alias('log_date_offseted'),
        f.regexp_extract(value, r'\"(\w*)\s', 1).alias('http_method'),
        f.regexp_extract(value, r'(?:\w*) (\/.*)\sHTTP', 1).alias('http_endpoint'),
        f.regexp_extract(value, STATUS_AND_SIZE_RE, 1).alias('http_response_code'),
        # Cast to long so downstream sum/max/min are numeric rather than lexicographic string ops.
        f.when(size_raw.rlike(r'^\d+$'), size_raw.cast('long')).otherwise(f.lit(0)).alias('http_response_size'),
        f.regexp_extract(value, r'HTTP\/\d\.\d', 0).alias('http_version'),
    )

    code = f.col('http_response_code')
    return (
        parsed
        .select(
            '*',
            f.when(f.regexp(f.col('http_endpoint'), f.lit(r"\.\w{1,}")), f.lit(1)).otherwise(f.lit(0)).alias('is_file'),
            f.when(f.col('http_response_size') > 0, f.lit(1)).otherwise(f.lit(0)).alias('has_response_size'),
            f.when(code.rlike(r'^1'), f.lit('info'))
            .when(code.rlike(r'^2'), f.lit('success'))
            .when(code.rlike(r'^3'), f.lit('redirect'))
            .when(code.rlike(r'^4'), f.lit('client-error'))
            .when(code.rlike(r'^5'), f.lit('server-error'))
            # Unparsable lines are kept apart instead of inflating the server-error counts.
            .otherwise(f.lit('unknown'))
            .alias('http_response_type'),
        )
        # Drop the trailing slash so that e.g. /search/ and /search count as the same endpoint.
        .withColumn(
            'http_endpoint',
            f.when(f.length(f.col('http_endpoint')) > 1, f.regexp_replace(f.col('http_endpoint'), r'\/$', ''))
            .otherwise(f.col('http_endpoint')),
        )
    )


def load_silver():
    """Read the silver Delta table, coalesce it to `coalesce` partitions and materialise the cache."""
    silver = (
        spark.read.load(SILVER_PATH)
        .coalesce(coalesce)
        .persist(StorageLevel.MEMORY_AND_DISK)
    )
    silver.count()  # `count` forces the whole DataFrame into the cache
    return silver


try:
    df_log_analysis_silver = load_silver()
except Exception as e:
    print(f"Exceção encontrada durante tentativa de leitura: {e}")
    if 'NOT_FOUND' not in str(e):
        raise  # keep the original exception type and traceback

    (
        parse_access_log(source_file)
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .option('delta.dataSkippingStatsColumns', 'ip_address,log_date,log_date_offseted,http_method,http_response_size,is_file,has_response_size,http_response_type')
        .save(SILVER_PATH)
    )

    spark.sql(f"OPTIMIZE DELTA.`{SILVER_PATH}` ZORDER BY (log_date_offseted, ip_address, is_file, http_response_type, has_response_size)")

    # Re-read the freshly written table so later cells reuse cached, already-parsed data
    # instead of re-running every regex on each action.
    df_log_analysis_silver = load_silver()


#### 1. **Identifique as 10 maiores origens de acesso (Client IP) por quantidade de acessos.**


In [0]:
# Gold: client IPs ranked by request count. dense_rank is used, so ties may yield more than 10 rows.
TOP_IPS_PATH = f'dbfs:/mnt/data/gold/{target_table}_top_ten_ips_most_access'

try:
    df_top_ten_ips_most_access = spark.read.load(TOP_IPS_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_top_ten_ips_most_access = (
        df_log_analysis_silver
        # An empty ip_address means the line could not be parsed; it is not a client.
        .where("ip_address != ''")
        .groupBy('ip_address')
        .agg(f.count('ip_address').alias('access_frequency'))
        # The rank column makes the classification easier to work with downstream.
        .withColumn('rank', f.dense_rank().over(Window.orderBy(f.col('access_frequency').desc())))
        .filter("rank <= 10")
    )

    (
        df_top_ten_ips_most_access
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(TOP_IPS_PATH)
    )

# Delta does not preserve row order, so sort when displaying.
display(df_top_ten_ips_most_access.orderBy('rank', 'ip_address'))


Databricks visualization. Run in Databricks to view.

#### 2. **Liste os 6 endpoints mais acessados, desconsiderando aqueles que representam arquivos.**

In [0]:
# Gold: most requested non-file endpoints, dense-ranked by request count (top 6 ranks).
TOP_ENDPOINTS_PATH = f'dbfs:/mnt/data/gold/{target_table}_top_six_endpoints'

try:
    df_top_six_endpoints = spark.read.load(TOP_ENDPOINTS_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_top_six_endpoints = (
        df_log_analysis_silver
        # Exclude files. Empty endpoints come from lines the regex could not parse.
        # Option to consider: also require http_response_type = 'success' to rank only pages
        # that were served successfully (excluding info, redirect and error responses).
        .where("is_file = 0 AND http_endpoint != ''")
        .groupBy('http_endpoint')
        .agg(f.count('http_endpoint').alias('http_endpoint_frequency'))
        .withColumn('rank', f.dense_rank().over(Window.orderBy(f.col('http_endpoint_frequency').desc())))
        .filter("rank <= 6")
    )

    (
        df_top_six_endpoints
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(TOP_ENDPOINTS_PATH)
    )

# Delta does not preserve row order, so sort when displaying.
top_endpoints_sorted = df_top_six_endpoints.orderBy('rank', 'http_endpoint')

print('Endpoints mais acessados')
display(top_endpoints_sorted)
print('Endpoints mais acessados excluindo a raíz')
# Also show the ranking without the home page ('/').
display(top_endpoints_sorted.filter("http_endpoint != '/'"))


#### 2.1 Endpoints com mais erros, divididos por client-side e server-side

In [0]:
# Gold: request counts of every non-file endpoint that returned a client or server error.
# The gold table is not ranked; the top-6 ranking per error type is computed at display time.
FAILING_ENDPOINTS_PATH = f'dbfs:/mnt/data/gold/{target_table}_top_six_failing_endpoints'

try:
    df_top_six_failing_endpoints = spark.read.load(FAILING_ENDPOINTS_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_top_six_failing_endpoints = (
        df_log_analysis_silver
        .where("is_file = 0 AND http_endpoint != '' AND http_response_type in ('client-error', 'server-error')")
        .groupBy('http_endpoint', 'http_response_type')
        .agg(f.count('http_endpoint').alias('http_endpoint_frequency'))
    )

    (
        df_top_six_failing_endpoints
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(FAILING_ENDPOINTS_PATH)
    )


def top_failing_endpoints(failing_df, response_type, top_n=6):
    """Dense-rank the endpoints of a single error type by frequency and keep ranks <= top_n.

    Parameters:
        failing_df: DataFrame with http_endpoint, http_response_type and http_endpoint_frequency.
        response_type: value of http_response_type to keep (e.g. 'client-error').
        top_n: highest dense rank to keep.

    Returns:
        DataFrame sorted by descending frequency, with an extra ``rank`` column.
    """
    return (
        failing_df
        .filter(f.col('http_response_type') == response_type)
        .withColumn('rank', f.dense_rank().over(Window.orderBy(f.col('http_endpoint_frequency').desc())))
        .filter(f.col('rank') <= top_n)
        .orderBy(f.col('http_endpoint_frequency').desc())
    )


for label, response_type in [('Client-errors', 'client-error'), ('Server-errors', 'server-error')]:
    print(label)
    display(top_failing_endpoints(df_top_six_failing_endpoints, response_type))


#### 2.2 Arquivos mais consumidos

In [0]:
# Gold: most requested files (is_file = 1), dense-ranked by request count (top 10 ranks).
TOP_FILES_PATH = f'dbfs:/mnt/data/gold/{target_table}_top_10_most_accessed_files'

try:
    df_top_10_most_accessed_files = spark.read.load(TOP_FILES_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_top_10_most_accessed_files = (
        df_log_analysis_silver
        .where("is_file = 1")
        # Grouping also by response type keeps, e.g., success and redirect hits of the same
        # file apart, so one file may appear in several rows/ranks.
        .groupBy('http_endpoint', 'http_response_type')
        .agg(f.count('http_endpoint').alias('http_endpoint_frequency'))
        .withColumn('rank', f.dense_rank().over(Window.orderBy(f.col('http_endpoint_frequency').desc())))
        .filter("rank <= 10")
    )

    # Write the table to the gold layer
    (
        df_top_10_most_accessed_files
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(TOP_FILES_PATH)
    )

# Delta does not preserve row order, so sort when displaying.
display(df_top_10_most_accessed_files.orderBy('rank', 'http_endpoint', 'http_response_type'))


#### 3. **Qual a quantidade de Client IPs distintos?**

In [0]:
# Gold: number of distinct client IPs in the log.
DISTINCT_IPS_PATH = f'dbfs:/mnt/data/gold/{target_table}_count_distinct_ips'

try:
    df_count_distinct_ips = spark.read.load(DISTINCT_IPS_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_count_distinct_ips = (
        df_log_analysis_silver
        # An empty ip_address means the line could not be parsed; it must not count as a client.
        .where("ip_address != ''")
        .agg(f.count_distinct('ip_address').alias('distinct_ips'))
    )

    # Write the table to the gold layer
    (
        df_count_distinct_ips
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(DISTINCT_IPS_PATH)
    )

display(df_count_distinct_ips)

#### 4. **Quantos dias de dados estão representados no arquivo?**


In [0]:
# Gold: time span covered by the log and number of calendar days that contain data.
DAYS_BETWEEN_PATH = f'dbfs:/mnt/data/gold/{target_table}_days_between_log_dates'

try:
    df_days_between_log_dates = spark.read.load(DAYS_BETWEEN_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_days_between_log_dates = (
        df_log_analysis_silver
        .agg(
            f.min('log_date_offseted').alias('earliest_date'),
            f.max('log_date_offseted').alias('latest_date'),
            # Day difference between the last and first timestamp (the inclusive span is this + 1).
            f.date_diff(f.max('log_date_offseted'), f.min('log_date_offseted')).alias('days_between_dates'),
            # Days that actually contain at least one request; gaps in the log are not counted.
            # NOTE(review): to_date uses the Spark session time zone -- confirm it is the intended one.
            f.count_distinct(f.to_date('log_date_offseted')).alias('distinct_days_with_data'),
        )
    )

    # Write the table to the gold layer
    (
        df_days_between_log_dates
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(DAYS_BETWEEN_PATH)
    )

display(df_days_between_log_dates)

#### 5. **Com base no tamanho (em bytes) do conteúdo das respostas, faça a seguinte análise:**
   - O volume total de dados retornado.
   - O maior volume de dados em uma única resposta.
   - O menor volume de dados em uma única resposta.
   - O volume médio de dados retornado.
   - *Dica:* Considere como os dados podem ser categorizados por tipo de resposta para realizar essas análises.

In [0]:
# Gold: total / max / min / average response size (bytes) over responses with a non-zero body.
RESPONSE_SIZES_PATH = f'dbfs:/mnt/data/gold/{target_table}_log_analysis_response_sizes'

try:
    df_log_analysis_response_sizes = spark.read.load(RESPONSE_SIZES_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    # Explicit numeric cast: on a string column max/min would compare lexicographically
    # ('9' > '10000'). The cast is a no-op if the silver column is already numeric.
    response_size = f.col('http_response_size').cast('long')

    df_log_analysis_response_sizes = (
        df_log_analysis_silver
        .where("has_response_size = 1")
        .agg(
            f.sum(response_size).alias('total_response_size'),
            f.max(response_size).alias('max_response_size'),
            f.min(response_size).alias('min_response_size'),
            f.avg(response_size).alias('avg_response_size'),
        )
    )

    # Write the table to the gold layer
    (
        df_log_analysis_response_sizes
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(RESPONSE_SIZES_PATH)
    )

display(df_log_analysis_response_sizes)

#### 6. **Qual o dia da semana com o maior número de erros do tipo "HTTP Client Error"?**

In [0]:


# Gold: number of HTTP client errors (4xx) per day of the week.
WEEKDAY_ERRORS_PATH = f'dbfs:/mnt/data/gold/{target_table}_week_day_with_more_client_errors'

try:
    df_week_day_with_more_client_errors = spark.read.load(WEEKDAY_ERRORS_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    df_week_day_with_more_client_errors = (
        df_log_analysis_silver
        # Rows without a parsed timestamp cannot be assigned to a weekday.
        .where("http_response_type = 'client-error' AND log_date_offseted IS NOT NULL")
        # 'EEEE' renders the full English day name (e.g. 'Monday') natively, avoiding a Python UDF
        # and any manual mapping of f.weekday()'s 0 = Monday numbering.
        .groupBy(f.date_format(f.col('log_date_offseted'), 'EEEE').alias('weekday'))
        .agg(f.count(f.lit(1)).alias('weekday_client_error_count'))
    )

    (
        df_week_day_with_more_client_errors
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(WEEKDAY_ERRORS_PATH)
    )

# Delta does not preserve row order; the first row is the weekday with the most client errors.
display(df_week_day_with_more_client_errors.orderBy(f.col('weekday_client_error_count').desc()))

Databricks visualization. Run in Databricks to view.

#### 6.1 O quanto conseguimos explorar a diarização?

In [0]:
# Gold: daily request counts per response type over the last `moving_window` days that have data.
# Written in Spark SQL so the window bounds come from the data itself, without a driver-side collect().
TYPE_AND_DATE_PATH = f'dbfs:/mnt/data/gold/{target_table}_log_analysis_type_and_date'

try:
    df_log_analysis_type_and_date = spark.read.load(TYPE_AND_DATE_PATH)
except Exception as e:
    # Rebuild only when the gold table does not exist yet; surface any other failure.
    if 'NOT_FOUND' not in str(e):
        raise

    # Number of days (including the most recent one) kept in the analysis horizon; change it
    # to widen or narrow the window. Values below 1 are treated as 1.
    moving_window = 30
    days_back = max(moving_window, 1) - 1  # BETWEEN is inclusive on both ends
    df_log_analysis_silver.createOrReplaceTempView('log_analysis_type_and_date')

    df_log_analysis_type_and_date = spark.sql(
        f"""
        WITH daily AS (
            SELECT
                TO_DATE(log_date_offseted) AS date_offseted,
                http_response_type
            FROM
                log_analysis_type_and_date
            WHERE
                log_date_offseted IS NOT NULL
        ),
        bounds AS (
            SELECT MAX(date_offseted) AS max_date FROM daily
        )
        SELECT
            d.date_offseted,
            d.http_response_type,
            CASE WEEKDAY(d.date_offseted)
                WHEN 0 THEN 'Monday'
                WHEN 1 THEN 'Tuesday'
                WHEN 2 THEN 'Wednesday'
                WHEN 3 THEN 'Thursday'
                WHEN 4 THEN 'Friday'
                WHEN 5 THEN 'Saturday'
                WHEN 6 THEN 'Sunday'
            END AS weekday,
            COUNT(d.http_response_type) AS http_response_type_count
        FROM
            daily d
            CROSS JOIN bounds b
        WHERE
            d.date_offseted BETWEEN date_sub(b.max_date, {days_back}) AND b.max_date
        GROUP BY
            d.date_offseted,
            d.http_response_type
        """
    )

    (
        df_log_analysis_type_and_date
        .write
        .format('delta')
        .mode('overwrite')
        .option('overwriteSchema', 'true')
        .option('delta.targetFileSize', '128MB')
        .option('delta.autoOptimize.optimizeWrite', 'true')
        .save(TYPE_AND_DATE_PATH)
    )

# Delta does not preserve row order, so sort when displaying.
display(df_log_analysis_type_and_date.orderBy('date_offseted', 'http_response_type'))

Databricks visualization. Run in Databricks to view.