In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F

from pyspark.sql.types import (
    IntegerType,
    StringType,
    TimestampType,
    StructField,
    StructType,
)

### Spark SQL

#### Session

In [2]:
sp: SparkSession = SparkSession.builder.appName("app").getOrCreate()

sp.sparkContext.setLogLevel("ERROR")

24/08/16 20:03:31 WARN Utils: Your hostname, torrescereno resolves to a loopback address: 127.0.1.1; using 192.168.1.165 instead (on interface enp4s0)
24/08/16 20:03:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/16 20:03:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Lectura - Schema

In [28]:
# Dataset
# https://www.kaggle.com/datasets/muhammadehsan000/paris-2024-summer-olympic-games-dataset

# Estructura / Schema
schema: StringType = StructType(
    [
        StructField("date", TimestampType(), True),
        StructField("stage_code", StringType(), True),
        StructField("event_code", StringType(), True),
        StructField("event_name", StringType(), True),
        StructField("event_stage", StringType(), True),
        StructField("stage", StringType(), True),
        StructField("stage_status", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("discipline_name", StringType(), True),
        StructField("discipline_code", StringType(), True),
        StructField("venue", StringType(), True),
        StructField("participant_code", StringType(), True),
        StructField("participant_name", StringType(), True),
        StructField("participant_type", StringType(), True),
        StructField("participant_country_code", StringType(), True),
        StructField("participant_country", StringType(), True),
        StructField("rank", IntegerType(), True),
        StructField("result", StringType(), True),
        StructField("result_type", StringType(), True),
        StructField("result_IRM", StringType(), True),
        StructField("result_diff", StringType(), True),
        StructField("qualification_mark", StringType(), True),
        StructField("start_order", IntegerType(), True),
        StructField("bib", StringType(), True)
    ]
)

# Lectura de csv
df: DataFrame = sp.read \
                        .format("csv") \
                        .option("header", True) \
                        .schema(schema) \
                        .load("data/results")
                        
# Inferir schema
# df = sp.read.option("header", True).option("inferSchema", True).csv("data/results")

df.count()

20613

In [25]:
# Tomar una muestra
df = (
    sp.read.format("csv")
    .option("header", True)
    .load("data/results")
    .sample(0.1)
)

df.count()

2035

In [26]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- stage_code: string (nullable = true)
 |-- event_code: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- event_stage: string (nullable = true)
 |-- stage: string (nullable = true)
 |-- stage_status: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- discipline_name: string (nullable = true)
 |-- discipline_code: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- participant_code: string (nullable = true)
 |-- participant_name: string (nullable = true)
 |-- participant_type: string (nullable = true)
 |-- participant_country_code: string (nullable = true)
 |-- participant_country: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- result: string (nullable = true)
 |-- result_type: string (nullable = true)
 |-- result_IRM: string (nullable = true)
 |-- result_diff: string (nullable = true)
 |-- qualification_mark: string (nullable = true)
 |-- start_order: string (nullable = 

#### SQL

In [6]:
# Registrar el DataFrame como una tabla temporal en SQL
df.createOrReplaceTempView("jjoo")

# Ejecutar una consulta SQL para seleccionar a todos los empleados de un departamento específico
sp.sql("SELECT * FROM jjoo WHERE participant_country = 'Chile'").show(5)


+--------------------+--------------------+----------+------------------+--------------------+--------------------+------------+------+---------------+---------------+----------------+----------------+--------------------+----------------+------------------------+-------------------+----+------+-----------+----------+-----------+------------------+-----------+----+
|                date|          stage_code|event_code|        event_name|         event_stage|               stage|stage_status|gender|discipline_name|discipline_code|           venue|participant_code|    participant_name|participant_type|participant_country_code|participant_country|rank|result|result_type|result_IRM|result_diff|qualification_mark|start_order| bib|
+--------------------+--------------------+----------+------------------+--------------------+--------------------+------------+------+---------------+---------------+----------------+----------------+--------------------+----------------+------------------------+

#### Dataframe

In [7]:
df[df["participant_country"] == "Chile"].show(5)

+--------------------+--------------------+----------+------------------+--------------------+--------------------+------------+------+---------------+---------------+----------------+----------------+--------------------+----------------+------------------------+-------------------+----+------+-----------+----------+-----------+------------------+-----------+----+
|                date|          stage_code|event_code|        event_name|         event_stage|               stage|stage_status|gender|discipline_name|discipline_code|           venue|participant_code|    participant_name|participant_type|participant_country_code|participant_country|rank|result|result_type|result_IRM|result_diff|qualification_mark|start_order| bib|
+--------------------+--------------------+----------+------------------+--------------------+--------------------+------------+------+---------------+---------------+----------------+----------------+--------------------+----------------+------------------------+

#### Distinct

In [29]:
df[df["participant_country"] == "Chile"].select("participant_name").distinct().count()

45

In [30]:
countries = df.select("participant_country").distinct().rdd.map(lambda row: row[0]).collect()
countries.append("United States of America")

df = df[~df["participant_name"].isin(countries)]

#### Filters

In [31]:
# Participantes mujeres (filter)
df.filter(df["participant_country"] == "Chile")\
    .select("participant_name") \
    .distinct() \
    .filter(df["gender"] == "W") \
    .count()

16

In [32]:
# Participantes hombres (vectores)
# WHERE utiliza la api de funciones que es más común en operaciones complejas y es más expresiva cuando se trabaja con múltiples condiciones o columnas

df[df["participant_country"] == "Chile"] \
    .select("participant_name") \
    .distinct().where(F.col("gender") == "M").count()

27

In [33]:
df.where(
    (F.col("participant_country") == "Chile") &
    ((F.col("gender") == "O") | (F.col("gender") == "X"))
).count()

1

#### Groups

In [34]:
# ¿Cuántos eventos diferentes se llevaron a cabo en cada disciplina?

df.groupBy("discipline_name").agg(F.countDistinct("event_name").alias("unique_event_count")).show(5)

+-----------------+------------------+
|  discipline_name|unique_event_count|
+-----------------+------------------+
|           Tennis|                 5|
|           Boxing|                13|
|Marathon Swimming|                 2|
|             Golf|                 2|
|           Rowing|                 7|
+-----------------+------------------+
only showing top 5 rows



In [35]:
# ¿Cuál fue la distribución de los eventos según el género de los participantes?

df.groupBy("gender").agg(F.countDistinct("event_code").alias("event_count")).show()

+------+-----------+
|gender|event_count|
+------+-----------+
|     M|        141|
|     O|          5|
|     X|         12|
|     W|        144|
+------+-----------+



In [36]:
# ¿Cuáles son los países con mayor cantidad de participantes?

df.groupBy("participant_country").agg(F.countDistinct("participant_name").alias("count")).orderBy(F.desc("count")).show(5)

+-------------------+-----+
|participant_country|count|
+-------------------+-----+
|      United States|  354|
|             France|  288|
|              China|  262|
|              Italy|  246|
|          Australia|  246|
+-------------------+-----+
only showing top 5 rows



In [37]:
# ¿Qué eventos tuvieron el mayor número de participantes?

df.groupBy("event_name", "discipline_name").agg(F.countDistinct("participant_name").alias("count")).orderBy(F.desc("count")).show(5)

+-----------------+---------------+-----+
|       event_name|discipline_name|count|
+-----------------+---------------+-----+
|       Men's 100m|      Athletics|  102|
|Women's Road Race|   Cycling Road|   92|
|     Women's 100m|      Athletics|   91|
| Women's Marathon|      Athletics|   91|
|  Men's Road Race|   Cycling Road|   90|
+-----------------+---------------+-----+
only showing top 5 rows



#### Pivot

In [38]:
# Cuántos participantes de cada género (gender) han competido en cada país (participant_country).

df.groupBy("participant_country").pivot("gender").agg(F.count("participant_code")).orderBy(F.asc("participant_country")).show(5)

+-------------------+---+----+---+----+
|participant_country|  M|   O|  W|   X|
+-------------------+---+----+---+----+
|                AIN| 37|NULL| 40|   1|
|        Afghanistan|  3|NULL|  4|NULL|
|            Albania| 13|NULL|  5|NULL|
|            Algeria| 52|NULL| 33|   3|
|     American Samoa|  1|NULL|  1|NULL|
+-------------------+---+----+---+----+
only showing top 5 rows



#### Columns

In [39]:
df.show(1)

+-------------------+--------------------+------------+--------------------+--------------------+-----+------------+------+---------------+---------------+---------+-----------------+------------------+----------------+------------------------+-------------------+----+-------+-----------+----------+-----------+------------------+-----------+----+
|               date|          stage_code|  event_code|          event_name|         event_stage|stage|stage_status|gender|discipline_name|discipline_code|    venue| participant_code|  participant_name|participant_type|participant_country_code|participant_country|rank| result|result_type|result_IRM|result_diff|qualification_mark|start_order| bib|
+-------------------+--------------------+------------+--------------------+--------------------+-----+------------+------+---------------+---------------+---------+-----------------+------------------+----------------+------------------------+-------------------+----+-------+-----------+----------+--

In [40]:
# Agregar una columna

df.withColumn("upper", F.upper(df.participant_name)).withColumn(
    "lower", F.lower(df.participant_name)
).show(1)

+-------------------+--------------------+------------+--------------------+--------------------+-----+------------+------+---------------+---------------+---------+-----------------+------------------+----------------+------------------------+-------------------+----+-------+-----------+----------+-----------+------------------+-----------+----+------------------+------------------+
|               date|          stage_code|  event_code|          event_name|         event_stage|stage|stage_status|gender|discipline_name|discipline_code|    venue| participant_code|  participant_name|participant_type|participant_country_code|participant_country|rank| result|result_type|result_IRM|result_diff|qualification_mark|start_order| bib|             upper|             lower|
+-------------------+--------------------+------------+--------------------+--------------------+-----+------------+------+---------------+---------------+---------+-----------------+------------------+----------------+-------

In [41]:
# Renombrar una columna

df.withColumnRenamed("date", "fecha").show(1)

+-------------------+--------------------+------------+--------------------+--------------------+-----+------------+------+---------------+---------------+---------+-----------------+------------------+----------------+------------------------+-------------------+----+-------+-----------+----------+-----------+------------------+-----------+----+
|              fecha|          stage_code|  event_code|          event_name|         event_stage|stage|stage_status|gender|discipline_name|discipline_code|    venue| participant_code|  participant_name|participant_type|participant_country_code|participant_country|rank| result|result_type|result_IRM|result_diff|qualification_mark|start_order| bib|
+-------------------+--------------------+------------+--------------------+--------------------+-----+------------+------+---------------+---------------+---------+-----------------+------------------+----------------+------------------------+-------------------+----+-------+-----------+----------+--

#### Performance

Cache

Por defecto, un DataFrame no se almacena en ningún lugar y se vuelve a calcular cuando es necesario. 
El almacenamiento en caché de un DataFrame puede mejorar el rendimiento si se accede a él muchas veces. 

Hay dos maneras de hacer esto:

1- El método de caché de DataFrame establece el modo de persistencia del DataFrame al valor predeterminado (Memoria y Disco).

2- Para tener más control puedes usar persistir. persistir requiere un StorageLevel. persistir se usa más comúnmente para controlar el factor de replicación.



In [42]:
from pyspark import StorageLevel

# Copias
df1 = df[df["participant_country"] == "Chile"]
df2 = df[df["participant_country"] == "Chile"]
df3 = df[df["participant_country"] == "Chile"]
df4 = df[df["participant_country"] == "Chile"]

print("Nivel de almacenamiento predeterminado (NONE).")
print(df.storageLevel)

print("\ncache()")
df1.cache()
print(df1.storageLevel)

print("\nDISK_ONLY")
df2.persist(storageLevel=StorageLevel.DISK_ONLY)
print(df2.storageLevel)

print("\nMEMORY_ONLY")
df3.persist(storageLevel=StorageLevel.MEMORY_ONLY)
print(df3.storageLevel)

print("\nMEMORY_AND_DISK")
df4.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
print(df4.storageLevel)

df.unpersist()
df1.unpersist()
df2.unpersist()
df3.unpersist()
df4.unpersist()

Nivel de almacenamiento predeterminado (NONE).
Serialized 1x Replicated

cache()
Disk Memory Deserialized 1x Replicated

DISK_ONLY
Disk Memory Deserialized 1x Replicated

MEMORY_ONLY
Disk Memory Deserialized 1x Replicated

MEMORY_AND_DISK
Disk Memory Deserialized 1x Replicated


DataFrame[date: timestamp, stage_code: string, event_code: string, event_name: string, event_stage: string, stage: string, stage_status: string, gender: string, discipline_name: string, discipline_code: string, venue: string, participant_code: string, participant_name: string, participant_type: string, participant_country_code: string, participant_country: string, rank: int, result: string, result_type: string, result_IRM: string, result_diff: string, qualification_mark: string, start_order: int, bib: string]

Plan de ejecución

El plan lógico optimizado muestra la secuencia de transformaciones que PySpark aplica al DataFrame antes de convertirlo en un plan físico (es decir, antes de ejecutar realmente las operaciones).

El plan físico muestra cómo PySpark ejecutará realmente las operaciones en el clúster.

In [43]:
df = df.groupBy("participant_code").count()

execution_plan = str(df.explain(mode="cost"))
print(execution_plan)

== Optimized Logical Plan ==
Aggregate [participant_code#1804], [participant_code#1804, count(1) AS count#3038L], Statistics(sizeInBytes=401.3 KiB)
+- Project [participant_code#1804], Statistics(sizeInBytes=312.2 KiB)
   +- Filter NOT participant_name#1805 INSET AIN, Afghanistan, Albania, Algeria, American Samoa, Andorra, Angola, Antigua and Barbuda, Argentina, Armenia, Aruba, Australia, Austria, Azerbaijan, Bahamas, Bahrain, Bangladesh, Barbados, Belgium, Belize, Benin, Bermuda, Bhutan, Bolivia, Bosnia & Herzegovina, Botswana, Brazil, Brunei Darussalam, Bulgaria, Burkina Faso, Burundi, Cabo Verde, Cambodia, Cameroon, Canada, Cayman Islands, Centr Afric Rep, Chad, Chile, China, Chinese Taipei, Colombia, Comoros, Congo, Cook Islands, Costa Rica, Croatia, Cuba, Cyprus, Czechia, Côte d'Ivoire, DPR Korea, DR Congo, Denmark, Djibouti, Dominica, Dominican Republic, EOR, Ecuador, Egypt, El Salvador, Equatorial Guinea, Eritrea, Estonia, Eswatini, Ethiopia, Fiji, Finland, France, Gabon, Gambia,

Repartition

El método .repartition() se utiliza para redistribuir los datos de un DataFrame en un número específico de particiones. Las particiones son subconjuntos de datos que se distribuyen a través de los nodos de un clúster para permitir un procesamiento paralelo eficiente.

El método foreachPartition  permite aplicar una función personalizada a cada partición de datos en un DataFrame o RDD. Esto es útil cuando necesitas realizar operaciones específicas en cada partición, como filtrar, procesar o contar registros.

In [44]:
def number_in_partition(rows):
    try:
        first_row = next(rows)
        partition_size = sum(1 for x in rows) + 1
        partition_value = first_row.participant_code
        print(f"Partition {partition_value} has {partition_size} records")
    except StopIteration:
        print("Empty partition")

df.repartition(20, "participant_code").foreachPartition(number_in_partition)

Partition 1952010 has 354 records
Partition 1918990 has 335 records
Partition 1897299 has 359 records
Partition 1920843 has 340 records
Partition 1924455 has 352 records
Partition 1570596 has 356 records
Partition 1956237 has 368 records
Partition 1928799 has 351 records
Partition 1543943 has 351 records
Partition 1939851 has 392 records
Partition 1573418 has 334 records
Partition 1563781 has 369 records
Partition 1969332 has 345 records
Partition 1902794 has 331 records
Partition 1573107 has 382 records
Partition 1958688 has 301 records
Partition 1573013 has 340 records
Partition 1981764 has 354 records
Partition 1904166 has 313 records
Partition 1552167 has 339 records


### Referencias

[cartershanklin/pyspark-cheatsheet](https://github.com/cartershanklin/pyspark-cheatsheet)

[kevinschaich/pyspark-cheatsheet](https://github.com/kevinschaich/pyspark-cheatsheet)

[PySpark](https://spark.apache.org/docs/latest/api/python/reference/index.html)