In [None]:
# !conda install -n arcgis -q -y sqlalchemy psycopg2 python-slugify geopandas conda-forge::geoalchemy2

In [None]:
# import arcgis
import logging
import geopandas as gpd
import pandas as pd

from datetime import datetime
from pathlib import Path
from slugify import slugify
from sqlalchemy import create_engine, text, \
    Table, MetaData, Column, \
    Integer, String, BigInteger, Boolean, \
    Float, DateTime, Numeric, SmallInteger, \
    UniqueConstraint, CheckConstraint, ForeignKeyConstraint
from geoalchemy2 import Geometry

# Configure the root logger (getLogger() without a name returns it) so that
# INFO-level messages emitted by the cells below are not filtered out.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Coletar os dados brutos e salvar em Parquet

## Informações de conexão de origem

In [None]:
# The source DSN is read from a file next to the notebook instead of being
# written in the notebook itself.
with open("./dsn/source.txt") as r:
    # Strip surrounding whitespace: text editors usually end the file with a
    # newline, which would otherwise become part of the connection URL.
    src_engine = create_engine(r.read().strip())

## Informações de template do Parquet

In [None]:
def create_filename(schema, table, extension, item, suffix="", output_path="./data/"):
    """Build the dated output path for an exported table and create its folder.

    The resulting path is
    ``<output_path>/<YYYY>/<MM>/<DD>/<item>/<schema>_<table>[_<suffix>].<extension>``.

    Parameters
    ----------
    schema, table : str
        Source schema and table names; they form the start of the file name.
    extension : str
        ``"csv"`` or ``"parquet"`` (surrounding whitespace and case are ignored).
    item : str
        Name of the sub-folder created below the date folders.
    suffix : str, optional
        Extra label appended to the file name. Leading underscores are
        ignored, so ``"raw"`` and ``"_raw"`` produce the same name.
    output_path : str or Path, optional
        Root folder of the exports.

    Returns
    -------
    pathlib.Path
        Path of the file to write. The parent directory exists on return;
        the file itself is not created.

    Raises
    ------
    ValueError
        If ``extension`` is neither ``csv`` nor ``parquet``.
    """
    extension = extension.strip().lower()

    # Validate before touching the file system.
    if extension not in ("csv", "parquet"):
        raise ValueError("Apenas CSV ou parquet")

    # Normalise the suffix *before* composing the name (the original did it
    # afterwards, so the normalisation never had any effect).
    suffix = "" if suffix is None else str(suffix).lstrip("_")
    filename = f"{schema}_{table}_{suffix}".strip("_")

    # The slashes in the date make pathlib create one folder per year/month/day.
    today = datetime.now().strftime("%Y/%m/%d")
    _file = Path(output_path) / today / item / f"{filename}.{extension}"

    # exist_ok makes this safe to repeat, so no prior existence check is needed.
    _file.parent.mkdir(parents=True, exist_ok=True)

    return _file

## Informações de tabela de origem

In [None]:
# Dataset handled by this run; it is also used to name the output folder and
# the destination tables further down.
table_item = "contagem_pintas_au"

# Pipeline arguments: where the source table lives and how it is keyed
src_schema = "geoq_valida"
src_table_prefix = "resultados_analiticos"
src_table_name = "_".join((src_table_prefix, table_item))
src_primary_key = "objectid"

# Coordinate columns and their spatial reference
src_x_field, src_y_field = "longitude", "latitude"
src_srid = 4674

# Timestamp column and the format used to parse it
src_ts_field, src_ts_format = "data_de_analise", "%d/%m/%Y"

# Display the fully qualified source table name as the cell output
".".join((src_schema, src_table_name))

### Ler tabela de origem e salvar em Parquet

In [None]:
# Pull the source table into a DataFrame indexed by its primary key
try:
    df = (
        pd.read_sql_table(src_table_name, src_engine, schema=src_schema, index_col=src_primary_key)
            # Sanitize column names so that nothing non-standard (accents, special
            # characters, etc.) ends up in the destination DDL
            .rename(columns=lambda col: slugify(col, separator="_"))
    )

