In [2]:
import csv
import datetime
import numpy
import os
import pandas
import re
import requests
import sqlalchemy
import sqlalchemy.ext.declarative
import sqlalchemy.orm
import sqlalchemy.orm.decl_api
import threading
import unidecode

# cfg

In [3]:
# Connection settings. Credentials are read from the environment so they are
# never stored in the notebook itself.
cfg = {
    "user": os.environ["POSTGRE_USER"],
    "password": os.environ["POSTGRE_PASSWORD"],
    "host": "localhost",
    "port": "5432",
    "database": "herbario"
}
n_splits = 8
filename = "original.csv"
filename_george = "dados-george.csv"
# Build the URL from its parts instead of interpolating a string, so that
# special characters in the user name or password (e.g. "@", "/", ":") are
# escaped correctly.
database_url = sqlalchemy.engine.URL.create(
    drivername="postgresql+psycopg2",
    username=cfg["user"],
    password=cfg["password"],
    host=cfg["host"],
    port=int(cfg["port"]),
    database=cfg["database"],
)
engine = sqlalchemy.create_engine(database_url, echo=True, pool_pre_ping=True)
# sessionmaker(bind=engine) already binds the factory to the engine; no extra
# Session.configure(bind=engine) call is needed.
Session = sqlalchemy.orm.sessionmaker(bind=engine)
session = Session()

# Load data

In [4]:
# Both CSV files are ";"-separated. The paths come from the configuration
# cell (`filename`, `filename_george`) so they are defined in a single place.
dataframe = pandas.read_csv(filename, sep=";", low_memory=False, skipinitialspace=True)
dataframe_george = pandas.read_csv(filename_george, sep=";", low_memory=False, skipinitialspace=True)

# "Tables"

In [5]:
# Declarative base class shared by the ORM models below (DataSP, County);
# its metadata is what create_table_if_not_exists() uses to create tables.
Base = sqlalchemy.ext.declarative.declarative_base()

class DataSP(Base):
    """ORM model for the ``data`` table: one row per record of the source CSV.

    Every column except the primary key ``seq`` is nullable. ``george`` and
    the ``my_*`` columns are not filled by ``create_datasp``; they are set
    separately.
    """
    __tablename__ = "data"

    seq = sqlalchemy.Column(sqlalchemy.BigInteger, primary_key=True)
    modified = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
    institution_code = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    collection_code = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    catalog_number = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    basis_of_record = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    kingdom = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    phylum = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    classe = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    order = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    family = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    genus = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    specific_epithet = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    infraspecific_epithet = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    scientific_name = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    scientific_name_authorship = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    identified_by = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    year_identified = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    month_identified = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    day_identified = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    type_status = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    recorded_by = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    record_number = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    field_number = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    year = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    month = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    day = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    event_time = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    continent_ocean = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    country = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    state_province = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    county = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    locality = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    decimal_longitude = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    decimal_latitude = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    verbatim_longitude = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    verbatim_latitude = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    coordinate_precision = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    bounding_box = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    minimum_elevation_in_meters = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    maximum_elevation_in_meters = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    minimum_depth_in_meters = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    maximum_depth_in_meters = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    sex = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    preparation_type = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    individual_count = sqlalchemy.Column(sqlalchemy.BigInteger, nullable=True)
    previous_catalog_number = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    relationship_type = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    related_catalog_item = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    occurrence_remarks = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    barcode = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    imagecode = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    geo_flag = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    george = sqlalchemy.Column(sqlalchemy.Boolean, nullable=True)
    my_country = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    my_state = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    my_city = sqlalchemy.Column(sqlalchemy.String, nullable=True)

    def __repr__(self):
        """Return ``DataSP(column=value, ...)`` covering every mapped column.

        Values are read from the instance ``__dict__`` rather than through
        attribute access, so building the repr never triggers a database
        load; attributes that are not currently loaded (for example after the
        session was committed and closed) are shown as ``<not loaded>``.
        """
        loaded = self.__dict__
        fields = ", ".join(
            f"{column.key}={loaded.get(column.key, '<not loaded>')}"
            for column in self.__table__.columns
        )
        return f"DataSP({fields})"

