joedatlive/classifier
We have two pipelines, one for each asset type we are classifying.

**User pipeline:**
1. Run `ingest_users_assets.py`; it calls the Okta integrator (`okta_integrator.py`) and the user extractor/sensitivity tagger (`user_extractor.py`).
2. Run `classifier_user.py` to send the data to the LLM.

**Data pipeline:**
1. Run `ingest_data_assets.py`; it calls the Google Workspace integrator (`google_drive_integrator`) and the data extractor/sensitivity tagger (`data_extractor.py`).
2. Run `classifier_data.py` to send the data to the LLM.
# Data Classification Pipeline
This repository contains a robust data classification pipeline designed to automatically assess the sensitivity and regulatory relevance (e.g., ITAR) of various data assets. The pipeline leverages large language models (LLMs) for intelligent classification and a PostgreSQL database for storing data assets, knowledge bases, and classification results.
## Architecture Overview
The pipeline consists of several interconnected Python scripts, each responsible for a specific phase:
1. **`ingest_data_assets.py`**: Ingests raw data assets from external sources (e.g., Google Drive) into the central `data_assets` table in PostgreSQL.
2. **`itar_kb_builder.py`**: Builds and maintains a specialized knowledge base for ITAR regulations by fetching, processing, and embedding ITAR documents into the `itar_chunks` table.
3. **`classifier_data.py`**: The main orchestration script that fetches unclassified data assets, performs general sensitivity classification using an LLM, and conditionally performs ITAR-specific classification using the ITAR Knowledge Base.
4. **`itar_kb_classifier.py`**: A module called by `classifier_data.py` to perform the ITAR relevance assessment, utilizing Retrieval Augmented Generation (RAG) principles.
## Database Schema
The pipeline interacts with the following PostgreSQL tables:
* **`data_assets`**: Stores metadata and full text content of all ingested data assets.
* `id` (PRIMARY KEY)
* `name`
* `data_type`
* `location`
* `owner_email`
* `sensitivity_tags_json` (JSONB array of tags identified during ingestion)
* `full_text_content` (Extracted text)
* `access_groups_json`
* `associated_application`
* `last_modified_at`
* `ingested_at`
* **`data_classification`**: Stores the general sensitivity classification results for each data asset.
* `data_asset_id` (PRIMARY KEY, Foreign Key to `data_assets.id`)
* `data_asset_name`
* `data_classification` (e.g., 'Critical', 'High', 'Moderate', 'Low')
* `data_classification_reason` (LLM-generated explanation)
* `owner_email`
* `classified_at`
* **`itar_chunks`**: Stores the processed and embedded chunks of ITAR regulation documents, forming the ITAR Knowledge Base.
* `id` (PRIMARY KEY)
* `document_id` (ID of the original ITAR document)
* `document_name`
* `content` (Text of the chunk)
* `metadata` (JSONB, e.g., subpart, section, chunk_id)
* `embedding` (Vector embedding of the chunk content, using `pgvector`)
* `created_at`
* **`kb_classifications`**: Stores classification results from specific Knowledge Bases (e.g., ITAR).
* `id` (PRIMARY KEY)
* `data_asset_id` (Foreign Key to `data_assets.id`)
* `kb_name` (e.g., 'ITAR')
* `relevance_score` (Similarity score from vector search)
* `classification_label` (e.g., 'ITAR Relevant', 'Potentially ITAR Relevant')
* `classification_reason` (LLM-generated explanation)
* `llm_model_used`
* `created_at`
* `updated_at`
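The `embedding` column in `itar_chunks` is what the ITAR classifier searches against. A minimal sketch of that lookup, assuming the column names above and pgvector's `<=>` cosine-distance operator; `TOP_K`, the query shape, and `build_query_params` are illustrative, not taken from the repo:

```python
# Illustrative pgvector similarity query against itar_chunks.
# `<=>` is pgvector's cosine-distance operator, so 1 - distance is a
# similarity-style relevance score.
TOP_K = 5

ITAR_SIMILARITY_QUERY = """
    SELECT id,
           document_name,
           content,
           metadata,
           1 - (embedding <=> %(query_embedding)s::vector) AS relevance_score
    FROM itar_chunks
    ORDER BY embedding <=> %(query_embedding)s::vector
    LIMIT %(top_k)s;
"""

def build_query_params(query_embedding: list, top_k: int = TOP_K) -> dict:
    """Bundle the parameters a psycopg2 cursor would substitute into the query."""
    return {"query_embedding": query_embedding, "top_k": top_k}
```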
## Pipeline Flow
The classification process unfolds in the following stages:
1. **Data Ingestion (`ingest_data_assets.py`)**:
* Connects to Google Drive using a dedicated **Service Account (e.g., `credentials.json`)**.
* Lists and downloads specified document types.
* Extracts full text content from documents (e.g., DOCX, PDF, XLSX) using `unstructured.io`.
* Identifies initial sensitivity tags (e.g., PII, Financial) from the extracted text.
* Stores the document metadata, extracted text, and initial tags into the `data_assets` table.
2. **ITAR Knowledge Base Building (`itar_kb_builder.py`)**:
* Connects to Google Drive using a separate **Service Account (e.g., `cruncher-credentials.json`)** specifically for ITAR regulations.
* Fetches ITAR regulation documents (either all shared with the service account or from a configured `ITAR_DRIVE_FOLDER_ID`).
* Parses these documents into structured elements (e.g., sections, subsections).
* Applies ITAR-specific hierarchical chunking to break down the regulations into manageable segments (`MAX_CHUNK_SIZE`).
* Generates **vector embeddings** for each ITAR chunk using a local SentenceTransformer model (`all-MiniLM-L6-v2`).
* Stores these ITAR chunks and their embeddings into the `itar_chunks` table. This forms the searchable knowledge base.
3. **Data Classification Orchestration (`classifier_data.py`)**:
* This is the central script that runs periodically to classify new data.
* Establishes **both synchronous (`psycopg2`) and asynchronous (`asyncpg`) database connections** for efficient mixed operations.
* Fetches data assets from the `data_assets` table that have not yet received a general classification (`data_classification.data_asset_id IS NULL`).
* For each unclassified data asset:
* **General Classification (LLM Call 1):** Sends the data asset's content, name, owner, and initial sensitivity tags to the **Gemini LLM** (`gemini-2.0-flash`) for a broad sensitivity classification (Critical, High, Moderate, Low) and a reason.
* **Saves General Classification:** Stores the result in the `data_classification` table.
* **Conditional ITAR Classification (LLM Call 2 - RAG):**
* If `ENABLE_ITAR_KB_CLASSIFICATION` is `True` in the `.env` file:
* Calls the `itar_kb_classifier.classify_document_for_itar` module.
* **Inside `itar_kb_classifier.py`:**
* **Embeds Input Document:** Generates a vector embedding for the input data asset's text using the local SentenceTransformer model (`all-MiniLM-L6-v2`).
* **Vector Similarity Search:** Performs a semantic search against the `itar_chunks` table using `pgvector` to retrieve the most relevant ITAR regulation chunks based on embedding similarity.
* **LLM Classification (Gemini):** Constructs a prompt including the original data asset's text and the retrieved ITAR context. This combined information is sent to the **Gemini LLM** (`gemini-2.0-flash`) to classify the data asset's specific ITAR relevance (ITAR Relevant, Potentially ITAR Relevant, Not ITAR Relevant) and provide a detailed reason.
* **Saves ITAR Classification:** Stores the ITAR classification result (relevance score, label, reason) in the `kb_classifications` table.
* Includes `time.sleep()` pauses to respect LLM API rate limits.
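The size-capping part of the hierarchical chunking in stage 2 can be sketched as follows. The paragraph-splitting strategy and the interpretation of `MAX_CHUNK_SIZE` as a character count are assumptions for illustration; the real builder also attaches subpart/section metadata to each chunk.

```python
def chunk_section(text: str, max_chunk_size: int = 1000) -> list:
    """Greedily pack paragraphs into chunks of at most max_chunk_size characters.

    Sketch only: the real itar_kb_builder.py chunks hierarchically by
    subpart/section before applying a size cap like MAX_CHUNK_SIZE.
    """
    paragraphs = [p.strip() for p in text.split("\n\n") if p.strip()]
    chunks, current = [], ""
    for para in paragraphs:
        # Start a new chunk when adding this paragraph would exceed the cap
        # (+2 accounts for the "\n\n" separator).
        if current and len(current) + len(para) + 2 > max_chunk_size:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks
```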
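The retrieval step in stage 3 ranks stored chunks by embedding similarity before prompting the LLM. Here is an in-memory stand-in for the pgvector search, using pure-Python cosine similarity over toy vectors; the helper names are hypothetical:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity, the quantity pgvector's cosine distance is derived from."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def top_k_chunks(query_emb, chunk_embs, chunk_texts, k=3):
    """Return the k (text, score) pairs most similar to the query embedding.

    In the real pipeline this ranking happens inside PostgreSQL over
    itar_chunks.embedding; here it is done in memory for illustration.
    """
    scores = [cosine_similarity(query_emb, emb) for emb in chunk_embs]
    order = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
    return [(chunk_texts[i], scores[i]) for i in order]
```

The top-scoring chunks become the retrieved ITAR context that is spliced into the Gemini prompt alongside the data asset's text.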
## Environment Variables
Ensure the following environment variables are set in your `.env` file:
* `GEMINI_API_KEY`: Your API key for accessing the Google Gemini LLM.
* `DB_HOST`: PostgreSQL database host (e.g., `localhost`, `0.0.0.0`).
* `DB_NAME`: PostgreSQL database name (e.g., `assets`).
* `DB_USER_DEV_CLASSIFIER`: PostgreSQL username for `classifier_data.py` (needs `SELECT` on `data_assets`, `INSERT/UPDATE` on `data_classification` and `kb_classifications`).
* `DB_PWD_DEV_CLASSIFIER`: PostgreSQL password for `DB_USER_DEV_CLASSIFIER`.
* `DB_USER_DEV_ITAR_READER`: PostgreSQL username for `itar_kb_classifier.py` (needs `SELECT` on `itar_chunks`).
* `DB_PWD_DEV_ITAR_READER`: PostgreSQL password for `DB_USER_DEV_ITAR_READER`.
* `DB_USER_DEV_ITAR_WRITER`: PostgreSQL username for `itar_kb_builder.py` (needs `INSERT/UPDATE` on `itar_chunks`).
* `DB_PWD_DEV_ITAR_WRITER`: PostgreSQL password for `DB_USER_DEV_ITAR_WRITER`.
* `ENABLE_ITAR_KB_CLASSIFICATION`: Set to `True` or `False` to enable/disable ITAR classification (e.g., `ENABLE_ITAR_KB_CLASSIFICATION=True`).
* `ITAR_DRIVE_FOLDER_ID`: (Optional, for `itar_kb_builder.py`) Google Drive folder ID containing ITAR regulations. If omitted, `itar_kb_builder.py` will fetch all files shared with its service account.
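A sketch of reading these variables from the environment (e.g. after `python-dotenv`'s `load_dotenv()` has populated `os.environ`). The variable names match the `.env` keys above; the helper name and the defaults are illustrative, not code from the repo:

```python
import os

def load_pipeline_config() -> dict:
    """Collect the pipeline's settings from os.environ.

    GEMINI_API_KEY is required; the boolean flag is parsed case-insensitively
    so ENABLE_ITAR_KB_CLASSIFICATION=True / true both enable ITAR checks.
    """
    return {
        "gemini_api_key": os.environ["GEMINI_API_KEY"],  # raises KeyError if missing
        "db_host": os.environ.get("DB_HOST", "localhost"),
        "db_name": os.environ.get("DB_NAME", "assets"),
        "enable_itar_kb": os.environ.get(
            "ENABLE_ITAR_KB_CLASSIFICATION", "False"
        ).lower() == "true",
        "itar_drive_folder_id": os.environ.get("ITAR_DRIVE_FOLDER_ID"),  # optional
    }
```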
---
## Notes on running the new Cloud classifier pipeline
First, reset the cloud tables in SQL:
* `TRUNCATE TABLE cloud_host_metrics RESTART IDENTITY CASCADE;`
* `TRUNCATE TABLE cloud_hosts RESTART IDENTITY CASCADE;`
* `TRUNCATE TABLE cloud_resource_groups RESTART IDENTITY CASCADE;`
* `TRUNCATE TABLE cloud_subscriptions RESTART IDENTITY CASCADE;`
* `TRUNCATE TABLE cloud_accounts RESTART IDENTITY CASCADE;`
* `TRUNCATE TABLE production_baseline_profiles RESTART IDENTITY CASCADE;`
* `TRUNCATE TABLE cloud_classification RESTART IDENTITY CASCADE;`
Then run the scripts in order:
1. `python generate_cloud_data.py`
2. `python production_kb_builder.py`
3. `python cloud_extractor.py` (this now populates `vm_size` and `raw_azure_metadata_jsonb` in `cloud_hosts`)
4. `python cloud_classifier.py` (this now uses the richer host data, including `raw_azure_metadata_jsonb`, for LLM analysis)