In [None]:
def preprocess_file(input_path, output_path):
    with open(input_path, 'r', encoding='ISO-8859-1') as infile, open(output_path, 'w', encoding='utf-8') as outfile:
        current_record = ''
        for line in infile:
            if line.count('|') == 10:  # Identifies a new record
                if current_record:  # If there's an ongoing record, write it before starting a new one
                    outfile.write(current_record + '\n')
                current_record = line.rstrip('\n')  # Start a new record
            else:
                # Part of the ongoing BillText; append including a space to avoid word merging
                current_record += ' ' + line.rstrip('\n')
        # Write the last record if it exists
        if current_record:
            outfile.write(current_record)

# Example usage
input_path = "../data_storage/legislation/complete_full_text.txt"
output_path = "../data_storage/legislation/preprocessed_full_text.txt"
preprocess_file(input_path, output_path)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Parse Legislation") \
    .getOrCreate()

# File path to the preprocessed file
file_path = "../data_storage/legislation/preprocessed_full_text.txt"

schema = "BillID STRING, StateCode STRING, StateBillID STRING, ShortBillName STRING, Created STRING, SponsorParty STRING, billtype STRING, status STRING, CommitteeCategories STRING, statesummary STRING, BillText STRING"
column_names = ['BillID', 'StateCode', 'StateBillID', 'ShortBillName', 'Created', 'SponsorParty', 'billtype', 'status', 'CommitteeCategories', 'statesummary', 'BillText']

# Load data
data = spark.read.option("delimiter", "|").csv(file_path, schema=schema)

# Combine remaining columns into a single column if there are more than 11 columns
remaining_columns = data.columns[10:]
data = data.withColumn("BillText", concat_ws("|", *[data[col] for col in remaining_columns]))

# Select only the first 11 columns
data = data.select(column_names)

data.show()

csv_file_path = "../data_storage/legislation/complete_full_text.csv"
data.write.mode("overwrite").option("header", "true").csv(csv_file_path)



# csv_directory_path = "../data_storage/legislation/complete_full_text.csv"

# Load all part-files from the directory into a DataFrame
# df = spark.read.csv(path=csv_directory_path, header=True, inferSchema=True)
# df.toPandas().to_csv("../data_storage/legislation/complete_full_text_pdf.csv", index=False)

# # Stop Spark session
# spark.stop()



In [6]:
from datetime import datetime
import pandas as pd
import requests
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from InstructorEmbedding import INSTRUCTOR

import weaviate
import json
import os
from dotenv import load_dotenv


load_dotenv()
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
weaviate_api_key = os.getenv('WEAVIATE_API_KEY')
weaviate_url = os.getenv('WEAVIATE_URL')
openai_key = os.getenv('OPENAI_API_KEY')
billtrack50_api_key = os.getenv('bill_tracker_api_key')

client = weaviate.Client(
    url = weaviate_url,
    auth_client_secret=weaviate.AuthApiKey(api_key=weaviate_api_key), 
    additional_headers = {
        "X-OpenAI-Api-Key": openai_key
    }
)


if client.schema.exists("Legislation"):
    client.schema.delete_class("Legislation")
    
# for pre-vectorized data
class_obj = {
    "class": "Legislation",
    "vectorizer": "none",
    "moduleConfig": {
        "generative-openai": {}  # Ensure the `generative-openai` module is used for generative queries
    }
}

client.schema.create_class(class_obj)


# for pre-vectorized data        
client.batch.configure(batch_size=100)
with client.batch as batch:
    for index, row in df.iterrows():
        properties = {
            "billID": row['billID'],
            "stateBillID": row['stateBillID'],
            "stateCode": row['stateCode'],            
            "billName": row['billName'],
            "summary": row['summary'],
            "sponsorCount": row['sponsorCount'],
            "sponsors": row['sponsors'],
            "subjects": row['subjects'],
            "keyWords": row['keyWords'],
            "actions": row['actions'],	
            "lastAction": row['lastAction'],
            "actionDate": row['actionDate'],
            "votes": row['votes'],
            "billProgress": row['billProgress'],	
            "officialDocument": row['officialDocument'],
            "created": row['created']   
        }
        # concatenated_vector = row['billName_vector'] + row['summary_vector'] + row['keyWords_vector']

        # Adjust the property names and structure according to your schema requirements
        batch.add_data_object(properties, "Legislation", vector=row['summary_vector'])



