## Description
This notebook demonstrates the usage of Unstructured Data Ingestion APIs. 

## Usage Instructions
Run each cell sequentially to execute the notebook.
Note: some cells are for reference only. To avoid accidentally executing them, they are marked as "Markdown".

In [None]:
# Host used by the cells below, both for the ingestion service URLs and as the
# postgres host.
IPADDRESS = "localhost" #Replace this with the correct IP address
# Port interpolated into the ingestion service URLs (kept as a string).
UNSTRUCTURED_DATA_PORT = "8086"

## Document Ingestion
#### Get health of the document ingest service

In [None]:
import requests

# Health endpoint of the document ingest service
url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/health'
print(url)
headers = {
    'accept': 'application/json'
}

# requests applies no timeout by default; bound the wait so an unreachable
# host raises quickly instead of hanging this cell indefinitely.
response = requests.get(url, headers=headers, timeout=10)

# Print the HTTP status code followed by the decoded JSON body
print(response.status_code)
print(response.json())

#### Ingest Manuals (pdf)

In [None]:
import requests
import os
# Endpoint that accepts document uploads
url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'
# Directory holding the PDF manuals to upload
directory_path = '../data/manuals_pdf'

# Upload every PDF in the directory, one POST request per file
for filename in os.listdir(directory_path):
    # Anything without a .pdf suffix is skipped
    if not filename.endswith('.pdf'):
        continue

    file_path = os.path.join(directory_path, filename)

    # The binary file handle is sent as the 'file' form field
    with open(file_path, 'rb') as file:
        files = {'file': file}
        response = requests.post(url, files=files)

    # Report the status code and JSON body returned for this file
    print(f'Uploaded {filename}: {response.status_code}')
    print(response.json())

#### Ingest FAQs (pdf)

In [None]:
import requests

# Path of the FAQ document to upload
filename = "../data/FAQ.pdf"
# Endpoint that accepts document uploads
url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'

# Send the file, opened in binary mode, as the 'file' field of a POST request
with open(filename, 'rb') as file:
    files = {'file': file}
    response = requests.post(url, files=files)

# Report the status code and JSON body returned by the server
print(f'Uploaded {filename}: {response.status_code}')
print(response.json())

#### Get the list of documents

In [None]:
import requests

# Endpoint that lists the ingested documents
url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'

# Fetch the list and always show the status code first
response = requests.get(url)
print(f'Response Status Code: {response.status_code}')

if response.status_code != 200:
    # Any status other than 200 is reported as a failure
    print(f"Failed to retrieve documents. Status Code: {response.status_code}")
else:
    # Entries are read from the 'documents' key; a missing key yields an empty list
    data = response.json()
    documents = data.get('documents', [])

    # Print a numbered list, starting at 1
    print("Available Documents:")
    for idx, document in enumerate(documents, start=1):
        print(f"{idx}. {document}")

## Ingesting Product information from gear-store.csv

The data is in a CSV file, but unstructured data ingestion supports txt files, so we will convert the data into multiple text files and ingest them.

### Display Data in csv file

In [None]:
%%capture output
! pip install pandas
! pip install psycopg2-binary

In [None]:
import pandas as pd

# Load the gear-store CSV into a DataFrame. Later cells read its 'name',
# 'category', 'subcategory', 'price' and 'description' columns.
df = pd.read_csv('../data/gear-store.csv')

In [None]:
from IPython.display import display
# Render the first rows of the DataFrame as a quick sanity check of the load.
display(df.head())

In [None]:
# Number of rows in the DataFrame; the file-creation cell below iterates over
# each of these rows.
len(df)

### Create *.txt file from csv data to ingest in canonical RAG

In [None]:
import os
import re

# Turn an arbitrary string into a filename-safe token
def create_valid_filename(s):
    """Return *s* reduced to filename-safe characters.

    Every character that is not a word character, hyphen, underscore, dot
    or space is dropped; each remaining space is then turned into an
    underscore.
    """
    cleaned = re.sub(r'[^\w\-_\. ]', '', s)
    # Splitting on single spaces and joining with '_' maps every space to '_'
    return '_'.join(cleaned.split(' '))

# Directory that receives one text file per product row
output_dir = '../data/product'

# Create the directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Paths written during this run. Rows whose name/category/subcategory map to
# the same filename overwrite each other, so the number of files on disk can
# be smaller than len(df); tracking the paths keeps the final count accurate.
written_paths = set()

