In [None]:
pip install pymysql

In [None]:
pip install awswrangler

In [None]:
pip install hvac

In [3]:
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from awswrangler import s3
from botocore.exceptions import HTTPClientError
from pandas.core.series import Series
from pandas import Timestamp
from sqlalchemy import create_engine
from pandas import (
    read_sql,
    to_datetime,
    DataFrame,
)
from pandas import read_parquet as pd_read_parquet
import json
import boto3
from dataclasses import dataclass
from functools import reduce
import io
import traceback
import sys
from numpy import dtype
from sqlalchemy.engine.base import Engine
import hvac

In [26]:
# Job-wide settings: where replicated tables land in S3 and which sources to replicate.
CONFIG = dict(
    s3=dict(
        bucket_name="teste-emr",
        base_path="platfarm",
        table_prefix="platfarm_",
    ),
    platfarm=dict(
        # Vault KV v2 secret name holding the database credentials (read by read_json_valt)
        auth="platfarm_aurora_prd",
        # Key (under "configs/" in the config bucket) of the table ingestion config JSON
        tables="etc/platfarm_tables_config.json",
    ),
)

In [5]:
def read_json_valt(path, url=None, token=None, mount_point="farm-bi-aws-glue-configs"):
    """Read a secret from Vault's KV v2 engine and return its key/value data.

    Args:
        path: Secret name under ``mount_point`` (e.g. ``"platfarm_aurora_prd"``).
            A trailing ``.json`` is dropped, and a Vault UI link of the form
            ``.../secrets/<mount_point>/show/<name>`` is reduced to ``<name>``.
        url: Vault address; defaults to ``$VAULT_ADDR``, then the project Vault URL.
        token: Vault token; defaults to ``$VAULT_TOKEN``. Never hardcode it in the notebook.
        mount_point: KV v2 mount that holds the secret.

    Returns:
        The secret's data mapping (``result["data"]["data"]``).

    Raises:
        RuntimeError: if no token is passed and ``VAULT_TOKEN`` is not set.
    """
    import os

    vault_url = url or os.environ.get("VAULT_ADDR", "https://vault-dados.portalfarm.com.br/")
    vault_token = token or os.environ.get("VAULT_TOKEN")
    if not vault_token:
        raise RuntimeError(
            "No Vault token: pass token=... or set the VAULT_TOKEN environment variable"
        )

    # Config values used to be S3 file names, hence the optional ".json" suffix.
    secret_path = path[: -len(".json")] if path.endswith(".json") else path
    # hvac expects the path relative to the mount, not a full UI URL.
    ui_marker = f"/secrets/{mount_point}/show/"
    if ui_marker in secret_path:
        secret_path = secret_path.split(ui_marker, 1)[1]

    client = hvac.Client(url=vault_url, token=vault_token)
    result = client.secrets.kv.v2.read_secret_version(mount_point=mount_point, path=secret_path)
    return result["data"]["data"]

In [6]:
def convert_dates(data, date_cols: list):
    """Return a copy of ``data`` with every column in ``date_cols`` parsed to datetimes.

    Values are stringified before parsing and anything unparseable becomes NaT
    (``errors="coerce"``). Columns missing from the frame are reported and skipped;
    ``date_cols`` may be ``None``/empty. The input frame is never modified.
    """
    df = data.copy()
    for column in date_cols or []:
        # Explicit membership test rather than `assert`: asserts are stripped under
        # `python -O`, which would turn a missing column into a KeyError.
        if column not in df.columns:
            print(f"Column {column} not in dataframe")
            continue
        df[column] = to_datetime(df[column].astype(str), errors="coerce")
    return df

In [7]:
def read_json_configs(key, bucket="teste-emr"):
    """Load and parse the JSON document stored at ``s3://{bucket}/configs/{key}``.

    Raises:
        HTTPClientError: if S3 answers the GET with a non-200 status.
    """
    object_key = f"configs/{key}"
    # Named s3_resource so the awswrangler `s3` module is not shadowed.
    s3_resource = boto3.resource("s3")
    response = s3_resource.Object(bucket, object_key).get()
    status = response["ResponseMetadata"]["HTTPStatusCode"]
    if status != 200:
        # botocore formats this message with an `error` kwarg; without it the
        # constructor itself fails with a KeyError.
        raise HTTPClientError(error=f"GET s3://{bucket}/{object_key} returned HTTP {status}")
    body = response["Body"]
    try:
        return json.loads(body.read().decode("utf8"))
    finally:
        body.close()

