In [8]:
import warnings
import pandas as pd
import psycopg2, os
import mysql.connector
from decimal import Decimal
from dotenv import load_dotenv
from datetime import datetime, timedelta
warnings.filterwarnings("ignore")
load_dotenv()


True

In [9]:
# === CONFIG ===
# Connection settings for both databases are taken from environment variables.
# Ports default to 3306 (MySQL) / 5439 (Redshift) when the variable is unset;
# every other value is None when its variable is missing.
MYSQL_CONFIG = dict(
    host=os.getenv("MYSQL_HOST"),
    user=os.getenv("MYSQL_USER"),
    port=int(os.getenv("MYSQL_PORT", 3306)),
    password=os.getenv("MYSQL_PASSWORD"),
    database=os.getenv("MYSQL_DB"),
)

REDSHIFT_CONFIG = dict(
    host=os.getenv("REDSHIFT_HOST"),
    user=os.getenv("REDSHIFT_USER"),
    port=int(os.getenv("REDSHIFT_PORT", 5439)),
    password=os.getenv("REDSHIFT_PASSWORD"),
    database=os.getenv("REDSHIFT_DB"),
)


# Aggregation window: the whole previous calendar day, i.e. the half-open
# interval [yesterday 00:00, today 00:00) in the kernel's local time.
# Midnight of the current day is shifted back one day so that YESTERDAY really
# is yesterday; without the shift the window would cover today, which is still
# receiving data when the ETL runs.
YESTERDAY = (
    datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    - timedelta(days=1)
)
YESTERDAY_PLUS_ONE = YESTERDAY + timedelta(days=1)

# String forms of the window bounds, used as SQL query parameters.
YESTERDAY_DATE = YESTERDAY.strftime("%Y-%m-%d %H:%M:%S")
YESTERDAY_PLUS_ONE_DATE = YESTERDAY_PLUS_ONE.strftime("%Y-%m-%d %H:%M:%S")

print(f"START WINDOW: {YESTERDAY}")
print(f"END WINDOW: {YESTERDAY_PLUS_ONE}")


START WINDOW: 2025-04-14 00:00:00
END WINDOW: 2025-04-15 00:00:00


In [10]:

# === DB HELPERS ===
def connect_mysql():
    """Open a new MySQL connection using the settings in MYSQL_CONFIG."""
    connection = mysql.connector.connect(**MYSQL_CONFIG)
    return connection

def connect_redshift():
    """Open a new Redshift connection (via psycopg2) using REDSHIFT_CONFIG."""
    connection = psycopg2.connect(**REDSHIFT_CONFIG)
    return connection

def execute_and_insert(source_cursor, dest_cursor, source_query, insert_query, table_name):
    """Replace one day's aggregate rows in ``table_name`` with fresh source results.

    Runs ``source_query`` on ``source_cursor`` with the parameters
    ``(YESTERDAY_DATE, YESTERDAY_PLUS_ONE_DATE)``. If it returns rows, the rows of
    ``table_name`` whose ``aggregation_date::date`` equals ``YESTERDAY_DATE`` are
    deleted and every fetched row is inserted with ``YESTERDAY`` appended as its
    last value. If it returns no rows, the destination table is left untouched.

    No commit is issued here; the caller owns the destination transaction.

    Args:
        source_cursor: Cursor on which ``source_query`` is executed.
        dest_cursor: Cursor on which the DELETE and the INSERTs are executed.
        source_query: SELECT statement taking two ``%s`` parameters
            (window start, window end).
        insert_query: INSERT statement taking one ``%s`` per selected column
            plus a final one for the aggregation date.
        table_name: Destination table name. It is interpolated directly into
            the DELETE statement, so it must be a trusted identifier.
    """
    source_cursor.execute(source_query, (YESTERDAY_DATE, YESTERDAY_PLUS_ONE_DATE))
    rows = source_cursor.fetchall()

    if not rows:
        # Nothing fetched: keep whatever is already stored for that day.
        print(f"No source rows for {table_name} for {YESTERDAY_DATE}; nothing replaced\n")
        return

    dest_cursor.execute(
        f"DELETE FROM {table_name} WHERE aggregation_date::date = %s",
        (YESTERDAY_DATE,)
    )
    # Read rowcount right after the DELETE: the INSERTs below overwrite it.
    deleted_count = dest_cursor.rowcount
    print(f"Deleted {deleted_count} rows from {table_name} for {YESTERDAY_DATE}")

    for row in rows:
        dest_cursor.execute(insert_query, (*row, YESTERDAY))
    # Reported only after the inserts have actually been executed.
    print(f"Added {len(rows)} rows to {table_name}\n")

