In [1]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local[*]").setAppName("Naive_Bayes")
sc   = SparkContext(conf=conf)
print ("Running Spark Version %s" % (sc.version))

Running Spark Version 2.2.0


In [2]:
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes

In [8]:
training_raw = sc.parallelize([
    {"text": "foo foo foo bar bar protein", "label": 1.0},
    {"text": "foo bar dna for bar", "label": 0.0},
    {"text": "foo bar foo dna foo", "label": 0.0},
    {"text": "bar foo protein foo ", "label": 1.0}])


# Split data into labels and features, transform
# preservesPartitioning is not really required
# since map without partitioner shouldn't trigger repartitiong
labels = training_raw.map(
    lambda doc: doc["label"],  # Standard Python dict access 
    preservesPartitioning=True # This is obsolete.
)

tf = HashingTF(numFeatures=100).transform( ## Use much larger number in practice
    training_raw.map(lambda doc: doc["text"].split(), 
    preservesPartitioning=True))

idf = IDF().fit(tf)
tfidf = idf.transform(tf)

# Combine using zip
training = labels.zip(tfidf).map(lambda x: LabeledPoint(x[0], x[1]))

# Train and check
model = NaiveBayes.train(training)
labels_and_preds = labels.zip(model.predict(tfidf)).map(
    lambda x: {"actual": x[0], "predicted": float(x[1])})
#To get some statistics you can use MulticlassMetrics:

from pyspark.mllib.evaluation import MulticlassMetrics
from operator import itemgetter

metrics = MulticlassMetrics(
    labels_and_preds.map(itemgetter("actual", "predicted")))

metrics.confusionMatrix().toArray()
## array([[ 2.,  0.],
##        [ 0.,  2.]])

array([[ 2.,  0.],
       [ 0.,  2.]])

In [131]:
training_raw = sc.parallelize([
    {"text": "foo foo foo bar bar protein", "label": 1.0},
    {"text": "foo bar dna for bar", "label": 0.0},
    {"text": "foo bar foo dna foo", "label": 0.0},
    {"text": "bar foo protein foo ", "label": 1.0}])

In [132]:
labels = training_raw.map(
    lambda doc: doc["label"],  # Standard Python dict access 
    preservesPartitioning=True # This is obsolete.
)

In [133]:
type(labels)

pyspark.rdd.PipelinedRDD

In [134]:
labels

PythonRDD[169] at RDD at PythonRDD.scala:48

In [3]:
path = "mini_newsgroups/*"

In [4]:
newsgroupsRawData = sc.wholeTextFiles(path)

In [5]:
type(newsgroupsRawData)

pyspark.rdd.RDD

In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('rec').getOrCreate()

In [206]:
dfWithoutSchema = spark.createDataFrame(newsgroupsRawData)

In [207]:
type(dfWithoutSchema)

pyspark.sql.dataframe.DataFrame

In [208]:
dfWithoutSchema.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)



In [209]:
dfWithoutSchema.select("_1").show()

+--------------------+
|                  _1|
+--------------------+
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
+--------------------+
only showing top 20 rows



In [166]:
dfWithoutSchema.head()

