## Notebook to test the email content processing

#### Includes a test with Modin on Dask as the distributed compute engine

In [2]:
import openai
from pandas import DataFrame, concat, read_csv, read_parquet    
import requests
#from langchain.text_splitter import RecursiveCharacterTextSplitter
#from langchain.vectorstores import Chroma
#from langchain.embeddings import OpenAIEmbeddings
#from langchain.docstore.document import Document
from azure.data.tables import TableServiceClient, TableEntity
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
import os
from io import BytesIO
from datetime import date
from tqdm import tqdm
from numpy import array, array_split, float32, set_printoptions
from multiprocessing import  Pool
import tiktoken
import re
from itertools import islice
import json



In [3]:
# Configure Modin through environment variables *before* modin.pandas is
# imported, so the settings are in place when Modin first reads them.
import modin
os.environ["MODIN_CPUS"] = "24"
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask
import modin.pandas as mpd
from dask.distributed import Client, LocalCluster

# Global guard. Keep the value left behind by a previous run of this cell:
# resetting it to False unconditionally would start another local cluster
# every time the cell is re-executed.
DASK_RUNNING = globals().get("DASK_RUNNING", False)

if not DASK_RUNNING:
    cluster = LocalCluster()  # Launches a scheduler and workers locally
    client = Client(cluster)  # Connect to distributed cluster and override default
    print(f"Started cluster at {cluster.dashboard_link}")
    DASK_RUNNING = True


Started cluster at http://127.0.0.1:8787/status


In [3]:
# Import the config submodule explicitly rather than relying on another cell
# having loaded it as a side effect of `import modin.pandas`.
import modin.config
print(modin.config.NPartitions.get())

24


#### Functions

In [4]:
# Connection string used by both the table client (load_data) and the blob
# client (upload_data). It is read from the environment so that no secret is
# stored in the notebook; os.environ.get yields None when the variable is unset.
OUTLOOK_CONTENT_CONNECTION_STRING = os.environ.get('OUTLOOK_CONTENT_CONNECTION_STRING')

In [None]:
# Enable tqdm's progress-bar integration for pandas.
# NOTE(review): the apply calls visible in this notebook use plain .apply, so
# no progress bar is shown by them — confirm whether progress_apply was intended.
tqdm.pandas()

In [None]:
encoding = tiktoken.get_encoding("cl100k_base")


In [None]:
# Rebind the module-level `encoding` to the tokenizer tiktoken associates with
# gpt-3.5-turbo and encode a sample string as a quick smoke test.
# The helper functions in this notebook call tiktoken.get_encoding themselves
# and do not read this global.
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
encoding.encode("tiktoken is great")

In [5]:
# Load data from an Azure Storage table and build a data frame from it.

def load_data(table_name="outlooktest"):
    """Read every entity of an Azure Storage table into a Modin DataFrame.

    Args:
        table_name: Name of the table to read. Defaults to "outlooktest",
            the table this notebook was written against.

    Returns:
        A Modin DataFrame with one row per entity returned by the table.

    Raises:
        ValueError: If OUTLOOK_CONTENT_CONNECTION_STRING is empty or unset,
            so the problem is reported before any Azure client is created.
    """
    connect_str = OUTLOOK_CONTENT_CONNECTION_STRING
    if not connect_str:
        raise ValueError(
            "OUTLOOK_CONTENT_CONNECTION_STRING is not set; "
            "export it before loading data"
        )
    table_service = TableServiceClient.from_connection_string(connect_str)
    table_client = table_service.get_table_client(table_name)
    # Materialise the paged iterator once; the frame is built from the full list.
    documents = list(table_client.list_entities())
    return mpd.DataFrame(documents)


In [6]:
def clean_content(row):
    """Return the row's 'content' with line breaks normalised and [..] spans blanked.

    Steps, in order: CRLF pairs become a single CR, runs of CR collapse to
    one CR, and every bracketed span ``[...]`` (non-greedy, single line) is
    replaced by one space.
    """
    text = row['content'].replace("\r\n", "\r")
    substitutions = (
        (r"\r+", "\r"),        # collapse repeated carriage returns
        (r"\[(.*?)\]", " "),   # drop bracketed fragments
    )
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text


