# TAREA 3: SERGIO SOLER ROCHA

## Objetivo
El objetivo de este ejercicio es implementar un benchmark en Google Cloud Platform (GCP) para evaluar el desempeño de un despliegue basado en Spark, centrándose en velocidad, rendimiento y uso de recursos. Se busca analizar de un cluster en GCP, utilizando servicios  como Dataproc (para ejecución de clústeres de Apache Spark y Apache Hadoop) y Google Cloud Storage (para almacenamiento en la nube). El benchmark permitirá extraer datos y realizar análisis para comprender las implicaciones del rendimiento en diferentes escenarios, facilitando la toma de decisiones sobre la configuración óptima y la mejora continua de la infraestructura en la nube.

El benchmark utilizado es se encuentra en la siguiente url: github.com/DIYBigData/pyspark-benchmark/

## Configuración inicial en GCP
Creamos un clúster implementado en Google Cloud Platform con Dataproc, denominado "nombre-del-cluster". A continuación, se detallan las principales características del clúster:

- Nombre del Clúster: nombre-del-cluster
- Tipo de Cluster: Standard
- Estado: En ejecución
- Región: europe-west6
- Zona: europe-west6-c
- Total de Nodos Trabajadores: 2
- VMs Flexibles: No
- Eliminación Programada: Desactivado
- Bucket de Etapa de Pruebas de Cloud Storage: dataproc-staging-europe-west6-592252024140-2eafoyog.¡

Lo configuramos con dos nodos trabajadores para distribuir eficientemente las tareas de procesamiento. La desactivación de la eliminación programada garantiza la persistencia del clúster, mientras que el almacenamiento en el bucket de etapa de pruebas de Cloud Storage facilita el manejo de datos durante el proceso de ejecución.

Ahora creamos un Bucket en Google Cloud Storage. Este Bucket será destinado para almacenar los datos y los scripts esenciales que serán utilizados en las evaluaciones de rendimiento, es decir, los script generate-data.py, benchmark-shuffle.py y benchmark-cpu.py.

El siguiente paso es Instalar la biblioteca de Google Cloud Storage para Python, desde el cmd escribimos pip install google-cloud-storage. La instalación de la biblioteca de Google Cloud Storage para Python es fundamental para habilitar la interacción y manipulación de objetos almacenados en Google Cloud Storage (GCS) a través de scripts y aplicaciones escritas en Python.


## Preparación de Datos

Para llevar a cabo las pruebas de rendimiento, se utiliza el script generate-data.py para generar conjuntos de datos de prueba, el cual es un trabajo PySpark. El archivo resultante será un archivo CSV particionado. Estos son los siguientes parámetros:

- --master spark://spark-master:7077: Opción de spark-submit que identifica la ubicación del maestro de Spark.
- --name 'generate-benchmark-test-data': Opción de spark-submit para nombrar el trabajo presentado (opcional- ).
- /path/to/test/data/file: Ruta completa donde se generará el archivo de datos de prueba. 
- -r num_rows: Número total de filas a generar. Cada fila tiene aproximadamente 75 bytes.
- -p num_partitions: Número de particiones que debería tener el archivo de datos de p

El esquema de los datos producidos por el script es el siguiente:

root
- |-- value: string (nullable = true)
- |-- prefix2: string (nullable = true)
- |-- prefix4: string (nullable = true)
- |-- prefix8: string (nullable = true)
- |-- float_val: string (nullable = true)
- |-- integer_val: string (nullable = true)


In [None]:
# Hacemos login con nuestra cuenta de Google Cloud
!gcloud auth login

In [2]:
# Establecemos el proyecto predeterminado en el entorno actual de Google Cloud SDK
!gcloud config set project practica-3-411314


Updated property [core/project].


La siguiente línea de código envía un trabajo PySpark al clúster de Dataproc con el script "generate-data.py", generando 1000000 filas de datos particionadas en 100 particiones y guardando el resultado en un archivo CSV en el bucket creado anteriormente de Google Cloud Storage.

In [15]:
!gcloud dataproc jobs submit pyspark \
  --cluster=nombre-del-cluster \
  --region=europe-west6 \
  gs://bucket_1990/generate-data.py \
  -- gs://bucket_1990/Data/datos_generados.csv -r 1000000 -p 100


