In [1]:
import bz2
import datetime
import glob
import gzip
import itertools
import re

import pandas as pd
import findspark
findspark.init()  # keep this before `import pyspark`
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
# Reuse an existing SparkContext when this cell is re-run: constructing a
# second SparkContext in the same kernel raises an error.
spark_conf = pyspark.SparkConf().setMaster('local').setAppName('spark')
sc = pyspark.SparkContext.getOrCreate(spark_conf)
sqlContext = pyspark.SQLContext(sc)

In [3]:
# Input files: every entry directly under the kdlog directory.
file_pattern = './generators/kdlog/*'
# Apache combined-log-style line; exactly one capture group per entry of
# col_names. The timezone offset inside [...] is matched but not captured.
pattern = r'(\S+) (\S+) (\S+) \[(\S+) \S+\] "(\S+) (\S+) (\S+)" (\S+) (\S+) "(\S*)" "(.*?)"'
col_names = ('host name login_name date_time method request_url version '
             'status size referrer agent').split()

In [4]:
def gen_files(filenames):
    """Yield an open text-mode file handle for each path in `filenames`.

    Paths ending in ``.bz2`` or ``.gz`` are opened with the matching
    decompressor; any other path is opened as a plain text file.

    Each handle is closed as soon as the consumer asks for the next one (or the
    generator is closed/exhausted), so read a file completely before advancing.
    """
    for filename in filenames:
        if filename.endswith('.bz2'):
            opener = bz2.open
        elif filename.endswith('.gz'):
            opener = gzip.open
        else:
            opener = open
        # `with` guarantees the handle is released once the consumer moves on,
        # instead of leaking one open descriptor per file.
        with opener(filename, 'rt') as file:
            yield file

def gen_lines(files):
    """Flatten an iterable of open files into a single stream of lines.

    Files are consumed in order, and every line of one file is yielded before
    the next file is requested from `files`.
    """
    for handle in files:
        for text_line in handle:
            yield text_line

def gen_extract(lines, pattern):
    """Yield the tuple of capture groups for every line matching `pattern`.

    Args:
        lines: iterable of strings.
        pattern: regex string (or compiled pattern) searched in each line.

    Lines with no match are reported on stdout and skipped, so only
    successfully parsed rows are yielded.
    """
    regex = re.compile(pattern)
    for line in lines:
        match = regex.search(line)
        if match is None:
            # Handle non-matches explicitly. The previous bare `except:` around
            # the yield also swallowed GeneratorExit, which made close() on a
            # partially consumed generator raise RuntimeError.
            print('no match:', line.rstrip('\n'))
            continue
        yield match.groups()
            
def gen_transform_col(rows, col_names, func):
    """Apply `func` to the listed columns of each row dict, in place.

    Args:
        rows: iterable of dicts.
        col_names: keys whose values are replaced by ``func(value)``.
        func: one-argument conversion function.

    A row for which `func` raises an Exception is dropped (it may already be
    partially converted). Each distinct ``(exc.args, type(exc))`` pair is
    printed only the first time it occurs. NOTE: that pair is stored in a set,
    so an exception carrying unhashable args would raise TypeError here.
    """
    reported = set()
    for record in rows:
        try:
            for key in col_names:
                record[key] = func(record[key])
            yield record
        except Exception as exc:
            signature = (exc.args, type(exc))
            if signature in reported:
                continue
            reported.add(signature)
            print(signature)

In [5]:
#generator
## Materialise the file list once and sort it, so the generator and pandas
## read the same files in the same deterministic order (glob order is arbitrary).
filenames = sorted(glob.glob(file_pattern, recursive=True))
assert filenames, 'no input files match ' + repr(file_pattern)
files = gen_files(filenames)
lines = gen_lines(files)
#pandas
## `filenames` is a list, so it can be iterated again here.
## sep='$' is a trick to get one column per whole line.
## NOTE(review): a line containing '$' would be split into extra fields and
## likely break parsing -- confirm the logs never contain '$'.
pd_df = pd.concat((pd.read_csv(filename, sep='$', header=None, names=['value',]) for filename in filenames),
                  ignore_index=True)
