# Stack - Engenharia de Dados do Zero

- **Task**
  - Converta os arquivos em csv para parquet e os envie para processing zone.

- **Dataset**
  - Usaremos esse dataset https://www.kaggle.com/nhs/general-practice-prescribing-data

### Modos de leitura
- **permissive**: *Define todos os campos para NULL quando encontra registros corrompidos e coloca todos registros corrompidos em uma coluna chamada _corrupt_record.* (default)

- **dropMalformed**: *Apaga uma linha corrompida ou que este não consiga ler.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [0]:
# ler arquivos vários arquivos csv do dbfs com spark
# Lendo todos os arquivos .csv do diretório bigdata (>4GB)

df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/bigdata/*.csv")

In [0]:
df.printSchema()

In [0]:
# imprime as 10 primeiras linhas do dataframe
display(df.head(10))

In [0]:
# conta a quantidade de linhas
df.count()

#### Leva os dados convertidos para a Processing Zone

- *Atente para NÃO escrever e ler arquivos parquet em versoes diferentes*

In [0]:
# Converte para formato parquet
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/processing/df-parquet-file.parquet")

In [0]:
# lendo arquivos parquet
# atente para a velocidade de leitura

df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/processing/df-parquet-file.parquet")

In [0]:
# conta a quantidade de linhas do dataframe
df_parquet.count()

In [0]:
%scala
// script para pegar tamanho em Gigabytes
val path="/FileStore/tables/processing/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")


In [0]:
%sql
-- consulta a view criada.
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize

In [0]:
display(df_parquet.head(10))

In [0]:
#Add columns to DataFrame using SQL
df_parquet.createOrReplaceTempView("view_df_parquet")

spark.sql("SELECT BNF_CODE as Bnf_code \
                  ,SUM(ACT_COST) as Soma_Act_cost \
                  ,SUM(QUANTITY) as Soma_Quantity \
                  ,SUM(ITEMS) as Soma_items \
                  ,SUM(ACT_COST) as Media_Act_cost \
           FROM view_df_parquet \
           GROUP BY bnf_code").show()

# Avançando com Pyspark
- Linguagem em plena ascensão.
- Linguagem simples e com uma comunidade crescente.
- Velocidade idêntica para as Apis SQL, Scala.

#### Criando um schema
- A opção **infer_schema** nem sempre vai definir o melhor datatype.
- Melhora a performance na leitura de grandes bases.
- Permite uma customização dos tipos das colunas.
- É importante saber para reescrita de aplicações. (Códigos pandas)

In [0]:
# visualizando datasets de exemplos da databricks
display(dbutils.fs.ls("/databricks-datasets"))

In [0]:
# Lendo o arquivo de dados
arquivo = "dbfs:/databricks-datasets/flights/"

In [0]:
# lendo o arquivo previamente com a opção inferSchema ligada
df = spark \
.read \
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
# imprime o schema do dataframe (infer_schema=True)
df.printSchema()

In [0]:
display(df)

In [0]:
# usa o objeto StructType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType

schema_df = StructType([
    StructField("date", StringType()),
    StructField("delay", IntegerType()),
    StructField("distance", IntegerType()),
    StructField("origin", StringType()),
    StructField("destination", StringType())
])

In [0]:
# verificando o tipo da variável schema_df
type(schema_df)

In [0]:
# usando o parâmetro schema()
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.load(arquivo)

In [0]:
# imprime o schema do dataframe.
df.printSchema()

In [0]:
# imprime 10 primeiras linhas do dataframe.
df.show(10)

In [0]:
# imprime o tipo da varia'vel df 
type(df)

In [0]:
# retorna as primeiras 10 linhas do dataframe em formato de array.
df.take(10)

In [0]:
# imprime a quantidade de linhas no dataframe.
df.count()

In [0]:
from pyspark.sql.functions import max
df.select(max("delay")).take(1)

In [0]:
# Filtrando linhas de um dataframe usando filter
df.filter("delay < 2").show(2)

In [0]:
# Usando where (um alias para o metodo filter)
df.where("delay < 2").show(2)

In [0]:
# ordena o dataframe pela coluna delay
df.sort("delay").show(5)

In [0]:
from pyspark.sql.functions import desc, asc, expr
# ordenando por ordem crescente
df.orderBy(expr("delay desc")).show(10)

In [0]:
# visualizando estatísticas descritivas
df.describe().show()

In [0]:
# iterando sobre todas as linhas do dataframe
for i in df.collect():
  #print (i)
  print(i[0], i[1], i[2] * 2)

In [0]:
# Adicionando uma coluna ao dataframe
df = df.withColumn('Nova Coluna',df['delay']+2)
df.show(10)

In [0]:
# Reovendo coluna
df = df.drop('Nova Coluna')
df.show(10)

In [0]:
# Renomenando uma coluna no dataframe
df.withColumnRenamed('Nova Coluna','New Column').show()

#### Trabalhando com missing values
- Tratamento de dados e limpeza de dados

In [0]:
# checa valoes null na coluna delay
df.filter("delay is NULL").show()

In [0]:
# conta a quantidade de linhas nulas
print ("Valores nulos coluna Delay: {0}".format(df.filter("delay is NULL").count()))
print ("Valores nulos coluna Date: {0}".format(df.filter("date is NULL").count()))
print ("Valores nulos coluna Distance: {0}".format(df.filter("distance is NULL").count()))
print ("Valores nulos coluna Origin: {0}".format(df.filter("origin is NULL").count()))
print ("Valores nulos coluna Destination: {0}".format(df.filter("destination is NULL").count()))

In [0]:
# preenche os dados missing com o valor 0
# para fazer o preenchimento sobrescreva a variável df e retire o método show()
df = df.na.fill(value=0)

In [0]:
# checa valoes null na coluna delay
df.filter("delay is NULL").show()

In [0]:
# preenche valores missing com valor 0 apenas da coluna delay
df.na.fill(value=0, subset=['delay']).show()

In [0]:
# imprime o dataframe
df.show()

In [0]:
# preenche os dados com valores de string vazia
df.na.fill("").show()

In [0]:
# remove qualquer linha nula de qualquer coluna
df = df.na.drop()

In [0]:
# obtem o valor máximo da coluna delay
from pyspark.sql.functions import max
df.select(max("delay")).take(1)

In [0]:
# Filtrando linhas de um dataframe usando filter
df.filter("delay < 2").show(2)

#### Manipulando Strings

In [0]:
# lendo os arquivos de dados de voos (2010_summary.csv...2015_summary.csv)
df = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.csv("/FileStore/tables/bronze2/*.csv")

In [0]:
# imprime 10 linhas do dataframe
df.show(10)

In [0]:
# imprime a quantidade de registros do dataframe
df.count()

In [0]:
from pyspark.sql.functions import lower, upper, col
df.select(col("DEST_COUNTRY_NAME"),lower(col("DEST_COUNTRY_NAME")),upper(lower(col("DEST_COUNTRY_NAME")))).show(10)

In [0]:
# remove espaços em branco a esquerda
from pyspark.sql.functions import ltrim
df.select(ltrim(col("DEST_COUNTRY_NAME"))).show(2)

In [0]:
# remove espaços a direita
from pyspark.sql.functions import rtrim
df.select(rtrim(col("DEST_COUNTRY_NAME"))).show(2)

In [0]:
# todas as operações juntas..
# a função lit cria uma coluna na cópia do dataframe
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

Estatística descritiva básica:
- **mean()** - Retorna o valor médio de cada grupo.
- **max()** - Retorna o valor máximo de cada grupo.
- **min()** - Retorna o valor mínimo de cada grupo.
- **sum()** - Retorna a soma de todos os valores do grupo.
- **avg()** - Retorna o valor médio de cada grupo.

In [0]:
# ler o dataset retail-data
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/retail/retail_2010_12_01.csv")

In [0]:
# imprime as 10 primeiras linhas do dataframe
df.show(10)

In [0]:
# Soma preços unitários por país
df.groupBy("Country").sum("UnitPrice").show()

In [0]:
# Conta a quantidade de países distintos.
df.groupBy("Country").count().show()

In [0]:
# retorna o valor mínimo por grupo
df.groupBy("Country").min("UnitPrice").show()

In [0]:
# retorna o valor máximo por grupo
df.groupBy("Country").max("UnitPrice").show()

In [0]:
# retorna o valor médio por grupo
df.groupBy("Country").avg("UnitPrice").show()

In [0]:
# retorna o valor médio por grupo
df.groupBy("Country").mean("UnitPrice").show()

In [0]:
# GroupBy várias colunas
df.groupBy("Country","CustomerID") \
    .sum("UnitPrice") \
    .show()

#### Trabalhando com datas
- Existem diversas funçoes em Pyspark para manipular datas e timestamp.
- Evite escrever suas próprias funçoes para isso.
- Algumas funcoes mais usadas:
    - current_day():
    - date_format(dateExpr,format):
    - to_date():
    - to_date(column, fmt):
    - add_months(Column, numMonths):
    - date_add(column, days):
    - date_sub(column, days):
    - datediff(end, start)
    - current_timestamp():
    - hour(column):

In [0]:
# imprime o dataframe
df.show()

In [0]:
# imprime o schema
df.printSchema()

In [0]:
from pyspark.sql.functions import *
#current_date() = imprime
df.select(current_date().alias("current_date")).show(1)

In [0]:
# formata valores de data
df.select(col("InvoiceDate"), \
          date_format(col("InvoiceDate"), "dd/MM/yyyy hh:mm:ss")\
          .alias("Formato Brasil")).show()

In [0]:
# imprime a diferença entre duas datas
df.select(col("InvoiceDate"),
    datediff(current_date(),col("InvoiceDate")).alias("datediff")  
  ).show()

In [0]:
# meses entre datas
df.select(col("InvoiceDate"), 
    months_between(current_date(),col("InvoiceDate")).alias("months_between")  
  ).show()

In [0]:
# Extrai ano, mës, próximo dia, dia da semana.
df.select(col("InvoiceDate"), 
     year(col("InvoiceDate")).alias("year"), 
     month(col("InvoiceDate")).alias("month"), 
     next_day(col("InvoiceDate"),"Sunday").alias("next_day"), 
     weekofyear(col("InvoiceDate")).alias("weekofyear") 
  ).show()

In [0]:
# Dia da semana, dia do mës, dias do ano
df.select(col("InvoiceDate"),  
     dayofweek(col("InvoiceDate")).alias("dayofweek"), 
     dayofmonth(col("InvoiceDate")).alias("dayofmonth"), 
     dayofyear(col("InvoiceDate")).alias("dayofyear"), 
  ).show()

In [0]:
# imprime o timestamp atual
df.select(current_timestamp().alias("current_timestamp")
  ).show(1,truncate=False)

In [0]:
# retorna hora, minuto e segundo
df.select(col("InvoiceDate"), 
    hour(col("InvoiceDate")).alias("hour"), 
    minute(col("InvoiceDate")).alias("minute"),
    second(col("InvoiceDate")).alias("second") 
  ).show()

#### Condições com operadores boleanos

In [0]:
# Retorna linhas das colunas 'InvoiceNo' e 'Description' onde 'InvoiceNo' é diferente de 536365
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(10)

In [0]:
# usando o operador boleando com um predicado em uma expressão.
df.where("InvoiceNo <> 536365").show(5)

In [0]:
# usando o operador boleando com um predicado em uma expressão.
df.where("InvoiceNo == 536365").show(5)

In [0]:
# Entendendo a ordem dos operadores boleanos
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1

In [0]:
# aplicando os operadores como filtros
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

In [0]:
# Create a view ou tabela temporária.
df.createOrReplaceTempView("dfTable")

In [0]:
%sql
-- Aplicando a mesmo código em SQL
SELECT * 
FROM dfTable 
WHERE StockCode in ("DOT")
AND(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

In [0]:
# Combinando filtros e operadores boleanos
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1

In [0]:
# Combinando filtros e operadores boleanos
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)

In [0]:
%sql
-- Aplicando as mesmas ideias usando SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

#### Comparando a performance de SQL vs Python Apis

In [0]:
## utilizando SQL
sqlWay = spark.sql("""
SELECT StockCode, count(*)
FROM dfTable
GROUP BY StockCode
""")

In [0]:
# Utilizando Python
dataFrameWay = df.groupBy("StockCode").count()

In [0]:
# imprime o plano de execução do código
sqlWay.explain()

In [0]:
# imprime o plano de execução do código
dataFrameWay.explain()

### Trabalhando com Joins

In [0]:
# Cria dataframes
pessoa = spark.createDataFrame([
(0, "João de Maria", 0, [100]),
(1, "Norma Maria", 1, [500, 250, 100]),
(2, "João de Deus", 1, [250, 100]),
(3, "Ana Maria Silva", 4, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")

programa_graduacao = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")

status = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")

In [0]:
# cria tabelas para os dataframes criados acima
pessoa.createOrReplaceTempView("pessoa")
programa_graduacao.createOrReplaceTempView("programa_graduacao")
status.createOrReplaceTempView("status")

In [0]:
# imprime os dataframes criados
pessoa.show()
programa_graduacao.show()
status.show()

In [0]:
# cria um objeto com as chaves para fazer join
keys_join = pessoa["graduate_program"] == programa_graduacao['id']

In [0]:
# imprime objeto
type(keys_join)

In [0]:
# dataframe com inner join entre pessoa e programa de graduação
pessoa.join(programa_graduacao, keys_join).show()

In [0]:
# dataframe com inner join entre pessoa e programa de graduação
# sintaxe join(dataframealvo, condição-de-join, tipo-de-join)

pessoa.join(programa_graduacao, pessoa["graduate_program"] == programa_graduacao['id'], 'inner').show()

In [0]:
%sql
-- Inner join em SQL
SELECT * 
FROM pessoa INNER JOIN programa_graduacao
ON pessoa.graduate_program = programa_graduacao.id

In [0]:
# Outer joins: retorna null para linhas que não existam em um dos dataframes e retorna qualquer dado em qualquer dataframe caso exista a chave
join_type = "outer"
pessoa.join(programa_graduacao, keys_join, join_type).show()

In [0]:
%sql
-- Outer join em SQL
SELECT * 
FROM pessoa FULL OUTER JOIN programa_graduacao
ON pessoa.graduate_program = programa_graduacao.id

In [0]:
# Left joins: retorna null para linhas que não existam no dataframe da direita
join_type = "left_outer"
pessoa.join(programa_graduacao, keys_join, join_type).show()

In [0]:
%sql
-- Left outer join em SQL
SELECT * 
FROM pessoa LEFT OUTER JOIN programa_graduacao
ON pessoa.graduate_program = programa_graduacao.id

In [0]:
# Right joins: retorna null para linhas que não existam no dataframe a esquerda
join_type = "right_outer"
pessoa.join(programa_graduacao, keys_join, join_type).show()

In [0]:
%sql
-- Right join em SQL
SELECT * 
FROM pessoa RIGHT OUTER JOIN programa_graduacao
ON pessoa.graduate_program = programa_graduacao.id

#### Condições

In [0]:
# altera a condição de join
keys_join = ((pessoa["graduate_program"] == programa_graduacao["id"]) & (pessoa["graduate_program"] > 0))
join_type = "inner"
pessoa.join(programa_graduacao, keys_join, join_type).show()

In [0]:
%sql
-- Inner join em SQL
-- adicionando uma codição where
SELECT * 
FROM pessoa INNER JOIN programa_graduacao
ON pessoa.graduate_program = programa_graduacao.id
WHERE pessoa.graduate_program > 0

In [0]:
# Condições mais complexas usando expressão

from pyspark.sql.functions import expr

pessoa.withColumnRenamed("id", "personId")\
.join(status, expr("array_contains(spark_status, id)")).show()

In [0]:
%sql
-- Condições mais complexas usando expressão feitas em SQL
SELECT * 
FROM
  (select id as personId
         ,name
         ,graduate_program
         ,spark_status
   FROM pessoa)
  INNER JOIN status ON array_contains(spark_status, id)

#### Trabalhando com UDFs
- Integração de código entre as APIs
- É preciso cuidado com performance dos códigos usando UDFs

In [0]:
from pyspark.sql.types import LongType
# define a função
def quadrado(s):
  return s * s

In [0]:
# registra no banco de dados do spark e define o tipo de retorno por padrão é stringtype
from pyspark.sql.types import LongType
spark.udf.register("Func_Py_Quadrado", quadrado, LongType())

In [0]:
# gera valores aleatórios
spark.range(1, 20).show()

In [0]:
# cria a visão View_temp
spark.range(1, 20).createOrReplaceTempView("View_temp")

In [0]:
%sql
-- Usando a função criada em python juntamente com código SQL
select id, 
       Func_Py_Quadrado(id) as id_ao_quadrado
from View_temp

#### UDFs com Dataframes

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
# registra a Udf
Func_Py_Quadrado = udf(quadrado, LongType())

In [0]:
# cria um dataframe apartir da tabela temporária
df = spark.table("View_temp")

In [0]:
# imprime o dataframe
df.show(10)

In [0]:
# usando o dataframe juntamente com a Udf registrada
df.select("id", Func_Py_Quadrado("id").alias("id_quadrado")).show(20)

#### Koalas
- Koalas é um projeto de código aberto que fornece um substituto imediato para os pandas. 
- O pandas é comumente usado por ser um pacote que fornece estruturas de dados e ferramentas de análise de dados fáceis de usar para a linguagem de programação Python.
- O Koalas preenche essa lacuna fornecendo APIs equivalentes ao pandas que funcionam no Apache Spark. 
- Koalas é útil não apenas para usuários de pandas, mas também para usuários de PySpark.
  - Koalas suporta muitas tarefas que são difíceis de fazer com PySpark, por exemplo, plotar dados diretamente de um PySpark DataFrame.
- Koalas suporta SQL diretamente em seus dataframes.

In [0]:
import numpy as np
import pandas as pd
import databricks.koalas as ks

In [0]:
# cria um pandas DataFrame
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# imprime um pandas dataframe
type(pdf)

In [0]:
# Cria um Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# imprime o tipo de dados
type(kdf)

In [0]:
# Cria um Koalas dataframe a partir de um pandas dataframe
kdf = ks.DataFrame(pdf)
type(kdf)

In [0]:
# métodos já conhecidos
pdf.head()

In [0]:
# métodos já conhecidos
kdf.head()

In [0]:
# método describe()
kdf.describe()

In [0]:
# ordenando um dataframe
kdf.sort_values(by='B')

In [0]:
# define configurações de layout de células
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')
ks.set_option('compute.max_rows', 2000)

In [0]:
# slice
kdf[['A', 'B']]

In [0]:
# slice
kdf[['A', 'B']]

In [0]:
# iloc
kdf.iloc[:3, 1:2]

#### Usando funções python com dataframe koalas

In [0]:
# cria função python
def quadrado(x):
    return x ** 2

In [0]:
# habilita computação de dataframes e séries.
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)

In [0]:
# cria uma nova coluna a partir da função quadrado
kdf['C'] = kdf.A.apply(quadrado)

In [0]:
# visualizando o dataframe
kdf.head()

In [0]:
# agrupando dados
kdf.groupby('A').sum()

In [0]:
# agrupando mais de uma coluna
kdf.groupby(['A', 'B']).sum()

In [0]:
# para plotar gráfico diretamente na célula use o inline
%matplotlib inline

speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]

index = ['snail', 'pig', 'elephant',
         'rabbit', 'giraffe', 'coyote', 'horse']

kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)
kdf.plot.bar()

**Usando SQL no Koalas**

In [0]:
# cria um dataframe Koalas
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'pig': [20, 18, 489, 675, 1776],
                    'horse': [4, 25, 281, 600, 1900]})

In [0]:
# Faz query no dataframe koalas
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")

In [0]:
# cria um dataframe pandas
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                    'sheep': [22, 50, 121, 445, 791],
                    'chicken': [250, 326, 589, 1241, 2118]})

In [0]:
# Query com inner join entre dataframe pandas e koalas
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

In [0]:
# converte koalas dataframe para Pyspark
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})

In [0]:
pydf = kdf.to_spark()

In [0]:
type(pydf)