In [8]:
def s3_path_exists(path, existing_paths):
    """Return True when ``path`` occurs as a substring of any entry of ``existing_paths``."""
    # Every candidate is checked (no short-circuit) before answering.
    matches = [candidate for candidate in existing_paths if path in candidate]
    return len(matches) > 0

In [9]:
@dataclass
class S3Client:
    """Holder for the boto3 S3 handles shared by every sink.

    ``resource`` and ``client`` are plain (un-annotated) class attributes, so they
    are created once when this cell runs and are not dataclass fields.
    """

    resource = boto3.resource("s3")
    client = boto3.client("s3")

    def exists(self, bucket: str, prefix: str) -> bool:
        """Return True if ``bucket`` holds at least one object whose key starts with ``prefix``."""
        # One key is enough to answer; avoids listing up to 1000 keys per call.
        object_listing: Dict[str, Any] = self.client.list_objects_v2(
            Bucket=bucket, Prefix=prefix, MaxKeys=1
        )
        return object_listing["KeyCount"] > 0

In [10]:
@dataclass
class S3Object:
    """An S3 location (``bucket`` + ``key``) accessed through an ``S3Client``."""

    client: S3Client
    bucket: str
    key: str

    def get_path(self, *paths: str) -> str:
        """Return ``s3://bucket/key`` with each of ``paths`` appended as a ``/``-separated segment."""
        suffix = "".join("/" + segment for segment in paths)
        return f"s3://{self.bucket}/{self.key}{suffix}"

