#### PART 1
#### Inspect the values for each financial metric

In [None]:
import psycopg2
import pandas as pd
from utils.utils import get_postgres_connection


def read_financial_metrics():
    """Simple function to read financial_metrics table and return as pandas DataFrame"""
    conn = get_postgres_connection()
    query = "SELECT * FROM raw.financial_metrics"
    df = pd.read_sql_query(query, conn)
    conn.close()
    return df


df = read_financial_metrics()
print(f"Read {len(df)} records from financial_metrics table")
print(df.head())


In [None]:
import pandas as pd
import numpy as np
from IPython.display import display_html

def _safe_numeric_series(s: pd.Series) -> pd.Series:
    """Keep only finite numeric values."""
    s = pd.to_numeric(s, errors="coerce")
    s = s.replace([np.inf, -np.inf], np.nan).dropna()
    return s

def _summary_with_custom_percentiles(s: pd.Series, percentiles=None) -> pd.DataFrame:
    if percentiles is None:
        # include 5th, 95th and deciles 10..90
        percentiles = [0.001, 0.01,0.05] + [i/100 for i in range(10, 100, 10)] + [0.95, 0.99, 0.999]
    desc = s.describe(percentiles=percentiles)
    out = desc.reset_index()
    out.columns = ["Statistic", "Value"]
    return out

def _bucket_counts_qcut(s: pd.Series, q=10) -> pd.DataFrame:
    """Counts per percentile bucket, safe against duplicates and constants."""
    n = len(s)
    if n == 0:
        return pd.DataFrame({"Percentile Bucket": [], "Count": []})
    if s.nunique(dropna=True) < 2:
        return pd.DataFrame({"Percentile Bucket": ["All values equal"], "Count": [n]})

    try:
        buckets = pd.qcut(s, q=q, labels=[f"{i*100//q}-{(i+1)*100//q}%" for i in range(q)], duplicates="drop")
        counts = buckets.value_counts().sort_index().reset_index()
        counts.columns = ["Percentile Bucket", "Count"]
        return counts
    except Exception:
        qs = np.linspace(0, 1, q + 1)
        edges = np.unique(s.quantile(qs).values)
        if len(edges) < 2:
            return pd.DataFrame({"Percentile Bucket": ["All values equal"], "Count": [n]})
        labels = [f"{int(qs[i]*100)}-{int(qs[i+1]*100)}%" for i in range(len(edges)-1)]
        buckets = pd.cut(s, bins=edges, include_lowest=True, labels=labels, duplicates="drop")
        counts = buckets.value_counts().sort_index().reset_index()
        counts.columns = ["Percentile Bucket", "Count"]
        return counts

def _display_side_by_side(dfs: list, titles: list):
    html = ""
    for df, title in zip(dfs, titles):
        html += (
            "<div style='display:inline-block; padding-right:30px; vertical-align:top;'>"
            f"<h3 style='margin:4px 0 8px 0;'>{title}</h3>"
            f"{df.to_html(index=False)}"
            "</div>"
        )
    display_html(html, raw=True)

def describe_all_numeric_with_buckets(df: pd.DataFrame, q=10, percentiles=None, max_cols=None):
    """
    For each numeric column:
      - show summary with custom percentiles (5th, 10..90, 95th by default)
      - show counts per percentile bucket
    """
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if max_cols is not None:
        num_cols = num_cols[:max_cols]

    if not num_cols:
        print("No numeric columns found.")
        return

    for col in num_cols:
        s = _safe_numeric_series(df[col])
        if s.empty:
            print(f"üì≠ Column: {col} ‚Äî skipped (no finite numeric values).")
            continue

        stats = _summary_with_custom_percentiles(s, percentiles=percentiles)
        buckets = _bucket_counts_qcut(s, q=q)

        print(f"üìä Column: {col} (n={len(s)})")
        _display_side_by_side(
            [stats, buckets],
            ["Summary Statistics", f"Counts per Percentile Bucket ({q} buckets)"]
        )
        print()  # spacing


