This repository has been archived by the owner on Dec 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
638 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from .sql import SQLDatastore | ||
|
||
|
||
def get_datastore(): | ||
datastore = SQLDatastore() | ||
return datastore |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from sqlalchemy.ext.automap import automap_base | ||
from sqlalchemy.orm import Session | ||
from sqlalchemy import create_engine, desc | ||
|
||
from datapackage_pipelines_measure.config import settings | ||
|
||
import logging | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
class SQLDatastore(): | ||
|
||
def __init__(self): | ||
self.Base = automap_base() | ||
self.engine = create_engine(settings.DB_ENGINE) | ||
# Reflect the tables | ||
self.Base.prepare(self.engine, reflect=True) | ||
|
||
def get_latest_from_table(self, filter, table): | ||
''' | ||
Get the most recent row from a table with the passed filter. | ||
Return result as a dict (or None). | ||
''' | ||
try: | ||
Table = self.Base.classes[table] | ||
except KeyError: | ||
# No table in database | ||
return None | ||
|
||
session = Session(self.engine) | ||
row = session.query(Table) \ | ||
.order_by(desc(Table.timestamp)) \ | ||
.filter_by(**filter) \ | ||
.first() | ||
|
||
if row is None: | ||
return None | ||
|
||
# Return row's columns as dict | ||
return dict((col, getattr(row, col)) | ||
for col in row.__table__.columns.keys()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 72 additions & 15 deletions
87
datapackage_pipelines_measure/pipeline_steps/social_media.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,80 @@ | ||
import os | ||
|
||
ROOT_PATH = os.path.join(os.path.dirname(__file__), '..', '..') | ||
from datapackage_pipelines_measure.config import settings | ||
|
||
|
||
DOWNLOADS_PATH = os.path.join(os.path.dirname(__file__), '../../downloads') | ||
|
||
label = 'social-media' | ||
|
||
|
||
def add_steps(steps: list, pipeline_id: str, | ||
project_id: str, config: dict) -> list: | ||
return steps + [ | ||
('add_resource', { | ||
'name': 'test_resource', | ||
'url': 'https://docs.google.com/spreadsheets/d/' + | ||
'1vbhTuMDNCmxdo2rPkkya9v6X1f9eyqvSGsY5YcxlcLk/' + | ||
'edit#gid=0' | ||
}), | ||
('stream_remote_resources', {}), | ||
('measure.capitalise', {}), | ||
('dump.to_path', { | ||
'out-path': | ||
'{}/downloads/{}'.format(ROOT_PATH, pipeline_id) | ||
}) | ||
] | ||
for entity in config['twitter']['entities']: | ||
steps.append(('measure.add_twitter_resource', { | ||
'entity': entity, | ||
'project_id': project_id | ||
})) | ||
|
||
steps.append(('concatenate', { | ||
'target': { | ||
'name': 'social-media', | ||
'path': 'data/social-media.json'}, | ||
'fields': { | ||
'entity': [], | ||
'entity_type': [], | ||
'source': [], | ||
'date': [], | ||
'followers': [], | ||
'mentions': [], | ||
'interactions': []} | ||
})) | ||
|
||
steps.append(('set_types', { | ||
'types': { | ||
'entity': { | ||
'type': 'string', | ||
}, | ||
'entity_type': { | ||
'type': 'string' | ||
}, | ||
'source': { | ||
'type': 'string' | ||
}, | ||
'date': { | ||
'type': 'date', | ||
}, | ||
'followers': { | ||
'type': 'integer' | ||
}, | ||
'mentions': { | ||
'type': 'integer' | ||
}, | ||
'interactions': { | ||
'type': 'integer' | ||
} | ||
} | ||
})) | ||
|
||
steps.append(('measure.add_project_name', {'name': project_id})) | ||
steps.append(('measure.add_timestamp')) | ||
steps.append(('measure.add_uuid')) | ||
|
||
# temporarily dump to path for development | ||
steps.append(('dump.to_path', { | ||
'out-path': '{}/{}'.format(DOWNLOADS_PATH, pipeline_id) | ||
})) | ||
|
||
steps.append(('dump.to_sql', { | ||
'engine': settings.DB_ENGINE, | ||
'tables': { | ||
'socialmedia': { | ||
'resource-name': 'social-media', | ||
'mode': 'update', | ||
'update_keys': ['entity', 'entity_type', | ||
'source', 'project_id', 'date'] | ||
} | ||
} | ||
})) | ||
|
||
return steps |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.