In [7]:
# imports
from fastapi import FastAPI
#from dotenv import dotenv_values
from pymongo import MongoClient
from fastapi import APIRouter, Body, Request, Response, HTTPException, status
#from fastapi.encoders import jsonable_encoder
#from typing import List
import csv
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import matplotlib.dates as mdates
import pickle 


In [5]:
# local functions
def dbConnection(dbCFG):
    # connect to base 
    app = FastAPI()
    app.mongodb_client = MongoClient(dbCFG["ATLAS_URI"])
    app.database = app.mongodb_client[dbCFG["DB_NAME"]]
    router = APIRouter()
    app.include_router(router)
    #bdbtr = list(Request.app.database["bodybattery"].find())
    
    app.mongodb_client.close()
    client =  MongoClient(dbCFG["ATLAS_URI"])
    data = client[dbCFG["DB_NAME"]]
    return data



In [3]:
# main function
def retrieve(listVarNames, dbCFG, namecsv):
    # 0. connect to dB. dictionary dbCFG provides all fields 
    db = dbConnection(dbCFG)
    for collection_name in listVarNames:
        collection = db[collection_name]
        data = list(collection.find())
        
        df = pd.DataFrame(data)
        
        # 2. save to CSV
        csv_filename = f"{namecsv}_{collection_name}.csv"
        df.to_csv(csv_filename, index=False)
        
        # 3. save to Pickle
        pkl_filename = f"{namecsv}_{collection_name}.pkl"
        with open(pkl_filename, 'wb') as f:
            pickle.dump(df, f)

        print(f"Saved {collection_name} to {csv_filename} and {pkl_filename}")

In [None]:
# call retrieve
pthLong = r"mongodb://prodain:iprolep5isProdainigma@128.140.109.4:27017,49.13.200.206:27017,128.140.102.237:27017/iprolepsis?replicaSet=iprolepsisrs"
 
dbCFG = {"ATLAS_URI": pthLong,  
        "DB_NAME": "iprolepsis" 
        }
# tuples (collection, varname)
tupsVarNames = [("bodybattery", 'bodyBattery') , 
                ('wellness', 'steps'),
                ('stresslevels','stressScore'),
                ('heartratereadings','beatsPerMinute'),
                ('wellness', 'activityType')
                ]

# keep just the collections
listVarNames = ["bodybattery", 
                'wellness',
                'stresslevels',
                'heartratereadings',
                ]

namecsv = 'Mar28'
retrieve(listVarNames, dbCFG, namecsv)

In [26]:
#a faster approach to retrieve large chunks of data
import pandas as pd
from pymongo import MongoClient

def dbConnection(cfg):
    client = MongoClient(cfg["ATLAS_URI"])
    return client[cfg["DB_NAME"]]

def retrieve(collections, dbCFG, nameprefix, chunksize=100_000):
    db = dbConnection(dbCFG)

    for collection_name in collections:
        print(f"\nRetrieving {collection_name}...")
        cursor = db[collection_name].find({}, projection={"_id": False}, batch_size=1000)

        chunk = []
        chunk_idx = 0
        total_docs = 0

        for doc in cursor:
            chunk.append(doc)
            if len(chunk) >= chunksize:
                df = pd.DataFrame(chunk)
                df.to_parquet(f"{nameprefix}_{collection_name}_part{chunk_idx}.parquet")
                print(f"  Saved chunk {chunk_idx} ({len(df)} rows)")
                total_docs += len(chunk)
                chunk = []
                chunk_idx += 1

        # Final chunk
        if chunk:
            df = pd.DataFrame(chunk)
            df.to_parquet(f"{nameprefix}_{collection_name}_part{chunk_idx}.parquet")
            print(f"  Saved final chunk {chunk_idx} ({len(df)} rows)")
            total_docs += len(chunk)

        print(f"✔ Done: {collection_name} — {total_docs} total rows saved.")
        cursor.close()

# Usage
# listVarNames = ["bodybattery",  "stresslevels", "heartratereadings", "wellness"]
listVarNames = ["wellness"]

dbCFG = {
    "ATLAS_URI": "mongodb://prodain:iprolep5isProdainigma@128.140.109.4:27017,49.13.200.206:27017,128.140.102.237:27017/iprolepsis?replicaSet=iprolepsisrs",
    "DB_NAME": "iprolepsis"
}

retrieve(listVarNames, dbCFG, "Mar29")



