In [1]:
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 36 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=a9ecd07662fdad11b336859da672ae0c272706838fb45359118a4cf2a700925a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [9]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer (inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+



In [15]:
from pyspark.ml.feature import Word2Vec

documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
  text, vector = row
  print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))


Text: [Hi, I, heard, about, Spark] => 
Vector: [-0.02300390750169754,-0.06358029544353486,0.0037466228008270265]

Text: [I, wish, java, could, use, case, classes] => 
Vector: [0.01459998737222382,-0.026636907165603976,-0.0360073766538075]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.012251960672438146,0.0005856143310666084,0.0076403886079788215]



In [12]:
from pandas._libs.lib import fast_unique_multiple_list_gen
from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame([
    (0, "a b c".split(" "))
], ["id", "words"])

cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------+---------+
|id |words    |features |
+---+---------+---------+
|0  |[a, b, c]|(0,[],[])|
+---+---------+---------+



In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

training = spark.createDataFrame([
    (0, "a b c d spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce",0.0),
],["id", "text", "label"])

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol=("features"))
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

In [21]:
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
  ], ["id", "text"])

prediction = model.transform(test)
# print (prediction.collect())
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
  rid, text, prob, prediction = row
  print("%d, %s) --> prob=%s, prediction=%f" % (rid,text, str(prob), prediction))

4, spark i j k) --> prob=[0.5080831414127923,0.4919168585872077], prediction=0.000000
5, l m n) --> prob=[0.9851072999550657,0.01489270004493426], prediction=0.000000
6, spark hadoop spark) --> prob=[0.052589797035650615,0.9474102029643494], prediction=1.000000
7, apache hadoop) --> prob=[0.995626802321357,0.0043731976786429705], prediction=0.000000
