Importando Spark e CSV

In [1]:
import findspark
import shutil
import glob
import os
findspark.init()
from pyspark.sql import functions as F

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat
from pyspark.sql.functions import col

# Criar a sessão Spark de forma mais simples
spark = SparkSession.builder \
    .appName("TestePySpark") \
    .master("local[*]") \
    .getOrCreate()

caminho_csv = "dados/dados_gol.csv"

Leitura do CSV usando rdd

In [2]:
rdd = spark.sparkContext.textFile(caminho_csv)

rdd_data = rdd.zipWithIndex().filter(lambda x: x[1] > 1).map(lambda x: x[0])

rdd_cleaned = rdd_data.map(lambda line: [field.replace('"', '') for field in line.split(";")])


Criando Cabeçalho e add no dataframe

In [3]:
df = (spark.read.option("header", "false").csv(caminho_csv))
# Obter a segunda linha como cabeçalho
novo_header = df.collect()[1]  
colunas = [str(c).replace('"', '').strip() for c in novo_header[0].split(";")]
df = rdd_cleaned.toDF(colunas)

Aplicando o Filtro

In [4]:
# Filtro
df_filtrado = df.filter(
    (df["EMPRESA_SIGLA"] == "GLO") &
    (df["GRUPO_DE_VOO"] == "REGULAR") &
    (df["NATUREZA"] == "DOMÉSTICA")
)
df_prep = df_filtrado.withColumn("MERCADO",
        concat(
            F.least(F.col("AEROPORTO_DE_ORIGEM_SIGLA"), F.col("AEROPORTO_DE_DESTINO_SIGLA")),
            F.greatest(F.col("AEROPORTO_DE_ORIGEM_SIGLA"), F.col("AEROPORTO_DE_DESTINO_SIGLA")))) \
    .select( "MES", "ANO", "RPK", "MERCADO") \
    .na.drop()

df_final = df_prep.withColumn(
    "MES",
    F.lpad(df_prep["MES"].cast("string"), 2, '0') 
)

df_prep.show()
df_final.select([F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_final.columns]).show()


+---+----+-------+--------+
|MES| ANO|    RPK| MERCADO|
+---+----+-------+--------+
|  1|2001| 887040|SBBHSBSP|
|  1|2001| 743280|SBBHSBSV|
|  1|2001|1172660|SBBRSBGL|
|  1|2001|2845110|SBBRSBSP|
|  1|2001|1144680|SBBRSBSV|
|  1|2001|       |SBFLSBGL|
|  1|2001|1188820|SBFLSBPA|
|  1|2001|2937760|SBFLSBSP|
|  1|2001|1960530|SBBRSBGL|
|  1|2001|       |SBFLSBGL|
|  1|2001|       |SBGLSBPA|
|  1|2001|2606760|SBGLSBSP|
|  1|2001| 756378|SBGLSBSV|
|  1|2001|1264330|SBFLSBPA|
|  1|2001|       |SBGLSBPA|
|  1|2001|       |SBPASBSP|
|  1|2001| 859320|SBBHSBSP|
|  1|2001|2486300|SBBRSBSP|
|  1|2001|2892860|SBFLSBSP|
|  1|2001|3033360|SBGLSBSP|
+---+----+-------+--------+
only showing top 20 rows

+---+---+---+-------+
|MES|ANO|RPK|MERCADO|
+---+---+---+-------+
|  0|  0|  0|      0|
+---+---+---+-------+



Criando Filtro e o campo Mercado

In [5]:
df_filtrado = df.filter(
        (df["EMPRESA_SIGLA"] == "GLO") &
        (df["GRUPO_DE_VOO"] == "REGULAR") &
        (df["NATUREZA"] == "DOMÉSTICA")
    )
df_prep = df_filtrado.withColumn("MERCADO",
        concat(
            F.least(F.col("AEROPORTO_DE_ORIGEM_SIGLA"), F.col("AEROPORTO_DE_DESTINO_SIGLA")),
            F.greatest(F.col("AEROPORTO_DE_ORIGEM_SIGLA"), F.col("AEROPORTO_DE_DESTINO_SIGLA")))) \
    .select( "MES", "ANO", "RPK", "MERCADO")

