# Apache Spark

## Etapa 1

### Instruções

Inicialmente iremos preparar o ambiente, definindo o diretório onde nosso código será desenvolvido. Para este diretório iremos copiar o arquivo nomes_aleatorios.txt.

Após, em nosso script Python, devemos importar as bibliotecas necessárias:

```
from pyspark.sql import SparkSession

from pyspark import SparkContext, SQLContext
```
Aplicando as bibliotecas do Spark, podemos definir a Spark Session e sobre ela definir o Context para habilitar o módulo SQL

```spark = SparkSession \

                .builder \

                .master("local[*]")\

                .appName("Exercicio Intro") \

                .getOrCreate()```

Nesta etapa, adicione código para ler o arquivo nomes_aleatorios.txt através do comando spark.read.csv. Carregue-o para dentro de um dataframe chamado df_nomes e, por fim, liste algumas linhas através do método show. Exemplo: df_nomes.show(5)

### Código

In [3]:
# Importa a biblioteca findspark para iniciar o serviço do Spark na máquina que hospeda o meu notebook
import findspark
findspark.init()
import pyspark

# Importa demais bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

# Aplica as bibliotefcas do spark
spark = SparkSession\
    .builder\
    .master("local[*]")\
    .appName("Exercicio Intro")\
    .getOrCreate()

# Lê o arquivo de texto
df_nomes = spark.read.csv("Files/nomes_aleatorios.txt")

# Exibe 5 linhas do dataframe
df_nomes.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/11 19:35:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+----------------+
|             _c0|
+----------------+
|    David Staten|
|  Heather Rivera|
|     Lester Wood|
|William Sandoval|
| Gwendolyn Hines|
+----------------+
only showing top 5 rows



## Etapa 2

### Instruções

No Python, é possível acessar uma coluna de um objeto dataframe pelo atributo (por exemplo df_nomes.nome) ou por índice (df_nomes['nome']). Enquanto a primeira forma é conveniente para a exploração de dados interativos, você deve usar o formato de índice, pois caso algum nome de coluna não esteja de acordo seu código irá falhar.

Como não informamos no momento da leitura do arquivo, o Spark não identificou o Schema por padrão e definiu todas as colunas como string. Para ver o Schema, use o método df_nomes.printSchema().

Nesta etapa, será necessário adicionar código para renomear a coluna para Nomes, imprimir o esquema e mostrar 10 linhas do dataframe.

### Código

In [4]:
# Renomeia a coluna para Nomes
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")
# Exibe o esquema
df_nomes.printSchema()
# Exibe 10 linhas do dataframe
df_nomes.show(10)

root
 |-- Nomes: string (nullable = true)

+------------------+
|             Nomes|
+------------------+
|      David Staten|
|    Heather Rivera|
|       Lester Wood|
|  William Sandoval|
|   Gwendolyn Hines|
|Constance Mcmillan|
|        Billy Diaz|
|      Marcos Rolfe|
|         Cara Cole|
|         John Gant|
+------------------+
only showing top 10 rows



## Etapa 3

### Intruções

Ao dataframe (df_nomes), adicione nova coluna chamada Escolaridade e atribua para cada linha um dos três valores de forma aleatória: Fundamental, Medio ou Superior.

Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

### Código

In [5]:
# Importa a biblioteca functions
from pyspark.sql import functions as f

df_nomes = df_nomes.withColumn(
  # Gera uma coluna com nome Escolaridade com um array contendo os valores declarados
  'Escolaridade',
  f.array(
  f.lit('Fundamental'),
  f.lit('Medio'),
  f.lit('Superior'),
  # Seleciona de forma aleatória os valores declarados para a coluna Escolaridade
  ).getItem((f.rand()*3).cast("int")
  )
)
# Exibe 10 linhas do dataframe
df_nomes.show(10)



+------------------+------------+
|             Nomes|Escolaridade|
+------------------+------------+
|      David Staten|    Superior|
|    Heather Rivera| Fundamental|
|       Lester Wood|       Medio|
|  William Sandoval| Fundamental|
|   Gwendolyn Hines|       Medio|
|Constance Mcmillan|       Medio|
|        Billy Diaz| Fundamental|
|      Marcos Rolfe| Fundamental|
|         Cara Cole|    Superior|
|         John Gant| Fundamental|
+------------------+------------+
only showing top 10 rows



## Etapa 4

### Instruções

Ao dataframe (df_nomes), adicione nova coluna chamada Pais e atribua para cada linha o nome de um dos 13 países da América do Sul, de forma aleatória.

Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

### Código

In [6]:
# Importa as bibliotecas e funções necessárias
import hashlib
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Gera um array contendo o nome dos paíes da América do Sul
countries = ['Argentina', 'Bolívia', 'Brasil', 'Chile', 'Colômbia', 'Equador', 'Guiana', 'Guiana Francesa', 'Paraguai', 'Peru', 'Suriname', 'Uruguai', 'Venezuela']