# ---- Run it ----
# This will include 5th, 10..90, 95th percentiles + decile bucket counts
describe_all_numeric_with_buckets(df, q=10)


#### Part 2 - Query against Financial Metrics

In [106]:


HOST = "91.107.196.130"
DATABASE = "naro_index_db"
USERNAME = "naro_user"
PASSWORD = "naro_password"
PORT = 5432

def get_remote_postgres_connection():
    """Get a direct PostgreSQL connection to the remote database"""
    return psycopg2.connect(
        dbname=DATABASE,
        user=USERNAME,
        password=PASSWORD,
        host=HOST,
        port=PORT
    )

In [180]:
import psycopg2
import time
import pandas as pd
pd.set_option("display.max_rows", None)
from utils.utils import get_postgres_connection


def run_query(query):
    conn = get_remote_postgres_connection()
    cur = conn.cursor()

    start = time.time()
    df = pd.read_sql_query(query, conn)
    print(f"Query + fetch duration: {round(time.time() - start, 2)} seconds")

    conn.close()
    return df



def run_query_to_polars_simple(query):
    conn = get_postgres_connection()
    cur = conn.cursor()  # ‚ùå no server-side name

    cur.execute(query)

    if cur.description is None:
        print("‚ö†Ô∏è Query ran, but returned no result set.")
        cur.close()
        conn.close()
        return pl.DataFrame()

    columns = [desc[0] for desc in cur.description]
    rows = cur.fetchall()

    cur.close()
    conn.close()

    if rows:
        df = pl.DataFrame(rows, schema=columns)
        print(f"‚úÖ Loaded {df.shape[0]:,} rows into Polars (non-streaming)")
        return df
    else:
        print("‚ö†Ô∏è Query returned zero rows.")
        return pl.DataFrame()
    


max_constituents = 100
min_volume_eur = 100000
user_country_list = ['US']
user_sector_list = []
user_industry_list = []

user_sector_list = ['Technology']
#user_industry_list = ['Software - Application']


perc_list_all = [1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99, 100]
kpis = {
    'price_to_earnings_ratio_perc': [1, 10, 20, 30, 40, 50, 60, 70, 80, 90, 99, 100],
    'gross_profit_margin_perc': [],
    'net_profit_margin_perc': [],
}

kpi_filters = [
    f"AND {kpi} IN ({', '.join(map(str, values))})"
    for kpi, values in kpis.items() if values
]

kpi_sql = "\n".join(kpi_filters)
active_kpis = [kpi for kpi, values in kpis.items() if values]
kpi_cols        = ", ".join(active_kpis)
prep3_kpi_cols  = ", ".join(f"p3.{kpi}" for kpi in active_kpis)
prep6_kpi_cols  = ", ".join(f"prep6.{kpi}" for kpi in active_kpis)


#######

country_query = "SELECT DISTINCT country FROM raw.stock_info WHERE COUNTRY IS NOT NULL"
country_df = run_query(country_query)
country_list_all = list(country_df.country)
country_list_filtered = [country for country in country_list_all if country in user_country_list]
selected_countries = country_list_filtered if country_list_filtered else country_list_all
countries = "(" + ", ".join(f"'{c}'" for c in selected_countries) + ")"
#######

sector_query = "SELECT DISTINCT sector FROM raw.stock_info WHERE SECTOR IS NOT NULL"
sector_df = run_query(sector_query)
sector_list_all = list(sector_df.sector)
sector_list_filtered = [sector for sector in sector_list_all if sector in user_sector_list]
selected_sectors = sector_list_filtered if sector_list_filtered else sector_list_all
sectors = "(" + ", ".join(f"'{s}'" for s in selected_sectors) + ")"
#######