24/04/08 19:09:10 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 21) 1]
java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.I

Py4JJavaError: An error occurred while calling o58.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 21) (192.168.0.82 executor driver): java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:323)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:105)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


In [5]:
import pandas as pd
import glob

# Define the path and get all CSV files
csv_file_path = "../data_storage/legislation/complete_full_text.csv/*.csv"
csv_files = glob.glob(csv_file_path)

# Define your column names here based on your dataset's requirements
column_names = ['BillID', 'StateCode', 'StateBillID', 'ShortBillName', 'Created', 'SponsorParty', 'billtype', 'status', 'CommitteeCategories', 'statesummary', 'BillText']

dfs = []

for file in csv_files[]:
    processed_rows = []
    with open(file, 'r', encoding='utf-8') as f:
        next(f)
        for line in f:
            # Split the line into fields
            fields = line.strip().split(',')

            # Handle extra fields for the 4th column (ShortBillName)
            if len(fields) > 4:
                # Combine fields into the 4th column until what would be the start of the 5th column
                fields[3:10] = [' '.join(fields[3:10])]
                # Ensure no more than 11 fields before processing the 10th column
                fields = fields[:11]

            # Now, handle extra fields for the 10th column (statesummary)
            if len(fields) > 11:
                # Combine fields beyond the 11th into the 10th field
                fields[10:] = [' '.join(fields[10:])]
                fields = fields[:11]
            
            processed_rows.append(fields)
            
    # Create a DataFrame for each file's processed rows with the defined column names
    df = pd.DataFrame(processed_rows, columns=column_names)
    dfs.append(df)

# Concatenate all DataFrames from each file into one
combined_df = pd.concat(dfs, ignore_index=True)


In [6]:
combined_df.head(330)