# Define a função que seleciona aleatoriamente países da lista anterior
def choose_country(x):
  # Usa a função SHA-256 hash para gerar um valor de hash de 32-byte
  hash_value = hashlib.sha256(str(x).encode()).digest()
  # Converte os primeiros 2 bytes do valor do hash em inteiro
  random_int = int.from_bytes(hash_value[:2], byteorder="big")
  # Calcula o índice do nome aleatório do país aleatório
  index = random_int % len(countries)
  # Retorna o nome do país aleatório
  return countries[index]

# Armazena a função choose_country() como UDF 
choose_country_udf = udf(choose_country, StringType())
# Cria uma nova coluna chamada Pais e aplica a função anterior
df_nomes = df_nomes.withColumn('Pais', choose_country_udf("Nomes"))

df_nomes.show(10)

[Stage 4:>                                                          (0 + 1) / 1]

+------------------+------------+---------+
|             Nomes|Escolaridade|     Pais|
+------------------+------------+---------+
|      David Staten|    Superior| Paraguai|
|    Heather Rivera| Fundamental|Argentina|
|       Lester Wood|       Medio|  Bolívia|
|  William Sandoval| Fundamental| Suriname|
|   Gwendolyn Hines|       Medio|  Bolívia|
|Constance Mcmillan|       Medio|   Brasil|
|        Billy Diaz| Fundamental|  Equador|
|      Marcos Rolfe| Fundamental|  Bolívia|
|         Cara Cole|    Superior|Venezuela|
|         John Gant| Fundamental|  Equador|
+------------------+------------+---------+
only showing top 10 rows



                                                                                

## Etapa 5

### Intruções

Ao dataframe (df_nomes), adicione nova coluna chamada AnoNascimento e atribua para cada linha um valor de ano entre 1945 e 2010, de forma aleatória. 

Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

### Código

In [7]:
# Importa as bibliotecas e funções necessárias
from random import randint
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# Define a função que gera valores aleatórios entre 1945 e 2010
def random_year(x):
    # Usa a função SHA-256 hash para gerar um valor de hash de 32-byte
    hash_value = hashlib.sha256(str(x).encode()).digest()
    # Converte os primeiros 4 bytes do valor do hash em inteiro
    random_int = int.from_bytes(hash_value[:4], byteorder="big")
    # Calcula o ano leatório entre 1945 e 2010
    year = 1945 + (random_int % 65)
    # Retorna o ano aleatório
    return year
# Armazena a função random_year() como um UDF
random_year_udf = udf(random_year, IntegerType())
# Cria uma nova coluna chamada AnoNascimento e aplica a função anterior
df_nomes = df_nomes.withColumn("AnoNascimento", random_year_udf("Nomes"))
# Exibe 10 linhas do dataframe
df_nomes.show(10)

+------------------+------------+---------+-------------+
|             Nomes|Escolaridade|     Pais|AnoNascimento|
+------------------+------------+---------+-------------+
|      David Staten|    Superior| Paraguai|         1989|
|    Heather Rivera| Fundamental|Argentina|         1974|
|       Lester Wood|       Medio|  Bolívia|         1972|
|  William Sandoval| Fundamental| Suriname|         1989|
|   Gwendolyn Hines|       Medio|  Bolívia|         1976|
|Constance Mcmillan|       Medio|   Brasil|         1980|
|        Billy Diaz| Fundamental|  Equador|         1976|
|      Marcos Rolfe| Fundamental|  Bolívia|         2007|
|         Cara Cole|    Superior|Venezuela|         1985|
|         John Gant| Fundamental|  Equador|         1977|
+------------------+------------+---------+-------------+
only showing top 10 rows



## Etapa 6

### Instruções

Usando o método select do dataframe (df_nomes), selecione as pessoas que nasceram neste século. Armazene o resultado em outro dataframe chamado df_select e mostre 10 nomes deste.

### Código

In [8]:
# Seleciona e armazena as colunas Nomes e AnoNascimento quando o ano é maior ou igual a 2000 em um novo dataframe 
df_select = df_nomes.select('Nomes', 'AnoNascimento').where("AnoNascimento > 2000")
# Exibe 10 linhas do novo dataframe
df_select.show(10)

[Stage 6:>                                                          (0 + 1) / 1]

+---------------+-------------+
|          Nomes|AnoNascimento|
+---------------+-------------+
|   Marcos Rolfe|         2007|
|  Lola Gauthier|         2009|
|    Kari Dewolf|         2001|
|    Jamie Mills|         2004|
|   Samuel Polak|         2003|
|      Jamie Low|         2002|
|  Sallie Gammon|         2001|
|Aaron Kondracki|         2007|
|   Corinna Hood|         2001|
|    Anna Schaus|         2009|
+---------------+-------------+
only showing top 10 rows



                                                                                

Usando Spark SQL repita o processo da Pergunta 6. Lembre-se que, para trabalharmos com SparkSQL, precisamos registrar uma tabela temporária e depois executar o comando SQL. Abaixo um exemplo de como executar comandos SQL com SparkSQL:

