In [None]:
# Apache Spark is a popular, fast framework used to process big data, including in near real time.
# Spark itself is written in Scala, but it can also be used from Python through PySpark.
# Hadoop MapReduce performs batch processing only, while Spark supports streaming and interactive computation as well as batch processing.
# Spark can use HDFS (Hadoop Distributed File System) for storing data and can also run applications on YARN.
# PySpark offers the PySpark shell, which lets us use Python libraries together with the Spark framework.

In [None]:
# SparkContext is the entry point to Spark. It uses the Py4J library to launch a JVM.
# See https://www.tutorialspoint.com/pyspark/pyspark_sparkcontext.htm for more.

In [None]:
# Important concept in Spark: the RDD (Resilient Distributed Dataset)
# --> a collection of elements that is partitioned across multiple nodes so it can be processed in parallel on a cluster.
# (this is what makes working with big data so much faster than it would otherwise be)
# RDDs are immutable and fault tolerant; you can apply transformations and actions to them.

# Transformation: an operation applied to an RDD to create a new RDD, e.g. filter, map
# Action: an operation applied to an RDD that computes a result and sends it back to the driver

# To apply any operation in PySpark, we must create an RDD first

In [None]:
# Reference: https://www.tutorialspoint.com/pyspark/pyspark_rdd.htm

In [None]:
# PySpark RDD class (constructor signature as given in the documentation, not runnable code):
#
# class pyspark.RDD (
#    jrdd,
#    ctx,
#    jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
# )

In [None]:
# run basic operations with pyspark

from pyspark import SparkConf, SparkContext

# `sc` is only predefined inside the PySpark shell. getOrCreate() returns the
# context that is already running (only one may be active per process) and
# creates a local one otherwise, so this cell also works on a fresh kernel.
# When a context already exists it is returned as-is and the conf is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("PySpark basics")
)

# parallelize() distributes a local Python collection to form an RDD.
words = sc.parallelize(  # this is an RDD
    [
        "scala",
        "java",
        "hadoop",
        "spark",
        "akka",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark",
    ]
)


In [None]:
# count
from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext already running in this kernel (only
# one may be active per process) and creates a local one otherwise, so this
# cell works both inside a notebook and as a standalone count.py script.
# When a context already exists it is returned as-is, so the master and app
# name below only take effect if this cell is the one that creates it.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("count app")
)
words = sc.parallelize(
    [
        "scala",
        "java",
        "hadoop",
        "spark",
        "akka",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark",
    ]
)

# count() is an action: it returns the number of elements in the RDD.
counts = words.count()

# print() with one parenthesised argument behaves the same on Python 2 and 3.
print("Number of elements in RDD -> %i" % counts)

# cmd: $SPARK_HOME/bin/spark-submit count.py
# output: Number of elements in RDD -> 8

In [None]:
# foreach
from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext already running in this kernel (only
# one may be active per process) and creates a local one otherwise, so this
# cell works both inside a notebook and as a standalone foreach.py script.
# When a context already exists it is returned as-is and the conf is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("ForEach app")
)
words = sc.parallelize(
    [
        "scala",
        "java",
        "hadoop",
        "spark",
        "akka",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark",
    ]
)


def f(x):
    """Print a single RDD element."""
    print(x)


# foreach() is an action run purely for its side effects; it returns None,
# so `fore` is always None and is kept only for backward compatibility.
# NOTE(review): f runs in the Spark worker process, so in a notebook the
# printed lines may show up in the server console rather than the cell output.
fore = words.foreach(f)

# cmd: $SPARK_HOME/bin/spark-submit foreach.py

# output:
# scala
# java
# hadoop
# spark
# akka
# spark vs hadoop
# pyspark
# pyspark and spark

In [None]:
# filter

