# version 2020.11.16
#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# **基于 Spark 进行Web服务器日志分析**
 
#### 通过这个作业，大家可以发现，利用Spark进行Web服务器日志分析并不是一件很困难的事情。
 
#### 服务器日志分析一般被认为是Spark的经典应用案例之一。这是因为，服务器日志通常是一种数据量较大的常见数据源，而且它包含了丰富的信息。Spark让我们能够方便地对它们进行分析。日志数据具有很多用途，例如服务器监控、帮助构建推荐系统、欺诈检测、商业智能等。

### 说明与提示 
 
#### 我们使用Spark来处理日志时，是对数据以一种并行、分布式的方式进行编程的。因此，我们会使用各种各样的 [RDD](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD) 以及 [`lambda` functions](https://docs.python.org/3/tutorial/controlflow.html#lambda-expressions) 。在一个分布式的Spark 集群中，它们事实上是运行在每个worker进程上的。
 
#### 作业包括4部分:
#### *Part 1*: Apache Web 服务器日志文件的格式
#### *Part 2*: 示例
#### *Part 3*: 服务器日志文件分析
#### *Part 4*: 探索 404 响应码

### **Part 1: Apache Web 服务器日志文件的格式**
#### 本作业使用的日志文件来自于 [Apache Common Log Format (CLF)](http://httpd.apache.org/docs/1.3/logs.html#common). 其中的记录如下所示:
`127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839`
 
#### 下面我们来看看这个记录中的每部分都表示什么意思

* `127.0.0.1`
#### 这是向服务器发送请求的客户端（远程主机）的IP地址（或主机名，如果有的话），表明访问网站的是哪个机器

 
* `-`
####  接下来的英文"连接符（hyphen）" 表示请求信息（远端用户名，如email等）未知


* `-`
#### 再接下来的英文"连接符hyphen" 标识请求信息（远端登录名）未知


* `[01/Aug/1995:00:00:01 -0400]`
#### 服务器完成请求处理的时间，格式为:
`[day/month/year:hour:minute:second timezone]`
  * day = 2 digits
  * month = 3 letters
  * year = 4 digits
  * hour = 2 digits
  * minute = 2 digits
  * second = 2 digits
  * zone = (\+ | \-) 4 digits
 

* `"GET /images/launch-logo.gif HTTP/1.0"`
#### 方法+端点+协议：服务器收到的是一个什么样的请求。该项信息的典型格式是“METHOD ENDPOINT PROTOCOL”，即“方法 端点 协议”。方法通常为HTTP方法 (例如, `GET`, `POST`等), 端点资源 ( [Uniform Resource Identifier](http://en.wikipedia.org/wiki/Uniform_resource_identifier)), 以及客户端协议版本号
 

* `200`
#### 请求是否成功，或者遇到了什么样的错误。大多数时候，这项值是200，它表示服务器已经成功地响应浏览器的请求，一切正常。 该文档有完整的状态码定义 ([RFC 2616](https://www.ietf.org/rfc/rfc2616.txt) section 10).

 
* `1839`
#### 发送字节数。表示发送给客户端的总字节数。它告诉我们传输是否被打断（该数值是否和文件的大小相同）。把日志记录中的这些值加起来就可以得知服务器在一天、一周或者一月内发送了多少数据。如果没有数据从客户端发送过来，此项为 "-" (有时为 0).
 
请注意，日志文件包含客户端直接提供的信息，没有转义。因此，恶意客户有可能在日志文件中插入控制字符，*所以在处理原始日志时必须小心。*
 
