# Import libraries

In [23]:
import pandas as pd
from bs4 import BeautifulSoup
import requests
import zipfile
import re
import io

# Functions

In [24]:
def get_response(url, timeout=60):
    """Perform an HTTP GET and return the response, failing on HTTP errors.

    Args:
        url: Address to request.
        timeout: Seconds forwarded to ``requests.get`` as its ``timeout``
            argument. Without one, ``requests`` waits indefinitely on a
            stalled server, which would hang the notebook.

    Returns:
        The ``requests.Response`` object of the request.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status`` when the server
            answers with an error status code.
        requests.Timeout: When the server does not respond within
            ``timeout`` seconds.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()

    return response

In [25]:
def get_compiled_pattern(response, pattern):
    """Parse a response body as HTML and compile a regular expression.

    Args:
        response: Object whose ``text`` attribute holds the HTML markup.
        pattern: Regular-expression source string.

    Returns:
        A ``(compiled_pattern, soup)`` tuple: the compiled regex first, then
        the ``BeautifulSoup`` tree built with the ``html.parser`` backend.
    """
    parsed_html = BeautifulSoup(response.text, "html.parser")
    compiled = re.compile(pattern)

    return compiled, parsed_html

In [26]:
def get_last_folder(url):
    """Return the name of the most recent ``YYYY-MM`` folder listed at ``url``.

    The page at ``url`` is fetched and every link whose ``href`` matches
    ``^\\d{4}-\\d{2}/$`` is treated as a folder. The chosen name is printed
    and returned without its trailing slash.

    Args:
        url: Address of the HTML directory listing.

    Returns:
        The greatest folder name, e.g. ``"2025-09"``.

    Raises:
        IndexError: If the listing contains no link in the expected format.
    """
    response = get_response(url)

    pattern, soup = get_compiled_pattern(response, r"^\d{4}-\d{2}/$")

    folders = [a["href"].strip("/") for a in soup.find_all("a", href=pattern)]

    # Fail with a descriptive message instead of "list index out of range".
    if not folders:
        raise IndexError(f"Nenhuma pasta no formato AAAA-MM encontrada em {url}")

    # Names are zero-padded YYYY-MM, so the lexicographic maximum is the
    # chronologically latest folder.
    last_folder = max(folders)

    print("Última pasta encontrada:", last_folder)

    return last_folder

In [None]:
def get_source_data(
    endpoint,
    key,
    file_columns,
    file_dtypes,
    file_index=1
):
    """Download one ZIP of the latest folder and load its CSV into a DataFrame.

    Steps: find the most recent folder under ``endpoint``, list the links
    matching ``<key><digits>.zip``, order them by their first run of digits,
    download the one at ``file_index`` and read the first member of the
    archive as a ``;``-separated, latin1-encoded, header-less CSV.

    Args:
        endpoint: Base URL (ending with ``/``) of the directory listing.
        key: File-name prefix of the wanted ZIPs (e.g. ``"Empresas"``). It is
            interpolated into a regular expression as-is.
        file_columns: Names for the first ``len(file_columns)`` columns of the
            CSV; any further columns in the file are not loaded.
        file_dtypes: Mapping of column name to dtype, passed to ``read_csv``.
        file_index: Position, in ascending numeric order, of the ZIP to
            download. The default of ``1`` keeps the original behaviour,
            which selects the *second* file.
            NOTE(review): the printed label says "Primeiro arquivo" (first
            file) — confirm whether ``0`` was intended.

    Returns:
        A ``pandas.DataFrame`` with the columns named in ``file_columns``.

    Raises:
        IndexError: If no ZIP exists at ``file_index`` (for instance when the
            folder has no matching files).
    """
    last_updated_folder = get_last_folder(endpoint)
    folder_url = endpoint + last_updated_folder + "/"

    response_folder = get_response(folder_url)

    key_pattern, soup_folder = get_compiled_pattern(
        response_folder,
        r"^{key}\d+\.zip$".format(key=key)
    )

    # Order numerically so that e.g. "...10.zip" sorts after "...9.zip".
    files = sorted(
        (a["href"] for a in soup_folder.find_all("a", href=key_pattern)),
        key=lambda name: int(re.search(r"(\d+)", name).group())
    )

    try:
        selected_file = files[file_index]
    except IndexError:
        raise IndexError(
            f"Índice {file_index} inválido: {len(files)} arquivo(s) "
            f"'{key}<n>.zip' encontrado(s) em {folder_url}"
        ) from None
    print("Primeiro arquivo encontrado:", selected_file)

    archive = get_response(folder_url + selected_file)

    # Context managers release the archive and the member handle once the
    # CSV has been fully read into memory.
    with zipfile.ZipFile(io.BytesIO(archive.content)) as z:
        csv_name = z.namelist()[0]
        print("Arquivo dentro do ZIP:", csv_name)

        with z.open(csv_name) as csv_file:
            df = pd.read_csv(
                csv_file,
                sep=";",
                encoding="latin1",
                usecols=list(range(len(file_columns))),
                names=file_columns,
                dtype=file_dtypes,
                header=None,
                low_memory=False
            )

    print("Total de linhas:", len(df))

    return df

In [28]:
def check_for_duplicates(df):
    """Print whether ``df`` contains fully duplicated rows.

    All members of a duplicate group are counted (``keep=False``), so two
    identical rows are reported as 2. Rows are compared on every column, even
    though the "no duplicates" message mentions CNPJs.

    Args:
        df: DataFrame to inspect.

    Returns:
        None. The outcome is only printed.
    """
    duplicated_rows = df.duplicated(keep=False).sum()

    if duplicated_rows <= 0:
        print("Não existem CNPJs duplicados")
        return

    print(f"Existem {duplicated_rows} registros duplicados")

# Read Data

In [29]:
# Base URL of the directory listing; the loaders below look here for the
# most recent YYYY-MM folder and download their ZIP files from it.
endpoint = "https://arquivos.receitafederal.gov.br/dados/cnpj/dados_abertos_cnpj/"

## Empresas

In [30]:
def get_empresas(endpoint):
    """Load the "Empresas" extract found under ``endpoint``.

    Args:
        endpoint: Base URL handed to ``get_source_data``.

    Returns:
        The DataFrame produced by ``get_source_data`` with the six columns
        declared below.
    """
    # Column name -> dtype. Insertion order is also the positional order of
    # the columns in the file, so the name list is derived from the keys.
    company_schema = {
        "cnpj": "string",
        "razao_social": "string",
        "natureza_juridica": "Int64",
        "qualificacao_responsavel": "Int64",
        "capital_social": "string",
        "cod_porte": "string",
    }

    return get_source_data(
        endpoint,
        "Empresas",
        list(company_schema),
        company_schema,
    )


# Download and load the Empresas extract (progress is printed by the helpers).
df_empresas = get_empresas(endpoint)


Última pasta encontrada: 2025-09
Primeiro arquivo encontrado: Empresas1.zip
Arquivo dentro do ZIP: K3241.K03200Y1.D50913.EMPRECSV
Total de linhas: 4494860


## Sócios

In [31]:
def get_socios(endpoint):
    """Load the "Socios" extract found under ``endpoint``.

    Args:
        endpoint: Base URL handed to ``get_source_data``.

    Returns:
        The DataFrame produced by ``get_source_data`` with the five columns
        declared below.
    """
    # Column name -> dtype. Insertion order is also the positional order of
    # the columns in the file, so the name list is derived from the keys.
    partner_schema = {
        "cnpj": "string",
        "tipo_socio": "Int64",
        "nome_socio": "string",
        "documento_socio": "string",
        "codigo_qualificacao_socio": "string",
    }

    return get_source_data(
        endpoint,
        "Socios",
        list(partner_schema),
        partner_schema,
    )


# Download and load the Socios extract (progress is printed by the helpers).
df_socios = get_socios(endpoint)

Última pasta encontrada: 2025-09
Primeiro arquivo encontrado: Socios1.zip
Arquivo dentro do ZIP: K3241.K03200Y1.D50913.SOCIOCSV
Total de linhas: 2019150


# Transform

## Empresas

In [32]:
# Drop rows that are identical across every column. This mutates df_empresas
# in place, so the frame loaded above is changed.
df_empresas.drop_duplicates(inplace=True)

# Confirm that no fully duplicated rows remain (the result is printed).
check_for_duplicates(df_empresas)

Não existem CNPJs duplicados


In [33]:
# Convert capital_social from text to float: dots are removed and the comma
# becomes the decimal point before casting.
df_empresas = df_empresas.assign(
    capital_social=lambda frame: (
        frame["capital_social"]
        .str.replace(".", "", regex=False)
        .str.replace(",", ".", regex=False)
        .astype(float)
    )
)

## Sócios

In [34]:
# Drop rows that are identical across every column. This mutates df_socios
# in place, so the frame loaded above is changed.
df_socios.drop_duplicates(inplace=True)

# Confirm that no fully duplicated rows remain (the result is printed).
check_for_duplicates(df_socios)

Não existem CNPJs duplicados


In [35]:
# Inspect the partner rows whose nome_socio is missing
df_socios.loc[df_socios["nome_socio"].isna()]

Unnamed: 0,cnpj,tipo_socio,nome_socio,documento_socio,codigo_qualificacao_socio
48931,00186210,2,,***467273**,16
57622,94583580,2,,***000000**,16
67434,94999851,2,,***000000**,16
98069,96758172,2,,***000000**,16
120731,00442603,2,,***160823**,16
...,...,...,...,...,...
918369,15038037,2,,***000000**,16
951697,65691743,2,,***000000**,16
951942,78493061,2,,***000000**,16
952031,11335429,2,,***000000**,16


In [36]:
# Inspect the rows whose documento_socio is the all-zero placeholder
# "***000000**", which is handled as a missing document in the filter below.
df_socios[df_socios["documento_socio"] == "***000000**"]

Unnamed: 0,cnpj,tipo_socio,nome_socio,documento_socio,codigo_qualificacao_socio
57622,94583580,2,,***000000**,16
67434,94999851,2,,***000000**,16
98069,96758172,2,,***000000**,16
141740,02146231,2,,***000000**,16
154181,00576411,2,,***000000**,16
...,...,...,...,...,...
951697,65691743,2,,***000000**,16
951942,78493061,2,,***000000**,16
952031,11335429,2,,***000000**,16
952086,91103044,2,,***000000**,16


In [37]:
# Remove partners that cannot be identified: the name is missing AND the
# document is either missing or the all-zero placeholder.
df_socios = df_socios[
    ~(
        df_socios["nome_socio"].isna()
        & (
            df_socios["documento_socio"].isna()
            | (df_socios["documento_socio"] == "***000000**")
        )
    )
].copy()

In [38]:
# Foreigner flag: 1 when documento_socio equals "***999999**", otherwise 0.
# Missing documents are replaced by "" first so the comparison yields False
# rather than NA, which lets the result be cast to a plain integer.
df_socios["flag_socio_estrangeiro"] = (
    df_socios["documento_socio"].fillna("") == "***999999**"
).astype(int)

# Sanity check: flagged rows whose document is not the marker (expected empty)
df_socios.loc[
    df_socios["flag_socio_estrangeiro"].eq(1)
    & df_socios["documento_socio"].ne("***999999**")
]

Unnamed: 0,cnpj,tipo_socio,nome_socio,documento_socio,codigo_qualificacao_socio,flag_socio_estrangeiro


In [39]:
# Summary of columns and dtypes after the clean-up steps above.
df_socios.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2019054 entries, 0 to 2019149
Data columns (total 6 columns):
 #   Column                     Dtype 
---  ------                     ----- 
 0   cnpj                       string
 1   tipo_socio                 Int64 
 2   nome_socio                 string
 3   documento_socio            string
 4   codigo_qualificacao_socio  string
 5   flag_socio_estrangeiro     int64 
dtypes: Int64(1), int64(1), string(4)
memory usage: 109.8 MB


In [None]:
# Aggregate per CNPJ: total number of partners and number of foreign partners.
# dropna=False keeps a group for rows whose cnpj is missing.
df_socios_processed = df_socios.groupby("cnpj", dropna=False).agg(
    # "size" counts every partner row. "count" would skip rows whose
    # documento_socio is null, and the filter above keeps such rows when the
    # partner has a name, so the total would be understated.
    qtde_socios=("documento_socio", "size"),
    # The flag is 0/1, so its sum is the number of foreign partners.
    qtd_estrangeiros=("flag_socio_estrangeiro", "sum")
).reset_index()

In [41]:
# Summary of the per-CNPJ aggregate (one row per cnpj group).
df_socios_processed.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1101888 entries, 0 to 1101887
Data columns (total 3 columns):
 #   Column            Non-Null Count    Dtype 
---  ------            --------------    ----- 
 0   cnpj              1101888 non-null  string
 1   qtd_socios        1101888 non-null  int64 
 2   qtd_estrangeiros  1101888 non-null  int64 
dtypes: int64(2), string(1)
memory usage: 25.2 MB


# Relations

In [42]:
# Inner join: keep only the CNPJs present both in the company extract and in
# the per-CNPJ partner aggregate.
df_result = (
    df_empresas[["cnpj", "cod_porte"]]
    .drop_duplicates()
    .merge(
        df_socios_processed.drop_duplicates(),
        how="inner",
        on="cnpj",
    )
)

In [None]:
# Target flag: True only when cod_porte is "03" and there is more than one
# partner. Computed column-wise instead of with a row-by-row apply, which is
# slow on a large frame. A missing cod_porte compares as NA and is mapped to
# False so the column is a plain boolean.
df_result["doc_alvo"] = (
    (df_result["cod_porte"].eq("03") & df_result["qtde_socios"].gt(1))
    .fillna(False)
    .astype(bool)
)

In [44]:
# Boolean flag: True when the CNPJ has at least one foreign partner
df_result["flag_socio_estrangeiro"] = df_result["qtd_estrangeiros"].gt(0)

In [None]:
# Keep only the output columns and drop any rows that became identical
df_result = df_result.loc[
    :, ["cnpj", "qtde_socios", "flag_socio_estrangeiro", "doc_alvo"]
].drop_duplicates()

## DF final

In [46]:
# Display the final frame as the cell output.
df_result

Unnamed: 0,cnpj,qtd_socios,flag_socio_estrangeiro,doc_alvo
0,00000006,1,False,False
1,00000007,2,False,False
2,00000081,1,False,False
3,00000084,2,False,False
4,00000092,2,False,False
...,...,...,...,...
206386,04631835,2,False,False
206387,04631846,2,False,False
206388,04631849,2,False,False
206389,04631871,1,False,False


In [47]:
# Check for repeated CNPJs; keep=False counts every row of a repeated key
duplicates = df_result["cnpj"].duplicated(keep=False).sum()

print(
    f"Existem {duplicates} registros duplicados"
    if duplicates > 0
    else "Não existem CNPJs duplicados"
)

Não existem CNPJs duplicados


In [48]:
# Percentage of missing values per column
null_percentage = df_result.isna().sum().div(len(df_result)).mul(100)
print(null_percentage.round(2))

cnpj                      0.0
qtd_socios                0.0
flag_socio_estrangeiro    0.0
doc_alvo                  0.0
dtype: float64


# Data Quality

In [59]:
import pandera as pa
from pandera import Column, DataFrameSchema, Check

In [None]:
# Label used only in the failure message raised below.
table = "silver_companies"

# Expectations for df_empresas: unique, non-null cnpj; cod_porte restricted
# to the listed codes; natureza_juridica within 1011..9999 and non-null.
# NOTE(review): get_empresas loads cnpj/cod_porte as "string" and
# natureza_juridica as nullable "Int64", while this schema declares str/int —
# confirm pandera accepts those dtypes without coercion.
# NOTE(review): cod_porte does not set nullable, so pandera's default
# applies — confirm missing values are meant to be handled that way.
schema = DataFrameSchema({
        "cnpj": Column(str, unique=True, nullable=False),
        "cod_porte": Column(str, checks=Check.isin(["00", "01", "03", "05"])),
        "natureza_juridica": Column(int, checks=Check.in_range(1011, 9999), nullable=False)
    })

# Validate with lazy=True and, on failure, re-raise as a generic Exception
# whose message carries the table label and the reported failure cases.
try:
    schema.validate(df_empresas, lazy=True)
    print("ok")
except pa.errors.SchemaErrors as err:
    raise Exception(f"❌ Data Quality falhou para {table}:\n{err.failure_cases}")


ok
