# Read data from CouchDB in a parallel structure

# couch db connection

In [None]:
import pandas as pd
import couchdb
import requests
from io import StringIO
import math

In [None]:

db_name = <db>
couch = couchdb.Server(<couch server>)
if db_name in couch:
    db = couch[db_name]
    print(f"Connected to existing database: {db_name}")
else:
    db = couch.create(db_name)
    print(f"Created new database: {db_name}")

✅ Connected to existing database: oecd_health_expenditure


## read from couch

In [120]:
# Confirm the connection by printing the database name and its document count.
print(f"Connected to database: {db.name} ({len(db)} documents)")


Connected to database: oecd_health_expenditure (4570 documents)


# `defaultdict(list)` is a dictionary subclass from Python's `collections` module that automatically creates an empty list as the value for any new key you access.

In [121]:
from collections import defaultdict

# Starts empty; accessing a missing key creates an empty list for that key.
structure_groups = defaultdict(list)
structure_groups


defaultdict(list, {})

In [122]:
# Group every document that has a 'structure_name' field under that name.
# One `_all_docs` request with include_docs=True returns the document bodies
# together with their ids, instead of one HTTP GET per document id.
for row in db.view('_all_docs', include_docs=True):
    doc = row.doc
    if doc is not None and 'structure_name' in doc:
        structure_name = doc['structure_name']
        structure_groups[structure_name].append(doc)
        
        

In [None]:
# Short variable name to use for each structure name found in the documents.
structure_shortcuts = dict([
    ('Gross fixed capital formation in the healthcare system', 'HealthGFCF'),
    ('Healthcare coverage', 'HC_Coverage'),
    ('Health expenditure and financing', 'HEF_Main'),
    ('Input costs for healthcare providers', 'HP_cost'),
    ('Revenues of healthcare financing schemes', 'HCF_Revenue'),
])


In [None]:
# Rebuild the grouping as a plain dict: structure name -> list of documents,
# with the CouchDB '_id' and '_rev' fields removed from each document.
# One `_all_docs` request with include_docs=True fetches every document body,
# instead of one HTTP GET per document id.
structure_groups = {}
for row in db.view('_all_docs', include_docs=True):
    doc = row.doc
    if doc is None or 'structure_name' not in doc:
        continue
    group = doc['structure_name']
    clean_doc = {k: v for k, v in doc.items() if k not in ('_id', '_rev')}
    structure_groups.setdefault(group, []).append(clean_doc)

# debug for revenue
# Show a per-structure document count rather than dumping every document.
{name: len(docs) for name, docs in structure_groups.items()}

In [110]:
# List the structure names that were collected as group keys.
structure_groups.keys()

dict_keys(['Gross fixed capital formation in the healthcare system', 'Health expenditure and financing', 'Revenues of healthcare financing schemes', 'Input costs for healthcare providers', 'Healthcare coverage'])

# Create DataFrames with shortcut variable names


In [None]:
# Build one DataFrame per structure and bind it to its shortcut name at module
# level (HealthGFCF, HC_Coverage, ...) so later cells can use those names directly.
for group_name in structure_shortcuts:
    shortcut = structure_shortcuts[group_name]
    # A structure with no collected documents yields an empty DataFrame.
    group_records = structure_groups.get(group_name, [])
    df = pd.DataFrame(group_records)
    globals()[shortcut] = df
    print(f"Created DataFrame: {shortcut} ({len(df)} rows)")



✅ Created DataFrame: HealthGFCF (1000 rows)
✅ Created DataFrame: HC_Coverage (1000 rows)
✅ Created DataFrame: HEF_Main (1000 rows)
✅ Created DataFrame: HP_cost (1000 rows)
✅ Created DataFrame: HCF_Revenue (570 rows)


In [68]:
# Preview the first rows of the 'Health expenditure and financing' DataFrame.
HEF_Main.head()

Unnamed: 0,STRUCTURE,STRUCTURE_ID,STRUCTURE_NAME,ACTION,REF_AREA,Reference area,FREQ,Frequency of observation,MEASURE,Measure,...,OBS_STATUS,Observation status,OBS_STATUS2,Observation status 2,OBS_STATUS3,Observation status 3,UNIT_MULT,Unit multiplier,DECIMALS,Decimals
0,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA(1.0),Health expenditure and financing,I,USA,United States,A,Annual,EXP_HEALTH,Expenditure,...,,,,,,,0,Units,1,One
1,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA(1.0),Health expenditure and financing,I,USA,United States,A,Annual,EXP_HEALTH,Expenditure,...,,,,,,,0,Units,1,One
2,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA(1.0),Health expenditure and financing,I,IND,India,A,Annual,EXP_HEALTH,Expenditure,...,,,,,,,0,Units,1,One
3,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA(1.0),Health expenditure and financing,I,IND,India,A,Annual,EXP_HEALTH,Expenditure,...,,,,,,,0,Units,1,One
4,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA(1.0),Health expenditure and financing,I,IND,India,A,Annual,EXP_HEALTH,Expenditure,...,,,,,,,0,Units,1,One