Row(_1='file:/F:/textmining/mini_newsgroups/alt.atheism/51121', _2='Xref: cantaloupe.srv.cs.cmu.edu alt.atheism:51121 soc.motss:139944 rec.scouting:5318\nNewsgroups: alt.atheism,soc.motss,rec.scouting\nPath: cantaloupe.srv.cs.cmu.edu!crabapple.srv.cs.cmu.edu!fs7.ece.cmu.edu!europa.eng.gtefsd.com!howland.reston.ans.net!wupost!uunet!newsgate.watson.ibm.com!yktnews.watson.ibm.com!watson!Watson.Ibm.Com!strom\nFrom: strom@Watson.Ibm.Com (Rob Strom)\nSubject: Re: [soc.motss, et al.] "Princeton axes matching funds for Boy Scouts"\nSender: @watson.ibm.com\nMessage-ID: <1993Apr05.180116.43346@watson.ibm.com>\nDate: Mon, 05 Apr 93 18:01:16 GMT\nDistribution: usa\nReferences: <C47EFs.3q47@austin.ibm.com> <1993Mar22.033150.17345@cbnewsl.cb.att.com> <N4HY.93Apr5120934@harder.ccr-p.ida.org>\nOrganization: IBM Research\nLines: 15\n\nIn article <N4HY.93Apr5120934@harder.ccr-p.ida.org>, n4hy@harder.ccr-p.ida.org (Bob McGwier) writes:\n\n|> [1] HOWEVER, I hate economic terrorism and political correctnes

In [210]:
dfWithoutSchema = dfWithoutSchema.selectExpr("_1 as str_label", "_2 as text")

In [212]:
from pyspark.sql.functions import size
dfWithoutSchema.withColumn('str_label',split('str_label','/')[4])

DataFrame[str_label: string, text: string]

In [213]:
dfWithoutSchema = dfWithoutSchema.withColumn('str_label',split('str_label','/')[4])

In [214]:
dfWithoutSchema.show()

+-----------+--------------------+
|  str_label|                text|
+-----------+--------------------+
|alt.atheism|Xref: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Xref: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Xref: cantaloupe....|
|alt.atheism|Newsgroups: alt.a...|
|alt.atheism|Newsgroups: alt.a...|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Newsgroups: alt.a...|
|alt.atheism|Path: cantaloupe....|
+-----------+--------------------+
only showing top 20 rows



In [171]:
print(dfWithoutSchema.count())

200


In [173]:
labels = dfWithoutSchema.rdd.map(
    lambda doc: doc["label"],  # Standard Python dict access 
    preservesPartitioning=True # This is obsolete.
)


In [174]:
type(labels)

pyspark.rdd.PipelinedRDD

In [176]:
tf = HashingTF(numFeatures=2000).transform( ## Use much larger number in practice
    dfWithoutSchema.rdd.map(lambda doc: doc["text"].split(), 
    preservesPartitioning=True))


In [198]:
dfWithoutSchema.show()

+-----------+--------------------+
|      label|                text|
+-----------+--------------------+
|alt.atheism|Xref: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Xref: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Xref: cantaloupe....|
|alt.atheism|Newsgroups: alt.a...|
|alt.atheism|Newsgroups: alt.a...|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Path: cantaloupe....|
|alt.atheism|Newsgroups: alt.a...|
|alt.atheism|Path: cantaloupe....|
+-----------+--------------------+
only showing top 20 rows



In [215]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="str_label", outputCol="label")
indexed_df = indexer.fit(dfWithoutSchema).transform(dfWithoutSchema)

In [216]:
indexed_df.show()

+-----------+--------------------+-----+
|  str_label|                text|label|
+-----------+--------------------+-----+
|alt.atheism|Xref: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Xref: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Xref: cantaloupe....|  1.0|
|alt.atheism|Newsgroups: alt.a...|  1.0|
|alt.atheism|Newsgroups: alt.a...|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
|alt.atheism|Newsgroups: alt.a...|  1.0|
|alt.atheism|Path: cantaloupe....|  1.0|
+-----------+--------------------+-----+
only showing top

In [217]:
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")
#lr = LogisticRegression(maxIter=10, regParam=0.001)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer,remover, hashingTF, nb])

# Fit the pipeline to training documents.
model = pipeline.fit(indexed_df)

In [135]:
dfWithoutSchema.show()

+--------------------+--------------------+
|               label|                text|
+--------------------+--------------------+
|file:/F:/textmini...|Xref: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Xref: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Xref: cantaloupe....|
|file:/F:/textmini...|Newsgroups: alt.a...|
|file:/F:/textmini...|Newsgroups: alt.a...|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Path: cantaloupe....|
|file:/F:/textmini...|Newsgroups: alt.a...|
|file:/F:/textmini...|Path: cant

In [None]:
ratings = dfWithoutSchema.map(lambda _l: _l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

In [None]:
print("The number of documents read in is " + newsgroupsRawData.count() + ".")

In [None]:
newsgroupsRawData.takeSample(false, 1, 10L).foreach(println)

In [24]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

In [25]:
type(data)

pyspark.rdd.RDD

In [36]:
type(ratings)

pyspark.rdd.PipelinedRDD

In [37]:
dfWithoutSchema.select("_1").show()

+--------------------+
|                  _1|
+--------------------+
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
|file:/F:/textmini...|
+--------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.functions import size
df.withColumn('col4',split('col4',' ').select()).show()

In [59]:
label = dfWithoutSchema.select("_1").rdd.map(lambda l : str()))

