In [29]:
import logging
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum, when, split, col, lit, max, min, expr
from pyspark.sql.functions import to_date, var_samp, variance, var_pop, month, to_timestamp, dayofweek
from pyspark.sql.types import NumericType, IntegerType, FloatType

In [30]:
spark = SparkSession.builder \
    .appName("Maestria_evidencia1") \
    .config("spark.driver.memory", "64g") \
    .config("spark.executor.memory", "32g") \
    .config("spark.sql.shuffle.partitions", "32") \
    .config("spark.default.parallelism", "16") \
    .config("spark.driver.maxResultSize", "16g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Mejores tablas
spark

In [31]:
import kagglehub
from kagglehub import KaggleDatasetAdapter

####
#### Opcion 1 para conseguir datos
#### Bajar el archivo de Kaggle
####
####
file_path = "/Users/pauescalante/Documents/Maestria/Trimestre 7/BigData/Datos"
df = spark.read.csv(file_path, header=True, inferSchema=True)
# df de un subset de las columnas
spark_df = df[['event_type', 'user_id', 'product_id', 'category_id', 'price']]

####
####

####
#### Opcion 2 para conseguir datos
#### Bajar el archivo de Kaggle
####
####

#file_path = "2019-Nov.csv"

#df_pandas = kagglehub.load_dataset(
#    KaggleDatasetAdapter.PANDAS,
#    "mkechinov/ecommerce-behavior-data-from-multi-category-store",
#    file_path
#)

#spark_df = spark.createDataFrame(
#    df_pandas[['event_type', 'user_id', 'product_id', 'category_id', 'price']]
#)

####
####

df.head()

                                                                                

Row(event_time=datetime.datetime(2019, 10, 31, 17, 0), event_type='view', product_id=1003461, category_id=2053013555631882655, category_code='electronics.smartphone', brand='xiaomi', price=489.07, user_id=520088904, user_session='4d3b30da-a5e4-49df-b1a8-ba5943f1dd33')

In [33]:
df = df.selectExpr(
    'cast(event_time as timestamp) event_time',
    'cast(event_type as string) event_type',
    'cast(product_id as int) product_id',
    'cast(category_id as long) category_id',
    'cast(category_code as string) category_code',
    'cast(brand as string) brand',
    'cast(price as float) price',
    'cast(user_id as int) user_id',
    'cast(user_session as string) user_session'
)

df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [32]:
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



In [34]:
for column in df.columns:
    count = df.filter(col(column).isNull()).count()
    if count > 0:
        print(f"Columna '{column}' tiene {count} valores null")

                                                                                

Columna 'category_code' tiene 35413780 valores null


                                                                                

Columna 'brand' tiene 15331243 valores null




Columna 'user_session' tiene 12 valores null


                                                                                

In [35]:
# Verificando que existen valores non-null en las columnas
non_null_count = df.filter(col("brand").isNotNull()).count()
print(f"Valores non-null 'brand': {non_null_count}")

non_null_count = df.filter(col("category_code").isNotNull()).count()
print(f"Valores non-null 'category_code': {non_null_count}")

                                                                                

Valores non-null 'brand': 94619500




Valores non-null 'category_code': 74536963


                                                                                

In [37]:
# Dado que category_code y brand tienen muchos valores nulos, se van a convertir esos valores a None para tener algo asignado
# Se pueden eliminar en el futuro si tenemos suficientes datos...
df = df.fillna({"brand": "None"})
df = df.fillna({"category_code": "None"})

# Para user_session ya que el numero es muy pequeño, solamente se quitaran las entries con esos valores nulos
df = df.na.drop(subset=["user_session"])

In [38]:
# Verificando que existen valores non-null en las columnas
# Debe ser mayor que la verificación anterior ya que reemplazamos valores null a "None"
non_null_count = df.filter(col("brand").isNotNull()).count()
print(f"Valores non-null 'brand': {non_null_count}")

non_null_count = df.filter(col("category_code").isNotNull()).count()
print(f"Valores non-null 'category_code': {non_null_count}")

                                                                                

Valores non-null 'brand': 109950731




Valores non-null 'category_code': 109950731


                                                                                

In [39]:
output = ""
for column in df.columns:
    # Cuantos valores existen
    distinct_count = df.select(column).distinct().count()
    
    output += f"\nColumna: '{column}' — {distinct_count} valores distintos\n"
    output += "-" * 50 + "\n"

print(output)




Columna: 'event_time' — 5171097 valores distintos
--------------------------------------------------

Columna: 'event_type' — 3 valores distintos
--------------------------------------------------

Columna: 'product_id' — 206876 valores distintos
--------------------------------------------------

Columna: 'category_id' — 691 valores distintos
--------------------------------------------------

Columna: 'category_code' — 130 valores distintos
--------------------------------------------------

Columna: 'brand' — 4304 valores distintos
--------------------------------------------------

Columna: 'price' — 82966 valores distintos
--------------------------------------------------

Columna: 'user_id' — 5316649 valores distintos
--------------------------------------------------

Columna: 'user_session' — 23016650 valores distintos
--------------------------------------------------



                                                                                

In [40]:
# Ya que event_time tiene muchos valores distintos y no ocupamos tanto detalle, podemos cambiar event_time a ser solamente, YYYY/MM/DD
df.select("event_time").show(3, truncate=False)

+-------------------+
|event_time         |
+-------------------+
|2019-10-31 17:00:00|
|2019-10-31 17:00:00|
|2019-10-31 17:00:01|
+-------------------+
only showing top 3 rows



In [41]:
df = df.withColumn("event_time", to_date("event_time"))
df.select("event_time").show(3, truncate=False)

+----------+
|event_time|
+----------+
|2019-10-31|
|2019-10-31|
|2019-10-31|
+----------+
only showing top 3 rows



In [42]:
distinct_count = df.select("event_time").distinct().count() 
output = f"\nColumn: '{'event_time'}' — {distinct_count} valores distintos\n"
output += "-" * 50 + "\n"
print(output)




Column: 'event_time' — 62 valores distintos
--------------------------------------------------



                                                                                

In [43]:
# Para analizar los valores distintos, incluimos los valores categorícos del data set
df_unique = df[['event_time', 'event_type', 'brand', 'category_id', 'category_code']]

output = ""
for column in df_unique.columns:
    distinct_values = df_unique.select(column).distinct().orderBy(column).collect()
    distinct_count = len(distinct_values)

    # Separados por comas
    values_list = [str(row[column]) for row in distinct_values[:100]]
    values_str = ", ".join(values_list)
    output += f"\nColumna: '{column}' — {distinct_count} valores únicos\n Primeros 100 valores únicos:\n"
    output += "-" * 50 + "\n"
    output += values_str + "\n"
print(output)




Columna: 'event_time' — 62 valores únicos
 Primeros 100 valores únicos:
--------------------------------------------------
2019-09-30, 2019-10-01, 2019-10-02, 2019-10-03, 2019-10-04, 2019-10-05, 2019-10-06, 2019-10-07, 2019-10-08, 2019-10-09, 2019-10-10, 2019-10-11, 2019-10-12, 2019-10-13, 2019-10-14, 2019-10-15, 2019-10-16, 2019-10-17, 2019-10-18, 2019-10-19, 2019-10-20, 2019-10-21, 2019-10-22, 2019-10-23, 2019-10-24, 2019-10-25, 2019-10-26, 2019-10-27, 2019-10-28, 2019-10-29, 2019-10-30, 2019-10-31, 2019-11-01, 2019-11-02, 2019-11-03, 2019-11-04, 2019-11-05, 2019-11-06, 2019-11-07, 2019-11-08, 2019-11-09, 2019-11-10, 2019-11-11, 2019-11-12, 2019-11-13, 2019-11-14, 2019-11-15, 2019-11-16, 2019-11-17, 2019-11-18, 2019-11-19, 2019-11-20, 2019-11-21, 2019-11-22, 2019-11-23, 2019-11-24, 2019-11-25, 2019-11-26, 2019-11-27, 2019-11-28, 2019-11-29, 2019-11-30

Columna: 'event_type' — 3 valores únicos
 Primeros 100 valores únicos:
--------------------------------------------------
cart, purc

                                                                                

In [44]:
# Stats solamente valores numéricos
numeric_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
df.select(numeric_cols).describe().show()



+-------+--------------------+--------------------+-----------------+-------------------+
|summary|          product_id|         category_id|            price|            user_id|
+-------+--------------------+--------------------+-----------------+-------------------+
|  count|           109950731|           109950731|        109950731|          109950731|
|   mean|1.1755771377122113E7|2.057707155067225E18|291.6348042545759|5.366697784138143E8|
| stddev|1.5435645021777712E7|1.949326427113854...| 356.679979733147|2.145172855534869E7|
|    min|             1000365| 2053013552226107603|              0.0|           10300217|
|    max|           100028554| 2187707861038006932|          2574.07|          579969851|
+-------+--------------------+--------------------+-----------------+-------------------+



                                                                                

In [45]:
# Verificar con metodos de varianza 
variance_pop = df.agg(var_pop("price").alias("Population Variance"))
variance_samp = df.agg(var_samp("price").alias("Sample Variance"))
variance_pop.show()
variance_samp.show()

                                                                                

+-------------------+
|Population Variance|
+-------------------+
|  127220.6067853689|
+-------------------+





+------------------+
|   Sample Variance|
+------------------+
|127220.60794243813|
+------------------+



                                                                                

In [46]:
resultado = df.groupBy("event_type").count()
resultado.show()



+----------+---------+
|event_type|    count|
+----------+---------+
|      cart|  3955434|
|  purchase|  1659788|
|      view|104335509|
+----------+---------+



                                                                                

In [47]:
# Agrupar fechas por mes
df.groupBy(month("event_time").alias("month")) \
  .count() \
  .orderBy("month") \
  .show()



+-----+--------+
|month|   count|
+-----+--------+
|    9|  258646|
|   10|42507934|
|   11|67184151|
+-----+--------+



                                                                                

In [48]:
# Añadiendo una nueva columna de categoria a partir de una existente
# La nueva columna se llamará 'category_subset' generada a partir de las columna category_code
from pyspark.sql.functions import col, split, instr, length, substring

df = (
    df.withColumn("parent_category", split(col("category_code"), "\\.")[0])
      .withColumn(
          "subcategory",
          when(
              instr(col("category_code"), ".") > 0,
              expr("substring(category_code, instr(category_code, '.') + 1, length(category_code))")
          ).otherwise("None")
      )
)

In [49]:
df.show(1, truncate=False)
df.printSchema()

+----------+----------+----------+-------------------+----------------------+------+------+---------+------------------------------------+---------------+-----------+
|event_time|event_type|product_id|category_id        |category_code         |brand |price |user_id  |user_session                        |parent_category|subcategory|
+----------+----------+----------+-------------------+----------------------+------+------+---------+------------------------------------+---------------+-----------+
|2019-10-31|view      |1003461   |2053013555631882655|electronics.smartphone|xiaomi|489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|electronics    |smartphone |
+----------+----------+----------+-------------------+----------------------+------+------+---------+------------------------------------+---------------+-----------+
only showing top 1 row

root
 |-- event_time: date (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- catego

In [20]:
# Se elimina la columna category_Code ya que tenemos dos columnas nuevas que representan a esta columna
df = df.drop("category_code")
df.show(1, truncate=False)
df.printSchema()

+----------+----------+----------+-------------------+------+------+---------+------------------------------------+---------------+-----------+
|event_time|event_type|product_id|category_id        |brand |price |user_id  |user_session                        |parent_category|subcategory|
+----------+----------+----------+-------------------+------+------+---------+------------------------------------+---------------+-----------+
|2019-10-31|view      |1003461   |2053013555631882655|xiaomi|489.07|520088904|4d3b30da-a5e4-49df-b1a8-ba5943f1dd33|electronics    |smartphone |
+----------+----------+----------+-------------------+------+------+---------+------------------------------------+---------------+-----------+
only showing top 1 row

root
 |-- event_time: date (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- brand: string (nullable = false)
 |-- price: double (nullable = true)
 |-- user_

In [50]:
# Para analizar los valores distintos, incluimos los valores categorícos del data set
df_unique = df[['parent_category', 'subcategory']]
output = ""
for column in df_unique.columns:
    distinct_values = df_unique.select(column).distinct().orderBy(column).collect()
    distinct_count = len(distinct_values)
    
    # Separados por comas
    values_list = [str(row[column]) for row in distinct_values]
    values_str = ", ".join(values_list)
    output += f"\nColumn: '{column}' — {distinct_count} valores únicos\n"
    output += "-" * 50 + "\n"
    output += values_str + "\n"
print(output)




Column: 'parent_category' — 14 valores únicos
--------------------------------------------------
None, accessories, apparel, appliances, auto, computers, construction, country_yard, electronics, furniture, kids, medicine, sport, stationery

Column: 'subcategory' — 130 valores únicos
--------------------------------------------------
None, accessories.alarm, accessories.anti_freeze, accessories.compressor, accessories.parktronic, accessories.player, accessories.radar, accessories.videoregister, accessories.winch, audio.acoustic, audio.headphone, audio.microphone, audio.music_tools.piano, audio.subwoofer, bag, bathroom.bath, bathroom.toilet, bedroom.bed, bedroom.blanket, bedroom.pillow, belt, bicycle, camera.photo, camera.video, carriage, cartrige, clocks, components.cooler, components.cpu, components.faucet, components.hdd, components.memory, components.motherboard, components.power_supply, components.videocards, costume, cultivator, desktop, dolls, dress, ebooks, environment.air_condi

                                                                                

In [22]:
df.groupBy("brand").count().orderBy("count", ascending=False).show(5, truncate=False)



+-------+--------+
|brand  |count   |
+-------+--------+
|None   |15331241|
|samsung|13172018|
|apple  |10381931|
|xiaomi |7721825 |
|huawei |2521331 |
+-------+--------+
only showing top 5 rows



                                                                                

In [51]:
df.groupBy("parent_category").count().orderBy("count", ascending=False).show(5, truncate=False)



+---------------+--------+
|parent_category|count   |
+---------------+--------+
|electronics    |40141700|
|None           |35413777|
|appliances     |13457119|
|computers      |6505575 |
|apparel        |4554025 |
+---------------+--------+
only showing top 5 rows



                                                                                

In [52]:
df.groupBy("subcategory").count().orderBy("count", ascending=False).show(5, truncate=False)



+-----------+--------+
|subcategory|count   |
+-----------+--------+
|None       |35413777|
|smartphone |27882227|
|clocks     |3397998 |
|video.tv   |3321794 |
|notebook   |3318177 |
+-----------+--------+
only showing top 5 rows



                                                                                

In [53]:
# Crear una nueva columna de price_bucket para representar rangos de precios conocidos
# Crear una nueva columna de is_weekend para agregar insights de marketing y day of week para trackear dias
df = df.withColumn(
    "price_bucket",
    when(col("price") < 100, "low")
    .when((col("price") >= 100) & (col("price") < 300), "medium")
    .otherwise("high")
)

df = df.withColumn("event_time_ts", to_timestamp(col("event_time")))
df = df.withColumn("day_of_week", dayofweek(col("event_time_ts")))
df = df.withColumn(
    "is_weekend",
    when((col("day_of_week") == 1) | (col("day_of_week") == 7), True).otherwise(False)
)

df.select("event_time", "price", "price_bucket", "day_of_week", "is_weekend").show(5)

+----------+------+------------+-----------+----------+
|event_time| price|price_bucket|day_of_week|is_weekend|
+----------+------+------------+-----------+----------+
|2019-10-31|489.07|        high|          5|     false|
|2019-10-31|293.65|      medium|          5|     false|
|2019-10-31| 28.31|         low|          5|     false|
|2019-10-31|712.87|        high|          5|     false|
|2019-10-31|183.27|      medium|          5|     false|
+----------+------+------------+-----------+----------+
only showing top 5 rows



In [54]:
resultado = df.groupBy("price_bucket").count()
resultado.show()



+------------+--------+
|price_bucket|   count|
+------------+--------+
|         low|37649697|
|      medium|39579818|
|        high|32721216|
+------------+--------+



                                                                                

In [55]:
# Ya que tenemos valores limpios y derivados
# Escoger el sample size de la población
df.printSchema()

root
 |-- event_time: date (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = false)
 |-- brand: string (nullable = false)
 |-- price: float (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- parent_category: string (nullable = true)
 |-- subcategory: string (nullable = false)
 |-- price_bucket: string (nullable = false)
 |-- event_time_ts: timestamp (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- is_weekend: boolean (nullable = false)