done: true
driverControlFilesUri: gs://dataproc-staging-europe-west6-592252024140-2eafoyog/google-cloud-dataproc-metainfo/33b9567d-8933-4b45-b085-d8e6f9dd801d/jobs/092c8be3b8ab46849c8f4acd700d5b40/
driverOutputResourceUri: gs://dataproc-staging-europe-west6-592252024140-2eafoyog/google-cloud-dataproc-metainfo/33b9567d-8933-4b45-b085-d8e6f9dd801d/jobs/092c8be3b8ab46849c8f4acd700d5b40/driveroutput
jobUuid: 811f1614-a002-3353-a983-1fd76ea2df94
placement:
  clusterName: nombre-del-cluster
  clusterUuid: 33b9567d-8933-4b45-b085-d8e6f9dd801d
pysparkJob:
  args:
  - gs://bucket_1990/Data/datos_generados.csv
  - -r
  - '1000000'
  - -p
  - '100'
  mainPythonFileUri: gs://bucket_1990/generate-data.py
reference:
  jobId: 092c8be3b8ab46849c8f4acd700d5b40
  projectId: practica-3-411314
status:
  state: DONE
  stateStartTime: '2024-01-19T11:35:58.777993Z'
statusHistory:
- state: PENDING
  stateStartTime: '2024-01-19T11:35:05.915519Z'
- state: SETUP_DONE
  stateStartTime: '2024-01-19T11:35:05.961196

Job [092c8be3b8ab46849c8f4acd700d5b40] submitted.
Waiting for job output...
24/01/19 11:35:11 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/01/19 11:35:11 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/01/19 11:35:11 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/01/19 11:35:11 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
24/01/19 11:35:11 INFO org.sparkproject.jetty.util.log: Logging initialized @4064ms to org.sparkproject.jetty.util.log.Slf4jLog
24/01/19 11:35:12 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_392-b08
24/01/19 11:35:12 INFO org.sparkproject.jetty.server.Server: Started @4183ms
24/01/19 11:35:12 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@53a11f40{HTTP/1.1, (http/1.1)}{0.0.0.0:37771}
24/01/19 11:35:12 INFO org.apache.hadoop.yarn.client.RMPro

## Ejecución del Benchmark en Dataproc: benchmark-shuffle.py
El script benchmark-shuffle.py se centra en evaluar el rendimiento de operaciones comunes de PySpark en marcos de datos que desencadenan una operación de "shuffle". Las operaciones de prueba incluyen:

- Group By and Aggregate: Agrupación y agregación de datos.
- Repartition: Reorganización de datos entre particiones.
- Inner Join: Operación de unión interna entre dos conjuntos de datos.
- Broadcast Inner Join: Unión interna con transmisión de datos pequeños.

A continuación ejecutamos el script benchmark-shuffle.py sobre los datos generados en el apartado anterior. El número de particiones que se utilizarán durante la prueba de repartición será 200.

In [16]:
!gcloud dataproc jobs submit pyspark \
  --cluster=nombre-del-cluster \
  --region=europe-west6 \
  gs://bucket_1990/benchmark-shuffle.py \
  -- gs://bucket_1990/Data/datos_generados.csv -n 'shuffle-benchmark' -r 200 -o gs://bucket_1990/benchmark_results/shuffle_results



done: true

Job [c18ad85312d441ba9d175d9ea1cbe3a9] submitted.
Waiting for job output...
24/01/19 11:36:54 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/01/19 11:36:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/01/19 11:36:54 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/01/19 11:36:54 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
24/01/19 11:36:54 INFO org.sparkproject.jetty.util.log: Logging initialized @3835ms to org.sparkproject.jetty.util.log.Slf4jLog
24/01/19 11:36:54 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_392-b08
24/01/19 11:36:54 INFO org.sparkproject.jetty.server.Server: Started @3975ms
24/01/19 11:36:54 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@461e3540{HTTP/1.1, (http/1.1)}{0.0.0.0:41809}
24/01/19 11:36:55 INFO org.apache.hadoop.yarn.client.RMPro