except Exception as e:
    logging.error(e)
    # Re-raise: if the read failed, continuing would either hit a NameError on
    # `df` or silently reuse a stale `df` left in the kernel by an earlier run.
    raise

assert not df.empty, "Esta tabela está vazia. Abortando..."
# The message previously referenced the undefined name `primary_key`, which
# turned a failed check into a NameError instead of the intended AssertionError.
assert df.index.is_unique, f"Esta tabela não possui identifcador único de registro no campo <{src_primary_key}>"

# Keep an untouched copy of the raw data
out_raw_file = create_filename(src_schema, src_table_name, "parquet", table_item, suffix="raw")
df.to_parquet(out_raw_file, index=True)

# Info
df.info()

# Tratamento dos dados brutos e salvar em PostgreSQL

## Informações de conexão de destino

In [None]:
# Schema that receives the tables created by this notebook
dst_schema = "digeop"

# The destination DSN is read from a file next to the notebook
with open("./dsn/destiny-geobancao.txt") as r:
    # Strip surrounding whitespace: a trailing newline in the file would
    # otherwise become part of the connection URL.
    dst_engine = create_engine(r.read().strip(), plugins=["geoalchemy2"])

# Every Table registered on this MetaData defaults to `dst_schema`
metadata = MetaData(schema=dst_schema)

## Tabela de amostras (survey)

### Análise de colunas

In [None]:
# Key and geometry columns are declared apart from the others: they are added
# to the final tuple only after the DataFrame column check below.
pk_column = Column(src_primary_key, BigInteger(), primary_key=True, autoincrement=False)
geom_column = Column("geometry", Geometry("POINT", srid=src_srid, dimension=2), nullable=False)

# Columns every sample table is expected to have
fixed_columns = (
    Column("projeto_amostragem", String(255), nullable=False),
    Column("projeto_publicacao", String(255), nullable=False),
    Column("centro_de_custo", String(10), nullable=False),
    Column("classe", String(50), nullable=False),
    Column("numero_de_campo", String(20), nullable=False),
    Column("numero_de_laboratorio", String(8), nullable=True),
    Column("duplicata", Boolean(), nullable=False, default=False, server_default="0"),
    Column(src_x_field, Float(), nullable=False),
    Column(src_y_field, Float(), nullable=False),
    Column("laboratorio", String(255), nullable=False),
    Column("job", String(30), nullable=True),
    Column("data_de_analise", DateTime(), nullable=True),
    Column("abertura", String(100), nullable=True),
    Column("leitura", String(100), nullable=False),
    Column("observacao", String(1024), nullable=True),
)

# Dataset-specific additions (none for this dataset)
extra_columns = ()

# Extras are spliced in just before the last two fixed columns
survey_columns = (*fixed_columns[:-2], *extra_columns, *fixed_columns[-2:])
survey_column_names = tuple(column.name for column in survey_columns)

# Every declared column must be present in the source DataFrame
for name in survey_column_names:
    assert name in df.columns, f"The fixed column <{name}> is not in table columns."

# Final column tuple: primary key first, geometry last
survey_columns = (pk_column,) + survey_columns + (geom_column,)
survey_columns

### Criação do objeto SQL Table

In [None]:
# Destination table holding one row per sample
survey_table_name = table_item + "_amostras"

# extend_existing=True lets this cell be re-run against the same MetaData:
# the existing Table definition is updated instead of raising an error.
survey_tbl = Table(survey_table_name, metadata, *survey_columns, extend_existing=True)

### Extração de dados de amostras

In [None]:
# Build the samples GeoDataFrame: keep only the survey columns, turn empty
# strings into nulls and create point geometries from the coordinate columns.
survey_df = gpd.GeoDataFrame(
    df.filter(survey_column_names)
        .apply(lambda col: col.replace("", None))
        .rename(columns=lambda col: slugify(col, separator="_"))
        .assign(
            # Force `duplicata` to boolean: null counts as "não"; values
            # starting with "sim" or "1" (case-insensitive) count as True
            duplicata=lambda frame: frame.duplicata.fillna("não").str.match("(sim|1)", case=False)
        ),
    geometry=gpd.points_from_xy(
        df[src_x_field],
        df[src_y_field],
        crs=src_srid
    )
)