In [12]:
@dataclass
class S3Sink(S3Object):
    """Read/write target stored at ``s3://{bucket}/{key}``.

    ``format`` selects how data is stored:

    * ``"parquet"``: a DataFrame written as a snappy parquet dataset through
      awswrangler, using ``mode`` and optional ``partition_cols``;
    * ``"json"`` / ``"text"``: a single S3 object at ``key``.
    """

    format: str = "parquet"
    mode: str = "append"
    # Explicit Athena/Glue column types; they take precedence over detect_dtypes().
    dtype: Optional[Dict[str, str]] = None
    partition_cols: Optional[List[str]] = None

    def exists(self) -> bool:
        """Return True if at least one S3 object is stored under this sink's key."""
        prefix = self.key
        if self.format == "parquet":
            # A parquet dataset is a "directory". Without the trailing slash the prefix
            # ".../platfarm_foo" would also match ".../platfarm_foo_bar/...".
            prefix = prefix.rstrip("/") + "/"
        return self.client.exists(self.bucket, prefix)

    def detect_dtypes(self, obj: DataFrame) -> Dict[str, str]:
        """Map each column's pandas dtype to an Athena/Glue type for ``s3.to_parquet``.

        Columns whose dtype has no known mapping are left out, so awswrangler
        infers their type instead of the write failing with a KeyError.
        """
        dtype_mapping = {
            "int64": "bigint",
            "int32": "int",
            "int16": "smallint",
            "int8": "tinyint",
            "datetime64[ns]": "timestamp",
            "object": "string",
            "string": "string",
            "float64": "double",
            "float32": "double",
            "bool": "boolean",
            "boolean": "boolean",
        }
        detected: Dict[str, str] = {}
        for column, column_dtype in obj.dtypes.items():
            # Lower-casing also folds pandas nullable dtypes ("Int64") onto numpy names.
            dtype_name = column_dtype.name.lower()
            athena_type = dtype_mapping.get(dtype_name)
            if athena_type is None and dtype_name.startswith("datetime64"):
                # Other resolutions and tz-aware variants, e.g. "datetime64[ns, utc]".
                athena_type = "timestamp"
            if athena_type is not None:
                detected[column] = athena_type
        return detected

    def _write_parquet(self, frame: DataFrame) -> None:
        """Write ``frame`` as a snappy-compressed parquet dataset at ``get_path()``."""
        column_types = self.detect_dtypes(frame)
        if self.dtype:
            # Types set through the `dtype` field / set_dtype() override detected ones,
            # restricted to columns that actually exist in the frame.
            column_types.update(
                {
                    column: athena_type
                    for column, athena_type in self.dtype.items()
                    if column in frame.columns
                }
            )
        s3.to_parquet(
            frame,
            path=self.get_path(),
            dataset=True,
            mode=self.mode,
            compression="snappy",
            dtype=column_types,
            partition_cols=self.partition_cols,
        )

    def set_dtype(self, dtype: Dict[str, str]) -> None:
        """Set explicit Athena/Glue column types used by the next parquet write."""
        self.dtype = dtype

    def set_partition_cols(self, partition_cols: List[str]) -> None:
        """Set the columns the parquet dataset is partitioned by."""
        self.partition_cols = partition_cols

    def _write_text(self, obj: Union[dict, list, str]) -> None:
        """Upload ``obj`` as a single object at ``key``.

        dicts/lists are JSON-encoded, strings are UTF-8 encoded, readable text file
        handles are read fully and bytes are uploaded as-is. The ContentType is
        taken from ``self.content_type`` (set by ``write``).

        Raises:
            ValueError: for a text handle that is not readable.
            TypeError: for any other payload type.
        """
        if isinstance(obj, (dict, list)):
            payload = json.dumps(obj)
        elif isinstance(obj, io.TextIOWrapper):
            # Text handles always yield str from read(); only readable ones make sense.
            if not obj.readable():
                raise ValueError("Invalid TextIOWrapper mode")
            payload = obj.read()
        else:
            payload = obj
        if isinstance(payload, str):
            payload = payload.encode()
        elif not isinstance(payload, bytes):
            raise TypeError(f"Cannot write object of type {type(payload).__name__} as text")
        content_type = getattr(self, "content_type", "application/octet-stream")
        s3object = self.client.resource.Object(self.bucket, self.key)
        s3object.put(Body=payload, ContentType=content_type)

    def write(self, obj: Any) -> None:
        """Write ``obj`` according to ``format``.

        Raises:
            NotImplementedError: if ``obj`` does not fit the configured format.
        """
        if self.format == "parquet" and isinstance(obj, DataFrame):
            self._write_parquet(obj)
        elif self.format == "json" and isinstance(obj, (dict, list, str)):
            self.content_type = "application/json"
            self._write_text(obj)
        elif self.format == "text" and isinstance(obj, str):
            # "text/plain" is the registered MIME type ("plain/text" is not valid).
            self.content_type = "text/plain"
            self._write_text(obj)
        else:
            raise NotImplementedError("Object and format combination not implemented")

    @classmethod
    def parse(cls, client: S3Client, config: Dict[str, str]):
        """Build a sink from ``s3_sink_bucket``/``s3_sink_key``/``format`` and optional ``mode``."""
        bucket = config["s3_sink_bucket"]
        key = config["s3_sink_key"]
        file_format = config["format"]
        # Default to the field default; "" is not a valid awswrangler write mode.
        mode = config.get("mode", "append")
        return cls(client=client, bucket=bucket, key=key, format=file_format, mode=mode)

    def read(self) -> Union[DataFrame, str]:
        """Read the sink back: a DataFrame for parquet, raw (unparsed) text for json/text.

        Raises:
            NotImplementedError: for an unknown format.
        """
        if self.format == "parquet":
            return self._read_parquet()
        if self.format in ("json", "text"):
            return self._read_text()
        raise NotImplementedError("Format not implemented")

    def _read_text(self) -> str:
        """Download the object at ``key`` and decode it as UTF-8."""
        obj_response = self.client.resource.Object(self.bucket, self.key).get()
        content: bytes = obj_response["Body"].read()
        return content.decode()

    def _read_parquet(self) -> DataFrame:
        """Read the whole parquet dataset under ``get_path()``.

        Raises:
            ValueError: if awswrangler does not return a single DataFrame.
        """
        path = self.get_path() + "/"
        frame = s3.read_parquet(path=path, dataset=True)
        if isinstance(frame, DataFrame):
            return frame
        raise ValueError("Path is incorrect", frame)

