# Reading Data, Transforming, and Loading into ChromaDB

In [10]:
import pandas as pd
import requests
from datetime import datetime
import zipfile
from io import BytesIO
import os
import xml.etree.ElementTree as ET
import csv
from io import StringIO
import chromadb
from chromadb.utils import embedding_functions
from decouple import config


## 1. Extract Data from Toronto Open Data Portal

To keep track of the files downloaded, but this would ideally go into a log instead if the pipeline is automated.

In [9]:
pipeline_run_filenames = []

Extracting the zip files and moving files into proper folder with dates appended.

In [11]:

# Toronto Open Data is stored in a CKAN instance. It's APIs are documented here:	
# https://docs.ckan.org/en/latest/api/
	
base_url = "https://ckan0.cf.opendata.inter.prod-toronto.ca"

# Datasets are called "packages". Each package can contain many "resources"	
# To retrieve the metadata for this package and its resources, use the package name in this page's URL:
	
url = base_url + "/api/3/action/package_show"
params = { "id": "lobbyist-registry"}
package = requests.get(url, params = params).json()

# To get resource data:	
for idx, resource in enumerate(package["result"]["resources"]):	
    # To get metadata for non datastore_active resources:
    if not resource["datastore_active"]:
        url = base_url + "/api/3/action/resource_show?id=" + resource["id"]
        resource_metadata = requests.get(url).json()
        if resource_metadata["result"]:
            resource_url = resource_metadata["result"]["url"]
            # Download the resource
            response = requests.get(resource_url)
            if response.status_code == 200:
                # Get the current date to append to the filename
                current_date = datetime.now().strftime("_%d_%m_%Y")
                
                # Open the ZIP file
                with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                    # Extract all the contents into the directory
                    zip_file.extractall("staging/downloaded_files")
                    
                    # Rename each file in the ZIP
                    for file_info in zip_file.infolist():
                        original_filename = file_info.filename
                        new_filename = f"{original_filename.split('.')[0]}{current_date}.{original_filename.split('.')[-1]}"
                        os.rename(f"staging/downloaded_files/{original_filename}", f"staging/downloaded_files/{new_filename}")
                        pipeline_run_filenames.append(new_filename)
                break # Break after the first file for demonstration
            else:
                print("Error downloading data, status code: " + str(response.status_code))
                print(response.headers)

## 2. Transform data from XML format into CSV for easier human review 

In [12]:
# Function to recursively extract data from nested tags
def extract_data(element):
    data = {}
    for child in element:
        # If the child has no children of its own, it is a data tag
        if not list(child):
            data[child.tag] = child.text
        else:
            # If the child has children, we recursively extract their data
            data.update(extract_data(child))
    return data

The files from the most recent run of the extract stage that can be used in the transformation stage. This would ideally go into a log instead.

In [13]:
print(pipeline_run_filenames)

['lobbyactivity-closed_03_11_2023.xml', 'lobbyactivity-active_03_11_2023.xml']


In [20]:
for filename in pipeline_run_filenames:

    # Let's read the content of the XML file to understand its structure
    file_path = './staging/downloaded_files/' + filename

    # Reading the content of the file
    with open(file_path, 'r') as file:
        xml_content = file.read()

    # Parse the XML content
    root = ET.fromstring(xml_content)

    # Find all ROW elements
    rows = root.findall('.//ROW')

    # Extracting the column names (tag names) assuming all rows have the same structure
    # and all tags are direct children of the <ROW> tag
    column_names = [elem.tag for elem in rows[0]]

    # Create a new CSV in-memory file
    csv_io = StringIO()
    csv_writer = csv.writer(csv_io)

    # Initialize an empty list for column names
    column_names = []

    # Processing rows to extract column names and data
    all_row_data = []

    for row in rows:
        # Extract data from each <SM> tag, which contains the relevant data tags
        sm_elements = row.findall('.//SM')
        # Collect data from all <SM> elements (assuming there might be more than one <SM> per <ROW>)
        row_data = {}
        for sm in sm_elements:
            row_data.update(extract_data(sm))

        all_row_data.append(row_data)
        # Update the column names with any new unique tags found
        column_names.extend([key for key in row_data.keys() if key not in column_names])

    # Write the column names as the header
    csv_writer.writerow(column_names)

    # Write the data to the CSV, ensuring each row has data in the same order as the column names
    for row_data in all_row_data:
        csv_writer.writerow([row_data.get(col, None) for col in column_names])

    # Retrieve the CSV content
    csv_content = csv_io.getvalue()

    # Save the CSV content to a file
    csv_file_path = './staging/transformed_files/'+str(filename[:-4])+'.csv'
    with open(csv_file_path, 'w') as csvfile:
        csvfile.write(csv_content)

    csv_file_path  # Return the path of the created CSV file
    
    pipeline_run_filenames.append(csv_file_path)
    print("File Processed: ", csv_file_path)


