# Pyspark exploration

## Import dataset and setup the environment 

In [2]:
from pyspark import SparkContext
import os 

os.environ['JAVA_HOME']="/Library/Java/JavaVirtualMachines/jdk1.8.0_202.jdk/Contents/Home/"

os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"

sc = SparkContext.getOrCreate()

In [58]:
data_file = "../data/labelled_dataset.csv.gz"
raw_data = sc.textFile(data_file).cache()

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()

In [59]:
df = spark.read.csv(data_file,header=True,inferSchema=True)

## DataFrame operations

### columns name

In [21]:
df.columns

['label', 'txt']

### Select column

In [22]:
df.txt

Column<b'txt'>

### Print Schema

In [23]:
df.printSchema()

root
 |-- label: string (nullable = true)
 |-- txt: string (nullable = true)



### Head 5

In [24]:
# Didn't strictly need a for loop, could have just then head()
for row in df.head(5):
    print(row)
    print('\n')

Row(label='ham', txt='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...')


Row(label='ham', txt='Ok lar... Joking wif u oni...')


Row(label='spam', txt="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's")


Row(label='ham', txt='U dun say so early hor... U c already then say...')


Row(label='ham', txt="Nah I don't think he goes to usf, he lives around here though")




In [25]:
df.describe().show()

+-------+--------------------+------------------+
|summary|               label|               txt|
+-------+--------------------+------------------+
|  count|             1970119|            840730|
|   mean|            Infinity|1023.1263393359594|
| stddev|                 NaN|  8450.10290912209|
|    min|                   !|                  |
|    max|⸪ Great Allowance...|        ”” said he|
+-------+--------------------+------------------+



### Count

In [26]:
df.filter("label == 'spam'").count()

877

In [27]:
df.filter("label == 'books'").count()

1009

In [28]:
# https://stackoverflow.com/questions/48927271/count-number-of-words-in-a-spark-dataframe
import pyspark.sql.functions as f
data = df

### word count

In [29]:
# word count for each row
df = df.withColumn('wordCount', f.size(f.split(f.col('txt'), ' ')))
df.show()

+-----+--------------------+---------+
|label|                 txt|wordCount|
+-----+--------------------+---------+
|  ham|Go until jurong p...|       20|
|  ham|Ok lar... Joking ...|        6|
| spam|Free entry in 2 a...|       28|
|  ham|U dun say so earl...|       11|
|  ham|Nah I don't think...|       13|
| spam|FreeMsg Hey there...|       32|
|  ham|Even my brother i...|       16|
|  ham|As per your reque...|       26|
| spam|WINNER!! As a val...|       26|
| spam|Had your mobile 1...|       29|
|  ham|I'm gonna be home...|       21|
| spam|SIX chances to wi...|       26|
| spam|URGENT! You have ...|       26|
|  ham|I've been searchi...|       37|
|  ham|I HAVE A DATE ON ...|        8|
| spam|XXXMobileMovieClu...|       19|
|  ham|Oh k...i'm watchi...|        4|
|  ham|Eh u remember how...|       19|
|  ham|Fine if thatåÕs t...|       13|
| spam|England v Macedon...|       24|
+-----+--------------------+---------+
only showing top 20 rows



In [30]:
# total word
df.select(f.sum('wordCount')).collect() 

[Row(sum(wordCount)=5584925)]

In [31]:
# see what type df is
type(df)

pyspark.sql.dataframe.DataFrame

In [32]:
# first 5 row
df.take(5)

[Row(label='ham', txt='Go until jurong point, crazy.. Available only in bugis n great world la e buffet... Cine there got amore wat...', wordCount=20),
 Row(label='ham', txt='Ok lar... Joking wif u oni...', wordCount=6),
 Row(label='spam', txt="Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's", wordCount=28),
 Row(label='ham', txt='U dun say so early hor... U c already then say...', wordCount=11),
 Row(label='ham', txt="Nah I don't think he goes to usf, he lives around here though", wordCount=13)]

In [33]:
df.show()

