# Foreword: Spark Overview
Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

## Machine Learning Library (MLlib)
MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

- ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
- Featurization: feature extraction, transformation, dimensionality reduction, and selection
- Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
- Persistence: saving and loading algorithms, models, and Pipelines
- Utilities: linear algebra, statistics, data handling, etc.

> ### Announcement: DataFrame-based API is primary API

> **The MLlib RDD-based API is now in maintenance mode.**

As of Spark 2.0, the RDD-based APIs in the ``spark.mllib`` package have entered maintenance mode. The primary Machine Learning API for Spark is now the DataFrame-based API in the ``spark.ml`` package.

What are the implications?

- MLlib will still support the RDD-based API in spark.mllib with bug fixes.
- MLlib will not add new features to the RDD-based API.
- In the Spark 2.x releases, MLlib will add features to the DataFrame-based API to reach feature parity with the RDD-based API.
- After reaching feature parity (roughly estimated for Spark 2.3), the RDD-based API will be deprecated.
- The RDD-based API is expected to be removed in Spark 3.0.

> **Why is MLlib switching to the DataFrame-based API?**

- DataFrames provide a more user-friendly API than RDDs. The many benefits of DataFrames include Spark Datasources, SQL/DataFrame queries, Tungsten and Catalyst optimizations, and uniform APIs across languages.
- The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages.
- DataFrames facilitate practical ML Pipelines, particularly feature transformations. See the Pipelines guide for details.

What is “Spark ML”?

- **“Spark ML” is not an official name but is occasionally used to refer to the MLlib DataFrame-based API.** This is mainly due to the org.apache.spark.ml Scala package name used by the DataFrame-based API, and the “Spark ML Pipelines” term we used initially to emphasize the pipeline concept.

Is MLlib deprecated?

- No. MLlib includes both the RDD-based API and the DataFrame-based API. The RDD-based API is now in maintenance mode, but neither API is deprecated, and neither is MLlib as a whole.

# Apache Spark Examples
These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects. You create a dataset from external data, then apply parallel operations to it. The building block of the Spark API is its RDD API, which has two types of operations: transformations, which define a new dataset based on previous ones, and actions, which kick off a job to execute on a cluster. On top of the RDD API, Spark provides high-level APIs such as the DataFrame API and the Machine Learning API, which offer a concise way to perform common data operations. On this page, we show examples using the RDD API as well as examples using the high-level APIs.

To check which version of Python is installed on your system, print ``sys.version`` from the ``sys`` module:

```python
import sys
print(sys.version)
```

```
3.6.6 |Anaconda, Inc.| (default, Jun 28 2018, 11:27:44) [MSC v.1900 64 bit (AMD64)]
```


### RDD-based API Examples (``spark.mllib`` package)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
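
When an existing collection is parallelized, Spark cuts it into partitions that can be processed independently. The plain-Python sketch below splits a list into contiguous, near-equal slices, which is roughly how ``parallelize()`` distributes a list; ``slice_into_partitions`` is a hypothetical helper, not a Spark function. With a running ``SparkContext``, the real partitions can be inspected with ``sc.parallelize(data, 3).glom().collect()``.

```python
def slice_into_partitions(data, num_slices):
    """Split a sequence into num_slices contiguous, near-equal chunks (hypothetical helper)."""
    n = len(data)
    return [list(data[(i * n) // num_slices:((i + 1) * n) // num_slices])
            for i in range(num_slices)]


parts = slice_into_partitions(list(range(10)), 3)
print(parts)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]

# Each partition can be processed on its own (e.g. summed on a different core),
# and the partial results combined afterwards.
partial_sums = [sum(p) for p in parts]
print(sum(partial_sums))  # 45
```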


#### RDD Operations
RDDs support two types of operations: **transformations**, which create a new dataset from an existing one, and **actions**, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
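
To make lazy evaluation and caching concrete without a cluster, here is a toy, single-machine model in plain Python. ``ToyRDD`` is a hypothetical class invented for illustration (it is not part of PySpark): transformations only record a function, actions replay the recorded chain, and ``cache()`` keeps the result of the first action so later actions skip the recomputation.

```python
from functools import reduce as fold


class ToyRDD:
    """Hypothetical, single-machine stand-in for an RDD (not a PySpark class)."""

    def __init__(self, source, ops=None, stats=None):
        self._source = source               # base data, e.g. lines of a file
        self._ops = list(ops or [])         # recorded transformations, not yet run
        self._cache_requested = False
        self._cached = None                 # materialized result after cache()
        # Shared counter: how many times the base data has been scanned.
        self.stats = stats if stats is not None else {"scans": 0}

    # Transformations: return a new ToyRDD and compute nothing.
    def map(self, f):
        return ToyRDD(self._source, self._ops + [lambda it: map(f, it)], self.stats)

    def filter(self, pred):
        return ToyRDD(self._source, self._ops + [lambda it: filter(pred, it)], self.stats)

    def cache(self):
        # Like RDD.cache(): only marks the data; it is stored on the first action.
        self._cache_requested = True
        return self

    def _compute(self):
        if self._cached is not None:
            return self._cached
        self.stats["scans"] += 1
        items = iter(self._source)
        for op in self._ops:                # replay the recorded chain
            items = op(items)
        result = list(items)
        if self._cache_requested:
            self._cached = result
        return result

    # Actions: run the chain and return a plain Python value.
    def count(self):
        return len(self._compute())

    def reduce(self, f):
        return fold(f, self._compute())


squares = ToyRDD(range(1, 6)).map(lambda x: x * x)   # nothing computed yet
print(squares.stats["scans"])                # 0
print(squares.reduce(lambda a, b: a + b))    # 55
print(squares.count())                       # 5
print(squares.stats["scans"])                # 2: each action recomputed the chain

cached = ToyRDD(range(1, 6)).map(lambda x: x * x).cache()
cached.count()
cached.count()
print(cached.stats["scans"])                 # 1: the second action reused the cache
```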

RDDs therefore have two types of methods:

- Transformations, which output another RDD.
- Actions, which output something other than an RDD.

#### Spark Transformations
- map()
- flatMap()
- mapPartitions()
- filter()
- sample()
- union()
- intersection()
- distinct()
- join()
- ...

#### Spark Actions
- reduce()             
- collect()               
- count()
- first()    
- takeSample(withReplacement, num, [seed]) 
- ...
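
Some of the listed transformations have set-like names but do not all behave like Python sets: ``union()`` keeps duplicates (call ``distinct()`` to remove them), while ``intersection()`` and ``distinct()`` return each element only once. The plain-Python sketch below mimics these semantics on lists; the helper names are hypothetical, and results are sorted because element order in an RDD is not meaningful.

```python
def rdd_union(a, b):
    # Like RDD.union: plain concatenation, duplicates are kept.
    return a + b


def rdd_intersection(a, b):
    # Like RDD.intersection: elements present in both inputs, each reported once.
    return sorted(set(a) & set(b))


def rdd_distinct(a):
    # Like RDD.distinct: each element once.
    return sorted(set(a))


x = [1, 2, 2, 3]
y = [2, 3, 3, 4]
print(sorted(rdd_union(x, y)))         # [1, 2, 2, 2, 3, 3, 3, 4]
print(rdd_intersection(x, y))          # [2, 3]
print(rdd_distinct(rdd_union(x, y)))   # [1, 2, 3, 4]
```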

> ### Word Count
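In this example, we use a few transformations to build a dataset of (String, Int) pairs called ``counts`` and then print the first ten pairs. The commented-out actions show how to collect all the pairs or save them to text files instead.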

```python
import os
import shutil

import findspark
findspark.init()  # locate the local Spark installation and make pyspark importable

from pyspark import SparkConf, SparkContext

# Configure and start a local Spark application
sparkConf = SparkConf().setAll([
    ("spark.cores.max", "4"),
    ("spark.executor.memory", "2G"),
    ("spark.ui.port", "8080")
]).setMaster("local[*]").setAppName("Word Count")

sc = SparkContext(conf=sparkConf)

# saveAsTextFile() writes a directory of part files and fails if it already exists,
# so remove any previous output first
output_dir = "wordcountresults.txt"
if os.path.isdir(output_dir):
    shutil.rmtree(output_dir)

text_file = sc.textFile("data/wonderland.txt")
counts = text_file.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)

# To display the full counting result, use the following action:
# counts.collect()
# To save the counting result as text files, use the following action:
# counts.saveAsTextFile(output_dir)

# Print the first 10 (word, count) pairs
print(counts.take(10))

sc.stop()
```

```
[("ALICE'S", 3), ('ADVENTURES', 1), ('', 1924), ('Lewis', 1), ('Carroll', 1), ('MILLENNIUM', 1), ('FULCRUM', 1), ('EDITION', 1), ('I.', 1), ('Down', 1)]
```
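
The flatMap → map → reduceByKey chain above can be mirrored in plain Python to see what each step produces. In the sketch below, ``word_count`` is a hypothetical helper and the sample lines are made up. It also shows where the large ``''`` count in the output comes from: ``line.split(" ")`` yields an empty string for a blank line and for every leading, trailing or repeated space, whereas ``line.split()`` with no argument discards them.

```python
from collections import defaultdict


def word_count(lines, split_on_single_space=True):
    """Plain-Python analogue of textFile -> flatMap -> map -> reduceByKey (hypothetical)."""
    # flatMap: one line -> many words
    if split_on_single_space:
        words = [w for line in lines for w in line.split(" ")]
    else:
        words = [w for line in lines for w in line.split()]
    # map: word -> (word, 1)
    pairs = [(w, 1) for w in words]
    # reduceByKey: combine all values that share a key with (a + b)
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] = counts[word] + one
    return dict(counts)


lines = ["Down the Rabbit-Hole", "", "Alice was  beginning"]
# '' is counted twice: once for the blank line, once for the double space
print(word_count(lines))
# With split(), empty strings disappear
print(word_count(lines, split_on_single_space=False))
```

In the Spark job, the equivalent change is ``text_file.flatMap(lambda line: line.split())``.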


> ### Pi Estimation
Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and count how many fall inside the unit circle. The part of the unit circle that lies in the square is a quarter disk of area $\pi/4$, so the fraction of points inside should be close to $\pi/4$, and multiplying it by 4 gives our estimate.

```python
import random

import findspark
findspark.init()

import pyspark

sc = pyspark.SparkContext(appName="Pi")
num_samples = 1000000

def inside(_):
    # The element value is ignored: each call draws a fresh random point
    x, y = random.random(), random.random()
    return x*x + y*y < 1

count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print("Pi is roughly %f" % pi)

sc.stop()
```

```
Pi is roughly 3.140732
```
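
Since a point is inside with probability $\pi/4$, the estimate is $\pi \approx 4 \cdot \text{count} / N$, and its statistical error shrinks roughly like $1/\sqrt{N}$; with $N = 10^6$ only the first couple of decimals are reliable, as in the run above. A single-machine version using only the standard library is sketched below; ``estimate_pi`` is a hypothetical helper, seeded so that results are reproducible.

```python
import math
import random


def estimate_pi(num_samples, seed=0):
    """Monte Carlo estimate of pi from points drawn uniformly in the unit square."""
    rng = random.Random(seed)  # private, seeded generator for reproducibility
    inside = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / num_samples


for n in (1000, 100000):
    est = estimate_pi(n)
    print("n={:>6}  pi~{:.5f}  error={:.5f}".format(n, est, abs(est - math.pi)))
```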


> ### Building a spam classifier locally with PySpark and Spark MLlib classification algorithms

```python
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("Spam classifier with Spark MLlib classification algorithms")
sc = SparkContext(conf=conf)

from pyspark.mllib.feature import HashingTF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
from pyspark.mllib.tree import DecisionTree, RandomForest

# Load 2 types of emails from text files: spam and ham (non-spam).
# Each line has text from one email.
spam = sc.textFile("data/spam")
ham = sc.textFile("data/ham")

spam_words = spam.map(lambda email: email.split())
ham_words = ham.map(lambda email: email.split())

print(spam_words.take(1))
print(ham_words.take(1))

# Create a HashingTF instance to map email text to vectors of 200 features.
tf = HashingTF(numFeatures=200)
spam_features = tf.transform(spam_words)
ham_features = tf.transform(ham_words)

print(spam_features.take(1))
print(ham_features.take(1))

# Create LabeledPoint datasets: label 1 for spam (positive), label 0 for ham (negative).
spam_samples = spam_features.map(lambda features: LabeledPoint(1, features))
ham_samples = ham_features.map(lambda features: LabeledPoint(0, features))

print(spam_samples.take(1))
print(ham_samples.take(1))

# Split the data set 80/20 into training and test sets
samples = spam_samples.union(ham_samples)
training_data, test_data = samples.randomSplit([0.8, 0.2])
training_data.cache()
test_data.cache()

def score(model, test_data):
    """Return the fraction of test points whose predicted label equals the true label."""
    predictions = model.predict(test_data.map(lambda x: x.features))
    labels_and_preds = test_data.map(lambda x: x.label).zip(predictions)
    accuracy = labels_and_preds.filter(lambda x: x[0] == x[1]).count() / float(test_data.count())
    return accuracy

# Train several classification algorithms and report their test accuracy.
# The linear models and Naive Bayes expose train(); the tree models expose
# trainClassifier(), which needs the number of classes (and of trees for RandomForest).
MLA1 = [LogisticRegressionWithSGD(), SVMWithSGD(), NaiveBayes()]
MLA2 = [DecisionTree()]
MLA3 = [RandomForest()]

MLA = MLA1 + MLA2 + MLA3

for algo in MLA:
    MLA_name = algo.__class__.__name__
    if algo in MLA1:
        model = algo.train(training_data)
    elif algo in MLA2:
        model = algo.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={})
    else:
        model = algo.trainClassifier(training_data, numClasses=2, categoricalFeaturesInfo={}, numTrees=16)
    print(MLA_name + ' accuracy = {:.2f}'.format(score(model, test_data)))

sc.stop()
```

```
[['You', 'have', '1', 'new', 'message.', 'Please', 'call', '08712400200.']]
[['Rofl.', 'Its', 'true', 'to', 'its', 'name']]
[SparseVector(200, {22: 2.0, 59: 1.0, 82: 1.0, 95: 1.0, 100: 1.0, 106: 1.0, 125: 1.0})]
[SparseVector(200, {8: 1.0, 16: 1.0, 28: 1.0, 63: 1.0, 70: 1.0, 165: 1.0})]
[LabeledPoint(1.0, (200,[22,59,82,95,100,106,125],[2.0,1.0,1.0,1.0,1.0,1.0,1.0]))]
[LabeledPoint(0.0, (200,[8,16,28,63,70,165],[1.0,1.0,1.0,1.0,1.0,1.0]))]
LogisticRegressionWithSGD accuracy = 0.86
SVMWithSGD accuracy = 0.87
NaiveBayes accuracy = 0.93
DecisionTree accuracy = 0.89
RandomForest accuracy = 0.87
```
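
``HashingTF`` avoids building a vocabulary by hashing each term to one of ``numFeatures`` buckets and counting per bucket (the "hashing trick"). With only 200 buckets, distinct words often share a bucket: in the output above, the first spam message has eight distinct tokens but only seven non-zero buckets, with bucket 22 holding 2.0. The plain-Python sketch below illustrates the idea and the accuracy computed by ``score``. The helper names are hypothetical, and ``zlib.crc32`` is used as the hash purely as an assumption for determinism; Spark's hash function differs, so the bucket indices will not match the output above.

```python
import zlib
from collections import Counter


def hashing_tf(words, num_features=200):
    """Map a list of terms to a sparse {bucket: count} dict (hashing trick, crc32 assumed)."""
    buckets = Counter()
    for w in words:
        index = zlib.crc32(w.encode("utf-8")) % num_features
        buckets[index] += 1
    return dict(buckets)


def accuracy(labels, predictions):
    """Fraction of positions where the prediction equals the label, as in score()."""
    pairs = list(zip(labels, predictions))
    return sum(1 for label, pred in pairs if label == pred) / float(len(pairs))


spam_vec = hashing_tf("You have 1 new message. Please call 08712400200.".split())
print(spam_vec)  # 8 tokens spread over at most 8 buckets
print(accuracy([1.0, 0.0, 1.0, 0.0], [1, 0, 0, 0]))  # 0.75
```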


### DataFrame-based API Examples (``spark.ml`` package)
In Spark, a DataFrame is a distributed collection of data organized into named columns. With the DataFrame API, users can perform relational operations on both external data sources and Spark’s built-in distributed collections without spelling out how the data should be processed. Programs written against the DataFrame API are also optimized automatically by Catalyst, Spark’s built-in query optimizer.

> ### Automatic classification of texts
We will build a text classification model that predicts the subject of a conversation. We use the public 20 Newsgroups dataset, which contains conversations that took place in Usenet newsgroups.

```python
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, StringIndexer

# Start a SparkContext and a SparkSession that wraps it
confspark = SparkConf().setMaster("local[*]").setAppName("Automatic classification of texts with Spark")
sc = SparkContext(conf=confspark)
spark = SparkSession.builder.getOrCreate()

def load_dataframe(path):
    """Each line holds a label followed by the document's words; return (label, words) rows."""
    rdd = sc.textFile(path) \
        .map(lambda line: line.split()) \
        .map(lambda words: Row(label=words[0], words=words[1:]))
    return spark.createDataFrame(rdd)

train_data = load_dataframe("data/20ng-train-all-terms.txt")
test_data = load_dataframe("data/20ng-test-all-terms.txt")

# Learn a vocabulary of at most 6000 terms from the training data and count them per document
vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words", vocabSize=6000)
# Map string labels to numeric indices (by default, the most frequent label gets 0.0)
label_indexer = StringIndexer(inputCol="label", outputCol="label_index")
# Learn a multiclass Naive Bayes classifier on the training data
classifier = NaiveBayes(labelCol="label_index", featuresCol="bag_of_words", predictionCol="label_index_predicted")

pipeline = Pipeline(stages=[vectorizer, label_indexer, classifier])

# Fit the pipeline to the training documents
pipeline_model = pipeline.fit(train_data)

# Predict labels on the test data
test_predicted = pipeline_model.transform(test_data)

# Evaluate the classifier
evaluator = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="label_index_predicted", metricName="accuracy")
accuracy = evaluator.evaluate(test_predicted)
print("Accuracy = {:.2f}".format(accuracy))

spark.stop()  # also stops the underlying SparkContext
```

```
Accuracy = 0.78
```
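
The first two pipeline stages learn something from the training data before they can transform it. The plain-Python sketch below approximates what they learn, under simplifying assumptions: a ``CountVectorizer``-like vocabulary that keeps the ``vocab_size`` most frequent terms across the corpus, and a ``StringIndexer``-like mapping that gives index 0.0 to the most frequent label. The helper names and sample documents are hypothetical, and ties are broken alphabetically here, which may differ from Spark.

```python
from collections import Counter


def fit_vocabulary(docs, vocab_size):
    """Keep the vocab_size most frequent terms over all documents."""
    term_freq = Counter(term for doc in docs for term in doc)
    ranked = sorted(term_freq.items(), key=lambda kv: (-kv[1], kv[0]))
    return [term for term, _ in ranked[:vocab_size]]


def to_bag_of_words(doc, vocabulary):
    """Count occurrences of vocabulary terms by vocabulary index; other terms are ignored."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))


def fit_label_index(labels):
    """Most frequent label -> 0.0, next -> 1.0, and so on."""
    freq = Counter(labels)
    ranked = sorted(freq.items(), key=lambda kv: (-kv[1], kv[0]))
    return {label: float(i) for i, (label, _) in enumerate(ranked)}


train = [
    ("sci.space", ["orbit", "launch", "orbit"]),
    ("rec.autos", ["engine", "car"]),
    ("sci.space", ["launch", "moon"]),
]
vocab = fit_vocabulary([words for _, words in train], vocab_size=3)
label_index = fit_label_index([label for label, _ in train])
print(vocab)                                               # ['launch', 'orbit', 'car']
print(to_bag_of_words(["orbit", "moon", "orbit"], vocab))  # {1: 2}
print(label_index)                                         # {'sci.space': 0.0, 'rec.autos': 1.0}
```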


> ### Building a spam classifier locally with PySpark and Spark ML classification algorithms

```python
# Create (or reuse) a local SparkSession
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master("local[*]").appName("Spam classifier with Spark ML classification algorithms").getOrCreate()

# Read the data: one message per line, no header
df_ham = spark.read.csv("data/ham", sep="\t", inferSchema=True, header=False)
df_spam = spark.read.csv("data/spam", sep="\t", inferSchema=True, header=False)

# Add a column that records the message status, and name the text column "message"
df_ham = df_ham.withColumn("status", lit("ham")).withColumnRenamed("_c0", "message")
df_spam = df_spam.withColumn("status", lit("spam")).withColumnRenamed("_c0", "message")
df_ham.show(5, truncate=False)
df_spam.show(5)

# Combine both sets into a single DataFrame
df = df_ham.union(df_spam)

# Convert the status column to a numeric label: ham -> 1.0, spam -> 0.0
df.createOrReplaceTempView("temp")
df = spark.sql('select case status when "ham" then 1.0 else 0.0 end as label, message from temp')
df.show(5, truncate=False)

# Tokenize the messages (lower-case, then split on whitespace)
from pyspark.ml.feature import Tokenizer
tokenizer = Tokenizer(inputCol="message", outputCol="words")
wordsData = tokenizer.transform(df)
wordsData.show(5)

# Count term occurrences over a vocabulary of at most 6000 terms
from pyspark.ml.feature import CountVectorizer
count_vectorizer = CountVectorizer(inputCol="words", outputCol="bag_of_words", vocabSize=6000)
cv_model = count_vectorizer.fit(wordsData)
df_bag_of_words = cv_model.transform(wordsData)
df_bag_of_words.show(5)

# Reweight the counts by term frequency–inverse document frequency (TF-IDF)
from pyspark.ml.feature import IDF
idf = IDF(inputCol="bag_of_words", outputCol="features")
idfModel = idf.fit(df_bag_of_words)
idfData = idfModel.transform(df_bag_of_words)
idfData.select("label", "features").show(5)

# Split data into training (80%) and testing (20%)
seed = 0  # fixed seed for reproducibility
trainDF, testDF = idfData.randomSplit([0.8, 0.2], seed)

trainDF.groupBy("label").count().show()
testDF.groupBy("label").count().show()

# Apply several classification algorithms
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier, NaiveBayes,
                                       DecisionTreeClassifier, GBTClassifier,
                                       MultilayerPerceptronClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The perceptron's input layer must match the feature vector size, i.e. the size of the
# learned vocabulary (at most 6000, smaller if the corpus has fewer distinct terms)
num_features = len(cv_model.vocabulary)

lr = LogisticRegression(maxIter=10)
rf = RandomForestClassifier(numTrees=10)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
dt = DecisionTreeClassifier(maxDepth=2)
gbt = GBTClassifier(maxIter=10)
mpc = MultilayerPerceptronClassifier(maxIter=100, layers=[num_features, 15, 6, 2], blockSize=128)

MLA = [lr, rf, nb, dt, gbt, mpc]

# Accuracy on the test set
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label", metricName="accuracy")

for algo in MLA:
    MLA_name = algo.__class__.__name__
    model = algo.fit(trainDF)              # train the model
    predictions = model.transform(testDF)  # predict on the test data
    accuracy = evaluator.evaluate(predictions)
    print(MLA_name + " test set accuracy = {:.2f}".format(accuracy))
```

```
+-----------------------------------------------------------------------------------------------------------------------------+------+
|message                                                                                                                      |status|
+-----------------------------------------------------------------------------------------------------------------------------+------+
|Rofl. Its true to its name                                                                                                   |ham   |
|The guy did some bitching but I acted like i'd be interested in buying something else next week and he gave it to us for free|ham   |
|Pity, * was in mood for that. So...any other suggestions?                                                                    |ham   |
|Will ü b going to esplanade fr home?                                                                                         |ham   |
|Huh y lei...
```
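
The Spark ML feature-extraction guide defines the inverse document frequency used by ``IDF`` as $\mathrm{IDF}(t) = \log\frac{m + 1}{\mathrm{DF}(t) + 1}$, where $m$ is the number of documents and $\mathrm{DF}(t)$ the number of documents containing term $t$. Each term count is multiplied by this weight, so a term that appears in every document gets weight 0. The plain-Python sketch below applies that formula to the output of a ``Tokenizer``-like step; the function names and sample messages are hypothetical, the tokenizer only approximates Spark's, and options such as ``minDocFreq`` are ignored.

```python
import math
from collections import Counter


def tokenize(message):
    """Approximates pyspark.ml.feature.Tokenizer: lower-case, then split on whitespace."""
    return message.lower().split()


def fit_idf(docs):
    """IDF(t) = ln((m + 1) / (DF(t) + 1)) over a list of token lists."""
    m = len(docs)
    doc_freq = Counter(term for doc in docs for term in set(doc))
    return {term: math.log((m + 1) / (df + 1)) for term, df in doc_freq.items()}


def tf_idf(doc, idf):
    """Multiply each raw term count by its IDF weight (unseen terms get 0.0 in this sketch)."""
    return {term: count * idf.get(term, 0.0) for term, count in Counter(doc).items()}


messages = ["Call now to claim your prize", "Call me when you get home", "Claim your free prize now"]
docs = [tokenize(msg) for msg in messages]
idf = fit_idf(docs)
for doc in docs:
    print({t: round(w, 3) for t, w in tf_idf(doc, idf).items()})
```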

> ### Text Search
In this example, we search a Linux log file for authentication failures.

```python
import findspark
findspark.init()

from pyspark.sql import SparkSession

# getOrCreate() reuses a running session (and its SparkContext) if one exists,
# instead of trying to start a second SparkContext
spark = SparkSession.builder.appName("Text Search").getOrCreate()
sc = spark.sparkContext

textFile = sc.textFile("data/Linux_2k.log")

# Create a DataFrame with a single column named "line" (one row per log line)
df = textFile.map(lambda s: (s,)).toDF(["line"])

df.show(4)

# Count lines mentioning "authentication failure"
failures = df.filter(df.line.like("%authentication failure%"))
print('Number of lines mentioning "authentication failure":', failures.count())

# Count authentication failures mentioning "ruser= rhost=218.188.2.4"
host_failures = failures.filter(failures.line.like("%ruser= rhost=218.188.2.4%"))
print('Number of authentication failures mentioning "ruser= rhost=218.188.2.4":', host_failures.count())

# Fetch those authentication failures as a list of strings
host_failure_lines = [row.line for row in host_failures.collect()]
print(host_failure_lines)

spark.stop()
```

Creating the context directly with ``pyspark.SparkContext(appName=...)`` while the SparkSession from the previous cell is still running fails with:

```
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=Spam classifier with Spark ML classification algorithms, master=local[*]) created by getOrCreate at <ipython-input-6-7dfaa6b688a6>:7
```
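
``Column.like`` uses SQL ``LIKE`` patterns, in which ``%`` matches any sequence of characters (including none) and ``_`` matches exactly one character. To prototype a pattern outside Spark, a rough plain-Python translation to a regular expression can help. ``like_to_regex`` and ``like`` are hypothetical helpers that ignore escape characters, and the sample log lines are made up for illustration.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern (no escape handling) into an anchored regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")          # any sequence, possibly empty
        elif ch == "_":
            parts.append(".")           # exactly one character
        else:
            parts.append(re.escape(ch))  # everything else is literal, e.g. '.' in an IP
    return re.compile("".join(parts) + r"\Z", re.DOTALL)


def like(value, pattern):
    return like_to_regex(pattern).match(value) is not None


lines = [
    "sshd: authentication failure; ruser= rhost=218.188.2.4",
    "sshd: authentication failure; ruser= rhost=10.0.0.1",
    "sshd: check pass; user unknown",
]
failures = [l for l in lines if like(l, "%authentication failure%")]
from_host = [l for l in failures if like(l, "%ruser= rhost=218.188.2.4%")]
print(len(failures), len(from_host))  # 2 1
```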