In [1]:
from pyspark.sql import SparkSession

**Criando uma aplicação Spark**

In [2]:
spark = SparkSession.builder.master("local").appName("SparkByExamples.com").getOrCreate()

22/05/09 01:14:07 WARN Utils: Your hostname, rowedder-HP-240-G4-Notebook-PC resolves to a loopback address: 127.0.1.1; using 192.168.1.102 instead (on interface enp1s0)
22/05/09 01:14:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/09 01:14:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Criando um dataframe sem definir um schema**

In [3]:
df1 = spark.createDataFrame([("Pedro", 10),("Maria", 20),("José", 40)])

In [4]:
df1.show()

[Stage 0:>                                                          (0 + 1) / 1]

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



                                                                                

**Criando um dataframe com schema**

In [5]:
schema = "id INT, nome STRING"
dados = [[1, "Pedro"], [2, "Maria"]]
df2 = spark.createDataFrame(dados, schema)

In [6]:
df2.show()

+---+-----+
| id| nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



**Agregando dados**

In [7]:
from pyspark.sql.functions import sum  #função de soma (agregação)

In [8]:
schema2 = "produtos STRING, vendas INT"
vendas = [["Caneta", 10], ["Lápis", 15], ["Caneta", 40]]
df3 = spark.createDataFrame(vendas, schema2)

In [9]:
df3.show()

+--------+------+
|produtos|vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    15|
|  Caneta|    40|
+--------+------+



In [10]:
# Colocando o show na mesma linha do script de agregação.
agrupado = df3.groupBy("produtos").agg(sum("vendas")).show()

[Stage 3:>                                                          (0 + 1) / 1]

