In [1]:
from __future__ import print_function
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Monte Carlo estimate of Pi: draw n points uniformly from the square
    # [-1, 1] x [-1, 1]; the fraction that lands inside the unit circle
    # approximates pi / 4, so 4 * (hits / n) approximates pi.
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()

    partitions = 1
    n = partitions * 100000

    def f(_):
        """Sample one random point; return 1 if it lies in the unit circle, else 0."""
        x = 2 * random() - 1
        y = 2 * random() - 1
        inside = x ** 2 + y ** 2 <= 1
        return 1 if inside else 0

    samples = spark.sparkContext.parallelize(range(1, n + 1), partitions)
    count = samples.map(f).reduce(add)
    estimate = 4.0 * count / n
    print("Pi is roughly %f" % estimate)

    spark.stop()

Pi is roughly 3.137520


In [2]:
"""
This is an example implementation of ALS for learning how to use Spark. Please refer to
pyspark.ml.recommendation.ALS for more conventional use.
This example requires numpy (http://www.numpy.org/)
"""
from __future__ import print_function
import sys
import numpy as np
from numpy.random import rand
from numpy import matrix
from pyspark.sql import SparkSession

# Ridge regularisation strength; update() scales it by the row count of its input.
LAMBDA = 1e-2
# Seed numpy's global RNG so the random matrices built in __main__ are reproducible.
np.random.seed(seed=42)

def rmse(R, ms, us):
    """Root-mean-square error between ``R`` and the reconstruction ``ms * us.T``.

    Args:
        R: target matrix (M x U in this notebook).
        ms: factor matrix whose rows pair with the rows of ``R`` (M x F here).
        us: factor matrix whose rows pair with the columns of ``R`` (U x F here).

    Returns:
        The RMSE over every entry of ``R - ms * us.T``, as a numpy float.

    The denominator is the number of entries of the residual rather than the
    module-level ``M * U``, so the function works even when those globals have
    not been defined (they are only set inside the ``__main__`` block).
    """
    # np.dot is a matrix product for both np.matrix and plain np.ndarray inputs;
    # ``*`` would silently become element-wise for ndarrays.
    diff = R - np.dot(ms, us.T)
    return np.sqrt(np.sum(np.power(diff, 2)) / diff.size)

def update(i, mat, ratings):
    """Solve the ridge-regularised least-squares problem for row ``i``.

    Computes ``solve(mat.T * mat + LAMBDA * n * I, mat.T * ratings[i, :].T)``
    where ``n`` is ``mat.shape[0]`` and ``I`` is the identity of size
    ``mat.shape[1]``. Reads the module-level ``LAMBDA``. ``*`` is used as
    matrix multiplication, so ``mat`` and ``ratings`` are expected to be
    ``np.matrix`` instances, as built in the ``__main__`` block.

    Returns:
        The solution column (``mat.shape[1]`` x 1).
    """
    n_rows = mat.shape[0]
    n_factors = mat.shape[1]
    rhs = mat.T * ratings[i, :].T
    gram = mat.T * mat
    # The same penalty is added to every diagonal entry of the Gram matrix.
    penalty = LAMBDA * n_rows
    for d in range(n_factors):
        gram[d, d] += penalty
    return np.linalg.solve(gram, rhs)

if __name__ == "__main__":

    # Usage: als [M] [U] [F] [iterations] [partitions]
    # NOTE: this notebook version does not read argv; the values are hard-coded below.

    print("""WARN: This is a naive implementation of ALS and is given as an
      example. Please use pyspark.ml.recommendation.ALS for more
      conventional use.""", file=sys.stderr)

    spark = SparkSession\
        .builder\
        .appName("PythonALS")\
        .getOrCreate()

    sc = spark.sparkContext

    M = 100          # rows of R
    U = 500          # columns of R
    F = 10           # number of latent factors
    ITERATIONS = 5
    partitions = 2

    print("Running ALS with M=%d, U=%d, F=%d, iters=%d, partitions=%d\n" %
          (M, U, F, ITERATIONS, partitions))

    # Synthetic rank-F target matrix plus random initial factor matrices.
    R = matrix(rand(M, F)) * matrix(rand(U, F).T)
    ms = matrix(rand(M, F))
    us = matrix(rand(U, F))

    # R is read by every task, so it is broadcast once for the whole run.
    Rb = sc.broadcast(R)
    usb = sc.broadcast(us)
    # The first pass only needs us; msb is created from the first solved ms.
    msb = None

    for i in range(ITERATIONS):
        # Solve every row of ms with us held fixed.
        ms = sc.parallelize(range(M), partitions) \
               .map(lambda x: update(x, usb.value, Rb.value)) \
               .collect()
        # collect() returns a list of F x 1 matrices, so the array is 3-d
        # (M x F x 1); dropping the trailing axis gives an M x F matrix.
        ms = matrix(np.array(ms)[:, :, 0])
        # Release the previous iteration's broadcast before replacing it;
        # otherwise each iteration leaves another stale copy cached.
        if msb is not None:
            msb.unpersist()
        msb = sc.broadcast(ms)

        # Solve every row of us with the freshly updated ms held fixed.
        us = sc.parallelize(range(U), partitions) \
               .map(lambda x: update(x, msb.value, Rb.value.T)) \
               .collect()
        us = matrix(np.array(us)[:, :, 0])
        # The ms job above was the last one to read the old usb.
        usb.unpersist()
        usb = sc.broadcast(us)

        error = rmse(R, ms, us)
        print("Iteration %d:" % i)
        print("\nRMSE: %5.4f\n" % error)

    spark.stop()

