Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aws connectors #358

Merged
merged 29 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
71b2874
adding AWS Dataservices connectors utilities
Apr 2, 2024
a7341e0
Revert "adding AWS Dataservices connectors utilities"
Apr 2, 2024
a5c649d
feat: Adding utilities to connect AWS services like RDS
Apr 2, 2024
e303e9f
feat: Adding utilities to connect AWS services like RDS
Apr 4, 2024
0a8dab4
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
779dcaa
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
4877462
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
5a12426
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
a7ee12a
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
37fe340
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
446b5fa
feat: Adding utilities to connect AWS services like RDS
Apr 5, 2024
b9aef38
changing property values to be generic
SaiSharathReddy Apr 5, 2024
179db73
feat: Adding utilities to connect AWS services like RDS
Apr 8, 2024
35b5ae9
feat: Adding utilities to connect AWS services like RDS
Apr 8, 2024
c02396c
Fixing lint issues
Apr 8, 2024
c61ce0a
Fixing lint issues
Apr 8, 2024
e379c83
Fixing lint issues and PR comments
Apr 8, 2024
aca25ad
Updated functionality as per comments.
Apr 9, 2024
25cd0b0
removing cryptography package as this is not required in this PR
Apr 9, 2024
23d2118
moved RDS Config to AWS Utils
Apr 9, 2024
fc3e51e
added changes as per comments
Apr 9, 2024
5af858f
added changes as per comments
Apr 9, 2024
c4811f1
added changes as per comments
Apr 10, 2024
4597e76
added changes as per comments
Apr 10, 2024
9f847f4
feat: introduce stride in dataset (#360)
ab93 Apr 10, 2024
464eb44
feat: initial support for flattened vector in backtest (#361)
ab93 Apr 10, 2024
02cdd38
fix: remove print (#363)
ab93 Apr 11, 2024
b6b74f7
added changes as per comments
Apr 11, 2024
da865c1
Merge branch 'main' into aws_connectors
SaiSharathReddy Apr 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions numalogic/connectors/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ class ConnectorType(IntEnum):
redis = 0
prometheus = 1
druid = 2
rds = 3


@dataclass
Expand Down
3 changes: 3 additions & 0 deletions numalogic/connectors/rds/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from numalogic.connectors.rds._rds import RDSFetcher

__all__ = ["RDSFetcher"]
159 changes: 159 additions & 0 deletions numalogic/connectors/rds/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
from abc import ABCMeta, abstractmethod
from typing import Optional
import pandas as pd
from numalogic.connectors.utils.aws.config import DatabaseServiceProvider, RDSConfig
from numalogic.connectors.utils.aws.boto3_client_manager import Boto3ClientManager
import logging
from numalogic.connectors._config import Pivot
import time

_LOGGER = logging.getLogger(__name__)


def format_dataframe(
df: pd.DataFrame,
query: str,
datetime_field_name: str,
group_by: Optional[list[str]] = None,
pivot: Optional[Pivot] = None,
) -> pd.DataFrame:
"""
Executes formatting operations on a pandas DataFrame.

Arguments
----------
df : pd.DataFrame
The input DataFrame to be formatted.
query : str
The SQL query used to retrieve the data.
datetime_field_name : str
The name of the datetime field in the DataFrame.
group_by : Optional[list[str]], optional
A list of column names to group the DataFrame by, by default None.
pivot : Optional[Pivot], optional
An optional Pivot object specifying the index, columns,
and values for pivoting the DataFrame, by default None.

Returns
-------
pd.DataFrame : The formatted DataFrame.

"""
_start_time = time.perf_counter()
df["timestamp"] = pd.to_datetime(df[datetime_field_name]).astype("int64") // 10**6
df.drop(columns=datetime_field_name, inplace=True)
if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)
_end_time = time.perf_counter() - _start_time
_LOGGER.info("RDS MYSQL Query: %s, Format time: %.4fs", query, _end_time)
return df


class RDSBase(metaclass=ABCMeta):
"""
class represents a data fetcher for RDS (Relational Database Service) connections. It
provides methods for retrieving the RDS token, getting the password, establishing a
connection, and executing queries.

Args:
- db_config (RDSConfig): The configuration object for the RDS connection.
- kwargs (dict): Additional keyword arguments.

"""

def __init__(self, db_config: RDSConfig, **kwargs):
self.kwargs = kwargs
self.db_config = db_config
self.connection = None
self.database_type = db_config.database_type
self.boto3_client_manager = Boto3ClientManager(self.db_config)

def get_rds_token(self) -> str:
"""
Generates an RDS authentication token using the provided RDS boto3 client.

Args:
- rds_boto3_client (boto3.client): The RDS boto3 client used to generate the
authentication token.

Returns
-------
str: The generated RDS authentication token.

"""
rds_client = self.boto3_client_manager.get_client(DatabaseServiceProvider.RDS)
return self.boto3_client_manager.get_rds_token(rds_client)

def get_password(self) -> str:
"""
Retrieves the password for the RDS connection.

If 'aws_rds_use_iam' is True, it calls the get_rds_token() method to generate the RDS
token. Otherwise, it returns the database password from the configuration.

Returns
-------
str: The password for the RDS connection.

"""
if self.db_config.aws_rds_use_iam:
_LOGGER.info("using aws_rds_use_iam to generate RDS Token")
return self.get_rds_token()

_LOGGER.info("using password from config to connect RDS Database")
return self.db_config.database_password

@abstractmethod
def get_connection(self):
"""
Establishes a connection to the RDS database.

This method is a placeholder and needs to be implemented in a subclass. It should handle
the logic for establishing a connection to the RDS database based on the provided
configuration.

Returns
-------
None

"""
raise NotImplementedError

@abstractmethod
def get_db_cursor(self, *args, **kwargs):
"""
Retrieves a database cursor for executing queries.

This method is a placeholder and needs to be implemented in a subclass. It should handle
the logic for retrieving a database cursor based on the established connection.

Returns
-------
None

"""
raise NotImplementedError

@abstractmethod
def execute_query(self, query: str) -> pd.DataFrame:
"""
Executes a query on the RDS database and returns the result as a pandas DataFrame.

Args:
query (str): The SQL query to be executed.

Returns
-------
pd.DataFrame: The result of the query as a pandas DataFrame.

"""
raise NotImplementedError
70 changes: 70 additions & 0 deletions numalogic/connectors/rds/_rds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Optional
from numalogic.connectors._base import DataFetcher
from numalogic.connectors._config import Pivot
from numalogic.connectors.rds._base import format_dataframe
from numalogic.connectors.utils.aws.config import RDSConfig
import logging
import pandas as pd
from numalogic.connectors.rds.db.factory import RdsFactory
import time

_LOGGER = logging.getLogger(__name__)


class RDSFetcher(DataFetcher):
"""
class is a subclass of DataFetcher and ABC (Abstract Base Class).
It is used to fetch data from an RDS (Relational Database Service) instance by executing
a given SQL query.

Attributes
----------
db_config (RDSConfig): The configuration object for the RDS instance.
fetcher (db.CLASS_TYPE): The fetcher object for the specific database type.

"""

def __init__(self, db_config: RDSConfig):
super().__init__(db_config.endpoint)
self.db_config = db_config
factory_object = RdsFactory()
self.fetcher = factory_object.get_db_handler(db_config.database_type.lower())(db_config)
_LOGGER.info("Executing for database type: %s", self.fetcher.database_type)

def fetch(
self,
query,
datetime_field_name: str,
pivot: Optional[Pivot] = None,
group_by: Optional[list[str]] = None,
) -> pd.DataFrame:
"""
Fetches data from the RDS instance by executing the given query.

Args:
query (str): The SQL query to be executed.
datetime_field_name (str): The name of the datetime field in the fetched data.
pivot (Optional[Pivot], optional): The pivot configuration for the fetched data.
Defaults to None.
group_by (Optional[list[str]], optional): The list of fields to group the
fetched data by. Defaults to None.

Returns
-------
pd.DataFrame: A pandas DataFrame containing the fetched data.
"""
_start_time = time.perf_counter()
df = self.fetcher.execute_query(query)
if df.empty or df.shape[0] == 0:
_LOGGER.warning("No data found for query : %s ", query)
return pd.DataFrame()

formatted_df = format_dataframe(
df, query=query, datetime_field_name=datetime_field_name, pivot=pivot, group_by=group_by
)
_end_time = time.perf_counter() - _start_time
_LOGGER.info("RDS Query: %s Fetch Time: %.4fs", query, _end_time)
return formatted_df

def raw_fetch(self, *args, **kwargs) -> pd.DataFrame:
raise NotImplementedError
Empty file.
34 changes: 34 additions & 0 deletions numalogic/connectors/rds/db/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import logging

from numalogic.connectors.utils.aws.config import DatabaseTypes
from numalogic.connectors.utils.aws.exceptions import UnRecognizedDatabaseTypeException

_LOGGER = logging.getLogger(__name__)


class RdsFactory:
"""class represents a factory for creating database handlers for different database types."""

@classmethod
def get_db_handler(cls, database_type: DatabaseTypes):
"""
Get the database handler for the specified database type.

Args:
- database_type (str): The type of the database.

Returns
-------
- The database handler for the specified database type.

Raises
------
- UnRecognizedDatabaseTypeException: If the specified database type is not supported.

"""
if database_type == DatabaseTypes.MYSQL:
from numalogic.connectors.rds.db.mysql_fetcher import MysqlFetcher

return MysqlFetcher

raise UnRecognizedDatabaseTypeException(f"database_type: {database_type} is not supported")
Loading
Loading