In [None]:
import ast
import gc
import json
from math import ceil

import boto3
import sqlalchemy
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL

import findspark
findspark.init()  # must run before `import pyspark`: findspark puts pyspark on sys.path
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [None]:
SECRET_NAME = "database_tcepb"
REGION_NAME = "sa-east-1"

# Fetch the database credentials from AWS Secrets Manager.
session = boto3.session.Session()
client = session.client(
    service_name="secretsmanager",
    region_name=REGION_NAME,
)
get_secret_value_response = client.get_secret_value(SecretId=SECRET_NAME)

# SecretString of a key/value secret is a JSON document: parse it once, with a JSON parser
# (ast.literal_eval rejects JSON literals such as true/false/null).
db_secret = json.loads(get_secret_value_response["SecretString"])

# The f-string stringifies the port, which may be stored as a JSON number rather than a string.
SERVER_TRIBUNAL = f"{db_secret['host']}, {db_secret['port']}"
DB_TRIBUNAL = db_secret["dbname"]
USERNAME = db_secret["username"]
PASSWORD = db_secret["password"]

In [None]:
def _odbc_quote(value: str) -> str:
    """
    Brace-quote an ODBC connection-string attribute value.

    Inside braces, `;` and `{` are taken literally and a literal `}` is written as `}}`,
    so credentials containing those characters cannot break the connection string.
    """
    return "{" + str(value).replace("}", "}}") + "}"


# ODBC connection string for SQL Server; credentials are quoted because generated
# passwords may contain ODBC delimiter characters.
string_connection = (
    "DRIVER={SQL Server};"
    f"SERVER={SERVER_TRIBUNAL};DATABASE={DB_TRIBUNAL};"
    f"UID={_odbc_quote(USERNAME)};PWD={_odbc_quote(PASSWORD)}"
)
url = URL.create("mssql+pyodbc", query={"odbc_connect": string_connection})
engine = create_engine(url)  # connection engine

In [None]:
def _read_sql_file(path: str) -> str:
    """Return the whole text of the SQL file at `path`."""
    with open(path, "r") as sql_file:
        return sql_file.read()


# Base SELECT shared by every category; each filter file is appended to it later.
main_query = _read_sql_file("main_query.sql")

# Medicine filters: with/without CNAE crossed with with/without ANVISA code.
filter_medicine_with_cnae_and_anvisa = _read_sql_file("filter_medicine_with_cnae&anvisa.sql")
filter_medicine_with_cnae_and_NOTanvisa = _read_sql_file("filter_medicine_with_cnae&NOTanvisa.sql")
filter_medicine_with_NOTcnae_and_anvisa = _read_sql_file("filter_medicine_with_NOTcnae&anvisa.sql")
filter_medicine_with_NOTcnae_and_NOTanvisa = _read_sql_file("filter_medicine_with_NOTcnae&NOTanvisa.sql")

# Hospital-material filters: with/without CNAE.
filter_hospital_material_with_cnae = _read_sql_file("filter_hospital_material_with_cnae.sql")
filter_hospital_material_with_NOTcnae = _read_sql_file("filter_hospital_material_with_NOTcnae.sql")

# Remaining products: with/without CNAE.
filter_others_with_cnae = _read_sql_file("filter_others_with_cnae.sql")
filter_others_with_NOTcnae = _read_sql_file("filter_others_with_NOTcnae.sql")

# category -> variant -> SQL filter text
dict_filters = {
    "medicine": {
        "cnae&anvisa": filter_medicine_with_cnae_and_anvisa,
        "cnae&NOTanvisa": filter_medicine_with_cnae_and_NOTanvisa,
        "NOTcnae&anvisa": filter_medicine_with_NOTcnae_and_anvisa,
        "NOTcnae&NOTanvisa": filter_medicine_with_NOTcnae_and_NOTanvisa,
    },
    "hospital_material": {
        "cnae": filter_hospital_material_with_cnae,
        "NOTcnae": filter_hospital_material_with_NOTcnae,
    },
    "others": {
        "cnae": filter_others_with_cnae,
        "NOTcnae": filter_others_with_NOTcnae,
    },
}

