In [1]:
from pyspark.ml.feature import Binarizer

# Toy input: an integer label and one continuous feature per row.
rows = [(0, 0.1), (1, 0.8), (2, 0.2)]
continuousDataFrame = sqlContext.createDataFrame(rows, ["label", "feature"])

# Binarize the "feature" column against a 0.5 threshold; the result is written
# to a new "binarized_feature" column.
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")

# Every collected Row holds exactly one field (the selected column); print it.
for row in binarizedFeatures.collect():
    print(row[0])

0.0
1.0
0.0


In [2]:
from pyspark.ml.feature import PCA
from pyspark.mllib.linalg import Vectors

# Three 5-dimensional vectors (one sparse, two dense), each wrapped in a
# 1-tuple so it becomes a single-column row.
data = [
    (Vectors.sparse(5, [1, 3], [1.0, 7.0]),),  # non-zeros at indices 1 and 3
    (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
    (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),),
]
df = sqlContext.createDataFrame(data, ["features"])

# Fit a PCA model with k=3 components and write the projection of each
# "features" vector to "pcaFeatures".
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
projected = model.transform(df)
result = projected.select("pcaFeatures")

# truncate=False so the full vectors are printed instead of being cut off.
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



In [3]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.mllib.linalg import Vectors

# Three 2-dimensional dense vectors, one per row of the "features" column.
vectors = [
    Vectors.dense([-2.0, 2.3]),
    Vectors.dense([0.0, 0.0]),
    Vectors.dense([0.6, -1.1]),
]
df = sqlContext.createDataFrame([(vector,) for vector in vectors], ["features"])

# Expand each vector into its degree-2 polynomial terms ("polyFeatures").
px = PolynomialExpansion(degree=2, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)

# Fetch up to three rows to the driver and print each Row as-is.
expanded_rows = polyDF.select("polyFeatures").take(3)
for expanded in expanded_rows:
    print(expanded)

Row(polyFeatures=DenseVector([-2.0, 4.0, 2.3, -4.6, 5.29]))
Row(polyFeatures=DenseVector([0.0, 0.0, 0.0, 0.0, 0.0]))
Row(polyFeatures=DenseVector([0.6, 0.36, -1.1, -0.66, 1.21]))


In [4]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

# Six rows of (id, category); ids are simply the positions 0..5.
categories = ["a", "b", "c", "a", "a", "c"]
df = sqlContext.createDataFrame(list(enumerate(categories)), ["id", "category"])

# Step 1: map each category string to a numeric index ("categoryIndex").
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

# Step 2: one-hot encode the index. dropLast=False keeps a slot for every
# category, so each output vector has one position per distinct category.
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)

encoded.select("id", "categoryVec").show()

+---+-------------+
| id|  categoryVec|
+---+-------------+
|  0|(3,[0],[1.0])|
|  1|(3,[2],[1.0])|
|  2|(3,[1],[1.0])|
|  3|(3,[0],[1.0])|
|  4|(3,[0],[1.0])|
|  5|(3,[1],[1.0])|
+---+-------------+



In [15]:
# `os` is imported here so the cell does not depend on an import made in some
# other (possibly deleted) cell.
import os

# Show where Spark is installed. The print() call form matches the rest of the
# notebook and, with a single argument, prints the same text on Python 2 and 3.
print(os.environ['SPARK_HOME'])

/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0


In [25]:
# `os` is imported here so the cell does not depend on an import made in some
# other (possibly deleted) cell.
import os

# List the sample datasets under the Spark install. The directory is derived
# from SPARK_HOME instead of a machine-specific absolute path, and the listing
# is sorted because os.listdir() returns entries in arbitrary order.
mllib_data_dir = os.path.join(os.environ['SPARK_HOME'], "data", "mllib")
print(sorted(os.listdir(mllib_data_dir)))

['als', 'gmm_data.txt', 'kmeans_data.txt', 'lr-data', 'lr_data.txt', 'pagerank_data.txt', 'pic_data.txt', 'ridge-data', 'sample_binary_classification_data.txt', 'sample_fpgrowth.txt', 'sample_isotonic_regression_data.txt', 'sample_lda_data.txt', 'sample_libsvm_data.txt', 'sample_linear_regression_data.txt', 'sample_movielens_data.txt', 'sample_multiclass_classification_data.txt', 'sample_naive_bayes_data.txt', 'sample_svm_data.txt', 'sample_tree_data.csv']


In [26]:
from pyspark.ml.feature import Normalizer

import os

# Load the LIBSVM-format sample file found under SPARK_HOME/data/mllib.
# The path is built from SPARK_HOME rather than hard-coded, so the cell works
# on any machine where that environment variable points at the Spark install.
libsvm_path = os.path.join(os.environ['SPARK_HOME'], "data", "mllib", "sample_libsvm_data.txt")
dataFrame = sqlContext.read.format("libsvm").load(libsvm_path)

In [27]:
# Rescale every "features" vector by its $L^1$ norm (p=1.0); the result goes
# into a new "normFeatures" column. `dataFrame` is the frame loaded in the
# preceding cell.
normalizer = Normalizer(p=1.0, inputCol="features", outputCol="normFeatures")
l1NormData = normalizer.transform(dataFrame)

