In [None]:
from crewai import LLM
import os

# Shared LLM handle used by every agent in this notebook.
# The "groq/" provider prefix is required so the request is routed to Groq
# (matching the GROQ_API_KEY below); a bare model id carries no provider.
llm = LLM(
    model='groq/deepseek-r1-distill-llama-70b',
    temperature=0.1,  # low temperature keeps the agents' output close to deterministic
    api_key=os.getenv("GROQ_API_KEY"),  # read from the environment, never hard-coded
)

# Import Libraries

In [5]:
import json

import pandas as pd
from crewai import Agent,Task,Crew
from crewai.tools import tool
from crewai_tools import FileReadTool

# Custom and CrewAI tools

In [None]:
# File-reading tool pre-bound to the customers CSV.
file_reader = FileReadTool(file_path='./data/customers.csv')


class DataValidatorTool:
    """Produce basic data-quality metrics for a set of named DataFrames."""

    def run(self, dataframe):
        """Summarise every DataFrame in the given mapping.

        Args:
            dataframe: Mapping of file name -> DataFrame-like object (must
                support ``len``, ``.columns``, ``.isnull()`` and
                ``.duplicated()``).

        Returns:
            A list with one dict per entry, in mapping order, holding the
            keys ``file``, ``rows``, ``columns``, ``null_count`` (total null
            cells) and ``duplicate_count`` (rows repeating an earlier row).
        """
        return [
            {
                "file": name,
                "rows": len(frame),
                "columns": len(frame.columns),
                "null_count": int(frame.isnull().sum().sum()),
                "duplicate_count": int(frame.duplicated().sum()),
            }
            for name, frame in dataframe.items()
        ]
    
class DataCleanerTool:
    """Drop null-containing rows and duplicate rows from named DataFrames."""

    def run(self, dataframe_dict):
        """Clean every DataFrame in the mapping and report what was removed.

        Cleaning is done in two steps: rows containing any null are dropped
        first, then duplicate rows are dropped from what remains.

        Args:
            dataframe_dict: Mapping of file name -> DataFrame-like object.

        Returns:
            A dict with two keys:
              * ``cleaned_data``: mapping of file name -> cleaned DataFrame.
              * ``report``: list of per-file dicts with ``file``,
                ``original_rows``, ``rows_after_cleaning``, ``nulls_removed``
                (null cells in the original frame, all of which sit in
                dropped rows) and ``duplicates_removed`` (rows actually
                dropped by the de-duplication step).
        """
        cleaned_data = {}
        report = []

        for filename, df in dataframe_dict.items():
            without_nulls = df.dropna()
            cleaned_df = without_nulls.drop_duplicates()

            report.append({
                "file": filename,
                "original_rows": len(df),
                "rows_after_cleaning": len(cleaned_df),
                "nulls_removed": int(df.isnull().sum().sum()),
                # Count duplicates AFTER the null rows are gone: duplicated
                # rows that also contain nulls are removed by dropna(), so
                # counting on the original frame would over-report.
                "duplicates_removed": len(without_nulls) - len(cleaned_df),
            })

            # store cleaned data for further processing
            cleaned_data[filename] = cleaned_df

        return {"cleaned_data": cleaned_data, "report": report}

# One shared instance of each custom helper for the rest of the notebook.
validator_tool, cleaner_tool = DataValidatorTool(), DataCleanerTool()


# Agent definitions

In [None]:
# Loader agent: the agent that is given the file-reading tool.
loader_agent = Agent(
    llm=llm,
    verbose=True,
    tools=[file_reader],
    role="Data Loader",
    goal="Read local CSV/JSON files prepare for Validation",
    backstory=""" This agent can scan the local directory ,finds the CSV/JSON files 
                   and read them into dataframe, and collects metadata for validation""",
)

@tool("Data Validator")
def validate_csv_file(file_path: str) -> str:
    """Load the CSV file at `file_path` and return a JSON data-quality report.

    The report contains one entry for the file with: file, rows, columns,
    null_count and duplicate_count.
    """
    # An LLM tool call can only carry JSON-friendly arguments, so the tool
    # takes a path, loads the frame itself and delegates to validator_tool.
    report = validator_tool.run({file_path: pd.read_csv(file_path)})
    return json.dumps(report)