# A county here corresponds to a Brazilian "município" (municipality; "condado")
class County(Base):
    """ORM model for the ``county`` table.

    Each row stores a county name, the abbreviation (``uf``) and full name
    (``uf_name``) of its state, and a normalized variant of each of the three.
    """
    __tablename__ = "county"

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    county = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    county_normalized = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    uf = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    uf_normalized = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    uf_name = sqlalchemy.Column(sqlalchemy.String, nullable=True)
    uf_name_normalized = sqlalchemy.Column(sqlalchemy.String, nullable=True)

    def __repr__(self):
        """Return ``County(column=value, ...)`` covering every mapped column.

        Values are read from the instance ``__dict__`` rather than through
        attribute access, so building the repr never triggers a database
        load; attributes that are not currently loaded are shown as
        ``<not loaded>``.
        """
        loaded = self.__dict__
        fields = ", ".join(
            f"{column.key}={loaded.get(column.key, '<not loaded>')}"
            for column in self.__table__.columns
        )
        return f"County({fields})"

# database

In [6]:
def make_operation(session):
    """Commit the pending work of ``session`` and always close it afterwards.

    ``Session.commit()`` flushes pending changes itself, so no separate
    ``flush()`` call is issued.

    Args:
        session: the SQLAlchemy session holding the pending changes.

    Raises:
        Exception: whatever the commit raised; the session is rolled back and
            the error is printed before it is re-raised.
    """
    try:
        session.commit()
    except Exception as e:
        session.rollback()
        print(e)
        raise
    finally:
        session.close()

def create_table_if_not_exists(table_name):
    """Create the mapped tables when ``table_name`` does not exist yet.

    The existence check runs against the connection's default schema, which
    is where the models are created because they declare no schema. (The
    database name is not a schema name, so it must not be passed as
    ``schema=``.)

    Note that ``Base.metadata.create_all`` creates every table registered on
    ``Base`` that is still missing, not only ``table_name``.

    Args:
        table_name: name of the table whose presence is checked.
    """
    if not sqlalchemy.inspect(engine).has_table(table_name):
        Base.metadata.create_all(engine)

def create_datasp(info):
    """Build a ``DataSP`` instance from one CSV record.

    Args:
        info: mapping-like record (e.g. a dataframe row) indexed by the
            original CSV column names.

    Returns:
        A new, unsaved ``DataSP`` whose attributes are filled from ``info``.
    """
    # (DataSP attribute, CSV column) pairs, in the order the values are read.
    attribute_to_csv_column = (
        ("seq", "seq"),
        ("modified", "modified"),
        ("institution_code", "institutionCode"),
        ("collection_code", "collectionCode"),
        ("catalog_number", "catalogNumber"),
        ("basis_of_record", "basisOfRecord"),
        ("kingdom", "kingdom"),
        ("phylum", "phylum"),
        ("classe", "class"),
        ("order", "order"),
        ("family", "family"),
        ("genus", "genus"),
        ("specific_epithet", "specificEpithet"),
        ("infraspecific_epithet", "infraspecificEpithet"),
        ("scientific_name", "scientificName"),
        ("scientific_name_authorship", "scientificNameAuthorship"),
        ("identified_by", "identifiedBy"),
        ("year_identified", "yearIdentified"),
        ("month_identified", "monthIdentified"),
        ("day_identified", "dayIdentified"),
        ("type_status", "typeStatus"),
        ("recorded_by", "recordedBy"),
        ("record_number", "recordNumber"),
        ("field_number", "fieldNumber"),
        ("year", "year"),
        ("month", "month"),
        ("day", "day"),
        ("event_time", "eventTime"),
        ("continent_ocean", "continentOcean"),
        ("country", "country"),
        ("state_province", "stateProvince"),
        ("county", "county"),
        ("locality", "locality"),
        ("decimal_longitude", "decimalLongitude"),
        ("decimal_latitude", "decimalLatitude"),
        ("verbatim_longitude", "verbatimLongitude"),
        ("verbatim_latitude", "verbatimLatitude"),
        ("coordinate_precision", "coordinatePrecision"),
        ("bounding_box", "boundingBox"),
        ("minimum_elevation_in_meters", "minimumElevationInMeters"),
        ("maximum_elevation_in_meters", "maximumElevationInMeters"),
        ("minimum_depth_in_meters", "minimumDepthInMeters"),
        ("maximum_depth_in_meters", "maximumDepthInMeters"),
        ("sex", "sex"),
        ("preparation_type", "preparationType"),
        ("individual_count", "individualCount"),
        ("previous_catalog_number", "previousCatalogNumber"),
        ("relationship_type", "relationshipType"),
        ("related_catalog_item", "relatedCatalogItem"),
        ("occurrence_remarks", "occurrenceRemarks"),
        ("barcode", "barcode"),
        ("imagecode", "imagecode"),
        ("geo_flag", "geoFlag"),
    )
    values = {attribute: info[csv_column] for attribute, csv_column in attribute_to_csv_column}
    return DataSP(**values)

