# Nelson_Project Notebook: Building Data Pipelines

## 1. Introduction to the Data

In previous sessions, we began with the concepts of functional programming and then built our own data pipeline class in Python. We learned about advanced Python concepts such as decorators and closures. In the last session, we also learned how to implement a directed acyclic graph as the scheduler for our pipeline.

After completing all these sessions, we have finally built a robust data pipeline that schedules our tasks in the correct order! In this project, we will take the pipeline we have been building and apply it to a real-world data pipeline project.

From a JSON API, we will filter, clean, aggregate, and summarize data in a sequence of tasks that will apply these transformations for us.

The data we will use comes from a Hacker News (HN) API that returns JSON data of the top stories in 2014. If you're unfamiliar with Hacker News, it's a link aggregator website where users vote up stories that are interesting to the community. It is similar to Reddit, but the community revolves only around computer science and entrepreneurship posts.

To make things easier, we have already downloaded a list of JSON posts to a file called `hn_stories_2014.json`. The JSON file contains a single key, `stories`, which contains a list of stories (posts). Each post has a set of keys, but we will deal only with the following keys:

* `created_at`: A timestamp of the story's creation time.
* `created_at_i`: A unix epoch timestamp.
* `url`: The URL of the story link.
* `objectID`: The ID of the story.
* `author`: The story's author (username on HN).
* `points`: The number of upvotes the story had.
* `title`: The headline of the post.
* `num_comments`: The number of comments a post has.

Here's an example of the full list of keys in a story:

```
{
    "story_text": "",
    "created_at": "2014-05-29T08:23:46Z",
    "story_title": null,
    "story_id": null,
    "comment_text": null,
    "created_at_i": 1401351826,
    "url": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
    "parent_id": null,
    "objectID": "7815285",
    "author": "Leynos",
    "points": 1,
    "title": "Making Twitter Easier to Use",
    "_tags": [
        "story",
        "author_Leynos",
        "story_7815285"
    ],
    "num_comments": 0,
    "_highlightResult": {
        "story_text": {
            "matchedWords": [],
            "value": "",
            "matchLevel": "none"
        },
        "author": {
            "matchedWords": [],
            "value": "Leynos",
            "matchLevel": "none"
        },
        "url": {
            "matchedWords": [],
            "value": "http://bits.blogs.nytimes.com/2014/05/28/making-twitter-easier-to-use/",
            "matchLevel": "none"
        },
        "title": {
            "matchedWords": [],
            "value": "Making Twitter Easier to Use",
            "matchLevel": "none"
        }
    },
    "story_url": null
}
```

Using this dataset, we will run a sequence of basic natural language processing tasks using our Pipeline class. The goal will be to find the top 100 keywords of Hacker News posts in 2014. Because Hacker News is one of the most popular technology social media sites, this will give us an understanding of the most talked about tech topics in 2014!


**Tasks**

* Import the Pipeline class from the pipeline module. You can import it like so: `from pipeline import Pipeline`.

* Instantiate an instance of the `Pipeline` class and assign it to the variable `pipeline`.

In [1]:
# Your code goes here
from pipeline import Pipeline
pipeline = Pipeline()
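
The `Pipeline` class comes from the earlier sessions and is not reproduced in this notebook. As a rough mental model only, here is a minimal sketch of the two methods this project relies on: `task()` as a decorator and `run()` returning a dict of outputs keyed by task function. The name `MiniPipeline` and the demo tasks are hypothetical, and the sketch assumes tasks can simply run in registration order, rather than using the DAG scheduler built in the last session.

```python
class MiniPipeline:
    """Hypothetical stand-in for the Pipeline class (not the real implementation)."""

    def __init__(self):
        # (function, dependency) pairs, kept in the order they were registered.
        self.tasks = []

    def task(self, depends_on=None):
        # Returns a decorator that registers the function and leaves it unchanged.
        def decorator(func):
            self.tasks.append((func, depends_on))
            return func
        return decorator

    def run(self):
        # Assumption: a dependency is always registered before the task that
        # needs it, so registration order is a valid execution order.
        outputs = {}
        for func, dependency in self.tasks:
            if dependency is None:
                outputs[func] = func()
            else:
                outputs[func] = func(outputs[dependency])
        return outputs


demo = MiniPipeline()

@demo.task()
def numbers():
    return [1, 2, 3]

@demo.task(depends_on=numbers)
def doubled(values):
    return [v * 2 for v in values]

results = demo.run()
print(results[doubled])
# [2, 4, 6]
```

Each task receives the output of the task it depends on, which is the pattern every cell in this project follows.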

## 2. Loading the JSON Data

We'll start the project by loading the JSON file data into Python. Because JSON files resemble a key-value dictionary, the goal is to parse the JSON file into a Python `dict` object. We can accomplish this using the `json` module.

We worked with this JSON parser in a previous session. As a reminder, this is how you can parse JSON strings:

```
import json

# Notice that `sample_json` is a string, and
# NOT a dict.
sample_json = '{"hello": "world"}'
sample_dict = json.loads(sample_json)
print(sample_dict)
# {'hello': 'world'}
```

To load a file, the `json` module exposes a function called `json.load()`, which takes a Python file object as its first argument. Using `json.load()`, we'll load the `hn_stories_2014.json` file as a Python dict.
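
To make the difference from `json.loads()` concrete, here is a small sketch of `json.load()` reading from a file object. It uses `io.StringIO` as a stand-in for a file on disk, and the story it contains is made up for illustration.

```python
import io
import json

# io.StringIO stands in for a file opened with open(); both are file objects.
fake_file = io.StringIO('{"stories": [{"title": "Example story", "points": 10}]}')

data = json.load(fake_file)   # parse the whole file into a dict
stories = data["stories"]     # the list stored under the "stories" key
print(type(data).__name__, len(stories), stories[0]["title"])
# dict 1 Example story
```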


**Tasks**
1. Create a `pipeline.task()` function that takes in no arguments.
2. Call the function `file_to_json()`, where the function does the following:
* Loads the `hn_stories_2014.json` file into a Python dict.
* Returns the list of `stories`.




In [2]:
# Your code goes here
# Import json first, then register the function as a pipeline task
import json
@pipeline.task()
def file_to_json():
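  """Reads the stories JSON file into Python objects

  Returns:
    list: The story dicts stored under the 'stories' key
  """
  # A context manager closes the file even if parsing fails.
  with open('hn_stories_2014.json') as f:
    data = json.load(f)
  stories = data['stories']
  return stories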

## 3. Filtering the Stories

Great! Now that we have loaded in all the stories as a list of `dict` objects, we can now operate on them. Let's start by filtering the list of stories to get the most popular stories of the year.

Like any social link aggregator site, individual users can post whatever content they want. The reason we want the most popular stories is to ensure that we select stories that were the most talked about during the year. We can filter for popular stories by ensuring they are links (not `Ask HN` posts), have a good number of points, and have some comments.

**Tasks**

* Create a `pipeline.task()` function that depends on the `file_to_json()` function.
* Call the new function `filter_stories()`, that filters popular stories that have more than 50 points, more than 1 comment, and do not begin with `Ask HN`.
* `filter_stories()` should return a generator of these filtered stories.
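
The three criteria can be tried out on a few hand-written stories before wiring them into the pipeline. This is only a sketch: the helper name `is_popular` and the sample stories are hypothetical, and the real data has many more keys per story.

```python
def is_popular(story):
    """Hypothetical helper: True for link stories with enough points and comments."""
    return (
        story["points"] > 50
        and story["num_comments"] > 1
        and not story["title"].startswith("Ask HN")
    )

sample_stories = [
    {"title": "Ask HN: Favorite editor?", "points": 120, "num_comments": 80},
    {"title": "Making Twitter Easier to Use", "points": 1, "num_comments": 0},
    {"title": "A made-up popular link", "points": 75, "num_comments": 12},
]

# A generator expression is lazy: stories are only tested when consumed.
popular = (s for s in sample_stories if is_popular(s))
print([s["title"] for s in popular])
# ['A made-up popular link']
```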

In [3]:
# Your code goes here
# Import the datetime class from the datetime module
from datetime import datetime

def parse_time(time_str):
    """
    Parses time in the format [2014-05-29T08:23:46Z]
    to a datetime object. Only the date part is kept;
    the time of day is discarded.
    """
    time_obj = datetime(int(time_str[0:4]), int(time_str[5:7]), int(time_str[8:10]))
    return time_obj

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
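  """Filters stories down to popular link posts

  Args:
    stories(list): Story dicts loaded from the JSON file

  Yields:
    tuple: (objectID, created_at, url, points, title) for each popular story
  """
  for story in stories:
    # `and` is the logical operator and short-circuits; `&` is bitwise.
    if story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN'):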
      objectID = story['objectID']
      created_at = parse_time(story['created_at'])
      url = story['url']
      points = story['points']
      title = story['title']
      yield (objectID,created_at,url,points,title)

## 4. Convert to CSV

With a reduced set of stories, it's time to write these `dict` objects to a CSV file. The purpose of translating the dictionaries to a CSV is that we want to have a consistent data format when running the later summarizations. By keeping data formats consistent, each of your pipeline tasks will be adaptable to future task requirements.


**Tasks**

1. Create a `pipeline.task()` function that depends on the `filter_stories()` function.
2. Call the new function `json_to_csv()`, that writes the filtered JSON stories to a CSV file:
* Import `build_csv` from the `pipeline` module, and import `io`. The `build_csv()` function has the same API as the one you wrote in the second and third lessons.
* Create a CSV file with the headers `'objectID'`, `'created_at'`, `'url'`, `'points'`, and `'title'`.
* Parse the `created_at` column using `datetime.datetime`.
3. `json_to_csv()` should return the value from `build_csv()` using the above header, lines, and the `io.StringIO()` file.

In [4]:
# Your code goes here
# io provides StringIO, an in-memory text buffer that behaves like a file,
# so the CSV can be built without writing to disk.
from pipeline import build_csv
import io

@pipeline.task(depends_on=filter_stories)
def json_to_csv(filtered_stories):
  """Writes the filtered stories to an in-memory CSV file

  Args:
    filtered_stories(generator): Story tuples to write as CSV rows

  Returns:
    io.StringIO: File object containing the CSV data
  """
  return build_csv(filtered_stories, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())
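
The `build_csv()` helper is imported from the `pipeline` module and is not shown in this notebook. A minimal sketch of what such a helper might look like follows, assuming the `(lines, header, file)` signature used in the call above. The name `build_csv_sketch` and the sample row are hypothetical.

```python
import csv
import io

def build_csv_sketch(lines, header=None, file=None):
    """Hypothetical version of build_csv(): write rows to `file`, then rewind it."""
    if file is None:
        file = io.StringIO()
    writer = csv.writer(file, delimiter=",")
    if header:
        writer.writerow(header)
    for line in lines:
        writer.writerow(line)
    # Rewind so the next task reads from the first row, not the end of the file.
    file.seek(0)
    return file

rows = [("7815285", "2014-05-29 00:00:00", "http://example.com", 75, "A made-up title")]
csv_file = build_csv_sketch(
    rows, header=["objectID", "created_at", "url", "points", "title"]
)
print(csv_file.readline().strip())
# objectID,created_at,url,points,title
```

Rewinding with `seek(0)` matters because the next task reads the same file object from the start.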

## 5. Extract Title Column

Using the CSV file format we created in the previous task, we can now extract the title column. Once we have extracted the titles of each popular post, we can then run the next word frequency task. To extract the titles, we'll follow the steps in the tasks we wrote in the past.

The steps were: **1.** Import csv, and create a `csv.reader()` object from the file object. **2.** Find the index of the title in the header. **3.** Iterate through the reader, and yield the item at the title index from each row.
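
As a quick illustration of these three steps, here is a sketch that runs them on a small in-memory CSV. The rows are made up, and the column set is shorter than the one used in the project.

```python
import csv
import io

csv_file = io.StringIO(
    "objectID,points,title\n"
    "1,75,A made-up title\n"
    '2,90,"Commas, inside quotes"\n'
)

reader = csv.reader(csv_file)          # step 1: wrap the file object
header = next(reader)                  # the first row is the header
idx = header.index("title")            # step 2: position of the title column
titles = [row[idx] for row in reader]  # step 3: pick that position from each row
print(titles)
# ['A made-up title', 'Commas, inside quotes']
```

Using `csv.reader` rather than splitting on commas keeps quoted titles that contain commas in one piece.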

**Tasks**

1. Create a `pipeline.task()` function that depends on the `json_to_csv()` function.
2. Call the new function `extract_titles()`, that returns a generator of every Hacker News story title:
* Follow the steps listed in the instructions.
3. `extract_titles()` should return a generator of titles.

In [5]:
# Your code goes here
import csv
@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    """Yields the title of every story in the CSV file

    Args:
      csv_file(io.StringIO): CSV file object with a header row

    Yields:
      str: Title of each story
    """
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('title')
    for line in reader:
      yield line[idx]

## 6. Clean the Titles

Because we're trying to create a word frequency model of words from Hacker News titles, we need a way to create a consistent set of words to use. For example, `Google`, `google`, `GooGle?`, and `google.` all refer to the same keyword: google. If we were to split the title into words without cleaning it, however, each variant would be counted as a different word.

To clean the titles, we should make sure to lower case the titles, and to remove the punctuation. An easy way to rid a string of punctuation is to check each character and keep it only if it is not punctuation. The `string` module provides a handy constant that contains all the punctuation needed:

```
import string

print(string.punctuation)
# !"#$%&'()*+,-./:;<=>?@[\]^_`{|}~
```
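
As an alternative to checking each character by hand, the same cleaning can be sketched with `str.maketrans()` and `str.translate()`, which delete every punctuation character in one pass. The sample title is made up.

```python
import string

# Translation table that deletes every character in string.punctuation.
remove_punctuation = str.maketrans("", "", string.punctuation)

title = "Show HN: Google's new API, explained!"
cleaned = title.lower().translate(remove_punctuation)
print(cleaned)
# show hn googles new api explained
```

Both approaches give the same result; the character-by-character version is easier to read, while `translate()` tends to be faster on long strings.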

**Tasks**

1. Create a `pipeline.task()` function that depends on the `extract_titles()` function.
2. Call the new function `clean_titles()`, that returns a generator of cleaned titles:
* Ensure the title is lower case.
* Remove any punctuation from the title.

In [6]:
# Your code goes here
import string
@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
  """Yields each title lower-cased with all punctuation removed

    Args:
      titles(generator): Story titles

    Yields:
      str: Cleaned title
  """
  for title in titles:
    clean_string = ''.join([i for i in title.lower() if i not in string.punctuation])
    yield clean_string

## 7. Create the Word Frequency Dictionary

With cleaned titles, we can now build the word frequency dictionary. A word frequency dictionary is a set of key-value pairs that maps each word to the number of times it is used in a text. Here's an example of how a word frequency dictionary would work on a single string:

```
sample_text = "Wow, the Dataquest Data Engineering track is the best track!"
print(word_freq_from_string(sample_text))

{'wow': 1, 'the': 2, 'dataquest': 1, 'data': 1, 'engineering': 1, 'track': 2, 'is': 1, 'best': 1}
```
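
The `word_freq_from_string()` function used above is not defined in this notebook. One possible implementation, given here as a sketch rather than the original helper, cleans the text and counts the words with `collections.Counter`:

```python
import string
from collections import Counter

def word_freq_from_string(text):
    """Hypothetical implementation of the helper used in the example above."""
    # Lower-case the text and drop punctuation before splitting into words.
    cleaned = "".join(ch for ch in text.lower() if ch not in string.punctuation)
    return dict(Counter(cleaned.split()))

sample_text = "Wow, the Dataquest Data Engineering track is the best track!"
frequencies = word_freq_from_string(sample_text)
print(frequencies["the"], frequencies["track"], frequencies["wow"])
# 2 2 1
```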

As you can see, the title has been stripped of its punctuation and lower cased. Furthermore, to find actual keywords, we should make sure the word frequency dictionary does not include stop words. Stop words are words that occur frequently in a language, like `"the"` and `"or"`, and are commonly excluded from keyword searches.

We have included a module called `stop_words` with a tuple of the most commonly used stop words in the English language. You can import it in your notebook with `from stop_words import stop_words`. Here's what the sample text would look like without the stop words:

```
sample_text = "Wow, the Dataquest Data Engineering track is the best track!"
print(word_freq_no_stop_words(sample_text))

{'wow': 1, 'dataquest': 1, 'data': 1, 'engineering': 1, 'track': 2, 'best': 1}
```
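
Again, `word_freq_no_stop_words()` is not defined in this notebook. A minimal sketch is given here, assuming a tiny hand-picked tuple named `demo_stop_words` in place of the much longer `stop_words` tuple from the provided module:

```python
import string

# Tiny illustrative subset; the real `stop_words` tuple is much longer.
demo_stop_words = ("the", "is", "or", "a")

def word_freq_no_stop_words(text, stop_words=demo_stop_words):
    """Hypothetical implementation: count words, skipping any stop word."""
    cleaned = "".join(ch for ch in text.lower() if ch not in string.punctuation)
    frequency = {}
    for word in cleaned.split():
        if word in stop_words:
            continue
        frequency[word] = frequency.get(word, 0) + 1
    return frequency

sample_text = "Wow, the Dataquest Data Engineering track is the best track!"
filtered = word_freq_no_stop_words(sample_text)
print("the" in filtered, filtered["track"])
# False 2
```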

**Tasks**
1. Create a `pipeline.task()` function that depends on the `clean_titles()` function.
2. Call the new function `build_keyword_dictionary()`, that returns a dictionary of the word frequency of all the HN titles.
* The word frequency should **not** include stop words.
* You can find the words by splitting each title on the space character.
* Empty words **should** be ignored.



In [7]:
from stop_words import stop_words
display(stop_words)

('a',
 'about',
 'above',
 'above',
 'across',
 'after',
 'afterwards',
 'again',
 'against',
 'all',
 'almost',
 'alone',
 'along',
 'already',
 'also',
 'although',
 'always',
 'am',
 'among',
 'amongst',
 'amoungst',
 'amount',
 'an',
 'and',
 'another',
 'any',
 'anyhow',
 'anyone',
 'anything',
 'anyway',
 'anywhere',
 'are',
 'around',
 'as',
 'at',
 'back',
 'be',
 'became',
 'because',
 'become',
 'becomes',
 'becoming',
 'been',
 'before',
 'beforehand',
 'behind',
 'being',
 'below',
 'beside',
 'besides',
 'between',
 'beyond',
 'bill',
 'both',
 'bottom',
 'but',
 'by',
 'call',
 'can',
 'cannot',
 'cant',
 'co',
 'con',
 'could',
 'couldnt',
 'cry',
 'de',
 'describe',
 'detail',
 'do',
 'done',
 'down',
 'due',
 'during',
 'each',
 'eg',
 'eight',
 'either',
 'eleven',
 'else',
 'elsewhere',
 'empty',
 'enough',
 'etc',
 'even',
 'ever',
 'every',
 'everyone',
 'everything',
 'everywhere',
 'except',
 'few',
 'fifteen',
 'fify',
 'fill',
 'find',
 'fire',
 'first',
 'five

In [8]:
@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(clean_titles):
  """Counts how many times each non-stop word appears in the titles

    Args:
      clean_titles(generator): Cleaned title strings

    Returns:
      dict{str, int}
  """
  word_frequency = {}
  for line in clean_titles:
    # split() with no argument also discards empty words.
    words = [w for w in line.split() if w not in stop_words]
    for word in words:
      word_frequency[word] = word_frequency.get(word, 0) + 1
  return word_frequency

## 8. Sort the Top Words

Finally, we're ready to sort the top words used in all the titles. In this final task, it's up to you to decide how you want to sort the top words. The goal is to output a list of tuples with (`word`, `frequency`) as the entries, sorted from most used to least used.
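
Two common ways to get such a list are sketched here on a small dictionary: `sorted()` with a key function, and `collections.Counter.most_common()`. The four counts are taken from the final output of this notebook, and the cutoff of 3 is only for illustration.

```python
from collections import Counter

word_frequency = {"python": 76, "google": 167, "bitcoin": 101, "web": 88}

# Option 1: sort the (word, frequency) pairs by frequency, highest first.
by_sorted = sorted(word_frequency.items(), key=lambda pair: pair[1], reverse=True)[:3]

# Option 2: let Counter do the ranking.
by_counter = Counter(word_frequency).most_common(3)

print(by_sorted)
# [('google', 167), ('bitcoin', 101), ('web', 88)]
print(by_sorted == by_counter)
# True
```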

**Tasks**

1. Create a `pipeline.task()` function that depends on the `build_keyword_dictionary()` function.
2. The new function can be named whatever you want, but it should return a list of the top 100 tuples described in the explanation above.
3. Run the pipeline using `pipeline.run()`, and print the output of the new task function.


In [9]:
# Your code goes here
@pipeline.task(depends_on=build_keyword_dictionary)
def top_words_sort(word_frequency):
  """Sorts the word frequencies and keeps the 100 most used words

    Args:
      word_frequency(dict)

    Returns:
      list: (word, frequency) tuples, most used first
  """
  # dict.items() already yields (word, frequency) tuples.
  return sorted(word_frequency.items(), key=lambda x: x[1], reverse=True)[:100]

In [10]:
# Import the libraries and create a fresh pipeline
from pipeline import Pipeline
pipeline = Pipeline()
import json

# Open hn_stories_2014.json and return the list of stories
@pipeline.task()
def file_to_json():
  with open('hn_stories_2014.json') as f:
    data = json.load(f)
  stories = data['stories']
  return stories

# Import the datetime class from the datetime module
from datetime import datetime

def parse_time(time_str):
    """
    Parses time in the format [2014-05-29T08:23:46Z]
    to a datetime object, keeping only the date part.
    """
    time_obj = datetime(int(time_str[0:4]), int(time_str[5:7]), int(time_str[8:10]))
    return time_obj

# filter_stories uses parse_time to convert each story's created_at string

@pipeline.task(depends_on=file_to_json)
def filter_stories(stories):
  for story in stories:
    if story['points'] > 50 and story['num_comments'] > 1 and not story['title'].startswith('Ask HN'):
      objectID = story['objectID']
      created_at = parse_time(story['created_at'])
      url = story['url']
      points = story['points']
      title = story['title']
      yield (objectID,created_at,url,points,title)

        
from pipeline import build_csv

import io

@pipeline.task(depends_on=filter_stories)
def json_to_csv(filtered_stories):
  return build_csv(filtered_stories, header=['objectID', 'created_at', 'url', 'points', 'title'], file=io.StringIO())

import csv

@pipeline.task(depends_on=json_to_csv)
def extract_titles(csv_file):
    reader = csv.reader(csv_file)
    header = next(reader)
    idx = header.index('title')
    for line in reader:
      yield line[idx]
  
import string

@pipeline.task(depends_on=extract_titles)
def clean_titles(titles):
  for title in titles:
    clean_string = ''.join([i for i in title.lower() if i not in string.punctuation])
    yield clean_string

from stop_words import stop_words

@pipeline.task(depends_on=clean_titles)
def build_keyword_dictionary(clean_titles):
  word_frequency = {}
  for line in clean_titles:
    words = [w for w in line.split() if w not in stop_words]
    for word in words:
      word_frequency[word] = word_frequency.get(word, 0) + 1
  return word_frequency
# This task depends on build_keyword_dictionary
@pipeline.task(depends_on=build_keyword_dictionary)
def top_words_sort(word_frequency):
  # dict.items() already yields (word, frequency) tuples.
  return sorted(word_frequency.items(), key=lambda x: x[1], reverse=True)[:100]

top_words_sorted_list = pipeline.run()

In [11]:
# Display the top 100 keywords
top_100_words = top_words_sorted_list[top_words_sort]
display(top_100_words)

[('new', 185),
 ('google', 167),
 ('bitcoin', 101),
 ('open', 92),
 ('programming', 90),
 ('web', 88),
 ('data', 85),
 ('video', 79),
 ('python', 76),
 ('code', 72),
 ('facebook', 71),
 ('released', 71),
 ('using', 70),
 ('2013', 65),
 ('javascript', 65),
 ('free', 64),
 ('source', 64),
 ('game', 63),
 ('internet', 62),
 ('microsoft', 59),
 ('c', 59),
 ('linux', 58),
 ('app', 57),
 ('pdf', 55),
 ('work', 54),
 ('language', 54),
 ('software', 52),
 ('2014', 52),
 ('startup', 51),
 ('apple', 50),
 ('use', 50),
 ('make', 50),
 ('time', 48),
 ('yc', 48),
 ('security', 48),
 ('nsa', 45),
 ('github', 45),
 ('windows', 44),
 ('1', 41),
 ('world', 41),
 ('way', 41),
 ('like', 41),
 ('project', 40),
 ('computer', 40),
 ('heartbleed', 40),
 ('git', 37),
 ('users', 37),
 ('dont', 37),
 ('design', 37),
 ('ios', 37),
 ('developer', 36),
 ('os', 36),
 ('twitter', 36),
 ('ceo', 36),
 ('vs', 36),
 ('life', 36),
 ('big', 35),
 ('day', 35),
 ('android', 34),
 ('online', 34),
 ('years', 33),
 ('simple', 

## Next Steps

The final result yielded some interesting keywords. There were terms like `bitcoin` (the cryptocurrency), `heartbleed` (the OpenSSL vulnerability disclosed in 2014), and many others. Even though this was a basic natural language processing task, it did provide some interesting insights into conversations from 2014. Now that you have created the pipeline, there are additional tasks you can perform with the data.

Here are just a few:

* Extend the `Pipeline` class to save each task's output to a file. This will allow you to "checkpoint" tasks so they don't have to be run twice.
* Use the `nltk` package for more advanced natural language processing tasks.
* Convert to a CSV before filtering, so you can keep all the stories from 2014 in a raw file.
* Fetch the data from Hacker News directly from a JSON API. Instead of reading from the file we gave, you can perform additional data processing using newer data.
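
As a starting point for the checkpointing idea, here is a minimal sketch that is independent of the `Pipeline` class. The helper `run_with_checkpoint()` and the task `expensive_task()` are hypothetical, and the sketch assumes the task output is JSON-serializable (a generator would first need to be turned into a list).

```python
import json
import os
import tempfile

def run_with_checkpoint(path, compute):
    """Hypothetical helper: reuse saved JSON output if present, else compute and save."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    result = compute()
    with open(path, "w") as f:
        json.dump(result, f)
    return result

calls = []

def expensive_task():
    calls.append(1)  # record each real execution
    return {"google": 167, "bitcoin": 101}

checkpoint_path = os.path.join(tempfile.mkdtemp(), "keywords.json")
first = run_with_checkpoint(checkpoint_path, expensive_task)
second = run_with_checkpoint(checkpoint_path, expensive_task)
print(first == second, len(calls))
# True 1
```

The second call reads the saved file instead of running the task again, which is the behavior a checkpointing `Pipeline` would need for each task.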