In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as Func
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Criar uma SparkSession
spark = SparkSession.builder.appName("Exemplo de Dataframe").getOrCreate()

25/04/29 10:52:46 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [30]:
#criar um data frame simples, sem schema
df1 = spark.createDataFrame([
    ("Pedro", 10),
    ("Maria",20),
    ("José",40)
])

#show é ação, então tudo o que foi feito anteriormente é executado, lazzy
df1.show()

+-----+---+
|   _1| _2|
+-----+---+
|Pedro| 10|
|Maria| 20|
| José| 40|
+-----+---+



In [31]:
#criar df com schema
schema = "Id INT, Nome STRING"
dados = [
    (1, "Pedro"),
    (2, "Maria"),
    (3, "José")
]

df2 = spark.createDataFrame(dados, schema)
df2.show()

+---+-----+
| Id| Nome|
+---+-----+
|  1|Pedro|
|  2|Maria|
|  3| José|
+---+-----+



In [32]:
#com transformação
schema2 = "Produtos STRING, Vendas INT"
vendas = [
    ["Caneta", 10],
    ["Lápis", 20],
    ["Caneta", 40]
]

df3 = spark.createDataFrame(vendas , schema2)

#podemos contatenar as operações, neste caso sem persitir
df3.groupBy("Produtos").agg(sum("Vendas")).show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [33]:
agrupado = df3.groupBy("Produtos").agg(sum("Vendas"))
agrupado.show()

+--------+-----------+
|Produtos|sum(Vendas)|
+--------+-----------+
|  Caneta|         50|
|   Lápis|         20|
+--------+-----------+



In [34]:
#selecionar colunas específicas
df3.select("Produtos").show()

+--------+
|Produtos|
+--------+
|  Caneta|
|   Lápis|
|  Caneta|
+--------+



In [35]:
df3.select("Produtos", "Vendas").show()

+--------+------+
|Produtos|Vendas|
+--------+------+
|  Caneta|    10|
|   Lápis|    20|
|  Caneta|    40|
+--------+------+



In [36]:
#expressões e select
df3.select("Produtos", "Vendas", expr("Vendas * 0.2")).show()

+--------+------+--------------+
|Produtos|Vendas|(Vendas * 0.2)|
+--------+------+--------------+
|  Caneta|    10|           2.0|
|   Lápis|    20|           4.0|
|  Caneta|    40|           8.0|
+--------+------+--------------+



In [37]:
#para ver o schema
df3.schema

StructType([StructField('Produtos', StringType(), True), StructField('Vendas', IntegerType(), True)])

In [38]:
#ver colunas
df3.columns

['Produtos', 'Vendas']

In [15]:
# Define o esquema do arquivo CSV com os tipos de dados para cada coluna
schema_arq = "id INT, " \
    "nome STRING, " \
    "status STRING, " \
    "cidade STRING, " \
    "vendas INT, " \
    "data STRING"

# Define o caminho do arquivo CSV que será lido
despachantes_csv = "../../arquivos/download/despachantes.csv"

# Lê o arquivo CSV usando o Spark, sem cabeçalho e aplicando o esquema definido
despachantes = spark.read.csv(
    despachantes_csv, 
    header = False, 
    schema = schema_arq
)

# Exibe as primeiras linhas do DataFrame
despachantes.show()

# Exibe o esquema do DataFrame
despachantes.schema

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True)])

In [40]:
#condição lógica com where
despachantes.select("id", "nome", "vendas").where(Func.col("vendas") > 20).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
|  7|    Noêmia   Orriça|    45|
|  8|      Roque Vásquez|    65|
|  9|      Uriel Queiroz|    54|
+---+-------------------+------+



In [41]:
#& para and, | para or, e ~ para not
despachantes.select("id","nome","vendas").where((Func.col("vendas") > 20) & (Func.col("vendas") < 40)).show()

+---+-------------------+------+
| id|               nome|vendas|
+---+-------------------+------+
|  1|   Carminda Pestana|    23|
|  2|    Deolinda Vilela|    34|
|  3|   Emídio Dornelles|    34|
|  4|Felisbela Dornelles|    36|
|  6|   Matilde Rebouças|    22|
+---+-------------------+------+



