#### Conceptos de este cuaderno :

## Con que trabaja spark?

##### DataSet(Datos para explotar, normalmente en formato CSV, JSON o similares). Los 'DataFrames'(No distribuido, mutable y no tolerante a fallos) y los 'RDD(Resilient Distributed Dataset)'(Distribuidos, inmutable y tolerante a fallos).

#### En estos primeras celdas trabajaremos con Apache Spark en modo 'Standalone', es decir, que tendremos todo corriendo en una sola máquina.

In [1]:
# Que version de Pyspark estamos usando? 

import pyspark
print(pyspark.__version__)

3.5.0


In [None]:
# Que version de Python estamos usando? 
import sys
print(sys.version)

### Optimizacion de recursos

##### PySpark utiliza memoria tanto para el almacenamiento en caché como para el procesamiento. Puedes configurar la cantidad de memoria que PySpark debe utilizar para cada uno de estos propósitos

In [24]:
#Configuración de memoria
from pyspark import SparkConf
conf = SparkConf().setAppName("EcommerceCosmeticShop").set("spark.driver.memory", "4g").set("spark.executor.memory", "4g")


#### Paralelismo y número de ejecutores: Otra configuración importante es el número de ejecutores y el nivel de paralelismo. Esto afecta cómo se divide la carga de trabajo. Puedes configurar la cantidad de ejecutores y los hilos de cada executor.

In [25]:
#Configuracion de Paralelismo y número de ejecutores 
conf.set("spark.executor.instances", "4").set("spark.executor.cores", "2")

<pyspark.conf.SparkConf at 0x7fb99f4a1c10>

In [26]:
# crear un dataframe de cliente declarando el esquema y pasando valores
import pyspark.sql.functions as F
from pyspark.sql.types import *

from pyspark.sql import SparkSession

In [27]:
# Crear una sesión de Spark
spark = SparkSession.builder.appName("EcommerceCosmeticShop").getOrCreate()

# terminal 
#Arrancamos el master en modo standalone(Arranca WebUI en 127.0.0.1:8080)
$SPARK_HOME/sbin/start-master.sh

 Arrancamos el primer esclavo
$SPARK_HOME/sbin/start-slave.sh spark://MacBook-Pro-de-Lucas-880.local:7077

In [5]:
# Leemos datos del DataSet y los escribimos en un DataFrame
df = spark.read.options(header='True', inferSchema='True').csv(['2019-Dec.csv', '2019-Nov.csv', '2019-Oct.csv', '2020-Feb.csv', '2020-Jan.csv'])
# Escribimos el DataFrame en disco en la carpeta donde se encuentra este Jupyter Notebook
df.write.mode('overwrite').csv("ecommerce-cosmetic-shop")


                                                                                

##### Detener la sesión de Spark cuando hayas terminado
spark.stop()


Comprobar que el proceso ha generado ficheros CSVs. El motivo es que Apache Spark particiona los datos para procesar en paralelo y, por lo tanto, crea un fichero por partición(También ha creado un fichero '_SUCCESS' para indicar el momento en que había acabado de exportar).

In [6]:
# Contar número de registros de un DataFrame
df.count()

                                                                                

20692840

In [18]:
# Escribir el Schema de los datos del Data Frame
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [19]:
# Mostrar o contar los diferentes valores de una columna
df.select('event_type').distinct().show()



+----------------+
|      event_type|
+----------------+
|        purchase|
|            view|
|            cart|
|remove_from_cart|
+----------------+



                                                                                

In [9]:
# Mostrar o contar los diferentes valores de una columna
df.select('product_id').distinct().count()

                                                                                

54571

In [10]:
df.select('brand').distinct().show()



+------------+
|       brand|
+------------+
|     beautix|
|     farmona|
|  dr.gloderm|
|   profhenna|
|     philips|
|invisibobble|
|       riche|
|        oniq|
|    lebelage|
|     vilenta|
|       fancy|
|      jaguar|
|      tertio|
|    siberina|
|   koreatida|
|         jas|
|rocknailstar|
|   depilflax|
|protokeratin|
|       essie|
+------------+
only showing top 20 rows



                                                                                

In [11]:
# Obtener el primer 'product_id' del registro con 'event_type=cart'
df.select(['product_id']).filter("event_type='cart'").first()

Row(product_id=4958)

In [12]:
# Obtener los productos que se han comprado conjuntamente con el anterior registro
sesions=df.select(['user_session']).filter("event_type='cart' AND product_id=5844305").distinct().first()

In [13]:
products=df.select(['product_id']).filter("event_type='cart' AND product_id<>5844305").filter( df["user_session"].isin(sesions["user_session"]))
products.select('product_id').show()



+----------+
|product_id|
+----------+
|   5820774|
|   5820776|
|   5820777|
|   5870111|
|   5844303|
|   5820721|
|   5698887|
|   5698879|
+----------+



                                                                                

### SQL API - opcion b

In [35]:
from pyspark.sql import SparkSession

# Crear un objeto SparkSession
spark = SparkSession.builder.appName("EcommerceCosmeticShop").getOrCreate()

# Crear una vista temporal
df.createOrReplaceTempView("datasql")


In [36]:
# Ejecutar una consulta SQL y mostrar los resultados
spark.sql("SELECT * FROM datasql LIMIT 5").show()

Py4JError: An error occurred while calling o24.sql. Trace:
py4j.Py4JException: Method sql([class java.lang.String, class [Ljava.lang.Object;]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:832)



In [None]:
def myFunc(s):
  if s["brand"]=="riche" and s["event_type"]=="cart":
    return [ ( s["product_id"], 1) ]
  return []

In [None]:
lines=df.rdd.flatMap(myFunc).reduceByKey(lambda a, b: a + b)

In [None]:
for element in lines.collect(): 
  print(element)