In [24]:
!pip install pyspark
!pip install pandas



In [25]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local[2]').getOrCreate()
sc=spark.sparkContext
spark

In [26]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def tokenize(sentenceDataFrame):

    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

    regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

    countTokens = udf(lambda words: len(words), IntegerType())

    tokenized = tokenizer.transform(sentenceDataFrame)
    tokenized.select("sentence", "words")\
        .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

    regexTokenized = regexTokenizer.transform(sentenceDataFrame)
    regexTokenized.select("sentence", "words") \
        .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
    return regexTokenized

In [27]:
sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

df=tokenize(sentenceDataFrame)
df.show()

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case cla

In [28]:
from pyspark.ml.feature import StopWordsRemover

def remove_stopw(df):

    remover = StopWordsRemover(inputCol="words", outputCol="filtered")
    df=remover.transform(df)
    return df

df=remove_stopw(df)

In [29]:
df.show()

+---+--------------------+--------------------+--------------------+
| id|            sentence|               words|            filtered|
+---+--------------------+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|  [hi, heard, spark]|
|  1|I wish Java could...|[i, wish, java, c...|[wish, java, use,...|
|  2|Logistic,regressi...|[logistic, regres...|[logistic, regres...|
+---+--------------------+--------------------+--------------------+



In [30]:
from pyspark.ml.feature import NGram

def ngram(worddf):
    ngram = NGram(n=2, inputCol="filtered", outputCol="ngrams")

    ngramdf = ngram.transform(worddf)
    ngramdf.select("ngrams").show(truncate=False)
    return ngramdf

ngram(df).show()

+-----------------------------------------------------+
|ngrams                                               |
+-----------------------------------------------------+
|[hi heard, heard spark]                              |
|[wish java, java use, use case, case classes]        |
|[logistic regression, regression models, models neat]|
+-----------------------------------------------------+

+---+--------------------+--------------------+--------------------+--------------------+
| id|            sentence|               words|            filtered|              ngrams|
+---+--------------------+--------------------+--------------------+--------------------+
|  0|Hi I heard about ...|[hi, i, heard, ab...|  [hi, heard, spark]|[hi heard, heard ...|
|  1|I wish Java could...|[i, wish, java, c...|[wish, java, use,...|[wish java, java ...|
|  2|Logistic,regressi...|[logistic, regres...|[logistic, regres...|[logistic regress...|
+---+--------------------+--------------------+--------------------

In [36]:
from pyspark.ml.feature import CountVectorizer

def count_vectorize(df):
    # fit a CountVectorizerModel from the corpus.
    cv = CountVectorizer(inputCol="words", outputCol="features", minDF=1.0)

    model = cv.fit(df)

    result = model.transform(df)
    result.show(truncate=False)
    return result

count_vectorize(df)

+---+-----------------------------------+------------------------------------------+------------------------------------+-----------------------------------------------------+
|id |sentence                           |words                                     |filtered                            |features                                             |
+---+-----------------------------------+------------------------------------------+------------------------------------+-----------------------------------------------------+
|0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |[hi, heard, spark]                  |(16,[0,1,2,3,10],[1.0,1.0,1.0,1.0,1.0])              |
|1  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|[wish, java, use, case, classes]    |(16,[0,4,5,7,11,14,15],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|2  |Logistic,regression,models,are,neat|[logistic, regression, models, are, neat] |[logistic, regression, models, neat]

DataFrame[id: bigint, sentence: string, words: array<string>, filtered: array<string>, features: vector]

AnalysisException: Cannot resolve column name "features" among (id, sentence, words, filtered)

In [20]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

def tfidf(df):
    tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
    wordsData = tokenizer.transform(df)

    hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
    featurizedData = hashingTF.transform(wordsData)
    # alternatively, CountVectorizer can also be used to get term frequency vectors

    idf = IDF(inputCol="rawFeatures", outputCol="features")
    idfModel = idf.fit(featurizedData)
    rescaledData = idfModel.transform(featurizedData)

    rescaledData.select("label", "features").show()
    return rescaledData
    
df=tfidf(df)
df.show()

AnalysisException: cannot resolve '`label`' given input columns: [features, filtered, id, rawFeatures, sentence, words];
'Project ['label, features#194]
+- Project [id#0L, sentence#1, words#38, filtered#82, rawFeatures#183, UDF(rawFeatures#183) AS features#194]
   +- Project [id#0L, sentence#1, words#38, filtered#82, UDF(words#38) AS rawFeatures#183]
      +- Project [id#0L, sentence#1, words#38, UDF(words#38) AS filtered#82]
         +- Project [id#0L, sentence#1, UDF(sentence#1) AS words#38]
            +- LogicalRDD [id#0L, sentence#1], false


In [9]:
!wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/jupyter/annotation/english/spark-nlp-basics/sample-sentences-en.txt

[31mERROR: Could not find a version that satisfies the requirement pyspark.ml (from versions: none)[0m
[31mERROR: No matching distribution found for pyspark.ml[0m


In [21]:
from pyspark.ml import Pipeline

documentAssembler = DocumentAssembler()\
    .setInputCol("statement")\
    .setOutputCol("document")

sentenceDetector = SentenceDetector()\
    .setInputCols(['document'])\
    .setOutputCol('sentences')

tokenizer = Tokenizer() \
    .setInputCols(["sentences"]) \
    .setOutputCol("token")

nlpPipeline = Pipeline(stages=[
    documentAssembler, 
    sentenceDetector,
    tokenizer
 ])

empty_df = spark.createDataFrame([['']]).toDF("text")

pipelineModel = nlpPipeline.fit(empty_df)

NameError: name 'DocumentAssembler' is not defined

In [11]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |filtered            |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



In [None]:
from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

In [45]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



In [22]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+