industry_query = "SELECT DISTINCT industry FROM raw.stock_info WHERE INDUSTRY IS NOT NULL"
industry_df = run_query(industry_query)
industry_list_all = list(industry_df.industry)
industry_list_filtered = [industry for industry in industry_list_all if industry in user_industry_list]
selected_industries = industry_list_filtered if industry_list_filtered else industry_list_all
industries = "(" + ", ".join(f"'{i}'" for i in selected_industries) + ")"
#######


query = f"""
WITH prep1 AS (
    SELECT symbol
    FROM raw.stock_info 
    WHERE country IN {countries}
      AND industry IN {industries}
      AND sector IN {sectors}
),

prep2 AS (
    SELECT *
    FROM clean.financial_metrics_perc
    WHERE 1=1
    {kpi_sql}
),

prep3 AS (
    SELECT 
        p2.symbol, p2.date, p2.fiscal_year, p2.period, p2.reported_currency,
        {kpi_cols}
    FROM prep2 p2
    INNER JOIN prep1 p1 ON p2.symbol = p1.symbol
),

prep4 AS (
    SELECT 
        hmc.*,
        {prep3_kpi_cols}
    FROM raw.historical_market_cap hmc
    INNER JOIN prep3 p3
      ON hmc.symbol = p3.symbol
     AND hmc.year = p3.fiscal_year
     AND hmc.quarter = p3.period
    WHERE hmc.last_quarter_date = TRUE
),

prep5 AS (
    SELECT 
        p4.*, 
        RANK() OVER (
            PARTITION BY p4.year, p4.quarter 
            ORDER BY p4.market_cap_eur DESC
        ) AS mcap_rank
    FROM prep4 p4
),

prep6 AS (
    SELECT *
    FROM prep5
    WHERE mcap_rank <= {max_constituents}
),

prep7 AS (
    SELECT *
    FROM raw.historical_price_volume
    WHERE volume_eur > {min_volume_eur}
),

prep8 AS (
    SELECT 
        hpv.*, 
        prep6.market_cap, 
        prep6.market_cap_eur,
        {prep6_kpi_cols},
        prep6.mcap_rank
    FROM prep7 hpv
    INNER JOIN prep6
      ON hpv.symbol = prep6.symbol
     AND hpv.year = prep6.year
     AND hpv.quarter = prep6.quarter
)

SELECT *
FROM prep8
"""




#df = run_query(query)
df = run_query_to_polars_simple(query)


# constituents_per_day = (
#     df.loc[df["last_quarter_date"] == True]
#       .groupby(["year", "quarter"])["symbol"]
#       .nunique()
#       .reset_index(name="unique_symbol_count")
#       .sort_values(["year", "quarter"], ascending=[False, False])  # both descending
# )


# print(constituents_per_day)


print(f"Read {len(df)} records from financial_metrics table")
#df[["date", "symbol", "currency", "close", "volume", "year", "quarter", "last_quarter_date", "close_eur", "market_cap_eur"]].head(10)
#df.head()
#print(constituents_per_day)


  df = pd.read_sql_query(query, conn)


Query + fetch duration: 0.07 seconds
Query + fetch duration: 0.14 seconds
Query + fetch duration: 0.14 seconds


  df = pl.DataFrame(rows, schema=columns)


‚úÖ Loaded 123,319 rows into Polars (non-streaming)
Read 123319 records from financial_metrics table


In [190]:
from datetime import date, datetime
df = df.select(["date", "symbol", "close_eur","close_usd", "market_cap_eur", "year", "quarter", "last_quarter_date"])

decimal_cols = [name for name, dtype in zip(df.columns, df.dtypes) if dtype.base_type() == pl.Decimal]

df = df.with_columns(
    [pl.col(c).cast(pl.Float64) for c in decimal_cols]
)

# Step 0: Parse the date
if df.schema["date"] == pl.Utf8:
    df = df.with_columns(pl.col("date").str.to_date())
elif df.schema["date"] != pl.Date:
    df = df.with_columns(pl.col("date").cast(pl.Date))

df = (
    df.sort(["symbol", "date"])
      .with_columns([
          pl.col("close_eur").forward_fill().over("symbol")
      ])
)

