## Use findspark to set up PySpark easily

In [1]:
import os

import findspark

# Default SPARK_HOME to the local Spark 1.6.1 install, but keep any value that
# is already exported so the notebook is not tied to one machine's layout.
os.environ.setdefault('SPARK_HOME', '/home/rohith/work/spark-1.6.1-bin-without-hadoop')
# With no arguments, findspark.init() locates Spark through SPARK_HOME and
# makes the pyspark package importable.
findspark.init()

Now pyspark and its modules are available for import.

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

In [3]:
import re

In [4]:
APP_NAME = "Logs Spark"
# "local[*]" runs Spark inside this process rather than on a cluster.
conf = SparkConf().setAppName(APP_NAME).setMaster("local[*]")
# getOrCreate() hands back the already-running context when this cell is
# executed again, instead of failing because a SparkContext already exists.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [7]:
# Read the local nginx access log as an RDD with one element per line
# (the "file:" scheme selects the local filesystem).
logFiles = sc.textFile("file:/var/log/nginx/access.log")

In [8]:
# Regex for one nginx access-log line. Capture groups as consumed by
# parse_nginx_log:
#   1 ip_address, 2 (unused), 3 remote_user, 4 time_stamp (inside [...]),
#   5 request_type, 6 end_point, 7 protocol (the quoted request),
#   8 http_status_code, 9 content_size, 10 referer, 11 user_agent, 12 cookie_user
# NOTE(review): there are two spaces after the closing "]" of the timestamp;
# presumably this mirrors the log_format in use -- confirm.
# Written as a raw string so the backslash escapes reach the regex engine
# untouched and do not trigger invalid-escape warnings.
NGINX_LOGPATT = r'''^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\]  "(\S+) (\S+) (\S+)" (\S+) (\S+) "(\S+)" "(\w+\/\S+ \S+ \S+ \S+ \S+ \S+ \S+)" "(\S+)"'''

In [9]:
def parse_nginx_log(logline):
    """Parse one nginx access-log line into a ``Row``.

    Args:
        logline: a single line of the access log.

    Returns:
        A ``Row`` with the fields ip_address, remote_user, time_stamp,
        request_type, end_point, protocol, http_status_code, content_size,
        referer, user_agent and cookie_user. ``content_size`` is an int;
        every other field is the matched text.

    Raises:
        ValueError: if the line does not match ``NGINX_LOGPATT``, or if the
            size field is neither a number nor the "-" placeholder.
    """
    match = re.search(NGINX_LOGPATT, logline)
    if match is None:
        raise ValueError("Invalid logline : %s" % logline)

    # The size capture is a plain \S+, so it can hold a "-" placeholder;
    # int("-") would raise, so count such a line as zero bytes.
    size_field = match.group(9)
    content_size = 0 if size_field == '-' else int(size_field)

    # Group 2 of the pattern is deliberately not mapped to a field.
    return Row(
        ip_address = match.group(1),
        remote_user = match.group(3),
        time_stamp = match.group(4),
        request_type = match.group(5),
        end_point = match.group(6),
        protocol = match.group(7),
        http_status_code = match.group(8),
        content_size = content_size,
        referer = match.group(10),
        user_agent = match.group(11),
        cookie_user = match.group(12)
    )

In [11]:
# Parse every log line into a Row and cache the result, since the cells below
# run several separate actions over access_logs.
access_logs = logFiles.map(parse_nginx_log).cache()

In [12]:
# Response sizes only; cached because count/sum/min/max each trigger a job.
content_sizes = access_logs.map(lambda log: log.content_size).cache()

# Count first: reduce()/min()/max() raise on an empty RDD and the average
# would divide by zero, so an empty log gets its own message.
num_requests = content_sizes.count()
if num_requests:
    # "//" keeps the integer average that "/" produced under Python 2.
    print("Content Size Avg: %i, Min: %i, Max: %s" % (
        content_sizes.sum() // num_requests,
        content_sizes.min(),
        content_sizes.max(),
    ))
else:
    print("Content Size: no log lines to summarise")

