In [1]:
from pyspark.sql import SparkSession
import tarfile
import os
import py4j.protocol  
from py4j.protocol import Py4JJavaError  
from py4j.java_gateway import JavaObject  
from py4j.java_collections import JavaArray, JavaList
import pyspark.sql.functions as func
from pyspark import RDD, SparkContext  
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql.types import StringType,ByteType

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
ruta_archivo_tar = "/home/jovyan/work/yelp_dataset.tar"

directorio_extraccion = "/home/jovyan/work/yelp_dataset/" 

with tarfile.open(ruta_archivo_tar, "r") as tar:
    tar.extractall(directorio_extraccion)

In [4]:
archivos_extraidos = [os.path.join(directorio_extraccion, nombre) for nombre in os.listdir(directorio_extraccion)]
archivos_extraidos

['/home/jovyan/work/yelp_dataset/yelp_academic_dataset_checkin.json',
 '/home/jovyan/work/yelp_dataset/Dataset_User_Agreement.pdf',
 '/home/jovyan/work/yelp_dataset/yelp_academic_dataset_tip.json',
 '/home/jovyan/work/yelp_dataset/review.parquet',
 '/home/jovyan/work/yelp_dataset/yelp_academic_dataset_review.json',
 '/home/jovyan/work/yelp_dataset/.ipynb_checkpoints',
 '/home/jovyan/work/yelp_dataset/yelp_academic_dataset_business.json',
 '/home/jovyan/work/yelp_dataset/yelp_academic_dataset_review.parquet',
 '/home/jovyan/work/yelp_dataset/yelp_academic_dataset_user.json']

1. Toma el archivo `review.json` JSON y cuantífica cuánto pesa el archivo en   disco.

In [5]:
ruta_archivo_json = "/home/jovyan/work/yelp_dataset/yelp_academic_dataset_review.json"
tamaño_bytes = os.path.getsize(ruta_archivo_json)
tamaño_gb = tamaño_bytes / (1000 * 1000 * 1000)
print(f"El tamaño del archivo review.json es aproximadamente {tamaño_gb:.2f} GB")

El tamaño del archivo review.json es aproximadamente 5.34 GB


2. Carga el JSON en Spark y cuantífica cuánto pesa el DataFramen en memoria RAM.

In [6]:
df = spark.read.json("/home/jovyan/work/yelp_dataset/yelp_academic_dataset_review.json")
df.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [7]:
df_cached = df.cache()
stats = df_cached._jdf.queryExecution().optimizedPlan().stats()
df_ram_size_bytes = stats.sizeInBytes()
df_ram_size_gb = df_ram_size_bytes / (1000 * 1000 * 1000)
print(f"El tamaño del DataFrame en memoria (RAM) es aproximadamente {df_ram_size_gb:.2f} GB")

El tamaño del DataFrame en memoria (RAM) es aproximadamente 5.34 GB


3. Guarda el DataFrame como parquet en disco y muestra cuánto pesa el archivo.
   Cómo se compara con el JSON crudo.

In [None]:
df.write.parquet("/home/jovyan/work/yelp_dataset/yelp_academic_dataset_review.parquet")

El archivo pesa 5.34 en JSON y 3.12 en parquet

4. Utiliza el DataFrame, optimiza el tipo de dato que hay en cada columna (i.e. 
   `Int32`, `Int64`, `Float32`, `Float64`, `String`, `Categorical`) y guarda el 
   nuevo DataFrame como parquet. Cuántifica cuánto pesa el DataFrame en memoria 
   RAM y cuánto pesa en disco. Cómo se compara con el parquet crudo.

In [8]:
df = df.withColumn("stars", func.col("stars").cast("float"))
df = df.withColumn("useful", func.col("useful").cast(ByteType()))
df = df.withColumn("funny", func.col("funny").cast(ByteType()))
df = df.withColumn("cool", func.col("cool").cast(ByteType()))
df = df.withColumn("date", func.to_date("date"))

In [9]:
df_cached = df.cache()
stats = df_cached._jdf.queryExecution().optimizedPlan().stats()
df_ram_size_bytes = stats.sizeInBytes()
df_ram_size_gb = df_ram_size_bytes / (1000 * 1000 * 1000)
print(f"El tamaño del DataFrame en memoria (RAM) es aproximadamente {df_ram_size_gb:.2f} GB")

El tamaño del DataFrame en memoria (RAM) es aproximadamente 3.78 GB


El tamaño en discto es 3GB

5. Utiliza el DataFrame optimizado, guarda en parquet una nueva versión del DataFrame y particionalo por fecha (date). Otra versión por ciudad. Otra por ciudad y fecha

In [10]:
columnas = df.columns
print("Columnas en el DataFrame:", columnas)


Columnas en el DataFrame: ['business_id', 'cool', 'date', 'funny', 'review_id', 'stars', 'text', 'useful', 'user_id']


In [11]:
df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: byte (nullable = true)
 |-- date: date (nullable = true)
 |-- funny: byte (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: float (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: byte (nullable = true)
 |-- user_id: string (nullable = true)



In [12]:
df.write.partitionBy("date").parquet("/ruta/a/directorio/particionado_por_fecha")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <exception str() failed>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  

Py4JError: py4j does not exist in the JVM

In [None]:
df.write.partitionBy("state").parquet("/ruta/a/directorio/particionado_por_ciudad")


In [None]:
df.write.partitionBy("date", "state").parquet("/ruta/a/directorio/particionado_por_fecha_y_ciudad")



ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


6. Ejecuta un query utilizando sobre la tabla filtrando por una de las ciudades y un años en particular. Registra el tiempo de ejecución. Aplica el query sobre

In [None]:
import time

ciudad_filtrar = "STATE"  
año_filtrar = 2017 

df = df.withColumn("date", df.date.cast("date"))
filtro = (df.city == ciudad_filtrar) & (df.date.year == año_filtrar)
df_filtrado = df.filter(filtro)
start_time = time.time()
resultados = df_filtrado.show(5)
tiempo_ejecucion = time.time() - start_time

print(f"El query se ejecutó en {tiempo_ejecucion:.2f} segundos.")