```df_nomes.registerTempTable("pessoas")

sqlContext.sql("select * from pessoas").show()
```

## Etapa 7

### Instruções

Usando Spark SQL repita o processo da Pergunta 6. Lembre-se que, para trabalharmos com SparkSQL, precisamos registrar uma tabela temporária e depois executar o comando SQL. Abaixo um exemplo de como executar comandos SQL com SparkSQL:

```
df_nomes.registerTempTable("pessoas")

sqlContext.sql("select * from pessoas").show()
```

### Código

In [9]:
# Registra a tabela temporária
df_nomes.registerTempTable("pessoas")
# Executa o comando sql
df_select = spark.sql(
    """select Nomes, AnoNascimento
    from pessoas
    where AnoNascimento > 2000"""
)
# Exibe o resultado
df_select.show()



+---------------+-------------+
|          Nomes|AnoNascimento|
+---------------+-------------+
|   Marcos Rolfe|         2007|
|  Lola Gauthier|         2009|
|    Kari Dewolf|         2001|
|    Jamie Mills|         2004|
|   Samuel Polak|         2003|
|      Jamie Low|         2002|
|  Sallie Gammon|         2001|
|Aaron Kondracki|         2007|
|   Corinna Hood|         2001|
|    Anna Schaus|         2009|
|   Samuel Polak|         2003|
|Rebecca Marquez|         2003|
|  Daniel Pigeon|         2004|
|   Marcos Rolfe|         2007|
|   Ruby Goodell|         2002|
|    James Klein|         2007|
|      Mary Camp|         2007|
|   Dixie Mercer|         2008|
|  Cody Lucchese|         2007|
|     Terry Wood|         2005|
+---------------+-------------+
only showing top 20 rows



                                                                                

## Etapa 8

### Instruções

Usando o método select do Dataframe df_nomes, Conte o número de pessoas que são da geração Millennials (nascidos entre 1980 e 1994) no Dataset

### Código

In [10]:

# Seleciona e armazena a colunas Nomes quando o ano está entre 1980 e 1994
df_millennials = df_nomes.select('Nomes').where("AnoNascimento between 1980 and 1994")
# Conta a quantidade de linhas e armazena em uma variável
count_millennials =  df_millennials.count()
# Exibe o resultado
print('Millennials:',count_millennials)



Millennials: 2269068


                                                                                

## Etapa 9

### Instruções

Repita o processo da Pergunta 8 utilizando Spark SQL

### Código

In [11]:
# Executa o comando sql
df_select = spark.sql(
    """select count(Nomes) as Millennials
    from pessoas
    where AnoNascimento between 1980 and 1994
    """
)
# Exibe o resultado
df_select.show()



+-----------+
|Millennials|
+-----------+
|    2269068|
+-----------+



                                                                                

## Etapa 10

### Instruções

Usando Spark SQL, obtenha a quantidade de pessoas de cada país para uma das gerações abaixo. Armazene o resultado em um novo dataframe e depois mostre todas as linhas em ordem crescente de Pais, Geração e Quantidade

- Baby Boomers – nascidos entre 1944 e 1964;

- Geração X – nascidos entre 1965 e 1979;

- Millennials (Geração Y) – nascidos entre 1980 e 1994;

- Geração Z – nascidos entre 1995 e 2015.

### Código

In [18]:

# Executa o comando sql
df_count_country_generation = spark.sql(
"""select Pais,
       case 
           when AnoNascimento between 1944 and 1964 then 'Baby Boomers'
           when AnoNascimento between 1965 and 1979 then 'Geração X'
           when AnoNascimento between 1980 and 1994 then 'Millennials'
           when AnoNascimento between 1995 and 2015 then 'Geração Z'
       end as Geracao,
       count(*) as QuantidadePessoas
from pessoas
group by Pais, Geracao
order by Pais, Geracao, QuantidadePessoas
"""
)
# Exibe o resultado
df_count_country_generation.show()

                                                                                

+---------+------------+-----------------+
|     Pais|     Geracao|QuantidadePessoas|
+---------+------------+-----------------+
|Argentina|Baby Boomers|           279995|
|Argentina|   Geração X|           203229|
|Argentina|   Geração Z|           183468|
|Argentina| Millennials|           186933|
|  Bolívia|Baby Boomers|           230201|
|  Bolívia|   Geração X|           176548|
|  Bolívia|   Geração Z|           169703|
|  Bolívia| Millennials|           170518|
|   Brasil|Baby Boomers|           303896|
|   Brasil|   Geração X|           157202|
|   Brasil|   Geração Z|           153307|
|   Brasil| Millennials|           169124|
|    Chile|Baby Boomers|           240553|
|    Chile|   Geração X|           183925|
|    Chile|   Geração Z|           226742|
|    Chile| Millennials|           163349|
| Colômbia|Baby Boomers|           259089|
| Colômbia|   Geração X|           193126|
| Colômbia|   Geração Z|           163303|
| Colômbia| Millennials|           189319|
+---------+

                                                                                

52