## Init Env

The code in this notebook comes from the tutorialspoint PySpark Quick Guide: https://www.tutorialspoint.com/pyspark/pyspark_quick_guide.htm

- The files used below still need to be organized for convenience (to be done later).

In [1]:
from pyspark import SparkContext
sc = SparkContext("local", "First App")

In [3]:
!java -version

java version "1.8.0_212"
Java(TM) SE Runtime Environment (build 1.8.0_212-b10)
Java HotSpot(TM) 64-Bit Server VM (build 25.212-b10, mixed mode)


## Example

In [5]:
logFile = "test_README.md"
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print ("Lines with a: %i, lines with b: %i" % (numAs, numBs))

Lines with a: 672, lines with b: 347


## RDD

In [6]:
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

In [7]:
# count()
counts = words.count()
print("Number of elements in RDD -> %i" % (counts))

Number of elements in RDD -> 8


In [8]:
# collect()
coll = words.collect()
print("Elements in RDD -> %s" % (coll))

Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [15]:
# foreach(f)
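def f(x): print(x)
fore = words.foreach(f)
print("%s" % (fore))
# No elements are printed here, although the tutorial shows each one being printed.
# foreach() runs f on the executors, so print() writes to the worker processes' output
# (in local mode, the console that launched Jupyter), not to the notebook;
# foreach() itself returns None.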

None


In [13]:
# filter(f)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % (filtered))

Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [16]:
# map(f)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % (mapping))

Key value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]


In [18]:
# reduce(f)
from pyspark import SparkContext
from operator import add

# The error "Cannot run multiple SparkContexts at once" means the earlier sc is still running,
# so stop it with sc.stop() before creating a new one
sc.stop()

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % (adding))

Adding all the elements -> 15


In [19]:
# join(other)
sc.stop()
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % (final))

Join RDD -> [('hadoop', (4, 5)), ('spark', (1, 2))]


In [20]:
# cache()
sc.stop()
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print("Words got cached -> %s" % (caching))

Words got cached -> True


## Broadcast / Accumulator
For parallel processing, Apache Spark uses shared variables. When the driver sends a task to an executor on the cluster, a copy of each shared variable goes to that node so the task can use it.

Apache Spark supports two types of shared variables: broadcast variables, which keep a read-only copy of data on every node, and accumulators, which aggregate information through associative and commutative operations.

In [31]:
# broadcast()
sc.stop()

sc = SparkContext("local", "Broadcast app") 
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
data = words_new.value 
print("Stored data -> %s" % (data)) 
elem = words_new.value[2] 
print("Printing a particular element of the broadcast list -> %s" % (elem))

Stored data -> ['scala', 'java', 'hadoop', 'spark', 'akka']
Printing a particular element of the broadcast list -> hadoop


In [32]:
# Accumulator
sc.stop()
sc = SparkContext("local", "Accumulator app") 
num = sc.accumulator(10) 
def f(x): 
   global num 
   num+=x 
rdd = sc.parallelize([20,30,40,50]) 
rdd.foreach(f) 
final = num.value 
print("Accumulated value is -> %i" % (final))

Accumulated value is -> 150


## SparkConf

To run a Spark application locally or on a cluster, you need to set a few configurations and parameters; SparkConf provides them. The main methods of PySpark's SparkConf class are listed below.

- Methods
    - set(key, value) − To set a configuration property.
    - setMaster(value) − To set the master URL.
    - setAppName(value) − To set an application name.
    - get(key, defaultValue=None) − To get a configuration value of a key.
    - setSparkHome(value) − To set Spark installation path on worker nodes.


In [35]:
from pyspark import SparkConf, SparkContext
#conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
conf = SparkConf().setAppName("PySpark App").setMaster("local")
# Setting a cluster master URL hangs here; test it later on EMR

sc.stop()
sc = SparkContext(conf=conf)

## SparkFiles

In Apache Spark, you can upload files using **sc.addFile** (sc is your default SparkContext) and get their paths on a worker using **SparkFiles.get**. In other words, SparkFiles resolves the paths to files added through **SparkContext.addFile()**.

SparkFiles contains the following classmethods −

* get(filename)
* getRootDirectory()


## StorageLevel


StorageLevel decides how an RDD is stored: in memory, on disk, or both. It also decides whether the RDD is serialized and whether its partitions are replicated.

```python
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
```

## MLlib

Apache Spark offers a machine learning API called **MLlib**, which PySpark also exposes in Python. It supports different kinds of algorithms, listed below −

* **mllib.classification** − The **spark.mllib** package supports various methods for binary classification, multiclass classification and regression analysis. Some of the most popular algorithms in classification are **Random Forest, Naive Bayes, Decision Tree**, etc.

* **mllib.clustering** − Clustering is an unsupervised learning problem, whereby you aim to group subsets of entities with one another based on some notion of similarity.

* **mllib.fpm** − Frequent pattern mining: mining frequent items, itemsets, subsequences, or other substructures is usually among the first steps in analyzing a large-scale dataset, and has been an active research topic in data mining for years.

* **mllib.linalg** − MLlib utilities for linear algebra.

* **mllib.recommendation** − Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user item association matrix.

* **spark.mllib** − It currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.mllib uses the Alternating Least Squares (ALS) algorithm to learn these latent factors.

* **mllib.regression** − Linear regression belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. The interface for working with linear regression models and model summaries is similar to the logistic regression case.


In [39]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

sc.stop()

if __name__ == "__main__":
   sc = SparkContext(appName="PySpark mllib Example")
   data = sc.textFile("test.data")
   ratings = data.map(lambda l: l.split(','))\
      .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
   
   # Build the recommendation model using Alternating Least Squares
   rank = 10
   numIterations = 10
   model = ALS.train(ratings, rank, numIterations)
   
   # Evaluate the model on training data
   testdata = ratings.map(lambda p: (p[0], p[1]))
   predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
   ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
   MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
   print("Mean Squared Error = " + str(MSE))
   
   # Save and load model
   model.save(sc, "target/tmp/myCollaborativeFilter")
   sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")

Mean Squared Error = 4.443558894358244e-06
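The evaluation step joins each actual rating with its prediction and averages the squared differences. The same arithmetic in plain Python, on three made-up records with the `((user, product), (actual, predicted))` shape that the `join` above produces, for illustration only:

```python
# Made-up records, not output of the model above.
rates_and_preds = [
    ((1, 1), (5.0, 4.5)),
    ((1, 2), (1.0, 1.5)),
    ((2, 1), (4.0, 4.0)),
]

# Mean of (actual - predicted)^2 over all records.
squared_errors = [(actual - pred) ** 2 for _, (actual, pred) in rates_and_preds]
mse = sum(squared_errors) / len(squared_errors)
print(mse)  # 0.16666666666666666
```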


## Serializers

Serialization is used for performance tuning in Apache Spark. All data that is sent over the network, written to disk, or persisted in memory must be serialized, so serialization plays an important role in costly operations.

In [40]:
sc.stop()

from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
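`MarshalSerializer` is built on Python's `marshal` module, which is faster than `pickle` (used by `PickleSerializer`) but supports fewer data types. The trade-off can be seen with the standard library alone; `datetime.date` stands in for any type that `marshal` cannot handle:

```python
import datetime
import marshal
import pickle

records = [1, 2.5, "spark", (3, 4), {"k": [5, 6]}]

# Built-in types round-trip through both modules.
assert marshal.loads(marshal.dumps(records)) == records
assert pickle.loads(pickle.dumps(records)) == records

# Other types: pickle handles them, marshal raises ValueError.
day = datetime.date(2020, 1, 1)
print(pickle.loads(pickle.dumps(day)))  # 2020-01-01
try:
    marshal.dumps(day)
    marshal_ok = True
except ValueError:
    marshal_ok = False
print("marshal can serialize a date:", marshal_ok)  # False
```

In PySpark terms, `MarshalSerializer` is a reasonable choice only when the RDD holds simple built-in values like the integers above.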
