In [7]:
def make_jupyter_cells_wide():
    """Inject a <style> element into the notebook output.

    The CSS overrides the `#notebook` top padding, sets `.container` to
    100% width and removes the minimum height of `.end_space`.
    """
    from IPython.display import HTML, display

    css_rules = (
        '#notebook { padding-top:0px !important; } '
        '.container { width:100% !important; } '
        '.end_space { min-height:0px !important; } '
    )
    style_tag = '<style>' + css_rules + '</style>'
    display(HTML(style_tag))
# Echo the helper's name, then apply the wide-cell CSS for this notebook.
print('make_jupyter_cells_wide()')
make_jupyter_cells_wide()

make_jupyter_cells_wide()


In [4]:
"""Initialize and Function setup.
Includes ad hoc SQLite functions.
"""
import re
from datetime import datetime as dTime
import calendar
import datetime
from datetime import timedelta
import random
from collections import OrderedDict
from slack_sdk import WebClient
import pandas as pd
import decimal
from typing import Dict
from openpyxl.styles import Font, Alignment, PatternFill

def get_prev_weekday(the_dt=None):
    """Return the closest weekday (Mon-Fri) strictly before ``the_dt``.

    Args:
        the_dt: A ``datetime.date``/``datetime.datetime``, or a string whose
            first ten characters are 'YYYY-MM-DD'. ``None`` (the default)
            means today, resolved at call time rather than when the function
            was defined.

    Returns:
        The date formatted as 'YYYY-MM-DD'.
    """
    if the_dt is None:
        the_dt = datetime.date.today()
    elif isinstance(the_dt, str):
        the_dt = dTime.strptime(the_dt[0:10], '%Y-%m-%d').date()
    prev_day = the_dt - timedelta(days=1)
    iso_day = prev_day.isoweekday()
    if iso_day in (6, 7):
        # Saturday (6 % 5 == 1) and Sunday (7 % 5 == 2) both roll back to Friday.
        prev_day -= timedelta(days=iso_day % 5)
    return prev_day.strftime('%Y-%m-%d')

def get_next_weekday(the_dt=None):
    """Return the closest weekday (Mon-Fri) strictly after ``the_dt``.

    Args:
        the_dt: A ``datetime.date``/``datetime.datetime``, or a string whose
            first ten characters are 'YYYY-MM-DD'. ``None`` (the default)
            means today, resolved at call time rather than when the function
            was defined.

    Returns:
        The date formatted as 'YYYY-MM-DD'.
    """
    if the_dt is None:
        the_dt = datetime.date.today()
    elif isinstance(the_dt, str):
        the_dt = dTime.strptime(the_dt[0:10], '%Y-%m-%d').date()
    next_day = the_dt + timedelta(days=1)
    iso_day = next_day.isoweekday()
    if iso_day == 6:
        # Saturday -> Monday
        next_day += timedelta(days=2)
    elif iso_day == 7:
        # Sunday -> Monday
        next_day += timedelta(days=1)
    return next_day.strftime('%Y-%m-%d')

def get_yrmo_between(start_dt, end_dt):
    """Return the distinct 'YYYY-MM' values covered by a date range, in order.

    Both arguments are 'YYYY-MM-DD' strings. Days are walked from ``start_dt``
    up to, but not including, ``end_dt``. The result is the keys view of an
    ``OrderedDict``.
    """
    first_day = datetime.datetime.strptime(start_dt, "%Y-%m-%d")
    last_day = datetime.datetime.strptime(end_dt, "%Y-%m-%d")
    span_days = (last_day - first_day).days
    months = OrderedDict()
    for offset in range(span_days):
        label = (first_day + datetime.timedelta(offset)).strftime(r"%Y-%m")
        months[label] = None
    return months.keys()

def get_yrmody_between(start_dt, end_dt):
    """Return every 'YYYY-MM-DD' day in a date range, in order.

    Both arguments are 'YYYY-MM-DD' strings. Days are walked from ``start_dt``
    up to, but not including, ``end_dt``. The result is the keys view of an
    ``OrderedDict``.
    """
    first_day = datetime.datetime.strptime(start_dt, "%Y-%m-%d")
    last_day = datetime.datetime.strptime(end_dt, "%Y-%m-%d")
    span_days = (last_day - first_day).days
    days = OrderedDict()
    for offset in range(span_days):
        label = (first_day + datetime.timedelta(offset)).strftime(r"%Y-%m-%d")
        days[label] = None
    return days.keys()

