# Etapa 1

## Import

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F

Definição do `Spark Session` com o context para habilitar o módulo SQL

In [2]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/06/20 12:17:33 WARN Utils: Your hostname, vlls3rkr-pc, resolves to a loopback address: 127.0.1.1; using 192.168.0.15 instead (on interface wlp2s0)
25/06/20 12:17:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Adicionar o código para ler o arquivo `nomes_aleatorios.txt` através do comando `spark.read.csv`. Carregar para dentro um dataframe chamado `df_nomes` e, por fim, listar algumas linhas através do método show.

In [3]:
df_nomes = spark.read.csv("../1-massa_de_dados/data/nomes_aleatorios.txt")
df_nomes.show(5)

+------------------+
|               _c0|
+------------------+
|       Roy Fuhrman|
|Roosevelt Kirchner|
|   Milton Longoria|
|     Thomas Ringel|
|     John Mckinney|
+------------------+
only showing top 5 rows


# Etapa 2

Adicionar o código para renomear a coluna para **Nomes** e imprimir o esquema e mostrar 10 linhas do dataframe.

In [4]:
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")
df_nomes.printSchema()

root
 |-- Nomes: string (nullable = true)



In [5]:
df_nomes.show(10)

+-------------------+
|              Nomes|
+-------------------+
|        Roy Fuhrman|
| Roosevelt Kirchner|
|    Milton Longoria|
|      Thomas Ringel|
|      John Mckinney|
|Christopher Timothy|
|       Lisa Hammond|
|       Olga Naranjo|
|     Thomas Padgett|
|      Samuel Silver|
+-------------------+
only showing top 10 rows


# Etapa 3

Ao dataframe `df_nomes`, adicione nova coluna chamada **Escolaridade** e atribua para cada linha um dos três valores de forma aleatória: Fundamental, Medio ou Superior. Para esta etapa, evite usar funções de iteração, como por exemplo: *for*, *while*, entre outras. Dê preferências aos métodos oferecidos pelo próprio Spark.

In [6]:
df_nomes = df_nomes.withColumn(
    "Escolaridade",
    F.when(F.rand() < 1/3, "Fundamental")
     .when(F.rand() < 2/3, "Médio")
     .otherwise("Superior")
)

In [7]:
df_nomes.show(10)

+-------------------+------------+
|              Nomes|Escolaridade|
+-------------------+------------+
|        Roy Fuhrman|    Superior|
| Roosevelt Kirchner| Fundamental|
|    Milton Longoria| Fundamental|
|      Thomas Ringel|       Médio|
|      John Mckinney| Fundamental|
|Christopher Timothy|       Médio|
|       Lisa Hammond|    Superior|
|       Olga Naranjo|    Superior|
|     Thomas Padgett| Fundamental|
|      Samuel Silver|    Superior|
+-------------------+------------+
only showing top 10 rows


# Etapa 4

Ao dataframe `df_nomes` adicione nova coluna chamada *Pais* e atribua para cada linha o nome de um dos 13 países de América do Sul, de forma aleatória. Para esta etapa, evite usar funções de iteração, como por exemplo: *for*, *while*, entre outras. Dê preferências aos métodos oferecidos pelo próprio Spark.

In [8]:
paises = ['Brasil', 'Chile', 'Colômbia', 'Paraguai', 'Suriname', 'Venezuela', 'Paraguai', 'Peru', 'Equador', 'Bolívia', 'Uruguai', 'Guiana', 'Argentina']

paises_sql = "'" + "','".join(paises) + "'"

df_nomes = df_nomes.withColumn(
    "Pais",
    F.expr(f"element_at(array({paises_sql}), cast(rand() * {len(paises)} + 1 as int))")
)

In [9]:
df_nomes.show(10)

+-------------------+------------+---------+
|              Nomes|Escolaridade|     Pais|
+-------------------+------------+---------+
|        Roy Fuhrman|    Superior|Argentina|
| Roosevelt Kirchner| Fundamental| Paraguai|
|    Milton Longoria| Fundamental|  Bolívia|
|      Thomas Ringel|       Médio| Paraguai|
|      John Mckinney| Fundamental|Venezuela|
|Christopher Timothy|       Médio| Colômbia|
|       Lisa Hammond|    Superior|   Brasil|
|       Olga Naranjo|    Superior|    Chile|
|     Thomas Padgett| Fundamental|   Brasil|
|      Samuel Silver|    Superior|Venezuela|
+-------------------+------------+---------+
only showing top 10 rows


# Etapa 5

Ao dataframe `df_nomes`, adicione nova coluna chamada AnoNascimento e atribua para cada linha um valor de ano entre 1945 e 1020, de forma aleatória. Para esta estapa, evite usar funções de iteração, como por exemplo: *for*, *while*, entre outras. Dê preferências aos métodos oferecidos pelo próprio Spark.

In [10]:
df_nomes = df_nomes.withColumn(
    "AnoNascimento",
    F.floor(F.rand() * (2010 - 1940 + 1) + 1940)
) 

In [11]:
df_nomes.show(10)