In [13]:
@dataclass
class DateColumn:
    """A date column name paired with a datetime value (e.g. its max in already-loaded data)."""

    name: str
    value: datetime

    @classmethod
    def parse(cls, date_col: Optional[Dict[str, Any]]):
        """Build from ``{"name": ..., "value": ...}``; ``None`` passes through as ``None``."""
        if date_col is not None:
            return cls(date_col["name"], date_col["value"])
        return None

In [14]:
@dataclass
class DateTable:
    """Optional date markers for a table's created/updated/deleted audit columns."""

    created_at: Optional[DateColumn]
    updated_at: Optional[DateColumn]
    deleted_at: Optional[DateColumn]

    @classmethod
    def parse(cls, date_dict: Dict[str, Dict[str, Any]]):
        """Build from a dict keyed by "created_at"/"updated_at"/"deleted_at"; absent keys become None."""
        parsed = [
            DateColumn.parse(date_dict.get(slot))
            for slot in ("created_at", "updated_at", "deleted_at")
        ]
        return cls(*parsed)

    def _as_dict(self):
        """Return the populated slots keyed by slot name, in created/updated/deleted order."""
        pairs = (
            ("created_at", self.created_at),
            ("updated_at", self.updated_at),
            ("deleted_at", self.deleted_at),
        )
        return {slot: column for slot, column in pairs if column is not None}

    def _as_list(self):
        """Return the populated slots as a list, in created/updated/deleted order."""
        return list(self._as_dict().values())

In [15]:
class TableConfig:
    """Replication settings for one source table, parsed from the tables config JSON.

    ``created_at``/``updated_at``/``deleted_at`` name the table's audit-date
    columns, which are used to build incremental ``WHERE`` clauses.
    ``datetime_columns`` lists extra columns to coerce to datetimes.
    """

    base_query: str = "select * from {table}"

    def __init__(
        self,
        name: str,
        key: str,
        created_at: Optional[str] = None,
        updated_at: Optional[str] = None,
        deleted_at: Optional[str] = None,
        datetime_columns: Optional[List[str]] = None,
        force_full_refresh: bool = True,
    ) -> None:
        self.name = name
        self.key = key
        self.created_at = created_at
        self.updated_at = updated_at
        self.deleted_at = deleted_at
        self.datetime_columns = datetime_columns
        # Without any audit-date column there is nothing to load incrementally from.
        has_no_date_columns = created_at is None and updated_at is None and deleted_at is None
        # Full refresh is forced by default (every table is currently rewritten);
        # pass force_full_refresh=False to allow incremental loads.
        self.is_full_refresh = force_full_refresh or has_no_date_columns

    def __repr__(self) -> str:
        return """TableConfig(
                    name={},
                    key={},
                    created_at={},
                    updated_at={},
                    deleted_at={},
                    datetime_columns={},
                    is_full_refresh={}
                    )""".format(
            self.name,
            self.key,
            self.created_at,
            self.updated_at,
            self.deleted_at,
            self.datetime_columns,
            self.is_full_refresh,
        )

    @classmethod
    def parse(cls, config: Dict[str, Any]):
        """Build from one config entry; ``name`` and ``key_column`` are required.

        The optional ``force_full_refresh`` key defaults to True.
        """
        return cls(
            config["name"],
            config["key_column"],
            config.get("created_at"),
            config.get("updated_at"),
            config.get("deleted_at"),
            config.get("datetime_columns"),
            config.get("force_full_refresh", True),
        )

    def make_query(self, max_dates: DateTable):
        """Return the SELECT for this table, filtered by ``max_dates`` when any are set."""
        formatted_query = self.base_query.format(table=self.name)
        formatted_where = self._make_query_where(max_dates)
        if formatted_where is not None:
            return formatted_query + " " + formatted_where
        return formatted_query

    def _make_query_where(self, max_dates: DateTable) -> Optional[str]:
        """Build ``where (date(col) >= date('max')) or ...``, or return None when no dates are set."""
        dates = max_dates._as_dict()
        if not dates:
            return None
        max_dates_list = [
            "(date({name}) >= date('{value}'))".format(
                name=date_column.name, value=date_column.value.isoformat()
            )
            for date_column in dates.values()
        ]
        return "where " + " or ".join(max_dates_list)

    def _dates_as_dict(self):
        """Return the configured audit-date column names, keyed by slot, skipping unset ones."""
        as_dict = {
            "created_at": self.created_at,
            "updated_at": self.updated_at,
            "deleted_at": self.deleted_at,
        }
        return {k: v for k, v in as_dict.items() if v is not None}

    def _get_max_dates(self, frame: DataFrame) -> DateTable:
        """Compute the max value of each audit-date column present in ``frame``.

        Columns whose max is missing (empty frame or all-null column) are skipped;
        otherwise ``NaT.isoformat()`` would put ``date('NaT')`` into the SQL.
        """
        from pandas import isna

        max_dates: Dict[str, Dict[str, Any]] = {}
        for date_name, date_column in self._dates_as_dict().items():
            max_date = frame[date_column].max()
            if isna(max_date):
                continue
            max_dates[date_name] = {"name": date_column, "value": max_date.to_pydatetime()}
        return DateTable.parse(max_dates)