In [42]:
#renomear coluna
novodf = despachantes.withColumnRenamed("nome","nomes")
novodf.columns

['id', 'nomes', 'status', 'cidade', 'vendas', 'data']

In [43]:
#coluna data está como string, vamos transformar em texto
despachantes2 = despachantes.withColumn("data2", to_timestamp(Func.col("data"), "yyyy-MM-dd"))
despachantes2.schema

StructType([StructField('id', IntegerType(), True), StructField('nome', StringType(), True), StructField('status', StringType(), True), StructField('cidade', StringType(), True), StructField('vendas', IntegerType(), True), StructField('data', StringType(), True), StructField('data2', TimestampType(), True)])

In [None]:
#operações sobre datas
despachantes2.select(year("data")).show()
despachantes2.select(year("data")).distinct().show()
despachantes2.select("nome",year("data")).orderBy("nome").show()
despachantes2.select("data").groupBy(year("data")).count().show()
despachantes2.select(Func.sum("vendas")).show()

In [None]:
# Cria um arquivo no formato parquet
despachantes.write.mode("overwrite").format("parquet").save("df_import_parquet")

In [48]:
# Cria um arquivo no formato csv
despachantes.write.mode("overwrite").format("csv").save("df_import_csv")

In [49]:
# Cria um arquivo no formato json
despachantes.write.mode("overwrite").format("json").save("df_import_json")

In [50]:
# Cria um arquivo no formato orc
despachantes.write.mode("overwrite").format("orc").save("df_import_orc")

In [19]:
# Criando o importe no formato parquet
parquet = spark.read.format("parquet").load("./df_import_parquet/despachantes.parquet")
parquet.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [18]:
# Criando o importe no formato json
json = spark.read.format("json").load("./df_import_json/despachantes.json")
json.show()

+-------------+----------+---+-------------------+------+------+
|       cidade|      data| id|               nome|status|vendas|
+-------------+----------+---+-------------------+------+------+
|  Santa Maria|2020-08-11|  1|   Carminda Pestana| Ativo|    23|
|Novo Hamburgo|2020-03-05|  2|    Deolinda Vilela| Ativo|    34|
| Porto Alegre|2020-02-05|  3|   Emídio Dornelles| Ativo|    34|
| Porto Alegre|2020-02-05|  4|Felisbela Dornelles| Ativo|    36|
| Porto Alegre|2020-02-05|  5|     Graça Ornellas| Ativo|    12|
| Porto Alegre|2019-01-05|  6|   Matilde Rebouças| Ativo|    22|
|  Santa Maria|2019-10-05|  7|    Noêmia   Orriça| Ativo|    45|
| Porto Alegre|2020-03-05|  8|      Roque Vásquez| Ativo|    65|
| Porto Alegre|2018-05-05|  9|      Uriel Queiroz| Ativo|    54|
| Porto Alegre|2020-09-05| 10|   Viviana Sequeira| Ativo|     0|
+-------------+----------+---+-------------------+------+------+



In [17]:
# Criando o importe no formato orc
orc = spark.read.format("orc").load("./df_import_orc/despachantes.orc")
orc.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [None]:
# Criando o importe no formato csv e definindo o schema
csv = spark.read.format("csv").load("./df_import_csv/despachantes.csv", schema = schema_arq)
csv.show()

+---+-------------------+------+-------------+------+----------+
| id|               nome|status|       cidade|vendas|      data|
+---+-------------------+------+-------------+------+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|    23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|    34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|    34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|    36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|    12|2020-02-05|
|  6|   Matilde Rebouças| Ativo| Porto Alegre|    22|2019-01-05|
|  7|    Noêmia   Orriça| Ativo|  Santa Maria|    45|2019-10-05|
|  8|      Roque Vásquez| Ativo| Porto Alegre|    65|2020-03-05|
|  9|      Uriel Queiroz| Ativo| Porto Alegre|    54|2018-05-05|
| 10|   Viviana Sequeira| Ativo| Porto Alegre|     0|2020-09-05|
+---+-------------------+------+-------------+------+----------+



In [21]:
# Para o Spark no final do script, se ainda não foi parado
if spark.sparkContext._jsc.sc().isStopped() == False:
    spark.stop()