# Preview the original and normalized columns side by side.
l1NormData.show()

+-----+--------------------+--------------------+
|label|            features|        normFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


In [28]:
from pyspark.ml.feature import StandardScaler

import os

# Load the LIBSVM-format sample file found under SPARK_HOME/data/mllib.
# The path is built from SPARK_HOME rather than hard-coded, so the cell works
# on any machine where that environment variable points at the Spark install.
libsvm_path = os.path.join(os.environ['SPARK_HOME'], "data", "mllib", "sample_libsvm_data.txt")
dataFrame = sqlContext.read.format("libsvm").load(libsvm_path)

# withStd=True scales by the standard deviation; withMean=False skips centering.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler.
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


In [30]:
from pyspark.ml.feature import MinMaxScaler

import os

# Load the LIBSVM-format sample file found under SPARK_HOME/data/mllib.
# The path is built from SPARK_HOME rather than hard-coded, so the cell works
# on any machine where that environment variable points at the Spark install.
libsvm_path = os.path.join(os.environ['SPARK_HOME'], "data", "mllib", "sample_libsvm_data.txt")
dataFrame = sqlContext.read.format("libsvm").load(libsvm_path)

# No min/max arguments are passed, so the scaler's default range is used.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate the MinMaxScalerModel.
scalerModel = scaler.fit(dataFrame)

# Rescale each feature to the range [min, max].
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[158,159,160...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[124,125,126...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[152,153,154...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[151,152,153...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[129,130,131...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[158,159,160...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[99,100,101,...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[154,155,156...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[127,128,129...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[154,155,156...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[153,154,155...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[151,152,153...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[129,130,131...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[154,155,156...|[0.5,0.5,0.5,0.5,...|
|  1.0|(692,[150,151,152...|[0.5,0.5,0.5,0.5,...|
|  0.0|(692,[124,125,126...|[0.5,0.5,0.5,0.5,...|


In [31]:
from pyspark.ml.feature import SQLTransformer

# Two rows with an id and two numeric columns to combine.
df = sqlContext.createDataFrame([(0, 1.0, 3.0), (2, 2.0, 5.0)], ["id", "v1", "v2"])

# __THIS__ is the placeholder for the input DataFrame; the statement keeps all
# existing columns and appends the sum (v3) and product (v4) of v1 and v2.
statement = "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
sqlTrans = SQLTransformer(statement=statement)

transformed = sqlTrans.transform(df)
transformed.show()

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+



In [33]:
from pyspark.ml.feature import Bucketizer

# Bucket boundaries; the infinite end points make the outer buckets open-ended.
inf = float("inf")
splits = [-inf, -0.5, 0.0, 0.5, inf]

# One single-column row per sample value.
data = [(value,) for value in (-0.5, -0.3, 0.0, 0.2)]
dataFrame = sqlContext.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Replace each value with the index of the bucket it falls into.
bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
+--------+----------------+



In [34]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors

# Two 3-dimensional dense vectors, each wrapped in a 1-tuple row.
data = [(Vectors.dense(values),) for values in ([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])]
df = sqlContext.createDataFrame(data, ["vector"])

# Multiply every input vector component-wise by this fixed scaling vector.
scaling = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(scalingVec=scaling,
                                 inputCol="vector", outputCol="transformedVector")

transformed = transformer.transform(df)
transformed.show()

+-------------+-----------------+
|       vector|transformedVector|
+-------------+-----------------+
|[1.0,2.0,3.0]|    [0.0,2.0,6.0]|
|[4.0,5.0,6.0]|   [0.0,5.0,12.0]|
+-------------+-----------------+



In [35]:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# A single record mixing scalar columns with a vector column.
columns = ["id", "hour", "mobile", "userFeatures", "clicked"]
record = (0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)
dataset = sqlContext.createDataFrame([record], columns)

# Concatenate the listed columns, in order, into one "features" vector.
assembler = VectorAssembler(inputCols=["hour", "mobile", "userFeatures"],
                            outputCol="features")
output = assembler.transform(dataset)

# Show the assembled vector next to the "clicked" column for the first row.
first_row = output.select("features", "clicked").first()
print(first_row)

Row(features=DenseVector([18.0, 1.0, 0.0, 10.0, 0.5]), clicked=1.0)


In [38]:
from pyspark.ml.feature import RFormula

# Three records: id, a string country code, an integer hour and a click flag.
records = [(7, "US", 18, 1.0), (8, "CA", 12, 0.0), (9, "NZ", 15, 0.0)]
dataset = sqlContext.createDataFrame(records, ["id", "country", "hour", "clicked"])

# R-style formula: "clicked" is the response, "country" and "hour" the
# predictors. Features are written to "features" and the response to "label".
formula = RFormula(formula="clicked ~ country + hour",
                   featuresCol="features",
                   labelCol="label")

fitted = formula.fit(dataset)
output = fitted.transform(dataset)
output.select("features", "label").show()

+--------------+-----+
|      features|label|
+--------------+-----+
|[0.0,0.0,18.0]|  1.0|
|[1.0,0.0,12.0]|  0.0|
|[0.0,1.0,15.0]|  0.0|
+--------------+-----+

