# PySpark Tutorial
<div>
 <h2> CSCI 4283 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

Spark was originally developed using Scala, although there are Python and Java interfaces as well. This tutorial covers [most of the RDD API](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds) using Python bindings.

You may want to consult the [PySpark manual](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html) as well.

In [2]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=b9c3b210a47ab6734b8272fee32cc26b5394e7710c7e300a72abbf035c8949ae
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [3]:
import pyspark
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

The `SparkContext` object tells Spark how to access a cluster. The `SparkConf` object defines information about our job.

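The `master` of a Spark configuration names the cluster manager (for example YARN or Mesos). It can also be "local", meaning that the Spark job runs on your local machine, which is what we'll do here. The bracketed suffix sets the number of worker threads: `local[2]`, used below, runs two threads, while `local[*]` uses all the available cores. In general, you shouldn't hardcode the `master` in your program; it is usually supplied when the job is launched (for example with `spark-submit --master`).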
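One way to avoid hardcoding the master is to read it from the environment and fall back to local mode. The sketch below is a minimal illustration in plain Python; the environment variable name `TUTORIAL_SPARK_MASTER` and the helper `resolve_master` are hypothetical names chosen for this example, not part of Spark.

```python
import os

def resolve_master(default="local[*]"):
    """Return the master URL to use for this job.

    TUTORIAL_SPARK_MASTER is a hypothetical variable name used only in
    this sketch; Spark itself does not read it.
    """
    return os.environ.get("TUTORIAL_SPARK_MASTER", default)

# With the variable unset, fall back to local mode on all cores.
os.environ.pop("TUTORIAL_SPARK_MASTER", None)
print(resolve_master())

# A deployment could export the variable to point at a cluster manager.
os.environ["TUTORIAL_SPARK_MASTER"] = "yarn"
print(resolve_master())
```

The returned string can then be passed to `SparkConf().setMaster(...)` in place of a literal.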

Spark uses a function chaining notation. We'll use that throughout unless it makes the code confusing.

In [4]:
conf=SparkConf().setAppName("pyspark tutorial").setMaster("local[2]")
sc = SparkContext(conf=conf)
#sc.setLogLevel("DEBUG")

## Create Datasets
The basic Spark data structure is the RDD (resilient distributed dataset), which is essentially a vector distributed across the nodes of a cluster or held on the local system. In PySpark, the vector can contain a heterogeneous collection of types (strings, ints, etc.).

You can create an RDD from a list or tuple, read it from a local file, or read it from distributed storage such as HDFS or S3.

The following shows creating three datasets from lists using the _parallelize_ method.

In [5]:
a = sc.parallelize([7, 2, 3, 1, 2, 3, 4, 5, 6, 7])
b = sc.parallelize([2, 3, 99, 22, -77])
c = sc.parallelize([ (1,2), (2,3), (1, 99), (3, 44), (2, 1), (4,5), (3, 19) ] )

In [6]:
a

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [7]:
passwd = sc.textFile("/etc/passwd")

It is also possible to read and write binary data files, including data stored as Hadoop SequenceFiles.

Spark also supports _accumulators_ and _broadcast variables_. Accumulators are designed to sum or aggregate values from across the cluster; they are really only suitable for operators that are both commutative and associative. Broadcast variables are efficiently disseminated to all nodes in the cluster; they can be used for the equivalent of "map-side joins".
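To make the "map-side join" idea concrete, the plain-Python sketch below joins a small lookup table against a list of records inside a map step, and tallies unmatched records with a counter that plays the role of an accumulator. It runs without Spark and only models the idea; `shell_names`, `records`, `stats`, and `join_record` are hypothetical names and data invented for this illustration. In a real job the dictionary would be wrapped with `sc.broadcast(...)` and the counter created with `sc.accumulator(0)`.

```python
from collections import Counter

# Small lookup table: in Spark this would be the broadcast variable,
# shipped once to every worker instead of being shuffled.
shell_names = {"/bin/bash": "bash", "/bin/sync": "sync"}

# The larger dataset of (user, shell) pairs; hypothetical sample data.
records = [
    ("root", "/bin/bash"),
    ("sync", "/bin/sync"),
    ("daemon", "/usr/sbin/nologin"),
]

# Stand-in for an accumulator: the map step only ever adds to it.
stats = Counter()

def join_record(record):
    """Join one record against the local lookup table, counting misses."""
    user, shell = record
    name = shell_names.get(shell)
    if name is None:
        stats["unmatched"] += 1  # addition is commutative and associative
    return (user, name)

joined = [join_record(r) for r in records]
print(joined)
print(stats["unmatched"])
```

Because the lookup happens inside the map step, the large dataset never has to be regrouped by key, which is the point of a map-side join.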

## Transformations and Actions
*Transformations* produce new RDDs by transforming existing RDDs, and *actions* extract data *from* an RDD and return it to the driver program.
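One practical consequence of this split is that Spark evaluates transformations lazily: a transformation only records what to do, and nothing runs until an action asks for a result. Python's built-in `map` behaves in a loosely similar way, which the plain-Python sketch below uses as an analogy (it is not how Spark is implemented). The names `calls` and `square` are hypothetical.

```python
calls = []

def square(x):
    calls.append(x)  # record that work actually happened
    return x * x

data = [7, 2, 3, 1]

# "Transformation": builds a lazy pipeline but runs nothing yet.
squared = map(square, data)
work_before_action = len(calls)

# "Action": pulling the results forces the computation.
result = list(squared)
work_after_action = len(calls)

print(work_before_action, work_after_action, result)
```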

### Actions

Some of the simplest actions are:
* count() - Return the number of items in the RDD
* take(_n_) - Extract and return the first _n_ items from the RDD
* first() - Same as take(1), but returns the item itself rather than a one-item list
* collect() - Same as take(count()) - **returns full RDD**
* takeSample(_withReplacement_:Boolean, _num_:Int, [_seed_:Int]) - Extract a random set of _num_ items from the RDD with or without replacement
* takeOrdered(_num_) - Extract the first _num_ items of the RDD in sorted (ascending) order

In [10]:
print(a.count())
print(a.take(2))
print(a.first())
print(a.collect())
print(a.takeSample(True, 3))
print(a.takeSample(False, 3))
print(a.takeOrdered(4))

10
[7, 2]
7
[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]
[7, 7, 7]
[2, 4, 2]
[1, 2, 2, 3]
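For intuition, each of these actions has a close analogue on an ordinary Python list. The sketch below mirrors the calls using only the standard library; it illustrates the semantics and is not how Spark computes them. The fixed random seed is an assumption added so the sampling is repeatable.

```python
import heapq
import random

a_list = [7, 2, 3, 1, 2, 3, 4, 5, 6, 7]
rng = random.Random(0)  # fixed seed, an assumption for repeatability

count = len(a_list)                         # count()
first_two = a_list[:2]                      # take(2)
first = a_list[0]                           # first()
everything = list(a_list)                   # collect()
with_repl = rng.choices(a_list, k=3)        # takeSample(True, 3)
without_repl = rng.sample(a_list, 3)        # takeSample(False, 3)
smallest_four = heapq.nsmallest(4, a_list)  # takeOrdered(4)

print(count, first_two, first, smallest_four)
```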


## Lambda Functions in Python

Lambda functions, or anonymous functions, are common in other languages (e.g. Scala) and are commonly used in PySpark. A Python lambda is restricted to a single expression.

In [11]:
(lambda x: x + x)(1)

2

In [12]:
lambda x: x + 1

<function __main__.<lambda>(x)>

In [13]:
add2 = lambda x: x + 2
add2(3)

5
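When the logic needs statements rather than one expression, a named function defined with `def` can be passed anywhere a lambda is accepted. The sketch below shows the two forms side by side; `add2_def` and `describe` are hypothetical names used only here.

```python
# A lambda and an equivalent named function.
add2_lambda = lambda x: x + 2

def add2_def(x):
    return x + 2

def describe(x):
    # Assignments and if/else blocks are statements, so they need def.
    if x % 2 == 0:
        label = "even"
    else:
        label = "odd"
    return (x, label)

print(add2_lambda(3), add2_def(3))
print(list(map(describe, [1, 2, 3])))
```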

## Map, Reduce & flatMap

`map` is a transformation that produces a new RDD by applying a function to every element. `reduce` is an action that combines the elements of an RDD into a single value by repeatedly applying a specified function. `map` takes a single-argument (unary) function (often a *lambda*), while `reduce` takes a two-argument (binary, or dyadic) function.

Examples:

In [14]:
a.collect()

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]

In [15]:
a.map( lambda x: x**2 ).collect()

[49, 4, 9, 1, 4, 9, 16, 25, 36, 49]

In [16]:
a.collect()

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]

**reduce** performs a reduction across the RDD using a function. For example, the `operator.add` function in Python is the function that implements addition, so we can sum an RDD using:

In [17]:
a.reduce(operator.add)

40

In [18]:
a.count()

10
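The function passed to `reduce` should be commutative and associative, because each partition is reduced separately and the partial results are then combined. The plain-Python sketch below imitates that to show why; `partitioned_reduce` is a hypothetical helper, and the contiguous chunking is an assumption for illustration, not Spark's actual partitioning.

```python
from functools import reduce
import operator

def partitioned_reduce(items, func, num_partitions):
    """Reduce each contiguous chunk, then reduce the partial results."""
    size = -(-len(items) // num_partitions)  # ceiling division
    chunks = [items[i:i + size] for i in range(0, len(items), size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)

a_list = [7, 2, 3, 1, 2, 3, 4, 5, 6, 7]

# Addition gives the same answer however the data is split.
print(partitioned_reduce(a_list, operator.add, 1))  # 40
print(partitioned_reduce(a_list, operator.add, 3))  # 40

# Subtraction is neither commutative nor associative, so the
# answer depends on the number of partitions.
print(partitioned_reduce(a_list, operator.sub, 1))  # -26
print(partitioned_reduce(a_list, operator.sub, 2))  # 18
```

With an operator such as subtraction, the result of a Spark `reduce` would likewise depend on how the data happens to be partitioned.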

The following `lambda` sums the first elements and, separately, the second elements of the tuples in `c`.

In [20]:
c.collect()

[(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]

In [21]:
c.reduce( lambda x,y: (x[0] + y[0], x[1] + y[1]) )

(16, 173)

That should produce the same result as the more complex example below, which builds an RDD for each field of the tuple and then adds up each one using reduce. The `operator.add` function is the function form of `+`.

In [22]:
( c.map( lambda x : x[0] ).reduce(operator.add), 
  c.map( lambda x: x[1] ).reduce(operator.add) )

(16, 173)

**flatMap** applies a function that returns a sequence to each element, just like `map`, but then *flattens* the resulting sequences into a single RDD of their items. This is useful when processing a set of tuples or when breaking documents into words and then processing the words rather than lines of words.

In [23]:
sent = sc.parallelize(["these are some", "sample words" ])

In [24]:
sent.collect()

['these are some', 'sample words']

In [25]:
sent.map( lambda x : x.split() ).collect()

[['these', 'are', 'some'], ['sample', 'words']]

In [26]:
sent.flatMap( lambda x : x.split() ).collect()

['these', 'are', 'some', 'sample', 'words']
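The difference between the two results can be reproduced on plain Python lists, which may help when reasoning about the shape of the output. This is a sketch of the semantics only, using `itertools.chain` as a stand-in for the flattening step; the variable names are hypothetical.

```python
from itertools import chain

sentences = ["these are some", "sample words"]

# Like map: one output item (a list of words) per input line.
mapped = [s.split() for s in sentences]

# Like flatMap: the per-line lists are flattened into one sequence.
flattened = list(chain.from_iterable(mapped))

print(len(mapped), mapped)
print(len(flattened), flattened)
```

The mapped result has one item per input line, while the flattened result has one item per word.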

## Filter, Sampling & Sorting

`filter` keeps only the items of an RDD for which a given function returns true, removing the rest.

In [27]:
isEven = lambda x: x %2 == 0

print(a.collect())
print(a.filter(isEven).collect())

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]
[2, 2, 4, 6]


In [29]:
passwd

/etc/passwd MapPartitionsRDD[4] at textFile at NativeMethodAccessorImpl.java:0

In [28]:
passwd.map( lambda x : x.split(':' ) )\
   .filter( lambda x : x[0] == 'root' )\
   .collect()

[['root', 'x', '0', '0', 'root', '/root', '/bin/bash']]

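If we want to get a representative sample of the data, we can use **takeSample** to sample (and collect) that data or **sample** to get an RDD. In both, the first argument determines whether we sample with replacement (`True`) or not. The second argument differs: **takeSample** takes the number of items to return, while **sample** takes a fraction, so the size of its result varies from run to run.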

In [37]:
a.collect()

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]

In [30]:
a.takeSample(True, 3)

[2, 1, 2]

In [43]:
a.sample(True, 2).collect()

[2, 2, 2, 3, 3, 3, 3, 1, 2, 2, 3, 5, 5, 6, 6, 6, 7, 7, 7, 7, 7]
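The behavior of `sample` is easier to see in a small model. Without replacement, each item is kept independently with probability equal to the fraction; with replacement, each item is emitted a Poisson-distributed number of times whose mean is the fraction. The plain-Python sketch below models that behavior for intuition only; `sample_model` and `poisson` are hypothetical helpers, and the seeds are arbitrary.

```python
import math
import random

def poisson(rng, mean):
    """Draw from a Poisson distribution (Knuth's multiplication method)."""
    limit = math.exp(-mean)
    k, product = 0, rng.random()
    while product > limit:
        k += 1
        product *= rng.random()
    return k

def sample_model(items, with_replacement, fraction, seed=None):
    """Model of sampling by fraction; not Spark's implementation."""
    rng = random.Random(seed)
    out = []
    for item in items:
        if with_replacement:
            copies = poisson(rng, fraction)  # may be 0, 1, 2, ...
        else:
            copies = 1 if rng.random() < fraction else 0
        out.extend([item] * copies)
    return out

a_list = [7, 2, 3, 1, 2, 3, 4, 5, 6, 7]
print(sample_model(a_list, True, 2, seed=1))
print(sample_model(a_list, False, 0.5, seed=1))
```

With a fraction of 2 the expected output size is about twice the input, which is consistent with the 21 items returned above from a 10-item RDD.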

If you're examining data using **take** but want to see the underlying data in sorted order, you can use **takeOrdered**.

In [32]:
a.collect()

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]

In [33]:
a.take(4)

[7, 2, 3, 1]

In [34]:
a.takeOrdered(4)

[1, 2, 2, 3]

You can also use **takeOrdered** with more complex items, such as pairs, if you provide an ordering function. For example, we can sort (K,V) pairs using the extra function to define the order:

In [44]:
c.collect()

[(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]

The default **takeOrdered** uses the natural sort order of the type; for tuples, that means sorting on the keys and then on the values when the keys are equal.

In [45]:
c.takeOrdered(3)

[(1, 2), (1, 99), (2, 1)]

We can provide an ordering function as a lambda or function of a single argument that returns the sort key. Negating a numeric key sorts in reverse order.


In [46]:
c.takeOrdered(3, lambda x: x[1])

[(2, 1), (1, 2), (2, 3)]

In [47]:
c.takeOrdered(3, lambda x: -x[1])

[(1, 99), (3, 44), (3, 19)]
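The standard library's `heapq.nsmallest` accepts the same kind of key function, so the three calls above can be checked on a plain list. This is an analogy for the ordering semantics only, not a description of how Spark computes the result.

```python
import heapq

c_list = [(1, 2), (2, 3), (1, 99), (3, 44), (2, 1), (4, 5), (3, 19)]

# Natural tuple order: compare keys first, then values.
default_order = heapq.nsmallest(3, c_list)

# Order by the second field only.
by_value = heapq.nsmallest(3, c_list, key=lambda x: x[1])

# Negating the key yields the largest values first.
by_value_desc = heapq.nsmallest(3, c_list, key=lambda x: -x[1])

print(default_order)  # [(1, 2), (1, 99), (2, 1)]
print(by_value)       # [(2, 1), (1, 2), (2, 3)]
print(by_value_desc)  # [(1, 99), (3, 44), (3, 19)]
```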

**sortBy** and **sortByKey** serve a similar role to **takeOrdered**, but they sort the entire RDD rather than just the returned results.

In [48]:
# Read the file on the driver, closing it when done, then distribute the lines
with open('/etc/passwd', 'r') as f:
    passwdLines = f.readlines()
passwd = sc.parallelize( passwdLines )

In [50]:
passwd.collect()

['root:x:0:0:root:/root:/bin/bash\n',
 'daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin\n',
 'bin:x:2:2:bin:/bin:/usr/sbin/nologin\n',
 'sys:x:3:3:sys:/dev:/usr/sbin/nologin\n',
 'sync:x:4:65534:sync:/bin:/bin/sync\n',
 'games:x:5:60:games:/usr/games:/usr/sbin/nologin\n',
 'man:x:6:12:man:/var/cache/man:/usr/sbin/nologin\n',
 'lp:x:7:7:lp:/var/spool/lpd:/usr/sbin/nologin\n',
 'mail:x:8:8:mail:/var/mail:/usr/sbin/nologin\n',
 'news:x:9:9:news:/var/spool/news:/usr/sbin/nologin\n',
 'uucp:x:10:10:uucp:/var/spool/uucp:/usr/sbin/nologin\n',
 'proxy:x:13:13:proxy:/bin:/usr/sbin/nologin\n',
 'www-data:x:33:33:www-data:/var/www:/usr/sbin/nologin\n',
 'backup:x:34:34:backup:/var/backups:/usr/sbin/nologin\n',
 'list:x:38:38:Mailing List Manager:/var/list:/usr/sbin/nologin\n',
 'irc:x:39:39:ircd:/var/run/ircd:/usr/sbin/nologin\n',
 'gnats:x:41:41:Gnats Bug-Reporting System (admin):/var/lib/gnats:/usr/sbin/nologin\n',
 'nobody:x:65534:65534:nobody:/nonexistent:/usr/sbin/nologin\n',
 '_apt:x:100:65

In [51]:
userAndShell = passwd.map( lambda x: x.rstrip('\n').split(':') )\
    .map( lambda y: ( y[0], y[6] ) )
userAndShell.collect()

[('root', '/bin/bash'),
 ('daemon', '/usr/sbin/nologin'),
 ('bin', '/usr/sbin/nologin'),
 ('sys', '/usr/sbin/nologin'),
 ('sync', '/bin/sync'),
 ('games', '/usr/sbin/nologin'),
 ('man', '/usr/sbin/nologin'),
 ('lp', '/usr/sbin/nologin'),
 ('mail', '/usr/sbin/nologin'),
 ('news', '/usr/sbin/nologin'),
 ('uucp', '/usr/sbin/nologin'),
 ('proxy', '/usr/sbin/nologin'),
 ('www-data', '/usr/sbin/nologin'),
 ('backup', '/usr/sbin/nologin'),
 ('list', '/usr/sbin/nologin'),
 ('irc', '/usr/sbin/nologin'),
 ('gnats', '/usr/sbin/nologin'),
 ('nobody', '/usr/sbin/nologin'),
 ('_apt', '/usr/sbin/nologin'),
 ('systemd-network', '/usr/sbin/nologin'),
 ('systemd-resolve', '/usr/sbin/nologin'),
 ('messagebus', '/usr/sbin/nologin'),
 ('nvidia-persistenced', '/usr/sbin/nologin')]

`sortBy( keyfunc: Func, ascending: Boolean)` takes a function that returns the sort key for each item; it is a key function, not a comparison function.

In [52]:
userAndShell.take(3)

[('root', '/bin/bash'),
 ('daemon', '/usr/sbin/nologin'),
 ('bin', '/usr/sbin/nologin')]

In [53]:
userAndShell.sortBy(lambda x : x[0] ).take(3)

[('_apt', '/usr/sbin/nologin'),
 ('backup', '/usr/sbin/nologin'),
 ('bin', '/usr/sbin/nologin')]

In [54]:
userAndShell.sortBy(lambda x : x[1] ).take(3)

[('root', '/bin/bash'), ('sync', '/bin/sync'), ('daemon', '/usr/sbin/nologin')]

`sortByKey( ascending: Boolean)` assumes the data is in (k,v) pairs. In this case, the result is the same as the `sortBy` on the user name (`x[0]`) above.

In [55]:
userAndShell.sortByKey().take(3)

[('_apt', '/usr/sbin/nologin'),
 ('backup', '/usr/sbin/nologin'),
 ('bin', '/usr/sbin/nologin')]
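Python's built-in `sorted` uses the same key-function convention, so the orderings above can be reproduced on a small list. The sketch below uses a few of the (user, shell) pairs from the output above; it illustrates the ordering only and makes no claim about how Spark sorts across partitions.

```python
# A few (user, shell) pairs copied from the output above.
user_and_shell = [
    ("root", "/bin/bash"),
    ("daemon", "/usr/sbin/nologin"),
    ("bin", "/usr/sbin/nologin"),
    ("sync", "/bin/sync"),
    ("backup", "/usr/sbin/nologin"),
    ("_apt", "/usr/sbin/nologin"),
]

# Like sortBy(lambda x: x[0]) or sortByKey(): order by user name.
by_user = sorted(user_and_shell, key=lambda x: x[0])

# Like sortBy(lambda x: x[1]): order by shell.
by_shell = sorted(user_and_shell, key=lambda x: x[1])

# Like passing ascending=False: reverse the order.
by_user_desc = sorted(user_and_shell, key=lambda x: x[0], reverse=True)

print(by_user[:3])
print(by_shell[:2])
print(by_user_desc[0])
```

Note that `_apt` sorts first because the underscore precedes lowercase letters in character order.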

## Set Operations

**union** and **intersection** produce new RDDs by treating the elements as members of a set, although **union** keeps duplicates while **intersection** removes them. **distinct** returns the unique set of items in an RDD (*i.e.* converting a multi-set to a set). **sample**(withReplacement:Boolean, fraction:Float, [seed:Int]) draws samples with or without replacement. Sample produces more representative samples with larger datasets and can behave erratically with small ones.

In [56]:
a.collect()

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]

In [57]:
b.collect()

[2, 3, 99, 22, -77]

In [58]:
a.union(b).collect()

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7, 2, 3, 99, 22, -77]

In [59]:
a.intersection(b).collect()

[2, 3]

`subtract` removes items from the RDD that are contained in a second RDD.

In [60]:
print(a.collect(), " - ", b.collect(), " = ", a.subtract(b).collect())

[7, 2, 3, 1, 2, 3, 4, 5, 6, 7]  -  [2, 3, 99, 22, -77]  =  [4, 1, 5, 6, 7, 7]


In [61]:
a.distinct().collect()

[2, 4, 6, 7, 3, 1, 5]
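The duplicate handling of these operations is easy to misremember, so the plain-Python sketch below restates each one on ordinary lists. Results are sorted here for readability; as the outputs above show, Spark does not return them in any particular order. The variable names are hypothetical.

```python
a_list = [7, 2, 3, 1, 2, 3, 4, 5, 6, 7]
b_list = [2, 3, 99, 22, -77]
b_set = set(b_list)

# union: simple concatenation, duplicates are kept.
union = a_list + b_list

# intersection: shared values, each reported once.
intersection = sorted(set(a_list) & b_set)

# subtract: every occurrence of a shared value is dropped,
# but other duplicates (the two 7s) remain.
subtract = sorted(x for x in a_list if x not in b_set)

# distinct: one copy of each value.
distinct = sorted(set(a_list))

print(union)
print(intersection)
print(subtract)
print(distinct)
```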