In [None]:
# Spark achieves simplicity by providing a fundamental abstraction: a simple logical data structure called a Resilient Distributed Dataset (RDD),
# upon which all other higher-level structured data abstractions, such as DataFrames and Datasets, are constructed.
# Unlike Apache Hadoop, which included both storage and compute, Spark decouples the two.

# Spark's component libraries: Spark SQL, Spark MLlib, Spark Structured Streaming, and GraphX


## Spark Components

### Spark SQL

In [None]:

// In Scala
// Read the JSON file from an Amazon S3 bucket into a Spark DataFrame and
// register it as the temporary view "committers" so SQL can refer to it by name.
spark.read.json("s3://apache_spark/data/committers.json")
  .createOrReplaceTempView("committers")
// Issue a SQL query against the view; the result is returned as a Spark DataFrame.
val results = spark.sql("""SELECT name, org, module, release, num_commits
    FROM committers WHERE module = 'mllib' AND num_commits > 10
    ORDER BY num_commits DESC""")

### Spark ML

In [None]:
# Spark comes with a library of common machine learning (ML) algorithms called MLlib. It exposes two packages:
# spark.mllib - RDD-based API
# spark.ml - DataFrame-based API

In [None]:
# In Python: fit a logistic regression model with spark.ml and apply it to a test set
from pyspark.ml.classification import LogisticRegression
...
# Configure the estimator
lr = LogisticRegression(
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Read the training and test sets (paths elided)
training, test = (
    spark.read.csv("s3://..."),
    spark.read.csv("s3://..."),
)

# Fit on the training set, then apply the fitted model to the test set
lrModel = lr.fit(training)
lrModel.transform(test)

### Spark Structured Streaming

In [None]:
## Get Wordcount
# In Python
from pyspark.sql.functions import col, explode, split

# Placeholder settings -- NOTE(review): replace with the values for your environment.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
CHECKPOINT_DIR = "/tmp/word_count_checkpoint"

# Read a stream of text lines from a socket on the local host
lines = (spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load())

# Perform transformation
# Split the lines into words, one output row per word
words = lines.select(explode(split(lines.value, " ")).alias("word"))

# Generate running word count
word_counts = words.groupBy("word").count()

# Write the running counts out to the Kafka topic "output" and start the query.
# - The Kafka sink requires a `value` column (string or binary); `key` is optional,
#   so the word becomes the key and its count the value.
# - `kafka.bootstrap.servers` is a required option for the Kafka sink.
# - A streaming aggregation without a watermark cannot use the default "append"
#   output mode; "update" emits only the counts that changed in each trigger.
# - A checkpoint location is needed so the query can track its progress.
# - Without .start() nothing runs and `query` would only be a DataStreamWriter.
query = (word_counts
  .select(col("word").alias("key"),
          col("count").cast("string").alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
  .option("topic", "output")
  .option("checkpointLocation", CHECKPOINT_DIR)
  .outputMode("update")
  .start())

### GraphX

In [None]:
// In Scala
// Build a graph from `vertices` and `edges`, which are not defined in this cell.
val graph = Graph(vertices, edges)
// `val` added: the original assigned to an undeclared name, which does not compile.
// NOTE(review): a SparkSession has no textFile method (a SparkContext does), and
// joinVertices expects an RDD of (VertexId, message) pairs -- confirm what `spark`
// is here and parse the lines accordingly.
val messages = spark.textFile("hdfs://...")
// Join each vertex with its message; the body of the merge function is elided.
val graph2 = graph.joinVertices(messages) {
  (id, vertex, msg) => ...
}

## Spark Sessions

In [None]:

# In Spark 2.0, the SparkSession became a unified conduit to all Spark operations and data. Not only did it subsume previous entry points to Spark like the SparkContext, SQLContext, HiveContext,
# SparkConf, and StreamingContext, but it also made working with Spark simpler and easier.

In [None]:
// In Scala
import org.apache.spark.sql.SparkSession

// Build the SparkSession (getOrCreate reuses an existing session if there is one),
// naming the application and setting the number of shuffle partitions to 6.
val spark = SparkSession
  .builder
  .appName("LearnSpark")
  .config("spark.sql.shuffle.partitions", 6)
  .getOrCreate()
// ...
// Use the session to read JSON
val people = spark.read.json("...")
// ...
// Use the session to issue a SQL query
val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")

In [None]:
## Currently, Spark supports four cluster managers: the built-in standalone cluster manager, Apache Hadoop YARN, Apache Mesos, and Kubernetes.

#Mode	Spark driver	Spark executor	Cluster manager
#Local	Runs on a single JVM, like a laptop or single node	Runs on the same JVM as the driver	Runs on the same host
#Standalone	Can run on any node in the cluster	Each node in the cluster will launch its own executor JVM	Can be allocated arbitrarily to any host in the cluster
#YARN (client)	Runs on a client, not part of the cluster	YARN’s NodeManager’s container	YARN’s Resource Manager works with YARN’s Application Master to allocate the containers on NodeManagers for executors
#YARN (cluster)	Runs with the YARN Application Master	Same as YARN client mode	Same as YARN client mode
#Kubernetes	Runs in a Kubernetes pod	Each worker runs within its own pod	Kubernetes Master