# Iterate through each row in the DataFrame
for index, row in df.iterrows():
    # Create filename using name, category, and subcategory
    filename = '_'.join(
        create_valid_filename(row[column])
        for column in ('name', 'category', 'subcategory')
    ) + '.txt'

    print(f"Creating file {filename}, current index {index}")
    # Full path for the file
    filepath = os.path.join(output_dir, filename)

    # Make silent overwrites visible instead of losing a row unnoticed
    if filepath in written_paths:
        print(f"Warning: {filename} was already written by an earlier row and will be overwritten")
    written_paths.add(filepath)

    # Create the content for the file
    content = (
        f"Name: {row['name']}\n"
        f"Category: {row['category']}\n"
        f"Subcategory: {row['subcategory']}\n"
        f"Price: ${row['price']}\n"
        f"Description: {row['description']}\n"
    )

    # Write the content to the file
    with open(filepath, 'w', encoding='utf-8') as file:
        file.write(content)

# Report the number of distinct files written, not the number of rows
print(f"Created {len(written_paths)} files in ../data/product")

### Ingest data from newly created text file in canonical RAG

In [None]:
import requests
import os
import time

# Retry configuration (added because of rate limits on the API Catalog embedding model)
MAX_RETRIES = 5
INITIAL_BACKOFF = 1  # Initial backoff in seconds

def ingest_file(filepath: str) -> bool:
    """
    Ingest file in canonical RAG retriever with retry mechanism

    The file is POSTed to the /documents endpoint. Any response other than
    HTTP 200 is retried up to MAX_RETRIES times, waiting INITIAL_BACKOFF
    seconds before the first retry and doubling the wait after each one.
    A request-level exception (connection error, etc.) is not retried.

    Args:
        filepath: Path to the file to be ingested in retriever

    Returns:
        bool: True if the server answered with HTTP 200, False if the
        request raised or every attempt returned a non-200 status
    """
    # URL of the API endpoint
    url = f'http://{IPADDRESS}:{UNSTRUCTURED_DATA_PORT}/documents'
    name = os.path.basename(filepath)
    backoff = INITIAL_BACKOFF

    # One initial attempt plus MAX_RETRIES retries
    for attempt in range(MAX_RETRIES + 1):
        try:
            # Re-open the file on every attempt so the upload starts from the
            # beginning, and close it before sleeping so the handle is not
            # held during the backoff.
            with open(filepath, 'rb') as file:
                response = requests.post(url, files={'file': file})
        except requests.exceptions.RequestException as e:
            print(f"Request failed for {name}: {e}")
            return False

        if response.status_code == 200:
            return True

        if attempt == MAX_RETRIES:
            print(f"Max retries reached for {name}. Giving up.")
            return False

        # Every non-200 status is retried; report the actual code rather than
        # assuming a particular kind of failure.
        print(f"Received HTTP {response.status_code} for {name}. Retrying after {backoff}s...")
        time.sleep(backoff)
        backoff *= 2  # Exponential backoff

    return False

In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed

directory_path = '../data/product'
max_workers = 5  # Adjust this based on your system's capabilities and API limits

# Collect every .txt file in the product directory. os.listdir returns names
# in arbitrary order, so sort them to make the submission order reproducible.
filepaths = sorted(
    os.path.join(directory_path, filename)
    for filename in os.listdir(directory_path)
    if filename.endswith(".txt")
)

successfully_ingested = []
failed_ingestion = []

# Upload the files concurrently; each worker runs ingest_file on one path
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_file = {executor.submit(ingest_file, filepath): filepath for filepath in filepaths}

    # Handle results in completion order, not submission order
    for future in as_completed(future_to_file):
        filepath = future_to_file[future]
        try:
            # result() re-raises anything ingest_file raised in the worker
            ingested = future.result()
        except Exception as e:
            print(f"Exception occurred while ingesting {os.path.basename(filepath)}: {e}")
            failed_ingestion.append(filepath)
            continue

        if ingested:
            print(f"Successfully Ingested {os.path.basename(filepath)}")
            successfully_ingested.append(filepath)
        else:
            print(f"Failed to Ingest {os.path.basename(filepath)}")
            failed_ingestion.append(filepath)

print(f"Total files successfully ingested: {len(successfully_ingested)}")
print(f"Total files failed ingestion: {len(failed_ingestion)}")

#### (For reference) Delete a document

The cell is in "raw" format and does not execute. This code is for reference only.

### Ingest the customer order history data into a postgres db

In [None]:

import csv
import re
import psycopg2
from datetime import datetime

# Database connection parameters
# NOTE(review): credentials are hard-coded for this demo; load them from the
# environment or a secrets store before using this beyond a local setup.
db_params = {
    'dbname': 'customer_data',
    'user': 'postgres',
    'password': 'password',
    'host': IPADDRESS,  # e.g., 'localhost' or the IP address
    'port': '5432'   # e.g., '5432'
}

# CSV file path
csv_file_path = '../data/orders.csv'

