In [1]:
pip install pyspark



In [2]:
#________________________________________________________________________________________
# import spark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window

job_name = 'case_data_engineer'
spark = SparkSession.builder.appName(job_name).getOrCreate()

# Parte 1: Manipulação de Dados

Criação do DataFrame


In [3]:
df = spark.createDataFrame(
    [
        ("Alice", 34, "Data Scientist"),
        ("Bob", 45, "Data Engineer"),
        ("Cathy", 29, "Data Analyst"),
        ("David", 35, "Data Scientist"),
    ],
    ["Name", "Age", "Occupation"]
)

df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
 |-- Occupation: string (nullable = true)



Filtragem e Seleção

Selecione apenas as colunas "Name" e "Age" do DataFrame criado.

In [4]:
df.select('Name', 'Age').show(truncate=False)

+-----+---+
|Name |Age|
+-----+---+
|Alice|34 |
|Bob  |45 |
|Cathy|29 |
|David|35 |
+-----+---+



Filtre as linhas onde a "Age" é maior que 30.

In [5]:
df.select('Name', 'Age').filter('Age > 30').show(truncate=False)

+-----+---+
|Name |Age|
+-----+---+
|Alice|34 |
|Bob  |45 |
|David|35 |
+-----+---+



Agrupamento e Agregação

Agrupe os dados pelo campo "Occupation" e calcule a média de "Age" para cada grupo.

In [6]:
df_agg = df.groupBy('Occupation').agg(F.avg('Age').alias('avgAge'))
df_agg.show(truncate=False)

+--------------+------+
|Occupation    |avgAge|
+--------------+------+
|Data Scientist|34.5  |
|Data Engineer |45.0  |
|Data Analyst  |29.0  |
+--------------+------+



Ordenação

Ordene o DataFrame resultante da questão anterior pela média de "Age" em ordem decrescente.

In [7]:
df_agg.sort(F.desc('avgAge')).show(truncate=False)

+--------------+------+
|Occupation    |avgAge|
+--------------+------+
|Data Engineer |45.0  |
|Data Scientist|34.5  |
|Data Analyst  |29.0  |
+--------------+------+



# Parte 2: Funções Avançadas

Criando a UDF que converte idades em categorias e aplicando ao DataFrame:

In [8]:
def age_category(age):
    result = None
    if age is not None:
        if age < 30:
            return "Jovem"
        elif age >= 30 & age <= 40:
            return "Adulto"
        elif age > 40:
            return "Senior"

df_get_age_category = F.udf(age_category, StringType())
df = df.withColumn('ageCategory', df_get_age_category('Age'))
df.show(truncate=False)

+-----+---+--------------+-----------+
|Name |Age|Occupation    |ageCategory|
+-----+---+--------------+-----------+
|Alice|34 |Data Scientist|Adulto     |
|Bob  |45 |Data Engineer |Adulto     |
|Cathy|29 |Data Analyst  |Jovem      |
|David|35 |Data Scientist|Adulto     |
+-----+---+--------------+-----------+



Funções de Janela

Use funções de janela para adicionar uma coluna que mostre a diferença de idade entre cada indivíduo e a média de idade do seu "Occupation".

In [9]:
windowSpec = Window.partitionBy("Occupation")
df = df.withColumn("avgAgeOccupation", F.avg("Age").over(windowSpec))
df = df.withColumn("ageDifference", (F.col("Age") - F.col("avgAgeOccupation")))
df.show()

+-----+---+--------------+-----------+----------------+-------------+
| Name|Age|    Occupation|ageCategory|avgAgeOccupation|ageDifference|
+-----+---+--------------+-----------+----------------+-------------+
|Cathy| 29|  Data Analyst|      Jovem|            29.0|          0.0|
|  Bob| 45| Data Engineer|     Adulto|            45.0|          0.0|
|Alice| 34|Data Scientist|     Adulto|            34.5|         -0.5|
|David| 35|Data Scientist|     Adulto|            34.5|          0.5|
+-----+---+--------------+-----------+----------------+-------------+