def get_weekdays_between(start_dt, end_dt):
    """Return the Mon-Fri days of a date range as a list of 'YYYY-MM-DD' strings.

    The candidate days come from ``get_yrmody_between(start_dt, end_dt)``;
    those whose ISO weekday is 6 (Saturday) or 7 (Sunday) are dropped.
    """
    weekend = (6, 7)
    return [
        day
        for day in get_yrmody_between(start_dt, end_dt)
        if dTime.strptime(day, '%Y-%m-%d').isoweekday() not in weekend
    ]

def unix_to_date(u):
    """Convert a Unix timestamp to a naive ``datetime`` expressed in UTC.

    Args:
        u: Seconds since the epoch as an int, float or integer string. A value
            whose integer part has exactly 13 characters is treated as
            milliseconds and divided by 1000.

    Returns:
        A timezone-naive ``datetime`` holding the UTC wall-clock time.
    """
    seconds = int(u) if isinstance(u, str) else u
    # Count digits of the integer part only, so fractional seconds such as
    # 1700000000.25 (13 characters as text) are not mistaken for milliseconds.
    if len(str(int(seconds))) == 13:
        seconds = seconds / 1000
    # Same result as the deprecated utcfromtimestamp(): build an aware UTC
    # value, then drop the tzinfo.
    aware = dTime.fromtimestamp(seconds, tz=datetime.timezone.utc)
    return aware.replace(tzinfo=None)

def date_to_unix(d, time_only=False):
    """Return a time in seconds from 1/1/1970, treating the input as UTC.

    Args:
        d: One of
            * 'YYYY-MM-DD' (10 characters),
            * 'YYYY-MM-DD HH:MM' (16 characters),
            * 'YYYY-MM-DD HH:MM:SS' (19 characters),
            * a ``datetime.datetime`` or ``datetime.date`` object,
            * 'HH:MM:SS' when ``time_only`` is true.
        time_only: Parse ``d`` with '%H:%M:%S' only.
            NOTE(review): strptime fills in 1900-01-01 for the missing date,
            so the result is a large negative number rather than seconds
            since midnight - confirm this is what callers expect.

    Raises:
        ValueError: If ``d`` is not one of the supported shapes.
    """
    if time_only:
        parsed = dTime.strptime(d, '%H:%M:%S')
    elif isinstance(d, str) and len(d) == 10:
        parsed = dTime.strptime(d, '%Y-%m-%d')
    elif isinstance(d, str) and len(d) == 16:
        parsed = dTime.strptime(f'{d}:00', '%Y-%m-%d %H:%M:%S')
    elif isinstance(d, str) and len(d) == 19:
        parsed = dTime.strptime(d, '%Y-%m-%d %H:%M:%S')
    elif isinstance(d, (dTime, datetime.date)):
        parsed = d
    else:
        # The original `raise '<str>'` is itself a TypeError in Python 3.
        raise ValueError('Not a valid date or time')
    return calendar.timegm(parsed.timetuple())

def dtNow():
    """Print the current local time as 'YYYY-MM-DD HH:MM:SS'; returns None."""
    print(f"{dTime.now():%Y-%m-%d %H:%M:%S}")

def post_slack(message, channel='#notify-all'):
    """Post ``message`` to one or more Slack channels.

    Args:
        message: Text passed to ``chat_postMessage``.
        channel: A single channel name, or an iterable of channel names.

    Returns:
        The response of the last ``chat_postMessage`` call, or ``None`` when
        no channel was given (previously this case raised UnboundLocalError).
    """
    channels = [channel] if isinstance(channel, str) else list(channel)
    if not channels:
        # Nothing to post; skip building a client altogether.
        return None
    client = WebClient(token=sett.slack_token)
    res = None
    for ch in channels:
        res = client.chat_postMessage(channel=ch, text=message)
    return res