rebalance_df = df.filter(pl.col("last_quarter_date") == True)

rebalance_snapshots = (
    rebalance_df
    .group_by(["year", "quarter"])
    .agg([
        pl.first("date").alias("rebalance_date"),
        pl.col("symbol"),
        pl.col("market_cap_eur")
    ])
)


# Build weight dict per (year, quarter)
rebalance_weights = {}
for row in rebalance_snapshots.iter_rows(named=True):
    year, quarter = row["year"], row["quarter"]
    date = row["rebalance_date"]
    symbols, mcaps = row["symbol"], row["market_cap_eur"]
    total = float(sum(mcaps))
    weights = {s: float(m) / total for s, m in zip(symbols, mcaps)}
    rebalance_weights[(year, quarter)] = {"date": date, "weights": weights}

# Step 3: Build pivot table of prices (symbol per date)
pivot = (
    df.with_columns([
        pl.col("date").cast(pl.Utf8)
    ])
    .select(["date", "symbol", "close_eur"])
    .pivot(index="date", columns="symbol", values="close_eur", aggregate_function="first")
    .with_columns([
        pl.col("date").str.to_date()
    ])
    .sort("date")
)

pivot = pivot.sort("date")
pivot = pivot.with_columns(
    pl.all().exclude("date").fill_null(strategy="forward")
)

pl.Config.set_tbl_rows(None)
pl.Config.set_tbl_rows(-1)  
#df.filter(pl.col("symbol") == "1060.SR").head(300)
pivot.head(300)



# Step 4: Compute index value daily using Polars loop
date_series = pivot.select("date").to_series()
symbol_columns = [col for col in pivot.columns if col != "date"]

from typing import Union
from datetime import datetime, date

index_start_date: Union[date, str, None] = "2014-03-15"

# Parse to datetime.date if string
if isinstance(index_start_date, str):
    index_start_date = datetime.strptime(index_start_date, "%Y-%m-%d").date()

index_value = 1000.0
index_values = []
previous_prices = {}
active_weights = {}

for i in range(pivot.height):
    row = pivot.row(i)
    current_date = row[0]
    price_row = dict(zip(symbol_columns, row[1:]))

    # Get year and quarter
    y = current_date.year
    m = current_date.month
    q = {1: "Q1", 2: "Q1", 3: "Q1",
         4: "Q2", 5: "Q2", 6: "Q2",
         7: "Q3", 8: "Q3", 9: "Q3",
         10: "Q4", 11: "Q4", 12: "Q4"}[m]

    # Rebalance
    # if (y, q) in rebalance_weights and rebalance_weights[(y, q)]["date"] == current_date:
    #     active_weights = rebalance_weights[(y, q)]["weights"]
    #     previous_prices = {s: price_row[s] for s in active_weights if price_row[s] is not None}
    #     index_values.append((current_date, index_value))
    #     continue

    if (y, q) in rebalance_weights and rebalance_weights[(y, q)]["date"] == current_date:
        # Skip if we're still before index start
        if index_start_date and current_date < index_start_date:
            continue

        active_weights = rebalance_weights[(y, q)]["weights"]
        previous_prices = {s: price_row[s] for s in active_weights if price_row[s] is not None}

        # Rebase index to 1000 at first rebalance after start date
        if not index_values:
            index_value = 1000.0

        index_values.append((current_date, index_value))
        continue


    # Skip if not yet rebalanced
    if not previous_prices:
        index_values.append((current_date, index_value))
        continue

    # Calculate returns and update index
    returns = {}
    for symbol, prev_price in previous_prices.items():
        current_price = price_row.get(symbol, None)
        if current_price is not None and prev_price > 0:
            returns[symbol] = current_price / prev_price
        else:
            returns[symbol] = 1.0

    daily_return = sum(
        active_weights[s] * returns.get(s, 1.0) for s in active_weights
    )
    index_value *= daily_return

    # Update previous prices
    for s in previous_prices:
        new_price = price_row.get(s, None)
        if new_price is not None:
            previous_prices[s] = new_price

    index_values.append((current_date, index_value))

