## SparkSession

To work with Spark, we first have to set up a `SparkSession`.

Once the session below has been created, we can interact with Apache Spark through the `spark` object.

In [13]:
from pyspark.sql import SparkSession

In [14]:
# sc = SparkContext("local[*]","PySpark Word Count Example")
spark = SparkSession\
        .builder\
        .appName("PythonWordCount")\
        .getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x0A20E088>


Let's break down this code snippet a bit further.
To work with Spark, we set up a Spark application, which we name `PythonWordCount`.

To do this:
- We started building a `SparkSession` through the `SparkSession.builder` attribute.
- We used `.appName` to tell Spark to name our application `PythonWordCount`.
- We used `.getOrCreate()` to return the existing `SparkSession` if one is already running, or to create a new one otherwise.
- Finally, the reference to this session is stored in an object we named `spark`.

*__Note__ that the `SparkSession` is the entry point to Spark's DataFrame functionality; without it, calls such as `spark.read` below are not available.
More information about `SparkSession` can be found in the [PySpark API documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).*

In [15]:
from pathlib import Path
# Directory with the alt.atheism posts, located next to the current working directory
dataset_path = Path().resolve().parent / 'data-sets/20NewshroupDataSet/20_newsgroup/alt.atheism'
# dataset_path=Path().resolve().parent / 'README.md'
print(str(dataset_path.resolve()))

S:\mesgit\CSC645\BDA\data-sets\20NewshroupDataSet\20_newsgroup\alt.atheism


In [16]:
def lower_clean_str(x):
    """Lowercase x and replace every punctuation character with a space."""
    punc = '!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'
    lowercased_str = x.lower()
    for ch in punc:
        lowercased_str = lowercased_str.replace(ch, ' ')
    return lowercased_str
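
As a possible alternative, the same cleaning can be sketched with `str.translate`, which handles all punctuation in a single pass instead of one `replace` call per character. The helper name `lower_clean_str_translate` and the table name `_PUNC_TO_SPACE` are hypothetical; the sketch relies on `string.punctuation` containing the same characters as the `punc` string in `lower_clean_str`.

```python
import string

# Translation table mapping every ASCII punctuation character to a space.
_PUNC_TO_SPACE = str.maketrans(string.punctuation, ' ' * len(string.punctuation))

def lower_clean_str_translate(x):
    """Lowercase x and replace punctuation with spaces (hypothetical helper)."""
    return x.lower().translate(_PUNC_TO_SPACE)

sample = "Hello, World! It's 1993."
cleaned = lower_clean_str_translate(sample)
print(repr(cleaned))
```

Because the function takes and returns a plain string, it could be passed to `lines.map(...)` in the same way as `lower_clean_str`.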

In [17]:
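# words = sc.textFile(str(dataset_path.resolve())+"/*/*").map(lower_clean_str)
# Read every file in the directory line by line; each Row holds one line in its single `value` column
lines = spark.read.text(str(dataset_path.resolve()) + "/*").rdd.map(lambda r: r[0])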

In [18]:
from operator import add
# Normalize each line, split it into words, then sum a count of 1 per distinct word
clean_lines = lines.map(lower_clean_str)
words = clean_lines.flatMap(lambda x: x.split(' '))
counts_clean = words.map(lambda x: (x, 1)).reduceByKey(add)
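
To make the three steps concrete, here is a minimal pure-Python sketch of what `flatMap`, `map` and `reduceByKey(add)` compute, run on two made-up lines rather than the dataset. It uses `collections.Counter` as a local stand-in for the distributed reduction, and the names `sample_lines`, `tokens`, `local_counts` and `local_counts_no_empty` are hypothetical. It also shows that `split(' ')` yields empty strings wherever spaces are adjacent or trailing, which is one likely reason an empty-string key can appear among the counts.

```python
from collections import Counter
from itertools import chain

# Two made-up lines (not taken from the dataset); note the double and trailing spaces.
sample_lines = ["the  cat", "the dog "]

# flatMap: split each line on single spaces and flatten into one list of tokens.
tokens = list(chain.from_iterable(line.split(' ') for line in sample_lines))

# map + reduceByKey(add): count one per token and sum per distinct token.
local_counts = Counter(tokens)

# split() with no argument splits on runs of whitespace and drops empty tokens.
local_counts_no_empty = Counter(
    chain.from_iterable(line.split() for line in sample_lines)
)

print(dict(local_counts))
print(dict(local_counts_no_empty))
```

If the empty token is unwanted, one option would be to use `x.split()` instead of `x.split(' ')` in the `flatMap` step; the counts for all other words stay the same.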


In [25]:
# Same word count on the raw lines, without lowercasing or punctuation removal
counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)

In [26]:
# threshold(t) returns a predicate that is True for values strictly greater than t
threshold = lambda t: lambda v: v > t

In [27]:
print(threshold(10)(20))
print(threshold(20)(10))

True
False
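
`threshold` is a curried function: calling it with a limit `t` returns a second function that tests a value against that limit. As a sketch, the same idea can be written with nested `def` statements, which some readers may find easier to follow. The names `make_threshold`, `above` and `above_50` are hypothetical, and the `("rare", 3)` pair is made up for illustration; the other two pairs are taken from the word counts printed later in this notebook.

```python
def make_threshold(t):
    """Return a predicate that is True for values strictly greater than t."""
    def above(v):
        return v > t
    return above

# Build the predicate once, then reuse it for every (word, count) pair.
above_50 = make_threshold(50)

pairs = [("willing", 52), ("god", 1153), ("rare", 3)]  # last pair is made up
kept = [pair for pair in pairs if above_50(pair[1])]
print(kept)
```

Building the predicate once and reusing it avoids calling the outer function again for every element.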


In [32]:
count_filtered = counts_clean.filter(lambda couple: threshold(50)(couple[1]))

In [33]:
output = count_filtered.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

: 214994
srv: 1622
out: 538
exactly: 82
really: 277
once: 111
again: 217
interesting: 73
where: 223
far: 134
set: 109
something: 376
24: 97
willing: 52
sometimes: 56
result: 59
common: 57
10: 106
james: 76
notion: 57
work: 126
belief: 258
ece: 327
start: 80
systems: 81
ed: 59
i3150101: 221
cco: 113
1993apr21: 54
perry: 114
54: 56
bobby: 79
18: 143
through: 109
someone: 250
which: 902
by: 1307
men: 103
god: 1153
christians: 131
among: 65
still: 278
physical: 66
simply: 144
force: 73
agate: 81
article: 983
immoral: 72
instance: 64
w165w: 169
yoyo: 85
posts: 55
muslim: 128
standard: 73
this: 2397
these: 370
claims: 124
look: 166
at: 966
we: 1492
means: 186
ask: 95
evil: 82
17: 137
14: 129
lie: 56
certain: 83
34: 57
clear: 91
suppose: 54
mathew: 270
basis: 129
person: 186
usa: 59
similar: 69
opinions: 95
stanford: 118
die: 76
mchp: 469
wingate: 58
org: 67
wed: 89
bozo: 67
urbana: 53
level: 54
your: 1198
so: 1146
important: 104
since: 253
least: 155
much: 299
course: 216
problems: 65
while:
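
The pairs returned by `collect()` come back in no particular order. As a sketch, a few of the pairs printed above can be ranked by count with plain Python; `sample_output` and `top3` are hypothetical names, and the five pairs are copied from the output above.

```python
# Five (word, count) pairs copied from the printed output.
sample_output = [
    ("srv", 1622),
    ("out", 538),
    ("exactly", 82),
    ("really", 277),
    ("willing", 52),
]

# Sort by the count (second element), largest first, and keep the top three.
top3 = sorted(sample_output, key=lambda kv: kv[1], reverse=True)[:3]

for word, count in top3:
    print("%s: %i" % (word, count))
```

On the RDD itself, `count_filtered.takeOrdered(3, key=lambda kv: -kv[1])` should give the same ranking without first collecting every pair to the driver.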

In [22]:
output = counts_clean.collect()
for (word, count) in output:
    # threshold is curried: build the "> 50" predicate first, then apply it to the count
    if threshold(50)(count):
        print("%s: %i" % (word, count))



In [25]:
# Same computation as counts_clean, with the addition written as an explicit lambda
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)

In [26]:
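# foreach runs on the Spark workers, so the printed pairs may appear in the console
# that launched Spark rather than in the notebook output
wordCounts.foreach(print)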

In [None]:
spark.stop()