In [11]:
# mysql_conn = connect_mysql()
# # redshift_conn = connect_redshift()
# mysql_cur = mysql_conn.cursor()
# # redshift_cur = redshift_conn.cursor()
# mysql_cur.execute(
#         """
#         SELECT status, COUNT(*) AS notification_count
#         FROM notifications
#         WHERE created_at >= %s AND created_at < %s
#         GROUP BY status;
#         """, (YESTERDAY_DATE, YESTERDAY_PLUS_ONE_DATE))
# rows = mysql_cur.fetchall()
# rows

In [12]:
def _load_daily_insights(mysql_cur, redshift_cur):
    """Run each aggregate query on MySQL and write its rows to the matching Redshift table."""
    # 1. Best Performing Sales Teams (filter by sales.timestamp)
    execute_and_insert(
        mysql_cur, redshift_cur,
        """
        SELECT t.name AS team_name, SUM(s.sale_amount) AS total_sales
        FROM sales s
        JOIN teams t ON s.team_id = t.team_id
        WHERE s.timestamp >= %s AND s.timestamp < %s
        GROUP BY t.name
        ORDER BY total_sales DESC;
        """,
        """
        INSERT INTO redshift_team_performance(team_name, total_sales, aggregation_date)
        VALUES (%s, %s, %s);
        """,
        "redshift_team_performance"
    )

    # 2. Product Sales Target (filter by sales.timestamp)
    execute_and_insert(
        mysql_cur, redshift_cur,
        """
        SELECT p.name AS product_name,
               SUM(s.sale_amount) AS total_sales,
               CASE WHEN SUM(s.sale_amount) >= 3000000 THEN 'Achieved' ELSE 'Not Achieved' END AS status
        FROM sales s
        JOIN products p ON s.product_id = p.product_id
        WHERE s.timestamp >= %s AND s.timestamp < %s
        GROUP BY p.name
        ORDER BY total_sales DESC;
        """,
        """
        INSERT INTO redshift_product_performance(product_name, total_sales, status, aggregation_date)
        VALUES (%s, %s, %s, %s);
        """,
        "redshift_product_performance"
    )

    # 3. Branch Sales Performance (filter by sales.timestamp)
    execute_and_insert(
        mysql_cur, redshift_cur,
        """
        SELECT b.name AS branch_name, SUM(s.sale_amount) AS total_sales
        FROM sales s
        JOIN branches b ON s.branch_id = b.branch_id
        WHERE s.timestamp >= %s AND s.timestamp < %s
        GROUP BY b.name
        ORDER BY total_sales DESC;
        """,
        """
        INSERT INTO redshift_branch_performance(branch_name, total_sales, aggregation_date)
        VALUES (%s, %s, %s);
        """,
        "redshift_branch_performance"
    )

    # 4. Most Notified Agents (filter by notifications.created_at)
    execute_and_insert(
        mysql_cur, redshift_cur,
        """
        SELECT a.name AS agent_name, COUNT(n.notification_id) AS notification_count
        FROM notifications n
        JOIN agents a ON n.recipient_id = a.agent_id
        WHERE n.created_at >= %s AND n.created_at < %s
        GROUP BY a.name
        ORDER BY notification_count DESC;
        """,
        """
        INSERT INTO redshift_agent_notifications(agent_name, notification_count, aggregation_date)
        VALUES (%s, %s, %s);
        """,
        "redshift_agent_notifications"
    )

    # 5. Notification Summary by Status (filter by notifications.created_at)
    execute_and_insert(
        mysql_cur, redshift_cur,
        """
        SELECT status, COUNT(*) AS notification_count
        FROM notifications
        WHERE created_at >= %s AND created_at < %s
        GROUP BY status;
        """,
        """
        INSERT INTO redshift_notification_summary(status, notification_count, aggregation_date)
        VALUES (%s, %s, %s);
        """,
        "redshift_notification_summary"
    )


def run_etl():
    """Copy the five daily aggregates from MySQL into Redshift in one transaction.

    Opens one connection per database, runs all five loads, and commits the
    Redshift connection once at the end. If any step raises, the Redshift
    transaction is rolled back and the exception propagates. Both connections
    are closed whether the run succeeds or fails.
    """
    mysql_conn = connect_mysql()
    try:
        redshift_conn = connect_redshift()
        try:
            _load_daily_insights(mysql_conn.cursor(), redshift_conn.cursor())
            # Single commit: either all five tables are refreshed or none are.
            redshift_conn.commit()
        except Exception:
            # Drop the partial DELETE/INSERT work before re-raising.
            redshift_conn.rollback()
            raise
        finally:
            redshift_conn.close()
    finally:
        mysql_conn.close()
    print(f"✅ ETL Complete — Insights written for: {YESTERDAY_DATE}")

