In [2]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import boto3
from dotenv import load_dotenv
from sqlalchemy import create_engine

# Set up basic configuration
plt.style.use('seaborn-v0_8-whitegrid')
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

# Load environment variables
load_dotenv()

# Configure AWS credentials
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region = os.getenv('AWS_REGION')
bucket_name = os.getenv('S3_BUCKET_NAME')

# Initialize S3 client
try:
    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
        region_name=region
    )

    # Download the file from S3
    local_file_path = "Financials_from_s3.csv"
    s3_file_path = "raw/financials/Financials.csv"

    print(f"Downloading {s3_file_path} from bucket {bucket_name}...")
    s3_client.download_file(
        bucket_name,
        s3_file_path,
        local_file_path
    )
    print(f"Successfully downloaded to {local_file_path}")
    
    # Load the data
    df = pd.read_csv(local_file_path)
    
except Exception as e:
    print(f"Error with S3: {e}")
    print("Attempting to load local file instead...")
    try:
        # Try to load directly from the local directory
        df = pd.read_csv("Financials.csv")
        print("Successfully loaded local file.")
    except Exception as e2:
        print(f"Error loading local file: {e2}")
        raise Exception("Cannot load data. Please check file paths and permissions.")

# Display basic information
print("DataFrame Shape:", df.shape)
print("\nColumn Names with trailing/leading spaces:")
for col in df.columns:
    print(f"- '{col}'")

# Fix column names (removing spaces)
df.columns = df.columns.str.strip()
print("\nCleaned Column Names:")
for col in df.columns:
    print(f"- '{col}'")

# Check for missing values
print("\nMissing Values Count:")
print(df.isnull().sum())

# Display data types before conversion
print("\nData Types Before Conversion:")
print(df.dtypes)

# Sample data before conversion
print("\nSample Data (first 5 rows) Before Conversion:")
print(df.head())

# Convert numeric columns to proper data types
numeric_columns = ['Units Sold', 'Manufacturing Price', 'Sale Price', 
                   'Gross Sales', 'Discounts', 'Sales', 'COGS', 'Profit', 
                   'Month Number', 'Year']

for col in numeric_columns:
    if col in df.columns:
        # Check if column contains strings with commas or currency symbols
        if df[col].dtype == 'object':
            print(f"Converting {col} from object to numeric...")
            # Remove commas, dollar signs, etc. and convert to numeric
            df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', '').str.replace('$', '').str.strip(), errors='coerce')
            # Report on conversion success
            na_count = df[col].isna().sum()
            if na_count > 0:
                print(f"  Warning: {na_count} values could not be converted to numeric in column '{col}'")
        else:
            # Ensure the column is numeric even if already a number-like dtype
            df[col] = pd.to_numeric(df[col], errors='coerce')

# Display data types after conversion
print("\nData Types After Conversion:")
print(df.dtypes)

# Convert date column to datetime
if 'Date' in df.columns:
    try:
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
        print("\nDate column converted to datetime")
    except Exception as e:
        print(f"Error converting Date column: {e}")

# Basic statistics
print("\nNumerical Statistics:")
numeric_cols = df.select_dtypes(include=['number']).columns
print(df[numeric_cols].describe())

# Data quality check - any duplicates?
print("\nDuplicate Rows:", df.duplicated().sum())

# Analyzing categorical columns
print("\nUnique Values in Categorical Columns:")
cat_cols = ['Segment', 'Country', 'Product', 'Discount Band']
for col in cat_cols:
    if col in df.columns:
        print(f"\n{col}: {df[col].nunique()} unique values")
        print(df[col].value_counts().head(5))

# Analyzing date information
if 'Date' in df.columns and pd.api.types.is_datetime64_dtype(df['Date']):
    print("\nDate Range Analysis:")
    print(f"Earliest date: {df['Date'].min()}")
    print(f"Latest date: {df['Date'].max()}")
    
    # Check for month and year columns
    if 'Month Number' in df.columns:
        print(f"Number of months: {df['Month Number'].nunique()}")
    if 'Year' in df.columns:
        print(f"Number of years: {df['Year'].nunique()}")
        
    # Distribution by year
    if 'Year' in df.columns:
        year_count = df.groupby('Year').size()
        print("\nRecords by Year:")
        print(year_count)

