In [None]:
%idle_timeout 30
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2

import boto3
import sys
import re
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import Row
  
# --- Spark / Glue runtime ------------------------------------------------------
# Reuse the SparkContext of the current session if one already exists.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)          # Glue wrapper, also used later for the S3 sink
spark = glueContext.spark_session      # SparkSession used to read the raw file
# NOTE(review): job.init()/job.commit() are not called in the visible cells —
# confirm whether Glue job state (e.g. bookmarks via transformation_ctx) is expected.
job = Job(glueContext)

# --- Input ---------------------------------------------------------------------
# Raw fixed-width text file in the "stage" prefix of the bucket.
s3_path = (
    "s3://micro-dados-sp-outros/stage/2010/Amostra_Pessoas_35_RMSP.txt"
)

### Leitura do arquivo do S3

In [None]:
# spark.read.text yields one row per line with a single string column "value";
# the per-line cleanup below operates on the underlying RDD of Rows.
_linhas_brutas_df = spark.read.text(s3_path)
arquivo = _linhas_brutas_df.rdd

## Definição da função de remover espaços

In [None]:
def remover_espacos(linha):
    """Normalize one raw line of the fixed-width file into ``Row(linha=...)``.

    Variables are later extracted by absolute character position (the
    ``columns`` layout), so interior whitespace must be preserved. Deleting it
    would shift every following character and misalign all later variables
    whenever a field is blank-filled. Only trailing whitespace is removed,
    including a carriage return left by CRLF line endings. That cannot move any
    position that precedes it. The function name is kept for compatibility
    with existing callers.

    Parameters
    ----------
    linha : Row
        A row produced by ``spark.read.text``; its ``"value"`` field holds the
        raw line.

    Returns
    -------
    Row
        ``Row(linha=<line without trailing whitespace>)``, or
        ``Row(linha=None)`` when the raw value is missing.
    """
    valor = linha["value"]
    if valor is None:
        return Row(linha=None)
    # rstrip (not strip / re.sub) keeps every field aligned to its layout position.
    return Row(linha=valor.rstrip())

In [None]:
# Lazily apply the per-line cleanup to every raw Row; each element becomes Row(linha=...).
linhas_ajustadas = arquivo.map(f=remover_espacos)

In [None]:
# One nullable string column, "linha", matching the Row built by remover_espacos.
schema = StructType([
    StructField(name="linha", dataType=StringType(), nullable=True),
])

df_linhas_ajustadas = spark.createDataFrame(data=linhas_ajustadas, schema=schema)

### Definição da lista de variáveis, conforme o dicionário de dados do IBGE

In [None]:
# Fixed-width layout, following the information published by IBGE.
# Each entry is ('column name', start, end): 1-based, INCLUSIVE character
# positions. The third value is the end position, not a length/offset — e.g.
# ('V0002', 3, 7) covers characters 3..7, and consecutive entries are contiguous
# (each start is the previous end + 1).

columns = [
    ('V0001', 1, 2),
    ('V0002', 3, 7),
    ('V0011', 8, 20),
    ('V0300', 21, 28),
    ('V0010', 29, 44),
    ('V1001', 45, 45),
    ('V1002', 46, 47),
    ('V1003', 48, 50),
    ('V1004', 51, 52),
    ('V1006', 53, 53),
    ('V0502', 54, 55),
    ('V0504', 56, 57),
    ('V0601', 58, 58),
    ('V6033', 59, 61),
    ('V6036', 62, 64),
    ('V6037', 65, 66),
    ('V6040', 67, 67),
    ('V0606', 68, 68),
    ('V0613', 69, 69),
    ('V0614', 70, 70),
    ('V0615', 71, 71),
    ('V0616', 72, 72),
    ('V0617', 73, 73),
    ('V0618', 74, 74),
    ('V0619', 75, 75),
    ('V0620', 76, 76),
    ('V0621', 77, 80),
    ('V0622', 81, 81),
    ('V6222', 82, 88),
    ('V6224', 89, 95),
    ('V0623', 96, 98),
    ('V0624', 99, 101),
    ('V0625', 102, 102),
    ('V6252', 103, 109),
    ('V6254', 110, 116),
    ('V6256', 117, 123),
    ('V0626', 124, 124),
    ('V6262', 125, 131),
    ('V6264', 132, 138),
    ('V6266', 139, 145),
    ('V0627', 146, 146),
    ('V0628', 147, 147),
    ('V0629', 148, 149),
    ('V0630', 150, 151),
    ('V0631', 152, 152),
    ('V0632', 153, 153),
    ('V0633', 154, 155),
    ('V0634', 156, 156),
    ('V0635', 157, 157),
    ('V6400', 158, 158),
    ('V6352', 159, 161),
    ('V6354', 162, 164),
    ('V6356', 165, 167),
    ('V0636', 168, 168),
    ('V6362', 169, 175),
    ('V6364', 176, 182),
    ('V6366', 183, 189),
    ('V0637', 190, 190),
    ('V0638', 191, 192),
    ('V0639', 193, 193),
    ('V0640', 194, 194),
    ('V0641', 195, 195),
    ('V0642', 196, 196),
    ('V0643', 197, 197),
    ('V0644', 198, 198),
    ('V0645', 199, 199),
    ('V6461', 200, 203),
    ('V6471', 204, 208),
    ('V0648', 209, 209),
    ('V0649', 210, 210),
    ('V0650', 211, 211),
    ('V0651', 212, 212),
    ('V6511', 213, 218),
    ('V6513', 219, 224),
    ('V6514', 225, 230),
    ('V0652', 231, 231),
    ('V6521', 232, 237),
    ('V6524', 238, 246),
    ('V6525', 247, 253),
    ('V6526', 254, 262),
    ('V6527', 263, 269),
    ('V6528', 270, 278),
    ('V6529', 279, 285),
    ('V6530', 286, 295),
    ('V6531', 296, 303),
    ('V6532', 304, 312),
    ('V0653', 313, 315),
    ('V0654', 316, 316),
    ('V0655', 317, 317),
    ('V0656', 318, 318),
    ('V0657', 319, 319),
    ('V0658', 320, 320),
    ('V0659', 321, 321),
    ('V6591', 322, 327),
    ('V0660', 328, 328),
    ('V6602', 329, 335),
    ('V6604', 336, 342),
    ('V6606', 343, 349),
    ('V0661', 350, 350),
    ('V0662', 351, 351),
    ('V0663', 352, 352),
    ('V6631', 353, 354),
    ('V6632', 355, 356),
    ('V6633', 357, 358),
    ('V0664', 359, 359),
    ('V6641', 360, 361),
    ('V6642', 362, 363),
    ('V6643', 364, 365),
    ('V0665', 366, 366),
    ('V6660', 367, 369),
    ('V6664', 370, 370),
    ('V0667', 371, 371),
    ('V0668', 372, 372),
    ('V6681', 373, 374),
    ('V6682', 375, 378),
    ('V0669', 379, 379),
    ('V6691', 380, 381),
    ('V6692', 382, 383),
    ('V6693', 384, 385),
    ('V6800', 386, 387),
    ('V0670', 388, 388),
    ('V0671', 389, 390),
    ('V6900', 391, 391),
    ('V6910', 392, 392),
    ('V6920', 393, 393),
    ('V6930', 394, 394),
    ('V6940', 395, 395),
    # NOTE(review): position 172 lies inside V6362 (169-175) above and breaks the
    # ascending order of the table — verify V1005's position against the IBGE layout.
    ('V1005', 172, 172)
]

