In [1]:
import pandas as pd
import pyarrow.parquet as pq
from google.cloud import storage
import glob
import os
import gcsfs

In [2]:
# GCP Authentication Setup
def setup_gcp_auth(credentials_path=None):
    """
    Set up GCP authentication
    
    Args:
        credentials_path: Optional path to service account JSON key file
    """
    if credentials_path:
        # Explicitly use service account credentials by specifying the key file
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials_path
        print(f"Using credentials from: {credentials_path}")
    else:
        # Check if credentials are already set
        if 'GOOGLE_APPLICATION_CREDENTIALS' in os.environ:
            print(f"Using credentials from environment variable")
        else:
            print("No credentials specified. Using default authentication")
            # This will use Application Default Credentials (ADC) if available
            # https://cloud.google.com/docs/authentication/application-default-credentials
    
    # Test authentication by listing buckets
    try:
        storage_client = storage.Client()
        buckets = list(storage_client.list_buckets(max_results=5))
        print(f"Authentication successful. Found {len(buckets)} buckets.")
        return True
    except Exception as e:
        print(f"Authentication failed: {e}")
        return False

In [3]:
# Method 1: Using pandas with gcsfs
def read_parquet_files_with_pandas(bucket_name, prefix):
    """
    Read multiple Parquet files from GCP using pandas and gcsfs
    """
    fs = gcsfs.GCSFileSystem()
        
    # List all parquet files in the path
    path_pattern = f"gs://{bucket_name}/{prefix}/*.parquet"
    file_list = fs.glob(path_pattern)
        
    if not file_list:
        print(f"No files found matching pattern: {path_pattern}")
        return pd.DataFrame()
        
    print(f"Found {len(file_list)} files matching pattern")
        
    # Read all the parquet files
    df = pd.concat([pd.read_parquet(file, filesystem=fs) for file in file_list])
    return df

In [32]:
credentials_path = "google_sa.json"  # Optional
setup_gcp_auth(credentials_path)

# Define bucket and path
bucket_name = "gcs-email-landing-inbox-ai"
prefix = "processed/1234/3326b9b8-6f10-4965-9990-f0f5f023e95d/pc612001@gmail.com"

# Read Parquet files using any of the methods
df = read_parquet_files_with_pandas(bucket_name, prefix)

# Display the first few rows
print(df.head())

Using credentials from: google_sa.json
Authentication successful. Found 1 buckets.
Found 1 files matching pattern
         message_id                                from_email  \
0  19558716f157f98f  Target Circle <targetnews@em.target.com>   
1  1955866babc578b7                        pc612001@gmail.com   

                                 to    cc   bcc  \
0            [<PC612001@gmail.com>]  None  None   
1  [choudhari.pra@northeastern.edu]  None  None   

                                             subject  \
0  Hi Pradnyesh, your weekly Target Circle deals ...   
1  ALERT: Email Processing Failure - Run ID: unkn...   

                        date  \
0  2025-03-02T19:59:29+00:00   
1  2025-03-02T19:48:00+00:00   

                                             content  \
0  DQoNCg0KDQoNCiANCg0KIA0KIA0KDQogDQogDQoNCiAgDQ...   
1  CiAgICAgICAgPGgyPkVtYWlsIFByb2Nlc3NpbmcgUGlwZW...   

                                          plain_text  \
0  DQoNCg0KDQoNCiANCg0KIA0KIA0KDQogDQogDQoNCi

In [36]:
df

Unnamed: 0,message_id,from_email,to,cc,bcc,subject,date,content,plain_text,html,attachments,thread_id,labels,plain_text_decoded,html_decoded,extracted_text,combined_text,redacted_subject,redacted_text
0,19558716f157f98f,Target Circle <targetnews@em.target.com>,[<PC612001@gmail.com>],,,"Hi Pradnyesh, your weekly Target Circle deals ...",2025-03-02T19:59:29+00:00,DQoNCg0KDQoNCiANCg0KIA0KIA0KDQogDQogDQoNCiAgDQ...,DQoNCg0KDQoNCiANCg0KIA0KIA0KDQogDQogDQoNCiAgDQ...,DQoNCg0KDQo8IURPQ1RZUEUgaHRtbD4NCjxodG1sIGxhbm...,[],19558716f157f98f,"[CATEGORY_PROMOTIONS, UNREAD, INBOX]",\r\n\r\n\r\n\r\n\r\n \r\n\r\n \r\n \r\n\r\n \r...,\r\n\r\n\r\n\r\n<!DOCTYPE html>\r\n<html lang=...,The easy way to find all your savings for the ...,\r\n\r\n\r\n\r\n\r\n \r\n\r\n \r\n \r\n\r\n \r...,"Hi Pradnyesh, your weekly Target Circle deals ...",The easy way to find all your savings for the ...
1,1955866babc578b7,pc612001@gmail.com,[choudhari.pra@northeastern.edu],,,ALERT: Email Processing Failure - Run ID: unkn...,2025-03-02T19:48:00+00:00,CiAgICAgICAgPGgyPkVtYWlsIFByb2Nlc3NpbmcgUGlwZW...,,CiAgICAgICAgPGgyPkVtYWlsIFByb2Nlc3NpbmcgUGlwZW...,[],195536ff90925df3,[SENT],,\n <h2>Email Processing Pipeline Failur...,Email Processing Pipeline Failure Run ID: unkn...,Email Processing Pipeline Failure Run ID: unk...,ALERT: Email Processing Failure - Run ID: unkn...,Email Processing Pipeline Failure Run ID: unkn...


