<a href="https://colab.research.google.com/github/telnarayanan/ConstitutionofIndiaGPT/blob/main/mf_duckdb_dlt_api.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
!pip install "dlt[duckdb]" pandas requests tqdm openpyxl




In [4]:
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

import dlt
import pandas as pd
import requests
from requests.adapters import HTTPAdapter
from requests.sessions import Session
from tqdm import tqdm
from urllib3 import PoolManager

class CustomAdapter(HTTPAdapter):
    """An HTTP adapter whose connection-pool size is set with a ``maxsize`` keyword.

    ``maxsize`` (default 10) is handed to ``HTTPAdapter``'s own ``pool_maxsize``
    argument rather than being patched into ``init_poolmanager`` only. The adapter
    therefore uses the same size everywhere it consults its pool size, including
    proxy pool managers and re-initialisation after unpickling. The earlier
    override read ``self._maxsize`` there, and that attribute is not restored on
    unpickle.

    If the caller already supplies ``pool_maxsize`` (positionally or by keyword),
    that value is used and ``maxsize`` is ignored.
    """

    def __init__(self, *args, maxsize=10, **kwargs):
        # HTTPAdapter's signature is (pool_connections, pool_maxsize, max_retries,
        # pool_block). Only fill pool_maxsize when the caller has not passed it.
        if len(args) < 2:
            kwargs.setdefault("pool_maxsize", maxsize)
        super().__init__(*args, **kwargs)
        # Kept for backward compatibility with code that read the old attribute.
        self._maxsize = self._pool_maxsize

# Shared HTTP session used by fetch_data from every worker thread. Both URL schemes
# are routed through one CustomAdapter with a connection pool of 50, so the
# concurrent fetch workers are less likely to exhaust the pool.
adapter = CustomAdapter(maxsize=50)  # keep this >= the number of concurrent fetch workers
session = Session()
session.mount('https://', adapter)
session.mount('http://', adapter)

# Spreadsheet listing the AMFI scheme codes to fetch (path relative to the working directory).
excel_path = 'AMFI-CODES.xlsx'

# Read the scheme codes, then drop blank cells and repeated codes. Each scheme is then
# requested only once per fetch pass, and its rows are not loaded twice.
df_scheme_codes = pd.read_excel(excel_path, usecols=['Amfi Code']).dropna(subset=['Amfi Code'])
if pd.api.types.is_float_dtype(df_scheme_codes['Amfi Code']):
    # Blank cells make pandas read the whole column as float (119551 -> 119551.0), and the
    # value is interpolated verbatim into the API URL. Assumes every code is a whole number.
    df_scheme_codes = df_scheme_codes.astype({'Amfi Code': 'int64'})
df_scheme_codes = df_scheme_codes.drop_duplicates(subset=['Amfi Code']).reset_index(drop=True)

# dlt pipeline targeting DuckDB. The pipeline.run calls later in this cell load into
# this pipeline's "mf_dataset" dataset. The destination must be usable in the current
# environment (the duckdb extra of dlt).
pipeline = dlt.pipeline(
    dataset_name="mf_dataset",
    destination="duckdb",
    pipeline_name="mutual_fund_details",
)

def fetch_data(scheme_code, timeout=30):
    """Fetch the mfapi.in JSON payload for one scheme code using the global ``session``.

    Args:
        scheme_code: Scheme code; interpolated verbatim into the request URL.
        timeout: Seconds passed to ``session.get``. Without a timeout, a stalled
            connection would block its worker thread indefinitely.

    Returns:
        The decoded JSON body when the response status is 200, otherwise ``None``.
        ``None`` is also returned when the request raises a ``requests`` exception
        or the body is not valid JSON. A warning is logged in those cases, so a
        single failing scheme is visible but does not abort the whole load.
    """
    url = f"https://api.mfapi.in/mf/{scheme_code}"
    try:
        response = session.get(url, timeout=timeout)
        if response.status_code != 200:
            return None
        return response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding failures from response.json().
        logging.getLogger(__name__).warning("Fetching %s failed: %s", url, exc)
        return None