# Parte 3: Performance e Otimização

Particionamento

Explique como o particionamento pode ser usado para melhorar a performance em operações de leitura e escrita de dados em PySpark. Dê um exemplo de código que particiona um DataFrame por uma coluna específica.

      Pode ser utilizado para organizar os dados em partes menores com um ou mais campos.
      O particionamento permite o Spark processar os dados mais rapido acessando apenas as partiçoes necessarias na hora em que houve a consulta.

In [13]:
df_csv = spark.read.csv("sample_data/ibge_nomes.csv", header=True, inferSchema=True)
df_csv.show()
output_path = "result/ibge_nome/output/particionado"
df_csv.write.partitionBy("sexo").parquet(output_path)
spark.read.parquet(output_path).filter("sexo = 'F'").count()

+---------+------+-------+----+----+
|     nome|regiao|   freq|rank|sexo|
+---------+------+-------+----+----+
|     JOSE|     0|5732508|   1|   M|
|     JOAO|     0|2971935|   2|   M|
|  ANTONIO|     0|2567494|   3|   M|
|FRANCISCO|     0|1765197|   4|   M|
|   CARLOS|     0|1483121|   5|   M|
|    PAULO|     0|1417907|   6|   M|
|    PEDRO|     0|1213557|   7|   M|
|    LUCAS|     0|1116818|   8|   M|
|     LUIZ|     0|1102927|   9|   M|
|   MARCOS|     0|1101126|  10|   M|
|     LUIS|     0| 931530|  11|   M|
|  GABRIEL|     0| 922744|  12|   M|
|   RAFAEL|     0| 814709|  13|   M|
|   DANIEL|     0| 706527|  14|   M|
|  MARCELO|     0| 690098|  15|   M|
|    BRUNO|     0| 663271|  16|   M|
|  EDUARDO|     0| 628539|  17|   M|
|   FELIPE|     0| 615924|  18|   M|
| RAIMUNDO|     0| 611174|  19|   M|
|  RODRIGO|     0| 598825|  20|   M|
+---------+------+-------+----+----+
only showing top 20 rows



10000

Broadcast Join

Descreva o conceito de Broadcast Join em PySpark e como ele pode ser usado para otimizar operações de join. Implemente um exemplo de Broadcast Join entre dois DataFrames.

      O Broadcast Join é uma técnica de otimização em PySpark usada quando realizamos operações de join entre dois DataFrames de tamanhos muito diferentes.
      Em um Broadcast Join, o Spark transmite o DataFrame menor para todos os nós do cluster, de forma que cada nó possa executar o join localmente, sem precisar redistribuir os dados grandes entre os nós.
      Só é interessante utilizar esta funcionalidade quando o dataframe menor couber na memória de cada nó.


In [14]:
# criando os DataFrame
df_large = spark.createDataFrame(
    [
        ("Alice", 34, "Data Scientist", 1),
        ("Bob", 45, "Data Engineer", 2),
        ("Cathy", 29, "Data Analyst", 3),
        ("David", 35, "Data Scientist", 4),
        ("Jucimara", 30, "Data Engineer", 1),
        ("João", 25, "Data Engineer", 4),
    ],
    ["Name", "Age", "Occupation", "idCity"]
)

df_small = spark.createDataFrame(
    [
        (1, "Buenópolis"),
        (2, "Curvelo"),
        (3, "Montes Claros")
    ],
    ["idCity", "City"]
)

df_large.show()
df_small.show()

df_joined = df_large.join(F.broadcast(df_small), ["idCity"], "left")
df_joined.show()

+--------+---+--------------+------+
|    Name|Age|    Occupation|idCity|
+--------+---+--------------+------+
|   Alice| 34|Data Scientist|     1|
|     Bob| 45| Data Engineer|     2|
|   Cathy| 29|  Data Analyst|     3|
|   David| 35|Data Scientist|     4|
|Jucimara| 30| Data Engineer|     1|
|    João| 25| Data Engineer|     4|
+--------+---+--------------+------+

