데이터프레임 기반 머신러닝 API(pyspark.ml) - 사용자가 실용적인 머신러닝 파이프라인을 빠르게 구성하고 설정할 수 있다.

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark

In [8]:
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession

In [9]:
# Create (or reuse) the local Spark session shared by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [10]:
df = spark.createDataFrame([(0.5,)], ["values"])

In [12]:
from pyspark.ml.feature import Binarizer

In [13]:
binarizer = Binarizer(threshold=1.0, inputCol="values", outputCol="features")

In [14]:
binarizer.setThreshold(1.0)

Binarizer_a0c685ab70c4

In [15]:
print(df)

DataFrame[values: double]


In [16]:
binarizer.setInputCol("values")

Binarizer_a0c685ab70c4

In [17]:
binarizer.setOutputCol("features")

Binarizer_a0c685ab70c4

In [18]:
binarizer.transform(df).head().features

0.0

In [19]:
binarizer.setParams(outputCol="freqs").transform(df).head().freqs

0.0

In [20]:
params = {binarizer.threshold: -0.5, binarizer.outputCol: "vector"}
binarizer.transform(df, params).head().vector

1.0

In [22]:
import tempfile

# Persist under a fresh scratch directory rather than the filesystem root:
# "/binarizer" needs write access to "/" and save() refuses to overwrite an
# existing path, so the original cell failed whenever it was run again.
binarizerPath = tempfile.mkdtemp() + "/binarizer"
binarizer.save(binarizerPath)
loadedBinarizer = Binarizer.load(binarizerPath)
# A bare comparison in the middle of a cell is never displayed, so assert the
# round trip instead of silently discarding its result.
assert loadedBinarizer.getThreshold() == binarizer.getThreshold()

df2 = spark.createDataFrame([(0.5, 0.3)], ["values1", "values2"])
binarizer2 = Binarizer(thresholds=[0.0, 1.0])
binarizer2.setInputCols(["values1", "values2"]).setOutputCols(["output1", "output2"])

binarizer2.transform(df2).show()

+-------+-------+-------+-------+
|values1|values2|output1|output2|
+-------+-------+-------+-------+
|    0.5|    0.3|    1.0|    0.0|
+-------+-------+-------+-------+



In [23]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

In [24]:
data = [(0, Vectors.dense([-1.0, -1.0 ]),),
        (1, Vectors.dense([-1.0, 1.0 ]),),
        (2, Vectors.dense([1.0, -1.0 ]),),
        (3, Vectors.dense([1.0, 1.0]),)]

In [41]:
from pyspark.ml.feature import BucketedRandomProjectionLSH,BucketedRandomProjectionLSHModel

In [27]:
df = spark.createDataFrame(data, ["id", "features"])
brp = BucketedRandomProjectionLSH()
brp.setInputCol("features")

BucketedRandomProjectionLSH_14b95c98994e

In [28]:
brp.setOutputCol("hashes")

BucketedRandomProjectionLSH_14b95c98994e

In [29]:
brp.setSeed(12345)

BucketedRandomProjectionLSH_14b95c98994e

In [30]:
brp.setBucketLength(1.0)

BucketedRandomProjectionLSH_14b95c98994e

In [31]:
model = brp.fit(df)
model.getBucketLength()

1.0

In [32]:
model.setOutputCol("hashes")

BucketedRandomProjectionLSHModel: uid=BucketedRandomProjectionLSH_14b95c98994e, numHashTables=1

In [33]:
model.transform(df).head()

Row(id=0, features=DenseVector([-1.0, -1.0]), hashes=[DenseVector([-1.0])])

In [34]:
data2 = [(4, Vectors.dense([2.0, 2.0 ]),),
         (5, Vectors.dense([2.0, 3.0 ]),),
         (6, Vectors.dense([3.0, 2.0 ]),),
         (7, Vectors.dense([3.0, 3.0]),)]

