## Confguração de ambiente

### 1. Instalação das Dependências:

Ao contrário de bibliotecas Python padrão, a utilização do PySpark no Google Colab exige a instalação prévia do Java Development Kit (JDK) e do próprio Apache Spark. O processo é simples e pode ser executado diretamente nas células do notebook.

In [1]:
# Instalar o OpenJDK 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Baixar o Apache Spark (versão 3.5.0 com Hadoop 3)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz

# Extrair o arquivo baixado
!tar xf spark-3.5.0-bin-hadoop3.tgz

# Instalar a biblioteca findspark
!pip install -q findspark

### 2. Configuração das Variáveis de Ambiente:
Após a instalação, precisamos definir as variáveis de ambiente JAVA_HOME e SPARK_HOME para que o sistema saiba onde encontrar as instalações do Java e do Spark, respectivamente.


In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [3]:
import findspark
findspark.init()

## Inicializando o PySpark


In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Iniciando com PySpark no Colab") \
    .getOrCreate()

In [30]:
df = spark.read.format("json").load('./data/landing/devices/')

In [31]:
df.printSchema()

root
 |-- build_number: long (nullable = true)
 |-- dt_current_timestamp: long (nullable = true)
 |-- id: long (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- uid: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- version: long (nullable = true)



In [32]:
df.show(1,vertical = True,truncate=False)

-RECORD 0----------------------------------------------------
 build_number         | 105                                  
 dt_current_timestamp | 1654631486481                        
 id                   | 9293                                 
 manufacturer         | Apple                                
 model                | Xiaomi Mi 8 Pro                      
 platform             | Windows 10 Mobile                    
 serial_number        | ToFVWLzGTJhQxAaJlDDn                 
 uid                  | 8f7b83dd-89f9-4dbf-8a72-1763b4b9eb37 
 user_id              | 5292                                 
 version              | 286                                  
only showing top 1 row



In [33]:
df.count()

600

## O que é uma UDF no Spark?

No Apache Spark, uma UDF (User Defined Function) é uma função definida pelo usuário que permite aplicar lógica personalizada em colunas de um DataFrame.
Ou seja, quando as funções nativas do Spark SQL não conseguem atender a uma transformação específica, você pode criar sua própria função e aplicá-la sobre os dados.

In [28]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def saudacao(nome):
    return f"Olá, {nome}!"


saudacao_udf = udf(saudacao, StringType())

df = spark.createDataFrame([("João",), ("Maria",)], ["nome"])
df = df.withColumn("mensagem", saudacao_udf(df["nome"]))
df.show()


+-----+-----------+
| nome|   mensagem|
+-----+-----------+
| João| Olá, João!|
|Maria|Olá, Maria!|
+-----+-----------+



### **Vantagens das UDFs**

1. **Flexibilidade**: Permite aplicar qualquer lógica em colunas, mesmo que não exista função Spark pronta.
2. **Reuso de código**: Você pode transformar funções Python existentes em UDFs e aplicá-las diretamente em DataFrames.
3. **Integração**: Funciona tanto com DataFrames quanto com Spark SQL.

---

### **Desvantagens das UDFs**

1. **Performance**: UDFs geralmente são mais lentas que funções nativas do Spark, porque Spark precisa serializar dados entre o JVM e Python (no caso do PySpark).
2. **Paralelização limitada**: Como o Spark não consegue otimizar a execução interna de UDFs da mesma forma que funções nativas, pode impactar grandes volumes de dados.
3. **Tipos de dados**: É necessário definir explicitamente o tipo de retorno da UDF, o que aumenta a complexidade em funções mais complexas.
4. **Depuração mais difícil**: Se houver erro dentro da UDF, o rastreamento pode ser mais complicado do que nas funções nativas do Spark SQL.




### Exemplo 1: Criar uma coluna que classifica o fabricante como “Apple” ou “Outro”

In [34]:
def classificar_fabricante(marca):
    if marca == "Apple":
        return "Apple"
    else:
        return "Outro"

classificar_udf = udf(classificar_fabricante, StringType())

df = df.withColumn("fabricante_classificado", classificar_udf(df["manufacturer"]))
df.show()

+------------+--------------------+----+------------+--------------------+-----------------+--------------------+--------------------+-------+-------+-----------------------+
|build_number|dt_current_timestamp|  id|manufacturer|               model|         platform|       serial_number|                 uid|user_id|version|fabricante_classificado|
+------------+--------------------+----+------------+--------------------+-----------------+--------------------+--------------------+-------+-------+-----------------------+
|         105|       1654631486481|9293|       Apple|     Xiaomi Mi 8 Pro|Windows 10 Mobile|ToFVWLzGTJhQxAaJlDDn|8f7b83dd-89f9-4db...|   5292|    286|                  Apple|
|          96|       1654631486481|5402|      Lenovo|   Samsung Galaxy S4|       Android OS|ToFVWLzGTJhQxAaJlDDn|0b41f920-e0ec-4a4...|   7509|    521|                  Outro|
|         376|       1654631486481|7319|       Apple|Samsung Galaxy S7...|      Windows 8.1|     SJMZOmtU0csrv4R|0beb9c3a-aa3

### Exemplo 2: Extrair o ano a partir do timestamp

In [35]:
import datetime
from pyspark.sql.types import IntegerType

def extrair_ano(ts):
    return datetime.datetime.fromtimestamp(ts / 1000).year

extrair_ano_udf = udf(extrair_ano, IntegerType())

df = df.withColumn("ano", extrair_ano_udf(df["dt_current_timestamp"]))
df.show()

+------------+--------------------+----+------------+--------------------+-----------------+--------------------+--------------------+-------+-------+-----------------------+----+
|build_number|dt_current_timestamp|  id|manufacturer|               model|         platform|       serial_number|                 uid|user_id|version|fabricante_classificado| ano|
+------------+--------------------+----+------------+--------------------+-----------------+--------------------+--------------------+-------+-------+-----------------------+----+
|         105|       1654631486481|9293|       Apple|     Xiaomi Mi 8 Pro|Windows 10 Mobile|ToFVWLzGTJhQxAaJlDDn|8f7b83dd-89f9-4db...|   5292|    286|                  Apple|2022|
|          96|       1654631486481|5402|      Lenovo|   Samsung Galaxy S4|       Android OS|ToFVWLzGTJhQxAaJlDDn|0b41f920-e0ec-4a4...|   7509|    521|                  Outro|2022|
|         376|       1654631486481|7319|       Apple|Samsung Galaxy S7...|      Windows 8.1|     SJM

## **Exemplo 3 : Validar e classificar dispositivos “únicos por usuário”**

**Problema:**

* Cada usuário (`user_id`) pode ter vários dispositivos.
* Queremos criar uma **categoria de dispositivo primário, secundário ou duplicado** com base em:

  * `serial_number`
  * `model`
  * `platform`
* Regras:

  1. Se o usuário tiver **um único dispositivo com esse serial**, marca como `Primário`.
  2. Se tiver **mais de um dispositivo com mesmo serial**, marca como `Duplicado`.
  3. Se o serial for novo para o usuário, mas o modelo já existe, marca como `Secundário`.


In [36]:
df.createOrReplaceTempView("devices")

In [39]:
spark.sql("""
WITH serial_counts AS (
    SELECT user_id, serial_number, model, COUNT(*) AS serial_count
    FROM devices
    GROUP BY user_id, serial_number, model
),
model_seen AS (
    SELECT d1.user_id, d1.model,
           CASE WHEN d2.model IS NOT NULL THEN true ELSE false END AS model_seen_before
    FROM devices d1
    LEFT JOIN devices d2
      ON d1.user_id = d2.user_id
     AND d1.model = d2.model
     AND d1.serial_number != d2.serial_number
)
SELECT sc.user_id, sc.serial_number, sc.model,
       CASE
           WHEN sc.serial_count > 1 THEN 'Duplicado'
           WHEN ms.model_seen_before THEN 'Secundário'
           ELSE 'Primário'
       END AS device_category
FROM serial_counts sc
LEFT JOIN model_seen ms
  ON sc.user_id = ms.user_id AND sc.model = ms.model
""").show()



+-------+--------------------+--------------------+---------------+
|user_id|       serial_number|               model|device_category|
+-------+--------------------+--------------------+---------------+
|   3949|UVr864F8zUbyYOAUd...|Xiaomi Redmi Note...|      Duplicado|
|   3949|UVr864F8zUbyYOAUd...|Xiaomi Redmi Note...|      Duplicado|
|   8622|T6UuMUTani3VGY4vXGia|           iPhone 4S|      Duplicado|
|   8622|T6UuMUTani3VGY4vXGia|           iPhone 4S|      Duplicado|
|    246|pEekWH7zGxVITv6NT...| Xiaomi Pocophone F1|       Primário|
|   7509|ToFVWLzGTJhQxAaJlDDn|   Samsung Galaxy S4|      Duplicado|
|   7509|ToFVWLzGTJhQxAaJlDDn|   Samsung Galaxy S4|      Duplicado|
|   9895|39gPmcOKpwhDezLdi...|          OnePlus 6T|       Primário|
|   7647|pEekWH7zGxVITv6NT...|   Samsung Galaxy S8|       Primário|
|   7876|Yr9Vt13BlgvXO9zgJ...|           OnePlus 2|       Primário|
|   6770|xC36G3Xy4n2Fu90ke...|          Huawei P10|       Primário|
|   4666|Yr9Vt13BlgvXO9zgJ...|   iPhone 6 / 6 Pl

1. O SQL funciona, mas é bem mais complexo, com CTEs, joins e subqueries.

2. Para datasets grandes, esse tipo de join pode explodir a quantidade de dados intermediários.

3. A UDF é muito mais legível para lógica condicional complexa que depende de múltiplas colunas e estado do grupo.

In [40]:
from pyspark.sql import Window
from pyspark.sql.functions import col, count
from pyspark.sql.types import StringType


# 1. Criar um contador de dispositivos por usuário
window_user = Window.partitionBy("user_id", "serial_number")
df_counted = df.withColumn("serial_count", count("*").over(window_user))


def categorize_device(serial_count, model_seen_before):
    if serial_count > 1:
        return "Duplicado"
    elif model_seen_before:
        return "Secundário"
    else:
        return "Primário"

# 3. Para simular se o modelo já apareceu, criamos um lookup temporário
# Aqui simplificado: assumimos False (poderia vir de outro DataFrame/consulta)
categorize_udf = udf(lambda serial_count, model_seen_before=False: categorize_device(serial_count, model_seen_before), StringType())

# 4. Aplicar UDF
df_final = df_counted.withColumn("device_category", categorize_udf(col("serial_count")))
df_final.select("user_id","serial_number","model","device_category").show(truncate=False)


+-------+------------------------------+-------------------+---------------+
|user_id|serial_number                 |model              |device_category|
+-------+------------------------------+-------------------+---------------+
|1      |VMTnd2mMQWvjbtNcZh7UIdULKb1mMo|OnePlus 3          |Primário       |
|8      |Kl2ZroV9a                     |Huawei Mate 20     |Primário       |
|14     |tL&^J@24CVF=zP46Lxixk`_a#=o6c5|iPhone 5C          |Duplicado      |
|14     |tL&^J@24CVF=zP46Lxixk`_a#=o6c5|iPhone 5C          |Duplicado      |
|26     |Kl2ZroV9a                     |Samsung Galaxy S3  |Primário       |
|33     |ToFVWLzGTJhQxAaJlDDn          |iPhone 4S          |Primário       |
|53     |m6aHiiHOc                     |Huawei Mate 9      |Primário       |
|97     |ToFVWLzGTJhQxAaJlDDn          |OnePlus 6T         |Primário       |
|98     |9vxM9fCsG9nXg8EjTN5ygV2LvaDZdG|iPhone 3GS         |Primário       |
|148    |T6UuMUTani3VGY4vXGia          |Huawei Mate 20 Pro |Primário       |

In [42]:
df_final.where("user_id = 3949").show()

+------------+--------------------+----+------------+--------------------+--------+--------------------+--------------------+-------+-------+-----------------------+----+------------+---------------+
|build_number|dt_current_timestamp|  id|manufacturer|               model|platform|       serial_number|                 uid|user_id|version|fabricante_classificado| ano|serial_count|device_category|
+------------+--------------------+----+------------+--------------------+--------+--------------------+--------------------+-------+-------+-----------------------+----+------------+---------------+
|         330|       1654631486481|5872|          HP|Xiaomi Redmi Note...|   webOS|UVr864F8zUbyYOAUd...|37bdbb44-2a44-472...|   3949|    581|                  Outro|2022|           2|      Duplicado|
|         330|       1654631486481|5872|          HP|Xiaomi Redmi Note...|   webOS|UVr864F8zUbyYOAUd...|37bdbb44-2a44-472...|   3949|    581|                  Outro|2022|           2|      Duplicado|


Perfeito! Vamos resumir de forma **didática e prática** o que você precisa para **criar, declarar e usar uma UDF no Spark**:

---

####  Ter uma função Python “normal”**

A UDF sempre começa com uma função Python que faz o que você quer linha a linha.

Exemplo:

```python
def classify_build(build_number):
    if build_number > 400:
        return "Novo"
    else:
        return "Antigo"
```

---

#### Registrar a função como UDF**

Você precisa transformar a função Python em uma **UDF Spark**, indicando o tipo de retorno:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

classify_udf = udf(classify_build, StringType())
```

* `udf(func, returnType)` → recebe a função e o tipo do retorno.
* Tipos comuns: `StringType()`, `IntegerType()`, `DoubleType()`, `BooleanType()`, etc.

---

#### Aplicar a UDF em uma coluna do DataFrame**

Você usa `withColumn` ou `select` e passa a coluna como argumento:

```python
from pyspark.sql.functions import col

df = df.withColumn("build_category", classify_udf(col("build_number")))
```

* `col("build_number")` passa o valor da coluna para a função.
* Spark aplica a função **linha a linha** e cria a nova coluna `build_category`.

---

####  UDF com múltiplos argumentos**

Se a função precisa de mais de uma coluna:

```python
def device_info(build_number: int, is_active: bool, platform: str):
    if build_number > 400 and is_active and "Windows" in platform:
        return "Ativo Windows Novo"
    elif build_number > 400:
        return "Windows Antigo"
    else:
        return "Outro"

device_info_udf = udf(device_info, StringType())

df = df.withColumn("device_status", device_info_udf(col("build_number"), col("active"), col("platform")))
```

#### Tipos de dado diferentes**

Sim, a UDF pode receber **argumentos de tipos diferentes**. Por exemplo:

```python
from pyspark.sql.types import IntegerType, BooleanType, StringType
from pyspark.sql.functions import col, udf

def device_info(build_number: int, is_active: bool, platform: str):
    if build_number > 400 and is_active and "Windows" in platform:
        return "Ativo Windows Novo"
    elif build_number > 400:
        return "Windows Antigo"
    else:
        return "Outro"

device_info_udf = udf(device_info, StringType())

df = df.withColumn("device_status", device_info_udf(col("build_number"), col("active"), col("platform")))
```

* Aqui `build_number` é `IntegerType`, `is_active` é `BooleanType` e `platform` é `StringType`.
* Spark lida com diferentes tipos, **desde que a função Python consiga processá-los**.

---

#### Uma **UDF só pode retornar um valor por linha**