In [None]:
def execute_query(engine: sqlalchemy.engine.base.Engine, query: str) -> list[tuple]:
    """
    Run `query` on a connection obtained from `engine.begin()` and return all rows.

    Args:
        engine: SQLAlchemy engine to connect with.
        query: raw SQL text, wrapped with `sqlalchemy.text` before execution.

    Returns:
        Every result row converted to a plain tuple, in result order.
    """
    with engine.begin() as connection:
        cursor_result = connection.execute(text(query))
        # Materialize while the connection is still open.
        fetched = [tuple(row) for row in cursor_result]
    return fetched

In [None]:
def _run_filtered_query(category: str, variant: str) -> list[tuple]:
    """
    Execute `main_query` followed by the filter `dict_filters[category][variant]`.

    Returns the rows as a list of tuples (see execute_query).
    """
    # Join with a newline: if main_query.sql has no trailing newline, a plain `+` would glue
    # its last token to the filter's first token, or hide the filter inside a trailing `--` comment.
    return execute_query(engine, main_query + "\n" + dict_filters[category][variant])


data_medicine_with_cnae_and_anvisa = _run_filtered_query("medicine", "cnae&anvisa")
data_medicine_with_cnae_and_NOTanvisa = _run_filtered_query("medicine", "cnae&NOTanvisa")
data_medicine_with_NOTcnae_and_anvisa = _run_filtered_query("medicine", "NOTcnae&anvisa")
data_medicine_with_NOTcnae_and_NOTanvisa = _run_filtered_query("medicine", "NOTcnae&NOTanvisa")

data_hospital_material_with_cnae = _run_filtered_query("hospital_material", "cnae")
data_hospital_material_with_NOTcnae = _run_filtered_query("hospital_material", "NOTcnae")

data_others_with_cnae = _run_filtered_query("others", "cnae")
data_others_with_NOTcnae = _run_filtered_query("others", "NOTcnae")

In [None]:
# (column name, Spark type, nullable). Rows arrive as plain tuples, so this order must match
# the column order of main_query's result set.
# NOTE(review): id_medicamento and id_combustivel are declared non-nullable; if the query can
# return NULL for them, createDataFrame will reject those rows — confirm against main_query.
_schema_columns = [
    ("id_produto", IntegerType(), False),
    ("codigo_cfop", StringType(), True),
    ("codigo_cest", StringType(), True),
    ("codigo_ncm", StringType(), True),
    ("codigo_ean", StringType(), True),
    ("descricao", StringType(), False),
    ("unidade", StringType(), True),
    ("id_medicamento", IntegerType(), False),
    ("cod_anvisa", StringType(), True),
    ("id_combustivel", IntegerType(), False),
    ("codigo_anp", StringType(), True),
    ("cnpj", StringType(), True),
    ("razao_social", StringType(), True),
    ("nome_da_atividade_economica", StringType(), True),
    ("cnae_fiscal", StringType(), True),
    ("cnae_secundaria", StringType(), True),
]
schema = StructType(
    [
        StructField(name=col_name, dataType=col_type, nullable=col_nullable)
        for col_name, col_type, col_nullable in _schema_columns
    ]
)

In [None]:
# Local Spark session using all cores. With master local[*] the work runs inside the driver
# process, so spark.driver.memory is the setting that bounds available memory here.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("training_base")
    .config("spark.executor.memory", "8G")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "60G")
    .getOrCreate()
)

In [None]:
def _rows_to_spark(rows: list[tuple]) -> pyspark.sql.dataframe.DataFrame:
    """Build a Spark DataFrame from query rows using the shared `schema`."""
    return spark.createDataFrame(data=rows, schema=schema)