In [16]:
@dataclass
class ReplicationConfig:
    """All tables to replicate for one source database."""

    tables: List[TableConfig]

    @classmethod
    def parse(cls, config: List[Dict[str, Any]]):
        """Parse every entry of the tables config JSON list into a ``TableConfig``."""
        return cls(list(map(TableConfig.parse, config)))

In [17]:
@dataclass
class AuthConfig:
    """Database connection settings, as read from the Vault secret."""

    protocol: str  # URL scheme handed to create_engine, e.g. "dialect+driver"
    host: str
    port: str
    username: str
    password: str
    dbname: str

    def __repr__(self) -> str:
        # Mask the password so printing/logging this object never leaks credentials.
        # (dataclass keeps a __repr__ that the class defines itself.)
        return (
            f"AuthConfig(protocol={self.protocol!r}, host={self.host!r}, "
            f"port={self.port!r}, username={self.username!r}, password='***', "
            f"dbname={self.dbname!r})"
        )

    @classmethod
    def parse(cls, config: Dict[str, str]):
        """Build from a mapping holding all six connection keys; extra keys are ignored."""
        (protocol, host, port, username, password, dbname) = (
            config["protocol"],
            config["host"],
            config["port"],
            config["username"],
            config["password"],
            config["dbname"],
        )
        return cls(protocol, host, port, username, password, dbname)

    def _as_dict(self):
        """Return the settings as a plain dict (password included, unmasked)."""
        return {
            "protocol": self.protocol,
            "host": self.host,
            "port": self.port,
            "username": self.username,
            "password": self.password,
            "dbname": self.dbname,
        }

In [18]:
@dataclass()
class JobConfig:
    """Destination layout: each table lands at s3://{bucket}/{base_path}/{prefix}{table_name}."""

    bucket: str  # destination bucket name
    base_path: str  # key "folder" shared by every table of this job
    prefix: str  # prepended to each table name to form its dataset key

In [19]:
@dataclass
class ReplicationResult:
    """Outcome of replicating one table."""

    table_config: TableConfig
    success: bool

    def make_body(self) -> Dict[str, Any]:
        """Summarize the outcome as a plain dict for the job report."""
        return dict(
            name=self.table_config.name,
            succeeded=self.success,
            full_refresh=self.table_config.is_full_refresh,
        )

