In [1]:
#pyspark Seesion
from pyspark import SparkContext, SparkConf

In [2]:
conf = SparkConf().setAppName("Log Analyzer")

In [3]:
sc = SparkContext(conf=conf)

In [4]:
import re  #regular expression

In [5]:
from pyspark.sql import Row

In [6]:
LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)'

In [7]:
def parse_apache_log_line(logline):
    match = re.search(LOG_PATTERN, logline)
    if match is None:
        raise Error("Invalid logline: %s" % logline)
    return Row(
        ip_address    = match.group(1),
        sep_1 = match.group(2),
        sep_2       = match.group(3),
        date_time     = match.group(4),
        method        = match.group(5),
        requested_url      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = long(match.group(9))
)

In [8]:
#read from .gz file and parse it as per above format
access_logs = (sc.textFile("/user/sylphsangeeta3307/BDS_Quiz/NASA_access_log_Aug95.gz")
               .map(parse_apache_log_line))  

In [9]:
access_logs

PythonRDD[2] at RDD at PythonRDD.scala:48

In [10]:

# import pyspark sql
from pyspark.sql import SQLContext

In [11]:
sqlContext = SQLContext(sc)

In [12]:
#create the Dataframe from the RDD
schema_access_logs = sqlContext.createDataFrame(access_logs)


In [13]:
# verify the tpe of schema
type(schema_access_logs)



pyspark.sql.dataframe.DataFrame

In [14]:
# just to verify the content
schema_access_logs.head(5)