driverControlFilesUri: gs://dataproc-staging-europe-west6-592252024140-2eafoyog/google-cloud-dataproc-metainfo/33b9567d-8933-4b45-b085-d8e6f9dd801d/jobs/c18ad85312d441ba9d175d9ea1cbe3a9/
driverOutputResourceUri: gs://dataproc-staging-europe-west6-592252024140-2eafoyog/google-cloud-dataproc-metainfo/33b9567d-8933-4b45-b085-d8e6f9dd801d/jobs/c18ad85312d441ba9d175d9ea1cbe3a9/driveroutput
jobUuid: a9806a9c-e378-3d10-b826-d43e1198124c
placement:
  clusterName: nombre-del-cluster
  clusterUuid: 33b9567d-8933-4b45-b085-d8e6f9dd801d
pysparkJob:
  args:
  - gs://bucket_1990/Data/datos_generados.csv
  - -n
  - "'shuffle-benchmark'"
  - -r
  - '200'
  - -o
  - gs://bucket_1990/benchmark_results/shuffle_results
  mainPythonFileUri: gs://bucket_1990/benchmark-shuffle.py
reference:
  jobId: c18ad85312d441ba9d175d9ea1cbe3a9
  projectId: practica-3-411314
status:
  state: DONE
  stateStartTime: '2024-01-19T11:37:58.817563Z'
statusHistory:
- state: PENDING
  stateStartTime: '2024-01-19T11:36:49.203248

### Resultados de la ejecución:
- **Group By test time** = 15.213665729999775 seconds: Indica el tiempo que ha tomado ejecutar la operación de agrupación (Group By). En este caso, la operación de agrupación ha tomado aproximadamente 15.21 segundos.
 
- **Repartition test time** = 8.370773898001062 seconds (200 partitions): Muestra el tiempo de ejecución de la operación de repartición (Repartition). Además, indica que durante esta prueba se utilizó un total de 200 particiones. En este caso, a la operación de repartición le ha llevado aproximadamente 8.37 segundos

- **Inner join test time** = 14.873300867009675 seconds: Indica el tiempo que tomó ejecutar la operación de unión interna (Inner Join). En este caso, la operación de unión interna ha tardado aproximadamente 14.87 segundos.

- **Broadcast inner join time** = 6.686923230998218 seconds: Representa el tiempo de ejecución de la operación de unión interna mediante difusión (Broadcast Inner Join). Aquí, la operación de unión interna mediante difusión ha tomado aproximadamente 6.69 segundos.

## Ejecución del Benchmark en Dataproc: benchmark-cpu.py
El benchmark de CPU tiene como objetivo evaluar operaciones de PySpark que están principalmente vinculadas a la CPU. A diferencia de otras pruebas, este benchmark se centra en evaluar la velocidad de la CPU y la eficiencia de las tareas, sin considerar E/S de disco o red significativas. Las operaciones de prueba incluyen:

- SHA-512 hashing de una cadena.
- Estimación de Pi con muestras aleatorias y una función Python definida por el usuario.
- Estimación de Pi con muestras aleatorias y solo funciones nativas de Spark.

Cada operación se cronometra de forma independiente.

Los resultados proporcionan información valiosa sobre el rendimiento de las operaciones de CPU en PySpark, y los tiempos se pueden comparar entre ejecuciones para evaluar la eficiencia del entorno PySpark.

In [17]:
!gcloud dataproc jobs submit pyspark \
    --cluster=nombre-del-cluster \
    --region=europe-west6 \
    gs://bucket_1990/benchmark-cpu.py \
    -- gs://bucket_1990/Data/datos_generados.csv -s 5000000000 -p 10 -n 'cpu-benchmark' -o gs://bucket_1990/benchmark_results/cpu_results

done: true
driverControlFilesUri: gs://dataproc-staging-europe-west6-592252024140-2eafoyog/google-cloud-dataproc-metainfo/33b9567d-8933-4b45-b085-d8e6f9dd801d/jobs/dbe21945b5014deb89450c61b3fd0613/
driverOutputResourceUri: gs://dataproc-staging-europe-west6-592252024140-2eafoyog/google-cloud-dataproc-metainfo/33b9567d-8933-4b45-b085-d8e6f9dd801d/jobs/dbe21945b5014deb89450c61b3fd0613/driveroutput
jobUuid: 9a322f01-84b1-3c8b-8c2a-3a1345af6c03
placement:
  clusterName: nombre-del-cluster
  clusterUuid: 33b9567d-8933-4b45-b085-d8e6f9dd801d
