In [1]:
import pandas as pd
import hashlib

# Read the first 10 rows of "emails.csv"
emails_df = pd.read_csv("emails.csv", nrows=1000)

print(emails_df.count())

file       1000
message    1000
dtype: int64


In [35]:
import os
from dotenv import load_dotenv
from neo4j import GraphDatabase

# Load environment variables from .env file
load_dotenv()

# Read Neo4j credentials from environment variables
neo4j_username = os.getenv('NEO4J_USERNAME')
neo4j_password = os.getenv('NEO4J_PASSWORD')
neo4j_uri = os.getenv('NEO4J_URI')

# Create a Neo4j driver instance
driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_username, neo4j_password))

# Verify the connection

def verify_connection(driver):
    try:
        with driver.session() as session:
            result = session.run("RETURN 1")
            if result.single()[0] == 1:
                print("Connection to Neo4j established successfully.")
            else:
                print("Failed to establish connection to Neo4j.")
    except Exception as e:
        print(f"An error occurred: {e}")

verify_connection(driver)



Connection to Neo4j established successfully.


In [36]:
import os
from pinecone import Pinecone

# Load Pinecone API key from environment variables
pinecone_api_key = os.getenv('PINECONE_API_KEY')
print(pinecone_api_key)

# Initialize Pinecone client
pc = Pinecone(api_key=pinecone_api_key)

# Create a Pinecone index
index_name = "enron"
pindex = pc.Index(index_name)


  from tqdm.autonotebook import tqdm


40dcba58-adbd-4509-9323-f47d8cf22799


In [108]:
def save_transaction_graph(sender, recipient, subject, body, sent_date, transaction_id, is_forwarded, parent_transaction_id):
    print(f"is forwarded: {is_forwarded == True}")

    if not sender or not recipient:
        print("Error: Sender or recipient address is null.")
        return
    
    if subject is None:
        subject = ""
    if body is None:
        body = ""
        
    if sent_date is None:
        sent_date = ""
    
    with driver.session() as session:
        session.run(
            """
            MERGE (from:EmailAddress {address: $sender})
            MERGE (to:EmailAddress {address: $recipient})
            MERGE (email:Email {id: $transaction_id, body: $body, subject: $subject, sent_date: $sent_date})
            CREATE (from)-[:EMAIL_FROM]->(email)
            CREATE (email)-[:EMAIL_TO]->(to)
            """,
            sender=sender,
            recipient=recipient,
            subject=subject,
            body=body,
            sent_date=sent_date,
            transaction_id=transaction_id
        )
        
     
        if is_forwarded:
            print(f"found forwarded email {transaction_id} with parent {parent_transaction_id}")
            session.run(
                """
                MATCH (parent:Email {id: $parent_transaction_id})
                MATCH (child:Email {id: $transaction_id})
                CREATE (child)-[:INCLUDED_IN]->(parent)
                """,
                parent_transaction_id=parent_transaction_id,
                transaction_id=transaction_id
            )
               
        print(f"Email {transaction_id} is not forwarded.")
        
           

In [37]:
import textwrap

def recursive_text_splitter(document, max_chunk_size):
    # Use textwrap to initially split the text into lines of max_chunk_size
    chunks = textwrap.wrap(document, width=max_chunk_size)
    
    final_chunks = []
    
    for chunk in chunks:
        if len(chunk) > max_chunk_size:
            # If a chunk is still larger than max_chunk_size, split it further
            final_chunks.extend(recursive_text_splitter(chunk, max_chunk_size))
        else:
            final_chunks.append(chunk)
    
    return final_chunks

In [38]:
from openai import OpenAI
client = OpenAI()

def get_embedding(text):
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

In [39]:
def save_transaction_embedding(email_from, email_to, email_subject, email_body, email_sent_date, transaction_id):
  chunks = recursive_text_splitter(email_body, 512)
  
  # Create an index if it doesn't exist  
  for chunk in chunks:
      chunk_id = transaction_id + "_" + str(chunks.index(chunk))
      embedding = get_embedding(chunk)
      metadata = {
          "email_from": email_from,
          "email_to": email_to,
          "email_subject": email_subject,
          "email_sent_date": email_sent_date,
          "transaction_id": transaction_id,
          "chunk": chunk
      }
      
      for key, value in metadata.items():
            if value is None:
                metadata[key] = "null" 

      pindex.upsert([(chunk_id, embedding, metadata)])
  

In [64]:
import logging
import email
from email import policy
from email.parser import Parser
import re

def parse_email(email_str):
    # Parse the email string
    msg = email.message_from_string(email_str, policy=policy.default)
    return msg

def extract_forwarded_email_headers(payload):
    # Use regex to find the original headers in the forwarded email|
    forwarded_headers = {}
    forwarded_headers['From'] = re.findall(r'From: (.+)', payload)
    forwarded_headers['To'] = re.findall(r'To: (.+)', payload)
    forwarded_headers['Date'] = re.findall(r'Date: (.+)', payload)
    forwarded_headers['Subject'] = re.findall(r'Subject: (.+)', payload)
    return forwarded_headers

