In [1]:
import duckdb
import dotenv

In [2]:
env = dotenv.dotenv_values(".env")

In [None]:
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False

In [3]:
con = duckdb.connect()

In [4]:
con.install_extension('httpfs')
con.load_extension('httpfs')

In [5]:
con.sql(f"""
CREATE OR REPLACE SECRET (
    TYPE r2,
    KEY_ID '{env['R2_ACCESS_KEY_ID']}',
    SECRET '{env['R2_SECRET_ACCESS_KEY']}',
    ACCOUNT_ID '{env['R2_ACCOUNT_ID']}'
);
""")

┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ true    │
└─────────┘

### Read and clean raw

In [6]:
from config.settings import R2

In [7]:
r2 = R2()

In [8]:
path = r2.get_full_path('raw',"*")

In [9]:
all_raw = con.read_parquet(path)

In [10]:
metadata = con.read_parquet("./data/processed/scheme_metadata/amfi_scheme_metadata.parquet")

AttributeError: '_duckdb.DuckDBPyConnection' object has no attribute 'to_parquet'

In [29]:
nav_data = all_raw.join(metadata.filter('is_growth_plan = TRUE').select('''scheme_code,
        amc_name,
        scheme_name,
        scheme_type,
        scheme_category,
        scheme_nav_name,
        scheme_category_level1,
        scheme_category_level2,
        is_direct,
        is_growth_plan'''), "scheme_code", how="inner").filter("nav IS NOT NULL").distinct()

In [28]:
con.sql("""select * from all_raw where scheme_code = 100033""").df()

Unnamed: 0,scheme_code,isin_growth,isin_dividend,nav,date
0,100033,INF209K01165,,902.96,2025-08-25
1,100033,INF209K01165,,891.03,2025-08-26
2,100033,INF209K01165,,883.65,2025-08-28
3,100033,INF209K01165,,878.99,2025-08-29
4,100033,INF209K01165,,890.70,2025-09-01
...,...,...,...,...,...
5316,100033,INF209K01165,,741.07,2023-12-20
5317,100033,INF209K01165,,747.29,2023-12-21
5318,100033,INF209K01165,,750.89,2023-12-22
5319,100033,INF209K01165,,753.34,2023-12-26


In [30]:
con.sql("""
        SELECT 
            scheme_code,
            date,
            nav,
            COUNT(*) as count
        FROM nav_data
        GROUP BY scheme_code, date,nav
        HAVING COUNT(*) > 1
        ORDER BY scheme_code, date
""")

┌─────────────┬──────────────┬────────┬───────┐
│ scheme_code │     date     │  nav   │ count │
│   varchar   │ timestamp_ns │ double │ int64 │
├─────────────┴──────────────┴────────┴───────┤
│                   0 rows                    │
└─────────────────────────────────────────────┘

In [None]:
# First, drop specified columns from metadata and prepare for join
metadata_clean = con.sql("""
    SELECT 
        scheme_code,
        amc_name,
        scheme_name,
        scheme_type,
        scheme_category,
        scheme_nav_name,
        scheme_category_level1,
        scheme_category_level2,
        is_direct,
        is_growth_plan
    FROM metadata
""")

# Join, remove duplicates, and forward fill NAV
result = con.sql("""
    WITH joined AS (
        SELECT 
            a.*,
            m.amc_name,
            m.scheme_name,
            m.scheme_type,
            m.scheme_category,
            m.scheme_nav_name,
            m.scheme_category_level1,
            m.scheme_category_level2,
            m.is_direct,
            m.is_growth_plan
        FROM all_raw a
        INNER JOIN metadata_clean m ON a.scheme_code = m.scheme_code
    ),
    -- Check for duplicates
    duplicates AS (
        SELECT scheme_code, date, COUNT(*) as cnt
        FROM joined
        GROUP BY scheme_code, date
        HAVING COUNT(*) > 1
    ),
    -- Forward fill NAV using window functions
    final AS (
        SELECT DISTINCT 
            j.*,
            LAST_VALUE(nav) IGNORE NULLS 
            OVER (PARTITION BY j.scheme_code ORDER BY date 
                  ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as nav_filled
        FROM joined j
    )
    SELECT * FROM final
    ORDER BY scheme_code, date
""")

# Show duplicate counts if any exist
duplicates = con.sql("""
    SELECT scheme_code, date, COUNT(*) as cnt
    FROM (
        SELECT DISTINCT scheme_code, date, nav 
        FROM all_raw a
        INNER JOIN metadata_clean m ON a.scheme_code = m.scheme_code
    )
    GROUP BY scheme_code, date
    HAVING COUNT(*) > 1
""")

print("Checking for duplicates:")
print(duplicates)
print("\nFinal result:")
print(result)

In [None]:
all_raw

In [None]:
test = con.read_parquet("r2://financial-data-store/clean/commodities_bhav_clean.parquet")

In [None]:
test.limit(5)

In [None]:
from pathlib import Path
import pandas as pd

In [None]:
def clean_cols(df):
    df.columns = df.columns.str.strip().str.lower().str.replace(' ','_').str.replace('.','_').str.replace('(','').str.replace(')','')
    return df
    
    

In [None]:
col_select = ['Scheme Code', 'ISIN Div Payout/ISIN Growth', 'ISIN Div Reinvestment', 'Net Asset Value', 'Date']

In [None]:
raw_csvs = pd.concat([pd.read_csv(f)[col_select] for f in Path('/home/modiis/projects/mf_data_pipeline/data/raw/nav_historical').glob('*.csv')], ignore_index=True)

In [None]:
raw_csvs.head()

In [None]:
raw_csvs.shape

In [None]:
raw_csvs.head()

In [None]:
clean_df = (raw_csvs
         .rename(columns={
            'Scheme Code': 'scheme_code',
            'ISIN Div Payout/ISIN Growth': 'isin_growth', 
            'ISIN Div Reinvestment': 'isin_dividend',
            'Net Asset Value': 'nav',
            'Date': 'date'
        })
         .query('scheme_code.notnull() & nav.notnull() & date.notnull()')
         .assign(scheme_code=lambda x: x['scheme_code'].astype(str),
                 date = lambda x: pd.to_datetime(x['date'], format='%d-%b-%Y', errors='coerce'),
                 nav = lambda x: pd.to_numeric(x['nav'], errors='coerce'),         
        )
        )

In [None]:
clean_df.head()

In [None]:
from config.settings import R2

In [None]:
r2 = R2()

In [None]:
path = r2.get_full_path('raw','nav_historical')

In [None]:
path

In [None]:
con.write_parquet(clean_df, path, overwrite=True)

In [None]:
FILE_PATH

In [None]:
con.read_parquet(r2)

In [32]:
# Register your DataFrame with DuckDB
con.register('scheme_metadata', metadata)

<_duckdb.DuckDBPyConnection at 0x77993c138bf0>

In [34]:
con.execute(f"""
    COPY scheme_metadata TO 'r2://financial-data-store/mutual_funds/clean/scheme_metadata.parquet' 
    (FORMAT PARQUET)
""")

<_duckdb.DuckDBPyConnection at 0x77993c138bf0>

In [35]:
con.read_parquet(pat)

NameError: name 'pat' is not defined