# Create visualizations
try:
    # 1. Monthly sales trend
    if all(col in df.columns for col in ['Year', 'Month Number', 'Sales']):
        plt.figure(figsize=(14, 6))
        monthly_sales = df.groupby(['Year', 'Month Number'])['Sales'].sum().reset_index()
        monthly_sales['Year-Month'] = monthly_sales['Year'].astype(str) + '-' + monthly_sales['Month Number'].astype(str).str.zfill(2)
        plt.plot(monthly_sales['Year-Month'], monthly_sales['Sales'], marker='o', linestyle='-')
        plt.title('Monthly Sales Trend', fontsize=14)
        plt.xticks(rotation=90)
        plt.xlabel('Year-Month')
        plt.ylabel('Total Sales')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig('monthly_sales_trend.png')
        print("\nSaved Monthly Sales Trend chart to monthly_sales_trend.png")
        plt.close()

    # 2. Sales by segment
    if all(col in df.columns for col in ['Segment', 'Sales']):
        plt.figure(figsize=(10, 6))
        segment_sales = df.groupby('Segment')['Sales'].sum().sort_values(ascending=False)
        print("\nSegment Sales Data:")
        print(segment_sales)
        ax = segment_sales.plot(kind='bar', color='skyblue')
        plt.title('Sales by Segment', fontsize=14)
        plt.xlabel('Segment')
        plt.ylabel('Total Sales')
        
        # Add data labels on bars
        for i, v in enumerate(segment_sales):
            ax.text(i, v * 1.01, f'{v:,.0f}', ha='center', fontsize=9)
            
        plt.tight_layout()
        plt.savefig('sales_by_segment.png')
        print("Saved Segment Sales chart to sales_by_segment.png")
        plt.close()

    # 3. Sales by country
    if all(col in df.columns for col in ['Country', 'Sales']):
        plt.figure(figsize=(14, 7))
        country_sales = df.groupby('Country')['Sales'].sum().sort_values(ascending=False).head(10)
        ax = country_sales.plot(kind='bar', color='lightgreen')
        plt.title('Sales by Country (Top 10)', fontsize=14)
        plt.xlabel('Country')
        plt.ylabel('Total Sales')
        
        # Add data labels on bars
        for i, v in enumerate(country_sales):
            ax.text(i, v * 1.01, f'{v:,.0f}', ha='center', fontsize=9)
            
        plt.tight_layout()
        plt.savefig('sales_by_country.png')
        print("Saved Country Sales chart to sales_by_country.png")
        plt.close()

    # 4. Product profitability
    if all(col in df.columns for col in ['Product', 'Profit']):
        plt.figure(figsize=(14, 7))
        product_profit = df.groupby('Product')['Profit'].sum().sort_values(ascending=False).head(10)
        ax = product_profit.plot(kind='bar', color='salmon')
        plt.title('Most Profitable Products (Top 10)', fontsize=14)
        plt.xlabel('Product')
        plt.ylabel('Total Profit')
        
        # Add data labels on bars
        for i, v in enumerate(product_profit):
            ax.text(i, v * 1.01, f'{v:,.0f}', ha='center', fontsize=9)
            
        plt.tight_layout()
        plt.savefig('product_profitability.png')
        print("Saved Product Profitability chart to product_profitability.png")
        plt.close()
        
    # 5. Discount Analysis
    if all(col in df.columns for col in ['Discount Band', 'Discounts', 'Gross Sales']):
        plt.figure(figsize=(12, 7))
        discount_data = df.groupby('Discount Band').agg({
            'Discounts': 'sum',
            'Gross Sales': 'sum'
        }).reset_index()
        discount_data['Discount Percentage'] = (discount_data['Discounts'] / discount_data['Gross Sales'] * 100).round(2)
        discount_data = discount_data.sort_values('Discount Percentage')
        
        ax = discount_data.plot(x='Discount Band', y='Discount Percentage', kind='bar', color='purple')
        plt.title('Average Discount Percentage by Discount Band', fontsize=14)
        plt.xlabel('Discount Band')
        plt.ylabel('Discount Percentage (%)')
        
        # Add data labels on bars
        for i, v in enumerate(discount_data['Discount Percentage']):
            ax.text(i, v * 1.01, f'{v:.2f}%', ha='center', fontsize=9)
            
        plt.tight_layout()
        plt.savefig('discount_analysis.png')
        print("Saved Discount Analysis chart to discount_analysis.png")
        plt.close()

