# Seção Inicial sobre DataFrames no Spark

## Instalando as bibliotecas necessárias

In [8]:
# pyspark
!pip install pyspark

# findspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## Import das bibliotecas

In [9]:
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
      .master("local[1]") \
      .appName("SparkByExamples.com") \
      .getOrCreate()

## Criação do primeiro dataFrame sem Schema

In [10]:
df1 = spark.createDataFrame([("Pedro", 10), ("Maria", 20),("José", 40)])
df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



## Criação do DataFrame com Schema

In [11]:
schema = "Id INT, Nome STRING"
dados = [[1, "Pedro"], [2, "Maria"]]
df2 = spark.createDataFrame(dados, schema)
df2.show()

df2.show(1)

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
+---+-----+
only showing top 1 row



## Operações com o Spark

In [12]:
from pyspark.sql.functions import sum

In [13]:
schema2 = "Produtos STRING, Vendas INT"
vendas = [["Caneta", 10], ["Lapis", 20], ["Caneta", 40]]

df3 = spark.createDataFrame(vendas, schema2)
df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lapis|    20|
|  Caneta|    40|
+--------+------+



In [14]:
agrupado = df3.groupBy("Produtos").agg(sum("Vendas"))
agrupado.show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lapis|         20|
+--------+-----------+