In [None]:
# Generic helper that groups any iterable into fixed-size tuples.
def batched(iterable, n):
    """Yield successive tuples of at most ``n`` items taken from ``iterable``.

    Example: batched('ABCDEFG', 3) --> ('A','B','C') ('D','E','F') ('G',)

    Raises:
        ValueError: If ``n`` is smaller than one (raised on first iteration,
            because this is a generator).
    """
    if n < 1:
        raise ValueError('n must be at least one')
    source = iter(iterable)
    # iter(callable, sentinel) keeps pulling batches until an empty tuple
    # signals that the source is exhausted.
    yield from iter(lambda: tuple(islice(source, n)), ())

In [None]:
# Tokenise a string and hand the tokens back in fixed-size chunks.
def chunked_tokens(text, encoding_name, chunk_length):
    """Yield tuples of at most ``chunk_length`` token ids for ``text``.

    The text is encoded with the tiktoken encoding named ``encoding_name``
    and the resulting token sequence is grouped by ``batched``.
    """
    token_ids = tiktoken.get_encoding(encoding_name).encode(text)
    for chunk in batched(token_ids, chunk_length):
        yield chunk

In [7]:
# Function to count tokens.
def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Count the tokens ``string`` occupies under the named tiktoken encoding."""
    token_ids = tiktoken.get_encoding(encoding_name).encode(string)
    return len(token_ids)


In [None]:
def parallelize_dataframe(df, func, n_cores=8):
    """Apply ``func`` to row-wise chunks of ``df`` in worker processes.

    Args:
        df: Frame to process; it is cut into ``n_cores`` consecutive row
            chunks (trailing chunks may be empty when there are fewer rows
            than chunks).
        func: Picklable callable that takes one chunk and returns a frame.
        n_cores: Number of chunks and of worker processes.

    Returns:
        The per-chunk results concatenated in the original chunk order.
    """
    # Split row *positions* rather than the frame itself: numpy.array_split
    # applied directly to a DataFrame goes through the deprecated
    # DataFrame.swapaxes. dtype=int keeps empty position arrays integer-typed
    # so that .iloc accepts them.
    positions = array_split(array(range(len(df)), dtype=int), n_cores)
    df_split = [df.iloc[rows] for rows in positions]
    # The context manager releases the worker processes even if func raises.
    with Pool(n_cores) as pool:
        results = pool.map(func, df_split)
    return concat(results)

In [None]:
# TODO: split long contents into chunks

In [8]:
# Function to query ChatGPT with the mail content, ask for a classification and return the response.
def get_completion(row):
    """Ask the OpenAI chat API to break one email thread into structured messages.

    The (German) prompt asks for sender, sent date, subject, message text and
    type of every mail in the thread, answered as a JSON list sorted oldest
    first.

    Args:
        row: Mapping-like row providing 'content' (the thread text) and
            'content_token_lenght' (its token count; the misspelled key is
            the actual column name used throughout the notebook).

    Returns:
        The API response on success. On any failure, a dict shaped like a
        chat response: ``choices[0]`` carries the legacy ``"text": "Error"``
        as well as ``message.content == "Error"``, and ``"error"`` names the
        exception, so downstream code can index both kinds of result alike.
    """
    prompt = f"""
                Analysiere folgende Email-Unterhaltung, getrennt durch <>, nach folgenden Kriterien:
                - Sender
                - Gesendet
                - Betreff
                - Nachricht (nur Text, keine Signaturen, Adressen, Bilder, Links, Disclaimer oder Fussnoten)
                - Typ (Frage, Antwort, Information, Aufforderung, Werbung...)

                Antwort als JSON-Objekte in einer Liste. Liste sortiert nach Datum Gesendet, älteste zuerst. 
                Beispiel:
                [{{"Sender": "Max Mustermann", "Gesendet": "2021-01-01", "Betreff": "Test", "Nachricht": "Hallo Welt", "Typ": "Frage"}}]
                <{row['content']}>
                """
    try:
        # Coerce to a built-in int so the value can be serialised in the
        # request even if pandas hands over a NumPy scalar.
        token_count = int(row['content_token_lenght'])
        # Short threads go to the base model, everything else to the 16k
        # variant; the completion budget is what remains after the content.
        if token_count < 2000:
            model = "gpt-3.5-turbo"
            max_tokens = 3800 - token_count
        else:
            model = "gpt-3.5-turbo-16k"
            max_tokens = 15500 - token_count
        if max_tokens < 1:
            # Fail fast instead of sending a request the API must reject.
            raise ValueError(
                f"content too long for {model}: {token_count} tokens"
            )
        messages = [{"role": "user", "content": prompt}]
        response = openai.ChatCompletion.create(
            model=model,
            messages=messages,
            temperature=0, # this is the degree of randomness of the model's output
            max_tokens=max_tokens, # this is the maximum number of tokens that the model will generate
            n=1, # this is the number of samples to return
        )
        return response
    except Exception as exc:
        # Best effort per row: one failing mail must not abort the whole
        # DataFrame.apply run. Unlike a bare except, this lets
        # KeyboardInterrupt / SystemExit through.
        return {
            "choices": [
                {
                    "text": "Error",
                    "message": {"role": "assistant", "content": "Error"},
                }
            ],
            "error": f"{type(exc).__name__}: {exc}",
        }



In [74]:
# Load the raw table into `df`, the name every downstream cell reads.
# Without this binding the notebook fails on Restart & Run All.
df = load_data()
df5 = df  # legacy alias, kept in case something still refers to df5
df.shape



(1836, 10)

In [10]:
# Rows whose PartitionKey is one of these sender addresses are removed.
drop_list_PartitionKey = [
    "noreply@emeaemail.teams.microsoft.com",
    'Ambassador@mc.ihg.com',
    'microsoft-noreply@microsoft.com',
]
# Filter and renumber in one expression so the index is contiguous again.
df = df[~df['PartitionKey'].isin(drop_list_PartitionKey)].reset_index(drop=True)
df.shape

(1821, 10)

In [11]:
# Row-wise cleaning: clean_content reads each row's 'content' and the result
# is stored in the new 'content_cleaned' column.
df['content_cleaned'] = df.apply(clean_content, axis=1)


In [12]:
# Same statement as the previous cell. clean_content only reads 'content',
# so running it again recomputes an identical 'content_cleaned' column.
df['content_cleaned'] = df.apply(clean_content, axis=1)

In [13]:
# Character length of every cleaned mail, then the largest length found.
df["content_length"] = df["content_cleaned"].apply(len)
df["content_length"].max()

27565

In [None]:
#get index of row with longest content
df["content_length"].idxmax()

In [None]:
# Show the longest cleaned mail. The row is looked up through idxmax instead
# of a hard-coded position, so the cell stays correct when the data changes.
df.loc[df["content_length"].idxmax(), "content_cleaned"]

In [14]:
# Count tokens per mail with the cl100k_base encoding and show the maximum.
# The column name 'content_token_lenght' (sic) is read verbatim by
# get_completion, so the spelling must stay in sync with that function.
# NOTE(review): counts are taken from the raw 'content' column, not from
# 'content_cleaned' — confirm this is intended.
df["content_token_lenght"] = df["content"].apply(lambda x: num_tokens_from_string(x, "cl100k_base"))
df["content_token_lenght"].max()



8882

In [None]:
df["content_token_lenght"].idxmax()

In [None]:
df1 = df[:2]

In [None]:
df1.iloc[0]["content_token_lenght"].dtype


In [None]:
df1.head()

In [None]:
def apply_parallel(df):
    """Run get_completion on every row of ``df`` and store the results.

    The results are written to the 'content_processed' column of the frame
    that was passed in, and that same frame is returned.
    """
    completions = df.apply(get_completion, axis=1)
    df['content_processed'] = completions
    return df

In [None]:
# Run get_completion over the sample frame in worker processes.
# NOTE(review): parallelize_dataframe defaults to n_cores=8 while df1 holds
# two rows (df[:2]), so most chunks are empty — verify that apply_parallel
# copes with empty chunks.
df1 = parallelize_dataframe(df1, apply_parallel)

In [16]:
# Full run: get_completion is called once per row of df, i.e. one OpenAI
# request per mail. Expect this cell to be slow and to incur API cost.
df["content_processed"]= df.apply(get_completion, axis=1)



In [64]:
# Replace empty lists in content_processed with an empty dict.
# NOTE(review): get_completion as defined in this notebook returns either the
# API response or an error dict, never a list — confirm where [] values come from.
df["content_processed"] = df["content_processed"].apply(lambda x: {} if x == [] else x)

In [69]:
# Convert the Modin frame (load_data builds it with mpd.DataFrame) into a
# plain pandas DataFrame. _to_pandas is an underscore-prefixed Modin method.
df_normal = df._to_pandas()

In [70]:
# Function to upload data to Azure Blob Storage.
def upload_data(df):
    """Upload ``df`` as a Parquet blob named after today's date.

    The frame is serialised to Parquet in memory (pyarrow engine) and stored
    as ``<YYYY-MM-DD>_outlook_data.parquet`` in the 'outlookcontent'
    container, overwriting a blob of the same name.

    Args:
        df: Frame exposing ``to_parquet``.

    Returns:
        The value returned by ``upload_blob`` on success, or None when the
        upload failed (the reason is printed).
    """
    try:
        # Create the BlobServiceClient object used to reach the container.
        blob_service_client = BlobServiceClient.from_connection_string(OUTLOOK_CONTENT_CONNECTION_STRING)

        container_name = 'outlookcontent'
        # The blob name starts with today's date.
        today = date.today().strftime('%Y-%m-%d')
        file_name = today + "_outlook_data.parquet"
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=file_name)

        # The buffer is closed automatically once the upload has finished.
        with BytesIO() as parquet_file:
            df.to_parquet(parquet_file,  engine='pyarrow')
            parquet_file.seek(0)  # rewind so the upload reads from the start
            return blob_client.upload_blob(data=parquet_file, overwrite=True)
    except Exception as exc:
        # Keep the best-effort contract (no exception escapes, None is
        # returned) but report what actually went wrong.
        print(f"error uploading data to blob storage: {exc}")
        return None


In [None]:
upload_data(df_normal)

In [42]:
set_printoptions(linewidth=100000)

In [36]:
df_normal = df._to_pandas()

In [59]:
df_normal.to_excel("outlook1_data.xlsx")

In [58]:
l = df_normal.content_processed.to_list()

In [None]:
df.iloc[140:150]

In [43]:
df_normal.to_csv("outlook1_data2.csv", index=False)

In [33]:
df.iloc[140:150].to_csv("test.csv",sep=';', encoding='utf-8', quotechar='"', index=False)



In [None]:
df.iloc[140]["content"]

In [None]:
df.iloc[142]["content_cleaned"]

In [None]:
df.iloc[11]["content_cleaned"]

In [None]:
# Parse the model's answer (a JSON string) of the first sample row.
# json.loads is a module-level function; a str has no `.json` attribute.
json.loads(df1.iloc[0]["content_processed"]["choices"][0]["message"]['content'])

In [None]:
# Instruction-only variant of the prompt (no mail content is interpolated),
# kept as a plain string so its token overhead can be measured on its own.
prompt = """
Analysiere folgende Email-Unterhaltung, getrennt durch dreifache Anführungsstrich, nach folgenden Kriterien:
- Sender
- Gesendet
- Betreff
- Nachricht (nur Text, keine Signaturen oder Fussnoten)
- Typ (Frage, Antwort, Information, Aufforderung, Werbung...)