In [35]:
df2 = spark.createDataFrame(data2, ["id", "features"])

In [36]:
model.approxNearestNeighbors(df2, Vectors.dense([1.0, 2.0]), 1).collect()

[Row(id=4, features=DenseVector([2.0, 2.0]), hashes=[DenseVector([1.0])], distCol=1.0)]

In [37]:
model.approxSimilarityJoin(df, df2, 3.0, distCol="EuclideanDistance").select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  3|  6| 2.23606797749979|
+---+---+-----------------+



In [38]:
model.approxSimilarityJoin(df, df2, 3, distCol="EuclideanDistance").select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  3|  6| 2.23606797749979|
+---+---+-----------------+



In [39]:
import tempfile

# Save the estimator into a fresh scratch directory; the original "/brp"
# targeted the filesystem root and could not be written a second time because
# save() does not overwrite an existing path.
brpPath = tempfile.mkdtemp() + "/brp"
brp.save(brpPath)
brp2 = BucketedRandomProjectionLSH.load(brpPath)
# The reloaded estimator must carry the same bucket length.
brp2.getBucketLength() == brp.getBucketLength()

True

In [43]:
import tempfile

# Save the fitted model into a fresh scratch directory instead of
# "/brp-model" at the filesystem root, so the cell can be re-run safely.
modelPath = tempfile.mkdtemp() + "/brp-model"
model.save(modelPath)
model2 = BucketedRandomProjectionLSHModel.load(modelPath)
# Both models must hash the first row identically after the round trip.
model.transform(df).head().hashes == model2.transform(df).head().hashes

True

In [44]:
# Sample (values1, values2) rows; several entries are NaN so the later cells
# can show how invalid values are handled.
_NAN = float("nan")
values = [
    (0.1, 0.0),
    (0.4, 1.0),
    (1.2, 1.3),
    (1.5, _NAN),
    (_NAN, 1.0),
    (_NAN, 0.0),
]

In [46]:
from pyspark.ml.feature import Bucketizer

In [47]:
df = spark.createDataFrame(values, ["values1", "values2"])
bucketizer = Bucketizer()
bucketizer.setSplits([-float("inf"), 0.5, 1.4, float("inf")])

Bucketizer_7826d1b3b5c8

In [48]:
bucketizer.setInputCol("values1")

Bucketizer_7826d1b3b5c8

In [49]:
bucketizer.setOutputCol("buckets")

Bucketizer_7826d1b3b5c8

In [50]:
bucketed = bucketizer.setHandleInvalid("keep").transform(df).collect()

In [51]:
bucketed = bucketizer.setHandleInvalid("keep").transform(df.select("values1"))

In [52]:
bucketed.show(truncate=False)

+-------+-------+
|values1|buckets|
+-------+-------+
|0.1    |0.0    |
|0.4    |0.0    |
|1.2    |1.0    |
|1.5    |2.0    |
|NaN    |3.0    |
|NaN    |3.0    |
+-------+-------+



In [71]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import ChiSqSelector, ChiSqSelectorModel

In [58]:
df = spark.createDataFrame(
    [ (Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0),
      (Vectors.dense([0.0, 1.0, 12.0, 1.0]), 0.0),
      (Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0)],
    ["features", "label"])

In [64]:
selector = ChiSqSelector(numTopFeatures=1, outputCol="selectedFeatures")
model = selector.fit(df)
model.getFeaturesCol()

'features'

In [60]:
model.setFeaturesCol("features")

ChiSqSelectorModel: uid=ChiSqSelector_35bfebd399f9, numSelectedFeatures=1

In [62]:
model.transform(df).head().selectedFeatures

DenseVector([18.0])

In [65]:
model.selectedFeatures

[2]

In [66]:
import tempfile

