In [None]:
import os, sqlite3, logging, pandas as pd
from glob import glob
from time import sleep
import schedule

# Filesystem layout used by the pipeline. MASTER_DB carries the ".db"
# extension so that it names the same file the query cell opens
# ("data/clean/crypto_master.db").
CLEAN_DIR = "data/clean"
MASTER_CSV = f"{CLEAN_DIR}/crypto_master.csv"
MASTER_DB = f"{CLEAN_DIR}/crypto_master.db"
# Single source of truth for the log directory: the directory that is created
# below must be the one the log file is written into, otherwise opening the
# log file fails because its parent directory does not exist.
LOG_DIR = "logs"
os.makedirs(LOG_DIR, exist_ok=True)
os.makedirs(CLEAN_DIR, exist_ok=True)

logging.basicConfig(filename=f"{LOG_DIR}/week3.log",
                    level=logging.INFO,
                    format="%(asctime)s %(levelname)s %(message)s")

def lastest_clean():
    """Return the path of the most recently modified cleaned CSV.

    Looks for files matching ``crypto_clean*.csv`` inside ``CLEAN_DIR`` and
    picks the one with the newest modification time. Returns ``None`` when no
    file matches.
    """
    candidates = glob(f"{CLEAN_DIR}/crypto_clean*.csv")
    if not candidates:
        return None
    return max(candidates, key=os.path.getmtime)

def job():
    """Merge the newest cleaned CSV into the master CSV and SQLite table.

    Steps:
      1. Locate the latest cleaned CSV via ``lastest_clean()``; if there is
         none, log that fact and return.
      2. Append its rows to the existing master CSV (if any) and drop rows
         that repeat the same ``("id", "fetched_at")`` pair.
      3. Rewrite ``MASTER_CSV`` and replace the ``crypto`` table in
         ``MASTER_DB`` with the merged data.
      4. Log how many rows were added.

    Returns:
        None. Any exception raised along the way is logged with its traceback
        and swallowed so that a scheduled run never kills the scheduler loop.
    """
    try:
        src = lastest_clean()
        if not src:
            logging.info("No cleaned CSV found.")
            return None

        df = pd.read_csv(src)
        if os.path.exists(MASTER_CSV):
            master = pd.read_csv(MASTER_CSV)
            before = len(master)
            master = pd.concat([master, df], ignore_index=True)
        else:
            # First run: start from the new data directly instead of
            # concatenating with an empty placeholder frame, which is
            # deprecated in recent pandas and can degrade column dtypes.
            master = df
            before = 0
        master = master.drop_duplicates(subset=["id", "fetched_at"])
        master.to_csv(MASTER_CSV, index=False)

        # A sqlite3 connection used as a context manager only commits or
        # rolls back; it does not close. Close it explicitly so repeated
        # scheduled runs do not leak connections.
        conn = sqlite3.connect(MASTER_DB)
        try:
            with conn:
                master.to_sql("crypto", conn, if_exists="replace", index=False)
        finally:
            conn.close()

        added = len(master) - before
        logging.info("SUCCESS: %s new rows from %s", added, os.path.basename(src))
    except Exception:
        # Top-level boundary of the scheduled task: record the traceback and
        # keep the scheduler alive.
        logging.exception("ERROR in pipeline")

# Register the merge job to fire once per day at 00:00.
schedule.every().day.at("00:00").do(job)

if __name__ == "__main__":
    # Run one merge immediately, then poll the scheduler every 30 seconds
    # so the daily run is picked up when it becomes due.
    job()
    while True:
        schedule.run_pending()
        sleep(30)

In [None]:
import sqlite3, pandas as pd
# Open the master database; `conn` is reused by every query in this cell.
conn = sqlite3.connect("data/clean/crypto_master.db")

# Count rows in the crypto table.
row_count_sql = "SELECT COUNT(*) AS total FROM crypto;"
print(pd.read_sql(row_count_sql, conn))

# Top 5 by market cap (market_cap is cast to REAL so the ordering is numeric).
top_market_cap_sql = (
    "SELECT id, name, current_price, market_cap FROM crypto "
    "ORDER BY CAST(market_cap AS REAL) DESC LIMIT 5;"
)
print(pd.read_sql(top_market_cap_sql, conn))

# Average daily % change
print(pd.read_sql("SELECT AVG(CAST(price_change