def find_included_email_parts(msg):
    # This function checks if the email contains another email
    included_emails = []
    if msg.is_multipart():
        for part in msg.iter_parts():
            included_emails.extend(find_included_email_parts(part))
    else:
        payload = msg.get_payload()
        if isinstance(payload, str) and any(indicator in payload for indicator in ["Forwarded message", "Original Message", "From:", "Sent:", "To:", "Subject:"]):
            included_emails.append(msg)
    return included_emails

def extract_included_email(msg):
    # Extracts the included email text and headers
    included_email_texts = []
    if msg.is_multipart():
        for part in msg.iter_parts():
            included_email_texts.extend(extract_included_email(part))
    else:
        payload = msg.get_payload(decode=True)
        if payload:
            payload = payload.decode()
            # Detect forwarded email indicators
            start_idx = min(
                (payload.find(indicator) for indicator in ["Forwarded message", "Original Message", "From:", "Sent:", "To:", "Subject:"] if payload.find(indicator) != -1),
                default=-1
            )
            if start_idx != -1:
                included_email_texts.append(payload[start_idx:])
                # Extract headers
                headers = extract_forwarded_email_headers(payload[start_idx:])
                return headers, payload[start_idx:]
    return None, None

In [117]:
all_emails_df = pd.DataFrame(columns=[
    "email_from", "email_to", "email_subject", "email_body", 
    "email_sent_date", "transaction_id", "is_forwarded", "parent_transaction_id"
])

def extract_forwarded_date(email_content):
    match = re.search(r'Forwarded by .+ on (\d{2}\/\d{2}\/\d{4})', email_content)
    if match:
        date_str = match.group(1)
        print(date_str)

        return f"{date_str}"
    return None

for index, row in emails_df.iterrows():
    email_str = row['message']  # Assuming the column name is 'message'
    msg = parse_email(email_str)
    
    included_emails = find_included_email_parts(msg)
    
    def collect_emails(msg, parent_transaction_id=None, parent_email_to=None, depth=0, max_depth=5):
        if depth > max_depth:
            print("Maximum recursion depth reached.")
            return
        
        email_from = msg['From']
        email_to = msg['To'] if msg['To'] else parent_email_to  # Inherit 'email_to' from parent if missing
        email_subject = msg['Subject']
        email_body = msg.get_payload()
        email_sent_date = msg['Date']
        
        global all_emails_df
        
        # Ensure email_from and email_to only contain email addresses
        if email_from:
            email_from_matches = re.findall(r'[\w\.-]+@[\w\.-]+', email_from)
            email_from = ', '.join(email_from_matches)
        if email_to:
            email_to_matches = re.findall(r'[\w\.-]+@[\w\.-]+', email_to)
            email_to = ', '.join(email_to_matches)
        
        if email_from is None and parent_transaction_id is not None:
            parent_email = all_emails_df.loc[all_emails_df['transaction_id'] == parent_transaction_id, 'email_from']
            if not parent_email.empty:
                email_from = parent_email.values[0]
        
        hash_input = f"{email_from}{email_to}{email_subject}{email_sent_date}"
        transaction_id = hashlib.sha256(hash_input.encode()).hexdigest()
        
        email_data = {
            "email_from": email_from,
            "email_to": email_to,
            "email_subject": email_subject,
            "email_body": email_body,
            "email_sent_date": email_sent_date,
            "transaction_id": transaction_id,
            "is_forwarded": parent_transaction_id is not None,
            "parent_transaction_id": parent_transaction_id
        }
        
        # Extract forwarded date if the email is forwarded
        if parent_transaction_id is not None:
            print(f"Forwarded email: {email_body}")
            forwarded_date = extract_forwarded_date(email_body)
            if forwarded_date:
                email_data["email_sent_date"] = forwarded_date
        
        all_emails_df = pd.concat([all_emails_df, pd.DataFrame([email_data])], ignore_index=True)
        
        extracted_emails = extract_included_email(msg)
        if extracted_emails and len(extracted_emails) == 2 and extracted_emails[0] is not None and extracted_emails[1] is not None:
            headers, email_text = extracted_emails
            nested_msg = parse_email(email_text)
            nested_email_to = headers.get('To', [parent_email_to])[0] if headers.get('To') else parent_email_to  # Use the 'To' from headers or inherit from parent
            collect_emails(nested_msg, transaction_id, nested_email_to, depth + 1, max_depth)  # Pass 'email_to' to nested emails

    collect_emails(msg)

Forwarded email: 
Phillip,