+-----+--------------------+---------+
|label|                 txt|wordCount|
+-----+--------------------+---------+
|  ham|Go until jurong p...|       20|
|  ham|Ok lar... Joking ...|        6|
| spam|Free entry in 2 a...|       28|
|  ham|U dun say so earl...|       11|
|  ham|Nah I don't think...|       13|
| spam|FreeMsg Hey there...|       32|
|  ham|Even my brother i...|       16|
|  ham|As per your reque...|       26|
| spam|WINNER!! As a val...|       26|
| spam|Had your mobile 1...|       29|
|  ham|I'm gonna be home...|       21|
| spam|SIX chances to wi...|       26|
| spam|URGENT! You have ...|       26|
|  ham|I've been searchi...|       37|
|  ham|I HAVE A DATE ON ...|        8|
| spam|XXXMobileMovieClu...|       19|
|  ham|Oh k...i'm watchi...|        4|
|  ham|Eh u remember how...|       19|
|  ham|Fine if thatåÕs t...|       13|
| spam|England v Macedon...|       24|
+-----+--------------------+---------+
only showing top 20 rows



### word count collection

In [61]:
df.withColumn('word', f.explode(f.split(f.col('txt'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show()

+----+------+
|word| count|
+----+------+
|    |784389|
| the|282718|
| and|194113|
|  of|163358|
|  to|145701|
|   a|103066|
|  in| 90602|
|   I| 88815|
|that| 72731|
|  he| 53735|
| his| 48351|
|  it| 46497|
|  as| 46428|
|with| 45537|
| was| 45149|
|  is| 43232|
| you| 42713|
| for| 41989|
|  my| 39440|
|  be| 37498|
+----+------+
only showing top 20 rows



In [62]:
word = df.withColumn('word', f.explode(f.split(f.col('txt'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)


In [63]:
to_list = [list(row) for row in word.collect()]

In [64]:
to_list

[['', 784389],
 ['the', 282718],
 ['and', 194113],
 ['of', 163358],
 ['to', 145701],
 ['a', 103066],
 ['in', 90602],
 ['I', 88815],
 ['that', 72731],
 ['he', 53735],
 ['his', 48351],
 ['it', 46497],
 ['as', 46428],
 ['with', 45537],
 ['was', 45149],
 ['is', 43232],
 ['you', 42713],
 ['for', 41989],
 ['my', 39440],
 ['be', 37498],
 ['not', 36717],
 ['said', 35815],
 ['but', 30960],
 ['her', 29836],
 ['by', 28872],
 ['which', 27959],
 ['had', 27774],
 ['have', 27517],
 ['at', 26732],
 ['on', 24268],
 ['all', 23451],
 ['she', 23229],
 ['or', 23213],
 ['”', 22745],
 ['they', 22602],
 ['this', 22290],
 ['from', 22200],
 ['so', 21598],
 ['me', 20026],
 ['him', 19725],
 ['are', 18955],
 ['their', 16713],
 ['your', 16205],
 ['we', 16088],
 ['no', 16000],
 ['will', 15928],
 ['an', 15740],
 ['if', 15532],
 ['who', 15366],
 ['were', 14681],
 ['when', 14664],
 ['one', 14050],
 ['what', 13680],
 ["'", 12522],
 ['them', 12275],
 ['would', 12184],
 ['been', 11145],
 ['more', 11038],
 ['there', 10798]

In [36]:
311334

AttributeError: 'NoneType' object has no attribute 'count'

In [None]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)



In [41]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

tokenizer = Tokenizer(inputCol="txt", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="txt", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(df)

regexTokenized = regexTokenizer.transform(df)

regexTokenized.show()

+-----+--------------------+---------+--------------------+
|label|                 txt|wordCount|               words|
+-----+--------------------+---------+--------------------+
|  ham|Go until jurong p...|       20|[go, until, juron...|
|  ham|Ok lar... Joking ...|        6|[ok, lar, joking,...|
| spam|Free entry in 2 a...|       28|[free, entry, in,...|
|  ham|U dun say so earl...|       11|[u, dun, say, so,...|
|  ham|Nah I don't think...|       13|[nah, i, don, t, ...|
| spam|FreeMsg Hey there...|       32|[freemsg, hey, th...|
|  ham|Even my brother i...|       16|[even, my, brothe...|
|  ham|As per your reque...|       26|[as, per, your, r...|
| spam|WINNER!! As a val...|       26|[winner, as, a, v...|
| spam|Had your mobile 1...|       29|[had, your, mobil...|
|  ham|I'm gonna be home...|       21|[i, m, gonna, be,...|
| spam|SIX chances to wi...|       26|[six, chances, to...|
| spam|URGENT! You have ...|       26|[urgent, you, hav...|
|  ham|I've been searchi...|       37|[i

## Remove stop words

In [45]:
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="words", outputCol="filtered")
data = remover.transform(regexTokenized).show()

+-----+--------------------+---------+--------------------+--------------------+
|label|                 txt|wordCount|               words|            filtered|
+-----+--------------------+---------+--------------------+--------------------+
|  ham|Go until jurong p...|       20|[go, until, juron...|[go, jurong, poin...|
|  ham|Ok lar... Joking ...|        6|[ok, lar, joking,...|[ok, lar, joking,...|
| spam|Free entry in 2 a...|       28|[free, entry, in,...|[free, entry, 2, ...|
|  ham|U dun say so earl...|       11|[u, dun, say, so,...|[u, dun, say, ear...|
|  ham|Nah I don't think...|       13|[nah, i, don, t, ...|[nah, think, goes...|
| spam|FreeMsg Hey there...|       32|[freemsg, hey, th...|[freemsg, hey, da...|
|  ham|Even my brother i...|       16|[even, my, brothe...|[even, brother, l...|
|  ham|As per your reque...|       26|[as, per, your, r...|[per, request, me...|
| spam|WINNER!! As a val...|       26|[winner, as, a, v...|[winner, valued, ...|
| spam|Had your mobile 1...|

In [48]:
data = remover.transform(regexTokenized)

In [52]:
data.show()



+-----+--------------------+---------+--------------------+--------------------+
|label|                 txt|wordCount|               words|            filtered|
+-----+--------------------+---------+--------------------+--------------------+
|  ham|Go until jurong p...|       20|[go, until, juron...|[go, jurong, poin...|
|  ham|Ok lar... Joking ...|        6|[ok, lar, joking,...|[ok, lar, joking,...|
| spam|Free entry in 2 a...|       28|[free, entry, in,...|[free, entry, 2, ...|
|  ham|U dun say so earl...|       11|[u, dun, say, so,...|[u, dun, say, ear...|
|  ham|Nah I don't think...|       13|[nah, i, don, t, ...|[nah, think, goes...|
| spam|FreeMsg Hey there...|       32|[freemsg, hey, th...|[freemsg, hey, da...|
|  ham|Even my brother i...|       16|[even, my, brothe...|[even, brother, l...|
|  ham|As per your reque...|       26|[as, per, your, r...|[per, request, me...|
| spam|WINNER!! As a val...|       26|[winner, as, a, v...|[winner, valued, ...|
| spam|Had your mobile 1...|

Py4JJavaError: An error occurred while calling o484.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 51.0 failed 1 times, most recent failure: Lost task 0.0 in stage 51.0 (TID 1078, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:143)
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
	... 18 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:143)
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
	... 18 more


In [50]:
from pyspark.ml.feature import CountVectorizer

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=311334, minDF=2.0)

model = cv.fit(data)

result = model.transform(data)
result.show(truncate=False)

Py4JJavaError: An error occurred while calling o485.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 46.0 failed 1 times, most recent failure: Lost task 0.0 in stage 46.0 (TID 1074, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:143)
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
	... 18 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
	at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:230)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$createTransformFunc$2: (string) => array<string>)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:143)
	at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc$2.apply(Tokenizer.scala:141)
	... 18 more


## random try

In [135]:
df.withColumn('list', df['txt'].split(" ")).show()

TypeError: 'Column' object is not callable

In [141]:
split_col = f.split(df['txt'], ' ')

In [152]:
data = data.withColumn('NAME1', split_col.getItem(0))
data = data.withColumn('NAME2', split_col.getItem(1))



In [5]:
data.show()

NameError: name 'data' is not defined

In [116]:
df.withColumn('word', f.explode(f.split(f.col('txt'), ' ')))\
    .groupBy('word')\
    .count()\
    .sort('count', ascending=False)\
    .show()

+----+------+
|word| count|
+----+------+
|    |784389|
| the|282718|
| and|194113|
|  of|163358|
|  to|145701|
|   a|103066|
|  in| 90602|
|   I| 88815|
|that| 72731|
|  he| 53735|
| his| 48351|
|  it| 46497|
|  as| 46428|
|with| 45537|
| was| 45149|
|  is| 43232|
| you| 42713|
| for| 41989|
|  my| 39440|
|  be| 37498|
+----+------+
only showing top 20 rows



In [117]:
from pyspark.ml.feature import StopWordsRemover


In [95]:
text_file = sc.textFile("../books/*.txt")


In [91]:
text_file.count()

6003218

In [7]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case cla

In [51]:
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])
df.show()

+---+---------------+
| id|          words|
+---+---------------+
|  0|      [a, b, c]|
|  1|[a, b, b, c, a]|
+---+---------------+



In [16]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

print((df.count(), len(df.columns)))


+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+

(2, 2)