# Every later cell builds its save location as temp_path + "/<name>". With an
# empty prefix those paths land in the filesystem root ("/chi-sq-selector",
# "/idf", ...), which needs root write access and breaks on re-run because
# save() will not overwrite. A fresh scratch directory avoids both problems.
temp_path = tempfile.mkdtemp()
chiSqSelectorPath = temp_path + "/chi-sq-selector"
selector.save(chiSqSelectorPath)

In [67]:
loadedSelector = ChiSqSelector.load(chiSqSelectorPath)

In [68]:
loadedSelector.getNumTopFeatures() == selector.getNumTopFeatures()

True

In [69]:
modelPath = temp_path + "/chi-sq-selector-model"

In [73]:
model.save(modelPath)
loadedModel = ChiSqSelectorModel.load(modelPath)
loadedModel.selectedFeatures == model.selectedFeatures

True

In [94]:
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel

In [76]:
df = spark.createDataFrame(
    [(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])],
    ["label", "raw"])
cv = CountVectorizer()
cv.setInputCol("raw")

CountVectorizer_b37979506fd7

In [77]:
cv.setOutputCol("vector")

CountVectorizer_b37979506fd7

In [78]:
model = cv.fit(df)
model.setInputCol("raw")

CountVectorizerModel: uid=CountVectorizer_b37979506fd7, vocabularySize=3

In [79]:
model.transform(df).show(truncate=False)

+-----+---------------+-------------------------+
|label|raw            |vector                   |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+



In [84]:
print(model.__doc__)


    Model fitted by :py:class:`CountVectorizer`.

    .. versionadded:: 1.6.0
    


In [80]:
sorted(model.vocabulary) == ['a', 'b', 'c']

True

In [85]:
countVectorizerPath = temp_path + "/count-vectorizer"

In [88]:
cv.save(countVectorizerPath)
loadedCv = CountVectorizer.load(countVectorizerPath)
loadedCv.getMinDF() == cv.getMinDF()

True

In [89]:
loadedCv.getMinTF() == cv.getMinTF()

True

In [92]:
print(loadedCv.getMinTF(), loadedCv.getMinDF(), loadedCv.getVocabSize() )

1.0 1.0 262144


In [96]:
# Save the fitted CountVectorizerModel before loading it back. With the two
# lines below commented out, modelPath still referred to the
# "/chi-sq-selector-model" directory assigned earlier, so a top-to-bottom run
# tried to load a ChiSqSelectorModel artifact as a CountVectorizerModel.
# overwrite() keeps the cell re-runnable.
modelPath = temp_path + "/count-vectorizer-model"
model.write().overwrite().save(modelPath)
loadedModel = CountVectorizerModel.load(modelPath)
loadedModel.vocabulary == model.vocabulary

True

In [97]:
fromVocabModel = CountVectorizerModel.from_vocabulary(["a", "b", "c"],
    inputCol="raw", outputCol="vectors")

In [98]:
fromVocabModel.transform(df).show(truncate=False)

+-----+---------------+-------------------------+
|label|raw            |vectors                  |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+



In [100]:
# from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import DCT
df1 = spark.createDataFrame([(Vectors.dense([5.0, 8.0, 6.0]),)], ["vec"])
dct = DCT( )
dct.setInverse(False)

DCT_48760b4220c2

In [101]:
dct.setInputCol("vec")

DCT_48760b4220c2

In [102]:
dct.setOutputCol("resultVec")

DCT_48760b4220c2

In [103]:
df2 = dct.transform(df1)
df2.head().resultVec

DenseVector([10.9697, -0.7071, -2.0412])

In [104]:
df3 = DCT(inverse=True, inputCol="resultVec", outputCol="origVec").transform(df2)
df3.head().origVec

DenseVector([5.0, 8.0, 6.0])

In [105]:
dctPath = temp_path + "/dct"
dct.save(dctPath)
loadedDtc = DCT.load(dctPath)
loadedDtc.getInverse()

False

In [106]:
from pyspark.ml.feature import FeatureHasher