def _iter_scheme_payloads(desc, max_workers=20):
    """Fetch every scheme code concurrently and yield ``(requested_code, payload)`` pairs.

    Codes come from ``df_scheme_codes['Amfi Code']``. Each one is fetched with
    ``fetch_data`` on a thread pool, and pairs are yielded in completion order
    (not spreadsheet order) behind a tqdm progress bar labelled ``desc``.

    Payloads that are not a non-empty dict (e.g. ``None`` for a failed lookup) are
    skipped. A ``requests`` exception escaping ``fetch_data`` is logged and skipped,
    so one bad scheme does not abort the whole load.

    Args:
        desc: Progress-bar label.
        max_workers: Number of concurrent fetch threads.
    """
    scheme_codes = df_scheme_codes['Amfi Code']
    logger = logging.getLogger(__name__)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_data, code): code for code in scheme_codes}
        for future in tqdm(as_completed(futures), total=len(futures), desc=desc):
            requested_code = futures[future]
            try:
                payload = future.result()
            except requests.RequestException as exc:
                logger.warning("Skipping scheme %s: %s", requested_code, exc)
                continue
            if isinstance(payload, dict) and payload:
                yield requested_code, payload


# NOTE(review): fetch_scheme_metadata and fetch_nav_data each issue one request per
# scheme code, so the full code list is downloaded twice even though every response
# already carries both 'meta' and 'data'. Consider a single fetch feeding both tables.
def fetch_scheme_metadata():
    """Yield the ``meta`` object of each scheme's API response.

    Responses whose ``meta`` is missing or empty are skipped.
    """
    for _, payload in _iter_scheme_payloads("Fetching Metadata"):
        meta = payload.get('meta')
        if meta:
            yield meta


def fetch_nav_data():
    """Yield one record per entry of each scheme's ``data`` list, tagged with ``scheme_code``.

    The scheme code is taken from the response's ``meta`` when present. Otherwise it
    falls back to the code that was requested, so a response without ``meta`` no
    longer raises ``KeyError`` mid-load.
    """
    for requested_code, payload in _iter_scheme_payloads("Fetching NAV Data"):
        records = payload.get('data')
        if not records:
            continue
        meta = payload.get('meta') or {}
        scheme_code = meta.get('scheme_code', requested_code)
        for record in records:
            yield {**record, "scheme_code": scheme_code}

# Load each resource in its own pipeline run. Print the summary first, then call
# raise_on_failed_jobs() so any failed load job stops the notebook here. Otherwise it
# would only appear as text in the printed summary.
metadata_load_info = pipeline.run(fetch_scheme_metadata)
print("Metadata Load Info:", metadata_load_info)
metadata_load_info.raise_on_failed_jobs()

# Execute the pipeline for NAV data
nav_data_load_info = pipeline.run(fetch_nav_data)
print("NAV Data Load Info:", nav_data_load_info)
nav_data_load_info.raise_on_failed_jobs()


Fetching Metadata: 100%|██████████| 7730/7730 [02:52<00:00, 44.72it/s]


Metadata Load Info: Pipeline mutual_fund_details load step completed in 0.94 seconds
1 load package(s) were loaded to destination duckdb and into dataset mf_dataset
The duckdb destination used duckdb:////content/mutual_fund_details.duckdb location to store data
Load package 1709193468.6773603 is LOADED and contains no failed jobs


Fetching NAV Data: 100%|██████████| 7730/7730 [07:06<00:00, 18.12it/s]


NAV Data Load Info: Pipeline mutual_fund_details load step completed in 8 minutes and 38.25 seconds
1 load package(s) were loaded to destination duckdb and into dataset mf_dataset
The duckdb destination used duckdb:////content/mutual_fund_details.duckdb location to store data
Load package 1709193646.1400876 is LOADED and contains no failed jobs


In [15]:
import duckdb

# dlt stores the dataset as its own DuckDB schema named after `dataset_name`
# ("mf_dataset" here). Filtering on DuckDB's default 'main' schema therefore finds no tables.
conn = duckdb.connect('mutual_fund_details.duckdb')  # the file dlt reported loading into (working dir)
result = conn.execute(
    "SELECT * FROM information_schema.tables WHERE table_schema = ?;",
    ["mf_dataset"],
).fetchall()
for row in result:
    print(row)