# Now, based on STRUCTURE_NAME, push data into Kinesis in parallel from all the DataFrames

In [None]:
import boto3
import json
import time

# Kinesis client used to publish records.
# NOTE(review): the credential arguments appear here as redacted placeholders;
# confirm they are loaded from the environment or a secrets store rather than
# being hardcoded in the notebook.
kinesis = boto3.client(
    'kinesis',
    aws_access_key_id=<key_id>,
    aws_secret_access_key=<access_key>,
    region_name=<region>
)

# Name of the Kinesis stream.
stream_name = <kinesis stream_name>

# s3 bucket mapping

In [126]:
# S3 bucket name associated with each dataset shortcut.
# S3 bucket names may only contain lowercase letters, digits, dots and hyphens,
# so every name here is kept lowercase.
bucket_mapping = {
    'HealthGFCF': 'health-gfcf-data',
    'HC_Coverage': 'health-coverage-data',
    'HEF_Main': 'health-expenditure-data',
    'HP_cost': 'health-provider-costs',
    'HCF_Revenue': 'health-financing-revenue',
}


# limit to send

In [114]:
# Limit what gets sent: keep only the first 5 rows of every dataset.
HealthGFCF, HC_Coverage, HEF_Main, HP_cost, HCF_Revenue = (
    frame.head(5)
    for frame in (HealthGFCF, HC_Coverage, HEF_Main, HP_cost, HCF_Revenue)
)
HCF_Revenue

Unnamed: 0,structure,structure_id,structure_name,action,country_code,country_name,freq,frequency_of_observation,measure,unit_measure,...,asset_type,price_base,year,value,unit_mult,unit_multiplier,decimals,doc_type,dataset_name,processed_timestamp
0,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA_FS(1.0),Revenues of healthcare financing schemes,I,JPN,Japan,A,Annual,Revenues,PT_EXP_FS,...,Not applicable,Not applicable,2017,46.161,0,Units,One,health_financing_revenues,Revenues of Healthcare Financing Schemes,2025-04-21T13:20:25.756183
1,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA_FS(1.0),Revenues of healthcare financing schemes,I,PER,Peru,A,Annual,Revenues,PT_EXP_FS,...,Not applicable,Not applicable,2019,28.434,0,Units,One,health_financing_revenues,Revenues of Healthcare Financing Schemes,2025-04-21T13:20:25.756213
2,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA_FS(1.0),Revenues of healthcare financing schemes,I,PER,Peru,A,Annual,Revenues,PT_EXP_FS,...,Not applicable,Not applicable,2022,23.332,0,Units,One,health_financing_revenues,Revenues of Healthcare Financing Schemes,2025-04-21T13:20:25.756434
3,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA_FS(1.0),Revenues of healthcare financing schemes,I,CHN,China (People’s Republic of),A,Annual,Revenues,PT_EXP_FS,...,Not applicable,Not applicable,2017,29.028,0,Units,One,health_financing_revenues,Revenues of Healthcare Financing Schemes,2025-04-21T13:20:25.758545
4,DATAFLOW,OECD.ELS.HD:DSD_SHA@DF_SHA_FS(1.0),Revenues of healthcare financing schemes,I,CHN,China (People’s Republic of),A,Annual,Revenues,PT_EXP_FS,...,Not applicable,Not applicable,2021,27.21,0,Units,One,health_financing_revenues,Revenues of Healthcare Financing Schemes,2025-04-21T13:20:25.758568


In [127]:
# Dataset shortcut name -> DataFrame to send.
DATASETS = dict(
    HealthGFCF=HealthGFCF,
    HC_Coverage=HC_Coverage,
    HEF_Main=HEF_Main,
    HP_cost=HP_cost,
    HCF_Revenue=HCF_Revenue,
)

# Initialize Kinesis client

In [None]:
import json
import boto3
from concurrent.futures import ThreadPoolExecutor

# Configuration
# KINESIS_STREAM is the stream that send_dataset_to_kinesis publishes to.
KINESIS_STREAM = <KINESIS_STREAM>
AWS_REGION = <AWS_REGION>



# NOTE(review): this rebinds the `kinesis` client that an earlier cell already
# created; confirm whether both cells are needed. The credential arguments
# appear as redacted placeholders; confirm they are not hardcoded.
kinesis = boto3.client(
    'kinesis',
    aws_access_key_id=<key_id>,
    aws_secret_access_key=<access_key>,
    region_name=<region_name>
)

