In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
S3_ENDPOINT = os.getenv("S3_ENDPOINT")
NESSIE_URI = os.getenv("NESSIE_URI")

MASTER = "spark://spark-master:7077"

jar_packages = [
    "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1",
    "org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.99.0",
    #"software.amazon.awssdk:bundle:2.28.13",
    #"software.amazon.awssdk:url-connection-client:2.28.13",
    "org.apache.iceberg:iceberg-aws-bundle:1.6.1"
  ]

spark_extensions = [
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "org.projectnessie.spark.extensions.NessieSparkSessionExtensions"
]
conf = (
    pyspark.SparkConf()
    .setAppName("Iceberg")
    .set("spark.master", MASTER)
    .set("spark.jars.packages", ','.join(jar_packages))
    .set("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
    .set("spark.executor.memory", "2g")
    .set("spark.executor.cores", "2")
    .set("spark.executor.instances", "1")
    .set("spark.driver.memory", "2g")
    .set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.nessie.s3.path-style-access", "true")
    .set("spark.sql.catalog.nessie.s3.endpoint", S3_ENDPOINT)
    .set("spark.sql.catalog.nessie.warehouse", "s3a://bronze/")
    .set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
    .set("spark.sql.catalog.nessie.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.nessie.uri", NESSIE_URI)
    .set("spark.sql.catalog.nessie.ref", "main")
    .set("spark.sql.catalog.nessie.authentication.type", "NONE")
    .set("spark.sql.catalog.nessie.cache-enabled", "false")
    .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
print(spark)

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-13ead859-9399-46a3-a10b-df4738610ab6;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.99.0 in central
	found org.apache.iceberg#iceberg-aws-bundle;1.6.1 in central
:: resolution report :: resolve 597ms :: artifacts dl 24ms
	:: modules in use:
	org.apache.iceberg#iceberg-aws-bundle;1.6.1 from central in [default]
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.6.1 from central in [default]
	org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.99.0 from cen

<pyspark.sql.session.SparkSession object at 0x7f983ea9a840>


In [2]:
spark.sql("SHOW CATALOGS").show()

+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+



In [3]:
enem_2019 = spark.read.csv("s3a://landing/MICRODADOS_ENEM_2019.csv", header=True, inferSchema=True, sep=";")
enem_2020 = spark.read.csv("s3a://landing/MICRODADOS_ENEM_2020.csv", header=True, inferSchema=True, sep=";")
enem_2021 = spark.read.csv("s3a://landing/MICRODADOS_ENEM_2021.csv", header=True, inferSchema=True, sep=";")
enem_2022 = spark.read.csv("s3a://landing/MICRODADOS_ENEM_2022.csv", header=True, inferSchema=True, sep=";")
enem_2023 = spark.read.csv("s3a://landing/MICRODADOS_ENEM_2023.csv", header=True, inferSchema=True, sep=";")

                                                                                

In [4]:
spark.sql("""
CREATE NAMESPACE IF NOT EXISTS nessie.bronze
""")


DataFrame[]

In [17]:
spark.sql("""
CREATE OR REPLACE TABLE nessie.bronze.enem_2019 (
    NU_INSCRICAO STRING,
    NU_ANO INT,
    TP_FAIXA_ETARIA INT,
    TP_SEXO STRING,
    TP_ESTADO_CIVIL STRING,
    TP_COR_RACA INT,
    TP_NACIONALIDADE INT,
    TP_ST_CONCLUSAO INT,
    TP_ANO_CONCLUIU INT,
    TP_ESCOLA INT,
    TP_ENSINO INT,
    IN_TREINEIRO INT,
    CO_MUNICIPIO_ESC INT,
    NO_MUNICIPIO_ESC STRING,
    CO_UF_ESC INT,
    SG_UF_ESC STRING,
    TP_DEPENDENCIA_ADM_ESC INT,
    TP_LOCALIZACAO_ESC INT,
    TP_SIT_FUNC_ESC INT,
    CO_MUNICIPIO_PROVA INT,
    NO_MUNICIPIO_PROVA STRING,
    CO_UF_PROVA INT,
    SG_UF_PROVA STRING,
    TP_PRESENCA_CN INT,
    TP_PRESENCA_CH INT,
    TP_PRESENCA_LC INT,
    TP_PRESENCA_MT INT,
    CO_PROVA_CN INT,
    CO_PROVA_CH INT,
    CO_PROVA_LC INT,
    CO_PROVA_MT INT,
    NU_NOTA_CN DOUBLE,
    NU_NOTA_CH DOUBLE,
    NU_NOTA_LC DOUBLE,
    NU_NOTA_MT DOUBLE,
    TX_RESPOSTAS_CN STRING,
    TX_RESPOSTAS_CH STRING,
    TX_RESPOSTAS_LC STRING,
    TX_RESPOSTAS_MT STRING,
    TP_LINGUA INT,
    TX_GABARITO_CN STRING,
    TX_GABARITO_CH STRING,
    TX_GABARITO_LC STRING,
    TX_GABARITO_MT STRING,
    TP_STATUS_REDACAO INT,
    NU_NOTA_COMP1 DOUBLE,
    NU_NOTA_COMP2 DOUBLE,
    NU_NOTA_COMP3 DOUBLE,
    NU_NOTA_COMP4 DOUBLE,
    NU_NOTA_COMP5 DOUBLE,
    NU_NOTA_REDACAO DOUBLE,
    Q001 STRING,
    Q002 STRING,
    Q003 STRING,
    Q004 STRING,
    Q005 STRING,
    Q006 STRING,
    Q007 STRING,
    Q008 STRING,
    Q009 STRING,
    Q010 STRING,
    Q011 STRING,
    Q012 STRING,
    Q013 STRING,
    Q014 STRING,
    Q015 STRING,
    Q016 STRING,
    Q017 STRING,
    Q018 STRING,
    Q019 STRING,
    Q020 STRING,
    Q021 STRING,
    Q022 STRING,
    Q023 STRING,
    Q024 STRING,
    Q025 STRING
)
USING iceberg
LOCATION 's3a://bronze/enem_2019/'
""")

DataFrame[]

In [16]:
spark.sql("""
CREATE OR REPLACE TABLE nessie.bronze.enem_2020 (
    NU_INSCRICAO STRING,
    NU_ANO INT,
    TP_FAIXA_ETARIA INT,
    TP_SEXO STRING,
    TP_ESTADO_CIVIL STRING,
    TP_COR_RACA INT,
    TP_NACIONALIDADE INT,
    TP_ST_CONCLUSAO INT,
    TP_ANO_CONCLUIU INT,
    TP_ESCOLA INT,
    TP_ENSINO INT,
    IN_TREINEIRO INT,
    CO_MUNICIPIO_ESC INT,
    NO_MUNICIPIO_ESC STRING,
    CO_UF_ESC INT,
    SG_UF_ESC STRING,
    TP_DEPENDENCIA_ADM_ESC INT,
    TP_LOCALIZACAO_ESC INT,
    TP_SIT_FUNC_ESC INT,
    CO_MUNICIPIO_PROVA INT,
    NO_MUNICIPIO_PROVA STRING,
    CO_UF_PROVA INT,
    SG_UF_PROVA STRING,
    TP_PRESENCA_CN INT,
    TP_PRESENCA_CH INT,
    TP_PRESENCA_LC INT,
    TP_PRESENCA_MT INT,
    CO_PROVA_CN INT,
    CO_PROVA_CH INT,
    CO_PROVA_LC INT,
    CO_PROVA_MT INT,
    NU_NOTA_CN DOUBLE,
    NU_NOTA_CH DOUBLE,
    NU_NOTA_LC DOUBLE,
    NU_NOTA_MT DOUBLE,
    TX_RESPOSTAS_CN STRING,
    TX_RESPOSTAS_CH STRING,
    TX_RESPOSTAS_LC STRING,
    TX_RESPOSTAS_MT STRING,
    TP_LINGUA INT,
    TX_GABARITO_CN STRING,
    TX_GABARITO_CH STRING,
    TX_GABARITO_LC STRING,
    TX_GABARITO_MT STRING,
    TP_STATUS_REDACAO INT,
    NU_NOTA_COMP1 DOUBLE,
    NU_NOTA_COMP2 DOUBLE,
    NU_NOTA_COMP3 DOUBLE,
    NU_NOTA_COMP4 DOUBLE,
    NU_NOTA_COMP5 DOUBLE,
    NU_NOTA_REDACAO DOUBLE,
    Q001 STRING,
    Q002 STRING,
    Q003 STRING,
    Q004 STRING,
    Q005 STRING,
    Q006 STRING,
    Q007 STRING,
    Q008 STRING,
    Q009 STRING,
    Q010 STRING,
    Q011 STRING,
    Q012 STRING,
    Q013 STRING,
    Q014 STRING,
    Q015 STRING,
    Q016 STRING,
    Q017 STRING,
    Q018 STRING,
    Q019 STRING,
    Q020 STRING,
    Q021 STRING,
    Q022 STRING,
    Q023 STRING,
    Q024 STRING,
    Q025 STRING
)
USING iceberg
LOCATION 's3a://bronze/enem_2020/'
""")

DataFrame[]

In [15]:
spark.sql("""
CREATE OR REPLACE TABLE nessie.bronze.enem_2021 (
    NU_INSCRICAO STRING,
    NU_ANO INT,
    TP_FAIXA_ETARIA INT,
    TP_SEXO STRING,
    TP_ESTADO_CIVIL STRING,
    TP_COR_RACA INT,
    TP_NACIONALIDADE INT,
    TP_ST_CONCLUSAO INT,
    TP_ANO_CONCLUIU INT,
    TP_ESCOLA INT,
    TP_ENSINO INT,
    IN_TREINEIRO INT,
    CO_MUNICIPIO_ESC INT,
    NO_MUNICIPIO_ESC STRING,
    CO_UF_ESC INT,
    SG_UF_ESC STRING,
    TP_DEPENDENCIA_ADM_ESC INT,
    TP_LOCALIZACAO_ESC INT,
    TP_SIT_FUNC_ESC INT,
    CO_MUNICIPIO_PROVA INT,
    NO_MUNICIPIO_PROVA STRING,
    CO_UF_PROVA INT,
    SG_UF_PROVA STRING,
    TP_PRESENCA_CN INT,
    TP_PRESENCA_CH INT,
    TP_PRESENCA_LC INT,
    TP_PRESENCA_MT INT,
    CO_PROVA_CN INT,
    CO_PROVA_CH INT,
    CO_PROVA_LC INT,
    CO_PROVA_MT INT,
    NU_NOTA_CN DOUBLE,
    NU_NOTA_CH DOUBLE,
    NU_NOTA_LC DOUBLE,
    NU_NOTA_MT DOUBLE,
    TX_RESPOSTAS_CN STRING,
    TX_RESPOSTAS_CH STRING,
    TX_RESPOSTAS_LC STRING,
    TX_RESPOSTAS_MT STRING,
    TP_LINGUA INT,
    TX_GABARITO_CN STRING,
    TX_GABARITO_CH STRING,
    TX_GABARITO_LC STRING,
    TX_GABARITO_MT STRING,
    TP_STATUS_REDACAO INT,
    NU_NOTA_COMP1 DOUBLE,
    NU_NOTA_COMP2 DOUBLE,
    NU_NOTA_COMP3 DOUBLE,
    NU_NOTA_COMP4 DOUBLE,
    NU_NOTA_COMP5 DOUBLE,
    NU_NOTA_REDACAO DOUBLE,
    Q001 STRING,
    Q002 STRING,
    Q003 STRING,
    Q004 STRING,
    Q005 STRING,
    Q006 STRING,
    Q007 STRING,
    Q008 STRING,
    Q009 STRING,
    Q010 STRING,
    Q011 STRING,
    Q012 STRING,
    Q013 STRING,
    Q014 STRING,
    Q015 STRING,
    Q016 STRING,
    Q017 STRING,
    Q018 STRING,
    Q019 STRING,
    Q020 STRING,
    Q021 STRING,
    Q022 STRING,
    Q023 STRING,
    Q024 STRING,
    Q025 STRING
)
USING iceberg
LOCATION 's3a://bronze/enem_2021/'
""")

DataFrame[]

In [14]:
spark.sql("""
CREATE OR REPLACE TABLE nessie.bronze.enem_2022 (
    NU_INSCRICAO STRING,
    NU_ANO INT,
    TP_FAIXA_ETARIA INT,
    TP_SEXO STRING,
    TP_ESTADO_CIVIL STRING,
    TP_COR_RACA INT,
    TP_NACIONALIDADE INT,
    TP_ST_CONCLUSAO INT,
    TP_ANO_CONCLUIU INT,
    TP_ESCOLA INT,
    TP_ENSINO INT,
    IN_TREINEIRO INT,
    CO_MUNICIPIO_ESC INT,
    NO_MUNICIPIO_ESC STRING,
    CO_UF_ESC INT,
    SG_UF_ESC STRING,
    TP_DEPENDENCIA_ADM_ESC INT,
    TP_LOCALIZACAO_ESC INT,
    TP_SIT_FUNC_ESC INT,
    CO_MUNICIPIO_PROVA INT,
    NO_MUNICIPIO_PROVA STRING,
    CO_UF_PROVA INT,
    SG_UF_PROVA STRING,
    TP_PRESENCA_CN INT,
    TP_PRESENCA_CH INT,
    TP_PRESENCA_LC INT,
    TP_PRESENCA_MT INT,
    CO_PROVA_CN INT,
    CO_PROVA_CH INT,
    CO_PROVA_LC INT,
    CO_PROVA_MT INT,
    NU_NOTA_CN DOUBLE,
    NU_NOTA_CH DOUBLE,
    NU_NOTA_LC DOUBLE,
    NU_NOTA_MT DOUBLE,
    TX_RESPOSTAS_CN STRING,
    TX_RESPOSTAS_CH STRING,
    TX_RESPOSTAS_LC STRING,
    TX_RESPOSTAS_MT STRING,
    TP_LINGUA INT,
    TX_GABARITO_CN STRING,
    TX_GABARITO_CH STRING,
    TX_GABARITO_LC STRING,
    TX_GABARITO_MT STRING,
    TP_STATUS_REDACAO INT,
    NU_NOTA_COMP1 DOUBLE,
    NU_NOTA_COMP2 DOUBLE,
    NU_NOTA_COMP3 DOUBLE,
    NU_NOTA_COMP4 DOUBLE,
    NU_NOTA_COMP5 DOUBLE,
    NU_NOTA_REDACAO DOUBLE,
    Q001 STRING,
    Q002 STRING,
    Q003 STRING,
    Q004 STRING,
    Q005 STRING,
    Q006 STRING,
    Q007 STRING,
    Q008 STRING,
    Q009 STRING,
    Q010 STRING,
    Q011 STRING,
    Q012 STRING,
    Q013 STRING,
    Q014 STRING,
    Q015 STRING,
    Q016 STRING,
    Q017 STRING,
    Q018 STRING,
    Q019 STRING,
    Q020 STRING,
    Q021 STRING,
    Q022 STRING,
    Q023 STRING,
    Q024 STRING,
    Q025 STRING
)
USING iceberg
LOCATION 's3a://bronze/enem_2022/'
""")

DataFrame[]

In [13]:
spark.sql("""
CREATE OR REPLACE TABLE nessie.bronze.enem_2023 (
    NU_INSCRICAO STRING,
    NU_ANO INT,
    TP_FAIXA_ETARIA INT,
    TP_SEXO STRING,
    TP_ESTADO_CIVIL STRING,
    TP_COR_RACA INT,
    TP_NACIONALIDADE INT,
    TP_ST_CONCLUSAO INT,
    TP_ANO_CONCLUIU INT,
    TP_ESCOLA INT,
    TP_ENSINO INT,
    IN_TREINEIRO INT,
    CO_MUNICIPIO_ESC INT,
    NO_MUNICIPIO_ESC STRING,
    CO_UF_ESC INT,
    SG_UF_ESC STRING,
    TP_DEPENDENCIA_ADM_ESC INT,
    TP_LOCALIZACAO_ESC INT,
    TP_SIT_FUNC_ESC INT,
    CO_MUNICIPIO_PROVA INT,
    NO_MUNICIPIO_PROVA STRING,
    CO_UF_PROVA INT,
    SG_UF_PROVA STRING,
    TP_PRESENCA_CN INT,
    TP_PRESENCA_CH INT,
    TP_PRESENCA_LC INT,
    TP_PRESENCA_MT INT,
    CO_PROVA_CN INT,
    CO_PROVA_CH INT,
    CO_PROVA_LC INT,
    CO_PROVA_MT INT,
    NU_NOTA_CN DOUBLE,
    NU_NOTA_CH DOUBLE,
    NU_NOTA_LC DOUBLE,
    NU_NOTA_MT DOUBLE,
    TX_RESPOSTAS_CN STRING,
    TX_RESPOSTAS_CH STRING,
    TX_RESPOSTAS_LC STRING,
    TX_RESPOSTAS_MT STRING,
    TP_LINGUA INT,
    TX_GABARITO_CN STRING,
    TX_GABARITO_CH STRING,
    TX_GABARITO_LC STRING,
    TX_GABARITO_MT STRING,
    TP_STATUS_REDACAO INT,
    NU_NOTA_COMP1 DOUBLE,
    NU_NOTA_COMP2 DOUBLE,
    NU_NOTA_COMP3 DOUBLE,
    NU_NOTA_COMP4 DOUBLE,
    NU_NOTA_COMP5 DOUBLE,
    NU_NOTA_REDACAO DOUBLE,
    Q001 STRING,
    Q002 STRING,
    Q003 STRING,
    Q004 STRING,
    Q005 STRING,
    Q006 STRING,
    Q007 STRING,
    Q008 STRING,
    Q009 STRING,
    Q010 STRING,
    Q011 STRING,
    Q012 STRING,
    Q013 STRING,
    Q014 STRING,
    Q015 STRING,
    Q016 STRING,
    Q017 STRING,
    Q018 STRING,
    Q019 STRING,
    Q020 STRING,
    Q021 STRING,
    Q022 STRING,
    Q023 STRING,
    Q024 STRING,
    Q025 STRING
)
USING iceberg
LOCATION 's3a://bronze/enem_2023/'
""")

DataFrame[]

In [18]:
spark.sql("SHOW CATALOGS").show()

+-------------+
|      catalog|
+-------------+
|       nessie|
|spark_catalog|
+-------------+



In [19]:
spark.sql("SHOW TABLES IN nessie.bronze").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   bronze|enem_2019|      false|
|   bronze|enem_2020|      false|
|   bronze|enem_2021|      false|
|   bronze|enem_2022|      false|
|   bronze|enem_2023|      false|
+---------+---------+-----------+



In [20]:
enem_2019.write.format("iceberg").mode("overwrite").save("nessie.bronze.enem_2019")
enem_2020.write.format("iceberg").mode("overwrite").save("nessie.bronze.enem_2020")
enem_2021.write.format("iceberg").mode("overwrite").save("nessie.bronze.enem_2021")
enem_2022.write.format("iceberg").mode("overwrite").save("nessie.bronze.enem_2022")
enem_2023.write.format("iceberg").mode("overwrite").save("nessie.bronze.enem_2023")

                                                                                

In [21]:
spark.sql("SELECT * FROM nessie.bronze.enem_2023").show()

[Stage 15:>                                                         (0 + 1) / 1]

+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+----------------+---------+---------+----------------------+------------------+---------------+------------------+--------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RACA|TP_NACIONALIDADE|TP_ST_CONCLUSAO|TP_

                                                                                

In [23]:
spark.sql("SELECT count(*) FROM nessie.bronze.enem_2023").show()

[Stage 16:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
| 3933955|
+--------+



                                                                                