+--------+-----------+
|produtos|sum(vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         15|
+--------+-----------+



                                                                                

**Visualizando apenas as colunas que quero**

In [11]:
#O dataframe inteiro
df3.select("produtos","vendas").show()

+--------+------+
|produtos|vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    15|
|  Caneta|    40|
+--------+------+



In [12]:
# Apenas a coluna vendas
df3.select("vendas").show()

+------+
|vendas|
+------+
|    10|
|    15|
|    40|
+------+



In [13]:
# Invertendo a ordem 
df3.select("vendas","produtos").show()

+------+--------+
|vendas|produtos|
+------+--------+
|    10|  Caneta|
|    15|   Lápis|
|    40|  Caneta|
+------+--------+



**Criando uma expressão (qualquer cálculo matemático)**

In [14]:
from pyspark.sql.functions import expr

In [15]:
# O quantidade de vendas vezes 0.2 
df3.select("produtos", "vendas", expr("vendas * 0.2")).show()

+--------+------+--------------+
|produtos|vendas|(vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    15|           3.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



**Visualizando o schema do dataframe**

In [16]:
df3.schema

StructType(List(StructField(produtos,StringType,true),StructField(vendas,IntegerType,true)))

**Visualizando as colunas do dataframe**

In [17]:
df3.columns

['produtos', 'vendas']

**Importando um arquivo CSV e definindo um schema para esse arquivo**

In [18]:
from pyspark.sql.types import *

In [19]:
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"
despachantes = spark.read.csv("/home/rowedder/download/despachantes.csv", header=False, schema = arqschema)

In [20]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [21]:
# Abrindo o CSV com load (um método genérico) no lugar do spark.read.csv
# Colocando o sep="," --> Para conseguir abrir o formato que o arquivo foi criado (, ou ;)
# inferSchema="True" --> Ele infere o schema automaticamente.
desp_autoschema = spark.read.load("/home/rowedder/download/despachantes.csv", header=False, format="csv", sep=",", inferSchema="True")

In [22]:
desp_autoschema.show()

+---+-------------------+-----+-------------+---+----------+
|_c0|                _c1|  _c2|          _c3|_c4|       _c5|
+---+-------------------+-----+-------------+---+----------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05|
+---+-------------------+-----+-------------+---+----------+



**Comparando os tipos de dados dos dataframes despachantes e desp_autoschema da coluna data**

In [23]:
despachantes.schema

StructType(List(StructField(id,IntegerType,true),StructField(nome,StringType,true),StructField(status,StringType,true),StructField(cidade,StringType,true),StructField(vendas,IntegerType,true),StructField(data,StringType,true)))

In [24]:
desp_autoschema.schema

StructType(List(StructField(_c0,IntegerType,true),StructField(_c1,StringType,true),StructField(_c2,StringType,true),StructField(_c3,StringType,true),StructField(_c4,IntegerType,true),StructField(_c5,StringType,true)))

**Filtrando dados com WHERE**

In [25]:
from pyspark.sql import functions as func

In [26]:
# Filtrando vendas maior que 20.
despachantes.select("id","nome","vendas").where(func.col("vendas") > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [27]:
# Filtrando vendas maiores que 20 e menores do que 40.
despachantes.select("id","nome","vendas").where((func.col("vendas") > 20)&(func.col("vendas")<40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



**Renomeando colunas de um dataframe**

In [28]:
novodf = despachantes.withColumnRenamed("nome", "nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [29]:
novodf.show()

+---+-------------------+------+-------------+------+----------+
| id|              nomes|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



**Alterando o tipo de dado da coluna**

In [30]:
from pyspark.sql.functions import *

In [31]:
#Cria-se mais uma coluna porém podemos deletá-la depois.
despachantes2 = despachantes.withColumn("data2", to_timestamp(func.col("data"),"yyyy-mm-dd"))

In [32]:
despachantes2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- status: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- vendas: integer (nullable = true)
 |-- data: string (nullable = true)
 |-- data2: timestamp (nullable = true)



**Filtrando apenas o ano da data completa**

In [33]:
despachantes.select(year("data")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



**Filtrando as datas que aparecem apenas uma vez**

In [34]:
despachantes.select(year("data")).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



**Ordenando pelo nome e data**

In [36]:
despachantes2.select("nome", year("data")).orderBy("nome").show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



**Contando a quantidade de registros por data**

In [39]:
despachantes2.select("data").groupBy(year("data")).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



**Somar a quantidade de vendas**

In [40]:
despachantes2.select(func.sum("vendas")).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+



**Principais ações e transformações em dataframes**

**Forma tabular**

In [42]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



**Forma de lista (take)** 

In [44]:
despachantes.take(2)

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05')]

**Collect**

In [45]:
#Trás todos os dados na forma duma lista. Ação herdada dos RDDs.
despachantes.collect()

[Row(id=1, nome='Carminda Pestana', status='Ativo', cidade='Santa Maria', vendas=23, data='2020-08-11'),
 Row(id=2, nome='Deolinda Vilela', status='Ativo', cidade='Novo Hamburgo', vendas=34, data='2020-03-05'),
 Row(id=3, nome='Emídio Dornelles', status='Ativo', cidade='Porto Alegre', vendas=34, data='2020-02-05'),
 Row(id=4, nome='Felisbela Dornelles', status='Ativo', cidade='Porto Alegre', vendas=36, data='2020-02-05'),
 Row(id=5, nome='Graça Ornellas', status='Ativo', cidade='Porto Alegre', vendas=12, data='2020-02-05'),
 Row(id=6, nome='Matilde Rebouças', status='Ativo', cidade='Porto Alegre', vendas=22, data='2019-01-05'),
 Row(id=7, nome='Noêmia   Orriça', status='Ativo', cidade='Santa Maria', vendas=45, data='2019-10-05'),
 Row(id=8, nome='Roque Vásquez', status='Ativo', cidade='Porto Alegre', vendas=65, data='2020-03-05'),
 Row(id=9, nome='Uriel Queiroz', status='Ativo', cidade='Porto Alegre', vendas=54, data='2018-05-05'),
 Row(id=10, nome='Viviana Sequeira', status='Ativo', c

**Count**

In [47]:
#Trás os numeros de linhas (row) do dataframe.
despachantes.count()

10

**orderBy**

In [48]:
# Por padrão ordena do menor para o maior (crescente)
despachantes.orderBy("vendas").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
+---+-------------------+------+-------------+------+----------+



In [51]:
# Decrescente
despachantes.orderBy(func.col("vendas").desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [53]:
# Ordenando a coluna cidade e a coluna vendas de forma decrescente.
despachantes.orderBy(func.col("cidade").desc(), func.col("vendas").desc()).show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+-------------------+------+-------------+------+----------+



**Total de vendas por cidade**

In [54]:
despachantes.groupBy("cidade").agg(sum("vendas")).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
|  Santa Maria|         68|
|Novo Hamburgo|         34|
| Porto Alegre|        223|
+-------------+-----------+



**Agregando as vendas por cidade e ordenando do maior para o menor**

In [56]:
# Agreguei as vendas por cidade e ordenei do maior para o menor.
despachantes.groupBy("cidade").agg(sum("vendas")).orderBy(func.col("sum(vendas)").desc()).show()

+-------------+-----------+
|       cidade|sum(vendas)|
+-------------+-----------+
| Porto Alegre|        223|
|  Santa Maria|         68|
|Novo Hamburgo|         34|
+-------------+-----------+



**Filter**

In [57]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [59]:
despachantes.filter(func.col("nome") == "Deolinda Vilela").show()

+---+---------------+------+-------------+------+----------+
| id|           nome|status|       cidade|vendas|      data|
+---+---------------+------+-------------+------+----------+
|  2|Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
+---+---------------+------+-------------+------+----------+

