# Web Server Log Analysis

## Outline
### History 
I started this project at 2015. That time, with Spark 1.4,  the RDD was the only structure to use. The main focus of the project was exploring the basic functionality of pypark and possibilities of exploratory analysis for distributed data.
The data used in 2015 [http://ita.ee.lbl.gov/html/contrib/Calgary-HTTP.html](http://ita.ee.lbl.gov/html/contrib/Calgary-HTTP.html) do not exist anymore. 

### Focus in 2022
I found many logs at [https://www.sec.gov/dera/data/edgar-log-file-data-set.html](https://www.sec.gov/dera/data/edgar-log-file-data-set.html)
Here I take the 2003 data, because the file is relatively small, about 3-4M. In 2017 the file size is about 300-400M.

Now the project has a new focus. This Jupyter Notebok and production scripts for batch processing serve as investigation steps. The main goal is stream processing of logs wiht Spark Streams and scheduling with Airflow. 

### Why RDD und Spark Streams?
For historical reasons I decided to keep the project focused on RDDs and Spark Streams. They are suitable for unstructured data. The processing described here will bring the structure to those.

For initially sructured data the better tools are DataFrames and StructuredStreams. For those I will find a new playground.


## Collecting data

### Read the file from URL and write it on hard drive

In 2015 I assumed that the log file is already downloaded and in the current directory as TXT file.
This I load ZIP from url and unzip locally.

In [1]:
import io, os, requests, zipfile
from pyspark.sql import Row
from datetime import datetime

In [11]:
date = "20030303"
year = date[:4]
Qtr = (int(date[4:6])-1)//3+1; print('Qtr', Qtr)
#
data_directory = './data/'
csv_file = os.path.join(data_directory, f'log{date}.csv')
print(csv_file)

Qtr 1
./data/log20030303.csv


In [3]:
if not os.path.exists(csv_file):
    zip_file_url = f'http://www.sec.gov/dera/data/Public-EDGAR-log-file-data/{year}/Qtr{Qtr:d}/log{date}.zip'
    r = requests.get(zip_file_url)
    z = zipfile.ZipFile(io.BytesIO(r.content))
    z.extractall(data_directory)
else:
    pass  # The file is already there

### Read the file into RDD

In Jupyter notebook the SparkSession `spark` and the SparkContext `sc` are created for us

In [4]:
spark

In [5]:
sc

We create "logFileRDD" of the file by reading it as a collection of lines. For a start we have a look at the first several lines

In [19]:
assert os.path.exists(csv_file)
logFileRDD = sc.textFile(csv_file, 4).cache()
#logFileRDD = spark.read.option("header", "true").format("csv")
logFileRDD.take(5)

['ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser',
 '208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie',
 '66.48.138.ach,2003-03-03,00:00:02,500.0,1084230.0,0001104659-03-003192,.txt,200.0,330570.0,0.0,1.0,0.0,0.0,0.0,',
 '209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win',
 '32.100.133.aej,2003-03-03,00:00:05,500.0,772197.0,0000950144-02-004587,g75920def14a.txt,200.0,74288.0,0.0,0.0,0.0,9.0,0.0,win']

The file has a header now, which should be stripped. The format of the file is diffrent with 15 fields (variables). 

In [20]:
header = logFileRDD.first() #extract header
logFileRDD = logFileRDD.filter(lambda row: row != header)

## Parsing

### Read docuemntation for expected format


In [9]:
from IPython.display import IFrame
IFrame("images/EDGAR_variables_FINAL.pdf", width=800, height=1050)

We briefly summarize the content of the fields
1. ip
2. date - Apache log file date (yyyy-mm-dd)
3. time - Apache log file time (hh:mm:ss)
4. zone - Apache log file zone
5. cik - SEC Central Index Key (CIK) associated with the document requested
6. accession - accession SEC document accession number associated with the document requested
7. extention - filename
8. code - Apache log file status code for the request
9. size - document file size
10. idx - takes on a value of 1 if the requester landed on the index page of a set of documents
11. norefer
12. noagent
13. find
14. crawler
15. browser

Let us parse each line of our RDD accordingly

### Construct regular expression for parsing

the common regexp patterns can be found here [https://regexpattern.com/date-time/](https://regexpattern.com/date-time/) 

In [51]:
# A regular expression pattern to extract fields from the log line
re_ip = '\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}'
re_date = '\d{4}\-\d{2}\-\d{2}'
re_time = '\d{2}:\d{2}:\d{2}'
re_zone = '\d+\.?\d*'
re_cik = '\d+\.?\d*'
re_accession = '[\w\-]+'
re_filename = '[\w\-\.]+'
re_code = '\d+\.?\d*'
re_size = '\d+\.?\d*'
re_idx = '[01]\.?0?' 
re_norefer = '[01]\.?0?'
re_noagent = '[01]\.?0?' 
re_find = '1?[0-9]\.?0?'
re_crawler = '[01]\.?0?' 
re_browser = '[a-z]{3}'
#
LOG_PATTERN_EDGAR = f'^({re_ip:s}),({re_date:s}),({re_time:s}),({re_zone:s}),({re_cik:s}),\
({re_accession:s}),({re_filename:s}),({re_code:s}),({re_size:s}),({re_idx:s}),\
({re_norefer:s}),({re_noagent:s}),({re_find:s}),({re_crawler:s}),({re_browser:s})$'
print(LOG_PATTERN_EDGAR)
pattern=re.compile(LOG_PATTERN_EDGAR)

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d+\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{3})$


In [52]:
# answer from regex101.com
# ^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d+\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{3})$

In [54]:
# check correct pattern for datetime - we neglect the time zone shift for the moment
#import datetime
#stri = "24/Oct/1994:13:41:41 -0600"
#dt = datetime.datetime.strptime(stri[:20], "%d/%b/%Y:%H:%M:%S")
#print(dt)
from datetime import datetime
datetime.strptime('2003-10-03', "%Y-%m-%d").date()
datetime.strptime('14:02:13', "%H:%M:%S").time()

datetime.time(14, 2, 13)

We will parse each text line into `pyspark.sql.Row`

In [73]:
def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Inputs:
        logline (str): a line of text in the Apache Common Log format
    Outputs:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(pattern, logline)
    if match is None:   # failed match
        #print('failed  ', logline)
        return (logline, 0)
    
    #size_field = match.group(9)
    #if size_field == '-':
    #    size = float(0)
    #else:
    #    size = float(match.group(9))
    
    parsed_row = Row(
        ip          = match.group(1),
        date = datetime.strptime(match.group(2), "%Y-%m-%d").date(),
        time       = datetime.strptime(match.group(3), "%H:%M:%S").time(),
        zone        = match.group(4),
        cik      = match.group(5),
        accesion      = match.group(6),
        filename = match.group(7),
        response_code = int(float(match.group(8))),
        content_size  = match.group(9), # size,
        idx = bool(match.group(10)),
        browser = match.group(15)
    )
    #print('parsed   ', logline)
    
    return (parsed_row, 1)

We will keep track of correctly parsed and failed logs are returned those as two RDDs

In [74]:
def access_fail_logs(parsed_logs):
    """ Read and parse log file, print a 20-sample of failed log-lines
    Inputs:
        parsed_logs (RDD): an RDD obtained via parseApacheLogLine(...)
    Outputs:
        tuple of RDDs: access_logs, failed_logs
    """
    access_logs = (parsed_logs
                   .filter(lambda s: s[1] == 1)
                   .map(lambda s: s[0])
                   .cache())

    failed_logs = (parsed_logs
                   .filter(lambda s: s[1] == 0)
                   .map(lambda s: s[0]))
    failed_logs_count = failed_logs.count()
    if failed_logs_count > 0:
        print(f'Number of invalid logline: {failed_logs.count():d}')
        for line in failed_logs.take(20):
            print(f'Invalid logline: {line}')

    print(f'Read {parsed_logs.count():d} lines, successfully parsed { access_logs.count():d} lines, \
           failed to parse {failed_logs.count():d} lines')
    return access_logs, failed_logs

Now that our functions are ready let us start with parsing lines

In [75]:
#logFileRDD.take(3)

['208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie',
 '66.48.138.ach,2003-03-03,00:00:02,500.0,1084230.0,0001104659-03-003192,.txt,200.0,330570.0,0.0,1.0,0.0,0.0,0.0,',
 '209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win']

In [76]:
#parseApacheLogLine('209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win')

(Row(ip='209.172.247.haf', date=datetime.date(2003, 3, 3), time=datetime.time(0, 0, 4), zone='500.0', cik='78003.0', accesion='0000914121-02-001461', filename='pf121702-8k.txt', response_code=200, content_size=2527.0, idx=True, browser='win'),
 1)

In [77]:
parsedLogsRDD = logFileRDD.map(parseApacheLogLine).cache()

In [78]:
#parsedLogsRDD.take(1)

[('208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie',
  0)]

In [79]:
accessLogsRDD, failedLogsRDD = access_fail_logs(parsedLogsRDD)

Number of invalid logline: 48160
Invalid logline: 208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie
Invalid logline: 66.48.138.ach,2003-03-03,00:00:02,500.0,1084230.0,0001104659-03-003192,.txt,200.0,330570.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 208.252.214.jbf,2003-03-03,00:00:07,500.0,919642.0,0000891836-02-000291,sc0167-02d.txt,304.0,,0.0,0.0,0.0,9.0,0.0,mie
Invalid logline: 66.48.138.ach,2003-03-03,00:00:21,500.0,1004980.0,0001004980-03-000014,.txt,200.0,354518.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 66.48.138.ach,2003-03-03,00:01:05,500.0,78239.0,0000950136-03-000445,.txt,200.0,476664.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 149.77.10.fed,2003-03-03,00:01:10,500.0,789019.0,0001032210-02-001614,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,win
Invalid logline: 66.48.138.ach,2003-03-03,00:01:14,500.0,810136.0,0000810136-03-000008,.txt,200.0,85869.0,0.0,1.0,0.0,0.0,0.0,
Invalid logline: 149.77.10.fed,2003-03-03,00:01:16,500.0,789

There are many lines that failed to parse. Some of then are because of missing content_size and other because of the missing browser. 

Before dealing with this issue, let us check if parsed results are meaningful.

### Explore parsed data

How can we be sure that our parsing delivered meaningful results? Let us have a look at the unique values of the "method", "response code", and "protocol". 

In [85]:
def distinct_responsecodes_browsers(accessLogsRDD):
    """
    Prints distinct values for  response codes and browsers
    Inputs:
        accessLogsRDD 
    """
    ResponseCodesRDD = accessLogsRDD.map(lambda log: log.response_code)
    uniqueResponseCodesRDD = ResponseCodesRDD.distinct()
    print("Response codes are", sorted(uniqueResponseCodesRDD.collect()))
    BrowserRDD = accessLogsRDD.map(lambda log: log.browser)
    uniqueBrowserRDD = BrowserRDD.distinct()
    print("Browsers are", sorted(uniqueBrowserRDD.collect()))
    #return MethodsRDD, ResponseCodesRDD, ProtocolsRDD
    return None

distinct_responsecodes_browsers(accessLogsRDD)

Response codes are [200, 206, 302, 404]
Browsers are ['iem', 'lin', 'mac', 'mie', 'opr', 'win']


## Adjust parsing


In [92]:
# A regular expression pattern to extract fields from the log line
re_ip = '\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}'
re_date = '\d{4}\-\d{2}\-\d{2}'
re_time = '\d{2}:\d{2}:\d{2}'
re_zone = '\d+\.?\d*'
re_cik = '\d+\.?\d*'
re_accession = '[\w\-]+'
re_filename = '[\w\-\.]+'
re_code = '\d+\.?\d*'
re_size = '\d*\.?\d*'
re_idx = '[01]\.?0?' 
re_norefer = '[01]\.?0?'
re_noagent = '[01]\.?0?' 
re_find = '1?[0-9]\.?0?'
re_crawler = '[01]\.?0?' 
re_browser = '[a-z]{0,3}'
#
LOG_PATTERN_EDGAR = f'^({re_ip:s}),({re_date:s}),({re_time:s}),({re_zone:s}),({re_cik:s}),\
({re_accession:s}),({re_filename:s}),({re_code:s}),({re_size:s}),({re_idx:s}),\
({re_norefer:s}),({re_noagent:s}),({re_find:s}),({re_crawler:s}),({re_browser:s})$'
print(LOG_PATTERN_EDGAR)
pattern=re.compile(LOG_PATTERN_EDGAR)

^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d*\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{0,3})$


In [102]:
def parseApacheLogLine(logline):
    """ Parse a line in the Apache Common Log format
    Inputs:S
        logline (str): a line of text in the Apache Common Log format
    Outputs:
        tuple: either a dictionary containing the parts of the Apache Access Log and 1,
               or the original invalid log line and 0
    """
    match = re.search(pattern, logline)
    if match is None:   # failed match
        print('failed  ', logline)
        return (logline, 0)
    
    size_field = match.group(9)
    if size_field:
        size = float(match.group(9))
    else:    
        size = float(0)
        
    browser_field = match.group(15)
    if browser_field:
        browser = browser_field
    else:
        browser = 'not_found'
    
    parsed_row = Row(
        ip          = match.group(1),
        date = datetime.strptime(match.group(2), "%Y-%m-%d").date(),
        time       = datetime.strptime(match.group(3), "%H:%M:%S").time(),
        zone        = match.group(4),
        cik      = match.group(5),
        accesion      = match.group(6),
        filename = match.group(7),
        response_code = int(float(match.group(8))),
        content_size  = size,
        idx = bool(match.group(10)),
        browser = browser
    )
    print('parsed   ', logline)
    
    return (parsed_row, 1)

In [101]:
#parseApacheLogLine('208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie')

parsed    208.252.214.jbf,2003-03-03,00:00:00,500.0,919642.0,0000891836-02-000291,-index.htm,304.0,,1.0,0.0,0.0,1.0,0.0,mie


In [97]:
#parseApacheLogLine('209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win')

parsed    209.172.247.haf,2003-03-03,00:00:04,500.0,78003.0,0000914121-02-001461,pf121702-8k.txt,200.0,2527.0,0.0,0.0,0.0,9.0,0.0,win


In [98]:
#parseApacheLogLine('66.48.138.ach,2003-03-03,00:01:52,500.0,1037949.0,0001047469-03-006343,.txt,200.0,13224.0,0.0,1.0,0.0,0.0,0.0,')

parsed    66.48.138.ach,2003-03-03,00:01:52,500.0,1037949.0,0001047469-03-006343,.txt,200.0,13224.0,0.0,1.0,0.0,0.0,0.0,


In [103]:
parsedLogsRDD1 = logFileRDD.map(parseApacheLogLine).cache()

In [104]:
parsedLogsRDD1.take(1)

[(Row(ip='208.252.214.jbf', date=datetime.date(2003, 3, 3), time=datetime.time(0, 0), zone='500.0', cik='919642.0', accesion='0000891836-02-000291', filename='-index.htm', response_code=304, content_size=0.0, idx=True, browser='mie'),
  1)]

In [105]:
accessLogsRDD1, failedLogsRDD1 = access_fail_logs(parsedLogsRDD1)

Number of invalid logline: 17
Invalid logline: 144.214.5.cci,2003-03-03,01:49:11,500.0,313616.0,0000928385-02-001161,0000928385%2D02%2D001161.txt,200.0,150038.0,0.0,0.0,0.0,10.0,0.0,mie
Invalid logline: 144.214.5.cci,2003-03-03,01:55:33,500.0,313616.0,0000928385-02-001161,0000928385%2D02%2D001161.txt,200.0,103318.0,0.0,0.0,0.0,10.0,0.0,mie
Invalid logline: 12.222.107.fbd,2003-03-03,03:04:45,500.0,18230.0,0000018230-01-500093,0000018230%2D01%2D500093.txt,200.0,396293.0,0.0,0.0,0.0,10.0,0.0,win
Invalid logline: 203.218.153.jcd,2003-03-03,03:18:02,500.0,24491.0,0000950152-01-506621,0000950152%2D01%2D506621.txt,200.0,245669.0,0.0,0.0,0.0,10.0,0.0,win
Invalid logline: 204.4.131.egd,2003-03-03,06:26:37,500.0,1025773.0,0000912057-96-023543,.txt%20http://www.sonexis.com/company/in,404.0,3451.0,0.0,1.0,0.0,0.0,1.0,win
Invalid logline: 202.140.161.ecg,2003-03-03,08:01:34,500.0,310569.0,0000950138-01-500074,0000950138%2D01%2D500074.txt,200.0,116024.0,0.0,0.0,0.0,10.0,0.0,win
Invalid logline: 202.

Checking again that results are meaningful

At the moment I ignore the 17 unparsed lines and check that that parsed responses are meaningful

In [107]:
distinct_responsecodes_browsers(accessLogsRDD1)

Response codes are [200, 206, 302, 304, 403, 404, 416]
Browsers are ['iem', 'lin', 'mac', 'mie', 'not_found', 'opr', 'win']


Again we check that the results are reasonable

## Going to  production code

    * put the functions in the utils.py 
    * make parseApacheLogLine(logline) specific for each pattern and keep this in one file patter_EDGAR.py
    * logging instead of print statements
    * use pytest for testing patterns


In [117]:
import importlib
import pattern_EDGAR as pE; importlib.reload(pE)
import utils; importlib.reload(utils)

2022-02-09 22:44:12 INFO     pattern_EDGAR <module> line_35 LOG_PATTERN_EDGAR:  ^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\w{3}),(\d{4}\-\d{2}\-\d{2}),(\d{2}:\d{2}:\d{2}),(\d+\.?\d*),(\d+\.?\d*),([\w\-]+),([\w\-\.]+),(\d+\.?\d*),(\d*\.?\d*),([01]\.?0?),([01]\.?0?),([01]\.?0?),(1?[0-9]\.?0?),([01]\.?0?),([a-z]{0,3})$