# Step 5: Output as Polars DataFrame
index_df = pl.DataFrame(index_values, schema=["date", "index_value"]).sort("date", descending=True)
index_df.sort("date")

  .pivot(index="date", columns="symbol", values="close_eur", aggregate_function="first")
  index_df = pl.DataFrame(index_values, schema=["date", "index_value"]).sort("date", descending=True)


date,index_value
date,f64
2013-12-02,1000.0
2013-12-03,1000.0
2013-12-04,1000.0
2013-12-05,1000.0
2013-12-06,1000.0
2013-12-09,1000.0
2013-12-10,1000.0
2013-12-11,1000.0
2013-12-12,1000.0
2013-12-13,1000.0


In [173]:
pivot.height

3271

In [157]:
import polars as pl

df = df.select(["date", "symbol", "close_eur","close_usd", "market_cap_eur", "year", "quarter", "last_quarter_date"])

# Step 0: Parse the date
if df.schema["date"] == pl.Utf8:
    df = df.with_columns(pl.col("date").str.to_date())
elif df.schema["date"] != pl.Date:
    df = df.with_columns(pl.col("date").cast(pl.Date))

# Step 1: Sort + Forward fill close price per symbol
df = (
    df.sort(["symbol", "date"])
      .with_columns([
          pl.col("close").forward_fill().over("symbol")
      ])
)

# Step 2: Get rebalance snapshot: one per (year, quarter)
rebalance_df = df.filter(pl.col("last_quarter_date") == True)

rebalance_snapshots = (
    rebalance_df
    .group_by(["year", "quarter"])
    .agg([
        pl.first("date").alias("rebalance_date"),
        pl.col("symbol"),
        pl.col("market_cap_eur")
    ])
)

# Build weight dict per (year, quarter)
rebalance_weights = {}
for row in rebalance_snapshots.iter_rows(named=True):
    year, quarter = row["year"], row["quarter"]
    date = row["rebalance_date"]
    symbols, mcaps = row["symbol"], row["market_cap_eur"]
    total = float(sum(mcaps))
    weights = {s: float(m) / total for s, m in zip(symbols, mcaps)}
    rebalance_weights[(year, quarter)] = {"date": date, "weights": weights}

# Step 3: Build pivot table of prices (symbol per date)
pivot = (
    df.with_columns([
        pl.col("date").cast(pl.Utf8)
    ])
    .select(["date", "symbol", "close"])
    .pivot(index="date", columns="symbol", values="close", aggregate_function="first")
    .with_columns([
        pl.col("date").str.to_date()
    ])
    .sort("date")
)

# Step 4: Compute index value daily using Polars loop
date_series = pivot.select("date").to_series()
symbol_columns = [col for col in pivot.columns if col != "date"]

index_value = 1000.0
index_values = []
previous_prices = {}
active_weights = {}

for i in range(pivot.height):
    row = pivot.row(i)
    current_date = row[0]
    price_row = dict(zip(symbol_columns, row[1:]))

    # Get year and quarter
    y = current_date.year
    m = current_date.month
    q = {1: "Q1", 2: "Q1", 3: "Q1",
         4: "Q2", 5: "Q2", 6: "Q2",
         7: "Q3", 8: "Q3", 9: "Q3",
         10: "Q4", 11: "Q4", 12: "Q4"}[m]

    # Rebalance
    if (y, q) in rebalance_weights and rebalance_weights[(y, q)]["date"] == current_date:
        active_weights = rebalance_weights[(y, q)]["weights"]
        previous_prices = {s: price_row[s] for s in active_weights if price_row[s] is not None}
        index_values.append((current_date, index_value))
        continue

    # Skip if not yet rebalanced
    if not previous_prices:
        index_values.append((current_date, index_value))
        continue

    # Calculate returns and update index
    returns = {}
    for symbol, prev_price in previous_prices.items():
        current_price = price_row.get(symbol, None)
        if current_price is not None and prev_price > 0:
            returns[symbol] = current_price / prev_price
        else:
            returns[symbol] = 1.0

    daily_return = sum(
        active_weights[s] * returns.get(s, 1.0) for s in active_weights
    )
    index_value *= daily_return

    # Update previous prices
    for s in previous_prices:
        new_price = price_row.get(s, None)
        if new_price is not None:
            previous_prices[s] = new_price

    index_values.append((current_date, index_value))

