In [1]:
import findspark
findspark.init()

# test if pyspark is working. Takes a minute to finish.
import pyspark
import random

sc = pyspark.SparkContext(appName="Pi")
num_samples = 10000
def inside(p):     
  x, y = random.random(), random.random()
  return x*x + y*y < 1
count = sc.parallelize(range(0, num_samples)).filter(inside).count()
pi = 4 * count / num_samples
print(pi)
sc.stop()

3.1216


In [5]:
import numpy as np
import pandas as pd

In [6]:
# create a SparkSession.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
spark

In [29]:
df = spark.read.format('csv') \
          .option('header', 'true') \
          .option('delimiter', ',') \
          .option('inferschema', 'true') \
          .load('s3://justinngbucket/Data/movie_lens/ratings.csv')

In [8]:
df.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'double'),
 ('timestamp', 'int')]

In [30]:
df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
|     1|    293|   4.0|1112484703|
|     1|    296|   4.0|1112484767|
|     1|    318|   4.0|1112484798|
|     1|    337|   3.5|1094785709|
|     1|    367|   3.5|1112485980|
|     1|    541|   4.0|1112484603|
|     1|    589|   3.5|1112485557|
|     1|    593|   3.5|1112484661|
|     1|    653|   3.0|1094785691|
|     1|    919|   3.5|1094785621|
+------+-------+------+----------+
only showing top 20 rows



In [16]:
df.columns

['userId', 'movieId', 'rating', 'timestamp']

In [17]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [18]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
documentDF.show()