# Fix timestamp fields
if src_ts_field in survey_df.columns:
    # Values that do not parse with `src_ts_format` become NaT
    data_analise_ser = pd.to_datetime(survey_df[src_ts_field], format=src_ts_format, errors='coerce')

    # Rows without a usable timestamp after conversion. Taken straight from
    # the converted series so it works for any `src_ts_field`, not only
    # "data_de_analise" as the former hard-coded column name required.
    date_invalid_idx = data_analise_ser[data_analise_ser.isna()].index

    if not date_invalid_idx.empty:
        # Save the offending rows (with their original values) for inspection
        out_date_invalid_file = create_filename(src_schema, src_table_name, "parquet", table_item, suffix="survey_date_invalid")
        survey_df.loc[date_invalid_idx].to_parquet(out_date_invalid_file, index=True)

    survey_df[src_ts_field] = data_analise_ser

    del data_analise_ser

# ObjectID must be unique and no geometry may be null or empty
assert survey_df.index.is_unique, f"ObjectID precisa ser único: {survey_df[survey_df.index.duplicated()].index.tolist()}"
# The former check `isna().all() and is_empty.all()` could never fail; flag any
# row that is null OR empty, as the message states.
invalid_geometry = survey_df.geometry.isna() | survey_df.geometry.is_empty
assert not invalid_geometry.any(), "A tabela não pode ter geometria nula ou vazia"

# Write to Parquet
out_survey_file = create_filename(src_schema, src_table_name, "parquet", table_item, suffix="survey")
survey_df.to_parquet(out_survey_file, index=True)

survey_df.info()

## Tabelas de análises (assays)

### Análise de colunas

In [None]:
# Columns left out of the process
assay_excluded_columns = ("globalid", "lote", "ra", "metodo", "created_user", "created_date", "last_edited_user", "last_edited_date")

# Columns to un-pivot: everything that is neither a survey column nor excluded
assay_columns = [col for col in df.columns if col not in (survey_column_names + assay_excluded_columns)]

# # Columns for the analytical results table
# assay_columns_fixed = (
#     Column("abertura", String(100), nullable=False),
#     Column("leitura", String(100), nullable=False)
# )

# for col in assay_columns_fixed:
#     assert col.name in df.columns, f"The fixed column <{col}> is not in assay columns."

# `amostra` is the foreign key to the samples table, whose primary key is
# declared as BigInteger; use the same type here (it was Integer) so both
# sides of the relationship accept the same range of values.
assay_sample_column = Column("amostra", BigInteger(), nullable=False)
assay_analyte_column = Column("analito", String(20), nullable=False)
assay_value_column = Column("quantidade", SmallInteger(), nullable=False)


# Long-format layout: surrogate key + (sample, analyte) -> value
assay_pivoted_columns = (
    Column("id", BigInteger(), primary_key=True, autoincrement=True),
    assay_sample_column,
    # *assay_columns_fixed,
    assay_analyte_column,
    # assay_unit_column,
    # assay_qualif_column,
    assay_value_column
)

# assay_pivoted_columns

### Criação do objeto SQL Table

In [None]:
# Destination table holding one row per (sample, analyte) pair
assay_table_name = table_item + "_analises"

assay_tbl = Table(
    assay_table_name,
    metadata,
    *assay_pivoted_columns,
    # A sample can report each analyte only once
    UniqueConstraint("amostra", "analito", name=assay_table_name + "_uniq"),
    # Stored values must be strictly positive
    CheckConstraint(assay_value_column.name + " > 0", name=assay_table_name + "_qualif_chk"),
    # Each assay row points to its sample; deleting a sample deletes its assays
    ForeignKeyConstraint(
        [assay_sample_column.name],
        [survey_table_name + "." + src_primary_key],
        ondelete="CASCADE",
        onupdate="RESTRICT",
    ),
    # Allows re-running this cell against the same MetaData
    extend_existing=True,
)

