-
-
Notifications
You must be signed in to change notification settings - Fork 417
/
Copy pathpipelines.py
67 lines (52 loc) · 1.99 KB
/
pipelines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
import subprocess
from database.models import Gazette, initialize_database
from scrapy.exceptions import DropItem
from sqlalchemy.orm import sessionmaker
from gazette.settings import FILES_STORE
class PdfParsingPipeline:
    """Extract the text of a downloaded PDF and flatten its file metadata."""

    def process_item(self, item, spider):
        """Attach the extracted text and ``file_*`` fields to ``item``.

        The first entry of ``item['files']`` is read (presumably filled in
        by Scrapy's files pipeline -- confirm against the project settings).
        Each of its keys is copied onto the item as ``file_<key>``, after
        which the ``files`` and ``file_urls`` entries are removed.

        Returns the same ``item`` object, mutated in place.
        """
        # Text extraction must run first: it needs item['files'], which is
        # removed below.
        item['source_text'] = self.pdf_source_text(item)
        for key, value in item['files'][0].items():
            item[f'file_{key}'] = value
        item.pop('files')
        item.pop('file_urls')
        return item

    def pdf_source_text(self, item):
        """Run ``pdftotext -layout`` on the item's PDF and return the text.

        The PDF is located by joining ``FILES_STORE`` with the ``path`` of
        the first entry in ``item['files']``. The text file is written next
        to the PDF and left on disk.

        Raises ``subprocess.CalledProcessError`` when ``pdftotext`` exits
        with a non-zero status.
        """
        pdf_path = os.path.join(FILES_STORE, item['files'][0]['path'])
        text_path = self._text_path_for(pdf_path)
        # An argument list (no shell) keeps paths containing spaces or shell
        # metacharacters intact. Naming the output file explicitly means the
        # file read back below is exactly the one pdftotext wrote.
        subprocess.run(
            ['pdftotext', '-layout', pdf_path, text_path], check=True)
        with open(text_path) as file:
            return file.read()

    @staticmethod
    def _text_path_for(pdf_path):
        """Return the ``.txt`` path paired with ``pdf_path``.

        Only a trailing ``.pdf`` extension (any letter case) is swapped for
        ``.txt``; any other path simply gets ``.txt`` appended. A ``.pdf``
        occurring elsewhere in the path, such as in a directory name, is
        left alone.
        """
        root, extension = os.path.splitext(pdf_path)
        if extension.lower() == '.pdf':
            return root + '.txt'
        return pdf_path + '.txt'
class PostgreSQLPipeline(object):
    """Persist each scraped item as a ``Gazette`` row."""

    def __init__(self):
        # Build the session factory once, bound to the engine returned by
        # initialize_database(); every item then gets its own session.
        self.Session = sessionmaker(bind=initialize_database())

    def process_item(self, item, spider):
        """Insert ``item`` as a ``Gazette`` and return the item.

        A failure while adding or committing rolls the session back and
        re-raises the exception. The session is closed in every case.
        """
        db_session = self.Session()

        # TEMP: the "municipality_id" attribute was recently renamed to
        # "territory_id" in the database. This translation can be removed
        # once no branch still uses "municipality_id".
        if 'municipality_id' in item:
            item['territory_id'] = item.pop('municipality_id')

        record = Gazette(**item)
        try:
            db_session.add(record)
            db_session.commit()
        except BaseException:
            # Undo the partial transaction, then let the error propagate.
            db_session.rollback()
            raise
        finally:
            db_session.close()
        return item
class GazetteDateFilteringPipeline(object):
    """Drop items dated before the spider's ``start_date``, when one is set."""

    def process_item(self, item, spider):
        """Return ``item`` unless it is older than ``spider.start_date``.

        No filtering happens when the spider has no ``start_date``
        attribute or when that attribute is ``None``.

        Raises ``DropItem`` when ``spider.start_date`` is greater than
        ``item.get('date')``. An item without a ``'date'`` value is not
        handled specially, so the comparison is made against ``None``.
        """
        # A missing attribute and an explicit None both mean "no filter".
        # Comparing None with a date would otherwise raise TypeError for
        # every item.
        start_date = getattr(spider, 'start_date', None)
        if start_date is not None and start_date > item.get('date'):
            # Message text is kept verbatim for anything matching on it.
            raise DropItem(
                'Droping all items before {}'.format(start_date))
        return item