### Checking Java Version

In [1]:
!java -version

openjdk version "11.0.7" 2020-04-14
OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu218.04, mixed mode, sharing)


### Setting Java 8 Environment

In [2]:
!sudo update-alternatives --config java

There is only one alternative in link group java (providing /usr/bin/java): /usr/lib/jvm/java-11-openjdk-amd64/bin/java
Nothing to configure.


### Downloading Spark 

In [0]:
!wget -q http://apache.spinellicreations.com/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop2.7.tgz

### Extracting Spark Files

In [0]:
!tar xf spark-3.0.0-preview2-bin-hadoop2.7.tgz

### Installing FindSpark

In [0]:
!pip install -q findspark

### Setting up Home environment

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-preview2-bin-hadoop2.7"

### Creating Spark Session

In [0]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

### Stopping the session

In [0]:
spark.stop()

### Installing PySpark

In [9]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 59kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 41.9MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=d2f451670f2b091e84c19253acfbbae62d520976867a38a7133d1f26c7cee1da
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


## First PySpark Job

#### Importing PySpark

In [0]:
import pyspark

#### Creating a Spark Context

In [0]:
from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("spark-basic")
sc = SparkContext(conf=conf)

#### Function to calculate mod

In [0]:
import numpy as np
def mod(x):
    return (x, np.mod(x,2))

#### Creating an RDD

In [13]:
print(range(5))

rdd = sc.parallelize(range(1000)).map(mod).take(10)
print(rdd)

range(0, 5)
[(0, 0), (1, 1), (2, 0), (3, 1), (4, 0), (5, 1), (6, 0), (7, 1), (8, 0), (9, 1)]


#### Creating an RDD using List

In [14]:
values = [1, 2, 3, 4, 5]
rdd = sc.parallelize(values)

## Print all the 5 elements of RDD
print(rdd.take(5)) ## take() to print lol

[1, 2, 3, 4, 5]


#### Load the text file

In [16]:
## load the
from google.colab import files
uploaded = files.upload()

Saving macbeth.txt to macbeth.txt


#### Loading a text file to Spark

In [0]:
rdd = sc.textFile("macbeth.txt")

#### Print the rdd data

In [0]:
# rdd.collect()

#### RDD Persistence

In [19]:
aba = sc.parallelize(range(1, 10000, 2))
aba.persist()

PythonRDD[7] at RDD at PythonRDD.scala:53

#### RDD Caching

In [20]:
textFile = sc.textFile("macbeth.txt")
textFile.cache()

macbeth.txt MapPartitionsRDD[9] at textFile at NativeMethodAccessorImpl.java:0

#### Map

In [21]:
x = sc.parallelize(["spark", "rdd", "example", "sample", "example"])
y = x.map(lambda x:(x, 1))
y.collect()

[('spark', 1), ('rdd', 1), ('example', 1), ('sample', 1), ('example', 1)]

#### FlatMap

In [22]:
rdd = sc.parallelize([2, 3, 4])
rdd_y = rdd.flatMap(lambda x: range(1, x))
rdd_y.collect()

[1, 1, 2, 1, 2, 3]

#### Filter

In [23]:
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()

[2, 4]

#### Sample

In [24]:
parallel =  sc.parallelize(range(9))
parallel.sample(True, 0.2).count()

2

In [25]:
parallel.sample(True, 0.2).collect()

[0]

In [26]:
parallel.sample(False, 1).count()

9

In [27]:
parallel.sample(False, 1).collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8]

#### Union

In [28]:
parallel = sc.parallelize(range(1, 9))
par = sc.parallelize(range(5, 15))
parallel.union(par).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

#### Intersection

In [29]:
parallel = sc.parallelize(range(1, 9))
par = sc.parallelize(range(5, 15))
parallel.intersection(par).collect()

[6, 8, 5, 7]

#### Distinct

In [30]:
parallel = sc.parallelize(range(1, 9))
par = sc.parallelize(range(5, 15))
parallel.union(par).distinct().collect()

[2, 4, 6, 8, 10, 12, 14, 1, 3, 5, 7, 9, 11, 13]

#### SortBy

In [31]:
y = sc.parallelize([5, 7, 1, 3, 2, 1])
y.sortBy(lambda c: c, True).collect()

[1, 1, 2, 3, 5, 7]

In [32]:
z = sc.parallelize([("H", 10), ("A", 26), ("Z", 1), ("L", 5)])
z.sortBy(lambda c: c, False).collect()

[('Z', 1), ('L', 5), ('H', 10), ('A', 26)]

#### MapPartitions

In [33]:
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd.mapPartitions(f).collect()

[3, 7]

#### MapPartitions = WithIndex

In [34]:
rdd = sc.parallelize([1, 2, 3, 4], 4)
def f(splitIndex, iterator): yield splitIndex
rdd.mapPartitionsWithIndex(f).sum()

6

#### GroupBy

In [35]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.map(lambda x: x%2).collect()
print(result)

# sorted([(x, sorted(y)) for (x,y) in result])

[1, 1, 0, 1, 1, 0]


In [36]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x%2).collect()
print(result)

sorted([(x, sorted(y)) for (x,y) in result])

[(1, <pyspark.resultiterable.ResultIterable object at 0x7f7f544b2780>), (0, <pyspark.resultiterable.ResultIterable object at 0x7f7f544b2e48>)]


[(0, [2, 8]), (1, [1, 1, 3, 5])]

#### KeyBy

In [37]:
x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
y = sc.parallelize(zip(range(0, 5), range(0, 5)))
print(x, y)