In [15]:
df3.groupBy("Produtos").agg(sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lapis|         20|
+--------+-----------+



In [16]:
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lapis|
|  Caneta|
+--------+



In [17]:
df3.select("Produtos", "Vendas").show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lapis|    20|
|  Caneta|    40|
+--------+------+



In [18]:
df3.select("Vendas", "Produtos").show()

+------+--------+
|Vendas|Produtos|
+------+--------+
|    10|  Caneta|
|    20|   Lapis|
|    40|  Caneta|
+------+--------+



In [19]:
from pyspark.sql.functions import expr

In [20]:
df3.select("Produtos", "Vendas", expr("Vendas * 0.2")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lapis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



# DataFrame utilizando os dados do download

In [21]:
!wget www.datascientist.com.br/bigdata/download.zip

--2023-01-05 17:07:10--  http://www.datascientist.com.br/bigdata/download.zip
Resolving www.datascientist.com.br (www.datascientist.com.br)... 108.179.192.203
Connecting to www.datascientist.com.br (www.datascientist.com.br)|108.179.192.203|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 207452 (203K) [application/zip]
Saving to: ‘download.zip’


2023-01-05 17:07:10 (1.04 MB/s) - ‘download.zip’ saved [207452/207452]



Extrair os dados

In [22]:
!unzip download.zip

Archive:  download.zip
   creating: download/Atividades/
  inflating: download/Atividades/Clientes.parquet  
  inflating: download/Atividades/ItensVendas.parquet  
  inflating: download/Atividades/Produtos.parquet  
  inflating: download/Atividades/Vendas.parquet  
  inflating: download/Atividades/Vendedores.parquet  
  inflating: download/Carros.csv     
  inflating: download/Churn.csv      
   creating: download/demo/
  inflating: download/demo/1.CreateTable.sql  
  inflating: download/demo/2.InsertClientes.sql  
  inflating: download/demo/3.InsertIntoProdutos.sql  
  inflating: download/demo/4.InsertIntoVendedores.sql  
  inflating: download/demo/5.InsertIntoVendas.sql  
  inflating: download/demo/6.InsertItensVenda.sql  
  inflating: download/despachantes.csv  
  inflating: download/iris.csv       
   creating: download/mongo/
  inflating: download/mongo/posts.json  
  inflating: download/reclamacoes.csv  
   creating: download/streaming/
  inflating: download/streaming/1.json14.js

In [23]:
from pyspark.sql.types import *

In [24]:
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

despachante = spark.read.csv("/content/download/despachantes.csv", header=False, schema=arqschema)

In [25]:
despachante.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [26]:
desp_autoschema = spark.read.load("/content/download/despachantes.csv", 
                                  header=False, format="csv", sep=",", 
                                  inferSchema=True)

desp_autoschema.show()

+---+-------------------+-----+-------------+---+-------------------+
|_c0|                _c1|  _c2|          _c3|_c4|                _c5|
+---+-------------------+-----+-------------+---+-------------------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05 00:00:00|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05 00:00:00|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05 00:00:00|
+---+-------------------+-----+-------------+---+-------------------+



In [27]:
desp_autoschema.schema

StructType([StructField('_c0', IntegerType(), True), StructField('_c1', StringType(), True), StructField('_c2', StringType(), True), StructField('_c3', StringType(), True), StructField('_c4', IntegerType(), True), StructField('_c5', TimestampType(), True)])

In [28]:
despachante.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

## Funções

In [29]:
from pyspark.sql import functions as Func

In [30]:
despachante.select("id", "nome", "vendas").where(Func.col("vendas") > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [31]:
despachante.select("id", "nome", "vendas").where((Func.col("vendas") > 20) & (Func.col("vendas") < 40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



## Modificar nome e tipos de colunas do dataframe

In [32]:
from pyspark.sql.functions import *

despachantes2 = despachante.withColumn("data2", to_timestamp(Func.col("data"), "yyyy-MM-dd"))

despachantes2.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True), StructField('data2', TimestampType(), True)])

In [33]:
despachantes2.select(year("data")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



In [34]:
despachantes2.select(year("data")).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [35]:
despachantes2.select("nome", year("data")).orderBy("nome").show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



## Operações realizadas no dataframe

In [36]:
despachantes2.select("data").groupBy(year("data")).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



In [37]:
despachantes2.select(Func.sum("vendas")).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



In [38]:
despachante.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [39]:
despachante.take(1)

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11')]

In [40]:
despachante.first()

Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11')

In [41]:
despachante.collect()

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05'),
 Row(id=3, nome='Emídio Dornelles', status='Ativo', cidade='Porto Alegre', vendas=34, data='2020-02-05'),
 Row(id=4, nome='Felisbela Dornelles', status='Ativo', cidade='Porto Alegre', vendas=36, data='2020-02-05'),
 Row(id=5, nome='Graça Ornellas', status='Ativo', cidade='Porto Alegre', vendas=12, data='2020-02-05'),
 Row(id=6, nome='Matilde Rebouças', status='Ativo', cidade='Porto Alegre', vendas=22, data='2019-01-05'),
 Row(id=7, nome='Noêmia   Orriça', status='Ativo', cidade='Santa Maria', vendas=45, data='2019-10-05'),
 Row(id=8, nome='Roque Vásquez', status='Ativo', cidade='Porto Alegre', vendas=65, data='2020-03-05'),
 Row(id=9, nome='Uriel Queiroz', status='Ativo', cidade='Porto Alegre', vendas=54, data='2018-05-05'),
 Row(id=10, nome='Viviana Sequeira', status='Ativo', c

In [42]:
despachante.count()

10

In [43]:
despachante.orderBy("vendas").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [44]:
despachante.orderBy(Func.col("vendas").desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [45]:
despachante.orderBy(Func.col("cidade").desc(), Func.col("vendas").desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [46]:
despachante.groupBy("cidade").agg(sum("vendas")).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
|  Santa Maria|         68|
|Novo Hamburgo|         34|
| Porto Alegre|        223|
+-------------+-----------+



In [47]:
despachante.groupBy("cidade").agg(sum("vendas")).orderBy(Func.col("sum(vendas)").desc()).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+



In [48]:
despachante.filter(Func.col("nome") == "Deolinda Vilela").show()

+---+---------------+------+-------------+------+----------+
| id|           nome|status|       cidade|vendas|      data|
+---+---------------+------+-------------+------+----------+
|  2|Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+---------------+------+-------------+------+----------+



## Exportação de dados

In [49]:
despachante.write.format("parquet").save("/content/app/parquet")

In [50]:
despachante.write.format("csv").save("/content/app/csv")

In [51]:
despachante.write.format("json").save("/content/app/json")

In [52]:
despachante.write.format("orc").save("/content/app/orc")

## Importar Arquivos

Renomear os arquivos para despachantes.**

In [53]:
arqschema

'id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING'

In [59]:
csv = spark.read.format("csv").load("/content/app/csv/despachante.csv",
                                      schema=arqschema)

csv.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [60]:
parquet = spark.read.format("parquet").load("/content/app/parquet/despachante.parquet")

parquet.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [61]:
parquet.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

In [62]:
csv.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

Para os outros formatos basta trocar a pasta e o format