## Análisis de un log

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('rdd-example').getOrCreate()
sc = spark.sparkContext

21/11/30 23:39:51 WARN Utils: Your hostname, jesus-Aspire-A514-52 resolves to a loopback address: 127.0.1.1; using 192.168.1.54 instead (on interface wlp2s0)
21/11/30 23:39:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/30 23:39:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/30 23:39:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Creamos el RDD leyendo el fichero de log

In [2]:
logFile = sc.textFile("LabData/notebook.log")

### <span style="color: red">DEBES ESCRIBIR TU CÓDIGO DONDE LO INDIQUE:</span> 

#### En la celda de debajo, escribe el código necesario para filtrar todas las línesa que contiene la palabra "INFO"

In [3]:
info = logFile.filter(lambda line: "INFO" in line)


#### Cuenta las líneas que hay:

In [4]:
lines = info.count()
lines



13438

#### Cuenta las líneas que contienen "spark" combinando una transformación y una acción

In [5]:
spark_lines = info.filter(lambda line: "spark" in line).count()
spark_lines


156

#### Recupera esas líneas en un nuevo RDD

In [6]:
spark_rdd = info.filter(lambda line: "spark" in line).collect()

Muestra el grafo (DAG) asociado al RDD

In [7]:
info.toDebugString()

b'(2) PythonRDD[5] at RDD at PythonRDD.scala:53 []\n |  LabData/notebook.log MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []\n |  LabData/notebook.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []'

## Unir RDDs

Vamos a crear dos RDDs para los ficheros README.md y POM.xml

In [8]:
readmeFile = sc.textFile("LabData/README.md")
pomFile = sc.textFile("LabData/pom.xml")

Contar cuantas veces aparece la palabra "Spark" en cada fichero

In [10]:
spark_readme = readmeFile.filter(lambda line: "spark" in line).count()
spark_pom = readmeFile.filter(lambda line: "spark" in line).count()

spark_readme, spark_pom

(11, 11)

Contar el número de palabras distintas que aparecen en cada uno de los dos ficheros. Los resultados son RDDs de tipo PairRDD (Key, Value) de la forma (palabra, número de veces que aparece)

In [13]:
readmeCount = readmeFile.                    \
    flatMap(lambda line: line.split(" ")).   \
    map(lambda word: (word, 1)).             \
    reduceByKey(lambda a, b: a + b)
    
pomCount = pomFile.                          \
    flatMap(lambda line: line.split(" ")).   \
    map(lambda word: (word, 1)).            \
    reduceByKey(lambda a, b: a + b)

Mostrar el RDD completo usando el método collect()

In [14]:
print(readmeCount.collect())

