### Starting with Spark
<img src="http://spark.apache.org/images/spark-logo.png" height=100>

* How To Install Java with Apt on Ubuntu:

```
$ sudo apt update
$ sudo apt install default-jre
$ sudo apt install openjdk-11-jre-headless
$ sudo apt install openjdk-8-jre-headless
```

* Installing Scala:

```
$ sudo apt install scala
```
* Download Spark from https://spark.apache.org/downloads.html, move the archive to your home directory (**`~`**), and extract it there:

```
~$ tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz 

# rename it to spark
~$ mv spark-3.2.0-bin-hadoop3.2 spark  
```
* Add the following to `.zshrc` or `.bashrc`:

```
# Spark path (adjust to where you extracted Spark)
export SPARK_HOME="$HOME/spark"
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
```
* Reload `.zshrc` or `.bashrc`:

```
$ source ~/.bashrc   # Or source ~/.zshrc
```
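
Before starting a notebook, it can help to confirm that the current shell actually picked up the settings above. The following is a minimal sketch using only the Python standard library. `check_spark_env` is a hypothetical helper name, not part of Spark or findspark, and it only checks that `SPARK_HOME` points at a directory and that a `java` executable is on the `PATH`.

```python
import os
import shutil


def check_spark_env(environ=None):
    """Report whether the settings from the steps above are visible.

    `environ` defaults to os.environ; passing a dict makes the check testable.
    """
    env = os.environ if environ is None else environ
    spark_home = env.get("SPARK_HOME")
    return {
        # SPARK_HOME must be set and point at an existing directory
        "spark_home_set": bool(spark_home),
        "spark_home_exists": bool(spark_home) and os.path.isdir(spark_home),
        # Spark needs a Java runtime on the PATH
        "java_on_path": shutil.which("java", path=env.get("PATH")) is not None,
    }


if __name__ == "__main__":
    for name, ok in check_spark_env().items():
        print(f"{name}: {'ok' if ok else 'MISSING'}")
```

If any entry prints `MISSING`, re-check the exports and run `source` on the shell configuration file again.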

In [1]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
sc.version

'3.2.0'

### Starting with Spark

In [3]:
spark = sc.textFile("data/spark.txt")
spark

data/spark.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

#### Some RDD actions

In [5]:
spark.count()

33

In [6]:
spark.first()

'Apache Spark has its architectural foundation in the resilient distributed dataset '

#### Let's try a transformation

In [9]:
LinesWithSpark = spark.filter(lambda line:'Spark' in line)
LinesWithSpark

PythonRDD[7] at RDD at PythonRDD.scala:53

In [10]:
LinesWithSpark.count()

13

In [11]:
LinesWithSpark.first()

'Apache Spark has its architectural foundation in the resilient distributed dataset '

## More on RDD Operations
Find the largest number of words on any single line of the `data/spark.txt` file:

In [12]:
spark.map(lambda line: len(line.split())).reduce(lambda a, b: a if (a > b) else b)

18

Or, with a named function instead of a lambda:

In [13]:
def max_(a, b):
    if a > b:
        return a
    else:
        return b

In [14]:
spark.map(lambda line: len(line.split())).reduce(max_)

18
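
The two cells above return the largest word count, not the line it belongs to. To get the line itself, the reduce function can compare whole lines by their word counts. The sketch below uses `functools.reduce` on a small in-memory list so it runs without Spark; `sample_lines` and `longer_line` are illustrative names and the data is made up. The same function could be passed to the RDD as `spark.reduce(longer_line)`, with the caveat that when two lines tie, which one is returned may depend on how the data is partitioned.

```python
from functools import reduce

# Stand-in for the lines of data/spark.txt (illustrative data only)
sample_lines = [
    "Apache Spark has its architectural foundation in the RDD",
    "The Dataframe API was released later",
    "RDDs",
]


def longer_line(a, b):
    """Return whichever line has more whitespace-separated words."""
    return a if len(a.split()) >= len(b.split()) else b


longest = reduce(longer_line, sample_lines)
print(longest, len(longest.split()))
```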

In [15]:
wordCounts = spark.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

In [17]:
wordCounts.take(1)

[('Apache', 6)]
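
To see what each stage of the word-count pipeline contributes, here is a plain-Python walk-through on two short lines. It is only an analogy, not how Spark executes the job: the list comprehensions stand in for `flatMap` and `map`, the dictionary loop stands in for `reduceByKey`, and the sample lines are made up.

```python
lines = ["Spark is fast", "Spark is distributed"]

# flatMap: split every line and flatten the results into one list of words
words = [word for line in lines for word in line.split()]

# map: pair each word with an initial count of 1
pairs = [(word, 1) for word in words]

# reduceByKey: combine the counts of equal keys with a + b
counts = {}
for word, n in pairs:
    counts[word] = counts.get(word, 0) + n

print(sorted(counts.items()))
```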

In [18]:
wordCounts.collect()

[('Apache', 6),
 ('Spark', 12),
 ('has', 1),
 ('its', 2),
 ('architectural', 1),
 ('foundation', 1),
 ('in', 7),
 ('the', 23),
 ('resilient', 1),
 ('distributed', 8),
 ('dataset', 1),
 ('(RDD),', 1),
 ('a', 18),
 ('read-only', 1),
 ('multiset', 1),
 ('of', 12),
 ('data', 4),
 ('items', 1),
 ('over', 1),
 ('cluster', 5),
 ('machines,', 1),
 ('that', 2),
 ('is', 7),
 ('maintained', 1),
 ('fault-tolerant', 1),
 ('way.', 1),
 ('The', 3),
 ('Dataframe', 1),
 ('API', 3),
 ('was', 2),
 ('released', 1),
 ('as', 4),
 ('an', 1),
 ('abstraction', 1),
 ('on', 6),
 ('top', 1),
 ('RDD,', 1),
 ('followed', 1),
 ('by', 3),
 ('Dataset', 3),
 ('API.', 2),
 ('In', 1),
 ('1.x,', 1),
 ('RDD', 3),
 ('primary', 1),
 ('application', 1),
 ('programming', 1),
 ('interface', 2),
 ('(API),', 1),
 ('but', 1),
 ('2.x', 1),
 ('use', 2),
 ('encouraged', 1),
 ('even', 1),
 ('though', 1),
 ('not', 2),
 ('deprecated.', 1),
 ('technology', 1),
 ('still', 1),
 ('underlies', 1),
 ('and', 5),
 ('RDDs', 3),
 ('were', 1),
 ...]


Determine the most frequent word in the file and how many times it was used.

In [19]:
SortedWordCounts = sorted(wordCounts.collect(), key=lambda x:x[1], reverse=True)
SortedWordCounts[0:2]

[('the', 23), ('a', 18)]

In [20]:
wordCounts.reduce(lambda a, b: a if a[1] > b[1] else b)

('the', 23)
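
Sorting the full `collect()` result works for a small file, but it pulls every pair to the driver first. PySpark RDDs also provide `top(num, key=...)`, which returns only the largest elements, for example `wordCounts.top(2, key=lambda x: x[1])`. The standard-library sketch below shows the same idea with `heapq.nlargest` on a few pairs copied by hand from the `collect()` output above.

```python
import heapq

# A few (word, count) pairs copied from the collect() output above
pairs = [("Apache", 6), ("Spark", 12), ("the", 23), ("a", 18), ("of", 12)]

# Keep only the two pairs with the highest counts, without sorting everything
top_two = heapq.nlargest(2, pairs, key=lambda pair: pair[1])
print(top_two)
```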

By default, each transformed RDD may be recomputed each time you run an action on it. However, you can also persist an RDD in memory using the `persist()` (or `cache()`) method, in which case Spark keeps the elements around on the cluster for much faster access the next time you query it.

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it.
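
The behaviour described above can be illustrated with a toy model in plain Python. This is a sketch of the idea only, not Spark's implementation: `ToyRDD` is a hypothetical class that evaluates lazily, recomputes its data on every action, and keeps the result after the first action once `cache()` has been called.

```python
class ToyRDD:
    """Toy stand-in for an RDD: lazy, recomputed per action unless cached."""

    def __init__(self, compute):
        self._compute = compute      # function that produces the elements
        self._cache_enabled = False
        self._cached = None
        self.computations = 0        # how many times the pipeline actually ran

    def filter(self, predicate):
        # Transformation: builds a new lazy dataset, computes nothing yet
        return ToyRDD(lambda: [x for x in self._elements() if predicate(x)])

    def cache(self):
        # Only marks the dataset; the data is stored by the first action
        self._cache_enabled = True
        return self

    def _elements(self):
        if self._cached is not None:
            return self._cached
        self.computations += 1
        result = self._compute()
        if self._cache_enabled:
            self._cached = result
        return result

    def count(self):
        # Action: forces evaluation
        return len(self._elements())


lines = ToyRDD(lambda: ["Apache Spark", "RDD", "Spark SQL"])
with_spark = lines.filter(lambda line: "Spark" in line)

with_spark.count()
with_spark.count()
uncached_runs = with_spark.computations  # recomputed for each action

with_spark.cache()
with_spark.count()
with_spark.count()
cached_runs = with_spark.computations - uncached_runs  # computed once, then reused

print(uncached_runs, cached_runs)
```

In this toy, two actions without caching trigger two computations, while two actions after `cache()` trigger only one.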

In [21]:
print(LinesWithSpark.count())

13


In [22]:
def count():
    return LinesWithSpark.count()
def t():
    # %timeit is an IPython magic: it prints its result and returns None
    %timeit count()

In [25]:
print(t())

120 ms ± 27.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
None


In [26]:
LinesWithSpark.cache()
print(t())

85.2 ms ± 2.97 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
None
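
`%timeit` is an IPython magic, so the `t()` helper above only works inside a notebook or IPython shell. A rough standard-library equivalent, shown here as a sketch, uses `timeit.repeat`. `time_action` is a hypothetical helper and the workload is a placeholder for `LinesWithSpark.count`. Note that it reports the best run rather than the mean and standard deviation that `%timeit` prints.

```python
import timeit


def time_action(action, runs=7, loops=10):
    """Return the best average seconds per call over `runs` batches of `loops` calls."""
    totals = timeit.repeat(action, repeat=runs, number=loops)
    return min(totals) / loops


# Placeholder workload; in the notebook this could be LinesWithSpark.count
seconds = time_action(lambda: sum(range(10_000)))
print(f"{seconds * 1e3:.3f} ms per call (best of 7 runs, 10 loops each)")
```

Calling `time_action(LinesWithSpark.count)` before and after `LinesWithSpark.cache()` would give a comparison similar to the two measurements above.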
