In [1]:
# Load the local hosts file as an RDD (one element per line of text)
rdd = sc.textFile('file:/etc/hosts')
# Dump the RDD's lineage, i.e. the chain of parent RDDs it is derived from
print(rdd.toDebugString())

(2) file:/etc/hosts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  file:/etc/hosts HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


In [2]:
# Derive a new RDD holding only the lines that mention 'localhost'
filetered_rdd = rdd.filter(lambda text: "localhost" in text)
# The lineage dump now lists the filter step on top of the file-backed RDDs
print(filetered_rdd.toDebugString())

(2) PythonRDD[2] at RDD at PythonRDD.scala:42 []
 |  file:/etc/hosts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  file:/etc/hosts HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


In [3]:
# Pull the filtered lines back to the driver and echo them one at a time
for line in filetered_rdd.collect():
    print(line)

# localhost is used to configure the loopback interface
127.0.0.1	localhost
::1             localhost 
fe80::1%lo0	localhost


In [4]:
# Mark the filtered RDD for caching. cache() hands back the RDD it was
# called on, so both names refer to the same RDD.
filetered_rdd_cached = filetered_rdd.cache()
# The lineage dump now carries the storage level next to each entry
print(filetered_rdd_cached.toDebugString())

(2) PythonRDD[2] at RDD at PythonRDD.scala:42 [Memory Serialized 1x Replicated]
 |  file:/etc/hosts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 [Memory Serialized 1x Replicated]
 |  file:/etc/hosts HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 [Memory Serialized 1x Replicated]


In [5]:
# Split every filtered line on whitespace and flatten the pieces into one RDD of words
flatmapped_rdd = filetered_rdd.flatMap(lambda text: text.split())
print(flatmapped_rdd.toDebugString())

(2) PythonRDD[3] at RDD at PythonRDD.scala:42 []
 |  PythonRDD[2] at RDD at PythonRDD.scala:42 []
 |  file:/etc/hosts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
 |  file:/etc/hosts HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


In [6]:
# Word count: pair each word with 1, then add up the 1s per distinct word
wc_one_rdd = flatmapped_rdd.map(lambda token: (token, 1))
wc_rdd = wc_one_rdd.reduceByKey(lambda total, extra: total + extra)
for word, count in wc_rdd.collect():  # (word, count) pairs, not sorted at this point
    print('%s %s' % (word, count))

# 1
is 1
::1 1
localhost 4
interface 1
the 1
127.0.0.1 1
to 1
used 1
configure 1
fe80::1%lo0 1
loopback 1


In [7]:
# Order the (word, count) pairs by word and print them in that order
wc_rdd_sorted = wc_rdd.sortByKey()
for word, count in wc_rdd_sorted.collect():
    print('%s %s' % (word, count))

# 1
127.0.0.1 1
::1 1
configure 1
fe80::1%lo0 1
interface 1
is 1
localhost 4
loopback 1
the 1
to 1
used 1


In [8]:
# Dump the full lineage behind the sorted word counts
print(wc_rdd_sorted.toDebugString())

(2) PythonRDD[16] at collect at <ipython-input-7-0fdef4f82843>:3 []
 |  MapPartitionsRDD[15] at mapPartitions at PythonRDD.scala:338 []
 |  ShuffledRDD[14] at partitionBy at NativeMethodAccessorImpl.java:-2 []
 +-(2) PairwiseRDD[13] at sortByKey at <ipython-input-7-0fdef4f82843>:2 []
    |  PythonRDD[12] at sortByKey at <ipython-input-7-0fdef4f82843>:2 []
    |  MapPartitionsRDD[8] at mapPartitions at PythonRDD.scala:338 []
    |  ShuffledRDD[7] at partitionBy at NativeMethodAccessorImpl.java:-2 []
    +-(2) PairwiseRDD[6] at reduceByKey at <ipython-input-6-cc827b39bdfb>:3 []
       |  PythonRDD[5] at reduceByKey at <ipython-input-6-cc827b39bdfb>:3 []
       |  PythonRDD[2] at RDD at PythonRDD.scala:42 []
       |      CachedPartitions: 2; MemorySize: 238.0 B; TachyonSize: 0.0 B; DiskSize: 0.0 B
       |  file:/etc/hosts MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 []
       |  file:/etc/hosts HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2 []


In [9]:
# Look up the count stored under the key 'localhost' in the word-count RDD;
# lookup() returns the matching values as a list
wc_rdd.lookup('localhost')

[4]

In [10]:
# Key lookup against the sorted word-count RDD, here for the key 'interface';
# lookup() returns the matching values as a list
wc_rdd_sorted.lookup('interface')

[1]