Content Size Avg: 283, Min: 0, Max: 514


Response Code to Count


In [13]:
# Count requests per HTTP status code. take(100) caps how many
# (status_code, count) pairs are brought back to the driver.
responseCodeToCount = (access_logs
                       .map(lambda log: (log.http_status_code, 1))
                       .reduceByKey(lambda left, right: left + right)
                       .take(100))
# Single-argument print(...) behaves the same on Python 2 and Python 3; the
# one-element tuple keeps the list from being unpacked as format arguments.
print("Response Code Counts: %s" % (responseCodeToCount,))

Response Code Counts: [(u'302', 6), (u'502', 1), (u'200', 63), (u'404', 1), (u'499', 1)]


In [14]:
# Client addresses seen more than 10 times: count per address, keep the
# addresses whose count exceeds 10, and return at most 100 of them.
ipAddresses = (access_logs
               .map(lambda log: (log.ip_address, 1))
               .reduceByKey(lambda left, right: left + right)
               .filter(lambda pair: pair[1] > 10)
               .map(lambda pair: pair[0])
               .take(100))
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print("IpAddresses that have accessed more then 10 times: %s" % (ipAddresses,))

IpAddresses that have accessed more then 10 times: [u'127.0.0.1']


In [15]:
# Ten most requested endpoints. takeOrdered() sorts ascending by key, so the
# count is negated to get the largest counts first.
topEndpoints = (access_logs
                .map(lambda log: (log.end_point, 1))
                .reduceByKey(lambda left, right: left + right)
                .takeOrdered(10, key=lambda pair: -1 * pair[1]))
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print("Top Endpoints: %s" % (topEndpoints,))

Top Endpoints: [(u'/', 21), (u'/register', 8), (u'/recharge', 8), (u'/product1', 7), (u'/product2', 6), (u'/login?next=', 6), (u'/product3', 5), (u'/login', 4), (u'/recharge?next=', 3), (u'/register?next=', 2)]


In [16]:
# Peek at the first five parsed rows.
# NOTE(review): the rendered output includes an e-mail address in cookie_user;
# consider clearing or redacting it before sharing the notebook.
access_logs.take(5)