Antwort als JSON-Objekte in einer Liste. Liste sortiert nach Datum Gesendet, älteste zuerst. JSON-Objekte mit den Kriterien als Keys und den entsprechenden Werten.

"""

In [None]:
num_tokens_from_string(prompt, "cl100k_base")

In [None]:
from IPython.display import display, HTML

def pretty_print(text):
    """Render ``text`` in the notebook with its line breaks shown as <br>.

    The text is HTML-escaped first, so mail content such as
    ``Name <address>`` is displayed literally instead of being interpreted
    as markup. Real carriage returns (what clean_content produces), CRLF
    pairs and the literal two-character sequence backslash-r are all turned
    into line breaks.

    Returns:
        Whatever ``display`` returns.
    """
    import html  # local import: only needed by this helper

    escaped = html.escape(text, quote=False)
    markup = (
        escaped
        .replace("\r\n", "<br>")
        .replace("\\r", "<br>")   # literal backslash-r, as handled originally
        .replace("\r", "<br>")    # actual carriage-return characters
    )
    return display( HTML( markup ) )

In [None]:
pretty_print(df.content_cleaned[11])

In [None]:
df.content_cleaned[1305]

In [None]:
import re

def split_string_by_email(text):
    """Split ``text`` at every 'Von:' header.

    Each separator runs from 'Von:' up to and including the next '<'
    (non-greedy), so the pieces that follow start with the address that was
    inside the angle brackets.
    """
    sender_header = re.compile(r'Von:.*?<')
    return sender_header.split(text)

# Try the splitter on one cleaned mail thread and print each piece.
example_string = df.iloc[11]["content_cleaned"]  # Your string here
split_result = split_string_by_email(example_string)

for i, part in enumerate(split_result):
    # One call emits the header line, the part itself and a blank separator.
    print(f"Part {i}:\n{part}\n")

In [73]:
# NOTE(review): df_download is not assigned in any cell visible in this
# notebook; this cell depends on kernel state from a removed or external cell.
df_download.shape

(1821, 14)

In [None]:
# Swap every single quote in the stored strings for a double quote so they can
# later be parsed as JSON.
# NOTE(review): this also rewrites apostrophes inside message text, and
# df_download is not assigned in any visible cell — verify before relying on it.
df_download["content_processed"] = df_download["content_processed"].apply(lambda x: x.replace("\'", '"'))

In [None]:
# Pull the "choices" entry out of each stored response into its own column.
# NOTE(review): the earlier comment said "convert string to json", but no
# parsing happens in this statement.
df1["content_processed_content"] = df1["content_processed"].apply(lambda x: x["choices"])

In [None]:
import ast
# NOTE(review): "content_processed_content" is filled with x["choices"] in the
# preceding cell, so indexing it with ["choices"] again looks wrong and likely
# raises. Confirm the intended path (possibly [0]["message"]["content"])
# before relying on this cell.
ast.literal_eval(df1.iloc[1]["content_processed_content"]["choices"])

In [None]:
df1.iloc[1]["content_processed_content"]

In [None]:
# Ad-hoc question for the model; a plain literal, nothing is interpolated.
prompt = "How is the regex for multiple new lines?"

In [None]:
# Send the current `prompt` as a single user message and keep the raw
# response in `response` for inspection in the next cell.
messages = [{"role": "user", "content": prompt}]
response = openai.ChatCompletion.create(
    model="gpt-3.5-turbo",
    messages=messages,
    temperature=0, # this is the degree of randomness of the model's output
    max_tokens=4000, # this is the maximum number of tokens that the model will generate
    n=1, # this is the number of samples to return
)

In [None]:
response

In [45]:
df_temp = load_data()



In [46]:
df_temp.to_csv("outlook1_data3.csv", index=False)



In [47]:
df_save = df.copy()

In [48]:
df_save2 = df.copy()


In [61]:
# Replace empty lists in content_processed with an empty dict on the copy
# (df_save) that is written to Parquet afterwards.
df_save["content_processed"] = df_save["content_processed"].apply(lambda x: {} if x == [] else x)

In [63]:
df_save.to_parquet("outlook1_data.parquet", engine='pyarrow')