In [1]:
# Distribute a small local list across 4 partitions and read it back.
data = [1, 2, 3, 4, 5]
print(data)
rdd = sc.parallelize(data, numSlices=4)
# collect() returns every element of the RDD to the driver as a list.
print(rdd.collect())

In [2]:
data = [1, 3, 2, 2, 5, 1, 2, 4, 3, 1]
print(data)
# Remember: transformations are out-of-place.
# Each one returns a new RDD and leaves the RDD it was called on unchanged,
# so the result has to be assigned to a variable (or used directly) to be accessed.
rdd = sc.parallelize(data, numSlices=4)
# map: apply the function to every element.
print(rdd.map(lambda n: 2 * n).collect())
# filter: keep only the even elements.
print(rdd.filter(lambda n: n % 2 == 0).collect())
# distinct: drop duplicate elements.
print(rdd.distinct().collect())
# map yields one [n, n squared] list per element; flatMap flattens those lists
# into a single sequence of values.
print(rdd.map(lambda n: [n, n ** 2]).collect())
print(rdd.flatMap(lambda n: [n, n ** 2]).collect())
# reduce: fold all elements together, here into their product.
print(rdd.reduce(lambda product, n: product * n))
# take: the first 4 elements; takeOrdered: the smallest elements in ascending order.
print(rdd.take(4))
print(rdd.takeOrdered(10))
# Asking for more elements than the RDD holds (15 > 10) returns everything there is.
print(rdd.takeOrdered(15))
# Negating the sort key yields descending order.
print(rdd.takeOrdered(10, key=lambda n: -n))

In [3]:
%fs ls /databricks-datasets/samples/docs/

In [4]:
# Load the sample README as an RDD of lines (at least 4 partitions requested)
# and print how many lines it contains.
distFile = sc.textFile("/databricks-datasets/samples/docs/README.md", minPartitions=4)
print(distFile.count())

In [5]:
# Pair RDD operations on a list of (key, value) tuples.
data = [(1, 3), (2, 5), (1, 2), (4, 5), (2, 4), (3, 6), (9, 5), (1, 1)]
rdd = sc.parallelize(data)
# reduceByKey: combine all values that share a key, here by summing them.
print(rdd.reduceByKey(lambda a, b: a + b).collect())
# sortByKey: order the pairs by key (ascending by default).
print(rdd.sortByKey().collect())
# groupByKey yields (key, ResultIterable) pairs; printing those directly shows only
# opaque object reprs, so the grouped values are materialised as lists first.
print(rdd.groupByKey().mapValues(list).collect())

In [6]:
# Broadcast Variables
# Wrap a read-only list in a broadcast variable and read it back through .value.
bcVar = sc.broadcast([1, 2, 3])
print(bcVar.value)

In [7]:
# Accumulators
# Create an accumulator starting at 0 and a small RDD of integers.
accVar = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4, 5])
def foo(x):
    """Adds x to the module-level accumulator accVar.

    Args:
        x: The value to add to the accumulator.
    """
    # Accumulator.add is what `accVar += x` performs; the object itself is
    # never rebound, so no `global` declaration is required.
    accVar.add(x)
  
# foreach is an action: it runs foo on every element of the RDD.
rdd.foreach(foo)

# Read the accumulated total back on the driver.
print(accVar.value)

In [8]:
# RDD of lines from the sample README, plus an accumulator starting at 0.
rdd = sc.textFile("/databricks-datasets/samples/docs/README.md", minPartitions=4)
blankLines = sc.accumulator(0)

def extractWords(line):
    """Splits a line on single spaces, counting empty lines as a side effect.

    An empty line increments the module-level accumulator blankLines and
    produces no words.

    NOTE: Spark only guarantees exactly-once accumulator updates inside
    actions; when this function runs inside a transformation, a re-executed
    task may apply the update again.

    Args:
        line (str): One line of text.

    Returns:
        list of str: The pieces of the line separated by ' ' (may contain
        empty strings where spaces are adjacent), or [] for an empty line.
    """
    global blankLines
    if line != "":
        return line.split(" ")
    blankLines += 1
    return []

# Split every line into words, drop empty strings, and take the words in
# ascending order.
# NOTE(review): the number of words taken is rdd.count(), i.e. the number of
# elements (lines) in rdd, not the number of words -- confirm this is intended.
words = (rdd
         .flatMap(extractWords)
         .filter(lambda word: word != '')
         .takeOrdered(rdd.count()))
print("""
~~~Words~~~
{}""".format(words))
print("Number of blank lines is {}".format(blankLines))

In [9]:
import re

def wordCount(wordListRDD):
    """Counts how many times each word occurs in an RDD of words.

    Args:
        wordListRDD (RDD of str): An RDD whose elements are individual words.

    Returns:
        RDD of (str, int): One (word, count) tuple per distinct word.
    """
    # Pair every word with an initial count of 1 ...
    wordOnePairs = wordListRDD.map(lambda word: (word, 1))
    # ... then add up the counts that share the same word.
    return wordOnePairs.reduceByKey(lambda total, count: total + count)


def removePunctuation(text):
    """Removes punctuation, changes to lower case, and strips leading and trailing spaces.

    Note:
        Underscores and every character that is neither a word character nor
        whitespace are eliminated (e.g. it's becomes its).  Leading and trailing
        whitespace is removed after punctuation is removed, so ' * word * '
        becomes 'word' rather than ' word '.

    Args:
        text (str): A string.

    Returns:
        str: The cleaned up string.
    """
    # `\w` also matches '_', so the underscore has to be listed explicitly.
    withoutPunctuation = re.sub(r'[^\w\s]|_', '', text.lower())
    # Strip last: deleting punctuation at either end can expose spaces that
    # were not leading/trailing in the original text.
    return withoutPunctuation.strip()


# Clean every line of the file, split the lines into words, count the words and
# print the 15 most frequent ones as "word: count".
# NOTE(review): the RDD names refer to Shakespeare, but the file loaded here is
# the sample README.md -- confirm the intended input.
shakespeareRDD = (sc
                  .textFile("/databricks-datasets/samples/docs/README.md", minPartitions=8)
                  .map(removePunctuation))
shakespeareWordsRDD = shakespeareRDD.flatMap(lambda line: line.split())
shakeWordsRDD = shakespeareWordsRDD.filter(lambda word: word != '')
# Negating the count orders the (word, count) pairs from most to least frequent.
top15WordsAndCounts = wordCount(shakeWordsRDD).takeOrdered(15, key=lambda pair: -pair[1])
print('\n'.join('{0}: {1}'.format(word, count) for word, count in top15WordsAndCounts))