[Row(content_size=464, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/register', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/product3', remote_user=u'-', request_type=u'GET', time_stamp=u'25/May/2016:17:17:21 +0530', user_agent=u'Mozilla/5.0 (X11; Linux x86_64; rv:48.0) Gecko/20100101 Firefox/48.0'),
 Row(content_size=182, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/register', remote_user=u'-', request_type=u'GET', time_stamp=u'25/May/2016:17:17:22 +0530', user_agent=u'Mozilla/5.0 (X11; Linux x86_64; rv:48.0) Gecko/20100101 Firefox/48.0'),
 Row(content_size=464, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/recharge', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/', remote_user=u'-', req

In [17]:
# Check what kind of object access_logs is before converting it to a DataFrame.
type(access_logs)

pyspark.rdd.PipelinedRDD

In [18]:
# Convert the RDD of Row objects into a DataFrame; the column names come from
# the Row fields built in parse_nginx_log.
logDf = access_logs.toDF()

In [19]:
# Preview the first five rows, this time through the DataFrame.
logDf.take(5)

[Row(content_size=464, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/register', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/product3', remote_user=u'-', request_type=u'GET', time_stamp=u'25/May/2016:17:17:21 +0530', user_agent=u'Mozilla/5.0 (X11; Linux x86_64; rv:48.0) Gecko/20100101 Firefox/48.0'),
 Row(content_size=182, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/register', remote_user=u'-', request_type=u'GET', time_stamp=u'25/May/2016:17:17:22 +0530', user_agent=u'Mozilla/5.0 (X11; Linux x86_64; rv:48.0) Gecko/20100101 Firefox/48.0'),
 Row(content_size=464, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/recharge', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/', remote_user=u'-', req

In [20]:
# collect() returns every row to the driver and prints them all.
# NOTE(review): acceptable for this small log, but prefer take(n) or limit(n)
# when the input is larger.
logDf.collect()

[Row(content_size=464, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/register', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/product3', remote_user=u'-', request_type=u'GET', time_stamp=u'25/May/2016:17:17:21 +0530', user_agent=u'Mozilla/5.0 (X11; Linux x86_64; rv:48.0) Gecko/20100101 Firefox/48.0'),
 Row(content_size=182, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/register', remote_user=u'-', request_type=u'GET', time_stamp=u'25/May/2016:17:17:22 +0530', user_agent=u'Mozilla/5.0 (X11; Linux x86_64; rv:48.0) Gecko/20100101 Firefox/48.0'),
 Row(content_size=464, cookie_user=u'\\x22rohith.reddy@customercentria.com\\x22', end_point=u'/recharge', http_status_code=u'200', ip_address=u'127.0.0.1', protocol=u'HTTP/1.1', referer=u'http://localhost:8090/', remote_user=u'-', req

In [21]:
# List the attributes and methods available on the DataFrame (exploration aid).
dir(logDf)

['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattr__',
 '__getattribute__',
 '__getitem__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_jcols',
 '_jdf',
 '_jmap',
 '_jseq',
 '_lazy_rdd',
 '_sc',
 '_schema',
 '_sort_cols',
 'agg',
 'alias',
 'cache',
 'coalesce',
 'collect',
 'columns',
 'corr',
 'count',
 'cov',
 'crosstab',
 'cube',
 'describe',
 'distinct',
 'drop',
 'dropDuplicates',
 'drop_duplicates',
 'dropna',
 'dtypes',
 'explain',
 'fillna',
 'filter',
 'first',
 'flatMap',
 'foreach',
 'foreachPartition',
 'freqItems',
 'groupBy',
 'groupby',
 'head',
 'insertInto',
 'intersect',
 'isLocal',
 'is_cached',
 'join',
 'limit',
 'map',
 'mapPartitions',
 'na',
 'orderBy',
 'persist',
 'printSchema',
 'randomSplit',
 'rdd',
 'registerAsTable',
 'registerTempTable',
 'repartition',
 'replace',
 'rollup',
 'sample',


In [22]:
# Print the DataFrame's column names, types and nullability.
logDf.printSchema()

root
 |-- content_size: long (nullable = true)
 |-- cookie_user: string (nullable = true)
 |-- end_point: string (nullable = true)
 |-- http_status_code: string (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- referer: string (nullable = true)
 |-- remote_user: string (nullable = true)
 |-- request_type: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- user_agent: string (nullable = true)



In [25]:
# Write the DataFrame as JSON under the relative path "save".
# NOTE(review): per the traceback below, this path resolved to HDFS
# (/user/rohith/save) and the write failed because the NameNode was in safe
# mode. Re-run once safe mode is off, or pass an explicit "file:" URI if a
# local write is intended -- confirm the target.
logDf.write.json("save")

Py4JJavaError: An error occurred while calling o278.json.
: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeException): Cannot create directory /user/rohith/save/_temporary/0. Name node is in safe mode.
The reported blocks 0 needs additional 19 blocks to reach the threshold 0.9990 of total blocks 19.
The number of live datanodes 1 has reached the minimum number 0. Safe mode will be turned off automatically once the thresholds have been reached.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkNameNodeSafeMode(FSNamesystem.java:1327)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3893)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:983)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:622)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

	at org.apache.hadoop.ipc.Client.call(Client.java:1475)
	at org.apache.hadoop.ipc.Client.call(Client.java:1412)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
	at com.sun.proxy.$Proxy20.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:558)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at com.sun.proxy.$Proxy21.mkdirs(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3000)
	at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2970)
	at org.apache.hadoop.hdfs.DistributedFileSystem$21.doCall(DistributedFileSystem.java:1047)
	at org.apache.hadoop.hdfs.DistributedFileSystem$21.doCall(DistributedFileSystem.java:1043)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1043)
	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1036)
	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1877)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:305)
	at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:108)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:147)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
	at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
	at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:323)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
