# Importações necessárias

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Iniciando SparkSession

In [16]:
spark = SparkSession.builder \
    .master("local") \
        .appName("bancos-e-tabelas") \
            .config("spark.executer.memory", "1gb") \
                .getOrCreate()

# Banco de Dados e Tabelas

In [17]:
# mostrando banco de dados disponiveis
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|     desp|
+---------+



In [18]:
# criando uma nova database
spark.sql("create database desp")

AnalysisException: Database 'desp' already exists

In [19]:
# verificar se banco foi criado
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|     desp|
+---------+



In [20]:
# usar banco de dados criado
spark.sql("use desp")

DataFrame[]

In [21]:
# criando schema para dataframe
arqschema = "id INT, nome STRING, status STRING, cidade STRING, vendas INT, data STRING"

In [22]:
# importando dados csv para dataframe
despachantes = spark.read.csv("/media/robson/HD2/cursos/pyspark/download/despachantes.csv", header=False, schema = arqschema)

In [23]:
despachantes.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [26]:
# transformando dataframe em uma tabela (daframe "despachantes" dará origem a tabela "Despachantes" 
# no banco de dados "desp")
despachantes.write.saveAsTable("Despachantes2")

                                                                                

In [27]:
# para verificar se a tabela foi criada
spark.sql("select * from Despachantes2").show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [28]:
# outra forma de verificar
spark.sql("show tables").show()

+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|     desp|despachantes2|      false|
+---------+-------------+-----------+



In [29]:
# modo overwrite escreve por cima da tabela criada
# modo append adiciona novas linhas de registro
despachantes.write.mode("overwrite").saveAsTable("Despachantes2")

# Tabelas Gerenciadas e Externas

In [30]:
# criando um df de uma tabela existente
despachantes1 = spark.sql("select * from despachantes2")
despachantes1.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [31]:
# criando arquivo parquet para tabela externa
despachantes.write.format("parquet") \
    .save("/media/robson/HD2/cursos/pyspark/anotacoes_e_praticas/dados/desparquet")

AnalysisException: path file:/media/robson/HD2/cursos/pyspark/anotacoes_e_praticas/dados/desparquet already exists.

In [32]:
# criando uma tabela externa
despachantes.write.option("path","/media/robson/HD2/cursos/pyspark/anotacoes_e_praticas/dados/desparquet/externa") \
    .saveAsTable("Despachantes_ng")

AnalysisException: CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory /media/robson/HD2/cursos/pyspark/anotacoes_e_praticas/dados/desparquet/externa . To allow overwriting the existing non-empty directory, set 'spark.sql.legacy.allowNonEmptyLocationInCTAS' to true.

In [None]:
# testando se a tabela foi criada
spark.sql("select * from Despachantes_ng").show()

In [None]:
spark.sql("show tables").show()

In [None]:
# verificando se a tabela é gerenciada ou não
spark.sql("show create table Despachantes") \
    .show(truncate=False)

In [None]:
# verificando se a tabela é gerenciada ou não
spark.sql("show create table Despachantes_ng") \
    .show(truncate=False)

In [None]:
# listando tabelas
spark.catalog.listTables()

# Views

In [None]:
# criando view temporária utilizando api de dataframe
despachantes.createOrReplaceTempView("Despachantes_view1")

In [None]:
# consultando view
spark.sql("select * from Despachantes_view1") \
    .show()

In [None]:
# criando uma view global
despachantes.createGlobalTempView("Despachantes_view2")

In [None]:
# consultando view global
spark.sql("select * from global_temp.Despachantes_view2") \
    .show()

In [None]:
# outra forma de criar views temporarias (comando SQL)
spark.sql("CREATE OR REPLACE TEMP VIEW DESP_VIEW AS select * from despachantes")

In [None]:
# consultando view
spark.sql("select * from DESP_VIEW").show()

In [None]:
# outra forma de criar views globais (comando SQL)
spark.sql("CREATE OR REPLACE GLOBAL TEMP VIEW DESP_VIEW AS select * from despachantes")

In [None]:
# para consultar
spark.sql("select * from global_temp.DESP_VIEW").show()

# Comparando DataFrames com Tabelas