# Validator agent. The Agent field is `tools` (plural); the previous `tool=`
# keyword is not an Agent field, so the validator was never attached.
validator_agent = Agent(
    role="Data validator",
    goal="Check dataset for missing values,duplicates, and Anomolies",
    backstory="""This Agent recieves Dataframes from the loader and runs quality checks so that 
    the clean ,reliable data can be stored or processe further.""",
    tools=[validate_csv_file],
    llm=llm,
    verbose=True
)

@tool("Data Cleaner")
def clean_csv_file(file_path: str) -> str:
    """Load the CSV file at `file_path`, drop rows with nulls and duplicate rows, and return JSON.

    The JSON object has 'cleaned_data' (file path mapped to the list of
    cleaned row records) and 'report' (per-file summary of what was removed).
    """
    # An LLM tool call can only carry JSON-friendly arguments, so the tool
    # takes a path, loads the frame itself and delegates to cleaner_tool.
    result = cleaner_tool.run({file_path: pd.read_csv(file_path)})
    # DataFrames are not JSON-serialisable; emit them as row records.
    # NOTE(review): this returns every cleaned row to the LLM - fine for small
    # files, consider writing to disk and returning a path for large ones.
    cleaned_records = {
        name: json.loads(frame.to_json(orient="records"))
        for name, frame in result["cleaned_data"].items()
    }
    return json.dumps({"cleaned_data": cleaned_records, "report": result["report"]})


# Cleaner agent. The Agent field is `tools` (plural); the previous `tool=`
# keyword is not an Agent field, so the cleaner was never attached.
cleaner_agent = Agent(
    role="Data Cleaner",
    goal="Clean dataset any missing values, Duplicate values and Anomolies",
    backstory="""This Agent recievs Dataframe from Validator and runs quality check the clean ,reliable data can be stored or processe further.""",
    tools=[clean_csv_file],
    llm=llm,
    verbose=True
)


# Task definitions

In [None]:
# Task 1: the loader agent's instructions, one numbered step per line.
load_task = Task(
    agent=loader_agent,
    description="\n".join([
        "1.Scan the 'data/' folder for .csv.",
        "2.For each file,read it contents into a Pandas Dataframe.",
        "3.Collect metadata: file name,size,number of rows and columns",
        "4.Pass DataFrames to the next agent for validation",
    ]),
    expected_output="A dictionary of file names mapped to Pandas DataFrames for validation.",
)

# Task 2: quality checks run by the validator agent.
# Each step ends with an explicit newline: adjacent string literals are
# concatenated, so a missing "\n" fuses two steps into one line of the prompt.
validate_task = Task(
    description=(
        "Run data quality check on provided Dataframes.\n"
        "1.Count total number of rows\n"
        "2.Count Missing values (NAN).\n"
        "3.Count duplicate Rows \n"
        "Return Summary report for each file"
    ),
    agent=validator_agent,
    expected_output="A JSON report containing file name, rows, columns, null_count, and duplicate_count for each dataset."
)

# Task 3: cleaning instructions for the cleaner agent and the shape of the
# result it is expected to hand back.
cleaner_task = Task(
    agent=cleaner_agent,
    description="\n".join([
        "Receive validated DataFrames and clean them by:",
        "1. Removing all rows with null or NaN values.",
        "2. Removing all duplicate rows.",
        "3. Returning both the cleaned DataFrames and a summary report showing how many nulls and duplicates were removed for each file.",
    ]),
    expected_output="\n".join([
        "A JSON object containing:",
        "- 'cleaned_data': dictionary with filenames as keys and cleaned DataFrames as values.",
        "- 'report': list of summaries for each file, where each summary contains:",
        "  * file name",
        "  * original row count",
        "  * rows after cleaning",
        "  * nulls removed",
        "  * duplicates removed",
    ]),
)

# Crew AI 

In [None]:
# Assemble the three-stage pipeline (load -> validate -> clean) and run it.
crew = Crew(
    tasks=[load_task, validate_task, cleaner_task],
    agents=[loader_agent, validator_agent, cleaner_agent],
    verbose=True,
)

res = crew.kickoff()
# Banner on its own line, followed by the crew's final output.
print("\n=== Data Validation Report ===", res, sep="\n")