df_medicine_with_cnae_and_anvisa = _rows_to_spark(data_medicine_with_cnae_and_anvisa)
df_medicine_with_cnae_and_NOTanvisa = _rows_to_spark(data_medicine_with_cnae_and_NOTanvisa)
df_medicine_with_NOTcnae_and_anvisa = _rows_to_spark(data_medicine_with_NOTcnae_and_anvisa)
df_medicine_with_NOTcnae_and_NOTanvisa = _rows_to_spark(data_medicine_with_NOTcnae_and_NOTanvisa)

df_hospital_material_with_cnae = _rows_to_spark(data_hospital_material_with_cnae)
df_hospital_material_with_NOTcnae = _rows_to_spark(data_hospital_material_with_NOTcnae)

df_others_with_cnae = _rows_to_spark(data_others_with_cnae)
df_others_with_NOTcnae = _rows_to_spark(data_others_with_NOTcnae)

In [None]:
# The DataFrames have been built from these driver-side row lists; drop the lists and
# run a collection pass to release their memory.
del (
    data_medicine_with_cnae_and_anvisa,
    data_medicine_with_cnae_and_NOTanvisa,
    data_medicine_with_NOTcnae_and_anvisa,
    data_medicine_with_NOTcnae_and_NOTanvisa,
    data_hospital_material_with_cnae,
    data_hospital_material_with_NOTcnae,
    data_others_with_cnae,
    data_others_with_NOTcnae,
)
gc.collect()

In [None]:
# Report (row count, column count) for each DataFrame; the third field is appended after the
# shape and adds a blank line between categories.
_shape_rows = [
    ("Medicine with cnae and anvisa", df_medicine_with_cnae_and_anvisa, ""),
    ("Medicine with cnae and without anvisa", df_medicine_with_cnae_and_NOTanvisa, ""),
    ("Medicine without cnae and with anvisa", df_medicine_with_NOTcnae_and_anvisa, ""),
    ("Medicine without cnae and anvisa", df_medicine_with_NOTcnae_and_NOTanvisa, "\n"),
    ("Hospital material with cnae", df_hospital_material_with_cnae, ""),
    ("Hospital material without cnae", df_hospital_material_with_NOTcnae, "\n"),
    ("Others with cnae", df_others_with_cnae, ""),
    ("Others without cnae", df_others_with_NOTcnae, ""),
]
for _label, _frame, _tail in _shape_rows:
    print(f"{_label} -> shape: {_frame.count()},{len(_frame.columns)}{_tail}")

In [None]:
# Disabled: optional conversion of each Spark DataFrame to pandas. Keep it commented out while
# the filter_by_cnpjs cell below is used, since that function relies on the Spark DataFrame API
# (select / filter / count).
# df_medicine_with_cnae_and_anvisa = df_medicine_with_cnae_and_anvisa.toPandas()
# df_medicine_with_cnae_and_NOTanvisa = df_medicine_with_cnae_and_NOTanvisa.toPandas()
# df_medicine_with_NOTcnae_and_anvisa = df_medicine_with_NOTcnae_and_anvisa.toPandas()
# df_medicine_with_NOTcnae_and_NOTanvisa = df_medicine_with_NOTcnae_and_NOTanvisa.toPandas()

# df_hospital_material_with_cnae = df_hospital_material_with_cnae.toPandas()
# df_hospital_material_with_NOTcnae = df_hospital_material_with_NOTcnae.toPandas()

# df_others_with_cnae = df_others_with_cnae.toPandas()
# df_others_with_NOTcnae = df_others_with_NOTcnae.toPandas()