+------+-------------+
|idCity|         City|
+------+-------------+
|     1|   Buenópolis|
|     2|      Curvelo|
|     3|Montes Claros|
+------+-------------+

+------+--------+---+--------------+-------------+
|idCity|    Name|Age|    Occupation|         City|
+------+--------+---+--------------+-------------+
|     1|   Alice| 34|Data Scientist|   Buenópolis|
|     2|     Bob| 45| Data Engineer|      Curvelo|
|     3|   Cathy| 29|  Data Analyst|Montes Claros|
|     4|   David| 35|Data Scientist|         NULL|
|     1|Jucimara| 30| Data Engineer|   Buenópolis|
|     4|    João| 25| Data Engineer|         NULL|
+------+

# Parte 4: Integração com Outras Tecnologias

Leitura e Escrita de Dados

Demonstre como ler dados de um arquivo CSV e escrever o resultado em um formato Parquet.

In [15]:
df_csv = spark.read.csv("sample_data/ibge-fem-10000.csv", header=True, inferSchema=True)
df_csv.show()
output_path = "result/ibge-fem/output/ibge-fem.parquet"
df_csv.write.parquet(output_path)

+---------+------+--------+----+----+
|     nome|regiao|    freq|rank|sexo|
+---------+------+--------+----+----+
|    MARIA|     0|11694738|   1|   F|
|      ANA|     0| 3079729|   2|   F|
|FRANCISCA|     0|  721637|   3|   F|
|  ANTONIA|     0|  588783|   4|   F|
|  ADRIANA|     0|  565621|   5|   F|
|  JULIANA|     0|  562589|   6|   F|
|   MARCIA|     0|  551855|   7|   F|
| FERNANDA|     0|  531607|   8|   F|
| PATRICIA|     0|  529446|   9|   F|
|    ALINE|     0|  509869|  10|   F|
|   SANDRA|     0|  479230|  11|   F|
|   CAMILA|     0|  469851|  12|   F|
|   AMANDA|     0|  464624|  13|   F|
|    BRUNA|     0|  460770|  14|   F|
|  JESSICA|     0|  456472|  15|   F|
|  LETICIA|     0|  434056|  16|   F|
|    JULIA|     0|  430067|  17|   F|
|  LUCIANA|     0|  429769|  18|   F|
|  VANESSA|     0|  417512|  19|   F|
|  MARIANA|     0|  381778|  20|   F|
+---------+------+--------+----+----+
only showing top 20 rows



Integração com Hadoop

Explique como PySpark se integra com o Hadoop HDFS para leitura e escrita de dados. Dê um exemplo de código que leia um arquivo do HDFS e salve o resultado de volta no HDFS.

     o Spark é reponsavel por ler os dados direto no HDFS, esta leitura é distribuida entre os nós diponiveis no cluster, garantindo perfomance.
     Semelhante ocorre o processo de escrita, onde o Spark divide e distrui entre os nós, permitindo alta velocidade de escrita.

In [None]:
spark_hdfs = SparkSession.builder.appName(job_name).config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000/tmp/sample").getOrCreate()

In [None]:
hdfs_path = "hdfs://namenode_host:port/user/hadoop/dados.csv"

df_hdfs = spark.read.csv(hdfs_path, header=True, inferSchema=True)

df_hdfs.show()

In [None]:
output_path = "hdfs://namenode_host:port/user/hadoop/output/dados_filtrados.parquet"

df_hdfs.write.parquet(output_path)

# Parte 5: Problema de Caso

Processamento de Logs

Considere que você tem um grande arquivo de log com as seguintes colunas: "timestamp", "user_id", "action". Cada linha representa uma ação realizada por um usuário em um determinado momento.

In [16]:
import random
import csv
from datetime import datetime, timedelta

# Funções auxiliares para gerar dados
def generate_timestamp(start, end):
    """Gera um timestamp aleatório entre a data de início e fim."""
    return start + timedelta(seconds=random.randint(0, int((end - start).total_seconds())))