df_final.show(truncate=False)

+---+----+-------+--------+
|MES|ANO |RPK    |MERCADO |
+---+----+-------+--------+
|01 |2001|887040 |SBBHSBSP|
|01 |2001|743280 |SBBHSBSV|
|01 |2001|1172660|SBBRSBGL|
|01 |2001|2845110|SBBRSBSP|
|01 |2001|1144680|SBBRSBSV|
|01 |2001|       |SBFLSBGL|
|01 |2001|1188820|SBFLSBPA|
|01 |2001|2937760|SBFLSBSP|
|01 |2001|1960530|SBBRSBGL|
|01 |2001|       |SBFLSBGL|
|01 |2001|       |SBGLSBPA|
|01 |2001|2606760|SBGLSBSP|
|01 |2001|756378 |SBGLSBSV|
|01 |2001|1264330|SBFLSBPA|
|01 |2001|       |SBGLSBPA|
|01 |2001|       |SBPASBSP|
|01 |2001|859320 |SBBHSBSP|
|01 |2001|2486300|SBBRSBSP|
|01 |2001|2892860|SBFLSBSP|
|01 |2001|3033360|SBGLSBSP|
+---+----+-------+--------+
only showing top 20 rows



In [6]:
df_DB = df_prep.withColumn("mes", col("mes").cast("int")) \
             .withColumn("ano", col("ano").cast("int")) \
             .withColumn("rpk", col("rpk").cast("double"))

df_DB = df_DB.na.drop()

Conectando com o PSQL

In [7]:
from pyspark.sql import SparkSession

jdbc_driver_path = "C:/Users/User/AppData/Local/Programs/Python/Python311/Lib/site-packages/pyspark/jars/postgresql-42.7.5.jar"

spark = SparkSession.builder \
    .appName("PostgreSQL Connection") \
    .config("spark.jars", jdbc_driver_path) \
    .config("spark.driver.extraClassPath", jdbc_driver_path) \
    .getOrCreate()

database_url = "jdbc:postgresql://localhost:5432/anac"


Escrevendo o PSQL

In [8]:
df_DB.write \
    .format("jdbc") \
    .option("url", database_url) \
    .option("dbtable", "voos") \
    .option("user", "root") \
    .option("password", "root") \
    .option("driver", "org.postgresql.Driver") \
    .mode("append") \
    .save()

Lendo o Banco PSQL

In [None]:
properties = {
    "user": "root",
    "password": "root",
    "driver": "org.postgresql.Driver"
}
# Lendo uma tabela
df_DB = spark.read.jdbc(url=database_url, 
          table="voos", 
          properties=properties)

df_DB.show()

+---+----+---+--------+---------+
| id| ano|mes| mercado|      rpk|
+---+----+---+--------+---------+
|  1|2012|  6|SBILSBSV| 549900.0|
|  2|2012|  6|SBIZSBBR|3975550.0|
|  3|2012|  6|SBIZSBSL|1452580.0|
|  4|2012|  6|SBJPSBAR|  40824.0|
|  5|2012|  6|SBJPSBBR|6005780.0|
|  6|2012|  6|SBJPSBGL|1.35814E7|
|  7|2012|  6|SBJPSBGR|      0.0|
|  8|2012|  6|SBJPSBSV|2562850.0|
|  9|2012|  6|SBJUSBFZ| 868411.0|
| 10|2012|  6|SBJUSBRF|1253910.0|
| 11|2012|  6|SBJVSBKP|      0.0|
| 12|2012|  6|SBJVSBSP|1388040.0|
| 13|2012|  6|SBKGSBSV|3405680.0|
| 14|2012|  6|SBKPSBBR|6066400.0|
| 15|2012|  6|SBKPSBCF|4226530.0|
| 16|2012|  6|SBKPSBCT|1379600.0|
| 17|2012|  6|SBKPSBGL|5765030.0|
| 18|2012|  6|SBKPSBGR|  38678.0|
| 19|2012|  6|SBKPSBPA| 328624.0|
| 20|2012|  6|SBKPSBSP|  21924.0|
+---+----+---+--------+---------+
only showing top 20 rows

