# Spark Basics

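Change the current working directory (CWD) from `Apache-Spark-with-Python/notebooks` to the repository root `Apache-Spark-with-Python`. This relies on the `PWD` environment variable pointing at the repository root, i.e. Jupyter was started from there.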

In [1]:
import os
os.chdir(os.environ.get("PWD"))

Then import from `src`

In [2]:
from src.helper.decorators import timing

## SparkContext

The first thing a Spark program must do is create a `SparkContext` object, which tells Spark how to access a cluster. Here we use a `local` master with two worker threads (`local[2]`), so everything runs on this machine.

In [3]:
from pyspark.context import SparkContext

sc = SparkContext(appName="SparkBasics", master="local[2]")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/26 20:49:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Download the data (the 10% sample of the KDD Cup 1999 dataset):

In [4]:
from urllib.request import urlretrieve
url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
destination = "data/kddcup.data_10_percent.gz"
f = urlretrieve(url, destination)
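
The download cell above fetches the file on every run and assumes the `data/` directory already exists. A minimal sketch of a guard around `urlretrieve` is shown below; `download_if_missing` is a hypothetical helper name, not part of this repository.

```python
import os
from urllib.request import urlretrieve


def download_if_missing(url, destination):
    """Hypothetical helper: fetch `url` only when `destination` does not exist yet.

    Returns True if a download happened, False if the file was already there.
    """
    if os.path.exists(destination):
        return False
    # Create the parent directory (e.g. "data/") if needed.
    os.makedirs(os.path.dirname(destination) or ".", exist_ok=True)
    urlretrieve(url, destination)
    return True
```

Calling `download_if_missing(url, destination)` could then stand in for the direct `urlretrieve(url, destination)` call.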

## Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), an immutable collection of records.
- `resilient`: a lost partition can be recomputed from the chain of transformations that produced it
- `distributed`: the data can be partitioned across the nodes of your cluster

1. An RDD can be created by parallelizing existing collections:

In [5]:
existing_collection = [1, 2, 3, 4, 5]
rdd = sc.parallelize(existing_collection)

2. An RDD can be created from external datasets, here a file on the local file system:

In [6]:
rdd = sc.textFile(destination)

## RDD Operations
- **transformations** build a new RDD from an existing one, e.g. `map`, `filter`, `groupBy`. They are lazy: nothing is computed until an action needs the result.
- **actions** trigger the computation and return a result to the driver, e.g. `reduce`, `count`, `min`, `max` and `variance`.

More on transformations and actions: https://www.analyticsvidhya.com/blog/2016/10/using-pyspark-to-perform-transformations-and-actions-on-rdd/

In the following, we wrap all method calls in `timing`, which prints how long each call takes.

Note: `rdd.some_method(arg)` <=> `timing(rdd.some_method)(arg)`
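
The source of `src.helper.decorators.timing` is not shown in this notebook. As a rough sketch, assuming it is a plain wrapper that prints the elapsed time in the format seen in the cell outputs here, it could look like this:

```python
import functools
import time


def timing(func):
    """Hypothetical stand-in for src.helper.decorators.timing (an assumption)."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        # Mirrors the output format seen in this notebook's cell outputs.
        print(f"func: {func.__name__}(args={args}, kwargs={kwargs}) took: {elapsed:.4f} sec")
        return result  # the wrapped call's result is passed through unchanged

    return wrapper


# Works on any callable, including bound methods such as rdd.filter.
total = timing(sum)([1, 2, 3])
```

Because the wrapper returns the result of the wrapped call, `timing(f)(x)` and `f(x)` yield the same value.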

In [7]:
rdd_filtered = timing(rdd.filter)(lambda x: 'normal.' in x)

func: filter(args=(<function <lambda> at 0x108cd7dc0>,), kwargs={}) took: 0.0058 sec


In [8]:
timing(rdd_filtered.count)()

func: count(args=(), kwargs={}) took: 2.1918 sec

97278
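
The timings above show `filter` returning in milliseconds while `count` takes about two seconds. That is because Spark transformations are lazy and only an action triggers the actual work. As a loose pure-Python analogy (this is not Spark code, and the sample lines are shortened for illustration), the built-in `filter` behaves similarly:

```python
calls = []  # records every time the predicate actually runs


def is_normal(line):
    calls.append(line)
    return line.endswith("normal.")


# Shortened, illustrative records (not full dataset rows).
lines = [
    "0,tcp,http,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,tcp,http,SF,normal.",
]

# "Transformation": builds a lazy pipeline, nothing is evaluated yet.
filtered = filter(is_normal, lines)
calls_after_filter = len(calls)

# "Action": consuming the pipeline forces the evaluation.
normal_count = sum(1 for _ in filtered)

print(calls_after_filter, normal_count, len(calls))  # 0 2 3
```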

In [9]:
timing(rdd_filtered.take)(5)

func: take(args=(5,), kwargs={}) took: 0.0827 sec


['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [10]:
from pprint import pprint
rdd_split = timing(rdd.map)(lambda x: x.split(","))
head_rows = timing(rdd_split.take)(5)
pprint(head_rows[0])

func: map(args=(<function <lambda> at 0x105bd6280>,), kwargs={}) took: 0.0017 sec
func: take(args=(5,), kwargs={}) took: 0.0764 sec
['0',
 'tcp',
 'http',
 'SF',
 '181',
 '5450',
 '0',
 '0',
 '0',
 '0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '8',
 '8',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '1.00',
 '0.00',
 '0.00',
 '9',
 '9',
 '1.00',
 '0.00',
 '0.11',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 'normal.']


In [11]:
head_rows = timing(rdd_split.take)(100000)

                                                                                

func: take(args=(100000,), kwargs={}) took: 1.4344 sec


In [12]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]
    return (tag, elems)
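
To see what `parse_interaction` returns without involving Spark, it can be applied to a single raw line. The sketch below repeats the function and uses the first record printed earlier in this notebook as a hard-coded sample; each record has 42 comma-separated fields, so index 41 is the last one, the label.

```python
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[41]  # last of the 42 fields: the label, e.g. "normal."
    return (tag, elems)


# First record of the dataset, copied from the output of an earlier cell.
sample = (
    "0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,"
    "0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,"
    "0.00,0.00,0.00,0.00,0.00,normal."
)

tag, fields = parse_interaction(sample)
print(tag, len(fields))  # normal. 42
```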

In [13]:
rdd_tagged = timing(rdd.map)(parse_interaction)
head_rows = timing(rdd_tagged.take)(5)
pprint(head_rows[0])

func: map(args=(<function parse_interaction at 0x105bd6310>,), kwargs={}) took: 0.0010 sec
func: take(args=(5,), kwargs={}) took: 0.0814 sec
('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])


In [14]:
all_raw_data = timing(rdd.collect)()

                                                                                

func: collect(args=(), kwargs={}) took: 3.1322 sec


## All together

In [15]:
# get data from file
rdd = sc.textFile(destination)

# parse into key-value pairs
rdd2 = timing(rdd.map)(parse_interaction)

# filter normal key interactions
rdd3 = timing(rdd2.filter)(lambda x: x[0] == "normal.")

# collect all (returns list)
all_normal = timing(rdd3.collect)()
normal_count = len(all_normal)
print(f"There are {normal_count} 'normal' interactions")

func: map(args=(<function parse_interaction at 0x105bd6310>,), kwargs={}) took: 0.0005 sec
func: filter(args=(<function <lambda> at 0x1227414c0>,), kwargs={}) took: 0.0000 sec


                                                                                

func: collect(args=(), kwargs={}) took: 3.0451 sec
There are 97278 'normal' interactions
