In [2]:
import pandas as pd
import polars as pl
import numpy as np
import json
from google.cloud import storage
from google.oauth2 import service_account
from google.cloud import bigquery
import os
# Set up credentials for GCS / BigQuery.
# The service-account key path is defined exactly once so that the environment
# variable and the explicit BigQuery credentials can never point at different files.
# The path is relative, so it is resolved against the kernel's working directory.
# Alternative path used under Airflow:
#   r'/opt/airflow/code/src/sql-server-replicate-0ec74ad95b13.json'
SERVICE_ACCOUNT_KEY_PATH = r'../sql-server-replicate-0ec74ad95b13.json'

# Exported for libraries that look up credentials through this variable.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = SERVICE_ACCOUNT_KEY_PATH

# Explicit credentials object, reused for the BigQuery client below.
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_KEY_PATH
)
bqclient = bigquery.Client(credentials=credentials, project=credentials.project_id)

In [3]:
# Project / bucket configuration used by the cells below.
project_name = "sql-server-replicate"
bucket_name = "coding-pyspark-workspace-global"
# NOTE(review): `bronze_path` points inside "SilverLayer" — confirm this is the
# intended location for the bronze EndOfDay data.
bronze_path = "tu_mai/SilverLayer/EndOfDay"
silver_path = "tu_mai/SilverLayer"

# gs:// URI of the EndOfDay location (bucket + bronze path, trailing slash included).
gcs_path = "gs://" + "/".join((bucket_name, bronze_path)) + "/"

def get_bronze_data(gcs_path):
    """Load the EndOfDay Delta table into a Polars DataFrame.

    Args:
        gcs_path: Location of the Delta table; handed unchanged to
            ``pl.read_delta``.
    Return:
        Polars DataFrame holding the EndOfDay data.
    """
    # The Delta table is read straight into Polars; no intermediate step.
    return pl.read_delta(gcs_path)

# Load the EndOfDay table once; later cells read the notebook-level name `df`.
df = get_bronze_data(gcs_path)
# Bare last expression so the notebook renders the frame.
df

open,high,low,close,volume,adj_high,adj_low,adj_close,adj_open,adj_volume,split_factor,dividend,name,exchange_code,asset_type,price_currency,symbol,exchange,date
f64,f64,f64,f64,i64,f64,f64,f64,f64,i64,i64,f64,str,str,str,str,str,str,date
30.13,30.25,30.04,30.04,39480,30.26,30.04,30.04,30.13,39480,1,0.0,"""SIMPLIFY HEALTH CARE ETF ""","""NYSE ARCA""","""ETF""","""USD""","""PINK""","""ARCX""",2025-02-24
244.808,248.86,244.59,247.1,51143732,248.86,244.42,247.1,244.925,51326396,1,0.0,"""Apple Inc""","""NASDAQ""","""Stock""","""USD""","""AAPL""","""XNAS""",2025-02-24
245.95,248.69,245.22,245.55,53119400,248.69,245.22,245.55,245.95,53197431,1,0.0,"""Apple Inc""","""NASDAQ""","""Stock""","""USD""","""AAPL""","""XNAS""",2025-02-21
30.385,30.44,30.13,30.13,14400,30.44,30.13,30.13,30.385,14374,1,0.0,"""SIMPLIFY HEALTH CARE ETF ""","""NYSE ARCA""","""ETF""","""USD""","""PINK""","""ARCX""",2025-02-21
30.3,30.57,30.3,30.49,33900,30.57,30.3,30.49,30.3,33949,1,0.0,"""SIMPLIFY HEALTH CARE ETF ""","""NYSE ARCA""","""ETF""","""USD""","""PINK""","""ARCX""",2025-02-20
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
31.02,31.03,30.4384,30.51,23954,31.07,30.43,30.51,31.02,23954,1,0.0,,,,,"""PINK""","""ARCX""",2024-12-12
247.945,250.8,246.2601,246.49,42854558,250.8,246.2601,246.49,247.96,45205814,1,0.0,,,,,"""AAPL""","""XNAS""",2024-12-11
31.33,31.3468,30.9887,31.12,65979,31.3468,30.97,31.12,31.33,65979,1,0.0,,,,,"""PINK""","""ARCX""",2024-12-11
246.89,248.21,245.34,247.77,36873800,248.21,245.34,247.77,246.89,36914806,1,0.0,,,,,"""AAPL""","""XNAS""",2024-12-10


In [4]:
# List the column names of the loaded frame.
df.columns

['open',
 'high',
 'low',
 'close',
 'volume',
 'adj_high',
 'adj_low',
 'adj_close',
 'adj_open',
 'adj_volume',
 'split_factor',
 'dividend',
 'name',
 'exchange_code',
 'asset_type',
 'price_currency',
 'symbol',
 'exchange',
 'date']

