# Hacker News DAG Pipeline
### An Example Project Illustrating a Multiple-Dependency Pipeline Implemented as a Directed Acyclic Graph

Our goal is to identify the keywords that appear most often in the titles of popular [Hacker News](https://news.ycombinator.com/) stories from 2014. Starting from a JSON file of 2014 story data obtained from a Hacker News API, we implement a pipeline whose tasks form a directed acyclic graph (DAG) and use it to extract the 100 most frequent title words.

Pipeline Overview:
- Instantiate a `Pipeline` object
- Assign tasks:
    - Load data from the JSON file
    - Filter for the most popular stories (more than 50 points, more than one comment, and not "Ask HN" posts)
    - Convert to CSV format
    - Extract titles
    - Clean titles
    - Count word frequency
    - Find the top 100 words
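The linked `pipeline.py` is not reproduced in this notebook. As a rough illustration of how a decorator-based DAG pipeline of this shape can work, here is a minimal sketch; the class name `MiniPipeline` and all of its internals are assumptions, not the author's implementation. Each task records at most one upstream dependency, and `run()` executes the tasks in topological order (using the standard library's `graphlib`, Python 3.9+), passing each task the output of the task it depends on.

```python
from graphlib import TopologicalSorter


class MiniPipeline:
    """Hypothetical sketch of a DAG-based task pipeline (not pipeline.py)."""

    def __init__(self):
        # Maps each registered task function to the task it depends on (or None).
        self.tasks = {}

    def task(self, depends_on=None):
        # Decorator factory: @pipe.task() or @pipe.task(depends_on=other_task).
        def register(func):
            self.tasks[func] = depends_on
            return func

        return register

    def run(self):
        # TopologicalSorter expects {node: set_of_predecessors}.
        graph = {
            func: ({dep} if dep is not None else set())
            for func, dep in self.tasks.items()
        }
        results = {}
        for func in TopologicalSorter(graph).static_order():
            dep = self.tasks[func]
            # Root tasks take no arguments; others receive their parent's output.
            results[func] = func() if dep is None else func(results[dep])
        return results


# Usage: a two-task chain, keyed by task function just like results[top_100_title_words].
pipe = MiniPipeline()


@pipe.task()
def numbers():
    return [1, 2, 3]


@pipe.task(depends_on=numbers)
def doubled(nums):
    return [n * 2 for n in nums]


results = pipe.run()
print(results[doubled])  # [2, 4, 6]
```

One caveat of passing generators between tasks in a design like this: if two tasks depended on the same generator-returning task, the second one to run would receive an already exhausted generator. The linear chain in this notebook does not hit that case.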

### Load Libraries and Dependencies
- The `Pipeline` class and the `build_csv` helper are imported from [pipeline.py](https://github.com/teresahanak/hacker_news_dag_pipeline/blob/main/pipeline.py)
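`build_csv` itself lives in the linked `pipeline.py` and is not shown here. Judging only from how it is called below (a list of rows, a `header` list, and a `file` object whose return value is later read with `csv.reader`), a compatible helper might look like the following sketch; the name `build_csv_sketch` and its exact behavior are assumptions. The important detail is rewinding the buffer with `seek(0)` so that a downstream reader starts at the first line.

```python
import csv
import io


def build_csv_sketch(lines, header=None, file=None):
    """Hypothetical stand-in for pipeline.build_csv: write rows as CSV to a file-like object."""
    if file is None:
        file = io.StringIO()
    writer = csv.writer(file)
    if header:
        writer.writerow(header)
    writer.writerows(lines)
    # Rewind so the caller can read the CSV from the beginning.
    file.seek(0)
    return file


buffer = build_csv_sketch([[1, "Show HN: demo"]], header=["objectID", "title"])
print(list(csv.reader(buffer)))  # [['objectID', 'title'], ['1', 'Show HN: demo']]
```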

```python
%load_ext nb_black

import csv
import datetime
import io
import json
import string

import stop_words

from pipeline import Pipeline, build_csv
```


### Utilizing the Pipeline

```python
# Instantiating the pipeline object
pipeline = Pipeline()


# Loading stories from the JSON file as a list of dictionaries, one per story
@pipeline.task()
def file_to_json():
    with open("hn_stories_2014.json") as f:
        stories = json.load(f)["stories"]
    return stories


# Filtering into a generator of the most popular stories:
# more than 50 points, more than 1 comment, and not an "Ask HN" post
@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
    return (
        story
        for story in stories
        if story["points"] > 50
        and story["num_comments"] > 1
        and not story["title"].startswith("Ask HN")
    )


# Converting to CSV format in an io.StringIO object for ease of use
@pipeline.task(depends_on=filter_stories)
def json_to_csv(stories):
    lines = [
        [
            story["objectID"],
            datetime.datetime.strptime(
                story["created_at"], "%Y-%m-%dT%H:%M:%SZ"
            ),  # Sample format: 2014-05-29T04:27:42Z
            story["url"],
            story["points"],
            story["title"],
        ]
        for story in stories
    ]
    return build_csv(
        lines,
        header=["objectID", "created_at", "url", "points", "title"],
        file=io.StringIO(),
    )


# Extracting titles into a generator
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index("title")
    return (story[idx] for story in reader)


# Cleaning titles into a generator: strip punctuation (plus curly quotes
# and en dashes) and lowercase; the translation table is built once
@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
    table = str.maketrans("", "", string.punctuation + "‘’–")
    return (title.translate(table).lower() for title in titles)


# Counting word frequency in titles into a sorted list of (word, count) tuples,
# excluding stop words
@pipeline.task(depends_on=clean_titles)
def word_freq_tbl_sorted(titles):
    counts = {}
    for title in titles:
        for word in title.split():
            counts[word] = counts.get(word, 0) + 1

    # A set gives fast membership tests while filtering
    exclude = set(stop_words.get_stop_words("en"))
    counts = {k: v for k, v in counts.items() if k not in exclude}
    return sorted(counts.items(), key=lambda x: x[1], reverse=True)


# Reducing the list of words to the top 100 most frequent
@pipeline.task(depends_on=word_freq_tbl_sorted)
def top_100_title_words(word_counts):
    return word_counts[:100]


results = pipeline.run()
results[top_100_title_words]
```

```
[('hn', 210),
 ('show', 192),
 ('new', 185),
 ('google', 167),
 ('bitcoin', 101),
 ('open', 92),
 ('programming', 90),
 ('web', 89),
 ('data', 85),
 ('us', 85),
 ('video', 79),
 ('python', 76),
 ('code', 72),
 ('using', 71),
 ('facebook', 71),
 ('released', 71),
 ('now', 69),
 ('2013', 65),
 ('javascript', 65),
 ('free', 64),
 ('source', 64),
 ('internet', 63),
 ('game', 63),
 ('first', 62),
 ('go', 60),
 ('will', 59),
 ('microsoft', 59),
 ('one', 59),
 ('c', 59),
 ('linux', 58),
 ('app', 57),
 ('can', 56),
 ('pdf', 55),
 ('work', 54),
 ('language', 54),
 ('dont', 54),
 ('software', 52),
 ('2014', 52),
 ('startup', 51),
 ('apple', 50),
 ('use', 50),
 ('make', 50),
 ('time', 48),
 ('yc', 48),
 ('security', 48),
 ('get', 45),
 ('nsa', 45),
 ('github', 45),
 ('system', 44),
 ('windows', 44),
 ('1', 41),
 ('world', 41),
 ('way', 41),
 ('like', 41),
 ('project', 40),
 ('computer', 40),
 ('heartbleed', 40),
 ('git', 37),
 ('users', 37),
 ('design', 37),
 ('ios', 37),
 ('back', 36),
 ('develo
```

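To see what the cleaning and counting tasks do to individual titles, the same logic can be run outside the pipeline on a couple of made-up titles. The sample strings and the small `stop` set below are illustrative assumptions, not drawn from the dataset or from the `stop_words` package, and `collections.Counter` stands in for the hand-rolled dictionary.

```python
import string
from collections import Counter

# Made-up sample titles for illustration only.
titles = ["Show HN: My new Python app", "Google’s new open-source tool"]

# Same translation table as clean_titles: strip ASCII punctuation plus curly quotes and en dash.
table = str.maketrans("", "", string.punctuation + "‘’–")
cleaned = (title.translate(table).lower() for title in titles)

# Small hypothetical stop-word set standing in for stop_words.get_stop_words("en").
stop = {"my", "the", "a"}
counts = Counter(
    word for title in cleaned for word in title.split() if word not in stop
)
print(counts.most_common(3))  # [('new', 2), ('show', 1), ('hn', 1)]
```

Note how `open-source` becomes `opensource` and `Google’s` becomes `googles`: deleting punctuation, rather than splitting on it, merges hyphenated words and possessives into single tokens, which is worth keeping in mind when reading the counts.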

#### Observations:
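- We retrieved the top 100 title words with a short, readable chain of tasks.
- Because `filter_stories`, `extract_titles`, and `clean_titles` return generators, those stages handle stories one at a time instead of building full intermediate lists. The JSON load, the CSV conversion (the `lines` list and the `StringIO` buffer), and the word-count dictionary still hold their data in memory.
- Some less useful words remain in the results, such as `hn` and `show` (largely from "Show HN" titles) and `dont` (likely "don't" with its apostrophe stripped), so another iteration could add them to the `exclude` stop words for a more refined list.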
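One way to run that refinement is to union the package's English list with a hand-picked set of extra words before filtering. The extra words below are taken from the results above purely as an example, and the function name `top_title_words` is hypothetical; neither is part of the original notebook. To keep the sketch self-contained, the base stop-word list is passed in as a parameter rather than loaded from `stop_words`.

```python
from collections import Counter

# Extra exclusions chosen from the first run's output (an illustrative choice).
EXTRA_STOP_WORDS = frozenset({"hn", "show", "dont"})


def top_title_words(cleaned_titles, base_stop_words, extra=EXTRA_STOP_WORDS, n=100):
    """Count words in already-cleaned titles, skipping base and extra stop words."""
    # A set makes each membership test O(1) instead of scanning a list.
    exclude = set(base_stop_words) | set(extra)
    counts = Counter(
        word
        for title in cleaned_titles
        for word in title.split()
        if word not in exclude
    )
    # most_common keeps first-seen order among equal counts.
    return counts.most_common(n)


# In the notebook, base_stop_words would be stop_words.get_stop_words("en");
# a tiny stand-in list keeps this example self-contained.
sample = ["show hn a new tool", "dont use a new tool"]
print(top_title_words(sample, base_stop_words=["a"], n=2))  # [('new', 2), ('tool', 2)]
```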

## Conclusion
This basic run of the DAG pipeline with multiple dependencies shows how the approach handles both data preparation and filtering for insights as a sequence of small, composable tasks. With the `Pipeline` and DAG classes in place, the same structure can be reused for other tasks and datasets. In a specific use case like this one, each step stays readable and easy to modify or adapt.