+--------------------+
|                text|
+--------------------+
|[Hi, I, heard, ab...|
|[I, wish, Java, c...|
|[Logistic, regres...|
+--------------------+



In [19]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)

result.show()

+--------------------+--------------------+
|                text|              result|
+--------------------+--------------------+
|[Hi, I, heard, ab...|[0.04915456660091...|
|[I, wish, Java, c...|[-0.0332094979073...|
|[Logistic, regres...|[0.02375314719974...|
+--------------------+--------------------+



In [20]:
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.04915456660091877,-0.022600096464157105,0.010528827086091042]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.03320949790733201,-0.015708756932456578,-0.05784856740917478]

Text: [Logistic, regression, models, are, neat] => 
Vector: [0.023753147199749948,0.016884601768106224,0.0050010856240987785]



In [21]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



In [22]:
df.show()

+--------------------+
|            features|
+--------------------+
| (5,[1,3],[1.0,7.0])|
|[2.0,0.0,3.0,4.0,...|
|[4.0,0.0,0.0,6.0,...|
+--------------------+



In [23]:
model.explainedVariance 

DenseVector([0.7944, 0.2056, 0.0])

In [24]:
model

PCA_4cebb46f4c97c1239d35

In [25]:
type(model)

pyspark.ml.feature.PCAModel

In [28]:
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



In [33]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['userId', 'movieId', 'rating', 'timestamp'],
    outputCol="features")

output = assembler.transform(df)
output.show()

+------+-------+------+----------+--------------------+
|userId|movieId|rating| timestamp|            features|
+------+-------+------+----------+--------------------+
|     1|      2|   3.5|1112486027|[1.0,2.0,3.5,1.11...|
|     1|     29|   3.5|1112484676|[1.0,29.0,3.5,1.1...|
|     1|     32|   3.5|1112484819|[1.0,32.0,3.5,1.1...|
|     1|     47|   3.5|1112484727|[1.0,47.0,3.5,1.1...|
|     1|     50|   3.5|1112484580|[1.0,50.0,3.5,1.1...|
|     1|    112|   3.5|1094785740|[1.0,112.0,3.5,1....|
|     1|    151|   4.0|1094785734|[1.0,151.0,4.0,1....|
|     1|    223|   4.0|1112485573|[1.0,223.0,4.0,1....|
|     1|    253|   4.0|1112484940|[1.0,253.0,4.0,1....|
|     1|    260|   4.0|1112484826|[1.0,260.0,4.0,1....|
|     1|    293|   4.0|1112484703|[1.0,293.0,4.0,1....|
|     1|    296|   4.0|1112484767|[1.0,296.0,4.0,1....|
|     1|    318|   4.0|1112484798|[1.0,318.0,4.0,1....|
|     1|    337|   3.5|1094785709|[1.0,337.0,3.5,1....|
|     1|    367|   3.5|1112485980|[1.0,367.0,3.5

In [36]:
from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=3.5, inputCol="rating", outputCol="binarized_rating")
bin_df = binarizer.transform(df)
bin_df.show()

+------+-------+------+----------+----------------+
|userId|movieId|rating| timestamp|binarized_rating|
+------+-------+------+----------+----------------+
|     1|      2|   3.5|1112486027|             0.0|
|     1|     29|   3.5|1112484676|             0.0|
|     1|     32|   3.5|1112484819|             0.0|
|     1|     47|   3.5|1112484727|             0.0|
|     1|     50|   3.5|1112484580|             0.0|
|     1|    112|   3.5|1094785740|             0.0|
|     1|    151|   4.0|1094785734|             1.0|
|     1|    223|   4.0|1112485573|             1.0|
|     1|    253|   4.0|1112484940|             1.0|
|     1|    260|   4.0|1112484826|             1.0|
|     1|    293|   4.0|1112484703|             1.0|
|     1|    296|   4.0|1112484767|             1.0|
|     1|    318|   4.0|1112484798|             1.0|
|     1|    337|   3.5|1094785709|             0.0|
|     1|    367|   3.5|1112485980|             0.0|
|     1|    541|   4.0|1112484603|             1.0|
|     1|    

In [38]:
output.show()

+------+-------+------+----------+--------------------+
|userId|movieId|rating| timestamp|            features|
+------+-------+------+----------+--------------------+
|     1|      2|   3.5|1112486027|[1.0,2.0,3.5,1.11...|
|     1|     29|   3.5|1112484676|[1.0,29.0,3.5,1.1...|
|     1|     32|   3.5|1112484819|[1.0,32.0,3.5,1.1...|
|     1|     47|   3.5|1112484727|[1.0,47.0,3.5,1.1...|
|     1|     50|   3.5|1112484580|[1.0,50.0,3.5,1.1...|
|     1|    112|   3.5|1094785740|[1.0,112.0,3.5,1....|
|     1|    151|   4.0|1094785734|[1.0,151.0,4.0,1....|
|     1|    223|   4.0|1112485573|[1.0,223.0,4.0,1....|
|     1|    253|   4.0|1112484940|[1.0,253.0,4.0,1....|
|     1|    260|   4.0|1112484826|[1.0,260.0,4.0,1....|
|     1|    293|   4.0|1112484703|[1.0,293.0,4.0,1....|
|     1|    296|   4.0|1112484767|[1.0,296.0,4.0,1....|
|     1|    318|   4.0|1112484798|[1.0,318.0,4.0,1....|
|     1|    337|   3.5|1094785709|[1.0,337.0,3.5,1....|
|     1|    367|   3.5|1112485980|[1.0,367.0,3.5

In [42]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

polyExpansion = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
polyExpansion.transform(output).select(['features', 'polyFeatures']).show(truncate=False)

+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|features                     |polyFeatures                                                                                                                           |
+-----------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|[1.0,2.0,3.5,1.112486027E9]  |[1.0,1.0,2.0,2.0,4.0,3.5,3.5,7.0,12.25,1.112486027E9,1.112486027E9,2.224972054E9,3.8937010945E9,1.23762516027024461E18]                |
|[1.0,29.0,3.5,1.112484676E9] |[1.0,1.0,29.0,29.0,841.0,3.5,3.5,101.5,12.25,1.112484676E9,1.112484676E9,3.2262055604E10,3.893696366E9,1.23762215433482496E18]         |
|[1.0,32.0,3.5,1.112484819E9] |[1.0,1.0,32.0,32.0,1024.0,3.5,3.5,112.0,12.25,1.112484819E9,1.112484819E9,3.5599514208E10,3.8936968665E9,1.23762247250546278E18] 

In [46]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df2 = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df2)
indexed = model.transform(df2)

# inputCol must be numeric type.
encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()


+---+--------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|
+---+--------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|
|  1|       b|          2.0|    (2,[],[])|
|  2|       c|          1.0|(2,[1],[1.0])|
|  3|       a|          0.0|(2,[0],[1.0])|
|  4|       a|          0.0|(2,[0],[1.0])|
|  5|       c|          1.0|(2,[1],[1.0])|
+---+--------+-------------+-------------+



In [44]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['categoryIndex', 'categoryVec'],
    outputCol="features")

encoded2 = assembler.transform(encoded)
encoded2.show()

+---+--------+-------------+-------------+-------------+
| id|category|categoryIndex|  categoryVec|     features|
+---+--------+-------------+-------------+-------------+
|  0|       a|          0.0|(2,[0],[1.0])|[0.0,1.0,0.0]|
|  1|       b|          2.0|    (2,[],[])|[2.0,0.0,0.0]|
|  2|       c|          1.0|(2,[1],[1.0])|[1.0,0.0,1.0]|
|  3|       a|          0.0|(2,[0],[1.0])|[0.0,1.0,0.0]|
|  4|       a|          0.0|(2,[0],[1.0])|[0.0,1.0,0.0]|
|  5|       c|          1.0|(2,[1],[1.0])|[1.0,0.0,1.0]|
+---+--------+-------------+-------------+-------------+



In [47]:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+



In [51]:
from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (0, 2.0, 4.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__ where id = 0")
sqlTrans.transform(df).show()

+---+---+---+---+---+
| id| v1| v2| v3| v4|
+---+---+---+---+---+
|  0|1.0|3.0|4.0|3.0|
|  0|2.0|4.0|6.0|8.0|
+---+---+---+---+---+



In [52]:
sqlTrans = SQLTransformer(
    statement="SELECT id, sum(v1) as sum_v1 FROM __THIS__ group by id")
sqlTrans.transform(df).show()

+---+------+
| id|sum_v1|
+---+------+
|  0|   3.0|
|  2|   2.0|
+---+------+



In [53]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()


The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[0.0], [0.0], [-...|
|  1| [1.0,-1.0]|[[0.0], [-1.0], [...|
|  2|[-1.0,-1.0]|[[-1.0], [-1.0], ...|
|  3| [-1.0,1.0]|[[-1.0], [0.0], [...|
+---+-----------+--------------------+



In [54]:
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()


Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  0|  6|              1.0|
|  2|  5|              1.0|
|  2|  7|              1.0|
|  3|  6|              1.0|
|  0|  4|              1.0|
|  3|  5|              1.0|
|  1|  4|              1.0|
|  1|  7|              1.0|
+---+---+-----------------+



In [55]:
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
|  0| [1.0,1.0]|[[0.0], [0.0], [-...|    1.0|
|  1|[1.0,-1.0]|[[0.0], [-1.0], [...|    1.0|
+---+----------+--------------------+-------+