def is_datetime(my_string):
    """Return True when ``my_string`` is, or parses as, a date and/or time.

    ``datetime.date``/``datetime.datetime`` objects are always accepted.
    Strings must be at least 8 characters long and match one of the date
    formats, one of the time formats, or a date format followed by a time
    format (with or without a separating space). Everything else is False.

    Note that all-digit strings such as '20231126' match '%Y%m%d' and are
    therefore reported as datetimes.
    """
    if isinstance(my_string, (datetime.datetime, datetime.date)):
        return True
    if not isinstance(my_string, str) or len(my_string) < 8:
        return False

    # List of common date time formats to check against
    date_formats = ['%Y-%m-%d', '%m/%d/%Y', '%m-%d-%Y', '%d-%m-%Y', '%d/%m/%Y', '%Y%m%d', '%m%d%Y', '%d%m%Y']
    time_formats = ['%H:%M:%S', '%I:%M:%S %p', '%H%M%S']

    # Build each candidate pattern exactly once; the earlier nested loops
    # re-tried the date-only and time-only patterns many times over.
    candidates = date_formats + time_formats
    for date_fmt in date_formats:
        for time_fmt in time_formats:
            candidates.append(f'{date_fmt} {time_fmt}')
            candidates.append(f'{date_fmt}{time_fmt}')

    for fmt in candidates:
        try:
            datetime.datetime.strptime(my_string, fmt)
        except ValueError:
            continue
        return True
    return False

def get_type(my_string, pandas=False):
    """Return an interpreted data type label for any Python value.

    Results, checked in this order:
        * ``None`` for ``None``;
        * 'datetime64[ns]' (``pandas`` true) or 'datetime' when
          ``is_datetime`` accepts the value;
        * for strings: 'int' or 'float' when the text converts, else 'str';
        * 'int', 'float', 'decimal', 'list', 'dict' or 'tuple' for instances
          of those types;
        * otherwise the value's ``type`` object itself.
    """
    if my_string is None:
        return None

    if is_datetime(my_string):
        return 'datetime64[ns]' if pandas else 'datetime'

    if isinstance(my_string, str):
        # Try the narrowest numeric interpretation first.
        for caster, label in ((int, 'int'), (float, 'float')):
            try:
                caster(my_string)
            except ValueError:
                continue
            return label
        return 'str'

    known_types = (
        (int, 'int'),
        (float, 'float'),
        (decimal.Decimal, 'decimal'),
        (list, 'list'),
        (dict, 'dict'),
        (tuple, 'tuple'),
    )
    for py_type, label in known_types:
        if isinstance(my_string, py_type):
            return label
    return type(my_string)

def list_to_df(my_list, first_row_is_header=True):
    """Convert a list of rows to a DataFrame, coercing column dtypes.

    The dtype of each column is chosen by running ``get_type`` on the first
    data row.

    Args:
        my_list: List of rows. With ``first_row_is_header`` the first row
            supplies the column names; otherwise columns are named
            'col0', 'col1', ...
        first_row_is_header: Whether ``my_list[0]`` holds column names.

    Returns:
        A DataFrame. Empty input gives an empty DataFrame, and header-only
        input gives an empty DataFrame with those columns (both previously
        raised IndexError).
    """
    if not my_list:
        return pd.DataFrame()
    if first_row_is_header:
        column_names = my_list[0]
        data = my_list[1:]
    else:
        column_names = [f'col{x}' for x in range(len(my_list[0]))]
        data = my_list
    df = pd.DataFrame(data, columns=column_names)
    if not data:
        return df

    column_types = [get_type(x, True) for x in data[0]]
    column_dict = dict(zip(column_names, column_types))
    # Labels that are not valid pandas dtypes; such columns stay as inferred.
    not_castable = ('list', 'dict', 'tuple')
    for col_name, data_type in column_dict.items():
        if data_type is None:
            # First value is None: no basis for a dtype, keep pandas' inference
            # (astype(None) would silently mean float64).
            continue
        if isinstance(data_type, str) and data_type in not_castable:
            continue
        if data_type == 'decimal':
            df[col_name] = df[col_name].astype('float')
        elif data_type == 'datetime64[ns]':
            # Equivalent of the deprecated errors='ignore': keep the column
            # unchanged when it cannot be parsed.
            try:
                df[col_name] = pd.to_datetime(df[col_name])
            except (ValueError, TypeError):
                pass
        else:
            df[col_name] = df[col_name].astype(data_type)
    return df

