In [30]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, BooleanType, ArrayType
from pyspark.sql import SparkSession
import tarfile
import os
import pyspark.sql.functions as f
from pyspark.sql.functions import col

In [2]:
# Crear Spark Session
spark = SparkSession.builder.getOrCreate()

In [23]:
# Función para calcular el tamaño de un dataframe
def get_dataframe_size(df):
    # Calculate the size of each column
    col_sizes = [f.length(f.col(col_name).cast("string")).alias(col_name) for col_name in df.columns]
    # Calculate the total size
    total_size = df.select(*col_sizes).groupBy().sum().collect()[0][0]
    return total_size

In [3]:
%%time

# Extraer el archivo .tgz
with tarfile.open("./data/raw/yelp_dataset.tar", "r") as tar:
    tar.extractall(path="./data/ini/")

CPU times: user 7.44 s, sys: 54.5 s, total: 1min 1s
Wall time: 2min 2s


In [4]:
# Peso de review.json
file_size = os.path.getsize("./data/ini/yelp_academic_dataset_review.json")
print("Tamaño del archivo:", file_size, "bytes")

Tamaño del archivo: 5341868833 bytes


In [5]:
%%time

# Carga los archivos extraídos (review.json) en un DataFrame
df = spark.read.json("./data/ini/yelp_academic_dataset_review.json")


CPU times: user 16.5 ms, sys: 1.8 ms, total: 18.3 ms
Wall time: 17.2 s


In [24]:
# Calcular el tamaño del df en ram
dataframe_size = get_dataframe_size(df)
print("Tamaño en RAM:", dataframe_size, "bytes")

Tamaño en RAM: 153786160 bytes


In [6]:
%%time

# Guardar el df en parquet
df.write.parquet('review.parquet', mode="overwrite")

CPU times: user 72.4 ms, sys: 0 ns, total: 72.4 ms
Wall time: 1min 18s


In [7]:
# Peso de review.parquet
file_size = os.path.getsize("review.parquet")
print("Tamaño del archivo:", file_size, "bytes")

Tamaño del archivo: 2688 bytes


In [8]:
# Inferir el esquema inicial
initial_schema = df.schema
initial_schema

StructType([StructField('business_id', StringType(), True), StructField('cool', LongType(), True), StructField('date', StringType(), True), StructField('funny', LongType(), True), StructField('review_id', StringType(), True), StructField('stars', DoubleType(), True), StructField('text', StringType(), True), StructField('useful', LongType(), True), StructField('user_id', StringType(), True)])

In [9]:
# Definir el nuevo esquema
optimized_schema = StructType([
    StructField("review_id", StringType(), nullable=True),
    StructField("user_id", StringType(), nullable=True),
    StructField("business_id", StringType(), nullable=True),
    StructField("stars", IntegerType(), nullable=True),
    StructField("date", DateType(), nullable=True),
    StructField("text", StringType(), nullable=True),
    StructField("useful", IntegerType(), nullable=True),
    StructField("funny", IntegerType(), nullable=True),
    StructField("cool", IntegerType(), nullable=True)
])

In [10]:
%%time

# Aplicar el esquema optimizado al DataFrame
df_optimized = spark.read.schema(optimized_schema).json("./data/ini/yelp_academic_dataset_review.json")

CPU times: user 14.6 ms, sys: 0 ns, total: 14.6 ms
Wall time: 66.7 ms


In [25]:
# Calcular el tamaño del df en ram
dataframe_size = get_dataframe_size(df_optimized)
print("Tamaño en RAM:", dataframe_size, "bytes")

Tamaño en RAM: 153786160 bytes


In [11]:
%%time

# Guardar el df_optimized en parquet
df_optimized.write.parquet('review_op.parquet', mode="overwrite")

CPU times: user 91.8 ms, sys: 0 ns, total: 91.8 ms
Wall time: 1min 47s


In [12]:
# Peso del parquet review_op.parquet
file_size = os.path.getsize("review_op.parquet")
print("Tamaño del archivo:", file_size, "bytes")

Tamaño del archivo: 2688 bytes


In [13]:
%%time

# Carga los archivos extraídos business.json en un DataFrame
df_bus = spark.read.json("./data/ini/yelp_academic_dataset_business.json")
df_bus = df_bus.drop("stars")

CPU times: user 8.58 ms, sys: 0 ns, total: 8.58 ms
Wall time: 2.16 s


Una vez realizados los pasos anteriores crearemos el dataframe "df_optimized_v2" que contiene la fecha
desglozada por año y mes. Además haremos un join de ese dataframe con con df_bus.

In [14]:
%%time

# Extraer columnas de interés de df_optimized
df_optimized_v2 = (
    df_optimized
     .withColumn("year", f.year(f.col("date")))
     .withColumn("month", f.month(f.col("date")))
     .select("review_id", "user_id", "business_id", "stars", "year", "month", "date", "text", "useful", "funny", "cool")
)

