# GCP Pipeline: SFTP >> GCS >> BigQuery

Instructions for setting up a pipeline triggered by a file "PUT" to an SFTP server.
Setup steps:
1. SFTP server
2. GCS bucket mirroring/sync on SFTP server
3. Webhook trigger using cloud functions
4. Airflow DAG (Composer)

# 1. SFTP Server

Under Compute Engine, create a VM instance with minimal compute and storage. This server will act as a pass-through to GCS.

Since external sources will be connecting to this server, assign it a static external IP. (The default is ephemeral.)

Specs I used for an hourly 200 MB file dump:
- machine type: f1-micro, 1 vCPU, 0.6 GB memory
- debian-9-stretch image
- 10 GB SSD storage

# 2. GCS Mirror/Sync

Mount the GCS bucket onto a directory on the SFTP server.

1. GCS UI: Create a new bucket with unique name

2. SFTP Server: Install lftp

In [None]:
sudo apt-get install lftp

3. SFTP Server: Install gcsfuse

In [None]:
export GCSFUSE_REPO=gcsfuse-`lsb_release -c -s`
echo "deb http://packages.cloud.google.com/apt $GCSFUSE_REPO main" | sudo tee /etc/apt/sources.list.d/gcsfuse.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -

sudo apt-get update
sudo apt-get install gcsfuse

4. SFTP Server: Mount GCS Bucket

Replace < replace-by-bucket-name > with the bucket name (created in the GCS UI). Do not include the gs:// prefix.

In [None]:
mkdir ~/gcsBucket
gcsfuse <replace-by-bucket-name> ~/gcsBucket

Test the directory and the GCS mirror:
- create a blank .txt file >> touch test.txt
- move the file to the mounted dir >> mv /file_path/test.txt /directory_path/mounted_gcsBucket
- (ignore the date warning that appears while moving the file)
- open the GCS UI and verify the file has been moved over
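
The manual check above can also be scripted. The following is a minimal sketch, not part of the original setup: the helper name `smoke_test_mount` and the marker file naming are assumptions made for illustration. It writes a small file into a directory and confirms the file is listed there; whether the object actually reached the bucket still needs to be verified in the GCS UI.

```python
import os
import uuid


def smoke_test_mount(mount_dir):
    """Write a small marker file into mount_dir and confirm it is listed there.

    Returns the marker file name so it can be looked up in the GCS UI.
    (Hypothetical helper, for illustration only.)
    """
    if not os.path.isdir(mount_dir):
        raise ValueError("Not a directory: {}".format(mount_dir))

    # A random suffix avoids clobbering a file left by an earlier test run.
    file_name = "mount_test_{}.txt".format(uuid.uuid4().hex[:8])
    path = os.path.join(mount_dir, file_name)

    with open(path, "w") as handle:
        handle.write("gcsfuse mount test\n")

    if file_name not in os.listdir(mount_dir):
        raise RuntimeError("File was written but is not listed in " + mount_dir)
    return file_name
```

Assuming the bucket is mounted at ~/gcsBucket as in step 4, calling `smoke_test_mount(os.path.expanduser("~/gcsBucket"))` returns the name of the file to look for in the bucket.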

# 3. Cloud Functions - Webhook

1. Enable the Cloud Composer, Cloud Functions, and Cloud Identity and Access Management (Cloud IAM) APIs.

Link: https://console.cloud.google.com/flows/enableapi?apiid=cloudfunctions,iam,composer&redirect=https://console.cloud.google.com

2. Grant blob signing permissions to the Cloud Functions Service Account:

To authenticate to IAP, grant the Appspot Service Account (used by Cloud Functions) the Service Account Token Creator role on itself. Run the following command in the gcloud command-line tool or Cloud Shell, replacing your-project-id with your own project ID:

In [None]:
gcloud iam service-accounts add-iam-policy-binding \
your-project-id@appspot.gserviceaccount.com \
--member=serviceAccount:your-project-id@appspot.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

3. Get the client ID

To construct a token to authenticate to IAP, the function requires the client ID of the proxy that protects the Airflow webserver. The Cloud Composer API does not provide this information directly. Instead, make an unauthenticated request to the Airflow webserver and capture the client ID from the redirect URL. The following Python code sample demonstrates how to get the client ID. After executing this code on the command line or in Cloud Shell, your client ID will be printed.

Run the Python script in the cloud console.

Before running it, uncomment and update these variables in the script below:

project_id = 'YOUR_PROJECT_ID'

location = 'us-central1' # Choose your preferred region

composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'


In [None]:
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
composer_response = authed_session.request('GET', environment_url)
environment_data = composer_response.json()
airflow_uri = environment_data['config']['airflowUri']

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers['location']

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string['client_id'][0])

CLIENT ID: Save/keep track of client ID for later!
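
To make the last step of that script easier to follow, here is a small standalone sketch of the query-string parsing on its own. It uses the standard library `urllib.parse` rather than `six`, and the redirect URL shown is a made-up example, not a real IAP response; the helper name `extract_client_id` is likewise an assumption.

```python
from urllib.parse import parse_qs, urlparse


def extract_client_id(redirect_location):
    """Return the client_id query parameter from a redirect URL."""
    query = parse_qs(urlparse(redirect_location).query)
    if "client_id" not in query:
        raise KeyError("No client_id parameter in redirect URL")
    # parse_qs maps each parameter to a list of values; take the first one.
    return query["client_id"][0]


# Hypothetical redirect URL, for illustration only.
example = (
    "https://accounts.google.com/o/oauth2/v2/auth"
    "?client_id=1234-abcd.apps.googleusercontent.com"
    "&response_type=code&scope=openid+email"
)
print(extract_client_id(example))  # prints 1234-abcd.apps.googleusercontent.com
```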

4. Create cloud function

In the Cloud Functions UI in GCP, create a function with these settings:

- Name: your_dag_trigger_function
- Memory Allocated: 128 MiB
- Trigger: Cloud Storage
- Event Type: Finalize/Create
- Bucket: your-gcs-bucket-name
- Source code: Inline editor
- Runtime: Python 3.7
- Function to execute: trigger_dag (must match the function name defined in main.py)


### main.py & requirements.txt will need to be edited:

### requirements.txt:

In [None]:
requests_toolbelt==0.9.1
google-auth==1.11.2

### main.py: 3 edits required near line 25, inside trigger_dag (the script can be copied and pasted once the edits are made)
- client_id: 'This was printed in the cloud console in step 3'
- webserver_id: 'This can be found in the URL address bar when in the Airflow UI (Composer): https:// < your_web_server_id > .appspot.com/'
- dag_name: '< your_dag_name >'

In [None]:
import google.auth
import google.auth.compute_engine.credentials
import google.auth.iam
from google.auth.transport.requests import Request
import google.oauth2.credentials
import google.oauth2.service_account
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.
    """
    # ----------------------------------------------------------------------------------------
    # Fill in with your Composer info here
    # ----------------------------------------------------------------------------------------
    
    client_id = 'YOUR-CLIENT-ID'
    
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'
    
    # ----------------------------------------------------------------------------------------
    # End of info fill in
    # ----------------------------------------------------------------------------------------
    
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method='POST', json={"conf":data})


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.

    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.

    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Figure out what environment we're running in and get some preliminary
    # information about the service account.
    bootstrap_credentials, _ = google.auth.default(
        scopes=[IAM_SCOPE])

    # For service account's using the Compute Engine metadata service,
    # service_account_email isn't available until refresh is called.
    bootstrap_credentials.refresh(Request())

    signer_email = bootstrap_credentials.service_account_email
    if isinstance(bootstrap_credentials,
                  google.auth.compute_engine.credentials.Credentials):
        # Since the Compute Engine metadata service doesn't expose the service
        # account key, we use the IAM signBlob API to sign instead.
        # In order for this to work:
        # 1. Your VM needs the https://www.googleapis.com/auth/iam scope.
        #    You can specify this specific scope when creating a VM
        #    through the API or gcloud. When using Cloud Console,
        #    you'll need to specify the "full access to all Cloud APIs"
        #    scope. A VM's scopes can only be specified at creation time.
        # 2. The VM's default service account needs the "Service Account Actor"
        #    role. This can be found under the "Project" category in Cloud
        #    Console, or roles/iam.serviceAccountActor in gcloud.
        signer = google.auth.iam.Signer(
            Request(), bootstrap_credentials, signer_email)
    else:
        # A Signer object can sign a JWT using the service account's key.
        signer = bootstrap_credentials.signer

    # Construct OAuth 2.0 service account credentials using the signer
    # and email acquired from the bootstrap credentials.
    service_account_credentials = google.oauth2.service_account.Credentials(
        signer, signer_email, token_uri=OAUTH_TOKEN_URI, additional_claims={
            'target_audience': client_id
        })
    # service_account_credentials gives us a JWT signed by the service
    # account. Next, we use that to obtain an OpenID Connect token,
    # which is a JWT signed by Google.
    google_open_id_connect_token = get_google_open_id_connect_token(
        service_account_credentials)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account {} does not have permission to '
                        'access the IAP-protected application.'.format(
                            signer_email))
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text


def get_google_open_id_connect_token(service_account_credentials):
    """Get an OpenID Connect token issued by Google for the service account.

    This function:

      1. Generates a JWT signed with the service account's private key
         containing a special "target_audience" claim.

      2. Sends it to the OAUTH_TOKEN_URI endpoint. Because the JWT in #1
         has a target_audience claim, that endpoint will respond with
         an OpenID Connect token for the service account -- in other words,
         a JWT signed by *Google*. The aud claim in this JWT will be
         set to the value from the target_audience claim in #1.

    For more information, see
    https://developers.google.com/identity/protocols/OAuth2ServiceAccount .
    The HTTP/REST example on that page describes the JWT structure and
    demonstrates how to call the token endpoint. (The example on that page
    shows how to get an OAuth2 access token; this code is using a
    modified version of it to get an OpenID Connect token.)
    """

    service_account_jwt = (
        service_account_credentials._make_authorization_grant_assertion())
    request = google.auth.transport.requests.Request()
    body = {
        'assertion': service_account_jwt,
        'grant_type': google.oauth2._client._JWT_GRANT_TYPE,
    }
    token_response = google.oauth2._client._token_endpoint_request(
        request, OAUTH_TOKEN_URI, body)
    return token_response['id_token']
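
As a rough illustration of what `trigger_dag` sends, the sketch below rebuilds the request URL and body without making any network call. The helper `build_trigger_request` and the sample event are hypothetical; the event shows only a few of the fields a Cloud Storage trigger passes in `data`, with the `id` assumed to have the form bucket/object/generation.

```python
import json


def build_trigger_request(webserver_id, dag_name, event):
    """Return the (url, body) pair that trigger_dag would POST.

    Hypothetical helper: it mirrors the string building in trigger_dag
    but performs no authentication and sends nothing.
    """
    url = "https://{}.appspot.com/api/experimental/dags/{}/dag_runs".format(
        webserver_id, dag_name)
    # The whole event is passed through as the DAG run's conf.
    return url, {"conf": event}


# Made-up Cloud Storage event with only the fields relevant here.
sample_event = {
    "bucket": "your-gcs-bucket-name",
    "name": "dump.tsv.gz",
    "id": "your-gcs-bucket-name/dump.tsv.gz/1584371833000000",
}

url, body = build_trigger_request(
    "YOUR-TENANT-PROJECT", "sftp_to_bigquery", sample_event)
print(url)
print(json.dumps(body, indent=2))
```

The DAG in the next section reads the `id` field back out of this conf to find the file name.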

# 4. Airflow DAG (Composer)

In [None]:
import re
import os
import logging
import datetime
from datetime import timedelta

import airflow
from airflow import DAG
from airflow import models
from airflow import configuration
from airflow.models import Variable
from airflow.models import TaskInstance
from airflow.utils.dates import days_ago

from airflow.contrib.hooks import gcs_hook
from airflow.operators import bash_operator
from airflow.contrib.operators import gcs_to_bq, gcs_to_gcs
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.dummy_operator import DummyOperator
"""
GCS trigger (cloudfunction) >> GCS file to Big Query
Files dropped into gcs bucket will trigger this DAG and sink to big query table
"""
default_args={
    'owner': 'your_name_here',
    'depends_on_past': False,
    'start_date': datetime.datetime(1999, 5, 17, 0, 0),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

# Input Params
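# (no trailing commas here: a trailing comma would turn the value into a tuple)
compression_type='GZIP' # or default value 'NONE'
field_delimiter_type="\t" # tab delimited or "," for comma delimited
source_bucket='your_source_GCSbucket'
del_bucket='your_archive_GCSbucket' # placeholder: bucket that processed files are moved to
destination_dataset='your_bq_data_set'
destination_table='your_bq_table'
write_disposition_type='WRITE_APPEND'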
bq_schema=[
    {'name': 'username', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'date_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
    {'name': 'ip', 'type': 'STRING', 'mode': 'NULLABLE'},
]

def get_source(**kwargs):
    """
    gets file_name from DAG context and xcom_pushes
    """
    split = r"/(.*?)/"  # raw string: captures the text between the first two slashes
    context_id = kwargs["dag_run"].conf['id']
    gcs_file_name = re.search(split, context_id).group(1)
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(value=gcs_file_name, key='file_name')
    
def branch(**kwargs):
    task_instance = kwargs['task_instance']
    file_name = task_instance.xcom_pull(task_ids='get_file', key='file_name')
    print("FILE NAME: "+file_name)
    if file_name.endswith('.tsv.gz'):
        return 'sink_bq'
    else:
        return 'move_meta_file'
    
with airflow.DAG(
    'sftp_to_bigquery',
    default_args=default_args,
    description='GCS trigger to bigquery',
    max_active_runs=6, # max number of concurrent dag runs (best practice n_cores-1)
    schedule_interval=None) as dag:
    
    
    # Runs get_source function to save file name to xcom
    get_file = PythonOperator(task_id="get_file", python_callable=get_source, provide_context=True)

    fork = BranchPythonOperator(task_id='fork', python_callable=branch, provide_context=True)

    sink_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id='sink_bq',
        bucket=source_bucket,
        source_objects=["{{ task_instance.xcom_pull(task_ids='get_file', key='file_name') }}"], # pulls file_name
        destination_project_dataset_table='{}.{}'.format(destination_dataset, destination_table),
        schema_fields=bq_schema,
        compression=compression_type,
        field_delimiter=field_delimiter_type,
        write_disposition=write_disposition_type,
        provide_context=True
        )

    move_sink_file = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='move_sink_file',
        source_bucket=source_bucket,
        source_object="{{ task_instance.xcom_pull(task_ids='get_file', key='file_name') }}",
        destination_bucket=del_bucket,
        destination_object="{{ task_instance.xcom_pull(task_ids='get_file', key='file_name') }}",
        move_object=True,
        provide_context=True,
        trigger_rule='none_failed'
        )

    move_meta_file = gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='move_meta_file',
        source_bucket=source_bucket,
        source_object="{{ task_instance.xcom_pull(task_ids='get_file', key='file_name') }}",
        destination_bucket=del_bucket,
        destination_object="{{ task_instance.xcom_pull(task_ids='get_file', key='file_name') }}",
        move_object=True,
        provide_context=True,
        trigger_rule='none_failed'
        )

    job_complete = DummyOperator(task_id='job_complete', dag=dag, trigger_rule='none_failed')

    get_file >> fork 
    fork >> move_meta_file >> job_complete
    fork >> sink_bq >> move_sink_file >> job_complete
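
The file-name extraction and branching used by `get_source` and `branch` can be tried outside Airflow. This is a minimal sketch under the assumption that the event `id` has the form bucket/object/generation; the function names and the sample `id` are made up for illustration.

```python
import re

# Same pattern as get_source: the text between the first two slashes.
OBJECT_PATTERN = re.compile(r"/(.*?)/")


def file_name_from_event_id(event_id):
    """Extract the object name from an id shaped like bucket/object/generation."""
    match = OBJECT_PATTERN.search(event_id)
    if match is None:
        raise ValueError("Unexpected event id: {!r}".format(event_id))
    return match.group(1)


def choose_branch(file_name):
    """Mirror the branch() callable: data files are loaded, others are only moved."""
    return "sink_bq" if file_name.endswith(".tsv.gz") else "move_meta_file"


# Made-up event id, for illustration only.
event_id = "your_source_GCSbucket/dump.tsv.gz/1584371833000000"
name = file_name_from_event_id(event_id)
print(name, "->", choose_branch(name))  # prints dump.tsv.gz -> sink_bq
```

One limitation worth knowing: because the pattern stops at the second slash, an object stored under a folder-like prefix (for example incoming/dump.tsv.gz) yields only the first path segment, so this approach fits buckets where files land at the top level.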
