In [1]:
from pyspark import SparkContext, SparkConf

In [2]:
# Spark configuration: application name "myapp", running against the "local" master.
conf = SparkConf().setAppName("myapp").setMaster("local")
# getOrCreate() hands back the already-running context when this cell is
# executed again in the same kernel; constructing SparkContext(conf=conf) a
# second time raises "ValueError: Cannot run multiple SparkContexts at once".
# Note: when a context already exists, it is returned as-is and `conf` is not re-applied.
sc = SparkContext.getOrCreate(conf=conf)

In [3]:
# Show the URL of the Spark web UI started by this SparkContext
# (left as a bare last expression so the notebook displays the value).
sc.uiWebUrl

'http://192.168.0.19:4040'

In [4]:
# Creating an RDD from an in-memory Python collection.
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# Printing an RDD shows only its description (RDD kind, id and creation site),
# not its elements; call an action such as distData.collect() to see the data.
print(distData)

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195


In [5]:
# Creating an RDD from an external data set

In [6]:
# Read data.txt as an RDD with one element per line of the file.
text = sc.textFile('data.txt')
print(text.collect())
# Split every line on whitespace and flatten the pieces into a single list of
# words. flatMap + collect yields the same ordered list as
# map(split).reduce(list + list), but does not re-copy the growing list on
# every merge and returns [] for an empty file instead of raising ValueError.
text_list = text.flatMap(lambda s: s.split()).collect()
# text_list is a plain Python list (not an RDD), so it is printed directly.
print(text_list)

['Once created, the distributed dataset (distData) can be operated on in parallel. ', 'For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. ', 'We describe operations on distributed datasets later on.', '', 'One important parameter for parallel collections is the number of partitions to cut the dataset into. ', 'Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.', 'Normally, Spark tries to set the number of partitions automatically based on your cluster. ', 'However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). ', 'Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.', '']
['Once', 'created,', 'the', 'distributed', 'dataset', '(distData)', 'can', 'be', 'operated', 'on', 'in', 'parallel.', 'For', 'example,', 'we', 'can', 'call', 'distData.reduce(l

In [8]:
# Saving and loading sequence files.
# A sequence file is a binary file where data is stored as key/value pairs,
# so start from a small RDD of (n, 2n) tuples.
data = (
    sc.parallelize(range(1, 5))
      .map(lambda n: (n, n * 2))
)
print(data.collect())
# Writing the pairs out is left disabled in this notebook:
# data.saveAsSequenceFile('textfile')

[(1, 2), (2, 4), (3, 6), (4, 8)]


In [15]:
# Per-line lengths of data.txt and their total.
lines = sc.textFile("data.txt")
# len is applied to each line string directly; no wrapper lambda is needed.
lineLengths = lines.map(len)
print(lineLengths.collect())
# sum() returns 0 for an empty RDD, whereas reduce() raises
# ValueError("Can not reduce() empty RDD") when the file has no lines.
totalLength = lineLengths.sum()

[81, 97, 56, 0, 102, 122, 91, 122, 112, 0]


In [16]:
rdd = sc.parallelize(range(1,5))
counter = 0

# Anti-pattern, kept on purpose as a demonstration: do NOT try to accumulate
# into a driver-side global from inside an RDD operation.
def increment_counter(x):
    """Add ``x`` to the global ``counter`` as seen by the code running the task."""
    global counter
    counter = counter + x

rdd.foreach(increment_counter)

# The recorded run prints 0: the function is shipped to the tasks together with
# its own copy of `counter`, so the driver's variable is never updated.
# Use an accumulator (sc.accumulator) when a global aggregate is needed.
print("Counter value: ", counter)

Counter value:  0


In [18]:
rdd = sc.parallelize(range(1,5))
counter = 0
print(rdd.collect())

# Also wrong, kept as a demonstration: foreach() runs the function only for its
# side effects and discards whatever it returns, so the RDD is left unchanged
# (use map() to derive a new RDD instead).
# NOTE: this redefines increment_counter from the earlier cell; despite its
# name, this version never touches `counter`.
def increment_counter(x):
    """Return ``x + 1``; the result is ignored when called through foreach()."""
    return x + 1

rdd.foreach(increment_counter)

# `counter` is not modified anywhere in this cell, so it is still 0, and the
# second collect() prints the same elements as the first.
print("Counter value: ", counter)
print(rdd.collect())

[1, 2, 3, 4]
Counter value:  0
[1, 2, 3, 4]


In [20]:
# Working with key-value pairs: count how often each distinct line occurs.
lines = sc.textFile("data.txt")
# Pair every line with an initial count of 1 ...
pairs = lines.map(lambda line: (line, 1))
# ... then add up the counts that share the same key (the line text).
counts = pairs.reduceByKey(lambda left, right: left + right)
print(counts.collect())

[('Once created, the distributed dataset (distData) can be operated on in parallel. ', 1), ('For example, we can call distData.reduce(lambda a, b: a + b) to add up the elements of the list. ', 1), ('We describe operations on distributed datasets later on.', 1), ('', 2), ('One important parameter for parallel collections is the number of partitions to cut the dataset into. ', 1), ('Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster.', 1), ('Normally, Spark tries to set the number of partitions automatically based on your cluster. ', 1), ('However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). ', 1), ('Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.', 1)]