In [107]:
data = [(2.0, True, "1", "foo"), (3.0, False, "2", "bar")]
cols = ["real", "bool", "stringNum", "string"]
df = spark.createDataFrame(data, cols)
hasher = FeatureHasher()
hasher.setInputCols(cols)

FeatureHasher_3bba49efb095

In [108]:
hasher.setOutputCol("features")

FeatureHasher_3bba49efb095

In [109]:
hasher.transform(df).head().features

SparseVector(262144, {174475: 2.0, 247670: 1.0, 257907: 1.0, 262126: 1.0})

In [110]:
hasher.setCategoricalCols(["real"]).transform(df).head().features

SparseVector(262144, {171257: 1.0, 247670: 1.0, 257907: 1.0, 262126: 1.0})

In [111]:
hasherPath = temp_path + "/hasher"
hasher.save(hasherPath)
loadedHasher = FeatureHasher.load(hasherPath)
loadedHasher.getNumFeatures() == hasher.getNumFeatures()

True

In [112]:
loadedHasher.transform(df).head().features == hasher.transform(df).head().features

True

In [114]:
from pyspark.ml.feature import HashingTF

df = spark.createDataFrame([(["a", "b", "c"],)], ["words"])
hashingTF = HashingTF(inputCol="words", outputCol="features")
hashingTF.setNumFeatures(10)

HashingTF_eeb18e42cf87

In [115]:
hashingTF.transform(df).head().features

SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0})

In [116]:
hashingTF.setParams(outputCol="freqs").transform(df).head().freqs

SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0})

In [117]:
params = {hashingTF.numFeatures: 5, hashingTF.outputCol: "vector"}
hashingTF.transform(df, params).head().vector

SparseVector(5, {0: 1.0, 2: 1.0, 3: 1.0})

In [118]:
hashingTFPath = temp_path + "/hashing-tf"
hashingTF.save(hashingTFPath)
loadedHashingTF = HashingTF.load(hashingTFPath)
loadedHashingTF.getNumFeatures() == hashingTF.getNumFeatures()

True

In [119]:
hashingTF.indexOf("b")

5

In [120]:
from pyspark.ml.feature import IDF
from pyspark.ml.linalg import DenseVector

In [121]:
df = spark.createDataFrame([(DenseVector([1.0, 2.0]),),
    (DenseVector([0.0, 1.0]),), (DenseVector([3.0, 0.2]),)], ["tf"])

In [122]:
idf = IDF(minDocFreq=3)
idf.setInputCol("tf")

IDF_a023d6888124

In [124]:
model = idf.fit(df)
model.setOutputCol("idf")

IDFModel: uid=IDF_a023d6888124, numDocs=3, numFeatures=2

In [125]:
model.getMinDocFreq()

3

In [126]:
model.idf

DenseVector([0.0, 0.0])

In [127]:
model.docFreq

[0, 3]

In [128]:
model.numDocs == df.count()

True

In [129]:
model.transform(df).head().idf

DenseVector([0.0, 0.0])

In [130]:
idf.setParams(outputCol="freqs").fit(df).transform(df).collect()[1].freqs

DenseVector([0.0, 0.0])

In [131]:
params = {idf.minDocFreq: 1, idf.outputCol: "vector"}
idf.fit(df, params).transform(df).head().vector

DenseVector([0.2877, 0.0])

In [132]:
idfPath = temp_path + "/idf"
idf.save(idfPath)
loadedIdf = IDF.load(idfPath)
loadedIdf.getMinDocFreq() == idf.getMinDocFreq()

True

In [135]:
from pyspark.ml.feature import IDFModel
modelPath = temp_path + "/idf-model"
model.save(modelPath)
loadedModel = IDFModel.load(modelPath)
loadedModel.transform(df).head().idf == model.transform(df).head().idf

True

In [141]:
from pyspark.ml.feature import MinMaxScaler, MinMaxScalerModel

