In [1]:
import pyspark

In [2]:
sc = pyspark.SparkContext("local", "Simple App")   # Spark Context

In [3]:
data = sc.textFile('s3n://adp.spark.app.logs.us-west-1.prd/logs/aggregates/*.log')
# data is an RDD (spark) =  resilient distributed dataset

In [4]:
type(data)

pyspark.rdd.RDD

In [None]:
data

In [5]:
data.take(10)

['log4j:WARN No such property [rollingPolicy] in org.apache.log4j.RollingFileAppender.',
 '18/01/24 05:18:33 INFO s3OperationsLog: Method=HEAD ResponseCode=404 URI=http://com.autodesk.edl.prd.s3.amazonaws.com/apps%2Fqubole%2Faccount_id%2F4322%2Flogs%2Fhadoop%2F44195%2F759707%2Fapp-logs%2Fasrd.cp.big.data.services.team%2Flogs%2Fapplication_1513303661803_104617',
 '18/01/24 05:18:33 INFO s3OperationsLog: Method=GET ResponseCode=200 URI=http://com.autodesk.edl.prd.s3.amazonaws.com/?delimiter=%2F&max-keys=1000&prefix=apps%2Fqubole%2Faccount_id%2F4322%2Flogs%2Fhadoop%2F44195%2F759707%2Fapp-logs%2Fasrd.cp.big.data.services.team%2Flogs%2Fapplication_1513303661803_104617%2F',
 '18/01/24 05:18:33 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum',
 '18/01/24 05:18:33 INFO s3OperationsLog: Method=HEAD ResponseCode=200 URI=http://com.autodesk.edl.prd.s3.amazonaws.com/apps%2Fqubole%2Faccount_id%2F4322%2Flogs%2Fhadoop%2F44195%2F759707%2Fapp-log

In [4]:
data_sample = data.sample(False, 0.01, 42)

In [4]:
data_f = data.filter(lambda line: 'asks yarn to' in line.lower())

In [5]:
data_f.take(100)

['18/01/20 14:43:10 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104618',
 '18/01/20 14:26:42 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104736',
 '18/01/20 15:11:21 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104848',
 '18/01/20 14:23:38 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104831',
 '18/01/20 15:30:57 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104930',
 '18/01/20 15:36:41 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104977',
 '18/01/20 15:38:04 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application_1513303661803_104987',
 '18/01/20 15:38:18 Asks YARN to kill this spark job INFO YarnClientImpl: Killed application application

In [8]:
#split the lines to words
documents = data_sample.map(lambda line: line.split(" "))

In [None]:
documents.count()

In [8]:
documents.take(2)

[['18/01/20',
  '14:07:44',
  'Thread-3',
  'INFO',
  'HadoopFsRelation:',
  'Ignore',
  'File',
  'not',
  'found',
  'exceptions',
  '-',
  'true'],
 ['18/01/20',
  '16:18:50',
  'Thread-3',
  'INFO',
  'HadoopFsRelation:',
  'Final',
  'Fake',
  'Statuses',
  'count',
  '179054',
  'Total',
  'time',
  'to',
  'get',
  'all',
  'the',
  'file',
  'statuses',
  '-',
  '31',
  'secs']]

In [None]:
from pyspark.mllib.feature import HashingTF, IDF

#hashingTF = HashingTF()
#tf = hashingTF.transform(documents)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
#tf.cache()
#idf = IDF().fit(tf)
#tfidf = idf.transform(tf)

# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)

In [10]:
import collections
from numpy import array
from math import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel


n_clusters = 50
# Build the model (cluster the data)
kmeans = KMeans.train(tfidfIgnore, n_clusters, maxIterations=7, initializationMode="random")


In [11]:
#generate the clustering labels
kmeans_pre = kmeans.predict(tfidfIgnore)

In [12]:
type(kmeans_pre)

pyspark.rdd.PipelinedRDD

In [13]:
#combine lines with clustering labels
lines_cluster = data.zip(kmeans_pre)

In [14]:
lines_cluster.take(50)

[('18/02/07 16:40:49 main INFO Utils: Registered signal handlers for exception exit hook [TERM, HUP, INT]',
  0),
 ('18/02/07 16:41:42 main INFO HadoopFsRelation: Ignore File not found exceptions - true',
  0)]