![image](images/dataframe.png)

![image](images/tipos_dados.png)

![image](images/schema.png)

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
sc= spark.sparkContext

In [3]:
spark

In [4]:
df1 = spark.createDataFrame([("Pedro",10),("Maria",20),("José",40)])

df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



In [5]:
schema = "Id INT, Nome STRING"

In [6]:
dados = [[1,"Pedro"],[2,"Maria"]]

In [7]:
df2 = spark.createDataFrame(dados, schema)

df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
+---+-----+



In [8]:
df2.dtypes

[('Id', 'int'), ('Nome', 'string')]

In [9]:
from pyspark.sql.functions import sum

In [10]:
schema2 = "Produtos STRING, Vendas INT"

In [11]:
vendas = [["Caneta",10],["Lápis",20],["Caneta",40]]

In [13]:
df3 = spark.createDataFrame(vendas, schema2)

df3.show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [15]:
df3.groupBy("Produtos").agg(sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [16]:
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lápis|
|  Caneta|
+--------+



In [18]:
df3.select("Vendas","Produtos").show()

+------+--------+
|Vendas|Produtos|
+------+--------+
|    10|  Caneta|
|    20|   Lápis|
|    40|  Caneta|
+------+--------+



In [19]:
from pyspark.sql.functions import expr

In [20]:
df3.select("Produtos","Vendas", expr("Vendas * 0.2")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



In [21]:
df3.schema

StructType([StructField('Produtos', StringType(), True), StructField('Vendas', IntegerType(), True)])

In [22]:
df3.columns

['Produtos', 'Vendas']

## Importando Dados

In [23]:
from pyspark.sql.types import *

### Inferindo Schema

In [30]:
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

In [31]:
despachantes = spark.read.csv("/home/jovyan/work/notebooks/data/despachantes.csv",header=False, schema=arqschema)

In [32]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [36]:
despachantes.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- status: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- vendas: integer (nullable = true)
 |-- data: string (nullable = true)



### Auto Schema

In [33]:
desp_autoschema = spark.read.load("/home/jovyan/work/notebooks/data/despachantes.csv",header=False, format="csv",sep=",",inferSchema=True)

In [34]:
desp_autoschema.show()

+---+-------------------+-----+-------------+---+-------------------+
|_c0|                _c1|  _c2|          _c3|_c4|                _c5|
+---+-------------------+-----+-------------+---+-------------------+
|  1|   Carminda Pestana|Ativo|  Santa Maria| 23|2020-08-11 00:00:00|
|  2|    Deolinda Vilela|Ativo|Novo Hamburgo| 34|2020-03-05 00:00:00|
|  3|   Emídio Dornelles|Ativo| Porto Alegre| 34|2020-02-05 00:00:00|
|  4|Felisbela Dornelles|Ativo| Porto Alegre| 36|2020-02-05 00:00:00|
|  5|     Graça Ornellas|Ativo| Porto Alegre| 12|2020-02-05 00:00:00|
|  6|   Matilde Rebouças|Ativo| Porto Alegre| 22|2019-01-05 00:00:00|
|  7|    Noêmia   Orriça|Ativo|  Santa Maria| 45|2019-10-05 00:00:00|
|  8|      Roque Vásquez|Ativo| Porto Alegre| 65|2020-03-05 00:00:00|
|  9|      Uriel Queiroz|Ativo| Porto Alegre| 54|2018-05-05 00:00:00|
| 10|   Viviana Sequeira|Ativo| Porto Alegre|  0|2020-09-05 00:00:00|
+---+-------------------+-----+-------------+---+-------------------+



In [37]:
desp_autoschema.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: timestamp (nullable = true)



### Filtrando Dataframes

In [38]:
from pyspark.sql import functions as f

In [39]:
despachantes.select("id","nome","vendas").where(f.col("vendas")>20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [40]:
despachantes.select("id","nome","vendas").where((f.col("vendas")>20) & (f.col("vendas")<40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



### Renomeando Colunas

In [41]:
novodf = despachantes.withColumnRenamed("nome","nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

### Mudando o tipo

In [42]:
from pyspark.sql.functions import *

In [44]:
despachantes2 = despachantes.withColumn("data2",to_timestamp(f.col("data"), "yyyy-MM-dd"))

In [45]:
despachantes2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- nome: string (nullable = true)
 |-- status: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- vendas: integer (nullable = true)
 |-- data: string (nullable = true)
 |-- data2: timestamp (nullable = true)



In [46]:
despachantes2.select(year("data")).show()

+----------+
|year(data)|
+----------+
|      2020|
|      2020|
|      2020|
|      2020|
|      2020|
|      2019|
|      2019|
|      2020|
|      2018|
|      2020|
+----------+



In [47]:
despachantes2.select(year("data")).distinct().show()

+----------+
|year(data)|
+----------+
|      2018|
|      2019|
|      2020|
+----------+



In [48]:
despachantes2.select("nome", year("data")).orderBy("nome").show()

+-------------------+----------+
|               nome|year(data)|
+-------------------+----------+
|   Carminda Pestana|      2020|
|    Deolinda Vilela|      2020|
|   Emídio Dornelles|      2020|
|Felisbela Dornelles|      2020|
|     Graça Ornellas|      2020|
|   Matilde Rebouças|      2019|
|    Noêmia   Orriça|      2019|
|      Roque Vásquez|      2020|
|      Uriel Queiroz|      2018|
|   Viviana Sequeira|      2020|
+-------------------+----------+



In [49]:
despachantes2.select("data").groupBy(year("data")).count().show()

+----------+-----+
|year(data)|count|
+----------+-----+
|      2018|    1|
|      2019|    2|
|      2020|    7|
+----------+-----+



In [50]:
despachantes2.select(f.sum("vendas")).show()

+-----------+
|sum(vendas)|
+-----------+
|        325|
+-----------+