#Spark
##Spark takes care of generating filenames
spark_df = sqlContext.read.text(file_pattern)


#generator
gen_df = gen_extract(lines, pattern)
## turn each tuple of capture groups into a dict keyed by column name
gen_df = ({col_names[k]: groups[k] for k in range(len(col_names))} for groups in gen_df)
#pandas
## str.extract numbers its columns 0..n-1; map them onto col_names
pd_df = (pd_df['value']
         .str.extract(pattern)
         .rename(columns=lambda idx: col_names[idx]))
#Spark
## regexp_extract counts groups from 1 (group 0 is the whole match)
extract_expr = [F.regexp_extract(spark_df['value'], pattern, group_no).alias(name)
                for group_no, name in enumerate(col_names, start=1)]
spark_df = spark_df.select(extract_expr)


#generator
## Treat both the log's '-' placeholder and an empty capture (e.g. referrer "")
## as missing, so all three approaches agree with the Spark branch below.
gen_df = gen_transform_col(gen_df, col_names, lambda x: None if x in ('', '-') else x)
#pandas
##without [] around None, pandas replace will think it should fill in the holes with its own method
pd_df = pd_df.replace(to_replace=['-', ''], value=[None, None])
#Spark
##when regexp_extract fails to match the pattern, it returns '' instead of None
replace_expr = [F.when(spark_df[col].isin('', '-'), None).otherwise(spark_df[col]).alias(col)
                for col in col_names]
spark_df = spark_df.select(replace_expr)


#generator
def to_int(x):
    """Convert `x` with int(); None is passed through unchanged."""
    return None if x is None else int(x)
def to_datetime(x, fmt='%d/%b/%Y:%H:%M:%S'):
    """Parse a log timestamp such as '16/Nov/2005:00:02:46' into a datetime.

    Args:
        x: timestamp string, or None.
        fmt: strptime format; the default matches the access-log date field.

    Returns:
        A naive datetime.datetime, or None when `x` is None. This mirrors
        to_int, and matches pandas (NaT) and Spark (null) for missing values.

    Raises:
        ValueError: if `x` does not match `fmt`.

    NOTE: %b is locale-dependent; English month abbreviations assume an
    English/C locale.
    """
    if x is None:
        return None
    return datetime.datetime.strptime(x, fmt)
gen_df = gen_transform_col(gen_df, ['size', 'status'], to_int)
gen_df = gen_transform_col(gen_df, ['date_time'], to_datetime)
#pandas
## unparseable numbers become NaN (so these columns may end up float)
for numeric_col in ('status', 'size'):
    pd_df[numeric_col] = pd.to_numeric(pd_df[numeric_col], errors='coerce')
pd_df['date_time'] = pd.to_datetime(pd_df['date_time'], format='%d/%b/%Y:%H:%M:%S')
#Spark
for numeric_col in ('status', 'size'):
    spark_df = spark_df.withColumn(numeric_col, spark_df[numeric_col].astype(T.IntegerType()))
## parse to epoch seconds with a Java-style pattern, then cast to a timestamp
spark_df = spark_df.withColumn(
    'date_time',
    F.unix_timestamp('date_time', 'd/MMM/y:H:m:s').astype(T.TimestampType()))



In [7]:
#At this point, pandas has processed the data, but the generators and Spark haven't yet.
#We can look at what the generators do by printing a few rows.
#NOTE: this consumes the printed rows; they are no longer available from gen_df.
## islice stops quietly if fewer than 5 rows remain (next() would raise StopIteration)
for row in itertools.islice(gen_df, 5):
    print(row)