# Layout of the timestamp columns in the CSV (date and time separated by 'T')
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"


def _parse_optional_timestamp(value):
    """Parse a CSV timestamp field, returning None when the field is empty."""
    return datetime.strptime(value, TIMESTAMP_FORMAT) if value else None


# Create the table if it doesn't exist
create_table_query = '''
CREATE TABLE IF NOT EXISTS customer_data (
    customer_id INTEGER NOT NULL,
    order_id INTEGER NOT NULL,
    product_name VARCHAR(255) NOT NULL,
    product_description VARCHAR NOT NULL,
    order_date DATE NOT NULL,
    quantity INTEGER NOT NULL,
    order_amount DECIMAL(10, 2) NOT NULL,
    order_status VARCHAR(50),
    return_status VARCHAR(50),
    return_start_date DATE,
    return_received_date DATE,
    return_completed_date DATE,
    return_reason VARCHAR(255),
    notes TEXT,
    PRIMARY KEY (customer_id, order_id)
);
'''

# Parameterized insert; values are bound by the driver, never string-formatted
insert_query = '''
INSERT INTO customer_data (
    customer_id, order_id, product_name, product_description, order_date, quantity, order_amount,
    order_status, return_status, return_start_date, return_received_date,
    return_completed_date, return_reason, notes
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''

# Connect to the database
conn = psycopg2.connect(**db_params)
try:
    cur = conn.cursor()
    cur.execute(create_table_query)

    # Open the CSV file and insert data.
    # newline='' lets the csv module handle line endings inside quoted fields.
    # Assumes orders.csv is UTF-8 encoded — TODO confirm.
    with open(csv_file_path, 'r', newline='', encoding='utf-8') as f:
        reader = csv.reader(f)
        next(reader, None)  # Skip the header row (no error on an empty file)

        # NOTE(review): rows whose (customer_id, order_id) already exist in
        # the table violate the primary key, so re-running this cell against
        # a populated table fails.
        for row in reader:
            # Access columns by index as per the provided structure
            customer_id = int(row[0])  # CID (Customer ID)
            order_id = int(row[1])  # OrderID

            # Strip the registered/trademark symbols from the text columns
            product_name = re.sub(r'[®™]', '', row[2])  # ProductName
            product_description = re.sub(r'[®™]', '', row[3])

            # OrderDate is mandatory, so an empty value raises here
            order_date = datetime.strptime(row[4], TIMESTAMP_FORMAT)

            quantity = int(row[5])  # Quantity

            # OrderAmount as float, with thousands separators removed
            order_amount = float(row[6].replace(',', ''))

            # Optional return dates: empty fields become NULL
            return_start_date = _parse_optional_timestamp(row[9])  # ReturnStartDate
            return_received_date = _parse_optional_timestamp(row[10])  # ReturnReceivedDate
            return_completed_date = _parse_optional_timestamp(row[11])  # ReturnCompletedDate

            # Insert data into the database
            cur.execute(
                insert_query,
                (customer_id, order_id, product_name, product_description, order_date, quantity, order_amount,
                 row[7],  # OrderStatus
                 row[8],  # ReturnStatus
                 return_start_date, return_received_date, return_completed_date,
                 row[12],  # ReturnReason
                 row[13])  # Notes
            )

    # Commit only after every row was inserted successfully
    conn.commit()
    cur.close()
finally:
    # Always release the connection, including when a row fails to parse or
    # insert (the connection is then closed without committing).
    conn.close()

print("CSV Data imported successfully!")

#### Read the data to ensure it was written 

In [None]:
import psycopg2

# Database connection parameters
# NOTE(review): credentials are hard-coded for this demo; load them from the
# environment or a secrets store before using this beyond a local setup.
db_params = {
    'dbname': 'customer_data',
    'user': 'postgres',
    'password': 'password',
    'host': IPADDRESS,  # e.g., 'localhost' or the IP address
    'port': '5432'   # e.g., '5432'
}

# Query to select the first 5 rows from the customer_data table
query = 'SELECT * FROM customer_data LIMIT 5;'

# Connect to the database
conn = psycopg2.connect(**db_params)
try:
    cur = conn.cursor()

    # Execute the query
    cur.execute(query)

    # Fetch the column headers (first item of each cursor description entry)
    colnames = [desc[0] for desc in cur.description]

    # Fetch the first 5 rows
    rows = cur.fetchall()
    cur.close()
finally:
    # Always release the connection, including when the query fails
    conn.close()

# Print the headers and the corresponding rows
for i, row in enumerate(rows, start=1):
    print(f"\nRow {i}:")
    for header, value in zip(colnames, row):
        print(f"{header}: {value}")

#### (For reference) Drop the postgres table

The cell is in raw format and does not execute. This code is for reference alone.