# Maps

En Spark, las funciones map toman datos como entrada y luego transforman esos datos con la función que se le indique. Son como instrucciones para los datos que indican cómo debe transformarse cada entrada para obtener el resultado.

La primera celda de código crea un objeto SparkContext. Con SparkContext, puedes ingresar un conjunto de datos y paralelizar los datos en un clúster (dado que actualmente estás usando Spark en modo local en una sola máquina, técnicamente el conjunto de datos aún no está distribuido).

Ejecuta la celda de código a continuación para instanciar un objeto SparkContext y luego cargar la lista log_of_songs en Spark.

In [1]:
###
# You might have noticed this code in the screencast.
#
# import findspark
# findspark.init('spark-2.3.2-bin-hadoop2.7')
#
# The findspark Python module makes it easier to install
# Spark in local mode on your computer. This is convenient
# for practicing Spark syntax locally.
# However, the workspaces already have Spark installed and you do not
# need to use the findspark module
#
###

import pyspark
sc = pyspark.SparkContext(appName="maps_and_lazy_evaluation_example")

log_of_songs = [
        "Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "despacito",
        "All the stars","Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "despacito",
        "All the stars","Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "despacito",
        "All the stars","Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "despacito",
        "All the stars"
]

# parallelize the log_of_songs to use with Spark
distributed_song_log = sc.parallelize(log_of_songs)

24/11/05 21:07:01 WARN Utils: Your hostname, MacBook-Pro-de-Jhon.local resolves to a loopback address: 127.0.0.1; using 192.168.18.9 instead (on interface en0)
24/11/05 21:07:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/05 21:07:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


La siguiente celda de código define una función que convierte el título de una canción a minúsculas. Luego, hay un ejemplo que convierte la palabra "Havana" a "havana".

In [2]:
%%time
def convert_song_to_lowercase(song):
    return song.lower()

convert_song_to_lowercase("Havana")

CPU times: user 13 µs, sys: 0 ns, total: 13 µs
Wall time: 14.8 µs


'havana'


Las siguientes celdas de código demuestran cómo aplicar esta función usando un paso de map. El paso map recorrerá cada canción en la lista y aplicará la función convert_song_to_lowercase().

In [3]:
%%time
distributed_song_log.map(convert_song_to_lowercase)

CPU times: user 830 µs, sys: 1.35 ms, total: 2.18 ms
Wall time: 2.85 ms


PythonRDD[1] at RDD at PythonRDD.scala:53


Notarás que esta celda de código se ejecutó bastante rápido. Esto se debe a la evaluación diferida. Spark no ejecuta realmente el paso map a menos que sea necesario.

"RDD" en la salida se refiere a resilient distributed dataset (conjunto de datos distribuido resiliente). Los RDDs son exactamente lo que dicen ser: conjuntos de datos tolerantes a fallos distribuidos a través de un clúster. Esta es la forma en que Spark almacena los datos.

Para que Spark realmente ejecute el paso map, necesitas usar una "acción". Una acción disponible es el método collect. El método collect() toma los resultados de todos los clústeres y los "recoge" en una lista única en el nodo maestro.

In [4]:
distributed_song_log.map(convert_song_to_lowercase).collect()

                                                                                

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']

Observa también que Spark no está cambiando el conjunto de datos original: simplemente está creando una copia. Puedes ver esto ejecutando collect() en el conjunto de datos original.

In [5]:
distributed_song_log.collect()

['Despacito',
 'Nice for what',
 'No tears left to cry',
 'Despacito',
 'Havana',
 'In my feelings',
 'Nice for what',
 'despacito',
 'All the stars',
 'Despacito',
 'Nice for what',
 'No tears left to cry',
 'Despacito',
 'Havana',
 'In my feelings',
 'Nice for what',
 'despacito',
 'All the stars',
 'Despacito',
 'Nice for what',
 'No tears left to cry',
 'Despacito',
 'Havana',
 'In my feelings',
 'Nice for what',
 'despacito',
 'All the stars',
 'Despacito',
 'Nice for what',
 'No tears left to cry',
 'Despacito',
 'Havana',
 'In my feelings',
 'Nice for what',
 'despacito',
 'All the stars']

No siempre tienes que escribir una función personalizada para el paso map. También puedes usar funciones anónimas (lambda) así como funciones integradas de Python, como string.lower().

Las funciones anónimas son en realidad una característica de Python para escribir programas de estilo funcional.

In [6]:
distributed_song_log.map(lambda song: song.lower()).collect()

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']

In [7]:
distributed_song_log.map(lambda x: x.lower()).collect()

['despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars',
 'despacito',
 'nice for what',
 'no tears left to cry',
 'despacito',
 'havana',
 'in my feelings',
 'nice for what',
 'despacito',
 'all the stars']

24/11/05 23:59:32 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 289687 ms exceeds timeout 120000 ms
24/11/05 23:59:32 WARN SparkContext: Killing executors is not supported by current scheduler.
24/11/05 23:59:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$