# intorducing ML package of pyspark

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master('local') \
        .appName('test') \
        .config('spark.executor.memory', '1gb') \
        .getOrCreate()

# correlation and Hypothesis testing

In [6]:
from pyspark.ml.linalg import Vectors
Vectors.sparse(4, [(0, 1.0), (3,-2)])

SparseVector(4, {0: 1.0, 3: -2.0})

In [7]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation

data = [
    (Vectors.sparse(4, [(0, 1.0), (3,-2)]),),
    (Vectors.dense([4.0, 5.0, 0.0, 3.0]),),
    (Vectors.dense([6.0, 7.0, 0.0, 8.0]), ),
    (Vectors.sparse(4, [(0, 9.0), (3, 1.0)]),)
]

df = spark.createDataFrame(data, ["features"])


In [17]:
r1 = Correlation.corr(df, "features").head()
print(str(r1[0]))
r2 = Correlation.corr(df, "features", "spearman").head()
print(str(r2[0]))

DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])
DenseMatrix([[1.        , 0.10540926,        nan, 0.4       ],
             [0.10540926, 1.        ,        nan, 0.9486833 ],
             [       nan,        nan, 1.        ,        nan],
             [0.4       , 0.9486833 ,        nan, 1.        ]])


In [25]:
dataset = [[Vectors.dense([1, 0, 0, -2])],
           [Vectors.dense([4, 5, 0, 3])],
           [Vectors.dense([6, 7, 0, 8])],
           [Vectors.dense([9, 0, 0, 1])]]

df = spark.createDataFrame(dataset, ["features"])

In [27]:
print(Correlation.corr(df, "features").head()[0])

DenseMatrix([[1.        , 0.05564149,        nan, 0.40047142],
             [0.05564149, 1.        ,        nan, 0.91359586],
             [       nan,        nan, 1.        ,        nan],
             [0.40047142, 0.91359586,        nan, 1.        ]])


In [32]:
print(str(Correlation.corr(df, "features").collect()[0][0]).replace("nan", "NAN"))

DenseMatrix([[1.        , 0.05564149,        NAN, 0.40047142],
             [0.05564149, 1.        ,        NAN, 0.91359586],
             [       NAN,        NAN, 1.        ,        NAN],
             [0.40047142, 0.91359586,        NAN, 1.        ]])


In [37]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]

df = spark.createDataFrame(data, ["label", "features"])

chi = ChiSquareTest.test(df, "features", "label").head()
print(chi.pValues)
print(chi.degreesOfFreedom)
print(chi.statistics)

[0.6872892787909721,0.6822703303362126]
[2, 3]
[0.75,1.5]


In [39]:
chi

Row(pValues=DenseVector([0.6873, 0.6823]), degreesOfFreedom=[2, 3], statistics=DenseVector([0.75, 1.5]))

## pipelines

* DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

* Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

* Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

* Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

* Parameter: All Transformers and Estimators now share a common API for specifying parameters.

In [46]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

lr = LogisticRegression(maxIter=10, regParam=0.01)
print(lr.explainParams())

model1 = lr.fit(training)
print(model1.extractParamMap())

paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter]= 30
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})

paramMap2 = {lr.probabilityCol: "myProbability"}  # Change output column name
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

model2 = lr.fit(training, paramMapCombined)

print("="*40)
print(model2.extractParamMap())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector size must beequal wi

In [47]:
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))

features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.057073041710340625,0.9429269582896593], prediction=1.0
features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238522311704118,0.07614776882958811], prediction=0.0
features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972776114779748,0.8902722388522026], prediction=1.0


In [55]:
lr.maxIter

Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='maxIter', doc='max number of iterations (>= 0).')

In [56]:
paramMap

{Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='maxIter', doc='max number of iterations (>= 0).'): 30,
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='threshold', doc='Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match.e.g. if threshold is p, then thresholds must be equal to [1-p, p].'): 0.55}

In [57]:
model1.extractParamMap()

{Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2)'): 2,
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty'): 0.0,
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial.'): 'auto',
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='featuresCol', doc='features column name'): 'features',
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='fitIntercept', doc='whether to fit an intercept term'): True,
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name='labelCol', doc='label column name'): 'label',
 Param(parent='LogisticRegression_47b7a5b13b7b6562fe4b', name

In [50]:
result

[Row(features=DenseVector([-1.0, 1.5, 1.3]), label=1.0, myProbability=DenseVector([0.0571, 0.9429]), prediction=1.0),
 Row(features=DenseVector([3.0, 2.0, -0.1]), label=0.0, myProbability=DenseVector([0.9239, 0.0761]), prediction=0.0),
 Row(features=DenseVector([0.0, 2.2, -1.5]), label=1.0, myProbability=DenseVector([0.1097, 0.8903]), prediction=1.0)]

In [59]:
prediction.collect()

[Row(label=1.0, features=DenseVector([-1.0, 1.5, 1.3]), rawPrediction=DenseVector([-2.8047, 2.8047]), myProbability=DenseVector([0.0571, 0.9429]), prediction=1.0),
 Row(label=0.0, features=DenseVector([3.0, 2.0, -0.1]), rawPrediction=DenseVector([2.4959, -2.4959]), myProbability=DenseVector([0.9239, 0.0761]), prediction=0.0),
 Row(label=1.0, features=DenseVector([0.0, 2.2, -1.5]), rawPrediction=DenseVector([-2.0935, 2.0935]), myProbability=DenseVector([0.1097, 0.8903]), prediction=1.0)]

In [77]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])


tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")


In [78]:
selected.show()

+---+------------------+--------------------+----------+
| id|              text|         probability|prediction|
+---+------------------+--------------------+----------+
|  4|       spark i j k|[0.15964077387874...|       1.0|
|  5|             l m n|[0.83783256854767...|       0.0|
|  6|spark hadoop spark|[0.06926633132976...|       1.0|
|  7|     apache hadoop|[0.98215753334442...|       0.0|
+---+------------------+--------------------+----------+



In [79]:
for row in selected.collect():
    rid, text, prob, pred = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, prob, pred))

(4, spark i j k) --> prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000
(5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000
(7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000


# extracting, transforming and selecting features

* Extraction: extracting features fro "row" data
* Transformation: scaling, converting, or modifying features
* Selection: selecting a subset from a larger set of features
* Locality sensitive hashing(LSH): This class of algorithms combines aspects of feature transformation with other algorithms.

## feature extractors

In [99]:
# TF-IDF
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

# Tokenize A tokenizer that converts the input string to lowercase and then splits it by white spaces.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rowFeatures", numFeatures=20)
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
model = pipeline.fit(sentenceData)

rescaledData = model.transform(sentenceData)

In [100]:
rescaledData.show()

+-----+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rowFeatures|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|  0.0|Hi I heard about ...|[hi, i, heard, ab...|(20,[0,5,9,17],[1...|(20,[0,5,9,17],[0...|
|  0.0|I wish Java could...|[i, wish, java, c...|(20,[2,7,9,13,15]...|(20,[2,7,9,13,15]...|
|  1.0|Logistic regressi...|[logistic, regres...|(20,[4,6,13,15,18...|(20,[4,6,13,15,18...|
+-----+--------------------+--------------------+--------------------+--------------------+



In [86]:
df = spark.createDataFrame([("a b C",)], ["text"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenizer.transform(df).head()

Row(text='a b C', words=['a', 'b', 'c'])

In [88]:
# Word2Vec
from pyspark.ml.feature import Word2Vec, Tokenizer
from pyspark.ml import Pipeline

documentDF = spark.createDataFrame([
    ("Hi I heard about Spark", ),
    ("I wish Java could use case classes", ),
    ("Logistic regression models are neat", )
], ["text"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol=tokenizer.getOutputCol(), outputCol="features")
pipeline = Pipeline(stages=[tokenizer, word2Vec])

model = pipeline.fit(documentDF)

features = model.transform(documentDF)
features.show()

+--------------------+--------------------+--------------------+
|                text|               words|            features|
+--------------------+--------------------+--------------------+
|Hi I heard about ...|[hi, i, heard, ab...|[-0.0494803907349...|
|I wish Java could...|[i, wish, java, c...|[-0.0332627515308...|
|Logistic regressi...|[logistic, regres...|[-0.0120176910422...|
+--------------------+--------------------+--------------------+



In [96]:
for row in  features.select("features").collect():
    fe = row[0]
    print(fe[0], fe[1], fe[2])

-0.04948039073497057 0.025853979960083963 -0.019018191751092674
-0.033262751530855894 -0.012066990669284548 -0.03971555829048157
-0.012017691042274237 0.015785007923841476 0.009128139819949865


In [111]:
# CountVectorizer

from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame([
    (0, "a b c a".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

cv = CountVectorizer(inputCol="words", outputCol="featues", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)



+---+---------------+-------------------------+
|id |words          |featues                  |
+---+---------------+-------------------------+
|0  |[a, b, c, a]   |(3,[0,1,2],[2.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



In [105]:
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [112]:
# FeatureHasher
from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)

+----+-----+---------+------+--------------------------------------------------------+
|real|bool |stringNum|string|features                                                |
+----+-----+---------+------+--------------------------------------------------------+
|2.2 |true |1        |foo   |(262144,[174475,247670,257907,262126],[2.2,1.0,1.0,1.0])|
|3.3 |false|2        |bar   |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.3])  |
|4.4 |false|3        |baz   |(262144,[22406,70644,174475,187923],[1.0,1.0,4.4,1.0])  |
|5.5 |false|4        |foo   |(262144,[70644,101499,174475,257907],[1.0,1.0,5.5,1.0]) |
+----+-----+---------+------+--------------------------------------------------------+



In [113]:
# Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\w")

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
    
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------+------+
|sentence                           |words             |tokens|
+-----------------------------------+------------------+------+
|Hi I heard about Spark             |[ ,  ,  ,  ]      |4     |
|I wish Java could use case classes |[ ,  ,  ,  ,  ,  ]|6     |
|Logistic,regression,models,are,neat|[,, ,, ,, ,]      |4     |

In [116]:
tokenized.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|
|  1|I wish Java could...|[i, wish, java, c...|
|  2|Logistic,regressi...|[logistic,regress...|
+---+--------------------+--------------------+



In [118]:
# StopWordsRemover
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



In [123]:
# NGram

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "heard", "about", "Spark"]),
    (1, ["I", "wish", "java", "could", "use", "caes", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi heard, heard about, about Spark]                              |
|[I wish, wish java, java could, could use, use caes, caes classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [124]:
# Binarizer 
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
+---+-------+-----------------+



In [125]:
# PCA
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



In [127]:
# PolynomialExpansion
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+



In [None]:
#DCT 
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)

In [None]:
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()