<a href="https://colab.research.google.com/github/mingyuanhua/playground/blob/master/spark_basic_example.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
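# Older Spark releases such as 3.4.0 are served from archive.apache.org rather than downloads.apache.org.
!wget -q https://archive.apache.org/dist/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz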
!tar xf /content/spark-3.4.0-bin-hadoop3.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"
!ls

sample_data  spark-3.4.0-bin-hadoop3  spark-3.4.0-bin-hadoop3.tgz


In [3]:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext()

# Resilient Distributed Datasets (RDDs)
Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

In [4]:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

s = distData.reduce(lambda a, b : a*b)

print (s)

# An RDD can also be created from an external file with sc.textFile(path).


120


Once created, the distributed dataset (distData) can be operated on in parallel. The cell above calls distData.reduce(lambda a, b: a * b) to multiply the elements of the list, which yields 120; calling distData.reduce(lambda a, b: a + b) would add them up instead, giving 15. We describe operations on distributed datasets later on.

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
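
As a rough mental model of partitioning, the plain-Python sketch below cuts a list into slices, reduces each slice independently, and then combines the partial results. It is not Spark's actual implementation, and the helper names `split_into_partitions` and `partitioned_reduce` are hypothetical. It also suggests why the function passed to reduce should be associative and commutative: the partial results can be combined in any grouping.

```python
from functools import reduce

def split_into_partitions(data, num_partitions):
    """Cut a list into num_partitions contiguous slices of near-equal size."""
    n = len(data)
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

def partitioned_reduce(data, func, num_partitions):
    """Reduce each non-empty slice on its own, then combine the partial results."""
    partitions = split_into_partitions(data, num_partitions)
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

data = [1, 2, 3, 4, 5]
print(split_into_partitions(data, 2))
print(partitioned_reduce(data, lambda a, b: a + b, 2))
print(partitioned_reduce(data, lambda a, b: a * b, 3))
```

In the notebook itself, the number of partitions is chosen with the second argument to parallelize, as described above.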

# RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
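
Laziness can be observed without a cluster. The sketch below uses Python 3's built-in `map`, which is also lazy, as a stand-in for an RDD transformation, and `sum` as a stand-in for an action. The `traced_len` helper is hypothetical and only exists to record when the function actually runs.

```python
calls = []

def traced_len(line):
    """Return the length of line, recording each invocation."""
    calls.append(line)
    return len(line)

lines = ["line1", "This is line2"]

# Like a transformation, map only records what to do;
# traced_len has not been called yet at this point.
line_lengths = map(traced_len, lines)
calls_before_action = len(calls)

# sum() plays the role of an action: it forces the computation to run.
total_length = sum(line_lengths)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, total_length)
```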

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
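
The difference between recomputing and persisting can be modelled with a toy class. This is a plain-Python sketch under simplifying assumptions (one machine, one storage level), and `LazyDataset` with its `compute_count` field is hypothetical, not part of Spark.

```python
class LazyDataset:
    """Toy stand-in for a transformed RDD: recomputed on every action unless persisted."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.compute_count = 0   # how many times the full map has been run
        self._persist = False
        self._cached = None

    def persist(self):
        """Ask for the result to be kept after it is first computed."""
        self._persist = True
        return self

    def _materialize(self):
        if self._cached is not None:
            return self._cached
        self.compute_count += 1
        result = [self.func(x) for x in self.source]
        if self._persist:
            self._cached = result
        return result

    def collect(self):
        return list(self._materialize())

    def total(self):
        return sum(self._materialize())

lines = ["line1", "This is line2"]

recomputed = LazyDataset(lines, len)
recomputed.collect()
recomputed.total()

cached = LazyDataset(lines, len).persist()
cached.collect()
cached.total()

print(recomputed.compute_count, cached.compute_count)
```

Two actions on the unpersisted dataset run the map twice, while the persisted one runs it once and serves the second action from the saved result.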

# Basics

To illustrate RDD basics, consider the simple program below:

In [5]:
def transformFunc(line):
  # Transformation function: map each line to its length.
  return len(line)

def reduceFunc(a, b):
  # Reduction function: add two partial results.
  return a + b

# Transformations: map, filter, flatMap
# Actions:         reduce, collect

# To read from a file instead: lines = sc.textFile("exa.txt")
file = ["line1", "This is line2"]
lines = sc.parallelize(file)
print(lines.collect())

lineLengths = lines.map(transformFunc)
print(lineLengths.collect())

totalLength = lineLengths.reduce(reduceFunc)
print(totalLength)

['line1', 'This is line2']
[5, 13]
18


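The first statement defines a base RDD. In this notebook it is built from an in-memory list with parallelize; with sc.textFile (shown in a comment above) it would instead be defined from an external file, in which case the dataset is not loaded in memory or otherwise acted on: lines is merely a pointer to the file. The next statement defines lineLengths as the result of a map transformation. Again, lineLengths is not computed when it is defined, due to laziness. Finally, we run reduce, which is an action. At this point Spark breaks the computation into tasks to run on separate machines, and each machine runs both its part of the map and a local reduction, returning only its answer to the driver program. Note that the collect calls used for printing are also actions, so in this cell they trigger computation before reduce does.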

If we also wanted to use lineLengths again later, we could add:

In [7]:
lineLengths.persist()
print (lineLengths.collect())

[5, 13]


before the reduce, which would cause lineLengths to be saved in memory after the first time it is computed.

# Passing Functions to Spark
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are three recommended ways to do this:

* Lambda expressions, for simple functions that can be written as an expression. (Lambdas do not support multi-statement functions or statements that do not return a value.)
* Local defs inside the function calling into Spark, for longer code.
* Top-level functions in a module.

For example, to pass a longer function than can be supported using a lambda, consider the code below:

In [None]:
"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
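
The three styles can be compared side by side without a cluster. In this sketch Python's built-in `map` stands in for an RDD's map, and the function names `count_words`, `count_long_words`, and `run` are hypothetical.

```python
def count_words(s):
    """Top-level function: could live in a module and be imported."""
    return len(s.split(" "))

def run(lines):
    # Local def inside the calling function, for logic too long for a lambda.
    def count_long_words(s):
        words = s.split(" ")
        return len([w for w in words if len(w) > 3])

    with_lambda = list(map(lambda s: len(s.split(" ")), lines))  # lambda expression
    with_local_def = list(map(count_long_words, lines))          # local def
    with_top_level = list(map(count_words, lines))               # top-level function
    return with_lambda, with_local_def, with_top_level

lines = ["hello this is line one", "hello this is line two"]
print(run(lines))
```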

# Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed “shuffle” operations, such as grouping or aggregating the elements by a key.

In Python, these operations work on RDDs containing built-in Python tuples such as (1, 2). Simply create such tuples and then call your desired operation.

For example, the following code uses flatMap to split two lines of text into words, and then uses the reduceByKey operation on key-value pairs to count how many times each word occurs:

In [8]:
words = ["hello this is line one", "hello this is line two"]
words_rdd = sc.parallelize(words)
print (words_rdd.collect())
words_rdd = words_rdd.flatMap(lambda line: line.split(" "))
print (words_rdd.collect())
pairs = words_rdd.map(lambda s: (s, 1))
print (pairs.collect())
counts = pairs.reduceByKey(lambda a, b: a + b)
print (counts.collect())

['hello this is line one', 'hello this is line two']
['hello', 'this', 'is', 'line', 'one', 'hello', 'this', 'is', 'line', 'two']
[('hello', 1), ('this', 1), ('is', 1), ('line', 1), ('one', 1), ('hello', 1), ('this', 1), ('is', 1), ('line', 1), ('two', 1)]
[('this', 2), ('is', 2), ('line', 2), ('two', 1), ('hello', 2), ('one', 1)]
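
To make the semantics of reduceByKey concrete, here is a single-machine model in plain Python. It is only a sketch: the real operation merges values per partition and then shuffles data across the cluster, and the helper name `reduce_by_key` is hypothetical.

```python
def reduce_by_key(pairs, func):
    """Merge the values of each key with func, as a local model of reduceByKey."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

lines = ["hello this is line one", "hello this is line two"]
words = [w for line in lines for w in line.split(" ")]  # models flatMap
pairs = [(w, 1) for w in words]                          # models map
counts = reduce_by_key(pairs, lambda a, b: a + b)
print(sorted(counts.items()))
```

The result is sorted here only for display. The order of the pairs returned by collect in the cell above is not meaningful.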


# RDD API Examples

## Word Count
In this example, we use a few transformations to build a dataset of (String, Int) pairs called counts and then collect it to the driver and print it.

In [9]:
# text_file = sc.textFile("hdfs://...")
from google.colab import files
files.upload()

text_file = sc.textFile("example.txt")

Saving example.txt to example.txt


In [10]:
print (text_file.collect())

['In a May 16 letter obtained by CNN, acting Archivist Debra Steidel Wall writes to Trump, “The 16 records in question all reflect communications involving close presidential advisers, some of them directed to you personally, concerning whether, why, and how you should declassify certain classified records.”', '', 'The 16 presidential records, which were subpoenaed earlier this year, may provide critical evidence establishing the former president’s awareness of the declassification process, a key part of the criminal investigation into Trump’s mishandling of classified documents.']


In [14]:
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .filter(lambda x: x[0].strip() != "") \
             .reduceByKey(lambda a, b: a + b)

# print(counts.collect())

for x in counts.collect():
  print (x)

('May', 1)
('16', 3)
('letter', 1)
('obtained', 1)
('CNN,', 1)
('acting', 1)
('Archivist', 1)
('Debra', 1)
('in', 1)
('question', 1)
('reflect', 1)
('involving', 1)
('close', 1)
('presidential', 2)
('of', 4)
('them', 1)
('certain', 1)
('The', 1)
('records,', 1)
('this', 1)
('may', 1)
('provide', 1)
('evidence', 1)
('investigation', 1)
('into', 1)
('mishandling', 1)
('In', 1)
('a', 2)
('by', 1)
('Steidel', 1)
('Wall', 1)
('writes', 1)
('to', 2)
('Trump,', 1)
('“The', 1)
('records', 1)
('all', 1)
('communications', 1)
('advisers,', 1)
('some', 1)
('directed', 1)
('you', 2)
('personally,', 1)
('concerning', 1)
('whether,', 1)
('why,', 1)
('and', 1)
('how', 1)
('should', 1)
('declassify', 1)
('classified', 2)
('records.”', 1)
('which', 1)
('were', 1)
('subpoenaed', 1)
('earlier', 1)
('year,', 1)
('critical', 1)
('establishing', 1)
('the', 3)
('former', 1)
('president’s', 1)
('awareness', 1)
('declassification', 1)
('process,', 1)
('key', 1)
('part', 1)
('criminal', 1)
('Trump’s', 1)
('docu

## Pi Estimation
Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and see how many fall inside the unit circle. The square contains exactly one quarter of that circle, so the fraction should be π / 4, and we multiply it by 4 to get our estimate.

In [13]:
import random

def inside(p):
    x, y = random.random(), random.random()
    return x*x + y*y < 1

NUM_SAMPLES = 10000000
count = sc.parallelize(range(0, NUM_SAMPLES)) \
             .filter(inside).count()
print("Pi is roughly %f" % (4.0 * count / NUM_SAMPLES))

Pi is roughly 3.141282
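
The same estimate can be reproduced on a single machine. The quarter of the unit circle inside the square has area π / 4 and the square has area 1, so the hit fraction approximates π / 4. The sketch below is plain Python with a fixed seed so that reruns give the same value; the function name `estimate_pi`, the seed, and the sample size are illustrative choices.

```python
import math
import random

def estimate_pi(num_samples, seed=0):
    """Estimate pi from the fraction of random points that land inside the circle."""
    rng = random.Random(seed)  # private generator, so the result is reproducible
    hits = 0
    for _ in range(num_samples):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            hits += 1
    return 4.0 * hits / num_samples

estimate = estimate_pi(100_000)
print("Pi is roughly %f (error %f)" % (estimate, abs(estimate - math.pi)))
```

The Spark version distributes the loop across partitions with filter and count, which is why it can afford far more samples.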


# DataFrame API Examples
In Spark, a DataFrame is a distributed collection of data organized into named columns. Users can use the DataFrame API to perform various relational operations on both external data sources and Spark’s built-in distributed collections without providing specific procedures for processing data. Also, programs based on the DataFrame API are automatically optimized by Spark’s built-in optimizer, Catalyst.

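## Text Search
In this example, we search the lines of a text file for a substring, the same pattern one would use to search for error messages in a log file. Before that, we build a small DataFrame by hand to show grouping and filtering.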

In [15]:
from pyspark.sql import SparkSession

In [16]:
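print (hasattr(text_file, "toDF"))
# False on a fresh kernel: RDDs gain toDF only once a SparkSession exists.
# (It prints True below because a session had already been created in an earlier run.)

SparkSession(sc)
print (hasattr(text_file, "toDF"))
# True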

True
True


In [17]:
df = sc.parallelize([("Tom", "M", 20), ("Jack", "M", 18), ("Marry", "F", 20)])\
       .toDF(['name', 'sex', 'age'])

df.show()

+-----+---+---+
| name|sex|age|
+-----+---+---+
|  Tom|  M| 20|
| Jack|  M| 18|
|Marry|  F| 20|
+-----+---+---+



In [18]:
# Counts by certain column
countsByCol = df.groupBy("age").count()
countsByCol.show()

+---+-----+
|age|count|
+---+-----+
| 20|    2|
| 18|    1|
+---+-----+



In [19]:
from pyspark.sql.functions import col

# Keeps only the rows whose age is greater than 19
olderThan19 = df.filter(col("age") > 19)
olderThan19.show()

+-----+---+---+
| name|sex|age|
+-----+---+---+
|  Tom|  M| 20|
|Marry|  F| 20|
+-----+---+---+



In [21]:
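# Creates a DataFrame having a single column named "line"
# (each line is wrapped in a one-element tuple so that it becomes one row)
df = text_file.map(lambda line: (line,)).toDF(["line"])

# Keeps only the lines that contain "you"
errors = df.filter(col("line").like("%you%"))
# Counts the matching lines
print (errors.count())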

1
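
The pattern passed to like follows SQL LIKE rules: `%` matches any run of characters, `_` matches exactly one character, and matching is case-sensitive. A minimal plain-Python sketch of those rules is shown below. The helper `sql_like` is hypothetical, ignores escape characters, and is not how Spark evaluates the expression.

```python
import re

def sql_like(text, pattern):
    """Return True if text matches a SQL LIKE pattern (% = any run, _ = one character)."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, text, flags=re.DOTALL) is not None

rows = ["why, and how you should declassify", "The 16 presidential records", "You"]
matches = [row for row in rows if sql_like(row, "%you%")]
print(matches)
```

Only the first row matches: the second does not contain the substring, and the third differs in case.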