### NASA-HTTP Web 服务器日志
这个作业使用的Web服务器日志数据来自于位于美国弗罗里达州的 NASA肯尼迪空间中心，原始数据集是可以免费获取的 (ftp://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html)，包含了1995年7、8两个月的HTTP请求日志。这个作业所使用的是其中的一小部分，只包含了几天的数据。

### **(1a) 对日志中的每一行记录进行解析**
我们创建了一个正则表达式来抽取一条日志记录中的9个字段。[`search` function](https://docs.python.org/3/library/re.html#regular-expression-objects). 函数返回一个键值对（一个Row对象，1）；如果日志记录不能匹配给定的正则表达式，则返回一个（该行内容，0）的键值对。日志记录中的连接符'-'我们用0替代。日志中的date字符串我们用`parse_apache_time`函数转换为一个Python的`datetime` 对象。


In [None]:
import re
import datetime

from pyspark.sql import Row

month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,
    'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}

def parse_apache_time(s):
    """ Convert Apache time format into a Python datetime object
    Args:
        s (str): date and time in Apache time format
    Returns:
        datetime: datetime object (ignore timezone for now)
    """
    return datetime.datetime(int(s[7:11]),
                             month_map[s[3:6]],
                             int(s[0:2]),
                             int(s[12:14]),
                             int(s[15:17]),
                             int(s[18:20]))


def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Args:
        logline (str): a line of text in the Apache Common Log format
    Returns:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)
    if match is None:
        return (logline, 0)
    size_field = match.group(9)
    if size_field == '-':
        size = int(0)
    else:
        size = int(match.group(9))
    return (Row(
        host          = match.group(1),
        client_identd = match.group(2),
        user_id       = match.group(3),
        date_time     = parse_apache_time(match.group(4)),
        method        = match.group(5),
        endpoint      = match.group(6),
        protocol      = match.group(7),
        response_code = int(match.group(8)),
        content_size  = size
    ), 1)

In [None]:
# A regular expression pattern to extract fields from the log line
APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'

### **(1b) 配置和初始RDD创建**

我们已经准备好指定输入的日志文件，并创建一个包含解析日志文件数据的 RDD。请确保日志文件（apache.access.log.PROJECT）在本作业所在目录的data子目录中。
 
为了创建将在本作业的其余部分中使用的主RDD，我们首先使用 [`sc.textfile(logFile)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.textFile)加载文本文件，将文件的每一行转换成RDD中的元素。 
然后我们使用 [`map(parseApacheLogLine)`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.map) 作用到该RDD的每个元素（即日志文件中的一行记录）上，将每一行记录转换为一个键值对[`Row` object](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Row).
最后，我们将该RDD缓存到内存中，因为我们将在后面的代码中再次使用它。

In [None]:
import sys
import os
from test_helper import Test

baseDir = os.path.join('data')
inputPath = os.path.join('apache.access.log.PROJECT')
logFile = os.path.join(baseDir, inputPath)

def parseLogs():
    """ Read and parse log file """
    parsed_logs = (sc
                   .textFile(logFile)
                   .map(parseApacheLogLine)
                   .cache())

    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print('Number of invalid logline: %d' % failed_logs.count())
        for line in failed_logs.take(20):
            print('Invalid logline: %s' % line)

    print('Read %d lines, successfully parsed %d lines, failed to parse %d lines' 
          % (parsed_logs.count(), access_logs.count(), failed_logs.count()))
    return parsed_logs, access_logs, failed_logs


parsed_logs, access_logs, failed_logs = parseLogs()

### **(1c) 数据清洗**
请注意，通常在日志文件中有大量的日志行存在错误，并不能正确解析。因此，我们需要检查无效行的样本，并将它们与正确解析的行进行比较，下面是一个例子。根据你的观察，修改下面的`APACHE_ACCESS_LOG_PATTERN`正则表达式，使失败的行能正确解析，并按`Shift-Enter`重新运行`parseLogs()`。

 
`127.0.0.1 - - [01/Aug/1995:00:00:01 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1839`
 
如果你想熟悉正则表达式 [`search` function](https://docs.python.org/3/library/re.html#regular-expression-objects), 现在是一个很好的机会重新温习 [documentation](https://developers.google.com/edu/python/regular-expressions)或者 [documentation](https://www.liaoxuefeng.com/wiki/1016959663602400/1017639890281664). 小提示，你可以使用一些在线的Python正则表达式测试器，如 http://pythex.org 或http://www.pythonregex.com. 在这些网站上，我们可以复制并粘贴下面的正则表达式字符串（位于单引号''之间），并用上面的 "无效日志记录 "之一进行测试。

In [None]:

# This was originally '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)" (\d{3}) (\S+)'
APACHE_ACCESS_LOG_PATTERN = '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+)\s*(\S*)\s*" (\d{3}) (\S+)'

parsed_logs, access_logs, failed_logs = parseLogs()

In [None]:
# TEST Data cleaning (1c)
Test.assertEquals(failed_logs.count(), 0, 'incorrect failed_logs.count()')
Test.assertEquals(parsed_logs.count(), 1043177 , 'incorrect parsed_logs.count()')
Test.assertEquals(access_logs.count(), parsed_logs.count(), 'incorrect access_logs.count()')

### **Part 2: 示例**
 
现在，我们已经有了一个包含日志文件的RDD，作为一组Row对象，我们可以对它进行各种分析。
 
#### **(2a) 示例: 内容大小统计**

让我们计算一些关于Web服务器返回的内容大小的统计数据。特别是，我们想知道内容的平均、最小和最大尺寸是多少

我们可以通过对 "access_logs "RDD应用 "map "来计算统计数据。我们希望map的 "lambda "函数是从RDD中提取 "content_size "字段。map产生一个新的RDD，只包含`content_sizes`（在`access_logs`RDD中每个Row对象中的一个元素）。为了计算最小和最大统计量，我们可以在新的RDD上使用[`min()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.min)和[`max()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.max)函数。我们可以通过使用[`reduce`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduce)函数和一个`lambda`函数来计算平均统计量。`lambda`函数将两个输入迭代累加起来，它的两个输入代表了新RDD中被一起reduce的两个元素。`reduce()`的结果是来自日志的总内容大小。将它除以使用[`count()`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.count)函数对新RDD确定的请求数，即为平均值。

In [None]:
# Calculate statistics based on the content size.
content_sizes = access_logs.map(lambda log: log.content_size).cache()
print('Content Size Avg: %i, Min: %i, Max: %s' % (
    content_sizes.reduce(lambda a, b : a + b) / content_sizes.count(),
    content_sizes.min(),
    content_sizes.max()))

#### **(2b) 示例: 响应码分析**
接下来，让我们看看日志中出现的响应码。与内容大小分析一样，首先我们使用`lambda`函数从`access_logs`RDD中提取`response_code`字段来创建一个新的RDD。这里的不同之处在于，我们将使用一个[pair_tuple](https://docs.python.org/2/tutorial/datastructures.html?highlight=tuple#tuples-and-sequences)，而不仅仅是字段本身。使用由响应码和 1 组成的pair_tuple，可以让我们计算有多少记录具有特定的响应码。使用这个新的 RDD，我们执行 [`reduceByKey`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey) 函数。`reduceByKey`通过将`lambda`函数应用于每个元素，对相同的key，在每个key的基础上执行reduce。我们使用简单的`lambda`函数将两个值相加。然后，我们将得到的RDD缓存起来，并使用[`take`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.take)函数创建一个列表。

In [None]:
# Response Code to Count
responseCodeToCount = (access_logs
                       .map(lambda log: (log.response_code, 1))
                       .reduceByKey(lambda a, b : a + b)
                       .cache())
responseCodeToCountList = responseCodeToCount.take(100)
print('Found %d response codes' % len(responseCodeToCountList))
print('Response Code Counts: %s' % responseCodeToCountList)
assert len(responseCodeToCountList) == 7
assert sorted(responseCodeToCountList) == [(200, 940847), (302, 16244), (304, 79824), (403, 58), (404, 6185), (500, 2), (501, 17)]

#### **(2c) 示例: 使用 `matplotlib`绘图**

现在，让我们把上一个例子的结果可视化。我们可以使用[`matplotlib`](http://matplotlib.org/)来可视化上一个例子的结果。首先，我们需要提取图形的标签和数值。我们用两个独立的`map`函数和一个`lambda`函数来完成。第一个`map`函数提取响应码值的列表，第二个`map`函数提取每个响应码计数除以访问日志总大小的列表。接下来，我们用`figure()`构造函数创建一个图，并使用`pie()`方法创建饼图。


In [None]:
labels = responseCodeToCount.map(lambda x_y: x_y[0]).collect()
print(labels)
count = access_logs.count()
fracs = responseCodeToCount.map(lambda x_y: (float(x_y[1]) / count)).collect()
print(fracs)

In [None]:
import matplotlib.pyplot as plt


def pie_pct_format(value):
    """ Determine the appropriate format string for the pie chart percentage label
    Args:
        value: value of the pie slice
    Returns:
        str: formated string label; if the slice is too small to fit, returns an empty string for label
    """
    return '' if value < 7 else '%.0f%%' % value

fig = plt.figure(figsize=(4.5, 4.5), facecolor='white', edgecolor='white')
colors = ['yellowgreen', 'lightskyblue', 'gold', 'purple', 'lightcoral', 'yellow', 'black']
explode = (0.05, 0.05, 0.1, 0, 0, 0, 0)
patches, texts, autotexts = plt.pie(fracs, labels=labels, colors=colors,
                                    explode=explode, autopct=pie_pct_format,
                                    shadow=False,  startangle=125)
for text, autotext in zip(texts, autotexts):
    if autotext.get_text() == '':
        text.set_text('')  # If the slice is small to fit, don't show a text label
plt.legend(labels, loc=(0.80, -0.1), shadow=True)
pass

#### **(2d) 示例: 频繁访问服务器的主机统计**
让我们看看那些多次访问服务器的主机（例如，超过十次）。与(2b)中的响应码分析一样，首先我们通过使用`lambda`函数从`access_logs`RDD中提取`host`字段来创建一个新的RDD，使用一个由host和1组成的pair_tuple，这将让我们计算特定主机的请求创建了多少记录。使用此RDD，我们执行 "reduceByKey "函数和 "lambda "函数，将两个值相加。然后，我们根据每个主机的访问次数（每个pair_tuple的第二个元素）大于10的情况来过滤结果。接下来，我们通过执行一个`map`与`lambda`函数来提取主机名，该函数返回每个pair的第一个元素。最后，我们从产生的RDD中提取20个元素 -- -- *注意，选择返回哪些元素并不保证是确定的。*


In [None]:
# Any hosts that has accessed the server more than 10 times.
hostCountPairTuple = access_logs.map(lambda log: (log.host, 1))

hostSum = hostCountPairTuple.reduceByKey(lambda a, b : a + b)

hostMoreThan10 = hostSum.filter(lambda s: s[1] > 10)

hostsPick20 = (hostMoreThan10
               .map(lambda s: s[0])
               .take(20))

print('Any 20 hosts that have accessed more then 10 times: %s' % hostsPick20)
# An example: [u'204.120.34.185', u'204.243.249.9', u'slip1-32.acs.ohio-state.edu', u'lapdog-14.baylor.edu', u'199.77.67.3', u'gs1.cs.ttu.edu', u'haskell.limbex.com', u'alfred.uib.no', u'146.129.66.31', u'manaus.bologna.maraut.it', u'dialup98-110.swipnet.se', u'slip-ppp02.feldspar.com', u'ad03-053.compuserve.com', u'srawlin.opsys.nwa.com', u'199.202.200.52', u'ix-den7-23.ix.netcom.com', u'151.99.247.114', u'w20-575-104.mit.edu', u'205.25.227.20', u'ns.rmc.com']

#### **(2e) 示例: 端点资源可视化**
现在，让我们可视化日志中对端点（URI）的点击次数。为了执行这个任务，我们首先通过使用`lambda`函数从`access_logs`RDD中提取`endpoint`字段来创建一个新的RDD，使用由endpoint和1组成的对元组，这将让我们计算特定主机的请求创建了多少记录。使用此RDD，我们执行一个`reduceByKey`函数和一个`lambda`函数，将两个值相加。然后我们将结果进行缓存。
 
接下来我们使用`matplotlib`将结果可视化。我们之前导入了`matplotlib.pyplot`库，所以我们不需要再次导入。我们用`lambda`函数分别执行两个`map`函数。第一个`map`函数提取端点值的列表，第二个`map`函数提取每个端点值的访问列表。接下来，我们用`figure()`构造函数创建一个图，设置图的各种特征(轴线约束、网格线和标签)，用`plot()`方法创建线图。


In [None]:
endpoints = (access_logs
             .map(lambda log: (log.endpoint, 1))
             .reduceByKey(lambda a, b : a + b)
             .cache())
ends = endpoints.map(lambda p : p[0]).collect()
counts = endpoints.map(lambda p : p[1]).collect()

fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.axis([0, len(ends), 0, max(counts)])
plt.grid(b=True, which='major', axis='y')
plt.xlabel('Endpoints')
plt.ylabel('Number of Hits')
plt.plot(counts)
pass

#### **(2f) 示例：热门端点资源**
对于最后一个例子，我们将查看日志中的热门端点（URI）。为了确定它们，我们首先创建一个新的RDD，使用`lambda`函数从`access_logs`RDD中提取`endpoint`字段，使用一个由endpoint和1组成的对元组，这将让我们计算一个特定主机的请求创建了多少记录。使用该RDD，我们执行`reduceByKey`函数与一个`lambda`函数，将两个值相加。然后，我们通过执行一个值为10的[`takeOrdered`](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.takeOrdered)和一个`lambda`函数，将计数(每个pair的第二个元素)乘以-1来提取前十个端点，以创建一个排序列表，最热门的端点在底部。


In [None]:
# Top Endpoints
endpointCounts = (access_logs
                  .map(lambda log: (log.endpoint, 1))
                  .reduceByKey(lambda a, b : a + b))

topEndpoints = endpointCounts.takeOrdered(10, lambda s: -1 * s[1])

print('Top Ten Endpoints: %s' % topEndpoints)
assert topEndpoints == [(u'/images/NASA-logosmall.gif', 59737), (u'/images/KSC-logosmall.gif', 50452), (u'/images/MOSAIC-logosmall.gif', 43890), (u'/images/USA-logosmall.gif', 43664), (u'/images/WORLD-logosmall.gif', 43277), (u'/images/ksclogo-medium.gif', 41336), (u'/ksc.html', 28582), (u'/history/apollo/images/apollo-logo1.gif', 26778), (u'/images/launch-logo.gif', 24755), (u'/', 20292)], 'incorrect Top Ten Endpoints'

### **Part 3: Web服务器日志文件分析**
 
现在轮到你对Web服务器日志文件进行分析了。


#### **(3a) 练习: 前10个出错的端点资源**

前十位没有返回代码200的端点是什么？创建一个包含前十个端点的排序列表，以及它们被访问的非200返回码的次数。

 
思考一下你需要执行的步骤，以确定哪些端点没有200的返回代码，你将如何对这些端点进行计数，并对列表进行排序。

提示：你可能要参考之前的上机作业（Lab 1 Word Count）来获得一些启发。

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# HINT: Each of these <FILL IN> below could be completed with a single transformation or action.
# You are welcome to structure your solution in a different way, so long as
# you ensure the variables used in the next Test section are defined (ie. endpointSum, topTenErrURLs).

not200 = access_logs.<FILL IN>

endpointCountPairTuple = not200.<FILL IN>

endpointSum = endpointCountPairTuple.<FILL IN>

topTenErrURLs = endpointSum.<FILL IN>
print('Top Ten failed URLs: %s' % topTenErrURLs)

In [None]:
# TEST Top ten error endpoints (3a)
Test.assertEquals(endpointSum.count(), 7689, 'incorrect count for endpointSum')
Test.assertEquals(topTenErrURLs, [(u'/images/NASA-logosmall.gif', 8761), (u'/images/KSC-logosmall.gif', 7236), (u'/images/MOSAIC-logosmall.gif', 5197), (u'/images/USA-logosmall.gif', 5157), (u'/images/WORLD-logosmall.gif', 5020), (u'/images/ksclogo-medium.gif', 4728), (u'/history/apollo/images/apollo-logo1.gif', 2907), (u'/images/launch-logo.gif', 2811), (u'/', 2199), (u'/images/ksclogosmall.gif', 1622)], 'incorrect Top Ten failed URLs (topTenErrURLs)')

#### **(3b) 练习: 唯一主机的数量统计**
整个日志中有多少个唯一的主机？
 
想一想，你需要执行哪些步骤来统计日志中不同主机的数量。

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# HINT: Do you recall the tips from (3a)? Each of these <FILL IN> could be an transformation or action.

hosts = access_logs.<FILL IN>

uniqueHosts = hosts.<FILL IN>

uniqueHostCount = uniqueHosts.<FILL IN>
print('Unique hosts: %d' % uniqueHostCount)

In [None]:
# TEST Number of unique hosts (3b)
Test.assertEquals(uniqueHostCount, 54507, 'incorrect uniqueHostCount')

#### **(3c) 练习: 唯一主机按天的统计数字**
作为一个高级练习题，让我们逐日确定整个日志中唯一主机的数量。这个计算将给我们提供每日唯一主机数量的计数。我们希望得到一个按月日递增排序的列表，其中包括该月的哪一天和该日相关的唯一主机数。确保你缓存产生的RDD`dailyHosts`，这样我们就可以在下一个练习中重复使用它。
 
考虑一下你需要执行的步骤，以统计每*天发出请求的不同主机的数量。
*由于该日志只涵盖了一个月，所以你可以忽略月份*。


In [None]:
# TODO: Replace <FILL IN> with appropriate code

#print access_logs.take(1)[0].date_time.date()

dayToHostPairTuple = access_logs.<FILL IN>

dayGroupedHosts = dayToHostPairTuple.<FILL IN>

dayHostCount = dayGroupedHosts.<FILL IN>

dailyHosts = (dayHostCount
              .<FILL IN>)
dailyHostsList = dailyHosts.take(30)
print('Unique hosts per day: %s' % dailyHostsList)

In [None]:
# TEST Number of unique daily hosts (3c)
Test.assertEquals(dailyHosts.count(), 21, 'incorrect dailyHosts.count()')
Test.assertEquals(dailyHostsList, [(1, 2582), (3, 3222), (4, 4190), (5, 2502), (6, 2537), (7, 4106), (8, 4406), (9, 4317), (10, 4523), (11, 4346), (12, 2864), (13, 2650), (14, 4454), (15, 4214), (16, 4340), (17, 4385), (18, 4168), (19, 2550), (20, 2560), (21, 4134), (22, 4456)], 'incorrect dailyHostsList')
Test.assertTrue(dailyHosts.is_cached, 'incorrect dailyHosts.is_cached')

#### **(3d) 练习: 对按天统计的唯一主机数字进行可视化**
使用上一练习结果，使用`matplotlib`绘制一个按天计算的唯一主机请求的 "Line "图。`daysWithHosts`应该是一个天数的列表，`hosts`应该是每天相应的唯一主机数量的列表。


*如何将 RDD 转换为一个列表？参见[`collect()`方法](http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=collect#pyspark.RDD.collect)*。


In [None]:
# TODO: Replace <FILL IN> with appropriate code

daysWithHosts = dailyHosts.<FILL IN>
hosts = dailyHosts.<FILL IN>

In [None]:
# TEST Visualizing unique daily hosts (3d)
test_days = list(range(1, 23))
test_days.remove(2)
Test.assertEquals(daysWithHosts, test_days, 'incorrect days')
Test.assertEquals(hosts, [2582, 3222, 4190, 2502, 2537, 4106, 4406, 4317, 4523, 4346, 2864, 2650, 4454, 4214, 4340, 4385, 4168, 2550, 2560, 4134, 4456], 'incorrect hosts')

In [None]:
fig = plt.figure(figsize=(8,4.5), facecolor='white', edgecolor='white')
plt.axis([min(daysWithHosts), max(daysWithHosts), 0, max(hosts)+500])
plt.grid(b=True, which='major', axis='y')
plt.xlabel('Day')
plt.ylabel('Hosts')
plt.plot(daysWithHosts, hosts)
pass

#### **(3e) 练习 : 每个主机的日均请求数**

接下来，我们来确定每日平均请求数。我们想要一个按日递增的列表，以及每日每台主机的相关平均请求数。确保你缓存了产生的RDD`avgDailyReqPerHost`，这样我们就可以在下一次练习中重复使用它。
要计算每台主机的平均请求数，需要得到所有主机的总请求数，然后除以唯一主机的数量。


*由于日志只涵盖了一个月，所以可以跳过对月份的检查。*


*另外，为了简单起见，在计算近似平均值时，使用整数值--你不需要转换到float*。

In [None]:
# TODO: Replace <FILL IN> with appropriate code

dayAndHostTuple = access_logs.<FILL IN>

groupedByDay = dayAndHostTuple.<FILL IN>

sortedByDay = groupedByDay.<FILL IN>

avgDailyReqPerHost = (sortedByDay
                      .<FILL IN>)

avgDailyReqPerHostList = avgDailyReqPerHost.take(30)
print('Average number of daily requests per Hosts is %s' % avgDailyReqPerHostList)

In [None]:
# TEST Average number of daily requests per hosts (3e)
Test.assertEquals(avgDailyReqPerHostList, [(1, 13), (3, 12), (4, 14), (5, 12), (6, 12), (7, 13), (8, 13), (9, 14), (10, 13), (11, 14), (12, 13), (13, 13), (14, 13), (15, 13), (16, 13), (17, 13), (18, 13), (19, 12), (20, 12), (21, 13), (22, 12)], 'incorrect avgDailyReqPerHostList')
Test.assertTrue(avgDailyReqPerHost.is_cached, 'incorrect avgDailyReqPerHost.is_cache')

#### **(3f) 练习：可视化每个主机的平均每日请求**
使用上一个练习的结果avgDailyReqPerHost，使用matplotlib绘制一个 "线 "图，按天计算每台唯一主机的平均日请求量。

daysWithAvg应该是一个天数的列表，avgs应该是对应该天的每台唯一主机的平均日请求列表。


In [None]:
# TODO: Replace <FILL IN> with appropriate code

daysWithAvg = avgDailyReqPerHost.<FILL IN>
avgs = avgDailyReqPerHost.<FILL IN>

In [None]:
# TEST Average Daily Requests per Unique Host (3f)
Test.assertEquals(daysWithAvg, [1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22], 'incorrect days')
Test.assertEquals(avgs, [13, 12, 14, 12, 12, 13, 13, 14, 13, 14, 13, 13, 13, 13, 13, 13, 13, 12, 12, 13, 12], 'incorrect avgs')

In [None]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.axis([0, max(daysWithAvg), 0, max(avgs)+2])
plt.grid(b=True, which='major', axis='y')
plt.xlabel('Day')
plt.ylabel('Average')
plt.plot(daysWithAvg, avgs)
pass

### **Part 4: 探索404响应码**

让我们深入探讨一下错误的404响应码记录。当一个端点资源没有被服务器找到时（即缺少一个页面或对象），就会返回404错误。

#### **(4a) 练习: 404响应码计数**
创建一个RDD，只包含404响应码的日志记录。**确保你`cache()`了RDD`badRecords`，因为我们将在这个练习的其余部分使用它。**
 
日志中有多少条404记录？

In [None]:
# TODO: Replace <FILL IN> with appropriate code

badRecords = (access_logs
              .<FILL IN>
print('Found %d 404 URLs' % badRecords.count())

In [None]:
# TEST Counting 404 (4a)
Test.assertEquals(badRecords.count(), 6185, 'incorrect badRecords.count()')
Test.assertTrue(badRecords.is_cached, 'incorrect badRecords.is_cached')

#### **(4b) 练习: 打印404响应码记录**
使用您在第(4a)部分中缓存的带有404响应码日志记录的RDD，打印出一个产生404错误的最多40个**明确**的端点资源的列表

--*在您的列表中，任何端点都不应出现一次以上。*


In [None]:
# TODO: Replace <FILL IN> with appropriate code

badEndpoints = badRecords.<FILL IN>

badUniqueEndpoints = badEndpoints.<FILL IN>

badUniqueEndpointsPick40 = badUniqueEndpoints.take(40)
print('404 URLS: %s' % badUniqueEndpointsPick40)

In [None]:
# TEST Listing 404 records (4b)

badUniqueEndpointsSet40 = set(badUniqueEndpointsPick40)
Test.assertEquals(len(badUniqueEndpointsSet40), 40, 'badUniqueEndpointsPick40 not distinct')

#### **(4c) 练习：列出前20个返回404响应码最多的端点资源**
使用您在第(4a)部分中缓存的带有404响应码日志记录的RDD，打印出产生404错误最多的前20个端点的列表。


*提醒，打印出的端点应按排序顺序排列*。


In [None]:
# TODO: Replace <FILL IN> with appropriate code

badEndpointsCountPairTuple = badRecords.<FILL IN>

badEndpointsSum = badEndpointsCountPairTuple.<FILL IN>

badEndpointsTop20 = badEndpointsSum.<FILL IN>
print('Top Twenty 404 URLs: %s' % badEndpointsTop20)

In [None]:
# TEST Top twenty 404 URLs (4c)
Test.assertEquals(badEndpointsTop20, [(u'/pub/winvn/readme.txt', 633), (u'/pub/winvn/release.txt', 494), (u'/shuttle/missions/STS-69/mission-STS-69.html', 431), (u'/images/nasa-logo.gif', 319), (u'/elv/DELTA/uncons.htm', 178), (u'/shuttle/missions/sts-68/ksc-upclose.gif', 156), (u'/history/apollo/sa-1/sa-1-patch-small.gif', 146), (u'/images/crawlerway-logo.gif', 120), (u'/://spacelink.msfc.nasa.gov', 117), (u'/history/apollo/pad-abort-test-1/pad-abort-test-1-patch-small.gif', 100), (u'/history/apollo/a-001/a-001-patch-small.gif', 97), (u'/images/Nasa-logo.gif', 85), (u'/shuttle/resources/orbiters/atlantis.gif', 64), (u'/history/apollo/images/little-joe.jpg', 62), (u'/images/lf-logo.gif', 59), (u'/shuttle/resources/orbiters/discovery.gif', 56), (u'/shuttle/resources/orbiters/challenger.gif', 54), (u'/robots.txt', 53), (u'/elv/new01.gif>', 43), (u'/history/apollo/pad-abort-test-2/pad-abort-test-2-patch-small.gif', 38)], 'incorrect badEndpointsTop20')

#### **（4d）练习：列出前25个收到404响应码的主机**
现在我们不看产生404错误的端点，让我们看看接收到404错误的主机。使用您在第(4a)部分中缓存的带有404响应码日志记录的RDD，打印出产生404错误最多的前25个主机的列表。


In [None]:
# TODO: Replace <FILL IN> with appropriate code

errHostsCountPairTuple = badRecords.<FILL IN>

errHostsSum = errHostsCountPairTuple.<FILL IN>

errHostsTop25 = errHostsSum.<FILL IN>
print('Top 25 hosts that generated errors: %s' % errHostsTop25)

In [None]:
# TEST Top twenty-five 404 response code hosts (4d)

Test.assertEquals(len(errHostsTop25), 25, 'length of errHostsTop25 is not 25')
Test.assertEquals(len(set(errHostsTop25) - set([(u'maz3.maz.net', 39), (u'piweba3y.prodigy.com', 39), (u'gate.barr.com', 38), (u'm38-370-9.mit.edu', 37), (u'ts8-1.westwood.ts.ucla.edu', 37), (u'nexus.mlckew.edu.au', 37), (u'204.62.245.32', 33), (u'163.206.104.34', 27), (u'spica.sci.isas.ac.jp', 27), (u'www-d4.proxy.aol.com', 26), (u'www-c4.proxy.aol.com', 25), (u'203.13.168.24', 25), (u'203.13.168.17', 25), (u'internet-gw.watson.ibm.com', 24), (u'scooter.pa-x.dec.com', 23), (u'crl5.crl.com', 23), (u'piweba5y.prodigy.com', 23), (u'onramp2-9.onr.com', 22), (u'slip145-189.ut.nl.ibm.net', 22), (u'198.40.25.102.sap2.artic.edu', 21), (u'gn2.getnet.com', 20), (u'msp1-16.nas.mr.net', 20), (u'isou24.vilspa.esa.es', 19), (u'dial055.mbnet.mb.ca', 19), (u'tigger.nashscene.com', 19)])), 0, 'incorrect errHostsTop25')

#### **（4e）练习：按天列出404响应码记录**
让我们从时间上探索404记录。将404按天分解（`cache()` RDD`errDateSorted`），并以列表的形式得到按天排序的每日计数。


*由于该日志只涵盖了一个月份，所以在检查时可以忽略月份*。



In [None]:
# TODO: Replace <FILL IN> with appropriate code

errDateCountPairTuple = badRecords.<FILL IN>

errDateSum = errDateCountPairTuple.<FILL IN>

errDateSorted = (errDateSum
                 .<FILL IN>)

errByDate = errDateSorted.<FILL IN>
print('404 Errors by day: %s' % errByDate)

In [None]:
# TEST 404 response codes per day (4e)
Test.assertEquals(errByDate, [(1, 243), (3, 303), (4, 346), (5, 234), (6, 372), (7, 532), (8, 381), (9, 279), (10, 314), (11, 263), (12, 195), (13, 216), (14, 287), (15, 326), (16, 258), (17, 269), (18, 255), (19, 207), (20, 312), (21, 305), (22, 288)], 'incorrect errByDate')
Test.assertTrue(errDateSorted.is_cached, 'incorrect errDateSorted.is_cached')

#### **(4f)练习：按天对404响应码进行可视化**
使用上个练习的结果，用`matplotlib`来绘制404响应码按天统计的 "线图"或 "柱状图"。


In [None]:
# TODO: Replace <FILL IN> with appropriate code

daysWithErrors404 = errDateSorted.<FILL IN>
errors404ByDay = errDateSorted.<FILL IN>

In [None]:
# TEST Visualizing the 404 Response Codes by Day (4f)
Test.assertEquals(daysWithErrors404, [1, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22], 'incorrect daysWithErrors404')
Test.assertEquals(errors404ByDay, [243, 303, 346, 234, 372, 532, 381, 279, 314, 263, 195, 216, 287, 326, 258, 269, 255, 207, 312, 305, 288], 'incorrect errors404ByDay')

In [None]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.axis([0, max(daysWithErrors404), 0, max(errors404ByDay)])
plt.grid(b=True, which='major', axis='y')
plt.xlabel('Day')
plt.ylabel('404 Errors')
plt.plot(daysWithErrors404, errors404ByDay)
pass

#### **（4g）练习：前五个404响应码最多的日期**
使用你在（4e）中缓存的RDD`errDateSorted`，404响应码最多的前五个日期是几号？它们分别对应的404响应码计数是多少？


In [None]:
# TODO: Replace <FILL IN> with appropriate code

topErrDate = errDateSorted.<FILL IN>
print('Top Five dates for 404 requests: %s' % topErrDate)

In [None]:
# TEST Five dates for 404 requests (4g)
Test.assertEquals(topErrDate, [(7, 532), (8, 381), (6, 372), (4, 346), (15, 326)], 'incorrect topErrDate')

#### **(4h)练习：按小时统计404响应码**
使用你在(4a)部分中缓存的RDD "badRecords"，并按一天中的小时和递增顺序，创建一个RDD，其中包含一天中每个小时有多少请求有404返回代码（午夜从0开始）。缓存产生的RDD hourRecordsSorted，并将其打印为一个列表。

In [None]:
# TODO: Replace <FILL IN> with appropriate code

hourCountPairTuple = badRecords.<FILL IN>

hourRecordsSum = hourCountPairTuple.<FILL IN>

hourRecordsSorted = (hourRecordsSum
                     .<FILL IN>)

errHourList = hourRecordsSorted.<FILL IN>
print('Top hours for 404 requests: %s' % errHourList)

In [None]:
# TEST Hourly 404 response codes (4h)
Test.assertEquals(errHourList, [(0, 175), (1, 171), (2, 422), (3, 272), (4, 102), (5, 95), (6, 93), (7, 122), (8, 199), (9, 185), (10, 329), (11, 263), (12, 438), (13, 397), (14, 318), (15, 347), (16, 373), (17, 330), (18, 268), (19, 269), (20, 270), (21, 241), (22, 234), (23, 272)], 'incorrect errHourList')
Test.assertTrue(hourRecordsSorted.is_cached, 'incorrect hourRecordsSorted.is_cached')

#### **（4i）练习：按小时对404响应码进行可视化**
使用上个练习的结果，用`matplotlib`绘制按小时404响应码统计数字的 "线 "或 "柱状 "图。

In [None]:
# TODO: Replace <FILL IN> with appropriate code

hoursWithErrors404 = hourRecordsSorted.<FILL IN>
errors404ByHours = hourRecordsSorted.<FILL IN>

In [None]:
# TEST Visualizing the 404 Response Codes by Hour (4i)
Test.assertEquals(hoursWithErrors404, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23], 'incorrect hoursWithErrors404')
Test.assertEquals(errors404ByHours, [175, 171, 422, 272, 102, 95, 93, 122, 199, 185, 329, 263, 438, 397, 318, 347, 373, 330, 268, 269, 270, 241, 234, 272], 'incorrect errors404ByHours')

In [None]:
fig = plt.figure(figsize=(8,4.2), facecolor='white', edgecolor='white')
plt.axis([0, max(hoursWithErrors404), 0, max(errors404ByHours)])
plt.grid(b=True, which='major', axis='y')
plt.xlabel('Hour')
plt.ylabel('404 Errors')
plt.plot(hoursWithErrors404, errors404ByHours)
pass