In [1]:
%pip install pyspark



# Importando as bibliotecas necessárias e configurando o Spark:

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, expr
import random

spark = SparkSession \
    .builder \
    .master ("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

print("Spark configurado com sucesso!")


Spark configurado com sucesso!


# Etapa 1: Ler o arquivo e mostrar o DataFrame

In [7]:
df_nomes = spark.read.text("nomes_aleatorios.txt").toDF("nomes")

df_nomes.show(5)

+----------------+
|           nomes|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



# Etapa 2: Renomear a coluna e verificar o schema

In [8]:
df_nomes = df_nomes.withColumnRenamed("nomes", "Nomes")

df_nomes.printSchema()

df_nomes.show(10)

root
 |-- Nomes: string (nullable = true)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



# Etapa 3: Adicionar coluna "Escolaridade"

In [9]:
escolaridade_opcoes = ["Fundamental", "Medio", "Superior"]

df_nomes = df_nomes.withColumn(
    "Escolaridade",
    expr(f"array('{escolaridade_opcoes[0]}', '{escolaridade_opcoes[1]}', '{escolaridade_opcoes[2]}')[cast(rand() * 3 as int)]")
)

df_nomes.show(10)

+-----------------+------------+
|            Nomes|Escolaridade|
+-----------------+------------+
|   Frances Bennet|    Superior|
|    Jamie Russell|       Medio|
|   Edward Kistler|       Medio|
|    Sheila Maurer|       Medio|
| Donald Golightly|       Medio|
|       David Gray| Fundamental|
|      Joy Bennett|       Medio|
|      Paul Kriese| Fundamental|
|Berniece Ornellas|    Superior|
|    Brian Farrell|    Superior|
+-----------------+------------+
only showing top 10 rows



# Etapa 4: Adicionar coluna "País"

In [11]:
paises = ["Argentina", "Bolívia", "Brasil", "Chile", "Colômbia", "Equador",
          "Guiana", "Paraguai", "Peru", "Suriname", "Uruguai", "Venezuela", "Guiana Francesa"]

# expressão para atribuir um país aleatório
paises_expr = "array(" + ", ".join([f"'{pais}'" for pais in paises]) + ")[cast(rand() * 13 as int)]"

# adicionando a coluna "Pais" ao DataFrame
df_nomes = df_nomes.withColumn("Pais", expr(paises_expr))

df_nomes.show(10)


+-----------------+------------+---------+
|            Nomes|Escolaridade|     Pais|
+-----------------+------------+---------+
|   Frances Bennet|    Superior| Colômbia|
|    Jamie Russell|       Medio|  Uruguai|
|   Edward Kistler|       Medio|Argentina|
|    Sheila Maurer|       Medio| Paraguai|
| Donald Golightly|       Medio|  Uruguai|
|       David Gray| Fundamental|  Bolívia|
|      Joy Bennett|       Medio| Paraguai|
|      Paul Kriese| Fundamental|    Chile|
|Berniece Ornellas|    Superior|    Chile|
|    Brian Farrell|    Superior|  Bolívia|
+-----------------+------------+---------+
only showing top 10 rows



# Etapa 5: Adicionar coluna "AnoNascimento"

In [12]:
# valores aleatórios entre 1945 e 2010
df_nomes = df_nomes.withColumn(
    "AnoNascimento",
    expr("cast(rand() * (2010 - 1945 + 1) + 1945 as int)")
)

df_nomes.show(10)


+-----------------+------------+---------+-------------+
|            Nomes|Escolaridade|     Pais|AnoNascimento|
+-----------------+------------+---------+-------------+
|   Frances Bennet|    Superior| Colômbia|         2010|
|    Jamie Russell|       Medio|  Uruguai|         1961|
|   Edward Kistler|       Medio|Argentina|         1988|
|    Sheila Maurer|       Medio| Paraguai|         2000|
| Donald Golightly|       Medio|  Uruguai|         2006|
|       David Gray| Fundamental|  Bolívia|         2000|
|      Joy Bennett|       Medio| Paraguai|         1948|
|      Paul Kriese| Fundamental|    Chile|         1967|
|Berniece Ornellas|    Superior|    Chile|         1953|
|    Brian Farrell|    Superior|  Bolívia|         1969|
+-----------------+------------+---------+-------------+
only showing top 10 rows



# Etapa 6: Selecionar pessoas nascidas neste século

In [13]:
df_select = df_nomes.filter(col("AnoNascimento") >= 2000)

df_select.select("Nomes").show(10)

+----------------+
|           Nomes|
+----------------+
|  Frances Bennet|
|   Sheila Maurer|
|Donald Golightly|
|      David Gray|
|    Ernest Hulet|
|  Lorenzo Woodis|
| Helen Blackwell|
|    Roxie Bernal|
|     Donald Vogt|
|   Milton Dillon|
+----------------+
only showing top 10 rows



# Etapa 7: Repetir com Spark SQL

In [14]:
df_nomes.createOrReplaceTempView("pessoas")

df_sql_select = spark.sql("SELECT Nomes FROM pessoas WHERE AnoNascimento >= 2000")
df_sql_select.show(10)

+----------------+
|           Nomes|
+----------------+
|  Frances Bennet|
|   Sheila Maurer|
|Donald Golightly|
|      David Gray|
|    Ernest Hulet|
|  Lorenzo Woodis|
| Helen Blackwell|
|    Roxie Bernal|
|     Donald Vogt|
|   Milton Dillon|
+----------------+
only showing top 10 rows



# Etapa 8: Contar Millenials (1980-1994)

In [15]:
millennials_count = df_nomes.filter(
    (col("AnoNascimento") >= 1980) & (col("AnoNascimento") <= 1994)
).count()

print(f"Número de Millenials: {millennials_count}")

Número de Millenials: 2271572


# Etapa 9: Contar Millenials com Spark SQL

In [16]:
millennials_sql = spark.sql(
    "SELECT COUNT(*) AS Millenials FROM pessoas WHERE AnoNascimento BETWEEN 1980 AND 1994"
)
millennials_sql.show()

+----------+
|Millenials|
+----------+
|   2271572|
+----------+



# Etapa 10: Contar pessoas por país e geração

In [17]:
# criando a coluna "Geração"
df_nomes = df_nomes.withColumn(
    "Geracao",
    expr("""
        CASE
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END
    """)
)

# tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# contando pessoas por país e geração
query = """
    SELECT Pais, Geracao, COUNT(*) AS Quantidade
    FROM pessoas
    GROUP BY Pais, Geracao
    ORDER BY Pais, Geracao, Quantidade
"""
df_resultado = spark.sql(query)
df_resultado.show()

+---------+------------+----------+
|     Pais|     Geracao|Quantidade|
+---------+------------+----------+
|Argentina|Baby Boomers|    233716|
|Argentina|   Geração X|    175026|
|Argentina|   Geração Z|    186371|
|Argentina| Millennials|    174376|
|  Bolívia|Baby Boomers|    233638|
|  Bolívia|   Geração X|    175512|
|  Bolívia|   Geração Z|    186061|
|  Bolívia| Millennials|    174717|
|   Brasil|Baby Boomers|    232915|
|   Brasil|   Geração X|    174776|
|   Brasil|   Geração Z|    186766|
|   Brasil| Millennials|    174417|
|    Chile|Baby Boomers|    232961|
|    Chile|   Geração X|    174571|
|    Chile|   Geração Z|    186353|
|    Chile| Millennials|    174459|
| Colômbia|Baby Boomers|    233590|
| Colômbia|   Geração X|    174281|
| Colômbia|   Geração Z|    186769|
| Colômbia| Millennials|    174656|
+---------+------------+----------+
only showing top 20 rows