def get_non_blanks(data: dict):
    """Return the keys of ``data`` whose values are neither ``None`` nor ''.

    Every other value counts as valid data, including 0, ``False`` and empty
    containers. Key order follows the dict's iteration order.

    The previous version reached the same answer through up to three
    ``get_type`` calls per key; the classification only ever distinguished
    the empty string from everything else.
    """
    return [
        key
        for key, value in data.items()
        if value is not None and not (isinstance(value, str) and value == '')
    ]


def list_to_dict(rows, column_names=None):
    """Convert rows to a list of dicts keyed by column name.

    Args:
        rows: A list of row lists, or a single flat row (only meaningful
            together with ``column_names``). When ``column_names`` is not
            given, ``rows[0]`` supplies the names and the rest is data.
        column_names: Optional explicit column names.

    Returns:
        A list with one dict per row (always a list, even for a single row).
        Rows shorter than ``column_names`` are padded with ''. The input rows
        are left untouched; previously short rows were extended in place.
    """
    if not rows:
        return []
    if column_names:
        rows_to_process = rows
    else:
        column_names = rows[0]
        rows_to_process = rows[1:]
    if len(rows_to_process) == 0:
        return []
    if not isinstance(rows_to_process[0], list):
        # One-dimensional input: treat it as a single row.
        rows_to_process = [rows_to_process]
    return [
        {col: (row[i] if i < len(row) else '') for i, col in enumerate(column_names)}
        for row in rows_to_process
    ]

def fmt_num(the_obj, format_code=","):
    """Format a number (or numeric text) as a display string.

    Args:
        the_obj: An int/float, or anything ``float()`` accepts. Values that
            cannot be converted give '0'.
        format_code: One of
            'thous' / ','           thousands separator, 0 decimals (default)
            'thous1' / ',1' / '1'   thousands separator, 1 decimal
            'thous2' / ',2' / '2'   thousands separator, 2 decimals
            'pct' / '%'             percent, 0 decimals
            'pct1' / '%1'           percent, 1 decimal
            'pct2' / '%2'           percent, 2 decimals
            '.1' / '.'              1 decimal, no separator
            '.2', '.3'              2 / 3 decimals, no separator
            Unknown codes fall back to the default.

    Returns:
        The formatted string. The literal text '0' is always returned as '0',
        whatever the format code.
    """
    if type(the_obj) is not float and type(the_obj) is not int:
        try:
            the_num = float(the_obj)
        except Exception:
            # Best effort: anything non-numeric is shown as zero. Catching
            # Exception (not a bare except) lets Ctrl-C / SystemExit through.
            return '0'
    else:
        the_num = the_obj
    if the_obj in ['0', '']:
        return format(the_num, '.0f')

    format_specs = (
        (('thous', ','), ',.0f'),
        (('thous1', ',1', '1'), ',.1f'),
        (('thous2', ',2', '2'), ',.2f'),
        (('pct', '%'), '.0%'),
        (('pct2', '%2'), '.2%'),
        (('.1', '.'), '.1f'),
        (('.2',), '.2f'),
        (('.3',), '.3f'),
        (('pct1', '%1'), '.1%'),
    )
    for codes, spec in format_specs:
        if format_code in codes:
            return format(the_num, spec)
    return format(the_num, ',.0f')

def get_random_weekday(year, start=(1, 2), end=(12, 31)):
    """Return a random Monday-Friday ``datetime.date`` within one year.

    Args:
        year: Year as an int or anything ``int()`` accepts.
        start: ``(month, day)`` of the first candidate date (default Jan 2).
        end: ``(month, day)`` of the last candidate date, inclusive.

    Raises:
        ValueError: If ``end`` is before ``start``, or the range contains no
            weekday (previously that case looped forever).
    """
    if not isinstance(year, int):
        year = int(year)
    # The bounds do not change between draws, so build them once.
    start_date = datetime.date(year, start[0], start[1])
    end_date = datetime.date(year, end[0], end[1])
    span_days = (end_date - start_date).days
    if span_days < 0:
        raise ValueError('end date is before start date')
    # Any three consecutive days include a weekday, so only one- or two-day
    # ranges can consist of weekend days alone.
    if span_days < 2 and all(
        (start_date + datetime.timedelta(days=offset)).weekday() >= 5
        for offset in range(span_days + 1)
    ):
        raise ValueError('date range contains no weekday')

    while True:
        random_date = start_date + datetime.timedelta(days=random.randint(0, span_days))
        # Check if the date is a weekday (Monday to Friday)
        if random_date.weekday() < 5:
            return random_date

