# Example Operator Flow Job

## What This Notebook Does

This notebook simulates a large-scale production ETL (extract, transform, load) workload of the kind data engineers and data scientists commonly run.

The notebook performs the following steps:

1. **Extracts the data** from a data store. In this case that is DBFS (the Databricks File System), a distributed file system similar to Hadoop's HDFS.

2. **Transforms the data** from raw text into a more consumable format. It splits the text into words, registers them as a temporary SQL view, and applies filtering and aggregation logic to that view.

3. **Loads the data** by displaying the results in the notebook UI and writing them to an S3 bucket for downstream data science and analytics teams.

```python
from pyspark.sql.functions import regexp_replace, trim, col, lower

def removePunctuation(column):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Only spaces, letters, and numbers should be retained. Other characters should be
        eliminated (e.g. it's becomes its). Leading and trailing spaces should be removed after
        punctuation is removed.

    Args:
        column (Column): A Column containing a sentence.

    Returns:
        Column: A Column named 'sentence' with clean-up operations applied.
    """
    # Delete every run of characters that is neither whitespace nor a word character,
    # plus underscores (which \w would otherwise keep), then lower-case and trim.
    # A raw string keeps Python from treating \s and \w as escape sequences.
    return trim(lower(regexp_replace(column, r'([^\s\w_]|_)+', ''))).alias('sentence')
```
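To check the cleaning rule outside Spark, here is a plain-Python sketch of the same transformation applied to a single string. It reuses the notebook's regular expression. The `re.ASCII` flag is an assumption meant to approximate Java's default regex classes (where `\w` and `\s` are ASCII-only), which Spark's `regexp_replace` uses, and `strip(" ")` stands in for Spark's `trim`, which removes leading and trailing space characters. The name `remove_punctuation` is hypothetical.

```python
import re

# Same pattern as removePunctuation: runs of characters that are neither
# whitespace nor word characters, or underscores. re.ASCII restricts \s and \w
# to ASCII, approximating Java's default regex semantics used by Spark.
_PUNCT_RE = re.compile(r"([^\s\w_]|_)+", re.ASCII)

def remove_punctuation(sentence: str) -> str:
    """Hypothetical single-string analogue of removePunctuation."""
    no_punct = _PUNCT_RE.sub("", sentence)
    # Lower-case, then strip leading/trailing spaces (mirroring Spark's trim).
    return no_punct.lower().strip(" ")

for line in ["Hello, World!  ", "it's", "  --'Tis snake_case  "]:
    print(repr(remove_punctuation(line)))
```

Under the ASCII assumption, accented letters such as `é` are removed rather than kept, so `"Café"` becomes `"caf"`. That is worth knowing before running the notebook on non-English text.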

## Extract the Data

Load the text file from the DBFS FileStore, clean each line with `removePunctuation`, and show the first 15 rows.

```python
fileName = "dbfs:/FileStore/tables/shakespeare.txt"

# spark.read.text yields one row per line of the file, in a string column named 'value'.
shakespeareDF = spark.read.text(fileName).select(removePunctuation(col('value')))
shakespeareDF.show(15, truncate=False)
```

## Transform Data Into Consumable Entities (Filter Out Empty Words, Aggregate Word Counts)

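```python
from pyspark.sql.functions import split, explode

# Split each cleaned sentence on runs of whitespace into an array of words.
shakeWordsSplitDF = shakespeareDF.select(split(shakespeareDF.sentence, r'\s+').alias('split'))

# Explode the array so that each word becomes its own row.
shakeWordsSingleDF = shakeWordsSplitDF.select(explode(shakeWordsSplitDF['split']).alias('word'))

# Register a temporary view so the words can be queried with SQL.
# createOrReplaceTempView avoids an "already exists" error when the cell is re-run.
shakeWordsSingleDF.createOrReplaceTempView("shakespeare_words")
```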
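The split/explode step and the aggregation query that follows can be mirrored in plain Python. This makes it easier to see where empty words come from: splitting an empty (blank) line yields a single empty string. We assume Spark's `split` behaves the same way here, since it delegates to Java's `String.split`, and that is why the query filters out empty words. The function names `explode_words` and `count_words` are hypothetical.

```python
import re
from collections import Counter

def explode_words(sentences):
    """Yield one token per whitespace-separated piece, like split() followed by explode()."""
    for sentence in sentences:
        # re.split on an empty line returns [''], so each blank line yields one empty token.
        yield from re.split(r"\s+", sentence)

def count_words(sentences):
    """Mirror the word-count query: drop empty tokens, count, sort by count descending."""
    counts = Counter(word for word in explode_words(sentences) if word != "")
    return counts.most_common()

lines = ["to be or not to be", "", "that is the question"]
print(list(explode_words(lines)))
print(count_words(lines)[:2])
```

`Counter.most_common` breaks ties by first appearance, whereas SQL `ORDER BY` leaves the order of tied counts unspecified, so words with equal counts may appear in a different order in Spark.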

```python
# Count occurrences of each non-empty word, most frequent first.
spark.sql("""
    SELECT word, COUNT(*) AS occurrences
    FROM shakespeare_words
    WHERE word != ''
    GROUP BY word
    ORDER BY occurrences DESC
""").show()
```
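The cells above only display the counts. The S3 write described in step 3 does not appear in them. Below is a minimal sketch of that step. It assumes Parquet as the output format, a cluster that already has credentials for the target bucket, and a placeholder path (`s3a://example-bucket/shakespeare_word_counts/` is hypothetical). The helper name `write_word_counts` is also hypothetical.

```python
def write_word_counts(spark, output_path):
    """Hypothetical helper: compute word counts from the shakespeare_words view
    and write them to output_path as Parquet.

    Assumes the cluster is already configured with credentials for the bucket.
    """
    word_counts_df = spark.sql("""
        SELECT word, COUNT(*) AS occurrences
        FROM shakespeare_words
        WHERE word != ''
        GROUP BY word
    """)
    # Row order in the written files is not something readers should rely on,
    # so sorting is left to downstream consumers.
    # mode("overwrite") replaces the output of any previous run.
    word_counts_df.write.mode("overwrite").parquet(output_path)
    return word_counts_df


# Usage in the notebook (the bucket path is a placeholder):
# write_word_counts(spark, "s3a://example-bucket/shakespeare_word_counts/")
```

Parquet is chosen here because it stores the schema alongside the data, which downstream analytics tools can read directly. CSV or a table format would work equally well if that is what the consuming teams expect.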