In [137]:
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])
mmScaler = MinMaxScaler(outputCol="scaled")
mmScaler.setInputCol("a")

model = mmScaler.fit(df)
model.setOutputCol("scaledOutput")

model.originalMin

model.originalMax

model.transform(df).show()


+-----+------------+
|    a|scaledOutput|
+-----+------------+
|[0.0]|       [0.0]|
|[2.0]|       [1.0]|
+-----+------------+



In [138]:
minMaxScalerPath = temp_path + "/min-max-scaler"
mmScaler.save(minMaxScalerPath)
loadedMMScaler = MinMaxScaler.load(minMaxScalerPath)
loadedMMScaler.getMin() == mmScaler.getMin()

True

In [139]:
loadedMMScaler.getMax() == mmScaler.getMax()

True

In [142]:
# Round-trip the fitted MinMaxScalerModel through disk.
modelPath = temp_path + "/min-max-scaler-model"
model.save(modelPath)
loadedModel = MinMaxScalerModel.load(modelPath)
# Only the last expression of a cell is displayed, so fold both range checks
# into it; previously the originalMin comparison was evaluated and discarded.
(loadedModel.originalMin == model.originalMin
 and loadedModel.originalMax == model.originalMax)

True

In [143]:
print(loadedMMScaler.getMin(), loadedMMScaler.getMax(), loadedModel.originalMax, loadedModel.originalMin)

0.0 1.0 [2.0] [0.0]


In [146]:
from pyspark.ml.feature import NGram
from pyspark.sql import Row

In [147]:
df = spark.createDataFrame([Row(inputTokens=["a", "b", "c", "d", "e"])])
ngram = NGram(n=2)

In [148]:
ngram.setInputCol("inputTokens")

NGram_291f536f9b63

In [149]:
ngram.setOutputCol("nGrams")

NGram_291f536f9b63

In [150]:
ngram.transform(df).head()

Row(inputTokens=['a', 'b', 'c', 'd', 'e'], nGrams=['a b', 'b c', 'c d', 'd e'])

In [151]:
ngram.setParams(n=4).transform(df).head()

Row(inputTokens=['a', 'b', 'c', 'd', 'e'], nGrams=['a b c d', 'b c d e'])

In [152]:
ngram.transform(df, {ngram.outputCol: "output"}).head()

ngram.transform(df).head()

Row(inputTokens=['a', 'b', 'c', 'd', 'e'], nGrams=['a b c d', 'b c d e'])

In [154]:
# setParams only accepts keyword arguments. Catch the expected TypeError and
# print it so this demonstration does not abort a top-to-bottom "Run All".
try:
    ngram.setParams("text")
except TypeError as err:
    print("TypeError:", err)

TypeError: Method setParams forces keyword arguments.

In [155]:
ngramPath = temp_path + "/ngram"
ngram.save(ngramPath)
loadedNGram = NGram.load(ngramPath)
loadedNGram.getN() == ngram.getN()

True

In [158]:
ngram.getN()

4

In [192]:
from pyspark.ml.feature import OneHotEncoder, OneHotEncoderModel

In [160]:
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ["input"])
ohe = OneHotEncoder()
ohe.setInputCols(["input"])

OneHotEncoder_fa3989890193

In [185]:
ohe.setOutputCols(["output"])

OneHotEncoder_fa3989890193

In [186]:
model = ohe.fit(df)
model.setOutputCols(["output"])

OneHotEncoderModel: uid=OneHotEncoder_fa3989890193, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1

In [187]:
model.getHandleInvalid()

'error'

In [188]:
model.transform(df).head().output

SparseVector(2, {0: 1.0})

In [189]:
single_col_ohe = OneHotEncoder(inputCol="input", outputCol="output")
single_col_model = single_col_ohe.fit(df)
single_col_model.transform(df).head().output