assay_tbl

In [None]:
# Any whitespace character; removed from raw values before they are compared
spaces_regex = r"\s"
# Values treated as "no result": lab-specific markers plus common null spellings
missing_values = ["ND", "", "N.A.", 0, "0"] + [" ", "#N/A", "#N/A N/A", "#NA", "-1.#IND", "-1.#QNAN", "-NaN", "-nan", "1.#IND", "1.#QNAN", "<NA>", "N/A", "NA", "NULL", "NaN", "None", "n/a", "nan", "null"]

# normalize_values = {
#     ',': ".",
#     "-": "<",
#     "<.": "<0.",
# }

# Source column name -> human-readable analyte label
humanized_columns = {
    "ouro_0_5_mm": "ouro (<0,5mm)",
    "ouro_0_5_1_mm": "ouro (0,5 a 1mm)",
    "ouro_1_mm": "ouro (>1mm)"
}

# Pipes
def handle_missing(series, extra_missing_values=None):
    """Remove whitespace from string values and null out missing-value markers.

    Parameters
    ----------
    series : pandas.Series
        Series of strings (the ``.str`` accessor is used on it).
    extra_missing_values : iterable, optional
        Additional markers to treat as missing, on top of ``missing_values``.

    Returns
    -------
    pandas.Series
        A new series in which whitespace was removed from every value and every
        value found in the marker list was replaced by ``None``.
    """
    # A None default avoids sharing one mutable list between calls
    extra = list(extra_missing_values) if extra_missing_values else []
    return (
        series.str.replace(spaces_regex, "", regex=True)
            .replace(missing_values + extra, None)
    )

def handle_normalized(series, replaces=None):
    """Apply literal substring replacements to a string series, in mapping order.

    Parameters
    ----------
    series : pandas.Series
        Series of strings.
    replaces : mapping, optional
        ``{old: new}`` substrings; nothing is replaced when omitted.

    Returns
    -------
    pandas.Series
        The series after all replacements were applied.
    """
    for key, value in (replaces or {}).items():
        # regex=False: keys such as "<." are literal text, and the default of
        # `regex` is not the same across pandas versions.
        series = series.str.replace(key, value, regex=False)
    return series
    
# First cleaning pass
index_names = [col.name for col in (assay_sample_column, assay_analyte_column)]
value_name = assay_value_column.name

assay_df = (
    df.filter(assay_columns)
        .apply(lambda col: col.replace("", None))
        .rename(columns=lambda col: slugify(col, separator="_"))
        # Human-readable analyte names
        .rename(columns=humanized_columns)
        # Un-pivot: one row per (objectid, analyte)
        .stack()
        # Name the index levels and the value series after the table columns
        .rename_axis(index_names)
        .rename(value_name)
        # Null out missing-value markers, then drop them
        .pipe(handle_missing)
        .dropna()
        # # normalize value qualifiers
        # .pipe(handle_normalized, normalize_values)
)

# ObjectID must be unique
assert survey_df.index.is_unique
# ...and so must each (sample, analyte) pair of the frame just built, which is
# what the destination table's unique constraint requires.
assert assay_df.index.is_unique, f"O par ({', '.join(index_names)}) precisa ser único"

# Values must be plain non-negative integers
values_match = assay_df.astype(str).str.match(r"^\d+$")

# Plain `if` instead of assert/except: asserts are stripped under `python -O`,
# which would silently skip the error report.
if not values_match.all():
    invalid_values = assay_df[~values_match]
    # logging.warning replaces the deprecated logging.warn alias
    logging.warning(f"Alguns valores <{invalid_values.shape[0]}> não coincidiram com a expressão regular: \n{invalid_values.head()}")
    assay_error_oids = invalid_values.index.get_level_values(0).drop_duplicates().tolist()

    # Write the full source rows of the offending samples for inspection
    out_assay_error_file = create_filename(src_schema, src_table_name, "csv", table_item, suffix="assay_errors")
    df[df.index.isin(assay_error_oids)].to_csv(out_assay_error_file, index=True)
    logging.warning(f"Amostras com problemas de validação de valores estão salvas no arquivo '{out_assay_error_file}'")