def create_county(json):
    """Build a ``County`` instance from one municipality record.

    Args:
        json: dict describing a municipality; it is read through ``get_uf``,
            ``get_id`` and ``get_county_name``.

    Returns:
        A new, unsaved ``County`` holding the raw and normalized names.
    """
    uf, uf_name = get_uf(json)
    county_id = get_id(json)
    county_name = get_county_name(json)
    return County(
        id=county_id,
        county=county_name,
        county_normalized=normalized(county_name),
        uf=uf,
        uf_normalized=normalized(uf),
        uf_name=uf_name,
        uf_name_normalized=normalized(uf_name),
    )

In [7]:
def get_key(json, key):
    """Return ``json[key]``.

    Raises:
        KeyError: with the message ``key <key> not found`` when ``key`` is
            not present in ``json``.
    """
    if key not in json:
        raise KeyError(f"key {key} not found")
    return json[key]


def get_id(json):
    """Return the value stored under ``"id"`` in ``json``.

    Raises:
        KeyError: when ``json`` has no ``"id"`` key.
    """
    if "id" not in json:
        raise KeyError("key id not found")
    return json["id"]


def get_county_name(json):
    """Return the county name stored under ``"nome"`` in ``json``.

    Raises:
        KeyError: when ``json`` has no ``"nome"`` key.
    """
    if "nome" not in json:
        raise KeyError("key nome not found")
    return json["nome"]


def get_uf(json):
    """Return the ``(sigla, nome)`` pair of the state nested inside ``json``.

    The state is read from ``json["microrregiao"]["mesorregiao"]["UF"]``.

    Raises:
        KeyError: naming the first of ``microrregiao``, ``mesorregiao`` or
            ``UF`` that is missing along that path.
    """
    if "microrregiao" not in json:
        raise KeyError("key microrregiao not found")
    microregion = json["microrregiao"]

    if "mesorregiao" not in microregion:
        raise KeyError("key mesorregiao not found")
    mesoregion = microregion["mesorregiao"]

    if "UF" not in mesoregion:
        raise KeyError("key UF not found")
    state = mesoregion["UF"]

    return state["sigla"], state["nome"]


def remove_accent(string):
    """Return ``string`` transliterated by ``unidecode.unidecode``."""
    transliterated = unidecode.unidecode(string)
    return transliterated


def remove_special_characters(string):
    """Return ``string`` with every character outside A-Z, a-z and 0-9 removed."""
    not_alphanumeric = re.compile('[^A-Za-z0-9]+')
    return not_alphanumeric.sub('', string)


def normalized(string):
    """Return ``string`` without accents or special characters, lower-cased.

    Applies ``remove_accent`` first, then ``remove_special_characters``, then
    ``str.lower``.
    """
    without_accents = remove_accent(string)
    alphanumeric_only = remove_special_characters(without_accents)
    return alphanumeric_only.lower()

In [8]:
# Make sure the tables exist before they are queried; on a fresh database the
# count queries below would otherwise fail.
create_table_if_not_exists("county")
create_table_if_not_exists("data")

# Fill the county table from the IBGE municipalities endpoint, once.
if session.query(County).count() == 0:
    try:
        response = requests.get("https://servicodados.ibge.gov.br/api/v1/localidades/municipios", timeout=60)
        # Fail here on an HTTP error status instead of later while parsing the body.
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"error: {e}")
        raise

    # Add everything first and commit once instead of once per county.
    session.add_all([create_county(county) for county in response.json()])
    make_operation(session)

# Fill the data table from the CSV dataframe, once.
if session.query(DataSP).count() == 0:
    session.add_all([create_datasp(row) for _, row in dataframe.iterrows()])
    make_operation(session)

