In [0]:
%fs
ls /FileStore/

path,name,size,modificationTime
dbfs:/FileStore/apache_logs.txt,apache_logs.txt,2370789,1729949819000
dbfs:/FileStore/cleaned_logs.csv/,cleaned_logs.csv/,0,0
dbfs:/FileStore/cleaned_logs.parquet/,cleaned_logs.parquet/,0,0
dbfs:/FileStore/status_counts.parquet/,status_counts.parquet/,0,0
dbfs:/FileStore/tables/,tables/,0,0
dbfs:/FileStore/url_sizes.parquet/,url_sizes.parquet/,0,0


In [0]:
log_data_path = "dbfs:/FileStore/apache_logs.txt"
raw_data = spark.read.text(log_data_path)

In [0]:
raw_data.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
from pyspark.sql.functions import regexp_extract

# Update the log pattern to use numbered groups instead of named groups
log_pattern = r'^(\S+) - - \[(.*?)\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+) "(.*?)" "(.*?)"'

# Assuming you have a DataFrame `logs_df` with a column named 'value' containing the log lines
parsed_logs_df = raw_data.select(
    regexp_extract('value', log_pattern, 1).alias('ip'),
    regexp_extract('value', log_pattern, 2).alias('timestamp'),
    regexp_extract('value', log_pattern, 3).alias('method'),
    regexp_extract('value', log_pattern, 4).alias('url'),
    regexp_extract('value', log_pattern, 5).alias('protocol'),
    regexp_extract('value', log_pattern, 6).alias('status'),
    regexp_extract('value', log_pattern, 7).alias('size'),
    regexp_extract('value', log_pattern, 8).alias('referrer'),
    regexp_extract('value', log_pattern, 9).alias('user_agent')
)

# Show the parsed DataFrame
parsed_logs_df.show(truncate=False)


+------------+--------------------------+------+----------------------------------------------------------------------------------+--------+------+------+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+
|ip          |timestamp                 |method|url                                                                               |protocol|status|size  |referrer                                                       |user_agent                                                                                                             |
+------------+--------------------------+------+----------------------------------------------------------------------------------+--------+------+------+---------------------------------------------------------------+--------------------------------------------------------------------------------------------------------

In [0]:
parsed_logs_df.show(1)
#parsed_logs_df.printSchema()


+------------+--------------------+------+--------------------+--------+------+------+--------------------+--------------------+
|          ip|           timestamp|method|                 url|protocol|status|  size|            referrer|          user_agent|
+------------+--------------------+------+--------------------+--------+------+------+--------------------+--------------------+
|83.149.9.216|17/May/2015:10:05...|   GET|/presentations/lo...|HTTP/1.1|   200|203023|http://semicomple...|Mozilla/5.0 (Maci...|
+------------+--------------------+------+--------------------+--------+------+------+--------------------+--------------------+
only showing top 1 row



In [0]:
from pyspark.sql.functions import to_timestamp

# Clean the DataFrame
cleaned_logs_df = parsed_logs_df.filter(parsed_logs_df.status.isNotNull())

# Convert timestamp to a timestamp type
cleaned_logs_df = cleaned_logs_df.withColumn('timestamp', to_timestamp('timestamp', 'dd/MMM/yyyy:HH:mm:ss Z'))

cleaned_logs_df.show(truncate=False)

+------------+-------------------+------+----------------------------------------------------------------------------------+--------+------+------+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+
|ip          |timestamp          |method|url                                                                               |protocol|status|size  |referrer                                                       |user_agent                                                                                                             |
+------------+-------------------+------+----------------------------------------------------------------------------------+--------+------+------+---------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------+
|83.

In [0]:
total=cleaned_logs_df.count()
print(total)

10000


In [0]:
#Count Requests by IP Address

ip_counts_df = cleaned_logs_df.groupBy('ip').count().orderBy('count', ascending=False)
ip_counts_df.show(5, truncate=False)

+--------------+-----+
|ip            |count|
+--------------+-----+
|              |670  |
|66.249.73.135 |432  |
|46.105.14.53  |364  |
|130.237.218.86|293  |
|50.16.19.13   |113  |
+--------------+-----+
only showing top 5 rows



In [0]:
# Count Response Status Codes

status_counts_df = cleaned_logs_df.groupBy('status').count().orderBy('count', ascending=False)
status_counts_df.show(truncate=False)

+------+-----+
|status|count|
+------+-----+
|200   |8912 |
|      |670  |
|404   |205  |
|301   |163  |
|206   |45   |
|403   |2    |
|416   |2    |
|500   |1    |
+------+-----+



In [0]:
blank_df = cleaned_logs_df.filter(cleaned_logs_df.status.isNull())
count_blank_status = blank_df.count()
print(count_blank_status)

0


In [0]:
#Total Response Size by URL

url_size_df = cleaned_logs_df.groupBy('url').agg({'size': 'sum'}).withColumnRenamed('sum(size)', 'total_size')
url_size_df.show(truncate=False)

+--------------------------------------------------------------------------+----------+
|url                                                                       |total_size|
+--------------------------------------------------------------------------+----------+
|/~psionic/projects/securitrack/config.xsl                                 |353.0     |
|/blog/site/site-design-2009.html                                          |33588.0   |
|/presentations/logstash-preso-1.0/images/simple-inputs-filters-outputs.jpg|1168622.0 |
|/blog/geekery/pyblosxom-mdate-vim-hack.html                               |10331.0   |
|/blog/geekery/sysadvent-2010-begins.html?source=rss20                     |9505.0    |
|/presentations/logstash-1/css/reset.css                                   |1015.0    |
|/blog/tags/regexp                                                         |20513.0   |
|/projects/pam_captcha/                                                    |42056.0   |
|/images/ec2_m1large_cost.png   

In [0]:
#Store the cleaned and analyzed data

output_path = "dbfs:/FileStore/cleaned_logs.parquet"
cleaned_logs_df.write.parquet(output_path)

status_counts_output_path = "dbfs:/FileStore/status_counts.parquet"
status_counts_df.write.parquet(status_counts_output_path)

url_size_output_path = "dbfs:/FileStore/url_sizes.parquet"
url_size_df.write.parquet(url_size_output_path)

In [0]:
num=cleaned_logs_df.filter(cleaned_logs_df.ip.isin('')).count()
print(num)

670