# Write to Parquet
out_assay_file = create_filename(src_schema, src_table_name, "parquet", table_item, suffix="assay")
assay_df.to_frame().to_parquet(out_assay_file, index=True)

assay_df.info()

# Mandar para o banco de dados de destino

### Criar estruturas de SQL usando o SQLAlchemy

In [None]:
# Destrói as duas tabelas
# metadata.drop_all(dst_engine)
# metadata.create_all(dst_engine)

In [None]:
# Refuse to load unless every assay value passed the regex validation
assert assay_df.equals(assay_df[values_match]), "Não são iguais"

# Identify samples without invalid assays
#valid_samples = assay_df[values_match].index.get_level_values(assay_sample_column.name).drop_duplicates().tolist()

# The writes below target `dst_schema`, so the raw SQL statements must name the
# same schema instead of relying on the connection's search_path.
survey_qualified_name = f'"{dst_schema}"."{survey_table_name}"'
assay_qualified_name = f'"{dst_schema}"."{assay_table_name}"'

# Write the data to the database
with dst_engine.connect() as conn:
    # `conn.begin()` commits when the block exits cleanly and rolls back on
    # error, so truncate + both loads succeed or fail together and no explicit
    # commit is needed.
    with conn.begin():
        # Truncate tables (CASCADE also empties tables referencing this one)
        logging.info("Truncando tabelas...")
        conn.execute(text(f"TRUNCATE TABLE {survey_qualified_name} CASCADE;"))

        # 'to_postgis' does not work with `method`. See how to do this with plain 'to_sql'
        logging.info("Gravando amostras...")
        ( #survey_df[survey_df.index.isin(valid_samples)]
        survey_df
            .to_postgis(
                survey_table_name,
                conn,
                if_exists='append',
                schema=dst_schema,
                index=True,
                chunksize=5000
            )
        )

        logging.info("Gravando análises...")
        (assay_df[values_match]
            # Index levels (sample, analyte) become regular columns; the `id`
            # column is not sent (index=False) and is generated by the database
            .reset_index()
            .to_sql(
                assay_table_name,
                conn,
                if_exists='append',
                schema=dst_schema,
                index=False,
                chunksize=10000,
                # method="multi" # https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method
            )
        )

    logging.info("Reindexando...")
    # Run in an explicit transaction: statements executed outside one are
    # rolled back when the connection closes, which would discard the reindex.
    with conn.begin():
        for tbl in (survey_qualified_name, assay_qualified_name):
            conn.execute(text(f"REINDEX TABLE {tbl};"))

    logging.info("Finalizado!")

# Comparar com tabela de mineralometria

In [None]:
# Pair the samples of this dataset with the mineralometry samples that share
# the same laboratory number.
merged_df = (
    gpd.read_postgis(
        'SELECT * FROM digeop.mineralometria_amostras', dst_engine, geom_col="geometry", index_col="objectid"
    )
    # The key is nullable and pandas matches null keys to each other, which
    # would pair every key-less row on one side with every key-less row on the
    # other; drop them on both sides before merging.
    .dropna(subset=["numero_de_laboratorio"])
    .merge(
        survey_df.dropna(subset=["numero_de_laboratorio"]),
        how='inner',
        on=['numero_de_laboratorio']
    )
    .sort_index(axis="columns")
)

merged_df.info()

In [None]:
# Rank the paired samples by the distance between their two geometries,
# largest first (the result is displayed as the cell output).
(
    merged_df[["numero_de_laboratorio", "geometry_x", "geometry_y"]]
    .assign(distance=lambda pairs: pairs.geometry_x.distance(pairs.geometry_y))
    .sort_values("distance", ascending=False)
)