from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext already running in this kernel (only
# one may be active per process) and creates a local one otherwise, so this
# cell works both inside a notebook and as a standalone filter.py script.
# When a context already exists it is returned as-is and the conf is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Filter app")
)
words = sc.parallelize(
    [
        "scala",
        "java",
        "hadoop",
        "spark",
        "akka",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark",
    ]
)

# filter() is a transformation: it builds a new RDD holding only the elements
# for which the predicate is true (here: strings containing 'spark').
words_filter = words.filter(lambda x: 'spark' in x)

# collect() is the action that brings the filtered elements back as a list.
filtered = words_filter.collect()

# The one-element tuple keeps %-formatting unambiguous; print() with a single
# parenthesised argument behaves the same on Python 2 and 3.
print("Fitered RDD -> %s" % (filtered,))

# cmd: $SPARK_HOME/bin/spark-submit filter.py
# output (printed on a single line; wrapped here for readability):
# Fitered RDD -> [
#    'spark',
#    'spark vs hadoop',
#    'pyspark',
#    'pyspark and spark'
# ]

In [None]:
# map

from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext already running in this kernel (only
# one may be active per process) and creates a local one otherwise, so this
# cell works both inside a notebook and as a standalone map.py script.
# When a context already exists it is returned as-is and the conf is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Map app")
)
words = sc.parallelize(
    [
        "scala",
        "java",
        "hadoop",
        "spark",
        "akka",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark",
    ]
)

# map() is a transformation: it applies the function to every element and
# returns a new RDD; here each word becomes a (word, 1) key-value pair.
words_map = words.map(lambda x: (x, 1))

# collect() is the action that brings the mapped pairs back as a list.
mapping = words_map.collect()

# The one-element tuple keeps %-formatting unambiguous; print() with a single
# parenthesised argument behaves the same on Python 2 and 3.
print("Key value pair -> %s" % (mapping,))

# cmd: $SPARK_HOME/bin/spark-submit map.py
# output (printed on a single line; wrapped here for readability):
# Key value pair -> [
#    ('scala', 1),
#    ('java', 1),
#    ('hadoop', 1),
#    ('spark', 1),
#    ('akka', 1),
#    ('spark vs hadoop', 1),
#    ('pyspark', 1),
#    ('pyspark and spark', 1)
# ]

In [None]:
# reduce

from operator import add

from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext already running in this kernel (only
# one may be active per process) and creates a local one otherwise, so this
# cell works both inside a notebook and as a standalone reduce.py script.
# When a context already exists it is returned as-is and the conf is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Reduce app")
)
nums = sc.parallelize([1, 2, 3, 4, 5])

# reduce() is an action: it combines the elements pairwise with the given
# binary function (operator.add here) and returns the single result.
adding = nums.reduce(add)

# print() with one parenthesised argument behaves the same on Python 2 and 3.
print("Adding all the elements -> %i" % adding)

# cmd: $SPARK_HOME/bin/spark-submit reduce.py
# output: Adding all the elements -> 15

In [None]:
# join

from pyspark import SparkConf, SparkContext

# getOrCreate() reuses the SparkContext already running in this kernel (only
# one may be active per process) and creates a local one otherwise, so this
# cell works both inside a notebook and as a standalone join.py script.
# When a context already exists it is returned as-is and the conf is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Join app")
)

# Two pair RDDs that share the keys "spark" and "hadoop".
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])

# join() is a transformation on pair RDDs: for every key present in both RDDs
# it yields (key, (value_from_x, value_from_y)).
joined = x.join(y)

# collect() is the action that brings the joined pairs back as a list.
final = joined.collect()

# The one-element tuple keeps %-formatting unambiguous; print() with a single
# parenthesised argument behaves the same on Python 2 and 3.
print("Join RDD -> %s" % (final,))

# cmd: $SPARK_HOME/bin/spark-submit join.py
# output (printed on a single line; wrapped here for readability,
# and the order of the pairs may vary):
# Join RDD -> [
#    ('spark', (1, 2)),
#    ('hadoop', (4, 5))
# ]