In [9]:
def validate_data(df):
    """Normalise column names, flatten struct columns and parse ``salary``.

    Args:
        df: Polars DataFrame. It must contain the columns ``name`` and
            ``website`` and the struct columns ``key_executives`` and
            ``stock_exchanges``. After unnesting, a string column ``salary``
            must exist, holding values such as ``"12.68M"`` or ``""``
            (presumably a field of ``key_executives`` — TODO confirm).
    Return:
        Polars DataFrame with '.' replaced by '_' in column names, ``name`` /
        ``website`` renamed to ``company_name`` / ``company_website``, the two
        struct columns expanded into their fields, and ``salary`` converted to
        a whole number (Int64). A blank salary becomes 0.
    """

    # Replace '.' with '_' in all column names
    df = df.rename({col: col.replace(".", "_") for col in df.columns})

    # Rename the top-level columns before unnesting: the unnested structs appear
    # to contribute their own `name` and `website` fields, which would otherwise
    # collide with these columns.
    df = df.rename({"name": "company_name", "website": "company_website"})

    # Unnest struct columns
    df = df.unnest(["key_executives", "stock_exchanges"])

    # Parse salary strings like "12.68M" into whole numbers.
    # Blank strings are mapped to "0" up front, so the strict float cast needs no
    # exception-driven retry and unrelated errors are no longer swallowed.
    salary_in_millions = (
        pl.col("salary")
        .replace("", "0")
        .str.replace("M", "")
        .cast(pl.Float64)
    )

    # Round before the integer cast: a plain cast truncates, so a product such as
    # 12679999.999999998 would otherwise end up one below the intended value.
    df = df.with_columns(
        (salary_in_millions * 1_000_000).round().cast(pl.Int64).alias("salary")
    )
    return df

# NOTE(review): the column listing of the `df` loaded from the EndOfDay table
# shows no `website`, `key_executives`, `stock_exchanges` or `salary` columns,
# so the output displayed below presumably came from a different frame bound to
# `df` in an earlier kernel state — confirm which frame this call should receive.
tmp = validate_data(df)
tmp

company_name,ticker,item_type,sector,industry,exchange_code,full_time_employees,birth_year,exercised,function,name,salary,incorporation,incorporation_description,end_fiscal,phone,company_website,about,acronym1,alpha2_code,city,country,exchange_mic,exchange_name,website,address_city,address_street1,address_street2,address_postal_code,address_stateOrCountry,address_state_or_country_description,post_address_city,post_address_street1,post_address_street2,post_address_postal_code,post_address_stateOrCountry,post_address_state_or_country_description
str,str,str,str,str,str,str,str,str,str,str,i64,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str,str
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""""","""""","""Vice President of Investor Rel…","""Mr. Brett Iversen""",0,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""IEX""","""US""","""New York""","""USA""","""IEXG""","""Investors Exchange""","""www.iextrading.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""""","""""","""Vice President of Investor Rel…","""Mr. Brett Iversen""",0,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""NASDAQ""","""US""","""NEW YORK""","""""","""XNAS""","""NASDAQ - ALL MARKETS""","""www.nasdaq.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""""","""""","""Vice President of Investor Rel…","""Mr. Brett Iversen""",0,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""NASDAQ""","""US""","""New York""","""USA""","""XNAS""","""NASDAQ Stock Exchange""","""www.nasdaq.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""1966""","""""","""Corporate VP & Chief Accountin…","""Ms. Alice L. Jolla""",0,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""IEX""","""US""","""New York""","""USA""","""IEXG""","""Investors Exchange""","""www.iextrading.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""1966""","""""","""Corporate VP & Chief Accountin…","""Ms. Alice L. Jolla""",0,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""NASDAQ""","""US""","""NEW YORK""","""""","""XNAS""","""NASDAQ - ALL MARKETS""","""www.nasdaq.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""1959""","""""","""Pres & Vice Chairman""","""Mr. Bradford L. Smith LCA""",4660000,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""NASDAQ""","""US""","""NEW YORK""","""""","""XNAS""","""NASDAQ - ALL MARKETS""","""www.nasdaq.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""1959""","""""","""Pres & Vice Chairman""","""Mr. Bradford L. Smith LCA""",4660000,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""NASDAQ""","""US""","""New York""","""USA""","""XNAS""","""NASDAQ Stock Exchange""","""www.nasdaq.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""1967""","""""","""Chairman & CEO""","""Mr. Satya Nadella""",12680000,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""IEX""","""US""","""New York""","""USA""","""IEXG""","""Investors Exchange""","""www.iextrading.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""
"""MICROSOFT CORP""","""MSFT""","""equity""","""Technology""","""Software—Infrastructure""","""NMS""","""221000""","""1967""","""""","""Chairman & CEO""","""Mr. Satya Nadella""",12680000,"""WA""","""WA""","""0630""","""425-882-8080""","""https://www.microsoft.com""","""Microsoft Corporation develops…","""NASDAQ""","""US""","""NEW YORK""","""""","""XNAS""","""NASDAQ - ALL MARKETS""","""www.nasdaq.com""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA""","""REDMOND""","""ONE MICROSOFT WAY""","""""","""98052-6399""","""WA""","""WA"""


In [79]:
import polars as pl

# Self-contained check of the salary parsing on three hand-written values.
# NOTE: this cell rebinds the notebook-level name `df`.
# Steps: "12.68M" -> drop "M" -> Float64 -> * 1_000_000 -> Int64
df = pl.DataFrame({"salary": ["12.68M", "8.55M", "15.00M"]}).with_columns(
    (
        pl.col("salary").str.replace("M", "").cast(pl.Float64) * 1_000_000
    )
    .cast(pl.Int64)
    .alias("salary")
)

print(df)


shape: (3, 1)
┌──────────┐
│ salary   │
│ ---      │
│ i64      │
╞══════════╡
│ 12680000 │
│ 8550000  │
│ 15000000 │
└──────────┘