[(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]

PythonRDD[64] at RDD at PythonRDD.scala:53 ParallelCollectionRDD[63] at readRDDFromFile at PythonRDD.scala:247


[(0, [[0], [0]]),
 (1, [[1], [1]]),
 (2, [[], [2]]),
 (3, [[], [3]]),
 (4, [[2], [4]])]

#### Zip

In [38]:
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()

[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

#### Zip - WithinIndex

In [39]:
sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()

[('a', 0), ('b', 1), ('c', 2), ('d', 3)]

#### Repartition

In [40]:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)
sorted(rdd.glom().collect())

[[1], [2, 3], [4, 5], [6, 7]]

In [41]:
len(rdd.repartition(2).glom().collect())

2

#### Coalesce

In [42]:
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()

[[1], [2, 3], [4, 5]]

In [43]:
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(2).glom().collect()

[[1], [2, 3, 4, 5]]

### RDDs: `Actions`





#### Reduce

In [44]:
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)

15

In [45]:
sc.parallelize(2 for _ in range(10)).map(lambda x: 1).cache().reduce(add)

10

#### First

In [46]:
sc.parallelize([2, 3, 4, 5, 5]).first()

2

#### TakeOrdered

In [47]:
nums = sc.parallelize([1, 5, 3, 9, 4, 0, 2])
nums.takeOrdered(5)

[0, 1, 2, 3, 4]

#### Take

In [48]:
nums = sc.parallelize([1, 5, 3, 9, 4, 0, 2])
nums.take(5)

[1, 5, 3, 9, 4]

#### Count

In [49]:
nums = sc.parallelize([1, 5, 3, 9, 4, 0, 2])
nums.count()

7

#### Collect

In [50]:
c = sc.parallelize(["Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"], 2)
c.collect()

['Gnu', 'Cat', 'Rat', 'Dog', 'Gnu', 'Rat']

In [51]:
c = sc.parallelize(["Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"], 2)
c.distinct().collect()

['Cat', 'Rat', 'Gnu', 'Dog']

In [52]:
alphanuermics = sc.parallelize([(1, "a"), (2, "b"), (3, "c")])
alphanuermics.collectAsMap()

{1: 'a', 2: 'b', 3: 'c'}

#### SaveAsTextfile

In [0]:
a = sc.parallelize(range(1, 1000), 3)
a.saveAsTextFile("mydata_a1")
# a.saveAsTextFile("C:\Users\yrobi\Google Drive\Data Science Job Search\1-PySpark\mydata_a1")

In [0]:
x = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 21], 3)
x.saveAsTextFile("sample1.txt")

### Foreach

In [0]:
def f(x):
    print(x)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

### Foreach - Partition

In [0]:
def f(iterator):
    for x in iterator:
        print(x)
    sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)

### Mathematical Actions

In [75]:
numbers = sc.parallelize(range(1, 100))

print(numbers.sum())
print(numbers.min())
print(numbers.variance())
print(numbers.max())
print(numbers.max())
print(numbers.mean())
print(numbers.stdev())

4950
1
816.6666666666666
99
99
50.0
28.577380332470412


### CountByValue

In [0]:
# RDD - Resilient Distributed Datasets

rdd = sc.textFile("macbeth.txt")

In [61]:
rdd.take(5) # 5 elements from macbeth.txt file

["***The Project Gutenberg's Etext of Shakespeare's First Folio***",
 '********************The Tragedie of Macbeth*********************',
 '',
 'This is our 3rd edition of most of these plays.  See the index.',
 '']

In [0]:
def Func(lines):
    lines = lines.lower()
    lines = lines.split()
    return lines


In [63]:
# rdd.map(lambda lines: lines.split()).take(5)
# map(function)
rdd1 = rdd.map(Func)
rdd1.take(5)

[['***the',
  'project',
  "gutenberg's",
  'etext',
  'of',
  "shakespeare's",
  'first',
  'folio***'],
 ['********************the', 'tragedie', 'of', 'macbeth*********************'],
 [],
 ['this',
  'is',
  'our',
  '3rd',
  'edition',
  'of',
  'most',
  'of',
  'these',
  'plays.',
  'see',
  'the',
  'index.'],
 []]

In [64]:
# flatmap(function)
rdd2 = rdd.flatMap(Func) #flatMap is transformation
rdd2.take(15) # take is action

['***the',
 'project',
 "gutenberg's",
 'etext',
 'of',
 "shakespeare's",
 'first',
 'folio***',
 '********************the',
 'tragedie',
 'of',
 'macbeth*********************',
 'this',
 'is',
 'our']

In [66]:
# stopwords example
stopwords = ['a', 'all', 'the', 'as', 'is', 'am', 'an', 'and', 'be', 'been', 'from', 'had', 'I']
rdd3 = rdd2.filter(lambda x: x not in stopwords)
rdd3.take(15)

['***the',
 'project',
 "gutenberg's",
 'etext',
 'of',
 "shakespeare's",
 'first',
 'folio***',
 '********************the',
 'tragedie',
 'of',
 'macbeth*********************',
 'this',
 'our',
 '3rd']

In [67]:
# groupBy() will group words 
rdd4 = rdd3.groupBy(lambda w: w[0:3])
print(rdd4.take(2))

# print [(k, list(v)) for (k, v) in rdd4.take(1)]

[('***', <pyspark.resultiterable.ResultIterable object at 0x7f7f53ac8128>), ('pro', <pyspark.resultiterable.ResultIterable object at 0x7f7f53ac8198>)]


In [68]:
# sum of number ranging from 1 to 10,000
num_add = sc.parallelize(range(1, 10000))

num_add.reduce(lambda x,y: x+y) # reduce() is actions

49995000