def get_random_day(year, start=(1, 2), end=(12, 31)):
    """Return a random ``datetime.date`` between two (month, day) bounds.

    ``year`` may be an int or anything ``int()`` accepts. Both bounds are
    inclusive; weekends are not filtered out.
    """
    yr = year if isinstance(year, int) else int(year)
    first_date = datetime.date(yr, start[0], start[1])
    last_date = datetime.date(yr, end[0], end[1])
    day_span = (last_date - first_date).days
    offset = random.randint(0, day_span)
    return first_date + datetime.timedelta(days=offset)

def format_header(ws, row_num=1, fill_right=15):
    """Make the cells of one worksheet row bold and horizontally centered.

    Applies to columns 1 up to, but not including, ``fill_right`` of row
    ``row_num`` on ``ws`` (an object exposing ``cell(row=..., column=...)``).
    """
    for column_index in range(1, fill_right):
        header_cell = ws.cell(row=row_num, column=column_index)
        header_cell.font = Font(bold=True)
        header_cell.alignment = Alignment(horizontal='center')

def convert_to_float(rows):
    """Convert all numbers in a list of rows to floats.

    Args:
        rows: Iterable of rows, each an iterable of cells.

    Returns:
        A new list of lists in which every cell ``float()`` accepts has been
        converted; all other cells are passed through unchanged.
    """
    def _as_float(cell):
        # Best effort per cell. Catching Exception (not a bare except) keeps
        # Ctrl-C / SystemExit from being swallowed.
        try:
            return float(cell)
        except Exception:
            return cell

    return [[_as_float(cell) for cell in row] for row in rows]

In [5]:
"""
Using psycopg3 to connect to Postgres
v1.0 forked from util on 11/26/2023
pip install "psycopg[binary]"
poetry add "psycopg[binary]"
DB Helper functions for reporting
"""
import re
import psycopg
from dataclasses import dataclass
import logging
import datetime as dtTime
import time
from typing import List

# Install the default root handler, then create this module's logger and
# have it emit records at INFO level and above.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

