### Opcao Lambda com AWS Wrangler

In [10]:
import awswrangler as wr

# Define o dtype do DataFrame
dtype = {
    "year": "int",
    "industry_code": "string",
    "industry_name": "string",
    "variable": "string"
}

# Lê o CSV do S3 e cria um DataFrame com o dtype definido
df = wr.s3.read_csv(f"s3://371716203543-files/origin/base.csv", dtype=dtype)

# Escreve o DataFrame no formato Parquet no S3
wr.s3.to_parquet(
    df=df,
    path=f"s3://371716203543-files/target/",
    filename="base.parquet",
    database=database,
    table=table,
    # Sugestão de melhoria: incluir opção "partition_cols" para particionar o Parquet por ano e variável
    partition_cols=["year", "industry_code"],
    mode="overwrite"
)

{'paths': ['s3://371716203543-files/target/3b13264c5b9c4e819df9474498dd353b.snappy.parquet'],
 'partitions_values': {}}

### Opcao Glue com PySpark

In [None]:
import json
import os
import sys

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Inicializa o SparkContext e o GlueContext
print('INIT')
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

print('GLUE CONTEXT LOADED...')

# Define o schema do DataFrame
schema = StructType (
    [
        StructField ('year', IntegerType(), True),
        StructField ('industry_code', StringType(), True),
        StructField ('industry_name', StringType(), True),
        StructField ('variable', StringType(), True)
    ])

# Lê o CSV do S3 e cria um DataFrame
df = spark.read.option('mode', 'DROPMALFORMED').option("charset", "UTF-8").csv("s3://371716203543-files/origin/", schema=schema)

# Converte o DataFrame em um DynamicFrame do Glue
dynamicFrame = DynamicFrame.fromDF(df, glueContext, "nest")

# Exibe o schema e as primeiras linhas do DynamicFrame
dynamicFrame.printSchema()
dynamicFrame.toDF().show(5)

# Define opções adicionais para escrita no Glue Catalog
additionalOptions = {"enableUpdateCatalog": True}

# Escreve o DynamicFrame no Glue Catalog como tabela "dump_table" no database "dump_base"
sink = glueContext.write_dynamic_frame_from_catalog(frame=dynamicFrame, database="dump_base",
                                                    table_name="dump_table", transformation_ctx="write_sink",
                                                    additional_options=additionalOptions)