Retrieving wellness...
  Saved chunk 0 (100000 rows)
  Saved chunk 1 (100000 rows)
  Saved chunk 2 (100000 rows)
  Saved chunk 3 (100000 rows)
  Saved chunk 4 (100000 rows)
  Saved chunk 5 (100000 rows)
  Saved chunk 6 (100000 rows)
  Saved chunk 7 (100000 rows)
  Saved chunk 8 (100000 rows)
  Saved chunk 9 (100000 rows)
  Saved chunk 10 (100000 rows)
  Saved chunk 11 (100000 rows)
  Saved chunk 12 (100000 rows)
  Saved chunk 13 (100000 rows)
  Saved chunk 14 (100000 rows)
  Saved chunk 15 (100000 rows)
  Saved chunk 16 (100000 rows)
  Saved chunk 17 (100000 rows)
  Saved chunk 18 (100000 rows)
  Saved chunk 19 (100000 rows)
  Saved chunk 20 (100000 rows)
  Saved chunk 21 (100000 rows)
  Saved chunk 22 (100000 rows)
  Saved chunk 23 (100000 rows)
  Saved chunk 24 (100000 rows)
  Saved chunk 25 (100000 rows)
  Saved chunk 26 (100000 rows)
  Saved chunk 27 (100000 rows)
  Saved chunk 28 (100000 rows)
  Saved chunk 29 (100000 rows)
  Saved chunk 30 (100000 rows)
  Saved chunk 31 (100000 

In [20]:
# local functions
def dbConnection(cfg):
    client = MongoClient(cfg["ATLAS_URI"])
    return client[cfg["DB_NAME"]], client

def toyRetrieve(collections, dbCFG, nameprefix, sample_size=1000):
    db, client = dbConnection(dbCFG)
    print("Connected")
    import time
    start = time.time()
    with client.start_session() as session:
        for collection_name in collections:
            pipeline = [
                {"$sample": {"size": sample_size}},
                {"$project": {"_id": False}}
            ]
            print(f"Sampling {sample_size} from {collection_name}...")
            cursor = db[collection_name].aggregate(pipeline, session=session)
            
            data = list(cursor)
            print(f"Retrieved {len(data)} from {collection_name} in {time.time() - start:.2f}s")
            if not data:
                print(f"No data returned from {collection_name}")
                continue

            df = pd.DataFrame(data)
            df.to_parquet(f"{nameprefix}_{collection_name}_toy.parquet")
            df.to_csv(f"{nameprefix}_{collection_name}_toy.csv", index=False)

            print(f"Randomly sampled {len(df)} records from {collection_name} and saved to Parquet and CSV.")


toyRetrieve(listVarNames, dbCFG, "Toy")

Connected
Sampling 1000 from bodybattery...
Retrieved 1000 from bodybattery in 0.57s
Randomly sampled 1000 records from bodybattery and saved to Parquet and CSV.
Sampling 1000 from wellness...
Retrieved 1000 from wellness in 68.40s
Randomly sampled 1000 records from wellness and saved to Parquet and CSV.
Sampling 1000 from stresslevels...
Retrieved 1000 from stresslevels in 68.92s
Randomly sampled 1000 records from stresslevels and saved to Parquet and CSV.
Sampling 1000 from heartratereadings...
Retrieved 1000 from heartratereadings in 74.58s
Randomly sampled 1000 records from heartratereadings and saved to Parquet and CSV.


In [22]:
def profile_collection_sizes(db, collections):
    for name in collections:
        try:
            stats = db.command({"collStats": name})
            
            if stats.get("ok", 0) != 1:
                print(f"{name} → collStats failed: {stats.get('errmsg', 'Unknown error')}")
                continue

            print(f"\n{name}")
            print(f"  Document count: {stats.get('count', 'N/A')}")
            print(f"  Size (MB): {stats.get('size', 0) / 1e6:.2f}")
            print(f"  Storage size (MB): {stats.get('storageSize', 0) / 1e6:.2f}")
            print(f"  Avg doc size (KB): {stats.get('avgObjSize', 0) / 1024:.2f}")
            print(f"  Indexes (MB): {stats.get('totalIndexSize', 0) / 1e6:.2f}")
        
        except Exception as e:
            print(f"Error retrieving stats for '{name}': {e}")


db, _ = dbConnection(dbCFG)
profile_collection_sizes(db, ["bodybattery", "stresslevels", "wellness",'heartratereadings'])



bodybattery
  Document count: N/A
  Size (MB): 97.97
  Storage size (MB): 32.01
  Avg doc size (KB): 0.00
  Indexes (MB): 5.59

stresslevels
  Document count: N/A
  Size (MB): 108.36
  Storage size (MB): 52.20
  Avg doc size (KB): 0.00
  Indexes (MB): 5.54

wellness
  Document count: 25339441
  Size (MB): 8124.88
  Storage size (MB): 1152.06
  Avg doc size (KB): 0.31
  Indexes (MB): 1198.37

heartratereadings
  Document count: N/A
  Size (MB): 116.67
  Storage size (MB): 49.95
  Avg doc size (KB): 0.00
  Indexes (MB): 7.36


In [None]:
# clean raw data steps

# BatterySaver Periods?

# filter step rate?

In [None]:
# === DYNAMIC TSFresh SETTINGS ===
def get_fc_parameters(modality):
    if modality in ['steps_summary', 'steps_hourly', 'steps_daily', 'walking_time']:
        return {
            'sum_values': None,
            'mean': None,
            'variance': None,
            'standard_deviation': None,
            'linear_trend': [{"attr": attr} for attr in ['intercept', 'slope', 'rvalue']]
        }
    elif modality == 'physical_activity', 'sedentary_prop':
        return {
            'mean': None,
            'median': None,
            'variance': None,
            'percentage_of_reoccurring_values_to_all_values': None
        }
    else: #let bdybattery, stresscore, bpm be processed like this
        return {
            'sum_values': None,
            'mean': None,
            'variance': None,
            'standard_deviation': None,
            'linear_trend': [{"attr": attr} for attr in ['intercept', 'slope', 'rvalue']]
        }
custom_fc_parameters = get_fc_parameters(modality)
extracted = extract_features(
        df_ts,
        column_id='id',
        column_sort='time',
        column_value="value"
        default_fc_parameters=custom_fc_parameters,
    )