In [1]:
from pyspark.sql import SparkSession


#### Explanation

Nosso `script python` precisa usar a **API** do spark chamada `pyspark` para se conectar ao **Cluster do Spark**, que é onde irá acontecer toda a transformação dos dados de maneira rápida.

Ou seja, para que as análises de dados sejam feitas de maneira mais rápida usando o spark, precisamos utilizar as funções ou métodos do Pyspark, que irá enviar as mensagens para os WORKERS do Spark. Contudo, as vezes temos pipelines que são muito específicos, então devemos transformar em funções python a trasnformação que queremos realizar, e aplicar o **apply** da instância do pyspark para que os dados sejam processados mais rápido no microservidor que está o Apache Spark.

### Função Customizada

In [2]:
# Criar uma nova SparkSession
spark = SparkSession.builder \
    .appName("Exemplo PySpark") \
    .master("spark://localhost:7077") \
    .getOrCreate()


24/11/28 22:42:32 WARN Utils: Your hostname, miguel-ubuntu2404 resolves to a loopback address: 127.0.1.1; using 192.168.0.186 instead (on interface wlp3s0)
24/11/28 22:42:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/28 22:42:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

In [4]:
# 1. Configurar o Spark (spark instance above - Done)

# 2. Criar um DataFrame de exemplo
data = [
    (1, "Alice", 28),
    (2, "Bob", 35),
    (3, "Cathy", 29),
    (4, "David", 40)
]
columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)

# Mostrar o DataFrame original
print("DataFrame Original:")
df.show()

# 3. Criar uma função Python personalizada
def categorize_age(age):
    if age < 30:
        return "Young"
    elif 30 <= age < 40:
        return "Adult"
    else:
        return "Old"

# Converter a função Python para uma UDF (User Defined Function) do PySpark
# P.S. No PySpark, ao registrar uma função Python como uma UDF (User Defined Function),
# é necessário informar explicitamente o TIPO de dado que será retornado pela FUNÇÃO.
categorize_age_udf = udf(
    categorize_age, # função python do usuário
    StringType() # Especificando o tipo da função
)

# Aplicar a UDF ao DataFrame
df_transformed = df.withColumn(
    "age_category",
    categorize_age_udf(col("age"))
)

# Mostrar o DataFrame transformado
print("DataFrame Transformado:")
df_transformed.show()


# ALWAYS FINISH A INSTANCE
spark.stop()

DataFrame Original:


24/11/28 22:42:51 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 28|
|  2|  Bob| 35|
|  3|Cathy| 29|
|  4|David| 40|
+---+-----+---+

DataFrame Transformado:


                                                                                

+---+-----+---+------------+
| id| name|age|age_category|
+---+-----+---+------------+
|  1|Alice| 28|       Young|
|  2|  Bob| 35|       Adult|
|  3|Cathy| 29|       Young|
|  4|David| 40|         Old|
+---+-----+---+------------+

