In [2]:
%pip -q install -U crewai[tools] pandas matplotlib python-dotenv

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/42.8 kB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m48.5/48.5 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m754.4/754.4 kB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m552.7/552.7 kB[0m [31m40.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.4/12.4 MB[0m [31m99.4 MB/s[0m eta

In [3]:
import os, json, textwrap, shutil, sys, pathlib, datetime
from pprint import pprint

# Configuration: API keys
# The key is taken from the environment and never hard-coded, so it cannot leak
# through a saved or shared notebook. If it is missing, ask for it interactively
# without echoing the value.
if not os.environ.get("OPENAI_API_KEY", "").strip():
    from getpass import getpass  # local import: only needed on this fallback path

    os.environ["OPENAI_API_KEY"] = getpass("OPENAI_API_KEY: ").strip()

# Fails when the prompt above was answered with an empty value.
assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is not available, please enter your own api key "

# Basic paths: every artifact is written under <current working directory>/out
BASE_DIR = pathlib.Path.cwd()
OUT_DIR = BASE_DIR / "out"
FIG_DIR = OUT_DIR / "figs"
OUT_DIR.mkdir(exist_ok=True)
FIG_DIR.mkdir(parents=True, exist_ok=True)

print(" Environment OK. Output Dir:", OUT_DIR)


 Environment OK. Output Dir: /content/out


In [4]:
# Import the CrewAI building blocks, the code interpreter tool, and the data/plotting libraries

from crewai import Agent, Task, Crew, Process
from crewai_tools import CodeInterpreterTool

import pandas as pd
import matplotlib.pyplot as plt


In [5]:
# Helpers for writing/reading text files and for resetting the output folders

def write_text(path: pathlib.Path, text: str):
    """Write ``text`` to ``path`` as UTF-8, creating missing parent directories.

    Args:
        path: Destination file; an existing file is overwritten.
        text: Content to store.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(text)

def read_text(path: pathlib.Path) -> str:
    """Return the full content of ``path`` decoded as UTF-8."""
    with path.open("r", encoding="utf-8") as handle:
        content = handle.read()
    return content

def reset_out():
    """Delete OUT_DIR (if present) and recreate empty OUT_DIR and FIG_DIR."""
    if OUT_DIR.exists():
        shutil.rmtree(OUT_DIR)
    # Recreate the output tree: the top-level folder first, then the figures folder.
    for directory in (OUT_DIR, FIG_DIR):
        directory.mkdir(parents=True, exist_ok=True)



In [6]:
# ReAct Prompts
# Prompt for the manager agent. This is a plain string that is never passed
# through str.format, so it refers to figdir / report_path by name instead of
# using {placeholders} that would reach the model as literal braces.
PROMPT_MANAGER = """
You are the Analytics Manager. Your job is to plan, delegate, verify outputs, and request fixes.
Process:
1) Review the goal and inputs (csv_path, figdir, report_path)
2) Delegate to ingestion, then eda, then viz, then insights.
3) For each step, if expected files are missing or malformed, send a concise correction and retry once.
4) Keep everything deterministic and minimal. No external network calls. Keep plots Matplotlib-only.

Deliverable: a successful run with:
- out/df_cleaned.pkl
- out/schema.json
- out/eda_summary.json
- out/figs_index.json and PNGs under figdir
- the report at report_path (markdown with insights & next actions)

"""


# Prompt for the ingestion/cleaning agent. The log of type coercions is called
# 'changelog' everywhere so the key requested in schema.json matches the
# instructions that tell the agent to fill it.
PROMPT_INGESTION = """
Role: Data Ingestion & Cleaning Engineer.

Goal: Load the CSV at `csv_path`, infer schema, clean NA/outliers, and persist artifacts.

Constraints and Tools:
- Use Python (pandas). Write short, correct code via the code interpreter tool.
- Never use seaborn; do not produce plots here.
- Be explicit and reproducible: if you coerce types, log it to a 'changelog'.

Steps (ReAct):
- REASON: Inspect the CSV path and file size. Determine read_csv parameters (parse_dates candidates, dtype hints).
- ACT (code): Read with pandas (low_memory=False). Print df.info(). sample(5).
- OBSERVE: If bad dtypes or parse errors, re-load with better params.
- ADJUST: Apply cleaning : strip column names, drop exact duplicates, make best-effort type coercions.
- For NAs: decide column-wise strategy (drop/median/most_frequent/leave) and justify.
- Flag obvious outliers (z-score > 4 on numeric columns) into a boolean mask column `__outlier_flag` (do NOT drop).

Deliverables:
- Save cleaned DataFrame to 'out/df_cleaned.pkl'
- Write to 'out/schema.json' capturing: columns, inferred dtypes, NA counts, chosen NA strategies, any coercions, and a short 'changelog'


"""
# Prompt for the EDA agent. The deliverable example is kept structurally valid
# (balanced braces/brackets, one object per candidate plot) because the model
# is expected to mirror its shape in out/eda_summary.json.
PROMPT_EDA = """
Role: EDA Specialist.

Goal: Analyze 'out/df_cleaned.pkl' and produce 'out/eda_summary.json'.

Constraints:
- Python +pandas only. No Seaborn. No external data.
- Be Selective: Surface only useful, interpretable stats.

Steps (ReAct):
- REASON: Identify numeric vs categorical columns. Detect datetime columns. Check cardinality
- ACT (code): Compute:
   - Descriptive Stats (numeric describe, categorical top frequencies)
   - Correlations (Pearson); if time column exists, simple trend stats
   - Missingness summary (post-clean)
- OBSERVE: Extract key findings (e.g., skewed distribution, strong correlations |r| >= 0.5).
- ADJUST: Propose 3-6 'candidate_plots' with {type, columns, rationale}.

Deliverable JSON ('out/eda_summary.json'):
 {
   "highlights": [string...],
   "candidate_plots": [
     {"type": "hist", "columns": ["col"], "rationale": "..."},
     {"type": "bar", "columns": ["cat_col"], "rationale": "..."},
     {"type": "line", "columns": ["time_col","metric"], "rationale": "..."},
     ....
   ]
 }
"""

# Prompt for the visualization agent. The figs_index.json example is valid JSON
# in shape and points at figdir, matching the "save under figdir" constraint.
PROMPT_VIZ = """
Role: Visualization Engineer.


Goal: Generate Matplotlib PNGs for the top candidate plots and save an index.

Constraints:
  - Use Matplotlib ONLY. One Figure per plot. save under figdir as PNG.
  - No style settings, no custom colors
  - Handle missing columns gracefully (skip with a note)


Plot rules:
  - hist: df[columns[0]].hist()
  - bar: df[columns[0]].value_counts().head(20).plot(kind='bar')
  - line: If len(columns)==2 and first is datetime-like, plot df.sort_values(time)[[time, metric]] as a line.
  - scatter: If 2 numeric columns, simple scatter.
  - box: df[columns].plot(kind="box")

Deliverable:
  - Save 3-6 PNGs named fig_01.png, fig_02.png, .....
  - Write 'out/figs_index.json' as a list: [{"file": "<figdir>/fig_01.png", "caption": "..."}, ...]
"""

# Prompt text for the report-writing agent; it is passed as `system_prompt`
# when the `insights` Agent is constructed. It lists the three JSON artifacts
# the report is built from, the required report sections, and style rules.
PROMPT_INSIGHTS = """
Role: Insights & Strategy Analyst.

Goal: Produce a crisp analyst report 'report_path' in Markdown using:
  - out/schema.json
  - out/eda_summary.json
  - out/figs_index.json

Report Sections:
#Title
-Dataset Overview (rows, columns, key types)
-Data quality summary (NA handling, outlier flags)
-Top 3-5 insights with specific stats / correlations
-Visuals: embed filenames and explain what to look at
-Decisions & Next Actions: concrete recommendations (e.g., product , marketing, ops, service logs)
-Risks & Assumptions
-Appendix: table of columns with short notes

Style:
-Be consise, Use Bullet points.
-Quote numbers precisely (means, medians, correlations)
-Refer to figures by filename. Do NOT try to display them; just list paths.

"""











In [7]:
# Agents & tasks
# A single code-interpreter tool instance, listed in `tools` of each worker agent.
code_tool = CodeInterpreterTool()



In [9]:
# Manager agent: the only agent with delegation enabled; it has no tools of its own.
# NOTE(review): `system_prompt` (and possibly `memory`) do not appear among the
# Agent attributes in the crewAI reference, which lists `system_template` /
# `use_system_prompt` instead — confirm against the installed crewAI version
# that PROMPT_MANAGER is actually applied rather than silently ignored.
manager = Agent(
    role="Analytics Manager",
    goal="Deliver an accurate, visual, decision-oriented analysis from a CSV path.",
    backstory=" A senior data lead who plans work, checks results, and requests fixes.",
    llm="gpt-5",
    system_prompt=PROMPT_MANAGER,
    allow_delegation=True,
    memory=True,
    max_iter=2,
    verbose=True,
)




In [17]:
# Ingestion agent: loads and cleans the CSV through the code interpreter tool.
# The backstory describes a data engineer; it previously repeated the manager's
# "plans work ... requests fixes" persona, which contradicts a worker that
# cannot delegate.
# NOTE(review): `system_prompt` does not appear among the Agent attributes in
# the crewAI reference (`system_template` / `use_system_prompt` do) — confirm
# PROMPT_INGESTION is actually applied.
ingestion = Agent(
    role="Data Ingestion and Cleaning",
    goal="Load CSV, infer schema, clean data robustly, persist df_cleaned.pkl + schema JSON.",
    backstory="Data engineer who loads messy CSVs, cleans them reproducibly, and logs every change.",
    llm="gpt-5",
    tools=[code_tool],
    system_prompt=PROMPT_INGESTION,
    allow_delegation=False,
    max_iter=3,
    verbose=True,
)






In [18]:
# EDA agent: works through the code interpreter tool and cannot delegate.
# NOTE(review): confirm the installed crewAI version honours `system_prompt`;
# the documented Agent attributes are `system_template` / `use_system_prompt`.
eda = Agent(
    role="EDA Specialist",
    goal="Summaries, stats, correlations; propose meaningful visulalizations",
    backstory="Quant who spots patterns and pitfalls.",
    llm="gpt-5",
    tools=[code_tool],
    system_prompt=PROMPT_EDA,
    allow_delegation=False,
    max_iter=3,
    verbose=True,
)


In [19]:
# Visualization agent: works through the code interpreter tool and cannot delegate.
# NOTE(review): confirm the installed crewAI version honours `system_prompt`;
# the documented Agent attributes are `system_template` / `use_system_prompt`.
viz = Agent(
    role="Vizualization Engineer",
    goal="Produce clear Matlotlib PNGs for findings; save and list captions.",
    backstory=" Programatic plotter with clean defaults",
    llm="gpt-5",
    tools=[code_tool],
    system_prompt=PROMPT_VIZ,
    allow_delegation=False,
    max_iter=3,
    verbose=True,
)


In [20]:
# Insights agent: drafts the final report; cannot delegate.
# NOTE(review): confirm the installed crewAI version honours `system_prompt`;
# the documented Agent attributes are `system_template` / `use_system_prompt`.
insights = Agent(
    role="Insights & Strategy",
    goal="Draft a crisp report with insights and next-step recommendations",
    backstory=" BI analyst who writes for execs.",
    llm="gpt-5",
    tools=[code_tool],
    system_prompt=PROMPT_INSIGHTS,
    allow_delegation=False,
    max_iter=3,
    verbose=True,
)


In [25]:
# Tasks

def make_tasks(csv_path, figdir: str, report_path: str):
    """Build the four pipeline tasks in execution order.

    Args:
        csv_path: Path of the CSV to ingest; interpolated into the ingestion task.
        figdir: Directory the visualization task is told to save PNGs into.
        report_path: Path the insights task is told to write the Markdown report to.

    Returns:
        list: ``[ingestion_task, eda_task, viz_task, insights_task]``.
    """
    ingestion_task = Task(
        description=textwrap.dedent(f"""
            Ingest {csv_path}. Infer dtypes, diagnose NAs/outliers, clean with transparent steps.
            Save DataFrame at 'out/df_cleaned.pkl' and write 'out/schema.json' describing columns
            and decisions. Print a short changelog.
        """).strip(),
        agent=ingestion,
        expected_output="out/df_cleaned.pkl and out/schema.json created",
    )

    eda_task = Task(
        description=textwrap.dedent("""
            Load 'out/df_cleaned.pkl'. Compute descriptive stats, correlations, key categorical
            distributions, and generate a ranked list of 'candidate_plots' with
            (type, columns, rationale). Save at 'out/eda_summary.json'.
        """).strip(),
        agent=eda,
        expected_output="out/eda_summary.json created with candidate_plots list",
    )

    viz_task = Task(
        # Doubled braces keep {file, caption} literal inside the f-string.
        description=textwrap.dedent(f"""
            Read 'out/eda_summary.json'. For the top 3-6 candidate plots, generate Matplotlib
            figures and save PNGs to {figdir}. Avoid Seaborn.
            Return a JSON array of {{file, caption}} and write it to 'out/figs_index.json'.
        """).strip(),
        agent=viz,
        expected_output=f"PNG figures saved under {figdir} and 'out/figs_index.json' created",
    )

    insights_task = Task(
        # The Visuals entry is plain text: a `{filenames}` placeholder here would be
        # evaluated by the f-string and raise NameError, since no such name exists.
        description=textwrap.dedent(f"""
            Using 'out/schema.json', 'out/eda_summary.json' and 'out/figs_index.json', write a
            Markdown report to {report_path}. The report must include these sections:
            - Overview
            - Data quality
            - Insights
            - Visuals (list the figure filenames)
            - Decisions & Next Actions: concrete recommendations (e.g., product, marketing, ops, service logs)
            - Risks & Assumptions
            - Appendix: table of columns with short notes
        """).strip(),
        agent=insights,
        expected_output=f"{report_path} written",
    )

    return [ingestion_task, eda_task, viz_task, insights_task]









In [27]:
# Create the crew: hierarchical setup so the manager can verify and redirect

def make_crew(tasks):
    """Assemble the hierarchical crew that runs ``tasks``.

    The four worker agents are registered as crew members and ``manager`` is
    supplied separately as the manager agent.

    Args:
        tasks: Tasks to hand to the crew.

    Returns:
        The configured ``Crew`` instance.
    """
    worker_agents = [ingestion, eda, viz, insights]
    return Crew(
        agents=worker_agents,
        tasks=tasks,
        manager_agent=manager,
        process=Process.hierarchical,
        max_rpm=30,
        verbose=True,
    )








In [28]:
# Runner: End to End pipeline

def run_analytics(csv_path:str, figdir:str = str(FIG_DIR), report_path:str = str(OUT_DIR/ "report.md"), reset_outputs:bool = True):
    """Run the full analytics pipeline on one CSV and verify its artifacts.

    Args:
        csv_path: Path to the input CSV; must exist.
        figdir: Directory where figures are expected to be saved.
        report_path: Path where the Markdown report is expected to be written.
        reset_outputs: When True, wipe and recreate the output folders first.
            When False, existing outputs are kept and the pipeline still runs.

    Returns:
        The value returned by ``crew.kickoff()``.

    Raises:
        AssertionError: If ``csv_path`` does not exist.
        RuntimeError: If any expected artifact is missing after the run.
    """
    # Validate the input first so a bad path never wipes a previous run's outputs.
    assert pathlib.Path(csv_path).exists(), f"CSV not found: {csv_path}"

    if reset_outputs:
        reset_out()
    # figdir may differ from FIG_DIR, and reset_out() is skipped when
    # reset_outputs is False, so make sure the target folder exists either way.
    pathlib.Path(figdir).mkdir(parents=True, exist_ok=True)

    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # "+00:00" offset is rewritten to "Z" to keep the stored timestamp format.
    run_at = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")
    write_text(OUT_DIR / "RUN_METADATA.json", json.dumps({
        "csv_path": str(csv_path),
        "figdir": str(figdir),
        "report_path": str(report_path),
        "run_at": run_at,
    }, indent=2))

    tasks = make_tasks(csv_path=csv_path, figdir=figdir, report_path=report_path)
    crew = make_crew(tasks)
    result = crew.kickoff()
    print("\n=== Crew Result ===== \n", result)

    # Acceptance checks: every artifact the agents were asked for must exist.
    required = [
        OUT_DIR / "df_cleaned.pkl",
        OUT_DIR / "schema.json",
        OUT_DIR / "eda_summary.json",
        OUT_DIR / "figs_index.json",
        pathlib.Path(report_path),
    ]
    missing = [str(p) for p in required if not p.exists()]
    if missing:
        raise RuntimeError("Missing expected outputs: \n " + "\n".join(missing))

    # Show a quick summary
    print("\n Artifacts:")
    for artifact in required:
        print(f" - {artifact}")

    with open(OUT_DIR / "figs_index.json", "r", encoding="utf-8") as f:
        figs_index = json.load(f)

    print("\n Figures:")
    for entry in figs_index:
        # The index is produced by an LLM-driven step, so tolerate entries
        # that are not the expected {"file", "caption"} objects.
        if isinstance(entry, dict):
            print(" - ", entry.get("file"), "::", entry.get("caption"))
        else:
            print(" - ", entry)

    print(f"\n Done. Report : {report_path}")
    return result