# Step 5: Output as Polars DataFrame
index_df = pl.DataFrame(index_values, schema=["date", "index_value"]).sort("date", descending=True)
print(index_df)


ColumnNotFoundError: unable to find column "close"; valid columns: ["date", "symbol", "close_eur", "close_usd", "market_cap_eur", "year", "quarter", "last_quarter_date"]

In [None]:
query2 = """
WITH prep1 AS (
    SELECT symbol
    FROM raw.stock_info 
    WHERE country IN ('US')
      AND industry IN (
        'Agricultural Inputs', 'Industrial Materials', 'Industrial - Pollution & Treatment Controls',
        'Manufacturing - Textiles', 'Auto - Recreational Vehicles', 'Discount Stores',
        'Insurance - Diversified', 'Entertainment', 'REIT - Diversified', 'Medical - Distribution',
        'Software - Infrastructure', 'Software - Application', 'Electrical Equipment & Parts',
        'Independent Power Producers', 'Asset Management - Global', 'Oil & Gas Drilling',
        'Leisure', 'Medical - Devices', 'Coal', 'Education & Training Services',
        'REIT - Industrial', 'Auto - Parts', 'Manufacturing - Metal Fabrication',
        'Financial - Credit Services', 'Broadcasting', 'Medical - Care Facilities',
        'Chemicals - Specialty', 'Asset Management', 'Insurance - Reinsurance',
        'Asset Management - Cryptocurrency', 'Travel Lodging', 'Financial - Diversified',
        'Beverages - Alcoholic', 'Banks', 'Hardware, Equipment & Parts',
        'Industrial - Distribution', 'Food Distribution', 'Construction Materials',
        'Medical - Equipment & Services', 'Beverages - Wineries & Distilleries', 'Silver',
        'Furnishings, Fixtures & Appliances', 'Specialty Business Services', 'Publishing',
        'Apparel - Manufacturers', 'Financial - Mortgages', 'Railroads',
        'Insurance - Property & Casualty', 'REIT - Retail', 'Telecommunications Services',
        'Internet Content & Information', 'Industrial - Infrastructure Operations',
        'Real Estate - Services', 'Packaging & Containers', 'Regulated Water',
        'Real Estate - Diversified', 'Agricultural - Commodities/Milling', 'Renewable Utilities',
        'Auto - Dealerships', 'Other Precious Metals', 'Food Confectioners',
        'Manufacturing - Tools & Accessories', 'Financial - Data & Stock Exchanges',
        'Advertising Agencies', 'Luxury Goods', 'Household & Personal Products',
        'REIT - Residential', 'Department Stores', 'Personal Products & Services',
        'Paper, Lumber & Forest Products', 'Staffing & Employment Services',
        'Residential Construction', 'Construction', 'Specialty Retail', 'Media & Entertainment',
        'Asset Management - Income', 'Real Estate - Development', 'Engineering & Construction',
        'REIT - Office', 'Gambling, Resorts & Casinos', 'Drug Manufacturers - General',
        'Home Improvement', 'Real Estate - General', 'Banks - Regional', 'Steel', 'Conglomerates',
        'Agricultural Farm Products', 'Medical - Healthcare Plans', 'Chemicals',
        'Oil & Gas Midstream', 'Oil & Gas Exploration & Production', 'Computer Hardware',
        'Banks - Diversified', 'Restaurants', 'Oil & Gas Integrated', 'Diversified Utilities',
        'REIT - Hotel & Motel', 'Electronic Gaming & Multimedia', 'Oil & Gas Energy',
        'Regulated Gas', 'Grocery Stores', 'Medical - Pharmaceuticals', 'Waste Management',
        'Trucking', 'Industrial - Capital Goods', 'Investment - Banking & Investment Services',
        'REIT - Mortgage', 'Auto - Manufacturers', 'Biotechnology', 'Technology Distributors',
        'Asset Management - Leveraged', 'Business Equipment & Supplies',
        'Apparel - Footwear & Accessories', 'Medical - Specialties', 'Apparel - Retail',
        'Regulated Electric', 'Industrial - Specialties', 'Financial - Conglomerates',
        'General Transportation', 'Oil & Gas Refining & Marketing', 'REIT - Specialty',
        'Agricultural - Machinery', 'Packaged Foods', 'REIT - Healthcare Facilities', 'Solar',
        'Copper', 'Gold', 'Medical - Instruments & Supplies', 'Insurance - Brokers',
        'General Utilities', 'Security & Protection Services', 'Tobacco',
        'Financial - Capital Markets', 'Manufacturing - Miscellaneous', 'Uranium',
        'Insurance - Life', 'Industrial - Machinery', 'Shell Companies', 'Consumer Electronics',
        'Medical - Diagnostics & Research', 'Communication Equipment',
        'Oil & Gas Equipment & Services', 'Aerospace & Defense', 'Travel Services',
        'Drug Manufacturers - Specialty & Generic', 'Aluminum',
        'Information Technology Services', 'Medical - Healthcare Information Services',
        'Consulting Services', 'Beverages - Non-Alcoholic', 'Environmental Services',
        'Marine Shipping', 'Rental & Leasing Services', 'Software - Services',
        'Asset Management - Bonds', 'Integrated Freight & Logistics', 'Insurance - Specialty',
        'Semiconductors', 'Airlines, Airports & Air Services'
      )
      AND sector IN (
        'Real Estate', 'Healthcare', 'Basic Materials', 'Energy', 'Industrials',
        'Consumer Cyclical', 'Utilities', 'Technology', 'Consumer Defensive',
        'Financial Services', 'Communication Services'
      )
),
prep2 AS (
    SELECT *
    FROM clean.financial_metrics_perc
    WHERE price_to_earnings_ratio_perc IN (60, 70, 80, 90, 100)
),
prep3 AS (
    SELECT 
        p2.symbol, p2.date, p2.fiscal_year, p2.period, p2.reported_currency,
        price_to_earnings_ratio_perc
    FROM prep2 p2
    INNER JOIN prep1 p1 ON p2.symbol = p1.symbol
),
prep4 AS (
    SELECT 
        hmc.*,
        p3.price_to_earnings_ratio_perc
    FROM raw.historical_market_cap hmc
    INNER JOIN prep3 p3
      ON hmc.symbol = p3.symbol
     AND hmc.year = p3.fiscal_year
     AND hmc.quarter = p3.period
    WHERE hmc.last_quarter_date = TRUE
),
prep5 AS (
    SELECT 
        p4.*, 
        RANK() OVER (
            PARTITION BY p4.year, p4.quarter 
            ORDER BY p4.market_cap_eur DESC
        ) AS mcap_rank
    FROM prep4 p4
),
prep6 AS (
    SELECT *
    FROM prep5
    WHERE mcap_rank <= 100
),
prep7 AS (
    SELECT *
    FROM raw.historical_price_volume
    WHERE volume_eur > 100000
),
prep8 AS (
    SELECT 
        hpv.*, 
        prep6.market_cap, 
        prep6.market_cap_eur,
        prep6.price_to_earnings_ratio_perc,
        prep6.mcap_rank
    FROM prep7 hpv
    INNER JOIN prep6
      ON hpv.symbol = prep6.symbol
     AND hpv.year = prep6.year
     AND hpv.quarter = prep6.quarter
)
SELECT *
FROM prep8;"""

#df = run_query(query2)