[('#', 1), ('Apache', 1), ('Spark', 14), ('', 67), ('is', 6), ('It', 2), ('provides', 1), ('high-level', 1), ('APIs', 1), ('in', 5), ('Scala,', 1), ('Java,', 1), ('an', 3), ('optimized', 1), ('engine', 1), ('supports', 2), ('computation', 1), ('analysis.', 1), ('set', 2), ('of', 5), ('tools', 1), ('SQL', 2), ('MLlib', 1), ('machine', 1), ('learning,', 1), ('GraphX', 1), ('graph', 1), ('processing,', 1), ('Documentation', 1), ('latest', 1), ('programming', 1), ('guide,', 1), ('[project', 2), ('README', 1), ('only', 1), ('basic', 1), ('instructions.', 1), ('Building', 1), ('using', 2), ('[Apache', 1), ('run:', 1), ('do', 2), ('this', 1), ('downloaded', 1), ('documentation', 3), ('project', 1), ('site,', 1), ('at', 2), ('Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1), ('Interactive', 2), ('Shell', 2), ('The', 1), ('way', 1), ('start', 1), ('Try', 1), ('following', 2), ('1000:', 2), ('scala>', 1), ('1000).count()', 1), ('Python', 2), ('Alternatively,', 1), ('use', 3

In [15]:
print(pomCount.collect())

[('<?xml', 1), ('version="1.0"', 1), ('', 2931), ('Apache', 2), ('more', 1), ('NOTICE', 1), ('this', 3), ('work', 1), ('additional', 1), ('regarding', 1), ('copyright', 1), ('The', 2), ('2.0', 1), ('(the', 1), ('"License");', 1), ('may', 2), ('use', 1), ('in', 3), ('compliance', 1), ('License.', 2), ('obtain', 1), ('of', 2), ('at', 1), ('law', 1), ('is', 2), ('an', 1), ('"AS', 1), ('IS"', 1), ('BASIS,', 1), ('CONDITIONS', 1), ('OF', 1), ('KIND,', 1), ('specific', 1), ('language', 1), ('limitations', 1), ('-->', 7), ('xmlns="http://maven.apache.org/POM/4.0.0"', 1), ('http://maven.apache.org/xsd/maven-4.0.0.xsd">', 1), ('<modelVersion>4.0.0</modelVersion>', 1), ('<artifactId>spark-parent_2.10</artifactId>', 1), ('<relativePath>../pom.xml</relativePath>', 1), ('</parent>', 1), ('<sbt.project.name>examples</sbt.project.name>', 1), ('Project', 1), ('Examples</name>', 1), ('<dependencies>', 2), ('<version>${project.version}</version>', 12), ('<artifactId>spark-streaming_${scala.binary.versio

El método join combina dos RDDs/Datasets (K,V) y (K,W) juntos, obteniendose un RDD/Dataset de la forma (K, (V,W)).
Usa el método con los RDDs anteriores (Readme y Pom)

In [17]:
joined_rdd = readmeCount.join(pomCount)

Muestra el valor del RDD resultante

In [18]:
joined_rdd.collect()

[('Apache', (1, 2)),
 ('Spark', (14, 1)),
 ('', (67, 2931)),
 ('is', (6, 2)),
 ('in', (5, 3)),
 ('an', (3, 1)),
 ('of', (5, 2)),
 ('this', (1, 3)),
 ('at', (2, 1)),
 ('The', (1, 2)),
 ('following', (2, 1)),
 ('use', (3, 1)),
 ('are', (1, 1)),
 ('uses', (1, 1)),
 ('a', (10, 1)),
 ('and', (10, 1)),
 ('for', (12, 2)),
 ('that', (3, 1)),
 ('You', (3, 2)),
 ('the', (21, 10)),
 ('on', (6, 1)),
 ('file', (1, 3)),
 ('not', (1, 1)),
 ('to', (14, 5)),
 ('you', (4, 1)),
 ('which', (2, 1)),
 ('with', (4, 2)),
 ('one', (2, 1)),
 ('be', (2, 1)),
 ('or', (3, 3)),
 ('See', (1, 2))]

Combina los valores de las cuentas de palabras (V,W) para obtener el valor total de las veces que aparece una palabra en los dos ficheros

In [19]:
joinedSum = joined_rdd.map(lambda k: (k[0], (k[1][0]+k[1][1])))

Para comprobar si es correcto, imprime los primeros cinco elementos del RDD resultante de la unión de los dos originales y los cinco primeros elementos del último RDD calculado

In [23]:
take_joined = joined_rdd.take(5)
take_joined

[('Apache', (1, 2)),
 ('Spark', (14, 1)),
 ('', (67, 2931)),
 ('is', (6, 2)),
 ('in', (5, 3))]

In [24]:
take_sum = joinedSum.take(5)
take_sum

[('Apache', 3), ('Spark', 15), ('', 2998), ('is', 8), ('in', 8)]

## Broadcast variables y acumuladores


Más información: [http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables](http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables)

Crear una variable de tipo broadcast con el array [1,2,3]

In [25]:
broadcast_array = sc.broadcast([1,2,3])

Muestra/obtén el valor de esa variable

In [26]:
broadcast_array.value

[1, 2, 3]

### Accumulators

Crea una variable de tipo acumulador con un valor inicial 0.

In [27]:
accum = sc.accumulator(0)

Paraleliza un array de cuatro enteros y define una función que añada un valor entero a la variable de tipo acumulador

In [28]:
sc.parallelize([1,2,3,4]).foreach(lambda x: accum.add(x))

Muestra el valor de la variable de tipo acumulador

In [29]:
accum.value

10