# Flag the records whose GEORGE column says "sim", once.
if session.query(DataSP).filter(DataSP.george.is_(True)).count() == 0:
    # astype(str) keeps empty (NaN) cells from breaking the lower-casing.
    is_george = dataframe["GEORGE"].astype(str).str.lower() == "sim"
    george_seqs = dataframe.loc[is_george, "seq"].tolist()
    if george_seqs:
        # One UPDATE for all flagged records instead of one commit per row.
        session.query(DataSP).filter(DataSP.seq.in_(george_seqs)).update({"george": True}, synchronize_session=False)
        make_operation(session)

2022-07-17 12:33:28,313 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2022-07-17 12:33:28,314 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-07-17 12:33:28,319 INFO sqlalchemy.engine.Engine select current_schema()
2022-07-17 12:33:28,320 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-07-17 12:33:28,325 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2022-07-17 12:33:28,325 INFO sqlalchemy.engine.Engine [raw sql] {}
2022-07-17 12:33:28,329 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-17 12:33:28,335 INFO sqlalchemy.engine.Engine SELECT count(*) AS count_1 
FROM (SELECT county.id AS county_id, county.county AS county_county, county.county_normalized AS county_county_normalized, county.uf AS county_uf, county.uf_normalized AS county_uf_normalized, county.uf_name AS county_uf_name, county.uf_name_normalized AS county_uf_name_normalized 
FROM county) AS anon_1
2022-07-17 12:33:28,336 INFO sqlalchemy.engine.Engine [generated in 0.00101s] {}
2022-07-17 1

# Preprocess (dataframe)

In [9]:
def convert_header_to_snake_case(dataframe):
    """Map each column name of ``dataframe`` to its lower-cased snake_case form.

    An underscore is inserted before every uppercase letter that is not the
    first character, then the whole name is lower-cased.

    Returns:
        dict of ``{original_name: converted_name}``.
    """
    before_uppercase = re.compile(r'(?<!^)(?=[A-Z])')
    renamed = {}
    for column_name in get_columns_dataframe(dataframe):
        renamed[column_name] = before_uppercase.sub('_', column_name).lower()
    return renamed


def change_header(dataframe):
    """Rename the columns of ``dataframe`` to snake_case, in place.

    Returns nothing; ``dataframe`` itself is modified.
    """
    snake_case_names = convert_header_to_snake_case(dataframe)
    dataframe.rename(columns=snake_case_names, inplace=True)


def get_columns_numeric(dataframe):
    """List the dataframe columns that pair with a numeric ``DataSP`` column.

    Every (dataframe column, table column) pair is tested with
    ``check_if_column_is_numeric``; the dataframe column is included once per
    pair that passes.
    """
    return [
        dataframe_column
        for dataframe_column in get_columns_dataframe(dataframe)
        for table_column in get_columns_table(DataSP)
        if check_if_column_is_numeric(dataframe_column, table_column)
    ]


def get_columns_table(table):
    """Return the column collection of the mapped class ``table``."""
    mapped_table = table.__table__
    return mapped_table.columns


def get_columns_dataframe(dataframe):
    """Return the column labels of ``dataframe`` as a new list."""
    return [column for column in dataframe.columns]


def check_if_column_is_numeric(columns_dataframe, columns_table):
    """Tell whether a dataframe column is the given numeric table column.

    The dataframe column name must equal the table column's name exactly. A
    substring test would also pair, for instance, a dataframe column named
    ``count`` with the table column ``individual_count``.

    Args:
        columns_dataframe: name of the dataframe column.
        columns_table: table column object exposing ``name`` and ``type``.

    Returns:
        True when the names are equal and the textual form of the column
        type contains ``int`` or ``float``; False otherwise.
    """
    if str(columns_dataframe) != columns_table.name:
        return False
    type_name = str(columns_table.type).lower()
    return "int" in type_name or "float" in type_name


def replace_nan_to_null(dataframe):
    """Return the result of replacing ``numpy.nan`` with ``None`` in ``dataframe``."""
    nan_to_none = {numpy.nan: None}
    return dataframe.replace(nan_to_none)


def replace_values_not_numeric(dataframe):
    """Coerce the numeric columns of ``dataframe`` to numbers, in place.

    For every column reported by ``get_columns_numeric``, values that cannot
    be parsed as numbers become -1.

    Returns:
        The same ``dataframe`` object, after modification.
    """
    for column in get_columns_numeric(dataframe):
        coerced = pandas.to_numeric(dataframe[column], errors='coerce')
        dataframe[column] = coerced.fillna(-1)
    return dataframe