In [20]:
class SuccessResult(ReplicationResult):
    """Outcome of a table that replicated without raising, with row counts and query details."""

    table_config: TableConfig
    old_length: int
    new_length: int
    max_dates: DateTable
    schema_change: bool
    success: bool

    def __init__(self, table_config: TableConfig, old_length: int,
                 new_length: int, max_dates: DateTable, schema_change: bool,
                 success: bool):
        super().__init__(table_config, success)
        self.old_length, self.new_length = old_length, new_length
        self.max_dates = max_dates
        self.schema_change = schema_change

    def make_body(self) -> Dict[str, Any]:
        """Summarize the outcome, including the query that was issued, for the job report."""
        config = self.table_config
        body: Dict[str, Any] = {"name": config.name, "succeeded": self.success}
        body["old_length"] = self.old_length
        body["new_length"] = self.new_length
        body["query"] = config.make_query(self.max_dates)
        # NOTE(review): key is misspelled ("chaged"); kept as-is for report consumers.
        body["schema_chaged"] = self.schema_change
        body["max_dates"] = self.max_dates
        body["full_refresh"] = config.is_full_refresh
        return body

In [21]:
@dataclass
class FailureResult(ReplicationResult):
    """Outcome of a table whose replication raised an exception (built with ``success=False``)."""

In [22]:
@dataclass
class ReplicationSet:
    """Results of one replication run, one entry per table."""

    results: List[ReplicationResult]

    def make_report(self):
        """Return the per-table report bodies, in table order."""
        return list(map(lambda outcome: outcome.make_body(), self.results))

In [23]:
class ReplicationClient:
    """Copies every table of a ReplicationConfig from the source database into S3 parquet datasets."""

    def __init__(
        self,
        job_config: JobConfig,
        replication_config: ReplicationConfig,
        auth_config: AuthConfig,
    ) -> None:
        self.replication = replication_config
        self.auth = auth_config
        self.job = job_config
        self.engine = self._get_db_engine()
        self.s3_client = S3Client()

    def _get_db_engine(self) -> Engine:
        """Create the SQLAlchemy engine from the auth config.

        Username and password are percent-encoded so credentials containing
        characters such as ``@``, ``:`` or ``/`` cannot corrupt the URL.
        """
        from urllib.parse import quote

        auth = self.auth._as_dict()
        auth["username"] = quote(str(auth["username"]), safe="")
        auth["password"] = quote(str(auth["password"]), safe="")
        db_url = "{protocol}://{username}:{password}@{host}:{port}/{dbname}".format(**auth)
        return create_engine(db_url)

    def _get_sink_key(self, table_config: TableConfig) -> str:
        """Return ``{base_path}/{prefix}{table_name}``."""
        return f"{self.job.base_path}/{self.job.prefix}{table_config.name}"

    def get_sink(self, table_config: TableConfig) -> S3Sink:
        """Parquet sink for the table: overwrite on full refresh, append otherwise."""
        key = self._get_sink_key(table_config)
        mode = "overwrite" if table_config.is_full_refresh else "append"
        return S3Sink(self.s3_client, self.job.bucket, key, "parquet", mode)

    def load_data(self, table_config: TableConfig, max_dates: DateTable) -> DataFrame:
        """Run the table's SELECT (filtered by ``max_dates`` when set) and coerce its date columns."""
        sql = table_config.make_query(max_dates)
        frame = read_sql(sql, self.engine)
        return self.convert_dates(table_config, frame)

    def convert_dates(
        self, table_config: TableConfig, initial_frame: DataFrame
    ) -> DataFrame:
        """Return a copy of the frame with datetime_columns and audit-date columns parsed."""
        frame = initial_frame.copy()
        datetime_cols = (
            table_config.datetime_columns
            if table_config.datetime_columns is not None
            else []
        )
        date_cols = datetime_cols + list(table_config._dates_as_dict().values())
        if len(date_cols) > 0:
            print("Converting datetime columns")
            # Resolves to the module-level convert_dates function, not this method.
            frame = convert_dates(frame, date_cols)
        return frame

    def load_existing_data(
        self, table_config: TableConfig, sink: S3Sink
    ) -> Optional[DataFrame]:
        """Read what the sink already holds, or return None when it is empty.

        Raises:
            RuntimeError: if the sink does not return a DataFrame.
        """
        if not sink.exists():
            return None
        frame = sink.read()
        if not isinstance(frame, DataFrame):
            raise RuntimeError("Return should be a DataFrame")
        return self.convert_dates(table_config, frame)

    def _compare_schema(self, old: Optional[DataFrame], new: DataFrame):
        """True when both frames have the same column names (or there is no old frame)."""
        if old is None:
            return True
        return sorted(old.columns) == sorted(new.columns)

    def process_table(self, table_config: TableConfig) -> SuccessResult:
        """Load one table from the database and write it to its S3 sink."""
        print(f"Processing table {table_config}")
        sink = self.get_sink(table_config)
        # A full refresh overwrites the dataset, so existing data is neither read nor needed.
        existing_frame = (
            None
            if table_config.is_full_refresh
            else self.load_existing_data(table_config, sink)
        )
        if existing_frame is not None:
            print("Existing data found")
            # NOTE(review): the `>=` date filter re-selects rows from the last loaded
            # day and append mode writes them again; nothing de-duplicates on
            # table_config.key.
            max_dates = table_config._get_max_dates(existing_frame)
        else:
            print("No existing data")
            max_dates = DateTable(None, None, None)
        new_frame = self.load_data(table_config, max_dates)
        has_same_schema = self._compare_schema(existing_frame, new_frame)
        if not new_frame.empty:
            print("Writting frame")
            sink.write(new_frame)
        else:
            print("Empty frame")
        return SuccessResult(
            table_config,
            len(existing_frame) if existing_frame is not None else 0,
            len(new_frame),
            max_dates,
            not has_same_schema,
            True,
        )

    def replicate(self) -> ReplicationSet:
        """Process every configured table; a table that raises is recorded as a FailureResult."""
        print("Starting Replication")
        results: List[ReplicationResult] = []
        for table_config in self.replication.tables:
            try:
                result = self.process_table(table_config)
            except Exception:
                # Exception (not BaseException): KeyboardInterrupt, SystemExit and
                # GeneratorExit must still propagate.
                print(traceback.format_exc())
                result = FailureResult(table_config, False)
            print(result.make_body())
            results.append(result)
        return ReplicationSet(results)