In [33]:
# importando modulos
from pyspark.sql import functions as Func
from pyspark.sql.functions import *

In [35]:

spark.sql("select * from Despachantes2") \
    .show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [36]:
# limitar consulta a certas colunas
spark.sql("select nome, vendas from Despachantes2") \
    .show()

+-------------------+------+
|               nome|vendas|
+-------------------+------+
|   Carminda Pestana|    23|
|    Deolinda Vilela|    34|
|   Emídio Dornelles|    34|
|Felisbela Dornelles|    36|
|     Graça Ornellas|    12|
|   Matilde Rebouças|    22|
|    Noêmia   Orriça|    45|
|      Roque Vásquez|    65|
|      Uriel Queiroz|    54|
|   Viviana Sequeira|     0|
+-------------------+------+



In [39]:
despachantes.select("nome", "vendas") \
    .show()

+-------------------+------+
|               nome|vendas|
+-------------------+------+
|   Carminda Pestana|    23|
|    Deolinda Vilela|    34|
|   Emídio Dornelles|    34|
|Felisbela Dornelles|    36|
|     Graça Ornellas|    12|
|   Matilde Rebouças|    22|
|    Noêmia   Orriça|    45|
|      Roque Vásquez|    65|
|      Uriel Queiroz|    54|
|   Viviana Sequeira|     0|
+-------------------+------+



In [None]:
# utilizando clausula com o select do SQL
spark.sql("select nome, vendas from Despachantes where vendas > 20") \
    .show()

In [None]:
despachantes.select("nome", "vendas") \
    .where(Func.col("vendas") > 20) \
        .show()

In [None]:
# vendas por cidade ordenado de forma decrescente 
spark.sql("select cidade, sum(vendas) from Despachantes group by cidade order by 2 desc") \
    .show()

In [None]:
# mesma coisa, com a api DataFrame
despachantes.groupBy("cidade") \
    .agg(sum("vendas")) \
        .orderBy(Func.col("sum(vendas)") \
            .desc()).show()

# Joins com DataFrames e SQL

In [None]:
# criando schema do dataframe reclamacoes
recschema = "idrec INT, datarec STRING, iddesp INT"

In [None]:
# criando dataframe de um arquivo csv
reclamacoes = spark.read.csv("/media/robson/HD2/cursos/pyspark/download/reclamacoes.csv", header=False, schema=recschema)

In [None]:
reclamacoes.show()

In [None]:
# criando tabela no banco de dados
reclamacoes.write.saveAsTable("reclamacoes")

In [None]:
# fazendo o inner join
spark.sql("select reclamacoes.*, despachantes.nome from despachantes \
    inner join reclamacoes on (despachantes.id = reclamacoes.iddesp)") \
        .show()

In [None]:
# utilizando o right join
spark.sql("select reclamacoes.*, despachantes.nome from despachantes \
    right join reclamacoes on (despachantes.id = reclamacoes.iddesp)") \
        .show()

In [None]:
# utilizando left join (no caso aparecem despachantes que NÂO tiveram reclamacoes)
spark.sql("select reclamacoes.*, despachantes.nome from despachantes \
    left join reclamacoes on (despachantes.id = reclamacoes.iddesp)") \
        .show()

In [None]:
# juncoes utilizando inner com api de dataframes
despachantes.join(reclamacoes, despachantes.id == reclamacoes.iddesp, "inner") \
    .select("idrec", "datarec", "iddesp", "nome") \
        .show()

In [None]:
# juncoes utilizando right com api de dataframes
despachantes.join(reclamacoes, despachantes.id == reclamacoes.iddesp, "right") \
    .select("idrec", "datarec", "iddesp", "nome") \
        .show()

In [None]:
# juncoes utilizando left com api de dataframes (aparecem despachantes sem reclamacoes)
despachantes.join(reclamacoes, despachantes.id == reclamacoes.iddesp, "left") \
    .select("idrec", "datarec", "iddesp", "nome") \
        .show()

# Utilizando Spark-SQL

In [40]:
spark.sql('use desp')

DataFrame[]

In [44]:
spark.sql("show tables").show()


+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|     desp|despachantes2|      false|
+---------+-------------+-----------+