<module 'utils' from '/home/olga/github-my/Web-Server-Log-Analysis-with-PySpark/utils.py'>

In [118]:
parsedLogsRDD1 = logFileRDD.map(pE.parseApacheLogLine).cache()

In [119]:
parsedLogsRDD1.take(3)

[(Row(ip='208.252.214.jbf', date=datetime.date(2003, 3, 3), time=datetime.time(0, 0), zone='500.0', cik='919642.0', accesion='0000891836-02-000291', filename='-index.htm', response_code=304, content_size=0.0, idx=True, browser='mie'),
  1),
 (Row(ip='66.48.138.ach', date=datetime.date(2003, 3, 3), time=datetime.time(0, 0, 2), zone='500.0', cik='1084230.0', accesion='0001104659-03-003192', filename='.txt', response_code=200, content_size=330570.0, idx=True, browser='not_found'),
  1),
 (Row(ip='209.172.247.haf', date=datetime.date(2003, 3, 3), time=datetime.time(0, 0, 4), zone='500.0', cik='78003.0', accesion='0000914121-02-001461', filename='pf121702-8k.txt', response_code=200, content_size=2527.0, idx=True, browser='win'),
  1)]

In [120]:
accessLogsRDD1, failedLogsRDD1 = utils.access_fail_logs(parsedLogsRDD1)

