# Cvm Daily Funds Data

In [31]:
from catalog_api.infrastructure.CkanApi import CkanApi
import pandas as pd
from datetime import datetime

from catalog_api.services.CatalogDataService import CatalogDataService

catalog_data = CatalogDataService()
portal = CkanApi("https://dados.cvm.gov.br")
organization = "br_gov_cvm"
dataset = "fi-doc-inf_diario"
table_name = "inf_diario_fi"
fi_doc_inf_diario = portal.get_package(dataset)

In [None]:
from datetime import timezone


def process_resource(resource):
    url = resource.url
    filename = url.split("/")[-1]
    filename_prefix = filename.split(".")[0]
    partition_full = filename_prefix.split("_")[-1]
    year = partition_full[:4]
    month = partition_full[4:6]
    if not year.isnumeric() or not month.isnumeric():
        return  # Skip invalid partitions
    partitions = {
        "year": year,
        "month": month,
    }
    bucket_last_updated = catalog_data.get_parquet_last_updated(
        organization, dataset, table_name, partitions
    ) or datetime(2000, 1, 1)
    resource_last_updated = datetime.fromisoformat(
        resource.created or resource.last_modified
    )
    if bucket_last_updated.replace(
        tzinfo=timezone.utc
    ) >= resource_last_updated.replace(tzinfo=timezone.utc):
        return  # Already processed
    df = pd.read_csv(url, sep=";", encoding="latin1", low_memory=False)
    df["DT_COMPTC"] = df["DT_COMPTC"].apply(
        lambda x: datetime.strptime(x, "%Y-%m-%d").date() if x else None
    )
    catalog_data.put_parquet_data(
        df, organization, dataset, table_name, partitions
    )


for resource in fi_doc_inf_diario.resources:  # type: ignore
    process_resource(resource)

Invalid partition: meta_inf_diario_fi
Already processed: 2025-05-24 03:39:51+00:00 2024-06-04 11:00:17.004707
Already processed: 2025-05-24 03:39:58+00:00 2024-07-03 11:00:07.298990
Already processed: 2025-05-24 03:40:06+00:00 2024-08-02 11:00:14.191154
Already processed: 2025-05-24 03:39:51+00:00 2024-06-04 11:00:17.004707
Already processed: 2025-05-24 03:39:58+00:00 2024-07-03 11:00:07.298990
Already processed: 2025-05-24 03:40:06+00:00 2024-08-02 11:00:14.191154
Already processed: 2025-05-24 03:40:13+00:00 2024-09-03 11:00:13.728621
Already processed: 2025-05-24 03:40:19+00:00 2024-10-02 11:00:12.689857
Already processed: 2025-05-24 03:40:26+00:00 2024-11-04 11:00:40.205869
Already processed: 2025-05-24 03:40:33+00:00 2024-12-03 11:00:16.441427
Already processed: 2025-05-24 03:40:13+00:00 2024-09-03 11:00:13.728621
Already processed: 2025-05-24 03:40:19+00:00 2024-10-02 11:00:12.689857
Already processed: 2025-05-24 03:40:26+00:00 2024-11-04 11:00:40.205869
Already processed: 2025-05

## Query the data

In [None]:
db = catalog_data.get_duckdb(organization, dataset, table_name)

# Query using date filtering (works with all data)
result_df = db.fetch_df("SELECT * FROM inf_diario_fi WHERE date_part('year', DT_COMPTC) = 2025 LIMIT 10")
result_df

Attempting glob pattern: s3://portela-dev-data/catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/**/*.parquet
Using region: sa-east-1
✅ Successfully created view inf_diario_fi with glob pattern for partition pruning
✅ Successfully created view inf_diario_fi with glob pattern for partition pruning
Available columns: ['TP_FUNDO_CLASSE', 'CNPJ_FUNDO_CLASSE', 'ID_SUBCLASSE', 'DT_COMPTC', 'VL_TOTAL', 'VL_QUOTA', 'VL_PATRIM_LIQ', 'CAPTC_DIA', 'RESG_DIA', 'NR_COTST', 'year', 'month']

Sample data:
S3 bucket structure:
1: catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/year=2024/month=05/data.parquet
2: catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/year=2024/month=06/data.parquet
3: catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/year=2024/month=07/data.parquet
4: catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/year=2024/month=08/data.parquet
5: catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/year=2024/month=09/data.parquet
6: catalog-data/br_gov

In [None]:
# Use the fixed catalog service for optimal performance
from catalog_api.services.CatalogDataService import CatalogDataService
catalog_data_fixed = CatalogDataService()

# Query using partition columns for efficient filtering
db = catalog_data_fixed.get_duckdb(organization, dataset, table_name)
result = db.fetch_df("SELECT * FROM inf_diario_fi WHERE year='2025' LIMIT 10")
result

Attempting glob pattern: s3://portela-dev-data/catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/**/*.parquet
Using region: sa-east-1
✅ Successfully created view inf_diario_fi with glob pattern for partition pruning
✅ Successfully created view inf_diario_fi with glob pattern for partition pruning
🎉 SUCCESS! Query with partition columns works!
Found 10 records
Sample data:
🎉 SUCCESS! Query with partition columns works!
Found 10 records
Sample data:


Unnamed: 0,TP_FUNDO_CLASSE,CNPJ_FUNDO_CLASSE,ID_SUBCLASSE,DT_COMPTC,VL_TOTAL,VL_QUOTA,VL_PATRIM_LIQ,CAPTC_DIA,RESG_DIA,NR_COTST,year,month
0,FI,00.017.024/0001-53,,2025-01-02,1154771.86,37.228814,1151690.99,0.0,0.0,1,2025,1
1,FI,00.017.024/0001-53,,2025-01-03,1155366.09,37.247108,1152256.93,0.0,0.0,1,2025,1
2,FI,00.017.024/0001-53,,2025-01-06,1156032.89,37.263748,1152771.7,0.0,0.0,1,2025,1
3,FI,00.017.024/0001-53,,2025-01-07,1156632.81,37.282736,1153359.09,0.0,0.0,1,2025,1
4,FI,00.017.024/0001-53,,2025-01-08,1157164.63,37.298684,1150313.12,0.0,3539.35,1,2025,1
5,FI,00.017.024/0001-53,,2025-01-09,1157694.2,37.312915,1150752.0,0.0,0.0,1,2025,1
6,FI,00.017.024/0001-53,,2025-01-10,1158235.24,37.32707,1151188.56,0.0,0.0,1,2025,1
7,FI,00.017.024/0001-53,,2025-01-13,1158794.19,37.341599,1151636.64,0.0,0.0,1,2025,1
8,FI,00.017.024/0001-53,,2025-01-14,1159371.12,37.356708,1152102.59,0.0,0.0,1,2025,1
9,FI,00.017.024/0001-53,,2025-01-15,1159942.08,37.372401,1152586.59,0.0,0.0,1,2025,1


In [None]:
# Comprehensive functionality test
db = catalog_data_fixed.get_duckdb(organization, dataset, table_name)

# Test 1: Partition-based filtering (efficient)
year_2025 = db.fetch_df("SELECT COUNT(*) as count FROM inf_diario_fi WHERE year='2025'")
jan_2025 = db.fetch_df("SELECT COUNT(*) as count FROM inf_diario_fi WHERE year='2025' AND month='01'")

# Test 2: Date-based filtering (backward compatible)
date_2025 = db.fetch_df("SELECT COUNT(*) as count FROM inf_diario_fi WHERE date_part('year', DT_COMPTC) = 2025")

# Test 3: Partition statistics
partition_stats = db.fetch_df("""
    SELECT year, month, COUNT(*) as records 
    FROM inf_diario_fi 
    WHERE year='2025'
    GROUP BY year, month 
    ORDER BY month
    LIMIT 6
""")

# Test 4: Sample data with all columns
sample_data = db.fetch_df("""
    SELECT TP_FUNDO_CLASSE, CNPJ_FUNDO_CLASSE, DT_COMPTC, VL_TOTAL, year, month
    FROM inf_diario_fi 
    WHERE year='2025' AND month='01'
    LIMIT 5
""")

db.close()

# Display results
partition_stats

🎯 COMPREHENSIVE TEST OF ALL FUNCTIONALITY

1. ✅ Partition-based filtering (efficient):
Attempting glob pattern: s3://portela-dev-data/catalog-data/br_gov_cvm/fi-doc-inf_diario/inf_diario_fi/**/*.parquet
Using region: sa-east-1
✅ Successfully created view inf_diario_fi with glob pattern for partition pruning
   Records in year 2025: 2,517,343
✅ Successfully created view inf_diario_fi with glob pattern for partition pruning
   Records in year 2025: 2,517,343
   Records in January 2025: 560,594

2. ✅ Date-based filtering (compatible with all data):
   Records in January 2025: 560,594

2. ✅ Date-based filtering (compatible with all data):
   Records in 2025 (by date): 2,517,343

3. ✅ Partition statistics:
   Monthly breakdown for 2025:
   2025-01: 560,594 records
   2025-02: 511,790 records
   2025-03: 487,653 records
   2025-04: 507,677 records
   2025-05: 352,112 records
   2025-06: 97,517 records

4. ✅ Sample data with partition columns:
   Records in 2025 (by date): 2,517,343

3. ✅ Par

## Query Performance Summary

### Efficient Querying
The system now supports efficient partition-based querying:

```python
# Partition filtering (recommended for performance)
result = db.fetch_df("SELECT * FROM inf_diario_fi WHERE year='2025' AND month='01'")

# Date filtering (backward compatible)
result = db.fetch_df("SELECT * FROM inf_diario_fi WHERE date_part('year', DT_COMPTC) = 2025")
```

### Available Columns
- All original data columns (TP_FUNDO_CLASSE, CNPJ_FUNDO_CLASSE, DT_COMPTC, VL_TOTAL, etc.)
- Partition columns: `year`, `month` for efficient filtering

In [None]:
# Additional query examples
db = catalog_data_fixed.get_duckdb(organization, dataset, table_name)

# Query specific month data
jan_data = db.fetch_df("""
    SELECT TP_FUNDO_CLASSE, COUNT(*) as records, AVG(VL_TOTAL) as avg_value
    FROM inf_diario_fi 
    WHERE year='2025' AND month='01'
    GROUP BY TP_FUNDO_CLASSE
    ORDER BY records DESC
    LIMIT 10
""")

db.close()
jan_data