In [60]:
type(label)

pyspark.rdd.PipelinedRDD

In [61]:
print(label.take(10))

["<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>", "<class 'pyspark.sql.types.Row'>"]


In [63]:
type(dfWithoutSchema.select("_1"))

pyspark.sql.dataframe.DataFrame

In [64]:
dfWithoutSchema.s

AttributeError: 'DataFrame' object has no attribute 's'

In [91]:
df = spark.createDataFrame([('ab12cd',)], ['s',])



In [92]:
df.show()

+------+
|     s|
+------+
|ab12cd|
+------+



In [98]:
from pyspark.sql.functions import split, explode
df1 = df.select(split(df.s, '[0-9]+').alias('s'))
df1

DataFrame[s: array<string>]

In [99]:
type(df1)

pyspark.sql.dataframe.DataFrame

In [101]:
df1.show()

+--------+
|       s|
+--------+
|[ab, cd]|
+--------+



In [67]:
# Create dummy data
df = sc.parallelize([(1, 2, 3, 'a b c'),
                     (4, 5, 6, 'd e f'),
                     (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])



In [68]:
type(df)

pyspark.sql.dataframe.DataFrame

In [69]:
df.head()

Row(col1=1, col2=2, col3=3, col4='a b c')

In [70]:
df.show()

+----+----+----+-----+
|col1|col2|col3| col4|
+----+----+----+-----+
|   1|   2|   3|a b c|
|   4|   5|   6|d e f|
|   7|   8|   9|g h i|
+----+----+----+-----+



In [88]:
from pyspark.sql.functions import size
df.withColumn('col4',split('col4',' ').select()).show()

TypeError: 'Column' object is not callable

In [105]:
s = 'file:/F:/textmining/mini_newsgroups/alt.atheism/51121'

SyntaxError: EOL while scanning string literal (<ipython-input-105-2a4856269f2a>, line 1)

In [108]:
print()

['file:/F:/textmining/mini_newsgroups/alt.atheism/51121']


In [178]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
for features_label in rescaledData.select("features", "label").take(3):
    print(features_label)

Row(features=SparseVector(20, {0: 0.6931, 5: 0.6931, 9: 0.2877, 17: 1.3863}), label=0)
Row(features=SparseVector(20, {2: 0.6931, 7: 0.6931, 9: 0.863, 13: 0.2877, 15: 0.2877}), label=0)
Row(features=SparseVector(20, {4: 0.6931, 6: 0.6931, 13: 0.2877, 15: 0.2877, 18: 0.6931}), label=1)


In [179]:
sentenceData.show()

+-----+--------------------+
|label|            sentence|
+-----+--------------------+
|    0|Hi I heard about ...|
|    0|I wish Java could...|
|    1|Logistic regressi...|
+-----+--------------------+



In [188]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.classification import NaiveBayes

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["features", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)

idf = IDF(inputCol="rawFeatures", outputCol="features")
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer,remover, hashingTF, nb])

model = pipeline.fit(sentenceData)

# test = spark.createDataFrame([
#     (4, "spark i j k"),
#     (5, "l m n"),
#     (6, "spark hadoop spark"),
#     (7, "apache hadoop")
# ], ["id", "text"])
# prediction = model.transform(test)
# selected = prediction.select("id", "text", "probability", "prediction")
# for row in selected.collect():
#     rid, text, prob, prediction = row
#     print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))


IllegalArgumentException: 'requirement failed: Column features must be of type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 but was actually LongType.'

In [197]:
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="filtered")
hashingTF = HashingTF(inputCol=remover.getOutputCol(), outputCol="features")
#lr = LogisticRegression(maxIter=10, regParam=0.001)
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")
pipeline = Pipeline(stages=[tokenizer,remover, hashingTF, nb])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

(4, spark i j k) --> prob=[0.250010728724,0.749989271276], prediction=1.000000
(5, l m n) --> prob=[0.50001430476,0.49998569524], prediction=0.000000
(6, spark hadoop spark) --> prob=[0.181826693896,0.818173306104], prediction=1.000000
(7, apache hadoop) --> prob=[0.666675143508,0.333324856492], prediction=0.000000


In [201]:
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed_df = indexer.fit(df).transform(df)

In [202]:
indexed_df.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

