In [None]:
# Findspark
import findspark
findspark.init(r'C:\Spark\spark-3.0.3-bin-hadoop3.2')  # raw string keeps the backslashes literal

# Stop any existing spark processes
try:
    sc.stop()
except NameError:
    # sc is not defined on a fresh kernel, so there is nothing to stop
    pass

# Load document textfile into hdfs


# load textfile into spark from hdfs
# Spark 3.2.1 did not work but 3.0.3 did
# Save transformations as new variables
# Map Values
# Split tags
# Parse Ids by Tags
# Bonus: Change closeddates to closed binary(y/n)
# Parse Scores by Tags
# Parse Tags by Closed cases
# Reduce lists
# Parallelize data into json and convert to dataframe
# Use dataframe to plot data and answer questions


In [39]:
import findspark
findspark.init(r'C:\Spark\spark-3.0.3-bin-hadoop3.2')  # raw string keeps the backslashes literal

In [40]:
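# Create entry points to Spark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Stop any context left over from an earlier run
try:
    sc.stop()
except NameError:
    # sc is not defined on a fresh kernel, so there is nothing to stop
    pass

sc = SparkContext()
spark = SparkSession(sparkContext=sc)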

Now that we have the Spark context, let's use it to load our text data file. The path below points at the local filesystem rather than HDFS; `sc.textFile()` accepts either. The data is stored in a data structure called a **Resilient Distributed Dataset (RDD)**.

In [58]:
rdd_raw = sc.textFile("C:/Users/SCULLY/Desktop/msds-final/QueryResults.csv")

In [59]:
rdd_raw.take(2)

['Id,Tags,Score,ClosedDate', '"69969780","<dataframe><julia>","5",""']

In [60]:
# x is an element of the data set - in this case a line of text
header = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'Tags').collect()[0]
header[0] = 'Id'
header

['Id', 'Tags', 'Score', 'ClosedDate']

In [62]:
rdd = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] != 'Tags')
rdd.take(2)

[['"69969780"', '"<dataframe><julia>"', '"5"', '""'],
 ['"69969782"',
  '"<jquery><ajax><asp.net-core-mvc><asp.net-core-webapi>"',
  '"1"',
  '""']]

In [55]:
so_map.take(5)

[(69969780, '<dataframe><julia>'),
 (69969782, '<jquery><ajax><asp.net-core-mvc><asp.net-core-webapi>'),
 (69969783, '<javascript><timer><jspsych>'),
 (69969784, None),
 (69969785, '<python><python-3.x><dictionary><anagram>')]
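
The cell that builds `so_map` is not shown in this export. As a rough sketch of one way such `(Id, Tags)` pairs could be produced, the plain-Python helpers below parse a quoted CSV line with the standard `csv` module and split a tag string into individual tags. The names `parse_id_tags` and `split_tags` are hypothetical, and treating an empty Tags field as `None` is an assumption based on the output above.

```python
import csv
import re

def parse_id_tags(line):
    """Turn one raw CSV line into an (Id, Tags) pair.

    Hypothetical helper: an empty Tags field becomes None, mirroring
    the (69969784, None) row in the output above.
    """
    # csv.reader strips the surrounding double quotes from each field
    row_id, tags, _score, _closed = next(csv.reader([line]))
    return (int(row_id), tags or None)

def split_tags(tags):
    """Split '<a><b>' into ['a', 'b']; None yields an empty list."""
    return re.findall(r'<([^<>]+)>', tags) if tags else []

sample = '"69969780","<dataframe><julia>","5",""'
pair = parse_id_tags(sample)
print(pair)                 # (69969780, '<dataframe><julia>')
print(split_tags(pair[1]))  # ['dataframe', 'julia']
```

Functions like these could be passed to `map()` over the raw lines once the header row has been filtered out.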

### A list of... lists?
The output of the last map command is a little surprising. **What happened?**

Recall that map() applies the function to each "piece" of data. In this case, each piece of data is a *line of text* and the function, split(), takes a line of text and **returns a list of individual words.** 

Our first transformation gave us individual words but they are wrapped in lists that we don't need. Fortunately, we have a way to deal with it. 

A **list of lists** is called a **nested list**. Converting a nested list into a single list is called **flattening** it.

Spark has a function that acts like map() and also flattens nested lists. It is called **flatMap()**.
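
To make the difference concrete without Spark, here is a minimal plain-Python sketch. The two sample lines are made up for illustration, and `itertools.chain.from_iterable` stands in for the flattening step; it is an analogy, not how Spark implements it.

```python
from itertools import chain

lines = ["the sonnets", "by william shakespeare"]

# map-style: one output per input line, so we get a list of lists
nested = [line.split() for line in lines]
print(nested)  # [['the', 'sonnets'], ['by', 'william', 'shakespeare']]

# flatMap-style: same function, then the inner lists are joined end to end
flat = list(chain.from_iterable(line.split() for line in lines))
print(flat)    # ['the', 'sonnets', 'by', 'william', 'shakespeare']
```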

Let's see if we can use flatMap() to output our (word, 1) pairs. By the way, the parentheses in the **(**word, 1**)** pair mean we are grouping each word with the number 1 (technically, this is called a "tuple").

In [5]:
word_tuple = word_list.flatMap(lambda wordlist: [(word, 1) for word in wordlist])
word_tuple.take(5)

[('the', 1), ('sonnets', 1), ('by', 1), ('william', 1), ('shakespeare', 1)]

### OK, that is... complicated

Let's look at that last line in detail.

* `flatMap()` flattens the nested lists, and it takes a function just like `map()`. That's where the lambda comes in.
* In this lambda I called the incoming data **wordlist** just to help distinguish it.

"OK," I hear you saying, "but what the heck is with the brackets and `for` thingy?"

I'm glad you asked! The brackets make a **list comprehension**. This is a fancy, one-line version of a for loop.

Observe, in regular Python:

In [6]:
data = ['Now', ',', 'fair', 'Hippolyta', ',', 'our', 'nuptial', 'hour']
for word in data:
    print((word, 1))

('Now', 1)
(',', 1)
('fair', 1)
('Hippolyta', 1)
(',', 1)
('our', 1)
('nuptial', 1)
('hour', 1)


That looks just like what we want. Let's try it as a list comprehension:

In [7]:
[(word, 1) for word in data]

[('Now', 1),
 (',', 1),
 ('fair', 1),
 ('Hippolyta', 1),
 (',', 1),
 ('our', 1),
 ('nuptial', 1),
 ('hour', 1)]

Notice that the list comprehension gave us its output as a list. `flatMap()` then unpacks each of those lists, so the result is not nested any further.

## Next up: reduce()

"Reduce" in the MapReduce sense of the word means **to gather** or **to sum up.** 

You might remember that MapReduce has an *intermediate* shuffle/sort step that groups all the occurrences of a word before they go into reduce. It effectively turns this:

`('the', 1), ('the', 1), ('the', 1), ('the', 1), ('the', 1)`

into this:

`('the', 1,1,1,1,1)`

A useful way to think of each tuple (the thing in parentheses) is as a (key, value) pair. So, above, each 'the' is a key and each 1 is a value. Spark has a function called `reduceByKey()` that combines all the values that share a key:

In [8]:
word_counts = word_tuple.reduceByKey(lambda total, count: total + count)
word_counts.take(5)

[('shakespeare', 1),
 ('fairest', 5),
 ('creatures', 2),
 ('we', 15),
 ('increase', 4)]
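
To make the shuffle and reduce steps concrete, here is a small plain-Python sketch on a made-up list of pairs. It is only an analogy: Spark's real `reduceByKey()` runs across partitions and combines values within each partition before shuffling.

```python
from collections import defaultdict
from functools import reduce

pairs = [('the', 1), ('sonnets', 1), ('the', 1), ('by', 1), ('the', 1)]

# Shuffle/sort step: collect every value that shares a key
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)
print(dict(grouped))  # {'the': [1, 1, 1], 'sonnets': [1], 'by': [1]}

# Reduce step: fold each key's values with the same lambda used above
counts = {key: reduce(lambda total, count: total + count, values)
          for key, values in grouped.items()}
print(counts)         # {'the': 3, 'sonnets': 1, 'by': 1}
```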

## Results

That operation seems to have done exactly what we want. **BUT** if it took a long time, that is because of the `.take()`, not the `reduceByKey()`. Spark transformations such as `map()`, `flatMap()`, and `reduceByKey()` are lazy: they are not applied until an action such as `take()`, `reduce()`, or `count()` asks for a result. A call to `collect()` is also an action: it runs the pending transformations and returns the results to the driver *(useful when you don't need a reduce but need the final data, for example before writing to an output file)*. Let's check how many elements are in word_counts:

In [9]:
word_counts.count()

3721
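
Lazy evaluation can be seen in plain Python too. The sketch below is an analogy only, on made-up sample lines: Python 3's built-in `map()` defers work much like a Spark transformation, and `list()` plays the role of an action. The helper `split_line` and the `calls` list are hypothetical names used to record when the function actually runs.

```python
calls = []

def split_line(line):
    calls.append(line)      # record that the function really ran
    return line.split()

lines = ["the sonnets", "by william shakespeare"]

# Python 3's built-in map() is lazy, much like a Spark transformation
pending = map(split_line, lines)
calls_before = len(calls)
print(calls_before)  # 0: nothing has been computed yet

# Asking for the results plays the role of an action such as collect()
result = list(pending)
print(len(calls))    # 2: the function ran once per line
```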

## Saving

RDDs can be saved back to disk very easily. Just use the `saveAsTextFile()` function.

In [11]:
word_counts.saveAsTextFile('C:///Users/SCULLY/textfiles/spark_wordcount.txt')

Py4JJavaError: An error occurred while calling o110.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:105)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1090)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1088)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1061)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1026)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1008)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1007)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:964)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:962)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1552)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1552)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1538)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1538)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:549)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:645)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1230)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1435)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:493)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:678)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1868)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1910)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:332)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:402)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:375)
	at org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
	at org.apache.hadoop.mapred.OutputCommitter.commitJob(OutputCommitter.java:291)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:167)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:99)
	... 50 more
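
The `UnsatisfiedLinkError` on `NativeIO$Windows.access0` in the trace comes from Hadoop's native Windows I/O layer, not from the word count itself. A common cause is missing or mismatched Hadoop native binaries (`winutils.exe`, `hadoop.dll`) on Windows, though that is an assumption about this machine. When the result is small enough to fit on the driver (3721 pairs here), one possible workaround is to collect it and write the file with plain Python. The helper name `write_pairs`, the sample pairs, and the tab-separated layout are hypothetical choices.

```python
import os
import tempfile

def write_pairs(pairs, path):
    """Write (word, count) pairs to a text file, one 'word<TAB>count' per line."""
    with open(path, 'w', encoding='utf-8') as out:
        for word, count in pairs:
            out.write(f'{word}\t{count}\n')

# Stand-in for word_counts.collect(); in the notebook, pass the real list
sample_pairs = [('shakespeare', 1), ('fairest', 5), ('creatures', 2)]

out_path = os.path.join(tempfile.gettempdir(), 'spark_wordcount_sample.txt')
write_pairs(sample_pairs, out_path)

with open(out_path, encoding='utf-8') as f:
    written = f.read()
print(written)
```

In the notebook the call would look like `write_pairs(word_counts.collect(), out_path)`. Note that `saveAsTextFile()` itself writes a directory of part files rather than a single file.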


# But Wait! There's More!

Spark commands are meant to be **"chained"** together -- in other words, the output from one function call is immediately used as input for the next function call.

Let's see how chaining would work on our WordCount example. Remember, the variable **text_file** holds the contents of our source data file.

Here is the code. The "." at the end of each line chains that call's result into the next call, and the trailing `\` is Python's line continuation (nothing, not even a comment, may follow it on the same line):

`text_file.map(lambda x: x.split()). \
            flatMap(lambda wordlist: [(word, 1) for word in wordlist]). \
            reduceByKey(lambda total, count: total + count). \
            take(5)`

In [None]:
text_file.map(lambda x: x.split()).      \
            flatMap(lambda wordlist: [(word, 1) for word in wordlist]). \
            reduceByKey(lambda total, count: total + count). \
            take(5) 
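
Chaining works because each call returns a new object that has the same methods. The toy class below sketches that idea in plain Python; `MiniRDD` is a hypothetical stand-in, not part of Spark, and unlike a real RDD it is eager and lives in memory. It also shows an alternative layout: wrapping the chain in parentheses avoids the backslashes.

```python
class MiniRDD:
    """Toy, in-memory stand-in for an RDD (hypothetical; not part of Spark)."""

    def __init__(self, items):
        self.items = list(items)

    def map(self, fn):
        # One output element per input element
        return MiniRDD(fn(x) for x in self.items)

    def flatMap(self, fn):
        # fn returns a list per element; the lists are joined end to end
        return MiniRDD(y for x in self.items for y in fn(x))

    def reduceByKey(self, fn):
        # Fold together the values of all pairs that share a key
        totals = {}
        for key, value in self.items:
            totals[key] = fn(totals[key], value) if key in totals else value
        return MiniRDD(totals.items())

    def take(self, n):
        return self.items[:n]

# Made-up sample lines standing in for the source data file
text_file = MiniRDD(["the sonnets", "by the bard"])

# Parentheses let the chain span lines without backslashes
result = (
    text_file.map(lambda x: x.split())
    .flatMap(lambda wordlist: [(word, 1) for word in wordlist])
    .reduceByKey(lambda total, count: total + count)
    .take(5)
)
print(result)  # [('the', 2), ('sonnets', 1), ('by', 1), ('bard', 1)]
```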