In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
import pyspark

conf = pyspark.SparkConf()
conf.setAppName('Sparkl Python RDD Basics')
conf.set("spark.cores.max", "16")
conf.set("spark.yarn.executor.memoryOverhead", "0")
conf.set("spark.yarn.executor.memory", "512M")
conf.set("spark.yarn.driver.memory", "512M")
conf.set("spark.submit.deployMode", "client")
# sc.stop()
# sc = pyspark.SparkContext(conf=conf)
sc = pyspark.SparkContext.getOrCreate(conf=conf)

In [9]:
from urllib.request import urlretrieve
f = urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

# d = urlopen("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz")
# print(d.info())

In [19]:
data_file = "file:///home/hduser/sandbox/kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [20]:
raw_data.count()

494021

In [21]:
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [22]:
a = range(100)
    
data = sc.parallelize(a)

In [23]:
data.count()

100

In [24]:
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

In [27]:
from time import time
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print("There are {} 'normal' interactions".format(normal_count))
print("Count completed in {} seconds".format(round(tt,3)))

There are 97278 'normal' interactions
Count completed in 9.877 seconds


In [29]:
from pprint import pprint

csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print("Prase completed in {} seconds".format(round(tt, 3)))

Prase completed in 0.853 seconds


In [30]:
pprint(head_rows[0])

['0',
 'tcp',
 'http',
 'SF',
 '181',
 '5450',
 '0',
 '0',
 '0',
 '0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '8',
 '8',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '1.00',
 '0.00',
 '0.00',
 '9',
 '9',
 '1.00',
 '0.00',
 '0.11',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 'normal.']


In [32]:
t0 = time()
head_rows = csv_data.take(100000)
tt = time() - t0
print("Parse completed in {} seconds".format(round(tt,3)))

Parse completed in 141.976 seconds


In [37]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)

In [38]:
key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])


In [39]:
# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

In [41]:
# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print("Data collected in {} seconds".format(round(tt,3)))
print("There are {} 'normal' interactions".format(normal_count))

Data collected in 307.219 seconds
There are 97278 'normal' interactions