@dataclass
class Postgres:
    """Helper around a single, lazily opened psycopg 3 connection.

    The dataclass fields hold the connection parameters. Every cursor handed
    out by ``curs`` / ``cursor()`` has its search path set to
    ``default_schema`` followed by ``public``.

    Error convention (unchanged): database errors are logged, rolled back and
    printed rather than raised. Each method documents the sentinel it returns
    in that case.
    """
    PostgresServerHost: str = "ugali"
    PostgresServerDatabase: str = "money"
    PostgresServerPort: str = "5432"
    PostgresServerUser: str = ""
    PostgresServerPassword: str = ""
    default_schema: str = "pyinv"

    def __post_init__(self):
        # The connection is opened on first access of `connection`.
        self.__connection = None

    @property
    def connection(self):
        """The shared connection, opened on first use."""
        if self.__connection is None:
            self.__connection = psycopg.connect(
                host=self.PostgresServerHost,
                port=self.PostgresServerPort,
                user=self.PostgresServerUser,
                password=self.PostgresServerPassword,
                dbname=self.PostgresServerDatabase,
                sslmode="allow"
            )
        return self.__connection

    @property
    def curs(self):
        """A new cursor with the search path set (property form of ``cursor()``)."""
        return self.cursor()

    def cursor(self):
        """Return a new cursor with the search path set to the default schema."""
        _c = self.connection.cursor()
        _c.execute(f"SET SEARCH_PATH TO {self.default_schema},public")
        return _c

    def _report_failure(self, error, sql, log_sql=True):
        """Log a failed statement, roll the transaction back and print the SQL."""
        if log_sql:
            logger.error(f"Error: {error} {sql}")
        else:
            logger.error(f"Error: {error}")
        self.connection.rollback()
        print(f"[money]Postgres error {sql}")

    @staticmethod
    def _first_row(curs):
        """Return the first result row, or '' when the statement produced none."""
        try:
            return curs.fetchone()
        except Exception:
            # fetchone() raises when the last statement returned no result set.
            return ''

    def _limit_one(self, sql):
        """Append ' LIMIT 1' unless the statement already contains a LIMIT."""
        if ' LIMIT ' not in sql.upper():
            return f'{self.remove_trailing_semicolon(sql)} LIMIT 1'
        return sql

    def truncate_table(self, table_name):
        """Truncate ``table_name``; returns 1 on success or ``(-1, error)``."""
        sql = f"TRUNCATE TABLE {table_name}"
        with self.curs as curs:
            try:
                curs.execute(sql)
                self.connection.commit()
                return 1
            except Exception as error:
                self._report_failure(error, sql, log_sql=False)
                return -1, error

    @property
    def jdbc_url(self) -> str:
        """JDBC URL for the configured server, including user and password."""
        return (
            f"jdbc:postgresql://{self.PostgresServerHost}:{self.PostgresServerPort}"
            f"/{self.PostgresServerDatabase}"
            f"?user={self.PostgresServerUser}&password={self.PostgresServerPassword}"
        )

    def get_df(self, sql: str):
        """Pass in a SQL statement and I will return a DataFrame with Column Headers"""
        data = self.get_rows(sql, True)
        return list_to_df(data)

    def get_rows(self, sql: str, return_headers=True) -> List[List[str]]:
        """Run ``sql`` and return all rows as lists.

        With ``return_headers`` the first element is the list of column
        names. Returns -1 when the statement fails.
        """
        if '&amp;' in sql:
            logger.error(f"Possible problem with SQL text &amp; found {sql}")
            print(f"[money]Possible problem with SQL text &amp; found {sql}")
        with self.curs as curs:
            try:
                curs.execute(sql)
                self.connection.commit()
                if return_headers:
                    rows = [[desc[0] for desc in curs.description]]
                else:
                    rows = []
                rows.extend(list(row) for row in curs)
                return rows
            except Exception as error:
                self._report_failure(error, sql)
                return -1

    def get_col(self, sql: str, return_headers=True) -> List[str]:
        """Run ``sql`` and return the first column of every row.

        ``return_headers`` is accepted for signature compatibility and is not
        used. Returns an empty list when the query fails (the failure has
        already been logged by ``get_rows``).
        """
        rows = self.get_rows(sql, False)
        if not isinstance(rows, list):
            # get_rows signals failure with -1, which cannot be iterated.
            return []
        return [x[0] for x in rows]

    def strip_alpha(self, theString):
        """Strip non-digit characters and return the remainder as a float.

        Strings keep only their digits (so '1.5' becomes 15.0), dates are
        rendered as YYYYMMDDHHMMSS, ``None`` gives 0.0 and anything else goes
        through ``float()``.
        """
        if isinstance(theString, str):
            digits = ''.join(x for x in theString if x.isdigit())
            # The leading '0' keeps float() happy when no digit is left.
            return float('0' + digits)
        if isinstance(theString, dtTime.date):
            # Also covers datetime, which is a subclass of date.
            return float(theString.strftime('%Y%m%d%H%M%S'))
        if theString is None:
            return float(0)
        return float(theString)

    def remove_trailing_semicolon(self, sql):
        """Return ``sql`` without its terminating semicolon.

        Only a semicolon that ends the statement (ignoring surrounding
        whitespace) is removed; a statement that merely contains ';'
        elsewhere is returned unchanged instead of losing its last character.
        """
        stripped = sql.strip()
        if stripped.endswith(';'):
            return stripped[:-1]
        return sql

    def get_num(self, sql: str):
        """Give me SQL and I will return a single cell number.

        Returns 0 for no row or a NULL cell, and -1 when the statement fails.
        String cells are reduced to their digits via ``strip_alpha``.
        """
        with self.curs as curs:
            sql2 = self._limit_one(sql)
            try:
                curs.execute(sql2)
                self.connection.commit()
                result = curs.fetchone()
                if result is None:
                    return 0
                if len(list(result)) >= 1 and result[0] is None:
                    return 0
                if isinstance(result[0], str):
                    return float(self.strip_alpha(result[0]))
                return result[0]
            except Exception as error:
                self._report_failure(error, sql)
                return -1

    def get_str(self, sql: str) -> str:
        """Give me SQL and I will return a single cell string.

        Returns '' for no row and when the statement fails.
        """
        with self.curs as curs:
            sql2 = self._limit_one(sql)
            try:
                curs.execute(sql2)
                self.connection.commit()
                result = curs.fetchone()
                if result is None:
                    return ''
                if not isinstance(result[0], str):
                    return str(result[0])
                return result[0]
            except Exception as error:
                self._report_failure(error, sql)
                return ''

    def exec_sql(self, sql: str, commit=True):
        """Execute ``sql``.

        Returns ``(1, first_row_or_'')`` on success or ``(-1, error)``.
        """
        with self.curs as curs:
            try:
                curs.execute(sql)
                if commit:
                    self.connection.commit()
            except Exception as error:
                self._report_failure(error, sql)
                return -1, error
            return 1, self._first_row(curs)

    def update_rows(self, sql: str, data: List[any], commit=True):
        """Updating a table setting some columns = '' based on the Primary Key `id`
            Use `execute`
            Hardcode the static `SET` values
            Pass in a list of keys to `ANY(%s)` converted to a set without a second element
            `.curs.execute(sql, (list_of_ids, ))`

        Returns ``(1, first_row_or_'')`` on success or ``(-1, error)``.
        """
        with self.curs as curs:
            try:
                # Use the cursor opened above; `self.curs` would create a
                # second cursor that is never closed.
                curs.execute(sql, (data,))
                if commit:
                    self.connection.commit()
            except Exception as error:
                self._report_failure(error, sql, log_sql=False)
                return -1, error
            return 1, self._first_row(curs)

    def insert_rows(self, sql: str, data, commit=True):
        """Run ``sql`` once per row of ``data`` with ``executemany``.

        A warning is logged when the number of '%s' placeholders differs from
        the width of the first row. Returns ``(1, first_row_or_'')`` on
        success or ``(-1, error)``.
        """
        if data and sql.count('%s') != len(data[0]):
            logger.warning(f"SQL Insert {sql.count('%s')=} does not have proper number of columns as data being passed {len(data[0])=}")
        with self.curs as curs:
            try:
                curs.executemany(sql, data)
                if commit:
                    self.connection.commit()
            except Exception as error:
                self._report_failure(error, sql, log_sql=False)
                return -1, error
            return 1, self._first_row(curs)

    def insert_df(self, df, table):
        """Insert every row of ``df`` into ``table`` using ``executemany``.

        psycopg 3 has no ``execute_values`` (that helper lives in
        ``psycopg2.extras``), so the previous implementation always ended up
        in its error branch.

        Returns ``None`` on success and 1 when the insert fails.
        """
        # Plain tuples of Python scalars, one per DataFrame row
        tuples = list(df.itertuples(index=False, name=None))
        # Comma-separated dataframe columns
        cols = ','.join(list(df.columns))
        placeholders = ','.join(['%s'] * len(df.columns))
        # SQL query to execute
        sql = "INSERT INTO %s(%s) VALUES (%s)" % (table, cols, placeholders)
        cur = self.connection.cursor()
        try:
            cur.executemany(sql, tuples)
            self.connection.commit()
        except Exception as error:
            print("[money]Postgres error: %s" % error)
            print(table, cols)
            print(tuples[0:10])
            self.connection.rollback()
            return 1
        finally:
            cur.close()
        return None

    def get_fks(self, table: str):
        """Give me a table and I will return all the Foreign Key constraints for that table"""
        # TODO: replace the query below with this SQL:
        #   select
        #     (select r.relname from pg_class r where r.oid = c.conrelid) as table, conname,pg_get_constraintdef(oid),
        #     (select array_agg(attname) from pg_attribute
        #      where attrelid = c.conrelid and ARRAY[attnum] <@ c.conkey) as col,
        #     (select r.relname from pg_class r where r.oid = c.confrelid) as ftable
        #   from pg_constraint c
        #   where c.confrelid = (select oid from pg_class where relname = 'wms_distributioncenter');
        # NOTE(review): `table` is interpolated into the SQL text; only pass trusted names.
        sql = f"""SELECT c.conname as fk_name
              FROM pg_constraint c
                INNER JOIN pg_namespace AS sh ON sh.oid = c.connamespace
                INNER JOIN (SELECT oid, unnest(conkey) as conkey FROM pg_constraint) con ON c.oid = con.oid
                INNER JOIN pg_class tbl ON tbl.oid = c.conrelid
                INNER JOIN pg_attribute col ON (col.attrelid = tbl.oid AND col.attnum = con.conkey)
                INNER JOIN pg_class referenced_tbl ON c.confrelid = referenced_tbl.oid
                INNER JOIN pg_namespace AS referenced_sh ON referenced_sh.oid = referenced_tbl.relnamespace
                INNER JOIN (SELECT oid, unnest(confkey) as confkey FROM pg_constraint) conf ON c.oid = conf.oid
                INNER JOIN pg_attribute referenced_field ON (referenced_field.attrelid = c.confrelid AND referenced_field.attnum = conf.confkey)
            WHERE c.contype = 'f' and lower(tbl.relname)='{table.lower()}' ORDER BY 1"""
        return self.get_col(sql)

    def kill_pids(self, db_name):
        """Check for running processes other than admin/dba and kill them.
        Used just before ETL processes

        Client backends on ``db_name`` are first asked to cancel; after a
        30 second wait any that remain are terminated.
        """
        sql_find = f"""SELECT DISTINCT pid FROM pg_stat_activity 
          WHERE datname = '{db_name}' AND backend_type = 'client backend' AND usename NOT IN ('ods_service','rdsadmin') AND usename IS NOT NULL"""
        pids_to_kill = self.get_col(sql_find)
        if len(pids_to_kill) > 0:
            sql_user = """SELECT datname||'-'||usename||'-'||application_name user_info FROM pg_stat_activity"""
            logger.warning(f'Active Postgres users; killing PIDs')
            for pid in pids_to_kill:
                user_info = self.get_str(f'{sql_user} WHERE pid = {pid}')
                logger.warning(f'Killing PID {pid} user info: {user_info}')
                self.exec_sql(f"""SELECT pg_cancel_backend({pid});""")
            # Give the cancelled backends time to exit before checking again.
            time.sleep(30)
            pids_to_kill = self.get_col(sql_find)
            if len(pids_to_kill) > 0:
                logger.warning(f'Active Postgres users; FORCE killing PIDs')
                for pid in pids_to_kill:
                    user_info = self.get_str(f'{sql_user} WHERE pid = {pid}')
                    logger.warning(f'FORCE Killing PID {pid} user info: {user_info}')
                    self.exec_sql(f"""SELECT pg_terminate_backend({pid});""")

In [10]:
import os


class Setting():
    """Connection settings for this notebook.

    Each value can be overridden through an environment variable; the
    literals below are the fallbacks used when the variable is not set.
    """
    PostgresServerHost: str = os.environ.get("PYINV_PG_HOST", "ugali")
    PostgresServerPort: int = int(os.environ.get("PYINV_PG_PORT", "5432"))
    PostgresServerDatabase: str = os.environ.get("PYINV_PG_DATABASE", "pyinv")
    PostgresServerUser: str = os.environ.get("PYINV_PG_USER", "pyinv_admin")
    # NOTE(review): this credential is committed in source. Rotate it, supply
    # it via PYINV_PG_PASSWORD and then drop the fallback literal.
    PostgresServerPassword: str = os.environ.get("PYINV_PG_PASSWORD", "boAWxvyKtX2mCoQ2")
    # Read by post_slack(); empty unless SLACK_TOKEN is set in the environment.
    slack_token: str = os.environ.get("SLACK_TOKEN", "")
sett = Setting()
print(f'Server: {sett.PostgresServerHost}')

Server: ugali