# send data

In [None]:
def send_dataset_to_kinesis(dataset_name, df, sample_size=None, client=None, stream_name=None):
    """Send every row of ``df`` to a Kinesis stream as one JSON record per row.

    Sending is best-effort: a failure on one row is printed and the loop moves
    on to the next row.

    Args:
        dataset_name: Label used in log lines and as the Kinesis partition key.
        df: pandas DataFrame whose rows are sent.
        sample_size: If truthy, only the first ``sample_size`` rows are sent.
        client: Object exposing ``put_record(StreamName=, Data=, PartitionKey=)``.
            Defaults to the module-level ``kinesis`` client.
        stream_name: Target stream name. Defaults to the module-level
            ``KINESIS_STREAM``.

    Returns:
        The number of rows that were sent without raising.
    """
    if client is None:
        client = kinesis
    if stream_name is None:
        stream_name = KINESIS_STREAM

    # Apply the sample limit first so every count printed below matches what is sent.
    if sample_size:
        df = df.head(sample_size)
    total = len(df)

    print(f"Starting {dataset_name} stream ({total} records)")
    if sample_size:
        print(f"Running in sample mode (first {sample_size} records)")

    sent = 0
    failed = 0
    # Progress is tracked by position: index labels may be non-numeric or
    # non-contiguous, so they are only used to identify a row in error messages.
    for position, (idx, record) in enumerate(df.iterrows()):
        try:
            # json.dumps writes a missing value as the bare token NaN, which is not
            # valid JSON; send null instead. allow_nan=False makes any remaining
            # non-finite float fail here rather than reach the stream.
            row = {
                key: None if isinstance(value, float) and math.isnan(value) else value
                for key, value in record.to_dict().items()
            }
            payload = json.dumps(row, allow_nan=False)
            client.put_record(
                StreamName=stream_name,
                Data=payload,
                PartitionKey=dataset_name
            )
            sent += 1

            if position % 100 == 0:
                print(f"{dataset_name}: Sent record {position}/{total}")

        except Exception as e:
            failed += 1
            print(f" {dataset_name} error at record {idx}: {str(e)}")

    if failed:
        print(f"{dataset_name}: {failed} of {total} records failed")
    print(f"Completed {dataset_name} stream")
    return sent
# Send every dataset concurrently, one worker thread per dataset. The pool is
# sized from DATASETS so that no dataset has to wait for a free worker
# (max(1, ...) keeps the pool valid when DATASETS is empty).
with ThreadPoolExecutor(max_workers=max(1, len(DATASETS))) as executor:
    futures = []
    for name, df in DATASETS.items():
        futures.append(
            executor.submit(
                send_dataset_to_kinesis,
                dataset_name=name,
                df=df,
                sample_size=None
            )
        )

    # result() re-raises anything a worker raised, so failures are not lost.
    for future in futures:
        future.result()

🚀 Starting HealthGFCF stream (1000 records)🚀 Starting HC_Coverage stream (1000 records)

🚀 Starting HEF_Main stream (1000 records)
🚀 Starting HP_cost stream (1000 records)
📤 HC_Coverage: Sent record 0/1000
📤 HEF_Main: Sent record 0/1000
📤 HealthGFCF: Sent record 0/1000
📤 HP_cost: Sent record 0/1000
📤 HC_Coverage: Sent record 100/1000
📤 HealthGFCF: Sent record 100/1000
📤 HEF_Main: Sent record 100/1000
📤 HP_cost: Sent record 100/1000
📤 HealthGFCF: Sent record 200/1000
📤 HEF_Main: Sent record 200/1000
📤 HC_Coverage: Sent record 200/1000
📤 HP_cost: Sent record 200/1000
📤 HealthGFCF: Sent record 300/1000
📤 HEF_Main: Sent record 300/1000
📤 HC_Coverage: Sent record 300/1000
📤 HP_cost: Sent record 300/1000
📤 HealthGFCF: Sent record 400/1000
📤 HEF_Main: Sent record 400/1000
📤 HP_cost: Sent record 400/1000
📤 HC_Coverage: Sent record 400/1000
📤 HealthGFCF: Sent record 500/1000
📤 HEF_Main: Sent record 500/1000
📤 HP_cost: Sent record 500/1000
📤 HC_Coverage: Sent record 500/1000
📤 HealthGFCF: Sent r

# debugging

# test with one record

In [None]:
# Smoke test: serialize the first HealthGFCF row and put it on the stream.
rec = HealthGFCF.iloc[0]
payload = json.dumps(rec.to_dict())

single_put_request = dict(
    StreamName=KINESIS_STREAM,
    Data=payload,
    PartitionKey="random name",
)
# Last expression of the cell, so the service response is displayed.
kinesis.put_record(**single_put_request)


