# Streaming1

## Getting Started

The goal of this exercise is to process a stream of text received via a network interface (a TCP socket). You will then complete the following three exercises:
- Part I: Print the stream
- Part II: Count how many times each word occurs in the stream
- Part III: Mask swear words, i.e. replace every swear word in the stream (based on a provided list of swear words) with masking characters such as asterisks ("*").

Before you can use Spark Streaming, you have to open a stream.
You can do this with the Netcat notebook; see that notebook for more information.

After you have started the stream, you can call the function `main(timeout, func, window)`.

## Parameters of `main`
### timeout
In this environment you normally only see the output of Spark Streaming after the streaming has terminated.
Therefore we define a timeout in seconds after which the streaming terminates, so that its output is shown. In a "normal" environment you won't need a timeout. For instance, if the timeout is set to 10 seconds, you will see 10 seconds of streaming output before the stream terminates.

### func
This parameter must be a function that takes the stream as its only argument.
Implement each exercise as such a function.
This way you can switch between exercises by passing a different function to `main`, without changing the code of `main` itself.

### window
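`window` is passed to Spark as the batch interval of the `StreamingContext`, in seconds.
It defines how often the received data is processed (it is not a sliding window as used by Spark's windowed operations).
Examples:
- window = 1 ==> a new batch of data is processed every second
- window = 5 ==> a new batch of data is processed every 5 seconds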

## When problems occur
If something goes wrong, you can restart the cluster.
After a restart, you have to execute every cell again (especially the one defining `main`).

Run the cell that defines the function `main`:
click on it and press `Shift` + `Enter`.

In [0]:
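from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

# Reuse the existing SparkContext if there is one (Databricks already provides `sc`,
# and a second SparkContext can't be created); otherwise create a local one.
# `local[*]` runs Spark locally with as many worker threads as logical cores;
# the socket receiver occupies one of them, so at least 2 threads are needed.
conf = SparkConf().setMaster("local[*]").setAppName("Streaming-Exercise")
sc = SparkContext.getOrCreate(conf)

def main(timeout, func, window):
    # Create a StreamingContext that processes the data in batches of `window` seconds
    ssc = StreamingContext(sc, window)
    # Receive lines of text from the TCP socket opened by the Netcat notebook
    stream = ssc.socketTextStream("localhost", 9999)

    # Register the exercise's transformations and output operations before starting
    func(stream)
    try:
        ssc.start()  # Start the computation
        # Block until the stream terminates or `timeout` seconds have passed,
        # so that the output is shown in the notebook
        ssc.awaitTerminationOrTimeout(timeout)
    finally:
        # Stop the streaming but keep the SparkContext alive for the next run
        ssc.stop(stopSparkContext=False)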

## Part One
First, print the stream; this takes only one line of code.
Note that you first have to start the "Netcat" notebook in order to receive data via a TCP socket.

In [0]:
def print_stream(stream):
    # TODO - Print the stream (one line of code)
    pass  # placeholder so the cell runs; replace with your solution


In [0]:
# Run it (start the "Netcat" notebook first to see some output)
main(10, print_stream, 1)

## Part Two
Now you have to count how many times each word occurs.
Split the stream into words, count them and print them.
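The three steps correspond to the DStream operations `flatMap` (split each line into words), `map` (pair each word with the count 1) and `reduceByKey` (sum the counts per word), followed by `pprint`. As a minimal sketch, assuming that words are separated by whitespace, the same pipeline can be tried on a single batch of lines in plain Python without Spark; `split_words` and `count_batch` are hypothetical helper names, not part of the exercise template:

```python
from collections import defaultdict
from functools import reduce
from operator import add


def split_words(line):
    # Assumption: words are separated by whitespace; punctuation is kept
    return line.split()


def count_batch(lines):
    # flatMap: turn every line into its words
    words = [word for line in lines for word in split_words(line)]
    # map: pair every word with the count 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: group the pairs by word and sum their counts
    grouped = defaultdict(list)
    for word, one in pairs:
        grouped[word].append(one)
    return {word: reduce(add, ones) for word, ones in grouped.items()}


batch = ["the quick fox", "the lazy dog"]
print(count_batch(batch))  # {'the': 2, 'quick': 1, 'fox': 1, 'lazy': 1, 'dog': 1}
```

On the DStream itself, the corresponding chain would be `stream.flatMap(split_words).map(lambda word: (word, 1)).reduceByKey(add).pprint()`, which Spark evaluates once per batch interval.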

In [0]:
def count_words(stream):
    # TODO
    # 1) Split the stream into words
    # 2) Count the words
    # 3) Print them
    pass  # placeholder so the cell runs; replace with your solution

In [0]:
main(10, count_words, 1)

## Part Three
Build a swear-word filter that masks swear words with random characters.
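A minimal sketch of the masking logic in plain Python, assuming that swear words are matched case-insensitively as whole whitespace-separated tokens against a lowercase set, and that each one is replaced by as many random characters from `escape_char` as it has letters. The helpers `mask_word` and `mask_line`, the `length` parameter of `random_escape` and the set `example_swearwords` are hypothetical; the template's `random_escape()` takes no arguments:

```python
import random

escape_char = ["!", "%", "&", "+", "$", "*", "#", "="]


def random_escape(length):
    # Build a string of `length` randomly chosen masking characters
    return "".join(random.choice(escape_char) for _ in range(length))


def mask_word(word, swearwords):
    # Assumption: case-insensitive whole-word match against a lowercase set
    if word.lower() in swearwords:
        return random_escape(len(word))
    return word


def mask_line(line, swearwords):
    # Split on whitespace, mask each word, and join the words with single spaces
    return " ".join(mask_word(word, swearwords) for word in line.split())


example_swearwords = {"darn", "heck"}  # hypothetical sample set
print(mask_line("What the heck is this darn thing", example_swearwords))
```

Inside `filter_swearwords`, such a helper could be applied to every line with `stream.map(lambda line: mask_line(line, swearwords)).pprint()`, assuming the global set `swearwords` contains lowercase words. Re-joining with single spaces normalizes the spacing of each line, which is usually acceptable for printing.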

### Import swearwords-file
Go to Data --> Add Data --> select the swear-words file --> note the path of the uploaded file (example: `/FileStore/tables/swearwords.txt`) --> set the variable `file_url` to that path.
(You don't have to create a table for this.)
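The exact layout of the swear-words file is not shown here. As an assumption, each line holds one or more words separated by commas, possibly with a trailing comma. Under that assumption, a minimal sketch of the cleanup step in `initialize_swearwords` in plain Python, where `parse_swearwords` is a hypothetical helper name and the words are lowercased (also an assumption) so they can be compared case-insensitively:

```python
def parse_swearwords(lines):
    # Assumption: words are separated by commas and/or newlines
    words = set()
    for line in lines:
        for token in line.split(","):
            # Remove surrounding whitespace, including the newline character
            token = token.strip().lower()
            if token:  # skip empty tokens, e.g. from trailing commas
                words.add(token)
    return words


print(sorted(parse_swearwords(["darn,\n", "heck, drat\n", "\n"])))  # ['darn', 'drat', 'heck']
```

With Spark, the same cleanup could be applied to the RDD, e.g. `set(rdd.flatMap(lambda line: line.split(",")).map(lambda w: w.strip().lower()).filter(lambda w: w).collect())`; `sc.textFile` already splits the file into lines without their newline characters.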

In [0]:
swearwords = set()
escape_char = ["!", "%", "&", "+", "$", "*", "#", "="]

def initialize_swearwords():
    global swearwords
    file_url = "/FileStore/tables/swearwords.txt" # Set your own path here
    rdd = sc.textFile(file_url)
    # TODO
    # Fill the set "swearwords" with all words in the file.
    # Remove the "," separators and the newlines before adding the words to the set.
    pass  # placeholder so the cell runs; replace with your solution

def random_escape():
    import random
    # TODO
    # Return a random string (made of escape characters) to mask the swear word.
    pass  # placeholder so the cell runs; replace with your solution

def filter_swearwords(stream):
    # TODO
    # 1) Split the stream into words
    # 2) If a word is a swear word, mask it; otherwise leave it unchanged
    # 3) Print the text (swear words masked, other words unchanged)
    pass  # placeholder so the cell runs; replace with your solution

initialize_swearwords()

In [0]:
main(20, filter_swearwords, 1)