Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ verify_ssl = true
[dev-packages]
pytest = "*"
apache-airflow = "==1.10.15"
apache-airflow-backport-providers-amazon = "*"
apache-airflow-backport-providers-slack = "*"
apache-airflow-backport-providers-ftp = "*"
apache-airflow-backport-providers-http = "*"
apache-airflow-backport-providers-sftp = "*"
apache-airflow-backport-providers-ssh = "*"
pylint = "*"
moto = "*"
moto = "==2.0.6"
requests-mock = "*"
paramiko = "*"
sshtunnel = "*"
sickle = "*"
lxml = "==4.6.5"
tulflow = "==0.8.4"
lxml = "==4.7.1"
tulflow = "==0.8.5"
boto3 = "*"
pexpect = "*"
pysftp = "*"
Expand Down
368 changes: 223 additions & 145 deletions Pipfile.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions cob_datapipeline/alma_electronic_notes_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from airflow.hooks.base_hook import BaseHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_to_sftp_operator import S3ToSFTPOperator
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.providers.amazon.aws.transfers.s3_to_sftp import S3ToSFTPOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from cob_datapipeline.tasks.task_slack_posts import notes_slackpostonsuccess


Expand Down
4 changes: 2 additions & 2 deletions cob_datapipeline/catalog_full_reindex_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.contrib.operators.s3_list_operator import S3ListOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator
from cob_datapipeline.tasks.xml_parse import prepare_boundwiths, prepare_alma_data, update_variables
from cob_datapipeline.tasks.task_solr_get_num_docs import task_solrgetnumdocs
from cob_datapipeline.tasks.task_slack_posts import catalog_slackpostonsuccess
Expand Down
2 changes: 1 addition & 1 deletion cob_datapipeline/catalog_move_alma_sftp_to_s3_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tulflow import tasks
import airflow
from airflow.models import Variable
from airflow.contrib.hooks.sftp_hook import SFTPHook
from airflow.providers.sftp.hooks.sftp import SFTPHook
from airflow.operators.python_operator import PythonOperator
from cob_datapipeline.operators.batch_sftp_to_s3_operator import BatchSFTPToS3Operator
from cob_datapipeline.helpers import determine_most_recent_date
Expand Down
4 changes: 2 additions & 2 deletions cob_datapipeline/catalog_preproduction_oai_harvest_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import os
from tulflow import harvest, tasks
import airflow
from airflow.contrib.operators.s3_list_operator import S3ListOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from cob_datapipeline.tasks.xml_parse import prepare_oai_boundwiths, update_variables
from cob_datapipeline.tasks.task_solr_get_num_docs import task_solrgetnumdocs
from cob_datapipeline.tasks.task_slack_posts import catalog_slackpostonsuccess
from airflow.operators.http_operator import SimpleHttpOperator
from cob_datapipeline import helpers

"""
Expand Down
4 changes: 2 additions & 2 deletions cob_datapipeline/catalog_production_oai_harvest_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
import os
from tulflow import harvest, tasks
import airflow
from airflow.contrib.operators.s3_list_operator import S3ListOperator
from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from cob_datapipeline.tasks.xml_parse import prepare_oai_boundwiths, update_variables
from cob_datapipeline.tasks.task_solr_get_num_docs import task_solrgetnumdocs
from cob_datapipeline.tasks.task_slack_posts import catalog_slackpostonsuccess
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.providers.http.operators.http import SimpleHttpOperator


"""
Expand Down
2 changes: 1 addition & 1 deletion cob_datapipeline/dspace_harvest_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.s3_list_operator import S3ListOperator
from airflow.providers.amazon.aws.operators.s3_list import S3ListOperator
from cob_datapipeline.operators.batch_s3_to_sftp_operator import BatchS3ToSFTPOperator
import airflow

Expand Down
2 changes: 1 addition & 1 deletion cob_datapipeline/operators/batch_s3_to_sftp_operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from airflow.contrib.operators.s3_to_sftp_operator import S3ToSFTPOperator
from airflow.providers.amazon.aws.transfers.s3_to_sftp import S3ToSFTPOperator
from airflow.utils.decorators import apply_defaults


Expand Down
11 changes: 5 additions & 6 deletions cob_datapipeline/operators/batch_sftp_to_s3_operator.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from airflow.contrib.operators.sftp_to_s3_operator import SFTPToS3Operator
from airflow.providers.amazon.aws.transfers.sftp_to_s3 import SFTPToS3Operator
from airflow.utils.decorators import apply_defaults


Expand All @@ -9,15 +9,15 @@ class BatchSFTPToS3Operator(SFTPToS3Operator):
:param sftp_conn_id: The sftp connection id. The name or identifier for
establishing a connection to the SFTP server.
:type sftp_conn_id: str
:param sftp_base_path: The sftp remote path where the batch of files can be found.
:param sftp_base_path: The sftp remote path where the batch of files can be found.
:type sftp_path: str
:param s3_conn_id: The s3 connection id. The name or identifier for
establishing a connection to S3
:type s3_conn_id: str
:param s3_bucket: The targeted s3 bucket. This is the S3 bucket to where
the file is uploaded.
:type s3_bucket: str
:param s3_prefix: The prefix that will be used to generate full s3 key path for each file in
:param s3_prefix: The prefix that will be used to generate full s3 key path for each file in
the batch.
:type s3_prefix: str
:param xcom_id: Id of the task which pushed the list of files to be transferred to xcom.
Expand Down Expand Up @@ -54,9 +54,8 @@ def execute(self, context):
count += 1
self.s3_key = f"{self.s3_prefix}{f}"
self.sftp_path = f"{self.sftp_base_path}/{f}"

super(BatchSFTPToS3Operator, self).execute(context)
self.log.info("Sent to s3://%s/%s", self.s3_bucket, self.s3_key)

self.log.info(f"Total Files transfered: {count}")

self.log.info(f"Total Files transfered: {count}")
3 changes: 2 additions & 1 deletion cob_datapipeline/tasks/task_slack_posts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import json
from airflow.hooks.base_hook import BaseHook
from airflow.models import Variable
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from tulflow import tasks


Expand Down