# Big Data para Cientista de Dados

In [0]:
# Lendo o arquivo de dados
arquivo = "/FileStore/tables/curso/2015_summary.csv"

In [0]:
# lendo o arquivo de dados
# inferSchema = True
# header = True

flightData2015 = spark\
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
# imprime os datatypes das colunas do dataframe
flightData2015.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [0]:
# imprime o tipo da variável flightData2015
type(flightData2015)

Out[4]: pyspark.sql.dataframe.DataFrame

In [0]:
# retorna as primeiras 3 linhas do dataframe em formato de array.
flightData2015.take(5)

Out[5]: [Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344),
 Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]

In [0]:
# Usando o comando display
display(flightData2015.show(3))

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
+-----------------+-------------------+-----+
only showing top 3 rows



In [0]:
# imprime a quantidade de linhas no dataframe.
flightData2015.count()

Out[7]: 256

In [0]:
# lendo o arquivo previamente com a opção inferSchema desligada
flightData2015 = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)

In [0]:
df = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.csv("/FileStore/tables/bronse/*.csv")

In [0]:
df.show(10)
type(df)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|    United States|            Ireland|  264|
|    United States|              India|   69|
|            Egypt|      United States|   24|
|Equatorial Guinea|      United States|    1|
|    United States|          Singapore|   25|
|    United States|            Grenada|   54|
|       Costa Rica|      United States|  477|
|          Senegal|      United States|   29|
|    United States|   Marshall Islands|   44|
+-----------------+-------------------+-----+
only showing top 10 rows

Out[14]: pyspark.sql.dataframe.DataFrame

In [0]:
# imprime a quantidade de linhas do datafrme
df.count()

Out[15]: 1502