2022-02-09 22:44:29 INFO     utils        access_fail_logs line_28 Number of invalid logline: 17
2022-02-09 22:44:29 INFO     utils        access_fail_logs line_30 Invalid logline: 144.214.5.cci,2003-03-03,01:49:11,500.0,313616.0,0000928385-02-001161,0000928385%2D02%2D001161.txt,200.0,150038.0,0.0,0.0,0.0,10.0,0.0,mie
2022-02-09 22:44:29 INFO     utils        access_fail_logs line_30 Invalid logline: 144.214.5.cci,2003-03-03,01:55:33,500.0,313616.0,0000928385-02-001161,0000928385%2D02%2D001161.txt,200.0,103318.0,0.0,0.0,0.0,10.0,0.0,mie
2022-02-09 22:44:29 INFO     utils        access_fail_logs line_30 Invalid logline: 12.222.107.fbd,2003-03-03,03:04:45,500.0,18230.0,0000018230-01-500093,0000018230%2D01%2D500093.txt,200.0,396293.0,0.0,0.0,0.0,10.0,0.0,win
2022-02-09 22:44:29 INFO     utils        access_fail_logs line_30 Invalid logline: 203.218.153.jcd,2003-03-03,03:18:02,500.0,24491.0,0000950152-01-506621,0000950152%2D01%2D506621.txt,200.0,245669.0,0.0,0.0,0.0,10.0,0.0,win
2022-02-09