def preprocess(dataframe):
    """Run the preprocessing steps on ``dataframe`` and return the result.

    Numeric columns are coerced first (``replace_values_not_numeric``), then
    the remaining NaN values are replaced (``replace_nan_to_null``).
    """
    numeric_fixed = replace_values_not_numeric(dataframe)
    return replace_nan_to_null(numeric_fixed)

In [10]:
# Load every DataSP row and every County row into memory as ORM objects.
# data_county is read as a module-level name by searching_country_state_county().
data_piperaceae = session.query(DataSP).all()
data_county = session.query(County).all()

2022-07-17 12:33:28,510 INFO sqlalchemy.engine.Engine SELECT data.seq AS data_seq, data.modified AS data_modified, data.institution_code AS data_institution_code, data.collection_code AS data_collection_code, data.catalog_number AS data_catalog_number, data.basis_of_record AS data_basis_of_record, data.kingdom AS data_kingdom, data.phylum AS data_phylum, data.classe AS data_classe, data."order" AS data_order, data.family AS data_family, data.genus AS data_genus, data.specific_epithet AS data_specific_epithet, data.infraspecific_epithet AS data_infraspecific_epithet, data.scientific_name AS data_scientific_name, data.scientific_name_authorship AS data_scientific_name_authorship, data.identified_by AS data_identified_by, data.year_identified AS data_year_identified, data.month_identified AS data_month_identified, data.day_identified AS data_day_identified, data.type_status AS data_type_status, data.recorded_by AS data_recorded_by, data.record_number AS data_record_number, data.field_numb

In [37]:
def column_is_string_or_varchar_or_text(column):
    """Tell whether the type of ``column`` is textual.

    The check is made on the lower-cased textual form of ``column.type``,
    which must contain ``string``, ``varchar`` or ``text``.
    """
    type_name = str(column.type).lower()
    return any(marker in type_name for marker in ("string", "varchar", "text"))


def get_result(column_name, content, seq):
    """Pack one search hit into a dict with keys ``seq``, ``column_name`` and ``content``."""
    return dict(seq=seq, column_name=column_name, content=content)


def get_data_of_column(data, column):
    """Return the attribute of ``data`` named ``column``.

    Uses the ``getattr`` builtin rather than calling ``__getattribute__``
    directly, so objects that provide attributes through ``__getattr__`` are
    supported as well.

    Raises:
        AttributeError: when ``data`` has no such attribute.
    """
    return getattr(data, column)


def search(data, list_of_columns, list_values):
    """Search the given columns of one record for any of the given patterns.

    Args:
        data: record object; its ``seq`` attribute and the attributes named
            in ``list_of_columns`` are read.
        list_of_columns: names of the attributes of ``data`` to inspect.
        list_values: list of dicts whose values are regular-expression
            patterns; a dict matches a column when any of its patterns is
            found in the column text.

    Returns:
        A list with one ``get_result`` dict per (column, matching dict) pair,
        across all columns.

    NOTE(review): patterns are matched case-sensitively against the raw
    column text; confirm that is intended when the patterns are normalized.
    """
    list_of_values_searched = []
    thread_name = threading.current_thread().name
    seq = get_data_of_column(data, "seq")
    for i, column in enumerate(list_of_columns):
        print(f"thread:{thread_name};column:{column}-{i}/{len(list_of_columns)};")
        content = get_data_of_column(data, column)
        if content is None:
            # Empty column: nothing to search, and re.search rejects None.
            continue
        for values in list_values:
            if any(value is not None and re.search(value, content) for value in values.values()):
                # Keyword arguments keep seq / column name / content in the right fields.
                list_of_values_searched.append(get_result(column_name=column, content=content, seq=seq))
    # Return only after every column was inspected.
    return list_of_values_searched