def generate_user_id():
    """Gera um user_id aleatório entre 1000 e 1150."""
    return random.randint(1000, 1150)

def generate_action():
    """Gera uma ação aleatória realizada pelo usuário."""
    actions = ["login", "logout", "view_page", "click_ad"]
    return random.choice(actions)

# Configurações
num_lines = 6001
start_date = datetime(2023, 1, 1)
end_date = datetime(2024, 8, 21)

# Caminho do arquivo de log
log_file = "sample_data/user_activity_log.csv"

# Gerando o arquivo de log
with open(log_file, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["timestamp", "user_id", "action"])  # Cabeçalhos

    for _ in range(num_lines):
        timestamp = generate_timestamp(start_date, end_date)
        user_id = generate_user_id()
        action = generate_action()
        writer.writerow([timestamp, user_id, action])

print(f"Arquivo de log '{log_file}' gerado com {num_lines} linhas.")

Arquivo de log 'sample_data/user_activity_log.csv' gerado com 6001 linhas.


Carregue o arquivo de log em um DataFrame.

In [17]:
df_log = spark.read.csv(log_file, header=True, inferSchema=True)
df_log.show()

+-------------------+-------+---------+
|          timestamp|user_id|   action|
+-------------------+-------+---------+
|2023-11-19 00:03:30|   1064| click_ad|
|2023-02-18 11:37:18|   1040|    login|
|2023-12-28 17:10:26|   1001|    login|
|2024-02-27 00:14:49|   1010|view_page|
|2024-06-25 08:54:26|   1017|   logout|
|2023-12-18 08:20:31|   1046| click_ad|
|2023-04-15 18:17:19|   1034|view_page|
|2024-03-21 03:47:06|   1118|view_page|
|2023-06-29 16:41:40|   1087|   logout|
|2024-06-06 20:49:41|   1116|view_page|
|2023-06-02 20:53:39|   1022| click_ad|
|2023-06-03 00:36:47|   1023|    login|
|2023-11-20 03:05:44|   1057|    login|
|2023-07-28 00:20:22|   1142|view_page|
|2023-05-08 13:10:30|   1071| click_ad|
|2024-08-20 21:33:09|   1145|   logout|
|2023-03-07 09:09:37|   1145| click_ad|
|2023-08-17 03:02:17|   1060|view_page|
|2024-04-03 19:26:13|   1041| click_ad|
|2024-05-21 06:30:14|   1027|    login|
+-------------------+-------+---------+
only showing top 20 rows



Conte o número de ações realizadas por cada usuário.

In [18]:
df_result = df_log.groupBy('user_id').agg(F.count('action').alias('qtdAction'))
df_result.show(truncate=False)

+-------+---------+
|user_id|qtdAction|
+-------+---------+
|1088   |33       |
|1025   |43       |
|1127   |40       |
|1084   |26       |
|1139   |46       |
|1143   |47       |
|1016   |35       |
|1005   |47       |
|1133   |41       |
|1068   |31       |
|1031   |34       |
|1051   |40       |
|1064   |36       |
|1034   |42       |
|1030   |45       |
|1019   |29       |
|1135   |38       |
|1148   |43       |
|1056   |36       |
|1085   |44       |
+-------+---------+
only showing top 20 rows



Encontre os 10 usuários mais ativos.

In [19]:
df_result = df_result.sort(F.desc('qtdAction')).limit(10)
df_result.show(truncate=False)

+-------+---------+
|user_id|qtdAction|
+-------+---------+
|1028   |54       |
|1115   |54       |
|1070   |54       |
|1053   |52       |
|1130   |51       |
|1050   |50       |
|1112   |50       |
|1009   |50       |
|1080   |49       |
|1026   |48       |
+-------+---------+



Salve o resultado em um arquivo CSV.

In [20]:
output_path = "result/result_case_problem.csv"
df_result.write.mode("overwrite").csv(output_path, header=True)

print("DataFrame salvo como CSV em: {0}".format(output_path))

DataFrame salvo como CSV em: result/result_case_problem.csv