> As discussed  during our phone conversation, In a Parallon 75 microturbine
> power generation deal for a national accounts customer, I am developing a
> proposal to sell power to customer at fixed or collar/floor price. To do
> so I need a corresponding term gas price for same. Microturbine is an
> onsite generation product developed by Honeywell to generate electricity
> on customer site (degen). using natural gas. In doing so,  I need your
> best fixed price forward gas price deal for 1, 3, 5, 7 and 10 years for
> annual/seasonal supply to microturbines to generate fixed kWh for
> customer. We have the opportunity to sell customer kWh 's using
> microturbine or sell them turbines themselves. kWh deal must have limited/
> no risk forward gas price to make deal work. Therein comes Sempra energy
> gas trading, truly you.
>
> We are proposing installing 180 - 240 units across a large number of
> stores (60-100) in San Diego.
> Store number varies because of 

In [None]:
display(all_emails_df)


In [None]:
for index, row in emails_df.iterrows():
    email_str = row['message']  # Assuming the column name is 'message'
    msg = parse_email(email_str)
    
    included_emails = find_included_email_parts(msg)
    

    for included_email in included_emails:
        print(included_email)
        
        extracted_emails = extract_included_email(included_email)
        for email_text in extracted_emails:
            print(email_text)
            

In [None]:
display(all_emails_df)

In [89]:
def process_email(row):
    email_from = row['email_from']
    email_to = row['email_to']
    email_subject = row['email_subject']
    email_body = row['email_body']
    email_sent_date = row['email_sent_date']
    transaction_id = row['transaction_id']
    is_forwarded = row['is_forwarded']
    parent_transaction_id = row['parent_transaction_id']
    
    # logging.info(f"Processing email with transaction ID {transaction_id}")
        
    save_transaction_graph(email_from, email_to, email_subject, email_body, email_sent_date, transaction_id, is_forwarded, parent_transaction_id)
    # save_transaction_embedding(email_from, email_to, email_subject, email_body, email_sent_date, transaction_id)
    
    if (index + 1) % batch_size == 0:
        global batch_count
        batch_count += 1
        logging.info(f"Processed {batch_count * batch_size} emails so far.")

In [None]:
only_forwarded_emails = all_emails_df[all_emails_df['is_forwarded'] == True]
display(only_forwarded_emails)

In [119]:


# Configure logging
# logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

batch_size = 1000
batch_count = 0

flat_emails_df = pd.DataFrame()

for index, row in all_emails_df.iterrows():
    try:
        process_email(row)
                  
    except Exception as e:
        logging.error(f"Error processing email {index + 1}: {e}")

is forwarded: False
Email 49509b39346211cac6ccf03f669ee25ef025d2624877db7eb364e6044d43fcd3 is not forwarded.
is forwarded: False
Email df09fbc5b2781bc145f530930f4cf542841c43bfffe95b5d230b9c855c402e59 is not forwarded.
is forwarded: False
Email acdf5e675632fa9d7ddb2533e1dd6ddc0c541fcd9f3587464a84f28f0925a956 is not forwarded.
is forwarded: False
Email 7993f760ee9dcac7ec120b19325ed5d107457e0545955fe4d093e1d580490bc7 is not forwarded.
is forwarded: False
Email 81e1132106ff5553cae46b4c11088f9490be5c61c4787fd21b7617714652b525 is not forwarded.
is forwarded: False
Email 9f06cac4bbf2b9f28e54fa239a07375cffaeb6ea25847b63ee5ef895bb9b489b is not forwarded.
is forwarded: False
Email 014c243802b3bc07b442335ed11aeab165395c6282143ae094aeadeb325c6f36 is not forwarded.
is forwarded: False
Email 13076f4b88ebd1e57b247d4773ef17f28253c81f8f92b4ae4b8913f0f9ba34d1 is not forwarded.
is forwarded: False
Email 72211e3b02233559e285fe56b3079cfdd6bfe143f56337826043519eaf490f60 is not forwarded.
is forwarded: False

2024-07-10 15:43:49,134 - INFO - Processed 1000 emails so far.


Email 6c122f76a4f968dd2cfc1c65300ed79732255d25665e3e639d0fe49b7a912946 is not forwarded.
is forwarded: True
Error: Sender or recipient address is null.
is forwarded: False
Email f414acf731fbe3308a585e5f92c81ea11b6d389a984fb6c3dd012f098d7b1559 is not forwarded.
is forwarded: True
Error: Sender or recipient address is null.
is forwarded: False
Email 90f5678a8380c35ea38c272dd3cb27872a1010dc814c2bd24c3668fd89da05ad is not forwarded.
is forwarded: False
Email d918fdc2a6022b4181a8ac569b58c17be1cb723dfabc59461b11e667dbc45d2a is not forwarded.
is forwarded: False
Email 33ebd2a956623435856442eb472731c2e474d56a929309dd0ae62f1848a12687 is not forwarded.
is forwarded: True
Error: Sender or recipient address is null.
is forwarded: True
found forwarded email bd4835738dd1439e9fb43f8ffe9ee745a1dff6bd4e6ef9870b65f5f9b8a8ae55 with parent 7b5e3e20c72526d3d034534e7b73194d1c008335e2a4a2bcb1fbd9fef24f8faf
Email bd4835738dd1439e9fb43f8ffe9ee745a1dff6bd4e6ef9870b65f5f9b8a8ae55 is not forwarded.
is forwarded: 