SparseVector(2, {0: 1.0})

In [190]:
ohePath = temp_path + "/ohe"
ohe.save(ohePath)
loadedOHE = OneHotEncoder.load(ohePath)
loadedOHE.getInputCols() == ohe.getInputCols()

True

In [193]:
modelPath = temp_path + "/ohe-model"
model.save(modelPath)
loadedModel = OneHotEncoderModel.load(modelPath)
loadedModel.categorySizes == model.categorySizes

True

In [194]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

In [195]:
# Three 5-dimensional rows (one sparse, two dense) to project onto 2 components.
data = [
    (Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),),
]

df = spark.createDataFrame(data, ["features"])
pca = PCA(k=2, inputCol="features")
# Output column name was misspelled as "pca_feataures".
pca.setOutputCol("pca_features")

PCA_5658b4c561e7

In [199]:
model = pca.fit(df)

In [200]:
model.getK()

2

In [202]:
model.setOutputCol("output")

PCAModel: uid=PCA_5658b4c561e7, k=2

In [203]:
model.transform(df).collect()[0].output

DenseVector([1.6486, -4.0133])

In [204]:
model.transform(df).collect()[1].output

DenseVector([-4.6451, -1.1168])

In [205]:
model.transform(df).collect()[2].output

DenseVector([-6.4289, -5.338])

In [207]:
model.explainedVariance

DenseVector([0.7944, 0.2056])

In [208]:
# Toy corpus: 100 "a b" pairs followed by 10 "a c" pairs. Splitting on a
# single space keeps the trailing empty token produced by the final space.
_corpus = "a b " * 100 + "a c " * 10
sent = _corpus.split(" ")
print(sent)