In [0]:
# Opções de Plots
display(df.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


Output can only be rendered in Databricks

#Trabalhando com SQL

In [0]:
%sql
DROP TABLE IF EXISTS all_files;

In [0]:
%sql
CREATE TABLE all_files
USING csv
OPTIONS (path "/FileStore/tables/bronse/*.csv", header "true")

In [0]:
%sql
-- Consutando dados usando a linguagem SQL
SELECT * FROM all_files;

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
%sql
-- Consutando dados usando a linguagem SQL
SELECT count(*) FROM all_files;

count(1)
1502


In [0]:
%sql
-- Consutando dados usando a linguagem SQL
SELECT DEST_COUNTRY_NAME
       ,avg(count) AS Quantidade_Paises
FROM all_files
GROUP BY DEST_COUNTRY_NAME
ORDER BY Quantidade_Paises DESC;

DEST_COUNTRY_NAME,Quantidade_Paises
Canada,8175.333333333333
Mexico,6345.833333333333
United States,3190.5978260869565
United Kingdom,1824.3333333333333
Japan,1534.1666666666667
Germany,1416.8333333333333
Dominican Republic,1143.0
Brazil,939.1666666666666
The Bahamas,909.8333333333334
France,867.6666666666666


In [0]:
# Create a view or table temporária.
df.createOrReplaceTempView("2015_summary_csv")

In [0]:
%sql
select * from 2015_summary_csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


In [0]:
%sql
-- Query na view 2015_summary_csv com multiplicação.
SELECT DEST_COUNTRY_NAME 
      ,ORIGIN_COUNTRY_NAME
      ,count * 10 as count_multiplicado_por_dez
FROM 2015_summary_csv

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count_multiplicado_por_dez
United States,Romania,10
United States,Ireland,2640
United States,India,690
Egypt,United States,240
Equatorial Guinea,United States,10
United States,Singapore,250
United States,Grenada,540
Costa Rica,United States,4770
Senegal,United States,290
United States,Marshall Islands,440


In [0]:
from pyspark.sql.functions import max
df.select(max("count")).take(1)

Out[20]: [Row(max(count)=370002)]

In [0]:
# Filtrando linhas de um dataframe usando filter
df.filter("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
# Usando where (um alias para o metodo filter)
df.where("count < 2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|    1|
|Equatorial Guinea|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [0]:
%sql
-- filtrando linhas com sql
SELECT * 
FROM 2015_summary_csv
WHERE count < 2
LIMIT 2

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
Equatorial Guinea,United States,1


In [0]:
# obtendo linhas únicas
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

Out[23]: 320

### Manipulando Dataframes

In [0]:
df.sort("count").show(5)

+-----------------+--------------------+-----+
|DEST_COUNTRY_NAME| ORIGIN_COUNTRY_NAME|count|
+-----------------+--------------------+-----+
|         Slovakia|       United States|    1|
|          Liberia|       United States|    1|
|    United States|              Cyprus|    1|
|Equatorial Guinea|       United States|    1|
|    United States|Bosnia and Herzeg...|    1|
+-----------------+--------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc, asc, expr
# ordenando por ordem crescente
df.orderBy(expr("count desc")).show(10)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Burundi|      United States|    1|
|    United States|            Croatia|    1|
|Equatorial Guinea|      United States|    1|
|    United States|              Niger|    1|
|        Lithuania|      United States|    1|
|    United States|               Mali|    1|
|    United States|           Malaysia|    1|
|           Rwanda|      United States|    1|
|          Hungary|      United States|    1|
|       Kyrgyzstan|      United States|    1|
+-----------------+-------------------+-----+
only showing top 10 rows



In [0]:
# visualizando estatísticas descritivas
df.describe().show()

+-------+-----------------+-------------------+------------------+
|summary|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|             count|
+-------+-----------------+-------------------+------------------+
|  count|             1502|               1502|              1502|
|   mean|             null|               null|1718.3189081225032|
| stddev|             null|               null|22300.368619668894|
|    min|      Afghanistan|        Afghanistan|                 1|
|    max|         Zimbabwe|           Zimbabwe|            370002|
+-------+-----------------+-------------------+------------------+



In [0]:
# iterando sobre todas as linhas do dataframe
for i in df.collect():
  #print (i)
  print(f'{i[0]} | {i[1]} | {i[2]}')

United States | Romania | 1
United States | Ireland | 264
United States | India | 69
Egypt | United States | 24
Equatorial Guinea | United States | 1
United States | Singapore | 25
United States | Grenada | 54
Costa Rica | United States | 477
Senegal | United States | 29
United States | Marshall Islands | 44
Guyana | United States | 17
United States | Sint Maarten | 53
Malta | United States | 1
Bolivia | United States | 46
Anguilla | United States | 21
Turks and Caicos Islands | United States | 136
United States | Afghanistan | 2
Saint Vincent and the Grenadines | United States | 1
Italy | United States | 390
United States | Russia | 156
United States | Federated States of Micronesia | 48
Pakistan | United States | 9
United States | Netherlands | 570
Iceland | United States | 118
Marshall Islands | United States | 77
Luxembourg | United States | 91
Honduras | United States | 391
The Bahamas | United States | 903
El Salvador | United States | 519
United States | Senegal | 46
Samoa | Uni

In [0]:
from pyspark.sql.functions import lower, upper, col
df.select(col("DEST_COUNTRY_NAME"),lower(col("DEST_COUNTRY_NAME")),upper(lower(col("DEST_COUNTRY_NAME")))).show(10)

+-----------------+------------------------+-------------------------------+
|DEST_COUNTRY_NAME|lower(DEST_COUNTRY_NAME)|upper(lower(DEST_COUNTRY_NAME))|
+-----------------+------------------------+-------------------------------+
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|            Egypt|                   egypt|                          EGYPT|
|Equatorial Guinea|       equatorial guinea|              EQUATORIAL GUINEA|
|    United States|           united states|                  UNITED STATES|
|    United States|           united states|                  UNITED STATES|
|       Costa Rica|              costa rica|                     COSTA RICA|
|          Senegal|                 senegal|                        SENEGAL|
|    United States|           united states|                  UNITED STATES|

In [0]:
%sql
-- Usando SQL..
SELECT DEST_COUNTRY_NAME
      ,lower(DEST_COUNTRY_NAME)
      ,Upper(DEST_COUNTRY_NAME)
FROM 2015_summary_csv

DEST_COUNTRY_NAME,lower(DEST_COUNTRY_NAME),upper(DEST_COUNTRY_NAME)
United States,united states,UNITED STATES
United States,united states,UNITED STATES
United States,united states,UNITED STATES
Egypt,egypt,EGYPT
Equatorial Guinea,equatorial guinea,EQUATORIAL GUINEA
United States,united states,UNITED STATES
United States,united states,UNITED STATES
Costa Rica,costa rica,COSTA RICA
Senegal,senegal,SENEGAL
United States,united states,UNITED STATES


In [0]:
# remove espaços em branco a esquerda
from pyspark.sql.functions import ltrim
df.select(ltrim(col("DEST_COUNTRY_NAME"))).show(2)

+------------------------+
|ltrim(DEST_COUNTRY_NAME)|
+------------------------+
|           United States|
|           United States|
+------------------------+
only showing top 2 rows



In [0]:
# remove espaços a direita
from pyspark.sql.functions import rtrim
df.select(rtrim(col("DEST_COUNTRY_NAME"))).show(2)

+------------------------+
|rtrim(DEST_COUNTRY_NAME)|
+------------------------+
|           United States|
|           United States|
+------------------------+
only showing top 2 rows



In [0]:
# todas as operações juntas..
# a função lit cria uma coluna na cópia do dataframe
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)

+------+------+-----+---+----------+
| ltrim| rtrim| trim| lp|        rp|
+------+------+-----+---+----------+
|HELLO | HELLO|HELLO|HEL|HELLO     |
|HELLO | HELLO|HELLO|HEL|HELLO     |
+------+------+-----+---+----------+
only showing top 2 rows



In [0]:
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|       Azerbaijan|      United States|    1|
|          Belarus|      United States|    1|
|          Belarus|      United States|    1|
|           Brunei|      United States|    1|
|         Bulgaria|      United States|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|               Togo|    1|
|       Kazakhstan|      United States|    1|
+-----------------+-------------------+-----+
only showing top 2 rows

+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
|    United States|      United States|370002|
|    United States|      United States|358354|
+-----------------+-------------------+------+
only showing top 2 rows



In [0]:
# utilizando SQL
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM 2015_summary_csv
GROUP BY DEST_COUNTRY_NAME
""")

In [0]:
# Utilizando Python
dataFrameWay = df.groupBy("DEST_COUNTRY_NAME").count()

In [0]:
# imprime o plano de execução do código
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#193], functions=[finalmerge_count(merge count#1327L) AS count(1)#1315L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#193, 200), ENSURE_REQUIREMENTS, [plan_id=1565]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#193], functions=[partial_count(1) AS count#1327L])
         +- FileScan csv [DEST_COUNTRY_NAME#193] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(6 paths)[dbfs:/FileStore/tables/bronse/2010_summary.csv, dbfs:/FileStore/tables..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [0]:
# imprime o plano de execução do código
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#193], functions=[finalmerge_count(merge count#1332L) AS count(1)#1322L])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#193, 200), ENSURE_REQUIREMENTS, [plan_id=1611]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#193], functions=[partial_count(1) AS count#1332L])
         +- FileScan csv [DEST_COUNTRY_NAME#193] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(6 paths)[dbfs:/FileStore/tables/bronse/2010_summary.csv, dbfs:/FileStore/tables..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>




In [0]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")


In [0]:
#imprime as  10 primeiras linhas
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


In [0]:
# Tipos Boleanos
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5)

+---------+--------------------+
|InvoiceNo|         Description|
+---------+--------------------+
|   536366|HAND WARMER UNION...|
|   536366|HAND WARMER RED P...|
|   536367|ASSORTED COLOUR B...|
|   536367|POPPY'S PLAYHOUSE...|
|   536367|POPPY'S PLAYHOUSE...|
+---------+--------------------+
only showing top 5 rows



In [0]:
# cria a tabela temporária dftrable
df.createOrReplaceTempView("dfTable")

In [0]:
# imprime 10 primeiras linhas
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


In [0]:
# usando o operador boleando com um predicado em uma expressão.
df.where("InvoiceNo <> 536365").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536366|    22633|HAND WARMER UNION...|       6|2010-12-01 08:28:00|     1.85|   17850.0|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|2010-12-01 08:28:00|     1.85|   17850.0|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|2010-12-01 08:34:00|     1.69|   13047.0|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|2010-12-01 08:34:00|      2.1|   13047.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
# usando o operador boleando com um predicado em uma expressão.
df.where("InvoiceNo = 536365").show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
# Entendendo a ordem dos operadores boleanos
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1

In [0]:
# aplicando os operadores como filtros
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()

+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|      null|United Kingdom|
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|      null|United Kingdom|
+---------+---------+--------------+--------+-------------------+---------+----------+--------------+



In [0]:
%sql
-- Aplicando a mesmo código em SQL
SELECT * 
FROM dfTable 
WHERE StockCode in ("DOT")
AND(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536544,DOT,DOTCOM POSTAGE,1,2010-12-01T14:32:00.000+0000,569.77,,United Kingdom
536592,DOT,DOTCOM POSTAGE,1,2010-12-01T17:06:00.000+0000,607.49,,United Kingdom


In [0]:
# Combinando filtros e operadores boleanos
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1


In [0]:
# Combinando filtros e operadores boleanos
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)

+---------+-----------+
|unitPrice|isExpensive|
+---------+-----------+
|   569.77|       true|
|   607.49|       true|
+---------+-----------+



In [0]:
%sql
-- Aplicando as mesmas ideias usando SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

UnitPrice,isExpensive
569.77,True
607.49,True


# Trabalhando com tipos diferentes de arquivos

### Modos de leitura
- **permissive**: *Define todos os campos para NULL quando encontra registros corrompidos e coloca todos registros corrompidos em uma coluna chamada _corrupt_record.* (default)

- **dropMalformed**: *Apaga uma linha corrompida ou que este não consiga ler.*

- **failFast**: *Falha imediatamente quando encontra uma linha que não consiga ler.*

In [0]:
# Lendo arquivos csv
spark.read.format("csv")
.option("mode", "permissive")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()

[0;36m  File [0;32m"<command-2719157821580108>"[0;36m, line [0;32m3[0m
[0;31m    .option("mode", "permissive")[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m invalid syntax


In [0]:
# leia o arquivo alterando os modos de leitura (failfast, permissive, dropmalformed)
df = spark.read.format("csv")\
.option("mode", "failfast")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [0]:
# imprime as 10 primeiras linhas do dataframe
display(df.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


#### Criando um schema
- A opção **infer_schema** nem sempre vai definir o melhor datatype.
- Melhora a performance na leitura de grandes bases.
- Permite uma customização dos tipos das colunas.
- É importante saber para reescrita de aplicações. (Códigos pandas)

In [0]:
# imprime o schema do dataframe (infer_schema=True)
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
# usa o objeto StructType
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", IntegerType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", TimestampType()),
    StructField("UnitPrice", DoubleType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

In [0]:
# verificando o tipo da variável schema_df
type(schema_df)

Out[4]: pyspark.sql.types.StructType

In [0]:
# usando o parâmetro schema()
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("timestampFormat",'yyyy-/MM/DD')\
.load("/FileStore/tables/bronze/2010_12_01.csv")

In [0]:
# imprime o schema do dataframe.
df.printSchema()

root
 |-- InvoiceNo: integer (nullable = true)
 |-- StockCode: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
display(df.show(5))

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|     null|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|     null|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|     null|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|     null|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
display(df.head(5))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365,71053.0,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365,,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365,,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom


In [0]:
# imprime 10 primeiras linhas do dataframe.
display(df.collect())

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365.0,,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,2.55,17850.0,United Kingdom
536365.0,71053.0,WHITE METAL LANTERN,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01T08:26:00.000+0000,2.75,17850.0,United Kingdom
536365.0,,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01T08:26:00.000+0000,3.39,17850.0,United Kingdom
536365.0,22752.0,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01T08:26:00.000+0000,7.65,17850.0,United Kingdom
536365.0,21730.0,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01T08:26:00.000+0000,4.25,17850.0,United Kingdom
536366.0,22633.0,HAND WARMER UNION JACK,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536366.0,22632.0,HAND WARMER RED POLKA DOT,6,2010-12-01T08:28:00.000+0000,1.85,17850.0,United Kingdom
536367.0,84879.0,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01T08:34:00.000+0000,1.69,13047.0,United Kingdom


### Arquivos JSON

In [0]:
df_json = spark.read.format("json")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_summary.json")

In [0]:
df_json.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [0]:
display(df_json.head(10))

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,1
United States,Ireland,264
United States,India,69
Egypt,United States,24
Equatorial Guinea,United States,1
United States,Singapore,25
United States,Grenada,54
Costa Rica,United States,477
Senegal,United States,29
United States,Marshall Islands,44


### Escrevendo arquivos
- **append** : Adiciona arquivos de saída na lista de arquivos que já existem na localizaçao.
- **overwrite** : Sobreescreve os arquivos no destino.
- **erroIfExists** : Emite um erro e para se já existir arquivos no destino.
- **ignore** : Se existir o dado no destino náo faz nada.

In [0]:
# escrevendo arquivos csv
df.write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")

In [0]:
# observe o arquivo gerado.
file = "/FileStore/tables/bronze/saida_2010_12_01.csv/part-00000-tid-8221397275273258438-4fef621d-9eb9-48fa-b408-86484b4c7d45-32-1-c000.csv"
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load(file)

In [0]:
# imprime as 10 primeiras linhas do dataframe 
df.show(10)

+------+-----+----------------------------------+---+------------------------+----+-------+--------------+
|536365|  _c1|WHITE HANGING HEART T-LIGHT HOLDER|  6|2010-12-01T08:26:00.000Z|2.55|17850.0|United Kingdom|
+------+-----+----------------------------------+---+------------------------+----+-------+--------------+
|536365|71053|               WHITE METAL LANTERN|  6|    2010-12-01T08:26:...|3.39|17850.0|United Kingdom|
|536365| null|              CREAM CUPID HEART...|  8|    2010-12-01T08:26:...|2.75|17850.0|United Kingdom|
|536365| null|              KNITTED UNION FLA...|  6|    2010-12-01T08:26:...|3.39|17850.0|United Kingdom|
|536365| null|              RED WOOLLY HOTTIE...|  6|    2010-12-01T08:26:...|3.39|17850.0|United Kingdom|
|536365|22752|              SET 7 BABUSHKA NE...|  2|    2010-12-01T08:26:...|7.65|17850.0|United Kingdom|
|536365|21730|              GLASS STAR FROSTE...|  6|    2010-12-01T08:26:...|4.25|17850.0|United Kingdom|
|536366|22633|              HAND WARM

#### Escrevendo dados em paralelo

In [0]:
# reparticionando o dado arquivos csv
# observe o diretório criado
df.repartition(5).write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")

In [0]:
df.show(10)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-2427028480414467>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdf[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;36m10[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     47[0m             [0;32mtry[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0;32m---> 48[0;31m                 [0mres[0m [0;34m=[0m [0mfunc[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m     49[0m                 logger.log_success(
[1;32m     50[0m      

### Arquivos Parquet

#####**Convertendo .csv para .parquet**
- Dataset .csv usado https://www.kaggle.com/nhs/general-practice-prescribing-data

In [0]:
# Lendo todos os arquivos .csv do diretório bigdata (>4GB)
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/bigdata/*.csv")

In [0]:
display(df.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
5668,8092,592,2,44.1,40.84,189
1596,17512,16983,2,1.64,1.64,35
1596,25587,16124,1,1.26,1.28,42
1596,12551,1282,2,0.86,1.02,42
1596,18938,10575,1,1.85,1.82,56
1596,8777,21507,1,3.31,3.18,56
1596,9369,12008,1,63.15,58.56,56
1596,27926,17643,2,158.66,147.07,56
1596,26148,10230,1,0.35,0.44,14
1596,9148,3381,1,0.26,0.35,7


In [0]:
df.printSchema()

root
 |-- practice: integer (nullable = true)
 |-- bnf_code: integer (nullable = true)
 |-- bnf_name: integer (nullable = true)
 |-- items: integer (nullable = true)
 |-- nic: double (nullable = true)
 |-- act_cost: double (nullable = true)
 |-- quantity: integer (nullable = true)



In [0]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+
|summary|         practice|          bnf_code|          bnf_name|             items|               nic|          act_cost|          quantity|
+-------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+
|  count|        141100796|         141100796|         141100796|         141100796|         141100796|         141100796|         141100796|
|   mean|5312.586701077151|14056.373502237366|12505.010365086813|  9.00373134677426| 73.28301600885612| 68.20840060923398| 725.8706041176408|
| stddev|3062.198077089131| 8127.961749304247| 7105.801700604521|30.122243761334516|196.69454628561465|182.53164754538028|4088.2285246657857|
|    min|                0|                 0|                 0|                 1|               0.0|               0.0|                 0|
|    m

In [0]:
# conta a quantidade de linhas
df.count()

Out[4]: 141100796

*Atente para NÃO escrever e ler arquivos parquet em versoes diferentes*

In [0]:
# escrevendo em formato parquet
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/bronze/df-parquet-file.parquet")

In [0]:
%fs
ls /FileStore/tables/bronze/df-parquet-file.parquet

path,name,size,modificationTime
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0,1667011866000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_1845185167357572826,_committed_1845185167357572826,4024,1667011865000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_started_1845185167357572826,_started_1845185167357572826,0,1667011362000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00000-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-150-1-c000.snappy.parquet,part-00000-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-150-1-c000.snappy.parquet,43318185,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00001-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-151-1-c000.snappy.parquet,part-00001-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-151-1-c000.snappy.parquet,43239474,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00002-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-152-1-c000.snappy.parquet,part-00002-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-152-1-c000.snappy.parquet,43318185,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00003-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-153-1-c000.snappy.parquet,part-00003-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-153-1-c000.snappy.parquet,43239474,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00004-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-154-1-c000.snappy.parquet,part-00004-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-154-1-c000.snappy.parquet,43410342,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00005-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-155-1-c000.snappy.parquet,part-00005-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-155-1-c000.snappy.parquet,43275632,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00006-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-156-1-c000.snappy.parquet,part-00006-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-156-1-c000.snappy.parquet,43278145,1667011479000


In [0]:
# lendo arquivos parquet
# atente para a velocidade de leitura
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/bronze/df-parquet-file.parquet")

In [0]:
# conta a quantidade de linhas do dataframe
df_parquet.count()

Out[9]: 141100796

In [0]:
df_parquet.describe().show()

+-------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+
|summary|          practice|          bnf_code|          bnf_name|             items|              nic|          act_cost|         quantity|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+
|  count|         141100796|         141100796|         141100796|         141100796|        141100796|         141100796|        141100796|
|   mean| 5312.586701077151|14056.373502237366|12505.010365086813|  9.00373134677426|73.28301600894046| 68.20840060921658|725.8706041176408|
| stddev|3062.1980770890905| 8127.961749304212| 7105.801700604489|30.122243761334673|196.6945462856128|182.53164754538042|4088.228524665831|
|    min|                 0|                 0|                 0|                 1|              0.0|               0.0|                0|
|    max|    

In [0]:
# visualizando o dataframe
display(df_parquet.head(10))

practice,bnf_code,bnf_name,items,nic,act_cost,quantity
3626,12090,20521,3,8.4,7.82,168
3626,23511,11576,1,32.18,29.81,28
3626,14802,14672,162,141.13,133.93,4760
3626,14590,10011,17,15.01,14.12,532
3626,24483,13726,69,57.57,54.67,2121
3626,7768,22070,155,113.03,109.41,4144
3626,1877,13598,102,68.5,67.4,2370
3626,18110,3990,189,156.66,150.44,5222
3626,14058,2144,23,23.52,22.48,588
3626,4558,5695,32,116.64,109.21,756


In [0]:
# visualizando o tamanho dos arquivos
display(dbutils.fs.ls("/FileStore/tables/bronze/df-parquet-file.parquet"))

path,name,size,modificationTime
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_SUCCESS,_SUCCESS,0,1667011866000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_committed_1845185167357572826,_committed_1845185167357572826,4024,1667011865000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/_started_1845185167357572826,_started_1845185167357572826,0,1667011362000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00000-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-150-1-c000.snappy.parquet,part-00000-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-150-1-c000.snappy.parquet,43318185,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00001-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-151-1-c000.snappy.parquet,part-00001-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-151-1-c000.snappy.parquet,43239474,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00002-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-152-1-c000.snappy.parquet,part-00002-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-152-1-c000.snappy.parquet,43318185,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00003-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-153-1-c000.snappy.parquet,part-00003-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-153-1-c000.snappy.parquet,43239474,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00004-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-154-1-c000.snappy.parquet,part-00004-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-154-1-c000.snappy.parquet,43410342,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00005-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-155-1-c000.snappy.parquet,part-00005-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-155-1-c000.snappy.parquet,43275632,1667011480000
dbfs:/FileStore/tables/bronze/df-parquet-file.parquet/part-00006-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-156-1-c000.snappy.parquet,part-00006-tid-1845185167357572826-9f446e30-83c8-44d5-aab9-5f20c009fafe-156-1-c000.snappy.parquet,43278145,1667011479000


In [0]:
%scala
// script para pegar tamanho em Gigabytes
val path="/FileStore/tables/bronze/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")

In [0]:
%sql
-- consulta a view criada.
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize

sizeInGB
1.404


### Spark + PostgreSQL
- Consultar e escrever em um banco de dados relacional.

In [0]:
# Isso é equivalente a executar uma query como: select * from pg_catalog.pg_tables
# jdbc:postgresql://pgserver-27.postgres.database.azure.com:5432/{your_database}?user=wanderson_server@pgserver-27&password={your_password}&sslmode=require
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-27.postgres.database.azure.com:5432/postgres?user=wanderson_server@pgserver-27&password=wanderson&sslmode=require")\
.option("dbtable", "pg_catalog.pg_tables")\
.option("user", "wanderson_server").option("password", "wanderson").load()

In [0]:
# imprime todas as linhas do dataframe
display(pgDF.collect())

schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
pg_catalog,pg_statistic,azure_superuser,,True,False,False,False
pg_catalog,pg_foreign_table,azure_superuser,,True,False,False,False
pg_catalog,pg_authid,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_user_mapping,azure_superuser,,True,False,False,False
pg_catalog,pg_subscription,azure_superuser,pg_global,True,False,False,False
pg_catalog,pg_largeobject,azure_superuser,,True,False,False,False
pg_catalog,pg_type,azure_superuser,,True,False,False,False
pg_catalog,pg_attribute,azure_superuser,,True,False,False,False
pg_catalog,pg_proc,azure_superuser,,True,False,False,False
pg_catalog,pg_class,azure_superuser,,True,False,False,False


In [0]:
# consulta dados da coluna schemaname
pgDF.select("schemaname").distinct().show()

+------------------+
|        schemaname|
+------------------+
|information_schema|
|        pg_catalog|
+------------------+



In [0]:
# Especifica uma query diretamente.
# Útil para evitar o "select * from."
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-27.postgres.database.azure.com:5432/postgres?user=wanderson_server@pgserver-27&password=wanderson&sslmode=require")\
.option("query", "select schemaname,tablename from pg_catalog.pg_tables")\
.option("user", "wanderson_server").option("password", "wanderson").load()

In [0]:
# imprime todas as linhas do dataframe
display(pgDF.collect())

schemaname,tablename
pg_catalog,pg_statistic
pg_catalog,pg_foreign_table
pg_catalog,pg_authid
pg_catalog,pg_user_mapping
pg_catalog,pg_subscription
pg_catalog,pg_largeobject
pg_catalog,pg_type
pg_catalog,pg_attribute
pg_catalog,pg_proc
pg_catalog,pg_class


In [0]:
# imprime as 5 linhas do dataframe df
# não se esqueça de recriar esse dataframe.
df.show(5)

+--------+--------+--------+-----+----+--------+--------+
|practice|bnf_code|bnf_name|items| nic|act_cost|quantity|
+--------+--------+--------+-----+----+--------+--------+
|    5668|    8092|     592|    2|44.1|   40.84|     189|
|    1596|   17512|   16983|    2|1.64|    1.64|      35|
|    1596|   25587|   16124|    1|1.26|    1.28|      42|
|    1596|   12551|    1282|    2|0.86|    1.02|      42|
|    1596|   18938|   10575|    1|1.85|    1.82|      56|
+--------+--------+--------+-----+----+--------+--------+
only showing top 5 rows



In [0]:
# cria a tabela "produtos" a apartir dos dados do dataframe df.
pgDF.write.mode("overwrite")\
.format("jdbc")\
.option("url", "jdbc:postgresql://pgserver-27.postgres.database.azure.com:5432/postgres?user=wanderson_server@pgserver-27&password=wanderson&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "wanderson_server")\
.option("password", "wanderson")\
.save()

In [0]:
# cria o dataframe df_produtos a partir da tabela criada.
df_produtos = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-27.postgres.database.azure.com:5432/postgres?user=wanderson_server@pgserver-27&password=wanderson&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "wanderson_server").option("password", "wanderson").load()

In [0]:
# imprime as linhas do dataframe.
display(df_produtos.collect())

schemaname,tablename
public,produtos
pg_catalog,pg_statistic
pg_catalog,pg_foreign_table
pg_catalog,pg_authid
pg_catalog,pg_user_mapping
pg_catalog,pg_subscription
pg_catalog,pg_largeobject
pg_catalog,pg_type
pg_catalog,pg_attribute
pg_catalog,pg_proc


# Avançando com PySpark

- **mean()** - Retorna o valor médio de cada grupo.
- **max()** - Retorna o valor máximo de cada grupo.
- **min()** - Retorna o valor mínimo de cada grupo.
- **sum()** - Retorna a soma de todos os valores do grupo.
- **avg()** - Retorna o valor médio de cada grupo.

In [0]:
# Armazenando o Dataframe na variável
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")


In [0]:
# escrevendo em formato parquet
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/bronze/df-parquet-file-2010_12_01.parquet")

In [0]:
# Imprime as 10 primeiras linhas do dataframe
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [0]:
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/bronze/df-parquet-file-2010_12_01.parquet")

In [0]:
df_parquet.describe().show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|         UnitPrice|        CustomerID|       Country|
+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+--------------+
|  count|             3108|              3108|                3098|              3108|              3108|              1968|          3108|
|   mean| 536516.684944841|27834.304044117645|                null| 8.627413127413128| 4.151946589446603|15661.388719512195|          null|
| stddev|72.89447869788873|17407.897548583845|                null|26.371821677029203|15.638659854603892|1854.4496996893627|          null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|               -24|               0.0|           12431.0|     Australia|
|    max|          C

In [0]:
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|2010-12-01 08:26:00|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS S

In [0]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
df.groupBy('Country').sum("UnitPrice").show()

+--------------+------------------+
|       Country|    sum(UnitPrice)|
+--------------+------------------+
|       Germany| 93.82000000000002|
|        France|             55.29|
|          EIRE|133.64000000000001|
|        Norway|            102.67|
|     Australia|              73.9|
|United Kingdom|12428.080000000024|
|   Netherlands|             16.85|
+--------------+------------------+



In [0]:
# Contagem agrupada por Cidade
df.groupBy('Country').count().sort("count").show()

+--------------+-----+
|       Country|count|
+--------------+-----+
|   Netherlands|    2|
|     Australia|   14|
|        France|   20|
|          EIRE|   21|
|       Germany|   29|
|        Norway|   73|
|United Kingdom| 2949|
+--------------+-----+



In [0]:
# Retorna o valor mínimo por grupo
df.groupBy('Country').min('UnitPrice').show()

+--------------+--------------+
|       Country|min(UnitPrice)|
+--------------+--------------+
|       Germany|          0.42|
|        France|          0.42|
|          EIRE|          0.65|
|        Norway|          0.29|
|     Australia|          0.85|
|United Kingdom|           0.0|
|   Netherlands|          1.85|
+--------------+--------------+



In [0]:
# Retorna o valor máximo por grupo
df.groupBy('Country').max('UnitPrice').show()

+--------------+--------------+
|       Country|max(UnitPrice)|
+--------------+--------------+
|       Germany|          18.0|
|        France|          18.0|
|          EIRE|          50.0|
|        Norway|          7.95|
|     Australia|           8.5|
|United Kingdom|        607.49|
|   Netherlands|          15.0|
+--------------+--------------+



In [0]:
# Retorna o valor médio por grupo
df.groupBy('Country').avg('UnitPrice').show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [0]:
# Retorna o valor médio por grupo
df.groupBy('Country').mean('UnitPrice').show()

+--------------+------------------+
|       Country|    avg(UnitPrice)|
+--------------+------------------+
|       Germany| 3.235172413793104|
|        France|            2.7645|
|          EIRE|6.3638095238095245|
|        Norway|1.4064383561643836|
|     Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
|   Netherlands|             8.425|
+--------------+------------------+



In [0]:
# GroupBy de várias colunas
df.groupBy('Country', 'CustomerID') \
  .sum('UnitPrice') \
  .show()

+--------------+----------+------------------+
|       Country|CustomerID|    sum(UnitPrice)|
+--------------+----------+------------------+
|United Kingdom|   17420.0| 38.99999999999999|
|United Kingdom|   15922.0|              48.5|
|United Kingdom|   16250.0|             47.27|
|United Kingdom|   13065.0| 73.11000000000001|
|United Kingdom|   18074.0|62.150000000000006|
|United Kingdom|   16048.0|12.969999999999999|
|       Germany|   12472.0|             49.45|
|United Kingdom|   18085.0|              34.6|
|United Kingdom|   17905.0|109.90000000000003|
|United Kingdom|   17841.0|254.63999999999982|
|United Kingdom|   15291.0|               6.0|
|United Kingdom|   17951.0|22.000000000000004|
|United Kingdom|   13255.0|27.299999999999997|
|United Kingdom|   17690.0|              34.8|
|United Kingdom|   18229.0|             48.65|
|United Kingdom|   15605.0| 58.20000000000002|
|United Kingdom|   18011.0| 66.10999999999999|
|United Kingdom|   17809.0|              1.45|
|United Kingd

# Trabalhando com datas

- Funções mais usadas para trabalhar com dastas usando Pyspark:
  - current_day():
  - data_format(dataExpr, format):
  - to_date():
  - to_date(column, fmt):
  - add_months(Column, numMonths):
  - date_add(column, days):
  - date_sub(column, days):
  - datediff(end, start):
  - current_timestamp():
  - hour(column):

In [0]:
df.show(5)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|2010-12-01 08:26:00|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010-12-01 08:26:00|     3.39|   17850.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
from pyspark.sql.functions import *
df.select(current_date().alias('current_date')).show(1)

+------------+
|current_date|
+------------+
|  2022-10-30|
+------------+
only showing top 1 row



In [0]:
# Usando date_format para formatar datas:
df.select(col('InvoiceDate'), \
         date_format(current_date(), 'dd/MM/yyyy hh:mm:ss').alias('current_date'), \
         date_format(col('InvoiceDate'), 'dd/MM/yyyy hh:mm:ss').alias('date_format')).show()

+-------------------+-------------------+-------------------+
|        InvoiceDate|       current_date|        date_format|
+-------------------+-------------------+-------------------+
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:26:00|30/10/2022 12:00:00|01/12/2010 08:26:00|
|2010-12-01 08:28:00|30/10/2022 12:00:00|01/12/2010 08:28:00|
|2010-12-01 08:28:00|30/10/2022 12:00:00|01/12/2010 08:28:00|
|2010-12-01 08:34:00|30/10/2022 12:00:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|30/10/2022 12:00:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|30/10/2022 12:00:00|01/12/2010 08:34:00|
|2010-12-01 08:34:00|30/10/2022 12:00:00|01/12/2010 08:34:00|
|2010-12

In [0]:
#datadiff()
df.select(col('InvoiceDate'),
         datediff(current_date(),col('InvoiceDate')).alias('datediff')
         ).show()

+-------------------+--------+
|        InvoiceDate|datediff|
+-------------------+--------+
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:26:00|    4351|
|2010-12-01 08:28:00|    4351|
|2010-12-01 08:28:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
|2010-12-01 08:34:00|    4351|
+-------------------+--------+
only showing top 20 rows



In [0]:
#months_between()

df.select(col('InvoiceDate'),
         months_between(current_date(),col('InvoiceDate')).alias('months_between')
         ).show()

+-------------------+--------------+
|        InvoiceDate|months_between|
+-------------------+--------------+
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:26:00|  142.92414875|
|2010-12-01 08:28:00|  142.92410394|
|2010-12-01 08:28:00|  142.92410394|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
|2010-12-01 08:34:00|  142.92396953|
+-------------------+--------------+
only showing top 20 rows



In [0]:
# utiliza as funções para adicionar, subtrair meses e dias:
df.select(col('InvoiceDate'),
          add_months(col('InvoiceDate'), 3).alias('add_months'),
          add_months(col('InvoiceDate'), -3).alias('sub_months'),
          date_add(col('InvoiceDate'), 4).alias('date_add'),
          date_sub(col('InvoiceDate'), 4).alias('date_add')
         ).show()

+-------------------+----------+----------+----------+----------+
|        InvoiceDate|add_months|sub_months|  date_add|  date_add|
+-------------------+----------+----------+----------+----------+
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:26:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:28:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:28:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:34:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:34:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-01 08:34:00|2011-03-01|2010-09-01|2010-12-05|2010-11-27|
|2010-12-0

In [0]:
# Extrai ano, mês, próximo dia, dia da semana:
df.select(col('InvoiceDate'),
          year(col('InvoiceDate')).alias('year'),
          month(col('InvoiceDate')).alias('month'),
          next_day(col('InvoiceDate'), 'Sunday').alias('next_day'),
          weekofyear(col('InvoiceDate')).alias('weekofyear')
         ).show()

+-------------------+----+-----+----------+----------+
|        InvoiceDate|year|month|  next_day|weekofyear|
+-------------------+----+-----+----------+----------+
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:26:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:28:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-01 08:34:00|2010|   12|2010-12-05|        48|
|2010-12-0

In [0]:
# Extrai ano, mês, próximo dia, dia da semana:
df.select(col('InvoiceDate'),
          dayofweek(col('InvoiceDate')).alias('dayofweek'),
          dayofmonth(col('InvoiceDate')).alias('dayofmonth'),
          dayofyear(col('InvoiceDate')).alias('dayofyear'),
          current_date().alias('data_atual'),
          dayofweek(current_date()).alias('dayofweek2'),
          dayofmonth(current_date()).alias('dayofmonth2'),
          dayofyear(current_date()).alias('dayofyear2'),          
         ).show()

+-------------------+---------+----------+---------+----------+----------+-----------+----------+
|        InvoiceDate|dayofweek|dayofmonth|dayofyear|data_atual|dayofweek2|dayofmonth2|dayofyear2|
+-------------------+---------+----------+---------+----------+----------+-----------+----------+
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:26:00|        4|         1|      335|2022-10-30|         1|         30|       303|
|2010-12-01 08:28:00

In [0]:
# Imprime o timestamp atual:
df.select(current_timestamp().
          alias('current_timestamp')
         ).show(1, truncate=False)

+-----------------------+
|current_timestamp      |
+-----------------------+
|2022-10-30 17:46:52.362|
+-----------------------+
only showing top 1 row



In [0]:
# retorna hora, minuto e segundo
df.select(col('InvoiceDate'),
         hour(col('InvoiceDate')).alias('hour'),
         minute(col('InvoiceDate')).alias('minute'),
         second(col('InvoiceDate')).alias('second')
         ).show()

+-------------------+----+------+------+
|        InvoiceDate|hour|minute|second|
+-------------------+----+------+------+
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:26:00|   8|    26|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:28:00|   8|    28|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
|2010-12-01 08:34:00|   8|    34|     0|
+-------------------+----+------+------+
only showing top

# Missing Values com Pyspark

In [0]:
# visualizando datasets de exemplos da databricks
display(dbutils.fs.ls('/databricks-datasets'))

path,name,size,modificationTime
dbfs:/databricks-datasets/,databricks-datasets/,0,0
dbfs:/databricks-datasets/COVID/,COVID/,0,0
dbfs:/databricks-datasets/README.md,README.md,976,1532468253000
dbfs:/databricks-datasets/Rdatasets/,Rdatasets/,0,0
dbfs:/databricks-datasets/SPARK_README.md,SPARK_README.md,3359,1455043490000
dbfs:/databricks-datasets/adult/,adult/,0,0
dbfs:/databricks-datasets/airlines/,airlines/,0,0
dbfs:/databricks-datasets/amazon/,amazon/,0,0
dbfs:/databricks-datasets/asa/,asa/,0,0
dbfs:/databricks-datasets/atlas_higgs/,atlas_higgs/,0,0


In [0]:
# lendo o arquivo de dados
# inferSchema = True
# header = True

arquivo = 'dbfs:/databricks-datasets/flights/'

df = spark.read.format('csv').option('inferSchema', 'True').option('header', 'True').csv(arquivo)

In [0]:
df.show(5)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 5 rows



In [0]:
#visualizando os valores nulos:
df.filter('delay is NULL').show()

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|
|Alliance\tNE\tUSA...| null|    null|  null|       null|
|Alpena\tMI\tUSA\tAPN| null|    null|  null|       null|
|Altoona\tPA\tUSA\...| null|    null|  null|       null|
|Amarillo\tTX\tUSA...| null|    null|  null|       null|
|Anahim Lake\tBC\t...| null|   

In [0]:
# filtrando valores missing
df.filter(df.delay.isNull()).show(10)

+--------------------+-----+--------+------+-----------+
|                date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford\tBC\tC...| null|    null|  null|       null|
|Aberdeen\tSD\tUSA...| null|    null|  null|       null|
|Abilene\tTX\tUSA\...| null|    null|  null|       null|
| Akron\tOH\tUSA\tCAK| null|    null|  null|       null|
|Alamosa\tCO\tUSA\...| null|    null|  null|       null|
|Albany\tGA\tUSA\tABY| null|    null|  null|       null|
|Albany\tNY\tUSA\tALB| null|    null|  null|       null|
|Albuquerque\tNM\t...| null|    null|  null|       null|
|Alexandria\tLA\tU...| null|    null|  null|       null|
|Allentown\tPA\tUS...| null|    null|  null|       null|
+--------------------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# Alterando os valores nulos por 0 de TODAS as colunas:
df.na.fill(value=0).show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# Alterando os valores nulos por 0 de uma coluna especifica:
df.na.fill(value=0, subset=['delay']).show()

In [0]:
# Alterando os valores nulos por uma string vazia:
df.na.fill("").show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

In [0]:
# Excluindo linhas com valores nulos:
df.na.drop().show()

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
|01061215|   -6|     602|   ABE|        ATL|
|01061725|   69|     602|   ABE|        ATL|
|01061230|    0|     369|   ABE|        DTW|
|01060625|   -3|     602|   ABE|        ATL|
|01070600|    0|     369|   ABE|        DTW|
|01071725|    0|     602|   ABE|        ATL|
|01071230|    0|     369|   ABE|        DTW|
|01070625|    0|     602|   ABE|        ATL|
|01071219|    0|     569|   ABE|        ORD|
|01080600|

# Tarefas básicas em dataframes

In [0]:
# Adicionando uma coluna ao dataframe
df = df.withColumn('Nova Coluna', df['delay']+2)
df.show(10)

+--------+-----+--------+------+-----------+-----------+
|    date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245|    6|     602|   ABE|        ATL|        8.0|
|01020600|   -8|     369|   ABE|        DTW|       -6.0|
|01021245|   -2|     602|   ABE|        ATL|        0.0|
|01020605|   -4|     602|   ABE|        ATL|       -2.0|
|01031245|   -4|     602|   ABE|        ATL|       -2.0|
|01030605|    0|     602|   ABE|        ATL|        2.0|
|01041243|   10|     602|   ABE|        ATL|       12.0|
|01040605|   28|     602|   ABE|        ATL|       30.0|
|01051245|   88|     602|   ABE|        ATL|       90.0|
|01050605|    9|     602|   ABE|        ATL|       11.0|
+--------+-----+--------+------+-----------+-----------+
only showing top 10 rows



In [0]:
# Removendo coluna
df = df.drop('Nova Coluna')
df.show(10)

+--------+-----+--------+------+-----------+
|    date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245|    6|     602|   ABE|        ATL|
|01020600|   -8|     369|   ABE|        DTW|
|01021245|   -2|     602|   ABE|        ATL|
|01020605|   -4|     602|   ABE|        ATL|
|01031245|   -4|     602|   ABE|        ATL|
|01030605|    0|     602|   ABE|        ATL|
|01041243|   10|     602|   ABE|        ATL|
|01040605|   28|     602|   ABE|        ATL|
|01051245|   88|     602|   ABE|        ATL|
|01050605|    9|     602|   ABE|        ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows



In [0]:
# Renomeando uma coluna no Dataframe:
df.withColumnRenamed('Nova Coluna', 'Delay_2').show()

+--------+-----+--------+------+-----------+-------+
|    date|delay|distance|origin|destination|Delay_2|
+--------+-----+--------+------+-----------+-------+
|01011245|    6|     602|   ABE|        ATL|    8.0|
|01020600|   -8|     369|   ABE|        DTW|   -6.0|
|01021245|   -2|     602|   ABE|        ATL|    0.0|
|01020605|   -4|     602|   ABE|        ATL|   -2.0|
|01031245|   -4|     602|   ABE|        ATL|   -2.0|
|01030605|    0|     602|   ABE|        ATL|    2.0|
|01041243|   10|     602|   ABE|        ATL|   12.0|
|01040605|   28|     602|   ABE|        ATL|   30.0|
|01051245|   88|     602|   ABE|        ATL|   90.0|
|01050605|    9|     602|   ABE|        ATL|   11.0|
|01061215|   -6|     602|   ABE|        ATL|   -4.0|
|01061725|   69|     602|   ABE|        ATL|   71.0|
|01061230|    0|     369|   ABE|        DTW|    2.0|
|01060625|   -3|     602|   ABE|        ATL|   -1.0|
|01070600|    0|     369|   ABE|        DTW|    2.0|
|01071725|    0|     602|   ABE|        ATL|  

# Trabalhando com UDFs

- Integração de código entre as APIs
- É preciso cuidado com performance dos códigos usando UDFs

In [0]:
from pyspark.sql.types import LongType

In [0]:
#define a função:
def quadrado(s):
  return s * s

In [0]:
quadrado(45)

45 * 45 = 2025


In [0]:
# Registra no banco de dados do spark e define o tipo de retorno, por padrão o tipo é stringtype: 
spark.udf.register('Func_Py_Quadrado', quadrado, LongType())

Out[117]: <function __main__.quadrado(s)>

In [0]:
# Gera valores aleatórios:
spark.range(1, 20).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [0]:
# Criar uma visão View_temp:
spark.range(1, 20).createOrReplaceTempView('View_temp')

In [0]:
%sql
--- Usando a função criada em python juntamente com o código SQL:
SELECT id, Func_Py_Quadrado(id) AS id_ao_quadrado
FROM View_temp

id,id_ao_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


## UDFs com Dataframes

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
Func_Py_Quadrado = udf(quadrado, LongType())

In [0]:
#Transforma a View em um Dataframe Spark:
df = spark.table('View_temp')

In [0]:
df.show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



In [0]:
# Utiliza a função UDF para interar sobre todas as linhas da coluna 'id' e adiciona esse resultado em outra coluna:
display(df.select('id', Func_Py_Quadrado('id').alias('id_quadrado')))

id,id_quadrado
1,1
2,4
3,9
4,16
5,25
6,36
7,49
8,64
9,81
10,100


# Koalas
- Koalas é um projeto de código aberto que fornece um substituto imediato para o pandas.
- É uma biblioteca útil tanto para usuários pandas quanto para usuários Spark.
  - Koalas suporta muitas tarefas que são difíceis de fazer com PySpark, por exemplo, plotar dados diretamente de um PySpark DataFrame.
- Koalas suporta SQL diretamente em seus dataframes.

In [0]:
pip install koalas

Python interpreter will be restarted.
Collecting koalas
  Downloading koalas-1.8.2-py3-none-any.whl (390 kB)
Installing collected packages: koalas
Successfully installed koalas-1.8.2
Python interpreter will be restarted.


In [0]:
import numpy as np
import pandas as pd
import databricks.koalas as ks





In [0]:
# cria um pandas dataframe
pdf = pd.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# imprime o tipo do dataframe
type(pdf)

Out[4]: pandas.core.frame.DataFrame

In [0]:
# Cria um Koalas DataFrame
kdf = ks.DataFrame({'A': np.random.rand(5),
                    'B': np.random.rand(5)})

In [0]:
# imprime o tipo do dataframe
type(kdf)

Out[6]: databricks.koalas.frame.DataFrame

In [0]:
# Cria um Koalas dataframe a partir de um pandas dataframe
kdf = ks.DataFrame(pdf)
type(kdf)

Out[7]: databricks.koalas.frame.DataFrame

In [0]:
# Outra forma de converter um Pandas Dataframe para uma Koalas Dataframe:
kdf = ks.from_pandas(pdf)
type(kdf)

Out[8]: databricks.koalas.frame.DataFrame

In [0]:
pdf.head()

Unnamed: 0,A,B
0,0.221208,0.831157
1,0.973241,0.690808
2,0.295934,0.670521
3,0.718163,0.216527
4,0.175921,0.79859


In [0]:
kdf.head()

Unnamed: 0,A,B
0,0.221208,0.831157
1,0.973241,0.690808
2,0.295934,0.670521
3,0.718163,0.216527
4,0.175921,0.79859


In [0]:
# imprime as estatitiscas quantitativas:
kdf.describe()

Unnamed: 0,A,B
count,5.0,5.0
mean,0.476893,0.64152
std,0.351168,0.247246
min,0.175921,0.216527
25%,0.221208,0.670521
50%,0.295934,0.690808
75%,0.718163,0.79859
max,0.973241,0.831157


In [0]:
# ordena pela coluna B
kdf.sort_values(by='B')

Unnamed: 0,A,B
3,0.718163,0.216527
2,0.295934,0.670521
1,0.973241,0.690808
4,0.175921,0.79859
0,0.221208,0.831157


In [0]:
# define configurações de layout de células:
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')
ks.set_option('compute.max_rows', 2000)

In [0]:
# slice
kdf[['A', 'B']]

Unnamed: 0,A,B
0,0.221208,0.831157
1,0.973241,0.690808
2,0.295934,0.670521
3,0.718163,0.216527
4,0.175921,0.79859


In [0]:
# loc
kdf.loc[1:2]

Unnamed: 0,A,B
1,0.973241,0.690808
2,0.295934,0.670521


In [0]:
# iloc
kdf.iloc[:3, 1:2]



Unnamed: 0,B
0,0.831157
1,0.690808
2,0.670521


# Usando funções pyton com dataframe Koalas

In [0]:
# cria função python
def quadrado(x):
  return x ** x

In [0]:
# Habilita computação de dataframes e séries:
from databricks.koalas.config import set_option, reset_option
set_option('compute.ops_on_diff_frames', True)

In [0]:
# Cria uma nova coluna a partir da função quadrado:
kdf['C'] = kdf.A.apply(quadrado)



In [0]:
display(kdf)

A,B,C
0.2212077478738144,0.8311567940014337,0.7162502021069398
0.9732407502358158,0.6908076210439843,0.9739473969007166
0.295933854058675,0.6705207489230484,0.6974427992758827
0.7181625316904535,0.2165267829821043,0.7883962705847626
0.1759212585719468,0.7985901598213483,0.7366063350875234


In [0]:
# Agrupamento de dados
kdf.groupby('A').sum()

Unnamed: 0_level_0,B,C
A,Unnamed: 1_level_1,Unnamed: 2_level_1
0.221208,0.831157,0.71625
0.973241,0.690808,0.973947
0.295934,0.670521,0.697443
0.718163,0.216527,0.788396
0.175921,0.79859,0.736606


In [0]:
# Agrupamento mais de uma coluna
kdf.groupby(['A', 'B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C
A,B,Unnamed: 2_level_1
0.221208,0.831157,0.71625
0.973241,0.690808,0.973947
0.295934,0.670521,0.697443
0.718163,0.216527,0.788396
0.175921,0.79859,0.736606


In [0]:
# Necessário carrerar o inline para visualizar os plots no notebook
%matplotlib inline

In [0]:
#plotando um gráfico de barras:
speed = [0.1, 17.5, 40, 48, 52, 69, 88]
lifespan = [2, 8, 70, 1.5, 25, 12, 28]

index = ['snail', 'pip', 'elephant', 
        'rabbit', 'giraffe', 'coyote', 'horse']

kdf = ks.DataFrame({'speed': speed,
                   'lifespan': lifespan}, index=index)

kdf.plot.bar()

# Usando SQL no Koalas

In [0]:
# Cria um dataframe Koalas
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                   'pig': [20, 18, 489, 675, 1776],
                   'horse': [4, 25, 281, 600, 1900]})

In [0]:
# Faz query no dataframe koalas
ks.sql("SELECT * FROM {kdf} WHERE pip > 100")

Unnamed: 0,year,pip,horse
0,2003,489,281
1,2009,675,600
2,2014,1776,1900


In [0]:
# Cria um dataframe pandas
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
                   'sheep': [22, 50, 121, 445, 791],
                   'chicken': [250, 326, 589, 1241, 2118]})

In [0]:
# Query com inner join entre dataframe pandas e koalas
ks.sql('''
    SELECT ks.pig, pd.chicken
    FROM {kdf} ks INNER JOIN {pdf} pd
    ON ks.year = pd.year
    ORDER BY ks.pig, pd.chicken''')

Unnamed: 0,pig,chicken
0,18,326
1,20,250
2,489,589
3,675,1241
4,1776,2118


In [0]:
# converter koalas dataframe para Pyspark
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
pydf = kdf.to_spark()

In [0]:
type(pydf)

Out[45]: pyspark.sql.dataframe.DataFrame