CPU times: user 11.5 ms, sys: 0 ns, total: 11.5 ms
Wall time: 118 ms


In [35]:
schema_df_optimized_v2 = df_optimized_v2.schema
schema_df_optimized_v2

StructType([StructField('review_id', StringType(), True), StructField('user_id', StringType(), True), StructField('business_id', StringType(), True), StructField('stars', IntegerType(), True), StructField('year', IntegerType(), True), StructField('month', IntegerType(), True), StructField('date', DateType(), True), StructField('text', StringType(), True), StructField('useful', IntegerType(), True), StructField('funny', IntegerType(), True), StructField('cool', IntegerType(), True)])

In [15]:
%%time

# Realizar inner join en la columna 'business_id'
joined_df = df_bus.join(df_optimized_v2, "business_id", "inner")

CPU times: user 3.18 ms, sys: 0 ns, total: 3.18 ms
Wall time: 49 ms


In [20]:
%%time

# Parquet particionado por fecha
(joined_df
     .write.parquet(
         "op_df_v1", 
         mode="overwrite", 
         partitionBy=["year", "month"],
         compression="gzip"
     )
)

CPU times: user 1.02 s, sys: 0 ns, total: 1.02 s
Wall time: 10min 37s


In [17]:
%%time

# Parquet particionado por ciudad
(joined_df
     .write.parquet(
         "joined_df_v1", 
         mode="overwrite", 
         partitionBy=["state"],
         compression="gzip"
     )
)

CPU times: user 273 ms, sys: 9.61 ms, total: 283 ms
Wall time: 4min 42s


In [19]:
%%time

# Parquet particionado por ciudad y fecha
(joined_df
     .write.parquet(
         "joined_df_v2", 
         mode="overwrite", 
         partitionBy=["state", "year", "month"],
         compression="gzip"
     )
)

CPU times: user 3.82 s, sys: 0 ns, total: 3.82 s
Wall time: 1h 7min 44s


In [27]:
%%time

# Cargar el archivo Parquet en un DataFrame
op_df_v1 = spark.read.parquet("./op_df_v1")

# Aplicar el filtro por estado (state) y año (year)
filtered_df = op_df_v1.filter((op_df_v1.state == "CA") & (op_df_v1.year == "2016"))

CPU times: user 20.2 ms, sys: 0 ns, total: 20.2 ms
Wall time: 6.48 s


In [36]:
# Definir el nuevo esquema
joined_schema = StructType([
    StructField("review_id", StringType(), nullable=True),
    StructField("user_id", StringType(), nullable=True),
    StructField("business_id", StringType(), nullable=True),
    StructField("stars", IntegerType(), nullable=True),
    StructField("date", DateType(), nullable=True),
    StructField("year", IntegerType(), nullable=True),
    StructField("month", IntegerType(), nullable=True),
    StructField("text", StringType(), nullable=True),
    StructField("useful", IntegerType(), nullable=True),
    StructField("funny", IntegerType(), nullable=True),
    StructField("cool", IntegerType(), nullable=True),
    StructField("name", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("review_count", IntegerType(), True),
    StructField("is_open", IntegerType(), True),
    StructField("attributes", 
                StructType([
                    StructField("RestaurantsTakeOut", BooleanType(), True),
                    StructField("BusinessParking", 
                                StructType([
                                    StructField("garage", BooleanType(), True),
                                    StructField("street", BooleanType(), True),
                                    StructField("validated", BooleanType(), True),
                                    StructField("lot", BooleanType(), True),
                                    StructField("valet", BooleanType(), True)
                                ]),
                                True)
                ]),
                True),
    StructField("categories", ArrayType(StringType()), True),
    StructField("hours", 
                StructType([
                    StructField("Monday", StringType(), True),
                    StructField("Tuesday", StringType(), True),
                    StructField("Wednesday", StringType(), True),
                    StructField("Thursday", StringType(), True),
                    StructField("Friday", StringType(), True),
                    StructField("Saturday", StringType(), True),
                    StructField("Sunday", StringType(), True)
                ]),
                True)
])

In [37]:
%%time

# Cargar el archivo Parquet en un DataFrame
joined_df_v1 = spark.read.schema(joined_schema).parquet("./joined_df_v1")

# Aplicar el filtro por estado (state) y año (year)
filtered_df = joined_df_v1.filter((joined_df_v1.state == "CA") & (joined_df_v1.year == "2016"))


CPU times: user 14.1 ms, sys: 0 ns, total: 14.1 ms
Wall time: 113 ms


In [33]:
%%time

# Cargar el archivo Parquet en un DataFrame
joined_df_v2 = spark.read.schema(joined_schema).parquet("./joined_df_v2")

# Aplicar el filtro por estado (state) y año (year)
filtered_df = joined_df_v2.filter((joined_df_v2.state == "CA") & (joined_df_v2.year == "2016"))

CPU times: user 121 ms, sys: 0 ns, total: 121 ms
Wall time: 1min 49s