[Row(content_size=1839, date_time=u'01/Aug/1995:00:00:01 -0400', ip_address=u'in24.inetnebr.com', method=u'GET', protocol=u'HTTP/1.0', requested_url=u'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', response_code=200, sep_1=u'-', sep_2=u'-'),
 Row(content_size=0, date_time=u'01/Aug/1995:00:00:07 -0400', ip_address=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', requested_url=u'/', response_code=304, sep_1=u'-', sep_2=u'-'),
 Row(content_size=0, date_time=u'01/Aug/1995:00:00:08 -0400', ip_address=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', requested_url=u'/images/ksclogo-medium.gif', response_code=304, sep_1=u'-', sep_2=u'-'),
 Row(content_size=0, date_time=u'01/Aug/1995:00:00:08 -0400', ip_address=u'uplherc.upl.com', method=u'GET', protocol=u'HTTP/1.0', requested_url=u'/images/MOSAIC-logosmall.gif', response_code=304, sep_1=u'-', sep_2=u'-'),
 Row(content_size=0, date_time=u'01/Aug/1995:00:00:08 -0400', ip_address=u'uplherc.upl.com', method=u'GET', protocol=u'HT

In [15]:
#show in tabular format
schema_access_logs.show(2,truncate=False)

+------------+--------------------------+-----------------+------+--------+-----------------------------------------------+-------------+-----+-----+
|content_size|date_time                 |ip_address       |method|protocol|requested_url                                  |response_code|sep_1|sep_2|
+------------+--------------------------+-----------------+------+--------+-----------------------------------------------+-------------+-----+-----+
|1839        |01/Aug/1995:00:00:01 -0400|in24.inetnebr.com|GET   |HTTP/1.0|/shuttle/missions/sts-68/news/sts-68-mcc-05.txt|200          |-    |-    |
|0           |01/Aug/1995:00:00:07 -0400|uplherc.upl.com  |GET   |HTTP/1.0|/                                              |304          |-    |-    |
+------------+--------------------------+-----------------+------+--------+-----------------------------------------------+-------------+-----+-----+
only showing top 2 rows



In [20]:
# limiting the number of row to process to 60 as beyond this throwing error
limited_Access_log=  schema_access_logs.limit(60)

In [21]:
# check the count
limited_Access_log.count()
    

60

In [22]:
#Quiz-1: Write spark code( using RDD) to find out top 10 requested URLs along with count of number of times they have been requested 

In [23]:
limited_Access_log.groupBy('requested_url').count().orderBy('count', ascending=False).show(10)

+--------------------+-----+
|       requested_url|count|
+--------------------+-----+
|/images/ksclogosm...|    4|
|/images/launch-lo...|    3|
|/shuttle/resource...|    3|
|/history/apollo/i...|    3|
|/images/NASA-logo...|    3|
|/images/KSC-logos...|    3|
|                   /|    2|
|/images/ksclogo-m...|    2|
|/images/MOSAIC-lo...|    2|
|/images/USA-logos...|    2|
+--------------------+-----+
only showing top 10 rows



In [24]:
#Quiz-2: Write spark code to find out top 5 hosts / IP making the request along with count

In [25]:
limited_Access_log.groupBy('ip_address').count().orderBy('count', ascending=False).show(5)

+--------------------+-----+
|          ip_address|count|
+--------------------+-----+
|     uplherc.upl.com|   17|
|        133.43.96.45|    8|
|slppp6.intermind.net|    6|
|ix-esc-ca2-07.ix....|    6|
|kgtyk4.kj.yamagat...|    6|
+--------------------+-----+
only showing top 5 rows



In [26]:
#Quiz-3: Write spark code to find out top 5 time frame for high traffic

In [27]:
# just to check the content from date_time,content_size
limited_Access_log.select('date_time' ,'content_size').show(10)


+--------------------+------------+
|           date_time|content_size|
+--------------------+------------+
|01/Aug/1995:00:00...|        1839|
|01/Aug/1995:00:00...|           0|
|01/Aug/1995:00:00...|           0|
|01/Aug/1995:00:00...|           0|
|01/Aug/1995:00:00...|           0|
|01/Aug/1995:00:00...|        1713|
|01/Aug/1995:00:00...|           0|
|01/Aug/1995:00:00...|        1687|
|01/Aug/1995:00:00...|       11853|
|01/Aug/1995:00:00...|        9202|
+--------------------+------------+
only showing top 10 rows



In [28]:
# import split function
import pyspark.sql.functions 

In [29]:
 #split the Date_Time based on blank character to separate the date time  and the timezone 

In [30]:
split_col = pyspark.sql.functions.split(limited_Access_log['date_time'], ' ')

In [31]:
limited_Access_log = limited_Access_log.withColumn('TimeStamp', split_col.getItem(0))

In [32]:
limited_Access_log = limited_Access_log.withColumn('Time_Zone', split_col.getItem(1))

In [33]:
#verify the schema
limited_Access_log.printSchema()

root
 |-- content_size: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- method: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- requested_url: string (nullable = true)
 |-- response_code: long (nullable = true)
 |-- sep_1: string (nullable = true)
 |-- sep_2: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- Time_Zone: string (nullable = true)



In [34]:
#verify the sepration
limited_Access_log.select('date_time','TimeStamp','Time_Zone').show(10,truncate=False)

+--------------------------+--------------------+---------+
|date_time                 |TimeStamp           |Time_Zone|
+--------------------------+--------------------+---------+
|01/Aug/1995:00:00:01 -0400|01/Aug/1995:00:00:01|-0400    |
|01/Aug/1995:00:00:07 -0400|01/Aug/1995:00:00:07|-0400    |
|01/Aug/1995:00:00:08 -0400|01/Aug/1995:00:00:08|-0400    |
|01/Aug/1995:00:00:08 -0400|01/Aug/1995:00:00:08|-0400    |
|01/Aug/1995:00:00:08 -0400|01/Aug/1995:00:00:08|-0400    |
|01/Aug/1995:00:00:09 -0400|01/Aug/1995:00:00:09|-0400    |
|01/Aug/1995:00:00:10 -0400|01/Aug/1995:00:00:10|-0400    |
|01/Aug/1995:00:00:10 -0400|01/Aug/1995:00:00:10|-0400    |
|01/Aug/1995:00:00:10 -0400|01/Aug/1995:00:00:10|-0400    |
|01/Aug/1995:00:00:11 -0400|01/Aug/1995:00:00:11|-0400    |
+--------------------------+--------------------+---------+
only showing top 10 rows



In [35]:
from pyspark.sql.functions import unix_timestamp, from_unixtime, date_format

In [36]:

#convert into unix timestamp format
limited_Access_log = limited_Access_log.withColumn('TimeStamp_temp', from_unixtime(unix_timestamp(limited_Access_log['TimeStamp'], 'd/MMM/yyyy:hh:mm:ss')))
  

In [37]:
limited_Access_log.printSchema() #verify the content

root
 |-- content_size: long (nullable = true)
 |-- date_time: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- method: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- requested_url: string (nullable = true)
 |-- response_code: long (nullable = true)
 |-- sep_1: string (nullable = true)
 |-- sep_2: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- Time_Zone: string (nullable = true)
 |-- TimeStamp_temp: string (nullable = true)



In [38]:
limited_Access_log.show(5) #verify the content

+------------+--------------------+-----------------+------+--------+--------------------+-------------+-----+-----+--------------------+---------+-------------------+
|content_size|           date_time|       ip_address|method|protocol|       requested_url|response_code|sep_1|sep_2|           TimeStamp|Time_Zone|     TimeStamp_temp|
+------------+--------------------+-----------------+------+--------+--------------------+-------------+-----+-----+--------------------+---------+-------------------+
|        1839|01/Aug/1995:00:00...|in24.inetnebr.com|   GET|HTTP/1.0|/shuttle/missions...|          200|    -|    -|01/Aug/1995:00:00:01|    -0400|1995-08-01 00:00:01|
|           0|01/Aug/1995:00:00...|  uplherc.upl.com|   GET|HTTP/1.0|                   /|          304|    -|    -|01/Aug/1995:00:00:07|    -0400|1995-08-01 00:00:07|
|           0|01/Aug/1995:00:00...|  uplherc.upl.com|   GET|HTTP/1.0|/images/ksclogo-m...|          304|    -|    -|01/Aug/1995:00:00:08|    -0400|1995-08-01 00

In [39]:
#get the date in Date 
limited_Access_log = limited_Access_log.withColumn('Date',date_format(limited_Access_log['TimeStamp_temp'], 'yyyy-mm-dd'))

In [40]:
limited_Access_log.select('Date').show(5)

+----------+
|      Date|
+----------+
|1995-00-01|
|1995-00-01|
|1995-00-01|
|1995-00-01|
|1995-00-01|
+----------+
only showing top 5 rows



In [41]:
#get the Time in Time formt
limited_Access_log = limited_Access_log.withColumn('Time',date_format(limited_Access_log['TimeStamp_temp'], 'HH'))
limited_Access_log.select('Time').show(5)

+----+
|Time|
+----+
|  00|
|  00|
|  00|
|  00|
|  00|
+----+
only showing top 5 rows



In [42]:
#save the data in new dataframe
DateTime_Log = limited_Access_log.select('content_size','Date','Time')

In [43]:
#verfiy the content
DateTime_Log.show(60)

+------------+----------+----+
|content_size|      Date|Time|
+------------+----------+----+
|        1839|1995-00-01|  00|
|           0|1995-00-01|  00|
|           0|1995-00-01|  00|
|           0|1995-00-01|  00|
|           0|1995-00-01|  00|
|        1713|1995-00-01|  00|
|           0|1995-00-01|  00|
|        1687|1995-00-01|  00|
|       11853|1995-00-01|  00|
|        9202|1995-00-01|  00|
|        3635|1995-00-01|  00|
|        1173|1995-00-01|  00|
|        3047|1995-00-01|  00|
|           0|1995-00-01|  00|
|       10566|1995-00-01|  00|
|        7280|1995-00-01|  00|
|        5866|1995-00-01|  00|
|        2743|1995-00-01|  00|
|        6849|1995-00-01|  00|
|       14897|1995-00-01|  00|
|           0|1995-00-01|  00|
|           0|1995-00-01|  00|
|           0|1995-00-01|  00|
|           0|1995-00-01|  00|
|        1204|1995-00-01|  00|
|        8083|1995-00-01|  00|
|        1713|1995-00-01|  00|
|        4324|1995-00-01|  00|
|        1173|1995-00-01|  00|
|       

In [44]:
DateTime_Log.printSchema()  #verify the schema

root
 |-- content_size: long (nullable = true)
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)



In [45]:
from pyspark.sql import functions as func

In [47]:
#Quiz:3 Write spark code to find out 5 time frames of maximum traffic

In [48]:
# grouping by date and time(taken hours only) and adding the content size 
# note since data taken less only two day it visible 
DateTime_Log.groupBy('Date','Time').sum("content_size").orderBy('sum(content_size)', ascending=False).show(5)

+----------+----+-----------------+
|      Date|Time|sum(content_size)|
+----------+----+-----------------+
|1995-00-01|  00|           179718|
|1995-01-01|  00|            58013|
+----------+----+-----------------+



In [50]:
#Quiz 4:Write spark code to find out 5 time frames of least traffic

In [51]:
DateTime_Log.groupBy('Date','Time').sum("content_size").orderBy('sum(content_size)', ascending=True).show()

+----------+----+-----------------+
|      Date|Time|sum(content_size)|
+----------+----+-----------------+
|1995-01-01|  00|            58013|
|1995-00-01|  00|           179718|
+----------+----+-----------------+



In [53]:
#Quiz-5 Write spark code to find out unique HTTP codes returned by the server along with count (this information is hel

In [54]:
limited_Access_log.groupBy('response_code').count().orderBy('count', ascending=False).show(5)

+-------------+-----+
|response_code|count|
+-------------+-----+
|          200|   49|
|          304|   10|
|          302|    1|
+-------------+-----+