except Exception as e:
    print(f"Error creating visualizations: {e}")
    print("Continuing with analysis...")

# Save cleaned data to CSV for further processing
cleaned_file_path = 'financials_cleaned.csv'
df.to_csv(cleaned_file_path, index=False)
print(f"\nCleaned data saved to {cleaned_file_path}")

# Connect to PostgreSQL and create raw table
# Update these with your actual PostgreSQL credentials
db_user = 'postgres'  # Change this to your actual username
db_password = 'your_password'  # Change this to your actual password
db_host = 'localhost'
db_name = 'financial_dwh'

try:
    engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}/{db_name}')
    
    # Create raw table in PostgreSQL
    df.to_sql('raw_financials', engine, schema='raw', if_exists='replace', index=False)
    print("\nData loaded to PostgreSQL raw.raw_financials table")
except Exception as e:
    print(f"Error connecting to PostgreSQL: {e}")
    print("Skipping database load, but analysis will continue.")

# Identify potential dimension and fact table columns
print("\nPotential Dimension Tables:")
print("1. dim_date (Date, Month Number, Month Name, Year)")
print("2. dim_product (Product, Manufacturing Price)")
print("3. dim_segment (Segment)")
print("4. dim_geography (Country)")
print("5. dim_discount (Discount Band)")

print("\nPotential Fact Table:")
print("fact_financial_transactions (Units Sold, Sale Price, Gross Sales, Discounts, Sales, COGS, Profit)")

# Preliminary analysis for SCD considerations
print("\nAnalysis for SCD Considerations:")

# Check if product prices vary for the same product
if 'Product' in df.columns and 'Manufacturing Price' in df.columns:
    product_price_variation = df.groupby('Product')['Manufacturing Price'].nunique()
    products_with_multiple_prices = (product_price_variation > 1).sum()
    print(f"\nProducts with multiple manufacturing prices: {products_with_multiple_prices}")
    
    if products_with_multiple_prices > 0:
        print("Some products have multiple manufacturing prices - consider SCD Type 2 for dim_product")
        # Show examples of products with varying prices
        products_to_check = product_price_variation[product_price_variation > 1].head(5).index.tolist()
        print("\nExamples of products with varying prices:")
        for product in products_to_check:
            prices = df[df['Product'] == product]['Manufacturing Price'].unique()
            print(f"Product: {product}, Prices: {prices}")

# Check if discount bands have consistent discount percentages
if 'Discount Band' in df.columns and 'Discounts' in df.columns and 'Gross Sales' in df.columns:
    discount_variation = df.groupby('Discount Band').agg({
        'Discounts': 'sum',
        'Gross Sales': 'sum'
    })
    discount_variation['Discount_Pct'] = (discount_variation['Discounts'] / discount_variation['Gross Sales'] * 100).round(2)
    
    print("\nDiscount percentages by discount band:")
    print(discount_variation[['Discount_Pct']])
    
    # Check if discount percentages align with band names
    print("\nAnalyzing if discount bands align with actual discount percentages:")
    for band, row in discount_variation.iterrows():
        print(f"Band: {band}, Actual Discount: {row['Discount_Pct']:.2f}%")

Downloading raw/financials/Financials.csv from bucket financial-data-modelling...
Successfully downloaded to Financials_from_s3.csv
DataFrame Shape: (700, 16)

Column Names with trailing/leading spaces:
- 'Segment'
- 'Country'
- ' Product '
- ' Discount Band '
- ' Units Sold '
- ' Manufacturing Price '
- ' Sale Price '
- ' Gross Sales '
- ' Discounts '
- '  Sales '
- ' COGS '
- ' Profit '
- 'Date'
- 'Month Number'
- ' Month Name '
- 'Year'

Cleaned Column Names:
- 'Segment'
- 'Country'
- 'Product'
- 'Discount Band'
- 'Units Sold'
- 'Manufacturing Price'
- 'Sale Price'
- 'Gross Sales'
- 'Discounts'
- 'Sales'
- 'COGS'
- 'Profit'
- 'Date'
- 'Month Number'
- 'Month Name'
- 'Year'

Missing Values Count:
Segment                0
Country                0
Product                0
Discount Band          0
Units Sold             0
Manufacturing Price    0
Sale Price             0
Gross Sales            0
Discounts              0
Sales                  0
COGS                   0
Profit           

<Figure size 1200x700 with 0 Axes>