pysparkJob:
  args:
  - gs://bucket_1990/Data/datos_generados.csv
  - -s
  - '5000000000'
  - -p
  - '10'
  - -n
  - "'cpu-benchmark'"
  - -o
  - gs://bucket_1990/benchmark_results/cpu_results
  mainPythonFileUri: gs://bucket_1990/benchmark-cpu.py
reference:
  jobId: dbe21945b5014deb89450c61b3fd0613
  projectId: practica-3-411314
status:
  state: DONE
  stateStartTime: '2024-01-19T11:59:24.239727Z'
statusHistory:
- state: PENDING
  stateStartTime: '2024-

Job [dbe21945b5014deb89450c61b3fd0613] submitted.
Waiting for job output...
24/01/19 11:43:22 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/01/19 11:43:22 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/01/19 11:43:22 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/01/19 11:43:22 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
24/01/19 11:43:22 INFO org.sparkproject.jetty.util.log: Logging initialized @3876ms to org.sparkproject.jetty.util.log.Slf4jLog
24/01/19 11:43:22 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_392-b08
24/01/19 11:43:22 INFO org.sparkproject.jetty.server.Server: Started @4014ms
24/01/19 11:43:22 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@60110b72{HTTP/1.1, (http/1.1)}{0.0.0.0:35987}
24/01/19 11:43:23 INFO org.apache.hadoop.yarn.client.RMPro

### Resultados de la ejecución
Los resultados del benchmark de CPU muestran el tiempo que tardó cada operación en ejecutarse:

- **SHA-512 benchmark time**: Este resultado indica el tiempo que le ha llevado calcular 100,000 hashes SHA-512. En este caso, ha sido de aproximadamente 14.61 segundos. Esta prueba mide la velocidad de procesamiento para operaciones intensivas en CPU, como la generación de hashes.

- **Calculate Pi benchmark**: Para esta prueba, se han tomado 5 mil millones de muestras para estimar el valor de Pi. El resultado muestra que ha tardado alrededor de 852.47 segundos (algo más de 14 minutos) realizar esta operación.

- **Calculate Pi benchmark using dataframe**: Similar a la prueba anterior, esta vez, la estimación de Pi se ha realizado utilizando manipulaciones de DataFrame. Ha tomado aproximadamente 74.67 segundos. Esto proporciona una comparación entre el rendimiento de las operaciones puras de PySpark y aquellas que involucran manipulaciones de DataFrame.

In [18]:
# Información sobre el cluster
!gcloud dataproc clusters describe nombre-del-cluster --region=europe-west6

clusterName: nombre-del-cluster
clusterUuid: 33b9567d-8933-4b45-b085-d8e6f9dd801d
config:
  configBucket: dataproc-staging-europe-west6-592252024140-2eafoyog
  endpointConfig: {}
  gceClusterConfig:
    internalIpOnly: false
    networkUri: https://www.googleapis.com/compute/v1/projects/practica-3-411314/global/networks/default
    serviceAccountScopes:
    - https://www.googleapis.com/auth/bigquery
    - https://www.googleapis.com/auth/bigtable.admin.table
    - https://www.googleapis.com/auth/bigtable.data
    - https://www.googleapis.com/auth/cloud.useraccounts.readonly
    - https://www.googleapis.com/auth/devstorage.full_control
    - https://www.googleapis.com/auth/devstorage.read_write
    - https://www.googleapis.com/auth/logging.write
    - https://www.googleapis.com/auth/monitoring.write
    zoneUri: https://www.googleapis.com/compute/v1/projects/practica-3-411314/zones/europe-west6-c
  masterConfig:
    diskConfig:
      bootDiskSizeGb: 1000
      bootDiskType: pd-standard
 

## Conclusiones
El tiempo de todas las pruebas es mayor que los valores de referencia que se indican en el siguiente enlace https://openbenchmarking.org/test/pts/spark&eval=8dbf91eee81b8c1295c3d871d534bd6055fa787b#metrics (1000000 rows y 100 partitions), lo que indica un bajo rendimiento del cluster creado. Sería interesante ajustar la configuración del clúster, como la cantidad de nodos o tipo de máquinas virtuales, para mejorar el rendimiento en las pruebas mencionadas, pero debido a las limitaciones de la cuenta esto no ha sido posible.
	
Como conclusión final, la implementación de benchmarks en el clúster de GCS proporciona una base valiosa para la optimización continua y la toma de decisiones informadas para garantizar un rendimiento eficiente y rentable.