def searching_country_state_county(data, list_of_columns, thread_name):
    """Search one record for country, state and county names and write the hits.

    The state and county patterns come from the module-level ``data_county``
    list. States are de-duplicated first: ``data_county`` holds one entry per
    county, so the same state would otherwise be searched (and reported) once
    for each of its counties. Repeated county names are de-duplicated too.

    Args:
        data: record object passed on to ``search``.
        list_of_columns: names of the attributes of ``data`` to inspect.
        thread_name: label passed on to ``result`` for the output file name.

    NOTE(review): ``result`` is called once per record and names its file
    with a one-second timestamp, so records handled within the same second
    under the same ``thread_name`` may overwrite each other - confirm.
    """
    list_of_country = [{"country_pt": "brasil", "country_en": "brazil", "country": "br"}]

    # dict.fromkeys drops duplicates while keeping first-seen order.
    unique_states = dict.fromkeys((county.uf_normalized, county.uf_name_normalized) for county in data_county)
    list_of_state = [{"uf_normalized": uf, "uf_name": uf_name} for uf, uf_name in unique_states]

    unique_counties = dict.fromkeys(county.county_normalized for county in data_county)
    list_of_county = [{"county_normalized": name} for name in unique_counties]

    list_result_country = search(data, list_of_columns, list_of_country)
    list_result_state = search(data, list_of_columns, list_of_state)
    list_result_county = search(data, list_of_columns, list_of_county)
    result(list_result_country + list_result_state + list_result_county, thread_name)


def result(list_with_result, thread_name):
    """Write the search hits to a timestamped CSV file in the working directory.

    The file is named ``result-thread<thread_name>-<dd-mm-YYYY-HH-MM-SS>.csv``
    and has the columns ``seq``, ``column_name`` and ``content``.

    Args:
        list_with_result: list of dicts with exactly those three keys.
        thread_name: label embedded in the file name.

    Raises:
        Exception: any error raised while writing; it is printed and
            re-raised.
    """
    print(f"founded: {len(list_with_result)}")
    header = ["seq", "column_name", "content"]
    output_name = f"result-thread{thread_name}-{datetime.datetime.now().strftime('%d-%m-%Y-%H-%M-%S')}.csv"
    try:
        # newline="" is what the csv module expects for files it writes; the
        # with-block closes the file, so no explicit close() is needed.
        with open(output_name, "w", newline="", encoding="utf-8") as file:
            writer = csv.DictWriter(file, fieldnames=header)
            writer.writeheader()
            # writerows takes the whole list of row dicts.
            writer.writerows(list_with_result)
    except Exception as e:
        print(e)
        raise


def search_in_data(list_of_data, list_of_columns_valid):
    """Run ``searching_country_state_county`` on every record of ``list_of_data``.

    The name of the current thread is passed along as the thread label.
    """
    for record in list_of_data:
        searching_country_state_county(record, list_of_columns_valid, threading.current_thread().name)


def get_range(data_size, n_splits):
    """Split the indices ``0 .. data_size - 1`` into ``n_splits`` consecutive chunks.

    Returns:
        A list of ``n_splits`` numpy arrays, as produced by
        ``numpy.array_split``.
    """
    chunks = numpy.array_split(range(data_size), n_splits)
    return list(chunks)


# Names of the DataSP columns whose type is textual; these are the columns
# handed to the search functions.
list_of_columns_valid = [
    column.name.replace("data.", "")
    for column in get_columns_table(DataSP)
    if column_is_string_or_varchar_or_text(column)
]
list_thread = []
# search_in_data(data_piperaceae, list_of_columns_valid)

# for i, index in enumerate(get_range(len(data_piperaceae), n_splits)):
#     list_thread.append(threading.Thread(name=f"{i}", target=search_in_data, args=(data_piperaceae[index[0]: index[len(index)-1]], list_of_columns_valid, )))
#
#
# for i, thread in enumerate(list_thread):
#     thread.start()
#
#
# if all(t.join() for t in list_thread):
#     print("all threads done")

thread:MainThread;column:institution_code-0/46;values:{'country_pt': 'brasil', 'country_en': 'brazil', 'country': 'br'}-0/1;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-0/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-1/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-2/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-3/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-4/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-5/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-6/5570;
thread:MainThread;column:institution_code-0/46;values:{'uf_normalized': 'ro', 'uf_name': 'rondonia'}-7/5570;
thr

KeyboardInterrupt: 

In [32]:
# Release database resources: close the session first, then dispose of the
# engine.
session.close()
engine.dispose()

2022-07-17 12:48:55,457 INFO sqlalchemy.engine.Engine ROLLBACK