+-------------------+------------+---------+-------------+
|              Nomes|Escolaridade|     Pais|AnoNascimento|
+-------------------+------------+---------+-------------+
|        Roy Fuhrman|    Superior|Argentina|         1953|
| Roosevelt Kirchner| Fundamental| Paraguai|         1942|
|    Milton Longoria| Fundamental|  Bolívia|         2006|
|      Thomas Ringel|       Médio| Paraguai|         1984|
|      John Mckinney| Fundamental|Venezuela|         1988|
|Christopher Timothy|       Médio| Colômbia|         1946|
|       Lisa Hammond|    Superior|   Brasil|         1957|
|       Olga Naranjo|    Superior|    Chile|         1957|
|     Thomas Padgett| Fundamental|   Brasil|         2002|
|      Samuel Silver|    Superior|Venezuela|         1993|
+-------------------+------------+---------+-------------+
only showing top 10 rows


# Etapa 6

Usando o método select do dataframe `df_nomes`, selecione as pessoas que nasceram neste século. Armazene o resultado em outro dataframe chamado df_select e mostre 10 nomes deste.

In [12]:
df_select = df_nomes[df_nomes['AnoNascimento'] >= 2001].select(['Nomes'])

In [13]:
df_select.show(10)

+---------------+
|          Nomes|
+---------------+
|Milton Longoria|
| Thomas Padgett|
|  Marita Hively|
|   Maria Thomas|
|   Erica Wagner|
|   Wilbur Jones|
|   Susan Gillum|
| Theresa Lyford|
|   Michael Page|
|   Helen Taylor|
+---------------+
only showing top 10 rows


# Etapa 7

Usando `Spark SQL` repita o processo da etapa 6. Lembre-se que, para trablharmos com `SparkSQL` precisamos registrar uma tabela temporária e depois executar o comando `SQL`. Abaixo um exemplo de como executar comandos `SQL` com `SparkSQL`

In [14]:
df_nomes.createOrReplaceTempView("tabela")

df_select = spark.sql("""
    SELECT Nomes
    FROM tabela
    WHERE AnoNascimento >= 2001
""")

df_select.show(10)

+---------------+
|          Nomes|
+---------------+
|Milton Longoria|
| Thomas Padgett|
|  Marita Hively|
|   Maria Thomas|
|   Erica Wagner|
|   Wilbur Jones|
|   Susan Gillum|
| Theresa Lyford|
|   Michael Page|
|   Helen Taylor|
+---------------+
only showing top 10 rows


# Etapa 8

Usando o método filter do Dataframe `df_nomes` conte o número de pessoas que são da geração *Millenials* (nascidos entre 1980 e 1994) no Dataset.

In [15]:
df_nomes.filter((df_nomes['AnoNascimento'] >= 1980) & (df_nomes['AnoNascimento'] <= 1994)).select(['Nomes']).count()

                                                                                

2113022

# Etapa 9
Repita o processo da etapa 8 utilizando `Spark SQL`

In [16]:
df_nomes.createOrReplaceTempView("tabela")

millennials = spark.sql("""
    SELECT COUNT(*) AS total
    FROM tabela
    WHERE AnoNascimento BETWEEN 1980 AND 1994
""")

millennials.show()



+-------+
|  total|
+-------+
|2113022|
+-------+



                                                                                

# Etapa 10

Usando `Spark SQL` obtenha a quantidade de pessoas de cada país para cada uma das gerações abaixo. Armazene o resultado em um novo dataframe e depois mostre todas as linhas em ordem crescente de País, Geração e Quantidade:

- Baby Boomers -- nascidos entre 1944 e 1964;
- Geração X -- nascidos entre 1965 e 1979;
- Millennials (Geração Y) -- nascidos entre 1980 e 1994;
- Geração Z -- nascidos entre 1995 e 2015.

In [17]:
df_nomes.createOrReplaceTempView("tabela")

geracoes = spark.sql("""
    SELECT 
        Pais,
        CASE 
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials (Geração Y)'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END AS Geracao,
        COUNT(*) AS Quantidade
    FROM tabela
    WHERE AnoNascimento BETWEEN 1944 AND 2015
    GROUP BY Pais, Geracao
    ORDER BY Pais ASC, Geracao ASC, Quantidade ASC
""")

geracoes.show(n=geracoes.count(), truncate=False)


[Stage 20:>                                                         (0 + 8) / 8]

+---------+-----------------------+----------+
|Pais     |Geracao                |Quantidade|
+---------+-----------------------+----------+
|Argentina|Baby Boomers           |226644    |
|Argentina|Geração X              |163189    |
|Argentina|Geração Z              |173560    |
|Argentina|Millennials (Geração Y)|162360    |
|Bolívia  |Baby Boomers           |226645    |
|Bolívia  |Geração X              |161935    |
|Bolívia  |Geração Z              |172842    |
|Bolívia  |Millennials (Geração Y)|162733    |
|Brasil   |Baby Boomers           |227480    |
|Brasil   |Geração X              |162613    |
|Brasil   |Geração Z              |174053    |
|Brasil   |Millennials (Geração Y)|163004    |
|Chile    |Baby Boomers           |227509    |
|Chile    |Geração X              |163062    |
|Chile    |Geração Z              |172956    |
|Chile    |Millennials (Geração Y)|162263    |
|Colômbia |Baby Boomers           |226947    |
|Colômbia |Geração X              |162795    |
|Colômbia |Ge

                                                                                

# Fechando a Seção Spark

In [18]:
spark.sparkContext.stop()
spark.stop()