Running ALS with M=100, U=500, F=10, iters=5, partitions=2

Iteration 0:

RMSE: 0.2229

Iteration 1:

RMSE: 0.0731

Iteration 2:

RMSE: 0.0317

Iteration 3:

RMSE: 0.0315

Iteration 4:

RMSE: 0.0315



WARN: This is a naive implementation of ALS and is given as an
      example. Please use pyspark.ml.recommendation.ALS for more
      conventional use.


In [3]:
from __future__ import print_function
from pyspark.ml.feature import Word2Vec

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("dataFrame") \
    .getOrCreate()

# Input data: each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to vectors. An explicit seed (matching the ALS
# cell's np.random.seed(42)) makes the printed vectors reproducible.
# NOTE(review): PySpark's default Word2Vec seed appears to be hash() of the
# class name, which varies between Python processes under hash randomisation
# -- confirm for the PySpark version in use.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result", seed=42)
model = word2Vec.fit(documentDF)

# Each output row holds the input words and their averaged vector.
result = model.transform(documentDF)
for text, vector in result.collect():
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
result.show()
spark.stop()

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.03238218240439892,0.03422726094722748,-0.014331452548503876]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.06346431374549866,0.004565070516296795,0.004498231091669628]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.005696431919932366,0.003507903963327408,-0.008991636149585248]

+--------------------+--------------------+
|                text|              result|
+--------------------+--------------------+
|[Hi, I, heard, ab...|[0.03238218240439...|
|[I, wish, Java, c...|[0.06346431374549...|
|[Logistic, regres...|[0.00569643191993...|
+--------------------+--------------------+



In [None]:
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Read numbers.txt, sort every whitespace-separated integer in it,
    # and print the numbers in ascending order.
    spark = SparkSession.builder.appName("PythonSort").getOrCreate()

    lines = spark.read.text('numbers.txt').rdd.map(lambda r: r[0])
    # Pair each number with a dummy value so sortByKey can order by the number.
    keyed = lines.flatMap(lambda x: x.split()).map(lambda x: (int(x), 1))
    sortedCount = keyed.sortByKey()

    # Demo only: collect() pulls the whole sorted dataset onto the driver,
    # which would not scale to real data volumes.
    output = sortedCount.collect()
    for num, unitcount in output:
        print(num)

    spark.stop()

In [None]:
from __future__ import print_function
import sys
from operator import add
# SparkSession：是一个对Spark的编程入口，取代了原本的SQLContext与HiveContext，方便调用Dataset和DataFrame API
# SparkSession可用于创建DataFrame，将DataFrame注册为表，在表上执行SQL，缓存表和读取parquet文件。
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # appName sets the application name that is shown in the Spark Web UI.
    # getOrCreate() returns the existing SparkSession if there is one,
    # otherwise it creates a new one.
    spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()

    # Read words.txt, take the underlying RDD of Rows and map each Row to its
    # first (only) column, giving an RDD of text lines.
    # map() is a transformation: every input element is turned into exactly
    # one output element by the supplied function.
    lines = spark.read.text('words.txt').rdd.map(lambda r: r[0])

    # flatMap() is like map(), but each input may produce zero or more outputs,
    # which are flattened into a single RDD of words.
    # str.split() with no argument splits on runs of any whitespace and never
    # yields empty strings; split(' ') would count "" as a word for every blank
    # line and for repeated, leading or trailing spaces.
    counts = lines.flatMap(lambda x: x.split()) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)

    # collect() returns every element of the RDD to the driver as a list. That
    # is fine for a small result like this, but the whole result must fit in
    # driver memory, so estimate its size before collecting large data.
    output = counts.collect()

    for (word, count) in output:
        print("%s: %i" % (word, count))

    spark.stop()