# An Airflow Story: Cleaning & Visualizing our Github Data.


Through Astronomer’s short but exciting life so far, we’ve changed our stack, product direction, and target market more times than we can count. The flux has definitely caused some stress, but it has ultimately been a major source of growth both for me individually and for our company as a whole.

However, all this pivoting has been unequivocally unkind to two things: our cofounder’s hair color (BEFORE AND AFTER) and our Github org.

The last few years have left us with 1000 repos and 1000 orgs. As you can imagine, this made it difficult to do any internal Github reporting on which repos we still use, who is closing out issues, what types of issues stay open, and how many issues we have left on a milestone.

## Apache Airflow on Astronomer.


Lucky for us, we’re a data engineering company with a module designed to solve exactly that type of problem.

_Apache Airflow_ is a data workflow management system that allows engineers to schedule, deploy and monitor their own data pipes as DAGs (directed acyclic graphs). Built by developers, for developers, it’s based on the principle that ETL is best expressed in code.
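To make the "directed acyclic graph" part concrete, here is a minimal sketch in plain Python with no Airflow involved. The task names are made up for illustration, and the standard library's `graphlib` (Python 3.9+) stands in for the scheduler: given which tasks depend on which, it works out an order in which they can run.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical pipeline: each key is a task, each value is the set of tasks
# that must finish before that task can start.
pipeline = {
    "extract": set(),
    "load": {"extract"},
    "transform": {"load"},
    "report": {"transform"},
}


def run_order(graph):
    """Return one valid execution order for the tasks in `graph`."""
    return list(TopologicalSorter(graph).static_order())


print(run_order(pipeline))
```

Because the graph has no cycles, an order always exists. If two tasks depended on each other, `graphlib` would raise a `CycleError`, which is the "acyclic" constraint in action.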

Running Airflow on Astronomer lets you leverage all of its features without getting bogged down in things that aren’t related to your data. Not only that, but you get access to your own Celery workers, Grafana dashboards for monitoring, and Prometheus for alerting.

From a developer's perspective, it lets you focus on _using a tool to solve a problem instead of solving a tool's problems._

Astronomer Open is our entire platform in a Docker container that you can download and hack on locally.

## Spinning up Astronomer Open.

Clone [Astronomer Open](https://github.com/astronomerio/astronomer) for a local, hackable version of the entire Astronomer Platform - Airflow, Grafana, Prometheus, and so much more.

Jump into the Airflow example directory and run the start script, pointing it at your DAG directory.

`cd astronomer/examples/airflow`

`./start YOURDIRECTORYHERE`

A bunch of output will pop up in the command line, including the addresses where everything is running.


<img src="astro_open.png" title="Astro Open" />

All custom requirements can be stored in a `requirements.txt` file in the directory you're pointing at (an empty one will be initialized when you run `start`).

## Getting Github Data.

All the heavy lifting in Airflow is done with hooks and operators. 

**Hooks** are an interface to external systems (APIs, DBs, etc).

**Operators** are units of logic that use hooks to actually do the work.
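Here is a toy sketch of that split in plain Python. Neither class is a real Airflow class. `InMemoryHook` and `CountOperator` are hypothetical stand-ins, there only to show that the hook is the one place that knows how to reach the system, while the operator only holds the logic.

```python
class InMemoryHook:
    """Stand-in for a hook: the only place that knows how to reach the system."""

    def __init__(self, records):
        self._records = records

    def get_records(self, kind):
        # A real hook would make an API call or run a query here.
        return [r for r in self._records if r["kind"] == kind]


class CountOperator:
    """Stand-in for an operator: a unit of logic that uses a hook."""

    def __init__(self, hook, kind):
        self.hook = hook
        self.kind = kind

    def execute(self):
        # The operator decides what to do; the hook decides how to fetch.
        return len(self.hook.get_records(self.kind))


hook = InMemoryHook([
    {"kind": "issue", "id": 1},
    {"kind": "issue", "id": 2},
    {"kind": "commit", "id": 3},
])
print(CountOperator(hook, "issue").execute())
```

Swapping the in-memory hook for one that talks to a real API would not change the operator at all, which is the point of keeping the two apart.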

Since the Github API takes HTTP requests, writing the hook mostly means wrapping the code you'd usually write to hit the Github API.

# Python code for hitting the Github API
import requests
r = requests.get('https://api.github.com/', headers={'Authorization': 'token YOUR_TOKEN'})

In an Airflow hook, we wrap a regular request in some additional logic (e.g. where the token lives, what kind of requests are necessary, etc). The request itself and all the heavy lifting is done by the HttpHook:

from airflow.hooks.http_hook import HttpHook

# Inherit from the Airflow HttpHook.
class GithubHook(HttpHook):

    def __init__(self, github_conn_id):
        self.github_token = None
        conn = self.get_connection(github_conn_id)
        if conn.extra_dejson.get('token'):
            # If a token was entered in the Extras field of the Connections
            # panel, keep it so get_conn can send it in the Authorization header.
            self.github_token = conn.extra_dejson.get('token')
        super().__init__(method='GET', http_conn_id=github_conn_id)  # Keeping it to GET requests here.

    def get_conn(self, headers):
        """
        Accepts both Basic and Token Authentication.
        If a token exists in the "Extras" section
        with key "token", it is passed in the header.

        If not, the hook looks for a user name and password.

        In either case, it is important to ensure your privacy
        settings for your desired authentication method allow
        read access to user, org, and repo information.
        """
        if self.github_token:
            headers = {'Authorization': 'token {0}'.format(self.github_token)}
            session = super().get_conn(headers)
            session.auth = None
            return session
        return super().get_conn(headers)

When converting this to an Airflow hook, all the credentials can be stored in the Connections panel (as mentioned above, Airflow spins up at `http://localhost:8080`).

<img src="github_connections.png" title="Github Connections" />
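To see the "token first, then username and password" decision from the hook's docstring in isolation, here is a small sketch that needs no Airflow. `build_auth_headers` is a hypothetical helper, not part of the plugin. It assumes the Extras field holds a JSON string such as `{"token": "..."}` and falls back to standard HTTP Basic auth otherwise.

```python
import base64
import json


def build_auth_headers(extra_json, login=None, password=None):
    """Prefer a token from the Extras JSON; otherwise fall back to Basic auth."""
    extras = json.loads(extra_json) if extra_json else {}
    token = extras.get("token")
    if token:
        return {"Authorization": "token {0}".format(token)}
    if login and password:
        raw = "{0}:{1}".format(login, password).encode("utf-8")
        encoded = base64.b64encode(raw).decode("ascii")
        return {"Authorization": "Basic {0}".format(encoded)}
    # No credentials at all: send the request unauthenticated.
    return {}


print(build_auth_headers('{"token": "abc123"}'))
print(build_auth_headers("", login="octocat", password="hunter2"))
```

In the real hook the Basic auth branch is handled by the parent HttpHook, so this is only meant to show the order of preference.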

## Moving Data.

Now that there's an interface to interact with the external system, we need to define what actually needs to be done.

We like using Amazon S3 as an intermediary system when working with Redshift - the COPY command makes inserts easy, and if a task fails partway through a pipeline, the pipeline can restart and pick up where it left off using the data already in S3.

So the workflow is going to go from Github to S3 to Redshift.
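One detail worth seeing on its own is the file format that lands in S3. The operator in this post flattens each Github record and writes one JSON object per line, which fits the `JSON 'auto'` COPY option used later, since that option matches top-level keys to column names. The sketch below is our own rough approximation of what the `flatten_json` package does (underscore-joined keys, list positions as indexes), not its actual implementation, and the sample issue is made up.

```python
import json


def flatten_record(record, parent_key="", sep="_"):
    """Collapse nested dicts and lists into one level of underscore-joined keys."""
    flat = {}
    items = record.items() if isinstance(record, dict) else enumerate(record)
    for key, value in items:
        new_key = "{0}{1}{2}".format(parent_key, sep, key) if parent_key else str(key)
        if isinstance(value, (dict, list)) and value:
            # Recurse into non-empty containers; empty ones are kept as values.
            flat.update(flatten_record(value, new_key, sep))
        else:
            flat[new_key] = value
    return flat


def to_json_lines(records):
    """Serialize records as newline-delimited JSON, one flat object per line."""
    return "\n".join(json.dumps(flatten_record(r)) for r in records)


issues = [{
    "id": 1,
    "state": "open",
    "assignee": {"id": 42, "login": "octocat"},
    "labels": [{"name": "bug"}],
}]
print(to_json_lines(issues))
```

Note how the nested `assignee` object becomes `assignee_id` and `assignee_login`, which is why the SQL later in this post can join on `assignee_id` directly.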

from airflow.utils.decorators import apply_defaults
from airflow.models import BaseOperator
from ..hooks.github_hook import GithubHook # Import the GithubHook from before
from airflow.hooks.S3_hook import S3Hook
from flatten_json import flatten
import logging
import json


class GithubToS3Operator(BaseOperator):
    
    # Define the params - what input is needed for the logic to execute
    """
    Github To S3 Operator
    :param github_conn_id:           The Github connection id.
    :type github_conn_id:            string
    
    :param github_org:               The Github organization.
    :type github_org:                string
    
    :param github_repo:              The Github repository. Required for
                                     commits, commit_comments, issue_comments,
                                     and issues objects.
    :type github_repo:               string
    
    :param github_object:            The desired Github object. The currently
                                     supported values are:
                                        - commits
                                        - commit_comments
                                        - issue_comments
                                        - issues
                                        - members
                                        - organizations
                                        - pull_requests
                                        - repositories
    :type github_object:             string
    
    :param payload:                  The associated github parameters to
                                     pass into the object request as
                                     keyword arguments.
    :type payload:                   dict
    
    :param s3_conn_id:               The s3 connection id.
    :type s3_conn_id:                string
    
    :param s3_bucket:                The S3 bucket to be used to store
                                     the Github data.
    :type s3_bucket:                 string
    
    :param s3_key:                   The S3 key to be used to store
                                     the Github data.
    :type s3_key:                    string
    """

    # Airflow only renders Jinja templates for attributes listed in
    # `template_fields` (plural).
    template_fields = ['s3_key', 'payload']

    @apply_defaults
    def __init__(self,
                 github_conn_id,
                 github_org,
                 github_object,
                 s3_conn_id,
                 s3_bucket,
                 s3_key,
                 github_repo=None,
                 payload=None,
                 *args,
                 **kwargs):
        super().__init__(*args, **kwargs)
        self.github_conn_id = github_conn_id
        self.github_org = github_org
        self.github_repo = github_repo
        self.github_object = github_object
        # Avoid a shared mutable default: build a fresh dict per task.
        self.payload = payload if payload is not None else {}
        self.s3_conn_id = s3_conn_id
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        if self.github_object.lower() not in ('commits',
                                              'commit_comments',
                                              'issue_comments',
                                              'issues',
                                              'members',
                                              'organizations',
                                              'pull_requests',
                                              'repositories'):
            raise Exception('Specified Github object not currently supported.')

    def execute(self, context):
        g = GithubHook(self.github_conn_id)
        s3 = S3Hook(self.s3_conn_id)
        output = []

        if self.github_object not in ('members',
                                      'organizations',
                                      'repositories'):
            if self.github_repo == 'all':
                repos = [repo['name'] for repo in
                         self.paginate_data(g,
                                            self.methodMapper('repositories'))]
                for repo in repos:
                    output.extend(self.retrieve_data(g, repo=repo))
            elif isinstance(self.github_repo, list):
                repos = self.github_repo
                for repo in repos:
                    output.extend(self.retrieve_data(g, repo=repo))
            else:
                output = self.retrieve_data(g, repo=self.github_repo)
        else:
            output = self.retrieve_data(g, repo=self.github_repo)
        output = '\n'.join([json.dumps(flatten(record)) for record in output])
        s3.load_string(
            string_data=output,
            key=self.s3_key,
            bucket_name=self.s3_bucket,
            replace=True
        )

The operator above is truncated; view the rest of the Github code [here](https://github.com/airflow-plugins/github_plugin/blob/master/operators/github_to_s3_operator.py). Once the data is in S3, we use a standard [S3 to Redshift Operator](https://github.com/airflow-plugins/s3_to_redshift_operator/blob/master/operators/s3_to_redshift.py).
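The `paginate_data` method called in `execute` lives in the linked file, so we won't reproduce it here. As a rough idea of what pagination against the Github API involves, here is a sketch that follows the `Link` response header until there is no `rel="next"` entry left. The function names, the fake pages, and the `api.example.test` URLs are all made up for illustration. With `requests` you could read `response.links` instead of parsing the header yourself.

```python
import re

# Matches entries like: <https://host/path?page=2>; rel="next"
LINK_PATTERN = re.compile(r'<([^>]+)>;\s*rel="([^"]+)"')


def parse_link_header(header):
    """Map each rel value in a Link header to its URL."""
    return {rel: url for url, rel in LINK_PATTERN.findall(header or "")}


def paginate(fetch_page, first_url):
    """Yield records page by page; fetch_page(url) returns (records, link_header)."""
    url = first_url
    while url:
        records, link_header = fetch_page(url)
        for record in records:
            yield record
        # Stop when the server no longer advertises a next page.
        url = parse_link_header(link_header).get("next")


# Fake two-page API so the sketch runs without network access.
FAKE_PAGES = {
    "https://api.example.test/repos?page=1": (
        [{"name": "astronomer"}],
        '<https://api.example.test/repos?page=2>; rel="next"',
    ),
    "https://api.example.test/repos?page=2": ([{"name": "github_plugin"}], ""),
}

names = [r["name"] for r in paginate(FAKE_PAGES.get, "https://api.example.test/repos?page=1")]
print(names)
```

Writing `paginate` as a generator keeps memory flat for large orgs, although the operator in this post collects everything into a list before writing to S3.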

## Writing the DAG.

With the hook and operator handling all of the logic, the DAG itself is short to write.


from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import GithubToS3Operator, S3ToRedshiftOperator
from airflow.operators.postgres_operator import PostgresOperator

default_args = {'owner': 'airflow',
                'start_date': datetime(2018, 1, 5),
                'email': ['l5t3o4a9m9q9v1w9@astronomerteam.slack.com'], # Send failure alerts to Slack.
                'email_on_failure': True,
                'email_on_retry': False
                }

dag = DAG('github_to_redshift',
          default_args=default_args,
          schedule_interval='@hourly',
          catchup=False
          )

endpoints = [{"name": "commits",
              "payload": {},
              "load_type": "rebuild"},
             {"name": "issue_comments",
              "payload": {"state": "all"},
              "load_type": "rebuild"},
             {"name": "issues",
              "payload": {"state": "all"},
              "load_type": "rebuild"},
             {"name": "repositories",
              "payload": {},
              "load_type": "rebuild"},
             {"name": "members",
              "payload": {},
              "load_type": "rebuild"},
             {"name": "pull_requests",
              "payload": {"state": "all"},
              "load_type": "rebuild"}]

get_individual_issue_counts = \
    """
    INSERT INTO github.issue_count_by_user
    (SELECT login, sum(count) as count, timestamp
     FROM
            ((SELECT m.login, count(i.id), cast('{{ execution_date + macros.timedelta(hours=-4) }}' as timestamp) as timestamp
            FROM github.astronomerio_issues i
            JOIN github.astronomerio_members m
            ON i.assignee_id = m.id
            WHERE i.state = 'open'
            GROUP BY m.login
            ORDER BY login)
        UNION
            (SELECT m.login, count(i.id), cast('{{ execution_date + macros.timedelta(hours=-4) }}' as timestamp) as timestamp
            FROM github.astronomerio_issues i
            JOIN github.astronomerio_members m
            ON i.assignee_id = m.id
            WHERE i.state = 'open'
            GROUP BY m.login
            ORDER BY login)
        UNION
            (SELECT m.login, count(i.id), cast('{{ execution_date + macros.timedelta(hours=-4) }}' as timestamp) as timestamp
            FROM github."airflow-plugins_issues" i
            JOIN github."airflow-plugins_members" m
            ON i.assignee_id = m.id
            WHERE i.state = 'open'
            GROUP BY m.login
            ORDER BY login))
    GROUP BY login, timestamp);
    """

copy_params = ["COMPUPDATE OFF",
               "STATUPDATE OFF",
               "JSON 'auto'",
               "TRUNCATECOLUMNS",
               "region as 'us-east-1'"]

orgs = [{'name': 'astronomerio',
         'github_conn_id': 'astronomerio-github'},
        {'name': 'astronomer-integrations',
         'github_conn_id': 'astronomer-integrations-github'},
        {'name': 'airflow-plugins',
         'github_conn_id': 'astronomerio-github'}]

with dag:
    kick_off_dag = DummyOperator(task_id='kick_off_dag')

    github_transforms = PostgresOperator(task_id='github_transforms',
                                         sql=get_individual_issue_counts,
                                         postgres_conn_id='astronomer-new-redshift')

    for endpoint in endpoints:
        for org in orgs:
            github = GithubToS3Operator(task_id='github_{0}_data_from_{1}_to_s3'.format(endpoint['name'], org['name']),
                                        github_conn_id=org['github_conn_id'],
                                        github_org=org['name'],
                                        github_repo='all',
                                        github_object=endpoint['name'],
                                        payload=endpoint['payload'],
                                        s3_conn_id='astronomer-s3',
                                        s3_bucket='astronomer-internal-reporting',
                                        s3_key='github/{0}/{1}.json'.format(org['name'], endpoint['name']))

            redshift = S3ToRedshiftOperator(task_id='github_{0}_from_{1}_to_redshift'.format(endpoint['name'], org['name']),
                                            s3_conn_id='astronomer-s3',
                                            s3_bucket='astronomer-internal-reporting',
                                            s3_key='github/{0}/{1}.json'.format(org['name'], endpoint['name']),
                                            origin_schema='github/{0}_schema.json'.format(endpoint['name']),
                                            load_type='rebuild',
                                            copy_params=copy_params,
                                            redshift_schema='github',
                                            table='{0}_{1}'.format(org['name'], endpoint['name']),
                                            redshift_conn_id='astronomer-new-redshift'
                                            )
            
            # Define the dependencies.
            kick_off_dag >> github >> redshift >> github_transforms

Apart from the SQL and a couple of `for` loops, the DAG is essentially a config file - it says _what_ to do, _when_ to do it, and in what order - all of the _how_ lives in the hook and operator.


<img src="github_dag.png" title="Github DAG" />
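If the nested loops in the DAG feel abstract, this sketch reproduces just the naming logic in plain Python, with no Airflow required. It uses the same format strings as the DAG above on a shortened list of endpoints and orgs. `plan_tasks` is a hypothetical helper for illustration only.

```python
from itertools import product

# Shortened versions of the lists in the DAG above.
endpoints = ["commits", "issues", "members"]
orgs = ["astronomerio", "airflow-plugins"]


def plan_tasks(endpoints, orgs):
    """Return (extract_task_id, load_task_id, s3_key) for every endpoint/org pair."""
    plan = []
    for endpoint, org in product(endpoints, orgs):
        plan.append((
            "github_{0}_data_from_{1}_to_s3".format(endpoint, org),
            "github_{0}_from_{1}_to_redshift".format(endpoint, org),
            "github/{0}/{1}.json".format(org, endpoint),
        ))
    return plan


for extract_id, load_id, s3_key in plan_tasks(endpoints, orgs):
    print(extract_id, "->", load_id, "via", s3_key)
```

With the full lists in the DAG (six endpoints and three orgs) the same loops produce 18 extract/load pairs, each with a unique task ID, all fanning out from `kick_off_dag` and back into `github_transforms`.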

DummyOperators can help keep DAGs clean. In the DAG above, there's one that starts the DAG, and one that separates the moving and using of data.

This makes the DAG more idempotent and helps with debugging (e.g. I can easily iterate on my SQL in the DAG above once I've gotten all the data into Redshift the way that I like it).

The folder structure for the Github DAG looks like:
    
<img src="github_files.png" title="Github File Tree" />
  

The `__init__.py` files in the _hooks_ and _operators_ folders are empty, but the one in the _GithubPlugin_ directory tells Airflow that this is a plugins folder:


from airflow.plugins_manager import AirflowPlugin
from GithubPlugin.hooks.github_hook import GithubHook
from GithubPlugin.operators.github_to_s3_operator import GithubToS3Operator


class GithubPlugin(AirflowPlugin):
    name = "github_plugin"
    operators = [GithubToS3Operator]
    hooks = [GithubHook]

## Running Locally with Open.

Now that the workflow has been defined, pop over to `http://localhost:8080` to see the workflow run.


<img src="airflow_ui.png" title="Airflow UI" />


This is a pretty simple workflow where there's not a whole lot that can go wrong. But what if there was a lot more going on, and you needed information on which part of your DAG was the most resource intensive or what worker utilization looked like?

With Prometheus and Grafana bundled with Open, you get all this and more.


<img src="open_dashboards.png" title="Open Dashboards" />


## Visualizing and Dashboarding.

Now that the workflow has been tested and all the data is in Redshift, it's ready for visualization and dashboarding. The SQL in the DAG is what we started with, but our dashboard ended up showing a lot more.

There are a ton of [great dashboarding tools out there](https://www.astronomer.io/blog/six-open-source-dashboards/), but we decided on Apache Superset because we love all things open source.

<img src="github_dashboard.png" title="Github Dashboard" />


Unsurprisingly, our CTO @schnie is leading the way with commits - guess he's CTO for a reason. 

We maintain all of our Airflow plugins [here](https://github.com/airflow-plugins) and [have a list of best practices that we briefly touched on](https://docs.astronomer.io/v2/apache_airflow/tutorial/best-practices.html).


Like what you see? Want to try something yourself? Download [Open Edition](https://github.com/astronomerio/astronomer) and hack on it!