In [121]:
failedLogsRDD1.count()

17

In [122]:
uniqueResponseCodes, uniqueBrowsers = pE.distinct_responsecodes_browsers(accessLogsRDD)

2022-02-09 22:44:44 DEBUG    pattern_EDGAR distinct_responsecodes_browsers line_91 Response codes are [200, 206, 302, 404]
2022-02-09 22:44:44 DEBUG    pattern_EDGAR distinct_responsecodes_browsers line_95 Browsers are ['iem', 'lin', 'mac', 'mie', 'opr', 'win']


In [123]:
print('uniqueResponseCodes', uniqueResponseCodes)
print('uniqueBrowsers', uniqueBrowsers)

uniqueResponseCodes [200, 206, 302, 404]
uniqueBrowsers ['iem', 'lin', 'mac', 'mie', 'opr', 'win']


## Spark Streaming

In [7]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

import pattern_EDGAR as pE  #; importlib.reload(pE)
import utils;

In [8]:
#sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

In [None]:
lines = ssc.socketTextStream("localhost", 8890)

In [None]:
parsedLogsRDD1 = logFileRDD.map(pE.parseApacheLogLine).cache()

In [10]:
parsedLogsRDD1.pprint()

In [None]:
ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

In [None]:
# run Netcat as a data server
%sh
cat ./data/log20030303.csv | nc -u localhost 8890 -w0