Unnamed: 0,BillID,StateCode,StateBillID,ShortBillName,Created,SponsorParty,billtype,status,CommitteeCategories,statesummary,BillText
0,1623584,MI,SR0039,"""A resolution to recognize April 27 2023 as ...",Signed/Enacted/Adopted,,"""A RESOLUTION TO RECOGNIZE APRIL 27",2023,AS SURVIVORSSPEAK MICHIGAN DAY,"A PART OF NATIONAL CRIME VICTIMS RIGHTS WEEK""","""Michigan MI SR 0039 MI SR0039 MISR0039 MI SR ..."
1,1623654,AK,HB181,State Commission For Civil Rights 2023-04-26 1...,"""Alaska AK HB 181 AK HB181 AKHB181 AK HB 181 A...",cause includes incompetence,neglect of duty,and misconduct in office,and public statements and public or pr...,the governor shall provide a copy of the char...,or by counsel
2,1623695,MI,HB4474,Crime victims: other; elements for commission ...,"entitled\""The Michigan penal code","\""by amending section 147b (MCL 750.147b)","as added by 1988 PA 371.""","""Michigan MI HB 4474 MI HB4474 MIHB4474 MI HB ...","entitled\""The Michigan penal code","\""by amending section 147b (MCL 750.147b)",as added by 1988 PA 371. \t \t\t \t\...
3,1623696,ME,LD1833,"""An Act to Amend the Definition of \""Education...","""Maine ME LD 1833 ME LD1833 MELD1833 ME LD 183...",sub-§2-A,as amended by PL 1995,c. 393,§4,is further amended to read: 2-A. Educatio...,any public post-secondarypostsecondary inst...
4,1623715,IL,SR0225,SIKH HERITAGE MONTH 2023-04-26 14:31:05.063000...,"""Illinois IL SR 0225 IL SR0225 ILSR0225 IL SR ...",The United States is enriched by the diversit...,The Sikh community,which originated in Punjab,India and began immigrating into the United S...,has played an important role in developing Il...,Sikhism is the world's fifth-largest religion
...,...,...,...,...,...,...,...,...,...,...,...
325,1521088,NY,S01584,"""Prohibits certain student organizations which...","""AN ACT to amend the education law",in relation to certain student organizations ...,"""New York NY S 01584 NY S01584 NYS01584 NY S 1...",CUNY or community colleges. AN ACT to amend ...,in relation to certain student organizations ...,represented in Senate and Assembly,do enact as follows: \t\t Section 1. Sect...
326,1521119,NY,S01532,Requires the board of education and the truste...,in relation to requiring the board of educati...,"""New York NY S 01532 NY S01532 NYS01532 NY S 1...",in relation to requiring the board of educati...,represented in Senate and Assembly,do enact as follows: \t\t Section 1. Sect...,but are not limited to policies which: a. ens...,including using pronouns and names consi...
327,1521185,NY,S01565,Requires pet dealers and pet shops to provide ...,in relation to requiring pet dealers and pet ...,"""New York NY S 01565 NY S01565 NYS01565 NY S 1...",in relation to requiring pet dealers and pet ...,represented in Senate and Assembly,do enact as follows: \t\t Section 1. This...,as added by chapter 259 of the laws of 2000,subdivision 1 as amended by chapter 110 of th...
328,1521196,NY,S01556,"""Relates to an increase in punishment for cert...","""AN ACT to amend the penal law",in relation to on duty auxiliary police offic...,"""New York NY S 01556 NY S01556 NYS01556 NY S 1...",assault or menacing of such officer. AN ACT ...,in relation to on duty auxiliary police offic...,represented in Senate and Assembly,do enact as follows: \t\t Section 1. Sect...


In [7]:
import pandas as pd
import glob

# Define the path and get all CSV files
csv_file_path = "../data_storage/legislation/complete_full_text.csv/*.csv"
csv_files = glob.glob(csv_file_path)

# Define your column names here based on your dataset's requirements
column_names = ['BillID', 'StateCode', 'StateBillID', 'ShortBillName', 'Created', 'SponsorParty', 'billtype', 'status', 'CommitteeCategories', 'statesummary', 'BillText']

dfs = []

for file in csv_files:
    processed_rows = []
    with open(file, 'r', encoding='utf-8') as f:
        next(f)  # Skip the first line if it's headers
        for line in f:
            # Initialize an empty list to collect the split fields
            fields = []
            field_start = 0
            in_quotes = False
            for i, char in enumerate(line):
                # Toggle in_quotes status on quote
                if char == '"':
                    in_quotes = not in_quotes
                # Split on commas not within quotes
                elif char == ',' and not in_quotes:
                    fields.append(line[field_start:i])
                    field_start = i + 1
            # Add the last field
            fields.append(line[field_start:].strip())

            # Handle excess fields by combining them into the appropriate columns
            if len(fields) > 11:
                fields[3] = ','.join(fields[3:10])  # Combine into the 4th field
                fields[10] = ','.join(fields[10:])  # Combine remaining fields into the 11th
                fields = fields[:11]  # Keep only the first 11 fields
            
            processed_rows.append(fields)

    # Create a DataFrame for the processed rows for this file
    df = pd.DataFrame(processed_rows, columns=column_names)
    dfs.append(df)

# Concatenate all DataFrames into one
combined_df = pd.concat(dfs, ignore_index=True)


In [14]:
combined_df = pd.read_csv("../data_storage/legislation/complete_cleaned_full_text.csv")
combined_df.to_csv("../data_storage/legislation/complete_cleaned_full_text.csv", index=False)