# Spark Fundamentals I - Introduction to Spark

Python - Working with RDD operations

## - Analyzing a log file

In [2]:
logFile = sc.textFile("/home/jane/Downloads/LabData/notebook.log")

In [3]:
sc.version

'3.0.0-preview2'

In [4]:
type(logFile)

pyspark.rdd.RDD

In [5]:
logFile.collect()

['[I 12:09:13.491 NotebookApp] Using MathJax: /static/vendor/MathJax-2.5-latest/MathJax.js',
 "[I 12:09:13.494 NotebookApp] Using existing profile dir: u'/home/notebook/.ipython/profile_default'",
 '[I 12:09:13.513 NotebookApp] Writing notebook server cookie secret to /home/notebook/.ipython/profile_default/security/notebook_cookie_secret',
 '[I 12:09:13.586 NotebookApp] Serving notebooks from local directory: /resources',
 '[I 12:09:13.586 NotebookApp] 0 active kernels ',
 '[I 12:09:13.586 NotebookApp] The IPython Notebook is running at: http://[all ip addresses on your system]:8888/',
 '[I 12:09:13.586 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).',
 '[W 12:09:13.586 NotebookApp] No web browser found: could not locate runnable browser.',
 '[I 05:23:31.876 NotebookApp] Using MathJax: /static/vendor/MathJax-2.5-latest/MathJax.js',
 "[I 05:23:31.878 NotebookApp] Using existing profile dir: u'/home/notebook/.ipython/profile_default

In [6]:
logFile.count()

34836

In [9]:
# Keep only the lines that contain "INFO"
info = logFile.filter(lambda line: "INFO" in line)
info.count()

13438

In [11]:
# Count the INFO lines that contain "spark" by chaining a transformation and an action.
info.filter(lambda line: "spark" in line).count()

156
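The test inside each lambda is an ordinary Python substring check, so it is case-sensitive: `"spark"` matches `sparkDriver` but would not match `Spark`. A minimal plain-Python sketch of the same two filters, with no Spark required; the list is only a stand-in for the RDD, and the three lines are copied from the outputs in this notebook:

```python
# Plain-Python stand-in for the RDD: three lines copied from the notebook outputs.
lines = [
    "[I 12:09:13.586 NotebookApp] 0 active kernels ",
    "15/10/14 14:29:23 INFO Utils: Successfully started service 'sparkDriver' on port 53333.",
    "15/10/14 14:29:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:53333]",
]

# Same predicates as the notebook's lambdas.
info_lines = [line for line in lines if "INFO" in line]
spark_lines = [line for line in info_lines if "spark" in line]

# A capitalised search term finds nothing in these lines.
capital_spark_lines = [line for line in info_lines if "Spark" in line]

print(len(info_lines), len(spark_lines), len(capital_spark_lines))
```

If a case-insensitive match is wanted, one option is to compare against `line.lower()` inside the lambda.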

In [12]:
# Fetch those lines as a list of strings
info.filter(lambda line: "spark" in line).collect()

['15/10/14 14:29:23 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:53333]',
 "15/10/14 14:29:23 INFO Utils: Successfully started service 'sparkDriver' on port 53333.",
 '15/10/14 14:29:23 INFO DiskBlockManager: Created local directory at /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/blockmgr-c142f2f1-ebb6-4612-945b-0a67d156230a',
 '15/10/14 14:29:23 INFO HttpFileServer: HTTP File server directory is /tmp/spark-fe150378-7bad-42b6-876b-d14e2c193eb6/httpd-ed3f4ab0-7218-48bc-9d8a-3981b1cfe574',
 "15/10/14 14:29:24 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35726.",
 '15/10/15 15:33:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.22:47412]',
 "15/10/15 15:33:42 INFO Utils: Successfully started service 'sparkDriver' on port 47412.",
 '15/10/15 15:33:42 INFO DiskBlockManager: Created local directory at /tmp/spark-fc035223-3b43-43d1-8d7d-

In [13]:
# View the lineage (dependency graph) of an RDD using this command:
print(info.toDebugString())

b'(2) PythonRDD[8] at RDD at PythonRDD.scala:53 []\n |  /home/jane/Downloads/LabData/notebook.log MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n |  /home/jane/Downloads/LabData/notebook.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'
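The output above is a `bytes` value, which is why `print` shows the `b'...'` wrapper and literal `\n` escapes. Decoding it first gives one lineage step per line. A small sketch using the bytes copied from that output (in the notebook, the same decoding would be applied to the result of `info.toDebugString()`):

```python
# Bytes value copied from the output above, as returned by this Spark version.
debug = (
    b"(2) PythonRDD[8] at RDD at PythonRDD.scala:53 []\n"
    b" |  /home/jane/Downloads/LabData/notebook.log MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n"
    b" |  /home/jane/Downloads/LabData/notebook.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []"
)

# Decode only when the value is bytes, so the same line also works on a str.
text = debug.decode("utf-8") if isinstance(debug, bytes) else debug
print(text)

# One entry per RDD in the lineage, newest first.
lineage = text.split("\n")
```

Read from the bottom up, the lineage shows the file being loaded (`HadoopRDD`), mapped to lines (`MapPartitionsRDD`), and then filtered in Python (`PythonRDD`).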


## - Joining RDDs

In [22]:
# Create RDDs for the README and POM files
readmeFile = sc.textFile("/home/jane/Downloads/LabData/README.md")
pomFile = sc.textFile("/home/jane/Downloads/LabData/pom.xml")

In [25]:
print(readmeFile.filter(lambda line: "Spark" in line).count())
print(pomFile.filter(lambda line: "Spark" in line).count())

18
2


In [26]:
# Now do a WordCount on each RDD so that the results are (K,V) pairs of (word,count)
wordCountREADME = readmeFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

In [27]:
wordCountPOM = pomFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

In [None]:
# or, the same pipeline split across several lines for readability
readmeCount = (readmeFile
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b))

pomCount = (pomFile
    .flatMap(lambda line: line.split())
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b))
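To see what each step of the word count produces, here is a plain-Python model of the `flatMap`, `map`, and `reduceByKey` chain. It runs without Spark and is only a sketch of the semantics, not of how Spark executes the job; the two input lines are hypothetical.

```python
from collections import Counter

# Hypothetical two-line input standing in for the lines of a file.
lines = ["Apache Spark is fast", "Spark  is general"]

# flatMap(lambda line: line.split()): one flat list of words from all lines.
words = [word for line in lines for word in line.split()]

# map(lambda word: (word, 1)): pair every word with a count of 1.
pairs = [(word, 1) for word in words]

# reduceByKey(lambda a, b: a + b): add up the counts that share a key.
counts = Counter()
for word, one in pairs:
    counts[word] += one

print(sorted(counts.items()))

# split() with no argument collapses runs of whitespace, whereas
# split(" ") keeps an empty string between two consecutive spaces.
print("Spark  is general".split(" "))
```

The last line is why the choice of separator matters: with an explicit `" "` separator, empty strings can show up as "words" in the counts.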

In [28]:
wordCountREADME.collect()

[('#', 1),
 ('Apache', 1),
 ('Spark', 14),
 ('is', 6),
 ('It', 2),
 ('provides', 1),
 ('high-level', 1),
 ('APIs', 1),
 ('in', 5),
 ('Scala,', 1),
 ('Java,', 1),
 ('an', 3),
 ('optimized', 1),
 ('engine', 1),
 ('supports', 2),
 ('computation', 1),
 ('analysis.', 1),
 ('set', 2),
 ('of', 5),
 ('tools', 1),
 ('SQL', 2),
 ('MLlib', 1),
 ('machine', 1),
 ('learning,', 1),
 ('GraphX', 1),
 ('graph', 1),
 ('processing,', 1),
 ('Documentation', 1),
 ('latest', 1),
 ('programming', 1),
 ('guide,', 1),
 ('[project', 2),
 ('README', 1),
 ('only', 1),
 ('basic', 1),
 ('instructions.', 1),
 ('Building', 1),
 ('using', 2),
 ('[Apache', 1),
 ('run:', 1),
 ('do', 2),
 ('this', 1),
 ('downloaded', 1),
 ('documentation', 3),
 ('project', 1),
 ('site,', 1),
 ('at', 2),
 ('Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1),
 ('Interactive', 2),
 ('Shell', 2),
 ('The', 1),
 ('way', 1),
 ('start', 1),
 ('Try', 1),
 ('following', 2),
 ('1000:', 2),
 ('scala>', 1),
 ('1000).count()', 1),


In [30]:
# Or
print("Readme Count\n")
print(wordCountREADME.collect())

Readme Count

[('#', 1), ('Apache', 1), ('Spark', 14), ('is', 6), ('It', 2), ('provides', 1), ('high-level', 1), ('APIs', 1), ('in', 5), ('Scala,', 1), ('Java,', 1), ('an', 3), ('optimized', 1), ('engine', 1), ('supports', 2), ('computation', 1), ('analysis.', 1), ('set', 2), ('of', 5), ('tools', 1), ('SQL', 2), ('MLlib', 1), ('machine', 1), ('learning,', 1), ('GraphX', 1), ('graph', 1), ('processing,', 1), ('Documentation', 1), ('latest', 1), ('programming', 1), ('guide,', 1), ('[project', 2), ('README', 1), ('only', 1), ('basic', 1), ('instructions.', 1), ('Building', 1), ('using', 2), ('[Apache', 1), ('run:', 1), ('do', 2), ('this', 1), ('downloaded', 1), ('documentation', 3), ('project', 1), ('site,', 1), ('at', 2), ('Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1), ('Interactive', 2), ('Shell', 2), ('The', 1), ('way', 1), ('start', 1), ('Try', 1), ('following', 2), ('1000:', 2), ('scala>', 1), ('1000).count()', 1), ('Python', 2), ('Alternatively,', 1), ('use

In [29]:
wordCountPOM.collect()

[('<?xml', 1),
 ('version="1.0"', 1),
 ('Apache', 2),
 ('more', 1),
 ('NOTICE', 1),
 ('this', 3),
 ('work', 1),
 ('additional', 1),
 ('regarding', 1),
 ('copyright', 1),
 ('The', 2),
 ('2.0', 1),
 ('(the', 1),
 ('"License");', 1),
 ('may', 2),
 ('use', 1),
 ('in', 3),
 ('compliance', 1),
 ('License.', 2),
 ('obtain', 1),
 ('of', 2),
 ('at', 1),
 ('law', 1),
 ('is', 2),
 ('an', 1),
 ('"AS', 1),
 ('IS"', 1),
 ('BASIS,', 1),
 ('CONDITIONS', 1),
 ('OF', 1),
 ('KIND,', 1),
 ('specific', 1),
 ('language', 1),
 ('limitations', 1),
 ('-->', 7),
 ('xmlns="http://maven.apache.org/POM/4.0.0"', 1),
 ('http://maven.apache.org/xsd/maven-4.0.0.xsd">', 1),
 ('<modelVersion>4.0.0</modelVersion>', 1),
 ('<artifactId>spark-parent_2.10</artifactId>', 1),
 ('<relativePath>../pom.xml</relativePath>', 1),
 ('</parent>', 1),
 ('<sbt.project.name>examples</sbt.project.name>', 1),
 ('Project', 1),
 ('Examples</name>', 1),
 ('<dependencies>', 2),
 ('<version>${project.version}</version>', 12),
 ('<artifactId>spa

In [32]:
# or
print("Pom Count\n")
print(wordCountPOM.collect())

Pom Count

[('<?xml', 1), ('version="1.0"', 1), ('Apache', 2), ('more', 1), ('NOTICE', 1), ('this', 3), ('work', 1), ('additional', 1), ('regarding', 1), ('copyright', 1), ('The', 2), ('2.0', 1), ('(the', 1), ('"License");', 1), ('may', 2), ('use', 1), ('in', 3), ('compliance', 1), ('License.', 2), ('obtain', 1), ('of', 2), ('at', 1), ('law', 1), ('is', 2), ('an', 1), ('"AS', 1), ('IS"', 1), ('BASIS,', 1), ('CONDITIONS', 1), ('OF', 1), ('KIND,', 1), ('specific', 1), ('language', 1), ('limitations', 1), ('-->', 7), ('xmlns="http://maven.apache.org/POM/4.0.0"', 1), ('http://maven.apache.org/xsd/maven-4.0.0.xsd">', 1), ('<modelVersion>4.0.0</modelVersion>', 1), ('<artifactId>spark-parent_2.10</artifactId>', 1), ('<relativePath>../pom.xml</relativePath>', 1), ('</parent>', 1), ('<sbt.project.name>examples</sbt.project.name>', 1), ('Project', 1), ('Examples</name>', 1), ('<dependencies>', 2), ('<version>${project.version}</version>', 12), ('<artifactId>spark-streaming_${scala.binary.version

### *The join function combines two datasets of (K, V) and (K, W) pairs into (K, (V, W)) pairs, keeping only the keys that appear in both. Let's join the two word counts.*

In [33]:
# Join function 
joined = wordCountREADME.join(wordCountPOM)

In [34]:
joined.collect()

[('Apache', (1, 2)),
 ('Spark', (14, 1)),
 ('is', (6, 2)),
 ('in', (5, 3)),
 ('an', (3, 1)),
 ('of', (5, 2)),
 ('this', (1, 3)),
 ('at', (2, 1)),
 ('The', (1, 2)),
 ('following', (2, 1)),
 ('use', (3, 1)),
 ('are', (1, 1)),
 ('uses', (1, 1)),
 ('a', (10, 1)),
 ('and', (10, 1)),
 ('for', (12, 2)),
 ('that', (3, 1)),
 ('You', (3, 2)),
 ('the', (21, 10)),
 ('on', (6, 1)),
 ('file', (1, 3)),
 ('not', (1, 1)),
 ('to', (14, 5)),
 ('you', (4, 1)),
 ('which', (2, 1)),
 ('with', (4, 2)),
 ('one', (2, 1)),
 ('be', (2, 1)),
 ('or', (3, 3)),
 ('See', (1, 2))]
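Note that the joined result contains only words found in both files: `'#'` from the README and `'<?xml'` from the POM are gone. A plain-Python sketch of that inner-join behaviour, using a few counts taken from the outputs above (dictionaries stand in for the pair RDDs):

```python
# Subsets of the two word counts shown above; '#' and '<?xml' each
# appear in only one of the files.
readme_counts = {"#": 1, "Apache": 1, "Spark": 14, "is": 6}
pom_counts = {"<?xml": 1, "Apache": 2, "Spark": 1, "is": 2}

# Inner join: keep only keys present on both sides and pair the values as (V, W).
joined = {
    word: (readme_counts[word], pom_counts[word])
    for word in readme_counts
    if word in pom_counts
}

print(joined)
```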

In [35]:
# Add the two counts for each word to get its total across both files
joinedSum = joined.map(lambda k: (k[0], (k[1][0]+k[1][1])))

In [36]:
print("Joined Individual\n")
print(joined.take(5))

print("\n\nJoined Sum\n")
print(joinedSum.take(5))

Joined Individual

[('Apache', (1, 2)), ('Spark', (14, 1)), ('is', (6, 2)), ('in', (5, 3)), ('an', (3, 1))]


Joined Sum

[('Apache', 3), ('Spark', 15), ('is', 8), ('in', 8), ('an', 4)]
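Because `join` keeps only the common keys, these sums cover only words that occur in both files. If totals for every word were wanted, one alternative (not used in this lab) would be to `union` the two count RDDs and apply `reduceByKey` again. The difference can be sketched in plain Python, with a few counts taken from the outputs above:

```python
from collections import Counter

# A few counts from the outputs above; '#' and '<?xml' occur in one file only.
readme_counts = Counter({"#": 1, "Apache": 1, "Spark": 14})
pom_counts = Counter({"<?xml": 1, "Apache": 2, "Spark": 1})

# Totals for common words only: what join followed by the summing map yields.
common_sum = {
    word: readme_counts[word] + pom_counts[word]
    for word in readme_counts
    if word in pom_counts
}

# Totals for every word from either file: what union + reduceByKey would yield.
total = readme_counts + pom_counts

print(common_sum)
print(dict(total))
```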


## - Shared variables

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
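The copy behaviour described above can be modelled without a cluster. In this simplified sketch, `pickle` is only a stand-in for however Spark serializes and ships a function's variables, and `task` is a hypothetical helper playing the role of the code run on a worker:

```python
import pickle

counter = {"seen": 0}  # this variable lives in the "driver"

def task(serialized_state, value):
    # Each task deserializes its own private copy of the driver's variable.
    local = pickle.loads(serialized_state)
    local["seen"] += value
    return local["seen"]

# Ship the same serialized state to four "tasks".
shipped = pickle.dumps(counter)
results = [task(shipped, x) for x in [1, 2, 3, 4]]

print(results)  # every task started from its own copy with seen == 0
print(counter)  # the driver's variable was never updated
```

Each task sees its own update, but nothing flows back to the driver, which is the gap that accumulators fill.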

### Broadcast variables

Broadcast variables are useful when you have a large, read-only dataset that every worker node needs. The value is cached once on each machine instead of being shipped with every task. Spark actions are executed through a set of stages, separated by distributed “shuffle” operations, and Spark automatically broadcasts the common data needed by the tasks within each stage.

More here: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables


In [37]:
# Create a broadcast variable
broadcastVar = sc.broadcast([1,2,3])

In [38]:
broadcastVar.value

[1, 2, 3]

### Accumulators

Accumulators are variables that can only be added to, through an associative (and commutative) operation. They are used to implement counters and sums efficiently in parallel. Spark natively supports accumulators of numeric types and standard mutable collections, and programmers can extend them to new types. Only the driver can read an accumulator's value; the workers can only add to it.

In [40]:
# Create the accumulator variable
accum = sc.accumulator(0)

In [41]:
# Next, parallelize a list of four integers and define a function that adds each value to
# the accumulator variable
rdd = sc.parallelize([1,2,3,4])
def f(x):
    global accum 
    accum += x

In [42]:
# Next, iterate through each element of the rdd and apply the function f on it:
rdd.foreach(f)

In [43]:
# To get the current value of the accumulator variable, type in:
accum.value

10

`accum.value` can only be read on the driver side. The worker nodes can only add to the accumulator.
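The requirement that the operation be associative comes from how the additions are combined: each task adds up its own share of the data, and the partial results are then merged in no guaranteed order. A plain-Python sketch of that idea, assuming a hypothetical split of the four integers into two partitions (Spark decides the real partitioning):

```python
from functools import reduce
from itertools import permutations
import operator

data = [1, 2, 3, 4]

# Hypothetical split into two partitions.
partitions = [data[:2], data[2:]]

# Each "task" adds up its own partition locally...
partials = [sum(part) for part in partitions]

# ...and the partial results are merged, in whatever order they arrive.
# Collecting the result for every possible order shows it never changes.
totals = {reduce(operator.add, order, 0) for order in permutations(partials)}

print(partials)
print(totals)
```

With an operation such as subtraction, the merged result would depend on the order, which is why accumulators are restricted to operations like addition.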

In [44]:
# Create a key-value pair of two characters
pair = ('a', 'b')

In [45]:
# Use index [0] to access the first element (the key) and [1] for the second (the value).
print(pair[0])
print(pair[1])

a
b
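Indexing is not the only way to take a pair apart. Tuple unpacking reads more clearly in ordinary code, and it also works on the nested `(K, (V, W))` tuples that `join` produces. A short sketch (the variable names are illustrative):

```python
pair = ('a', 'b')

# Unpack both elements at once instead of indexing.
key, value = pair
print(key, value)

# The same unpacking works on a nested (K, (V, W)) tuple from a join.
word, (readme_count, pom_count) = ('Apache', (1, 2))
total = readme_count + pom_count
print(word, total)
```

Python 3 lambdas cannot unpack a tuple in their parameter list, which is why the notebook's lambda uses `k[0]` and `k[1][0]` instead.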


# End