# RAIS Centro-Oeste

Arquivo para divisão do dataset da RAIS Centro Oeste, conforme carregada pela DIEPS, em vários arquivos parquet com no máximo 1.000.000 de registros cada. 

In [None]:
import pyarrow.parquet as pq
import pyarrow as pa
import s3fs
import pandas as pd

In [None]:
ACCESS_KEY = ''
SECRET_KEY = ''
SOURCE_FILE = 'raw/DIEPS/bases/rais/RAIS_VINC_PUB_CENTRO_OESTE_2024.txt'
MINIO_HOST = 'http://minio:9000'
PANDAS_CHUNK_SIZE = 200000 # 200.000 linhas por chunk
PANDAS_NROWS = 1000000
PANDAS_OFFSET = 6000000
ANO = 2024

TARGET_FILE = f'intermediate/DIEPS/bases/rais/{ANO}/rais-centro-oeste-{PANDAS_OFFSET}-{(PANDAS_OFFSET + PANDAS_NROWS) - 1}.parquet'

# Metadados
RESPONSAVEL_NOME = 'matheus.fernandes'
RESPONSAVEL_EMAIL = 'matheus.fernandes@ipe.df.gov.br'
RESPONSAVEL_SETOR = 'UCTIS/COGEI/GEINO'
PROCESSED = 'False'

In [None]:
minio = s3fs.S3FileSystem(key=ACCESS_KEY, # change default key 
                        secret=SECRET_KEY, # change default secret
                        use_ssl=False,
                        client_kwargs={
                            'endpoint_url': MINIO_HOST} # change ip
                        )

# Verifiacação da conexão 
objects = minio.ls('raw/DIEPS/bases/rais')
objects

In [None]:


# Read CSV in chunks
chunks = []
i = 0
for chunk in pd.read_csv(
    f"s3://{SOURCE_FILE}",
    storage_options={
        "key": ACCESS_KEY,
        "secret": SECRET_KEY,
        "client_kwargs": {
            "endpoint_url": MINIO_HOST
        }
    },
    chunksize=PANDAS_CHUNK_SIZE,
    encoding='latin-1',
    sep=';',
    nrows=PANDAS_NROWS,
    skiprows=range(1, PANDAS_OFFSET)
):
    chunks.append(chunk)  # Store each chunk in a list
#
## Combine all chunks into a single DataFrame (if needed)
df = pd.concat(chunks, ignore_index=True)

In [None]:
# Decodifica
df.info(verbose=True)

In [None]:
df

In [None]:
# Converte colunas problemáticas

df['Mês Desligamento'] = df['Mês Desligamento'].astype('string')
df['Nacionalidade'] = df['Nacionalidade'].astype('string')


In [None]:
minio_url = f's3://{TARGET_FILE}'

pq.write_table(
    pa.Table.from_pandas(df),
    minio_url,
    filesystem=minio,
    use_dictionary=True,
    compression='snappy',
    version='2.6',
)

In [None]:
tags = {
    'User': RESPONSAVEL_NOME,
    'Department': RESPONSAVEL_SETOR,
    'Email': RESPONSAVEL_EMAIL,
    'Processed': PROCESSED,
}
parquet_partitions = minio.ls(TARGET_FILE)
# parquet_partitions
for p in parquet_partitions:
    minio.put_tags(p, tags, mode='o')

In [None]:
# Libera a memória do dataframe
del df