In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, stddev, percentile_approx, count, when
import time
import random
import sys

# Criar sessão do Spark sem Hive
spark = SparkSession.builder \
    .appName("PySpark_Alunos_DB") \
    .config("spark.sql.warehouse.dir", "file:///tmp/spark-warehouse") \
    .getOrCreate()

# Criar dados para turmas
dados_turmas = pd.DataFrame({
    "id_turma": [1, 2, 3, 4],
    "nome_turma": ["Turma A", "Turma B", "Turma C", "Turma D"],
    "disciplina": ["Matemática", "História", "Física", "Química"],
    "ano": [2024, 2023, 2024, 2023],
    "num_alunos": [30, 25, 20, 28]
})

df_turmas = spark.createDataFrame(dados_turmas)

# Criar dataset de alunos grande (500.000 alunos)
dados_alunos_grande = pd.DataFrame({
    "id_aluno": list(range(1, 500001)),
    "nome": [f"Aluno {i}" for i in range(1, 500001)],
    "idade": [random.randint(15, 20) for _ in range(500000)],
    "turma": [random.randint(1, 4) for _ in range(500000)],
    "desempenho": [random.choice(["Bom", "Excelente", "Médio", "Fraco"]) for _ in range(500000)]
})

df_alunos_pandas = pd.DataFrame(dados_alunos_grande)
df_alunos_pyspark = spark.createDataFrame(dados_alunos_grande)

# 🔎 CONSULTAS PANDAS
start = time.time()
media_pandas = df_alunos_pandas.groupby("turma")["idade"].mean()
mediana_pandas = df_alunos_pandas.groupby("turma")["idade"].median()
desvio_pandas = df_alunos_pandas.groupby("turma")["idade"].std()
end = time.time()
print(f"⏳ Tempo com Pandas: {end - start:.4f} segundos")

# 🔎 CONSULTAS Pyspark
start = time.time()

df_resultado = df_alunos_pyspark.groupBy("turma").agg(
    avg("idade").alias("media_idade"),
    percentile_approx("idade", 0.5).alias("mediana_idade"),
    stddev("idade").alias("desvio_padrao")
)
df_resultado.show()

end = time.time()
print(f"🚀 Tempo com PySpark: {end - start:.4f} segundos")

# 🔥 🚀 JOIN: Alunos + Turmas
start = time.time()
df_join = df_alunos_pyspark.join(df_turmas, df_alunos_pyspark.turma == df_turmas.id_turma, "inner")
df_join.show(5)
end = time.time()
print(f"🚀 Tempo do JOIN em PySpark: {end - start:.4f} segundos")

# 🔥 🚀 Criar nova coluna "Aprovado" baseada no desempenho
start = time.time()
df_aprovados = df_alunos_pyspark.withColumn("aprovado", when(col("desempenho").isin(["Bom", "Excelente"]), "Sim").otherwise("Não"))
df_aprovados.groupBy("aprovado").count().show()
end = time.time()
print(f"🚀 Tempo para criar nova coluna e contar aprovados: {end - start:.4f} segundos")

# 📌 4️⃣ Mostrar como PySpark pode escalar
print("🔥 PySpark pode processar milhões de registos distribuídos em múltiplos nós!")

# Fechar sessão
spark.stop()


⏳ Tempo com Pandas: 0.0331 segundos
+-----+------------------+-------------+------------------+
|turma|       media_idade|mediana_idade|     desvio_padrao|
+-----+------------------+-------------+------------------+
|    1| 17.49734697139204|           17| 1.711997007947214|
|    3|17.496832245336584|           17|1.7065528571756308|
|    2| 17.49942861709354|           17|1.7128282635165537|
|    4| 17.49734065951644|           18|1.7076331817736128|
+-----+------------------+-------------+------------------+

🚀 Tempo com PySpark: 4.1214 segundos
+--------+--------+-----+-----+----------+--------+----------+----------+----+----------+
|id_aluno|    nome|idade|turma|desempenho|id_turma|nome_turma|disciplina| ano|num_alunos|
+--------+--------+-----+-----+----------+--------+----------+----------+----+----------+
|       1| Aluno 1|   16|    1|       Bom|       1|   Turma A|Matemática|2024|        30|
|       6| Aluno 6|   19|    1|       Bom|       1|   Turma A|Matemática|2024|        