In [28]:
def run():
    """Replicate every configured source into S3; exit with status 1 if any table failed.

    Failures are collected across all sources, so one failing source does not
    prevent the remaining ones from being replicated.
    """
    print("Starting Job Run")
    # Add further CONFIG entries shaped like CONFIG["platfarm"] to replicate more sources.
    platfarm_configs = (CONFIG["platfarm"],)
    job_config = JobConfig(
        CONFIG["s3"]["bucket_name"],
        CONFIG["s3"]["base_path"],
        CONFIG["s3"]["table_prefix"],
    )
    failures: List[ReplicationResult] = []
    for source_config in platfarm_configs:
        auth_config = AuthConfig.parse(read_json_valt(source_config["auth"]))
        tables_config: List[Dict[str, Any]] = read_json_configs(source_config["tables"])
        replication_config = ReplicationConfig.parse(tables_config)
        client = ReplicationClient(job_config, replication_config, auth_config)
        result = client.replicate()
        print(result.make_report())
        failures.extend(x for x in result.results if not x.success)
    if failures:
        print("Failures detected")
        sys.exit(1)


In [31]:
if __name__ == "__main__":
    # In a Jupyter kernel __name__ is "__main__", so executing this cell runs the job.
    run()

Starting Job Run
Starting Replication
Processing table TableConfig(
                    name=baixas,
                    key=id,
                    created_at=created_at,
                    updated_at=updated_at,
                    deleted_at=None,
                    datetime_columns=['data', 'vencimento'],
                    is_full_refresh=True
                    )
No existing data


  result = client.secrets.kv.v2.read_secret_version(mount_point='farm-bi-aws-glue-configs', path=path)


Converting datetime columns
Writting frame
{'name': 'baixas', 'succeeded': True, 'old_length': 0, 'new_length': 136185, 'query': 'select * from baixas', 'schema_chaged': False, 'max_dates': DateTable(created_at=None, updated_at=None, deleted_at=None), 'full_refresh': True}
[{'name': 'baixas', 'succeeded': True, 'old_length': 0, 'new_length': 136185, 'query': 'select * from baixas', 'schema_chaged': False, 'max_dates': DateTable(created_at=None, updated_at=None, deleted_at=None), 'full_refresh': True}]