['a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b', 'a', 'b',

In [233]:
from pyspark.ml.feature import Word2Vec, Word2VecModel

doc = spark.createDataFrame([(sent,), (sent,)], ["sentence"])
word2vec = Word2Vec(vectorSize=5, seed=42, inputCol="sentence", outputCol="model")
word2vec.setMaxIter(10)

Word2Vec_f19b4137424e

In [210]:
word2vec.getMaxIter()

10

In [212]:
# Remove the explicit maxIter value set earlier on this estimator, then fit.
word2vec.clear(word2vec.maxIter)
model = word2vec.fit(doc)
# minCount was never set on word2vec above, so this presumably reports the
# library default carried over to the fitted model.
model.getMinCount()

5

In [213]:
model.setInputCol("sentence")

Word2VecModel: uid=Word2Vec_98c052266adf, numWords=3, vectorSize=5

In [214]:
model.getVectors().show()

+----+--------------------+
|word|              vector|
+----+--------------------+
|   a|[0.09511678665876...|
|   b|[-1.2028766870498...|
|   c|[0.30153277516365...|
+----+--------------------+



In [218]:
model.findSynonymsArray("a", 2)

[('b', 0.015859870240092278), ('c', -0.5680795907974243)]

In [221]:
from pyspark.sql.functions import format_number as fmt
model.findSynonyms('a', 2).select("word", fmt("similarity", 5).alias("similarity")).show()

+----+----------+
|word|similarity|
+----+----------+
|   b|   0.01586|
|   c|  -0.56808|
+----+----------+



In [222]:
model.transform(doc).head().model

DenseVector([-0.4833, 0.1855, -0.273, -0.0509, -0.4769])

In [228]:
# Define the path and save the estimator before loading it. With these two
# lines commented out, word2vecPath is never assigned anywhere in the
# notebook and the load raises NameError on a fresh kernel.
# overwrite() keeps the cell re-runnable.
word2vecPath = temp_path + "/word2vec"
word2vec.write().overwrite().save(word2vecPath)
loadedWord2Vec = Word2Vec.load(word2vecPath)
loadedWord2Vec.getVectorSize() == word2vec.getVectorSize()

True

In [230]:
loadedWord2Vec.getNumPartitions() == word2vec.getNumPartitions()

True

In [231]:
loadedWord2Vec.getMinCount() == word2vec.getMinCount()

True

In [235]:
modelPath = temp_path + "/word2vec-model"
model.save(modelPath)
loadedModel = Word2VecModel.load(modelPath)
loadedModel.getVectors().first().word == model.getVectors().first().word

True

In [236]:
loadedModel.getVectors().first().vector == model.getVectors().first().vector

True

In [251]:
from pyspark.ml.clustering import LDA, LocalLDAModel, DistributedLDAModel

In [238]:
from pyspark.ml.linalg import Vector, SparseVector

In [239]:
df = spark.createDataFrame([[1, Vectors.dense([0.0, 1.0])],
     [2, SparseVector(2, {0: 1.0})],], ["id", "features"])

In [240]:
lda = LDA(k=2, seed=1, optimizer="em")
lda.setMaxIter(10)

LDA_fc24ba9c5c3a

In [241]:
lda.getMaxIter()

10

In [242]:
lda.clear(lda.maxIter)
model = lda.fit(df)
model.setSeed(1)

DistributedLDAModel: uid=LDA_fc24ba9c5c3a, k=2, numFeatures=2

In [243]:
model.getTopicDistributionCol()

'topicDistribution'

In [244]:
model.isDistributed()

True

In [245]:
localModel = model.toLocal()
localModel.isDistributed()

False

In [246]:
model.vocabSize()

2

In [247]:
model.describeTopics().show()

+-----+-----------+--------------------+
|topic|termIndices|         termWeights|
+-----+-----------+--------------------+
|    0|     [1, 0]|[0.50530243938871...|
|    1|     [0, 1]|[0.50530243938871...|
+-----+-----------+--------------------+



In [248]:
model.topicsMatrix()

DenseMatrix(2, 2, [0.4947, 0.5053, 0.5053, 0.4947], 0)

In [254]:
lda_path = temp_path + "/lda"
lda.save(lda_path)
sameLDA = LDA.load(lda_path)
distributed_model_path = temp_path + "/lda_distributed_model"
model.save(distributed_model_path)
sameModel = DistributedLDAModel.load(distributed_model_path)
local_model_path = temp_path + "/lda_local_model"
localModel.save(local_model_path)
sameLocalModel = LocalLDAModel.load(local_model_path)

In [284]:
from pyspark.ml.recommendation import ALS, ALSModel

In [256]:
df = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
    ["user", "item", "rating"])

In [257]:
als = ALS(rank=10, seed=0)

In [258]:
als.setMaxIter(5)

ALS_2e832d6867de

In [259]:
als.getMaxIter()

5

In [260]:
als.setRegParam(0.1)

ALS_2e832d6867de

In [261]:
als.getRegParam()

0.1

In [263]:
als.clear(als.regParam)
model = als.fit(df)
model.getBlockSize()

4096

In [264]:
model.getUserCol()

'user'

In [265]:
model.setUserCol("user")

ALSModel: uid=ALS_2e832d6867de, rank=10

In [266]:
model.getItemCol()

'item'

In [267]:
model.setPredictionCol("newPrediction")

ALSModel: uid=ALS_2e832d6867de, rank=10

In [268]:
model.rank

10

In [269]:
model.userFactors.orderBy("id").collect()

[Row(id=0, features=[-0.3651984930038452, 0.3723710775375366, -0.06104307249188423, 0.8052674531936646, -0.797479510307312, -0.06917982548475266, 0.0801284983754158, -0.09667783230543137, 0.1752723604440689, 0.4072113633155823]),
 Row(id=1, features=[-0.221788227558136, 0.5157026052474976, 0.20773544907569885, 0.9807761907577515, -0.38315072655677795, 0.3542327582836151, 0.18112820386886597, -0.1403946578502655, 0.8413816690444946, 0.20572681725025177]),
 Row(id=2, features=[0.27025529742240906, 0.08384223282337189, 0.5856322646141052, 0.12703897058963776, 0.5216090679168701, 0.6421923637390137, 0.17314176261425018, 0.08272238075733185, 0.990022599697113, -0.2700589895248413])]

In [273]:
test = spark.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"])
predictions = sorted(model.transform(test).collect(), key=lambda r: r[0])
predictions[0]

Row(user=0, item=2, newPrediction=0.6929101347923279)

In [274]:
predictions[1]

Row(user=1, item=0, newPrediction=3.47356915473938)

In [275]:
predictions[2]

Row(user=2, item=0, newPrediction=-0.8991986513137817)

In [276]:
user_recs = model.recommendForAllUsers(3)

In [277]:
user_recs.where(user_recs.user == 0).select("recommendations.item", "recommendations.rating").collect()

[Row(item=[0, 1, 2], rating=[3.9102137088775635, 1.9978108406066895, 0.6929101347923279])]

In [279]:
item_recs = model.recommendForAllItems(3)
item_recs.where(item_recs.item == 2).select("recommendations.user", "recommendations.rating").collect()

[Row(user=[2, 1, 0], rating=[4.892695903778076, 3.9912655353546143, 0.6929101347923279])]

In [280]:
user_subset = df.where(df.user == 2)
user_subset_recs = model.recommendForUserSubset(user_subset, 3)
user_subset_recs.select("recommendations.item", "recommendations.rating").first()

Row(item=[2, 1, 0], rating=[4.892695903778076, 1.0764491558074951, -0.8991986513137817])

In [281]:
item_subset = df.where(df.item == 0)
item_subset_recs = model.recommendForItemSubset(item_subset, 3)
item_subset_recs.select("recommendations.user", "recommendations.rating").first()

Row(user=[0, 1, 2], rating=[3.9102137088775635, 3.47356915473938, -0.8991986513137817])

In [282]:
# Round-trip the ALS estimator through disk.
als_path = temp_path + "/als"
als.save(als_path)
als2 = ALS.load(als_path)
# Inspect the reloaded estimator; querying `als` here would not exercise the
# save/load round trip at all.
als2.getMaxIter()

5

In [286]:
# model_path has to be assigned and the model saved before it can be loaded.
# With these two lines commented out, the only assignment of model_path is in
# a later cell, so a top-to-bottom run raised NameError here.
# overwrite() keeps the cell re-runnable.
model_path = temp_path + "/als_model"
model.write().overwrite().save(model_path)
model2 = ALSModel.load(model_path)
model.rank == model2.rank

True

In [287]:
sorted(model.userFactors.collect()) == sorted(model2.userFactors.collect())

True

In [288]:
sorted(model.itemFactors.collect()) == sorted(model2.itemFactors.collect())

True

In [292]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [293]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import TrainValidationSplitModel
import tempfile
dataset = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.4]), 1.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([0.6]), 1.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"]).repartition(1)

In [294]:
# Tune a LogisticRegression with a single train/validation split.
lr = LogisticRegression()
# Candidate parameter maps: maxIter of 0 and of 1.
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
# parallelism=1 and a fixed seed are passed explicitly; trainRatio is left unset.
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
    parallelism=1, seed=42)
tvsModel = tvs.fit(dataset)
# Reports the train ratio in effect (not set above, so presumably the default).
tvsModel.getTrainRatio()

0.75

In [295]:
tvsModel.validationMetrics

[0.5, 0.8857142857142857]

In [296]:
path = tempfile.mkdtemp()
model_path = path + "/model"
tvsModel.write().save(model_path)
tvsModelRead = TrainValidationSplitModel.read().load(model_path)
tvsModelRead.validationMetrics

[0.5, 0.8857142857142857]

In [297]:
evaluator.evaluate(tvsModel.transform(dataset))

0.8333333333333333