In [None]:
def filter_by_cnpjs(df: pyspark.sql.dataframe.DataFrame, limit: int) -> pyspark.sql.dataframe.DataFrame:
    """
    Down-sample `df` to roughly `limit` rows by keeping a fixed number of products per cnpj.

    If `df` has at most `limit` rows it is returned unchanged. Otherwise every distinct cnpj
    (NULL counts as one group) keeps its first ceil(limit / n_cnpjs) rows in `id_produto`
    order, and always at least one row. The result can therefore exceed `limit`, e.g. when
    there are more distinct cnpjs than `limit`.

    Args:
        df: Spark DataFrame with (at least) `cnpj` and `id_produto` columns.
        limit: target number of rows.

    Returns:
        A Spark DataFrame with the same columns as `df`.
    """
    # Local imports keep this cell self-contained; pyspark is already importable at this point.
    from pyspark.sql import Window
    from pyspark.sql import functions as F

    if df.count() <= limit:
        return df

    n_cnpjs = df.select("cnpj").distinct().count()
    if n_cnpjs == 0:  # only reachable with a negative limit on an empty frame
        return df
    # At least one product per cnpj, even when there are more cnpjs than `limit`.
    qtd_per_cnpj = max(1, ceil(limit / n_cnpjs))

    # Rank rows inside each cnpj in one distributed pass. This replaces one Spark job per cnpj
    # plus a string-built `IN (...)` filter, which failed on NULL/non-numeric cnpj values and
    # produced a malformed id list when an id_produto value repeated.
    rank_col = "__filter_by_cnpjs_rank"
    per_cnpj = Window.partitionBy("cnpj").orderBy("id_produto")
    return (
        df.withColumn(rank_col, F.row_number().over(per_cnpj))
        .filter(F.col(rank_col) <= qtd_per_cnpj)
        .drop(rank_col)
    )

In [None]:
def _resample(df: pyspark.sql.dataframe.DataFrame, limit: int) -> pyspark.sql.dataframe.DataFrame:
    """Clear Spark's cache, then down-sample `df` to about `limit` rows with filter_by_cnpjs."""
    spark.catalog.clearCache()
    return filter_by_cnpjs(df, limit)


df_medicine_with_cnae_and_anvisa = _resample(df_medicine_with_cnae_and_anvisa, 2500)
df_medicine_with_cnae_and_NOTanvisa = _resample(df_medicine_with_cnae_and_NOTanvisa, 4194)
df_medicine_with_NOTcnae_and_anvisa = _resample(df_medicine_with_NOTcnae_and_anvisa, 2500)
df_medicine_with_NOTcnae_and_NOTanvisa = _resample(df_medicine_with_NOTcnae_and_NOTanvisa, 2500)

df_hospital_material_with_cnae = _resample(df_hospital_material_with_cnae, 5000)
df_hospital_material_with_NOTcnae = _resample(df_hospital_material_with_NOTcnae, 5000)

df_others_with_cnae = _resample(df_others_with_cnae, 5000)
df_others_with_NOTcnae = _resample(df_others_with_NOTcnae, 5000)
spark.catalog.clearCache()

In [None]:
# Shapes after down-sampling: (label, DataFrame, text appended after the shape).
_report = (
    ("Medicine with cnae and anvisa", df_medicine_with_cnae_and_anvisa, ""),
    ("Medicine with cnae and without anvisa", df_medicine_with_cnae_and_NOTanvisa, ""),
    ("Medicine without cnae and with anvisa", df_medicine_with_NOTcnae_and_anvisa, ""),
    ("Medicine without cnae and anvisa", df_medicine_with_NOTcnae_and_NOTanvisa, "\n"),
    ("Hospital material with cnae", df_hospital_material_with_cnae, ""),
    ("Hospital material without cnae", df_hospital_material_with_NOTcnae, "\n"),
    ("Others with cnae", df_others_with_cnae, ""),
    ("Others without cnae", df_others_with_NOTcnae, ""),
)
for _title, _sdf, _end in _report:
    _n_rows = _sdf.count()
    _n_cols = len(_sdf.columns)
    print(f"{_title} -> shape: {_n_rows},{_n_cols}{_end}")