-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path index.py
151 lines (127 loc) · 5.81 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import json
import logging
import os
import shutil
import time
from datetime import datetime

import requests
from dotenv import load_dotenv
from kafka import KafkaConsumer

from file_processing_utils import DocumentProcessor
from get_llm import get_llm
# Pull configuration from a local .env file into the process environment
# before anything reads os.getenv().
load_dotenv()

# Module-wide logging: timestamped, level-tagged messages.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class FileProcessingPipeline:
    """Kafka-driven file-processing pipeline.

    Consumes file-upload events from a Kafka topic, downloads each file,
    parses it with DocumentProcessor, and pushes the parsed elements to
    the RAG store through the inference API. Offsets are committed
    manually, only after a message has been fully and successfully
    processed, so failed messages can be re-consumed.
    """

    def __init__(self):
        # Configuration — every knob is overridable via environment
        # variables (populated by load_dotenv() at import time).
        self.kafka_bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
        self.kafka_topic = os.getenv("KAFKA_TOPIC", "file_uploads")
        self.download_dir = os.getenv("DOWNLOAD_DIR", "downloaded_files")
        self.processed_dir = os.getenv("PROCESSED_DIR", "processed_files")
        self.llm_api_key = os.getenv("GOOGLE_API_KEY")
        self.llm = get_llm(api_key=self.llm_api_key)
        self.api_base_url = os.getenv("INFERENCE_API_BASE_URL")
        self.document_processor = DocumentProcessor(llm=self.llm, api_base_url=self.api_base_url)

        # Ensure working directories exist
        os.makedirs(self.download_dir, exist_ok=True)
        os.makedirs(self.processed_dir, exist_ok=True)

        # Initialize Kafka consumer. enable_auto_commit=False: offsets are
        # committed explicitly in run() only after successful processing.
        self.consumer = KafkaConsumer(
            self.kafka_topic,
            bootstrap_servers=self.kafka_bootstrap_servers,
            value_deserializer=lambda x: json.loads(x.decode('utf-8')),
            # Messages may carry no key; guard against x being None to
            # avoid an AttributeError inside the deserializer.
            key_deserializer=lambda x: json.loads(x.decode('utf-8')) if x is not None else None,
            auto_offset_reset='earliest',
            enable_auto_commit=False,
            group_id='file_processing_group'
        )
        logger.info(f"Connected to Kafka topic: {self.kafka_topic}")

    def download_file(self, download_url, file_path):
        """Download ``download_url`` to ``file_path``.

        Streams the response in 8 KiB chunks so large files are never held
        fully in memory.

        Returns:
            bool: True on success, False on any download or write error.
        """
        try:
            # (connect, read) timeouts keep the consumer from hanging
            # forever on an unresponsive server.
            with requests.get(download_url, stream=True, timeout=(10, 120)) as response:
                response.raise_for_status()
                with open(file_path, 'wb') as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            file.write(chunk)
            return True
        except Exception as e:
            logger.error(f"Error downloading file: {str(e)}")
            return False

    def process_file(self, file_path, message):
        """Parse the downloaded file and push it to the RAG store.

        Copies the file into ``self.processed_dir`` under a timestamped
        name, runs DocumentProcessor over the copy, and pushes the parsed
        elements to the inference API.

        Args:
            file_path: Local path of the downloaded file.
            message: Kafka message value; must contain 'originalFileName'
                and 'fileId' keys.

        Returns:
            bool: True only when parsing AND the push to the API succeed.
        """
        try:
            # Keep the original extension so downstream type detection
            # still works on the renamed copy.
            _, ext = os.path.splitext(message['originalFileName'])

            # Timestamped name avoids collisions if the same file is
            # reprocessed.
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            processed_filename = f"processed_{message['fileId']}_{timestamp}{ext}"
            processed_path = os.path.join(self.processed_dir, processed_filename)

            # Copy in chunks (shutil) instead of reading the whole file
            # into memory at once.
            shutil.copyfile(file_path, processed_path)

            # Now process the copied file
            doc_elements = self.document_processor.process_document(
                path=self.processed_dir,       # directory holding the copy
                file_name=processed_filename,  # just the filename
                image_folder="figures"
            )
            success = self.document_processor.push_to_api(doc_elements=doc_elements)
            if success:
                logger.info("[Success]: Parsed document and fed to RAG Store")
                logger.info(f"File processed successfully: {processed_filename}")
            else:
                logger.error("[Fail]: Unable to parse document and feed to RAG Store")
            # BUG FIX: previously returned True unconditionally, so run()
            # committed the offset and deleted the file even when the push
            # to the RAG store had failed.
            return success
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return False

    def run(self):
        """Main processing loop.

        Blocks on the Kafka consumer; for each message: download, process,
        delete the local download, and commit the offset. On failure the
        offset is NOT committed, so the message is re-delivered after a
        consumer restart/rebalance.
        """
        try:
            logger.info("Starting file processing consumer...")
            for message in self.consumer:
                try:
                    value = message.value
                    logger.info(f"Received message: {value['fileId']}")

                    # Create file path
                    file_name = value['originalFileName']
                    file_path = os.path.join(self.download_dir, file_name)

                    # Download file
                    if self.download_file(value['downloadUrl'], file_path):
                        logger.info(f"File downloaded successfully: {file_name}")
                        # Process file
                        if self.process_file(file_path, value):
                            # Clean up downloaded file
                            os.remove(file_path)
                            logger.info(f"Processed and cleaned up: {file_name}")
                            # Commit the offset only on full success
                            self.consumer.commit()
                        else:
                            logger.error(f"Failed to process file: {file_name}")
                    else:
                        logger.error(f"Failed to download file: {file_name}")
                except Exception as e:
                    # A bad message must not kill the consumer loop;
                    # log and move on to the next one.
                    logger.error(f"Error processing message: {str(e)}")
                    continue
        except KeyboardInterrupt:
            logger.info("Shutting down consumer...")
        finally:
            self.cleanup()

    def cleanup(self):
        """Close the Kafka consumer, logging (not raising) any error."""
        try:
            self.consumer.close()
            logger.info("Consumer closed successfully")
        except Exception as e:
            logger.error(f"Error during cleanup: {str(e)}")
if __name__ == "__main__":
processor = FileProcessingPipeline()
logger.info("Processing pipeline is up!")
processor.run()