{'referrer': 'http://www.google.com/search?hs=JnE&hl=en&lr=&client=opera&rls=en&q=lift+curve&btnG=Search', 'date_time': datetime.datetime(2005, 11, 16, 0, 2, 46), 'version': 'HTTP/1.1', 'request_url': '/gpspubs/kdd99-est-ben-lift/sld021.htm', 'name': None, 'agent': 'Mozilla/4.0 (compatible; MSIE 6.0; X11; Linux i686; en) Opera 8.5', 'status': 200, 'login_name': None, 'method': 'GET', 'host': 'ip1389.net', 'size': 1385}
{'referrer': 'http://www.kdnuggets.com/gpspubs/kdd99-est-ben-lift/sld021.htm', 'date_time': datetime.datetime(2005, 11, 16, 0, 2, 46), 'version': 'HTTP/1.1', 'request_url': '/gpspubs/kdd99-est-ben-lift/img021.gif', 'name': None, 'agent': 'Mozilla/4.0 (compatible; MSIE 6.0; X11; Linux i686; en) Opera 8.5', 'status': 200, 'login_name': None, 'method': 'GET', 'host': 'ip1389.net', 'size': 7465}
{'referrer': 'http://www.kdnuggets.com/gpspubs/kdd99-est-ben-lift/sld021.htm', 'date_time': datetime.datetime(2005, 11, 16, 0, 2, 47), 'version': 'HTTP/1.1', 'request_url': '/favicon

In [8]:
# pandas computes eagerly, so the finished DataFrame can be displayed directly
pd_df.head(n=5)

Unnamed: 0,host,name,login_name,date_time,method,request_url,version,status,size,referrer,agent
0,ip1664.com,,,2005-11-16 00:00:43,GET,/robots.txt,HTTP/1.0,200,173.0,,msnbot/1.0 (+http://search.msn.com/msnbot.htm)
1,ip1664.com,,,2005-11-16 00:00:43,GET,/gpspubs/sigkdd-kdd99-panel.html,HTTP/1.0,200,14199.0,,msnbot/1.0 (+http://search.msn.com/msnbot.htm)
2,ip1115.unr,,,2005-11-16 00:01:00,GET,/news/99/n23/i12.html,HTTP/1.1,200,3171.0,http://discount-blah1.professional-doctor.com/,Mozilla/4.0 (compatible; MSIE 5.5; Windows 98;...
3,ip2283.unr,,,2005-11-16 00:01:02,GET,/dmcourse/data_mining_course/assignments/assig...,HTTP/1.1,200,8090.0,http://www.google.com/search?hl=en&q=use+of+da...,Mozilla/4.0 (compatible; MSIE 6.0; Windows NT ...
4,ip2283.unr,,,2005-11-16 00:01:03,GET,/dmcourse/dm.css,HTTP/1.1,200,155.0,http://www.kdnuggets.com/dmcourse/data_mining_...,Mozilla/4.0 (compatible; MSIE 6.0; Windows NT ...


In [9]:
# Spark is lazy: take() is an action, so this is where the pipeline actually runs.
# (spark_df.show(5) would print the same rows as a text table instead.)
spark_df.take(num=5)

[Row(host='ip1664.com', name=None, login_name=None, date_time=datetime.datetime(2005, 11, 16, 0, 0, 43), method='GET', request_url='/robots.txt', version='HTTP/1.0', status=200, size=173, referrer=None, agent='msnbot/1.0 (+http://search.msn.com/msnbot.htm)'),
 Row(host='ip1664.com', name=None, login_name=None, date_time=datetime.datetime(2005, 11, 16, 0, 0, 43), method='GET', request_url='/gpspubs/sigkdd-kdd99-panel.html', version='HTTP/1.0', status=200, size=14199, referrer=None, agent='msnbot/1.0 (+http://search.msn.com/msnbot.htm)'),
 Row(host='ip1115.unr', name=None, login_name=None, date_time=datetime.datetime(2005, 11, 16, 0, 1), method='GET', request_url='/news/99/n23/i12.html', version='HTTP/1.1', status=200, size=3171, referrer='http://discount-blah1.professional-doctor.com/', agent='Mozilla/4.0 (compatible; MSIE 5.5; Windows 98; SAFEXPLORER TL)'),
 Row(host='ip2283.unr', name=None, login_name=None, date_time=datetime.datetime(2005, 11, 16, 0, 1, 2), method='GET', request_url=