{'ShardId': 'shardId-000000000000',
 'SequenceNumber': '49662528673383861570870701425129547993620313263833088002',
 'ResponseMetadata': {'RequestId': 'd75c647e-9726-36df-b722-c82382f0d9d4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd75c647e-9726-36df-b722-c82382f0d9d4',
   'x-amz-id-2': 'sHjr1piQ7hEtFGplyvJFXwFNt8Gu1zsABV+LHtobfwOyEFPYxq3F4ZXki3NXShEIjb3+xbM7YJmeGGyTAugt7VMC3lBvETHX',
   'date': 'Sun, 20 Apr 2025 08:02:37 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '110',
   'connection': 'keep-alive'},
  'RetryAttempts': 0}}

In [40]:
# NOTE(review): readCheck is not defined in the visible part of this notebook;
# judging by the cell output it presumably reads records back from the stream — confirm.
readCheck()

{"user_id": 101, "location": "San Francisco", "temperature": 67.5, "timestamp": 1745098732.5864081}
{"user_id": 101, "location": "San Francisco", "temperature": 67.5, "timestamp": 1745099953.5145624}
{"user_id": 101, "location": "San Francisco", "temperature": 67.5, "timestamp": 1745100003.6269653}
{"user_id": 101, "location": "San Francisco", "temperature": 67.5, "timestamp": 1745105435.4470923}
{"STRUCTURE": "DATAFLOW", "STRUCTURE_ID": "OECD.ELS.HD:DSD_SHA@DF_SHA_HK(1.0)", "STRUCTURE_NAME": "Gross fixed capital formation in the healthcare system", "ACTION": "I", "REF_AREA": "ESP", "Reference area": "Spain", "FREQ": "A", "Frequency of observation": "Annual", "MEASURE": "CAPITAL_FORM", "Measure": "Gross fixed capital formation", "UNIT_MEASURE": "PT_B1GQ", "Unit of measure": "Percentage of GDP", "FINANCING_SCHEME": "_Z", "Financing scheme": "Not applicable", "FINANCING_SCHEME_REV": "_Z", "Revenues of financing schemes": "Not application", "FUNCTION": "_Z", "Health function": "Not applic

# Test parallel load with a sample of 5 records

In [49]:
# Keep only the first 5 rows of each dataset for the sample run.
HealthGFCF, HC_Coverage, HEF_Main, HP_cost = (
    frame.head(5)
    for frame in (HealthGFCF, HC_Coverage, HEF_Main, HP_cost)
)


In [None]:
# Dataset shortcut name -> DataFrame used for the sample run.
DATASETS = dict(
    HealthGFCF=HealthGFCF,
    HC_Coverage=HC_Coverage,
    HEF_Main=HEF_Main,
    HP_cost=HP_cost,
)

In [50]:
# Display the HC_Coverage DataFrame to inspect it.
HC_Coverage

Unnamed: 0,STRUCTURE,STRUCTURE_ID,STRUCTURE_NAME,ACTION,REF_AREA,Reference area,FREQ,Frequency of observation,MEASURE,Measure,...,TIME_PERIOD,Time period,OBS_VALUE,Observation value,OBS_STATUS,Observation status,UNIT_MULT,Unit multiplier,DECIMALS,Decimals
0,DATAFLOW,OECD.ELS.HD:DSD_HEALTH_PROT@DF_HEALTH_PROT(1.0),Healthcare coverage,I,NLD,Netherlands,A,Annual,HIC,Health insurance coverage,...,2010,,99.6,,E,Estimated value,0,Units,,
1,DATAFLOW,OECD.ELS.HD:DSD_HEALTH_PROT@DF_HEALTH_PROT(1.0),Healthcare coverage,I,NLD,Netherlands,A,Annual,HIC,Health insurance coverage,...,2011,,99.6,,E,Estimated value,0,Units,,
2,DATAFLOW,OECD.ELS.HD:DSD_HEALTH_PROT@DF_HEALTH_PROT(1.0),Healthcare coverage,I,NLD,Netherlands,A,Annual,HIC,Health insurance coverage,...,2012,,99.7,,E,Estimated value,0,Units,,
3,DATAFLOW,OECD.ELS.HD:DSD_HEALTH_PROT@DF_HEALTH_PROT(1.0),Healthcare coverage,I,NLD,Netherlands,A,Annual,HIC,Health insurance coverage,...,2013,,99.8,,E,Estimated value,0,Units,,
4,DATAFLOW,OECD.ELS.HD:DSD_HEALTH_PROT@DF_HEALTH_PROT(1.0),Healthcare coverage,I,NLD,Netherlands,A,Annual,HIC,Health insurance coverage,...,2014,,99.8,,E,Estimated value,0,Units,,