File Processed:  ./staging/transformed_files/lobbyactivity-closed_03_11_2023.csv
File Processed:  ./staging/transformed_files/lobbyactivity-active_03_11_2023.csv


FileNotFoundError: [Errno 2] No such file or directory: './staging/downloaded_files/./staging/transformed_files/lobbyactivity-closed_03_11_2023.csv'

## 3. Load data into chromaDB, the vector database being used to support semantic search

Extablish connection to database and create collection object

In [None]:
# Access the API key
api_key = config('OPEN_API_KEY')

# set up the client connection to the docker container server
chroma_client = chromadb.HttpClient(host='localhost', port='8000')

print("db heartbeat: ")
print(chroma_client.heartbeat()) # returns a nanosecond heartbeat. Useful for making sure the client remains connected.

collection = chroma_client.get_or_create_collection(name="lobbying_metadata", embedding_function=openai_ef, metadata={"hnsw:space": "cosine"}) 

Read in each csv file into a pandas dataframe and write to database 

In [23]:
for pipeline_file in pipeline_run_filenames:
    if pipeline_file[-4:] == ".csv":
        print(pipeline_file)

./staging/transformed_files/lobbyactivity-closed_03_11_2023.csv
./staging/transformed_files/lobbyactivity-active_03_11_2023.csv


In [None]:
# read in the files from a dataframe

Set up the collections

In [15]:
documents = []
metadatas = []
ids = []

loop through the dataframe and add elements to the different collections

In [16]:
for index, row in df.iterrows():
    r_document = row['VARIABLEDESC']
    r_metadata = {}
    r_metadata['COUNTRY'] = row['COUNTRY']
    r_metadata['DATASETNAME'] = row['DATASETNAME']
    r_metadata['VINTAGE'] = row['VINTAGE']
    r_metadata['VARIABLENAME'] = row['VARIABLENAME']
    r_id = row['UniqueId']

    documents.append(r_document)
    metadatas.append(r_metadata)
    ids.append(r_id)

Exploring the values in each list

In [None]:
documents

In [None]:
metadatas

In [None]:
ids

In [20]:

collection = chroma_client.get_or_create_collection(name="lobbying_metadata", embedding_function=openai_ef, metadata={"hnsw:space": "cosine"}) 

### WRITING THE DATA ==================================================

# use batches of 1000 so as not to overwhelm the OpenAI API
batch_size = 1000
num_batches = (len(df) + batch_size - 1) // batch_size

for batch_num in range(num_batches):
    print("on batch" + str(batch_num) + " of " + str(num_batches) + "...")
    start_index = batch_num * batch_size
    end_index = min((batch_num + 1) * batch_size, len(df))
    batch = df.iloc[start_index:end_index]

    documents = []
    metadatas = []
    ids = []

    for index, row in batch.iterrows():
        r_document = row['VARIABLEDESC']
        r_metadata = {}
        r_metadata['COUNTRY'] = row['COUNTRY']
        r_metadata['DATASETNAME'] = row['DATASETNAME']
        r_metadata['VINTAGE'] = row['VINTAGE']
        r_metadata['VARIABLENAME'] = row['VARIABLENAME']
        r_id = str(row['UniqueId'])
        documents.append(r_document)
        metadatas.append(r_metadata)
        ids.append(r_id)

    try:
        collection.upsert(
            documents=documents,
            metadatas=metadatas,
            ids=ids
        )  
    except:
        print("\nError in batch: " + str(batch_num))
        print("IDs starting at " + str(start_index) + " and ending at " + str(end_index) + "... \n")

print("\n\n...done writing data!")

True