In [35]:
df.redacted_text.tolist()

['The easy way to find all your savings for the week. [URL] [URL] Weekly Ad [URL] Top Deals [URL] New Arrivals [URL] This week’s deals are here! See all deals [URL] This week’s deals are here! See all deals [URL] Activate your personalized Target Circle Bonuses now &#8250; [URL] Save $5 when you spend $30 on baby diapers & wipes &#8250; [URL] Save $5 when you spend $30 on baby diapers & wipes &#8250; [URL] 10% off select board games, puzzles & activities &#8250; [URL] 10% off select board games, puzzles & activities &#8250; Explore this week’s top deals [URL] Electronics Deals [URL] Toy Deals [URL] Clothing, Shoes & Accessory Deals [URL] Home Deals [URL] Beauty & Personal Care Deals [URL] All Top Deals [URL] *See the full details of all deals. Some restrictions apply. [URL] Offer Details [URL] Clothing [URL] Shoes [URL] Home [URL] Electronics [URL] Toys [URL] Grocery [URL] Help [URL] Privacy †”Lifetime savings” includes savings from Cartwheel and Target Circle offers. “Available balanc

In [5]:
def upload_parquet_to_gcs(local_parquet_path, destination_blob_name):
    """
    Upload a Parquet file to GCS bucket
    
    Args:
        local_parquet_path: Path to local Parquet file
        destination_blob_name: Destination path in GCS bucket
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket("gcs-email-landing-inbox-ai")
        blob = bucket.blob(destination_blob_name)
        blob.upload_from_filename(local_parquet_path)
        print(f"File {local_parquet_path} uploaded to gs://{bucket_name}/{destination_blob_name}")
    except Exception as e:
        print(f"Error uploading file: {e}")

# Example usage:
# Assuming you have a local Parquet file
upload_parquet_to_gcs("emails_batch_00001.parquet", "user_id/cron/emailid/emails/emails_batch_00001.parquet")



File emails_batch_00001.parquet uploaded to gs://gcs-email-landing-inbox-ai/user_id/cron/emailid/emails/emails_batch_00001.parquet


In [29]:
def list_files_in_bucket(bucket_name, prefix=None):
    """
    List all files in a GCS bucket with optional prefix filtering
    
    Args:
        bucket_name: Name of the GCS bucket
        prefix: Optional prefix to filter files (folder path)
    """
    try:
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        
        # List blobs/files
        blobs = bucket.list_blobs(prefix=prefix)
        
        print(f"Files in gs://{bucket_name}/{''+prefix if prefix else ''}:")
        for blob in blobs:
            print(f"- {blob.name}")
            
    except Exception as e:
        print(f"Error listing files: {e}")

# List all files in the bucket with the specified prefix
list_files_in_bucket("gcs-email-landing-inbox-ai")

# Alternatively, list all files in the bucket
# list_files_in_bucket(bucket_name)

Files in gs://gcs-email-landing-inbox-ai/:
- chroma_data/chroma.sqlite3
- processed/1233/0dea884a-66ce-4fce-8f88-861d2e47e41a/pc612001@gmail.com/processed_emails.parquet
- processed/1234/3326b9b8-6f10-4965-9990-f0f5f023e95d/pc612001@gmail.com/processed_emails.parquet
- processed/3/ed74a5db-8319-462c-91fd-a28e7e4fb781/pc612001@gmail.com/processed_emails.parquet
- raw/1233/0dea884a-66ce-4fce-8f88-861d2e47e41a/pc612001@gmail.com/emails.parquet
- raw/1234/3326b9b8-6f10-4965-9990-f0f5f023e95d/pc612001@gmail.com/emails.parquet
- raw/2/1d30ee2a-4d99-41a9-b79e-fb5766207e49/pc612001@gmail.com/emails.parquet
- raw/2/5a347bc6-aed2-4873-83ae-fd614c6157d5/pc612001@gmail.com/emails.parquet
- raw/2/b0681edc-4baf-464b-bccd-31ac9c412a82/pc612001@gmail.com/emails.parquet
- raw/2/d2a3e3f0-532d-49dc-85fc-445128d2fb23/pc612001@gmail.com/emails.parquet
- raw/3/ed74a5db-8319-462c-91fd-a28e7e4fb781/pc612001@gmail.com/emails.parquet
- raw/3/ed74a5db-8319-462c-91fd-a28e7e4fb781/pc612001@gmail.com/processed_emai

In [7]:
import chromadb
# Example setup of the client to connect to your chroma server
client = chromadb.HttpClient(host='localhost', port=8000)

In [37]:
df['labels'] = df['labels'].astype(str)
# using apply function to create a new column
df['metadata'] = df.apply(lambda row: {"from":row.from_email,  "date":row.date, "labels":row.labels}, axis = 1)

# Print the DataFrame after addition
# of new column
print(df.metadata.to_list())

[{'from': 'Target Circle <targetnews@em.target.com>', 'date': '2025-03-02T19:59:29+00:00', 'labels': "['CATEGORY_PROMOTIONS' 'UNREAD' 'INBOX']"}, {'from': 'pc612001@gmail.com', 'date': '2025-03-02T19:48:00+00:00', 'labels': "['SENT']"}]


In [39]:
import openai

In [None]:
OPENAI_API_KEY = ""
# Initialize clients
openai.api_key = OPENAI_API_KEY

df['embeddings'] = df.apply(lambda row: openai.embeddings.create(input=row.redacted_text, model="text-embedding-3-small"), axis = 1)

In [43]:
df

Unnamed: 0,message_id,from_email,to,cc,bcc,subject,date,content,plain_text,html,...,thread_id,labels,plain_text_decoded,html_decoded,extracted_text,combined_text,redacted_subject,redacted_text,metadata,embeddings
0,19558716f157f98f,Target Circle <targetnews@em.target.com>,[<PC612001@gmail.com>],,,"Hi Pradnyesh, your weekly Target Circle deals ...",2025-03-02T19:59:29+00:00,DQoNCg0KDQoNCiANCg0KIA0KIA0KDQogDQogDQoNCiAgDQ...,DQoNCg0KDQoNCiANCg0KIA0KIA0KDQogDQogDQoNCiAgDQ...,DQoNCg0KDQo8IURPQ1RZUEUgaHRtbD4NCjxodG1sIGxhbm...,...,19558716f157f98f,['CATEGORY_PROMOTIONS' 'UNREAD' 'INBOX'],\r\n\r\n\r\n\r\n\r\n \r\n\r\n \r\n \r\n\r\n \r...,\r\n\r\n\r\n\r\n<!DOCTYPE html>\r\n<html lang=...,The easy way to find all your savings for the ...,\r\n\r\n\r\n\r\n\r\n \r\n\r\n \r\n \r\n\r\n \r...,"Hi Pradnyesh, your weekly Target Circle deals ...",The easy way to find all your savings for the ...,{'from': 'Target Circle <targetnews@em.target....,CreateEmbeddingResponse(data=[Embedding(embedd...
1,1955866babc578b7,pc612001@gmail.com,[choudhari.pra@northeastern.edu],,,ALERT: Email Processing Failure - Run ID: unkn...,2025-03-02T19:48:00+00:00,CiAgICAgICAgPGgyPkVtYWlsIFByb2Nlc3NpbmcgUGlwZW...,,CiAgICAgICAgPGgyPkVtYWlsIFByb2Nlc3NpbmcgUGlwZW...,...,195536ff90925df3,['SENT'],,\n <h2>Email Processing Pipeline Failur...,Email Processing Pipeline Failure Run ID: unkn...,Email Processing Pipeline Failure Run ID: unk...,ALERT: Email Processing Failure - Run ID: unkn...,Email Processing Pipeline Failure Run ID: unkn...,"{'from': 'pc612001@gmail.com', 'date': '2025-0...",CreateEmbeddingResponse(data=[Embedding(embedd...


In [None]:
import chromadb
from chromadb.config import Settings

# Create the client
client = chromadb.HttpClient(
    host="localhost",
    port=8000
)

# Define your function without async/await
def upload_to_chroma(user_id, df, client):
    """
    Upload data to Chroma
    """
    try:
        collection = client.get_or_create_collection(name=user_id)
        
        # Upload data to Chroma
        collection.upsert(
            documents=df.subject.tolist(),
            embeddings=df.embeddings.tolist(),
            metadatas=df.metadata.tolist(),
            ids=df.message_id.tolist()
        )
    except Exception as e:
        print(f"Error uploading data to Chroma: {e}")

# Call the function normally
upload_to_chroma("test_id", df, client)

ValueError: Server disconnected without sending a response.

In [21]:
import chromadb
print(chromadb.__version__)

0.6.3


In [28]:
import chromadb
import pandas as pd

# Create the client
client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    # settings=chromadb.config.Settings(api_version="v2")
)

def upload_to_chroma(user_id, df, client):
    """
    Upload data to Chroma
    """
    try:
        collection = client.get_or_create_collection(name=user_id)
        
        # Upload data to Chroma
        collection.upsert(
            documents=df['subject'].tolist(),
            metadatas=df['metadata'].tolist(),
            ids=df['message_id'].tolist()
        )
        print("Data uploaded successfully")
    except Exception as e:
        print(f"Error uploading data to Chroma: {e}")

# Example dataframe
df = pd.DataFrame({
    "subject": ["Email 1", "Email 2", "Email 3"],
    "metadata": [{"sender": "A"}, {"sender": "B"}, {"sender": "C"}],
    "message_id": ["1", "2", "3"]
})

# Upload data
upload_to_chroma("test_id", df, client)


Data uploaded successfully
