<img src="./barquinho.png">

# Teste Big Data da Beijaflore

Neste teste iremos avaliar as competências esperadas de PySpark e ETL.

## 1. Preparação

### 1.1 Depedências

In [93]:
# Importe aqui as bibliotecas usadas no desafio
# Importe aqui as bibliotecas usadas no desafio
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import *

### 1.2 Spark e SQL Context

In [2]:
sc = SparkContext.getOrCreate()# inicialize o Spark Context
sqlContext = SQLContext(sc)

### 1.3 Schema do Dataframe

Antes de carregar o DataFrame defina explicitamente o schema do dataset. Para entender melhor a estrutura dos dados confira a amostra `first100.csv`

In [36]:
schema = StructType([StructField("ID", IntegerType(), False),
        StructField("country", StringType(), False),
        StructField("description", StringType(), False),
        StructField("designation", StringType(), True),
        StructField("points", IntegerType(), True),
        StructField("price", DoubleType(), True),
        StructField("province", StringType(), True),
        StructField("region_1", StringType(), True),
        StructField("region_2", StringType(), True),
        StructField("taster_name", StringType(), True),
        StructField("taster_twitter_handle", StringType(), True),
        StructField("title", StringType(), True),
        StructField("variety", StringType(), True),
        StructField("winery", StringType(), True)
])

### 1.4 Carregamento dos dados

Carregue os dados do arquivo `reviews.csv` em um DataFrame

In [37]:
ds = sqlContext.read.csv("reviews.csv", schema)

In [38]:
ds.printSchema()

root
 |-- id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- description: string (nullable = true)
 |-- designation: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- taster_name: string (nullable = true)
 |-- taser_twitter_handle: string (nullable = true)
 |-- title: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



O esquema deve ser parecido com o exemplo acima.

## 2. Desafios

### 2.1 Quantos registros com vinhos franceses existem no dataset?

**Resposta:** ...

In [None]:
dsFrance = ds.filter(ds.country == 'France')
dsFrance.count()

### 2.2 Salve todos registros de vinhos franceses em um arquivo parquet.

https://parquet.apache.org/

In [49]:
france = dsFrance.write.parquet("output/France.parquet")

In [50]:
# Salve o dataframe france em disco no formato correto
dsFrance.write.csv('France.csv')

### 2.3 Qual a província Italiana com o maior número de reviews no dataset?

**Resposta: ** ...

In [None]:
ds.filter(ds.country == 'Italy').groupby(ds.province).count().orderBy('count', ascending=False).show(1)

### 2.4 Dentro dessa província, quem fez mais reviews?
**Resposta: **

In [None]:
ds.filter(ds.province == 'Tuscany').groupby(ds.taster_name).count().orderBy('count', ascending=False).show(1)

### 2.5 Salve todas reviews desta pessoa no formato JSON com as seguintes colunas: designation, points, price, title, variety e winery.

In [69]:
kerin = ds.filter(ds.taster_name == 'Kerin O’Keefe').select(ds.designation, ds.points, ds.points, ds.price, ds.title, ds.variety, ds.winery)

In [72]:
kerin.write.json('kerin.json')

### 2.6 Qual o vinho com mais pontos da variedade mais comentada?
**Resposta:** ...

In [None]:
        ds.registerTempTable("reviews")
        dsCountVariety = sqlContext.sql("select variety, count(*) as Cont from reviews group by variety order by Cont desc").limit(1)
        dsTitle = ds.join(dsCountVariety, ds.variety == dsCountVariety.variety).select(dsTitle.title, dsTitle.points)
        dsTitle.orderBy('points', ascending=False).show(1)

**Teste criado por Diego Rodrigues da Beijaflore.**