### Aplicação da substring nas colunas

In [None]:
def extract_variable(data, start, end):
    """Return the characters of ``data`` between 1-based positions ``start`` and ``end``, inclusive.

    Positions past the end of ``data`` simply yield a shorter (possibly empty) string.
    """
    offset = start - 1  # layout positions are 1-based; Python indexing is 0-based
    return data[offset:end]


def create_udf(start, end):
    """Build a string-returning Spark UDF that extracts positions ``start``..``end`` of a column."""
    extrair = lambda data: extract_variable(data, start, end)  # noqa: E731
    return udf(extrair, StringType())

In [None]:
# Slice every layout variable out of the fixed-width line in a single projection.
# Spark's native substring(str, pos, len) uses a 1-based pos, so the inclusive
# (start, end) layout maps to pos=start, len=end-start+1. These are the same
# characters the per-column Python UDF (data[start-1:end]) returned. They are
# evaluated in the JVM, without shipping every row to a Python worker once per
# column, and a null line yields null instead of failing.
# One select also avoids the very large logical plan built by chaining one
# withColumn call per variable.
# "linha" stays first, matching the column order the withColumn loop produced.
df_linhas_ajustadas = df_linhas_ajustadas.select(
    col("linha"),
    *[
        substring(col("linha"), start, end - start + 1).alias(var)
        for var, start, end in columns
    ],
)

In [None]:
# Convert every column to an integer type.
# A 32-bit Spark int tops out at 2,147,483,647, so layout fields wider than 9
# characters (e.g. V0011: 13, V0010: 16, V6530: 10) can overflow. With ANSI mode
# off, Spark turns an overflowing string->int cast into NULL, silently losing data.
# Those wide fields are therefore cast to 64-bit long; all other columns
# (including "linha", which is dropped later) keep the original int cast.
# NOTE(review): the wide fields' type in the written Parquet/catalog changes
# from int to bigint.
_LARGURA_MAXIMA_INT = 9
_largura_por_coluna = {var: end - start + 1 for var, start, end in columns}
df_linhas_ajustadas = df_linhas_ajustadas.select([
    col(coluna).cast(
        'long' if _largura_por_coluna.get(coluna, 0) > _LARGURA_MAXIMA_INT else 'int'
    )
    for coluna in df_linhas_ajustadas.columns
])

### Removendo coluna inicial

In [None]:
# Drop the raw fixed-width line; only the extracted variables are written to bronze.
COLUNA_LINHA_BRUTA = 'linha'
raw_data = df_linhas_ajustadas.drop(COLUNA_LINHA_BRUTA)

In [None]:
# Wrap the Spark DataFrame as a Glue DynamicFrame so the Glue sink below can write it.
raw_data_dynamic_frame = DynamicFrame.fromDF(
    raw_data,
    glueContext,
    "raw_data_dynamic_frame",
)

### Salvando os dados na camada bronze

In [None]:
# Bronze-layer destination in S3: partitioned by V0002, with gzip compression
# requested and the Glue Data Catalog table updated as part of the write.
# NOTE(review): the S3 sink adds files under the prefix; re-running this cell
# presumably duplicates data — confirm, or clear the prefix before re-running.
destino_bronze = "s3://micro-dados-sp-outros/bronze/2010/"

s3_parquet = glueContext.getSink(
    connection_type="s3",
    path=destino_bronze,
    partitionKeys=["V0002"],
    compression="gzip",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transformation_ctx="s3_parquet",
)

# Catalog database/table that the sink creates or updates.
s3_parquet.setCatalogInfo(catalogDatabase="ibge", catalogTableName="micro_dados_sp_outros")

# Glue-optimized Parquet writer.
s3_parquet.setFormat("glueparquet")

s3_parquet.writeFrame(raw_data_dynamic_frame)