In [13]:
# Run the ETL once: connects to MySQL and Redshift and loads the five insight tables.
run_etl()

Deleted 1 rows from redshift_team_performance for 2025-04-14 00:00:00
Added 1 rows to redshift_team_performance

Deleted 1 rows from redshift_product_performance for 2025-04-14 00:00:00
Added 1 rows to redshift_product_performance

Deleted 1 rows from redshift_branch_performance for 2025-04-14 00:00:00
Added 1 rows to redshift_branch_performance

Deleted 1 rows from redshift_agent_notifications for 2025-04-14 00:00:00
Added 1 rows to redshift_agent_notifications

Deleted 1 rows from redshift_notification_summary for 2025-04-14 00:00:00
Added 1 rows to redshift_notification_summary

✅ ETL Complete — Insights written for: 2025-04-14 00:00:00


### DB Inference

In [9]:
import warnings
import psycopg2, os
import pandas as pd
import mysql.connector
from dotenv import load_dotenv
warnings.filterwarnings("ignore")
load_dotenv()


True

In [10]:
# Connection settings read from environment variables; ports default to
# 3306 (MySQL) and 5439 (Redshift) when the corresponding variable is unset.
MYSQL_CONFIG = dict(
    host=os.getenv("MYSQL_HOST"),
    user=os.getenv("MYSQL_USER"),
    port=int(os.getenv("MYSQL_PORT", 3306)),
    password=os.getenv("MYSQL_PASSWORD"),
    database=os.getenv("MYSQL_DB"),
)

REDSHIFT_CONFIG = dict(
    host=os.getenv("REDSHIFT_HOST"),
    user=os.getenv("REDSHIFT_USER"),
    port=int(os.getenv("REDSHIFT_PORT", 5439)),
    password=os.getenv("REDSHIFT_PASSWORD"),
    database=os.getenv("REDSHIFT_DB"),
)


In [None]:
def connect_mysql():
    """Return a fresh MySQL connection built from MYSQL_CONFIG."""
    mysql_connection = mysql.connector.connect(**MYSQL_CONFIG)
    return mysql_connection

def connect_redshift():
    """Return a fresh psycopg2 connection to Redshift built from REDSHIFT_CONFIG."""
    redshift_connection = psycopg2.connect(**REDSHIFT_CONFIG)
    return redshift_connection

def print_redshift_tables():
    """Print the full contents of each Redshift insight table to stdout.

    Opens one Redshift connection, reads every listed table into a DataFrame
    with ``SELECT *`` and prints it under a ``===== <table> =====`` header.
    The connection is closed even if one of the reads raises; the exception
    then propagates to the caller.
    """
    # Tables to query
    tables = [
            "redshift_team_performance",
            "redshift_product_performance",
            "redshift_branch_performance",
            "redshift_agent_notifications",
            "redshift_notification_summary"
            ]

    redshift_conn = connect_redshift()
    try:
        # Query and print each table; pandas reads straight from the
        # connection, so no separate cursor is needed.
        for table in tables:
            print(f"\n===== {table} =====")
            query = f"SELECT * FROM {table};"
            df = pd.read_sql_query(query, redshift_conn)
            print(df)
    finally:
        # Always release the connection, including when a read fails.
        redshift_conn.close()

# Print all tables: dumps every table listed inside print_redshift_tables() to stdout.
print_redshift_tables()



===== redshift_team_performance =====
        team_name  total_sales aggregation_date
0   Team 1 - B108    2768792.0       2025-04-10
1   Team 2 - B106    2265994.0       2025-04-10
2   Team 2 - B102    2262655.0       2025-04-10
3   Team 2 - B108    2250383.0       2025-04-10
4   Team 1 - B109    2111327.0       2025-04-10
5   Team 2 - B105    2055286.0       2025-04-10
6   Team 2 - B100    1781832.0       2025-04-10
7   Team 1 - B104    1672908.0       2025-04-10
8   Team 1 - B100    1581662.0       2025-04-10
9   Team 2 - B109    1165994.0       2025-04-10
10  Team 1 - B106    1160029.0       2025-04-10
11  Team 1 - B101    1123501.0       2025-04-10
12  Team 2 - B101    1119600.0       2025-04-10
13  Team 1 - B102    1071483.0       2025-04-10
14  Team 1 - B105     964250.0       2025-04-10
15  Team 2 - B104     561412.0       2025-04-10
16  Team 2 - B103     521365.0       2025-04-10
17  Team 1 - B103     505542.0       2025-04-10
18  Team 2 - B107     497047.0       2025-04-10
1

: 