## Working with Dask Bags for Unstructured Data

Datasets that have not already been standardized and provided as CSV files can be challenging to work with. In this chapter you'll use the Dask Bag to read raw text files and perform simple text processing workflows over large datasets in parallel. Conceptually, the Dask Bag is a parallel list that can store any Python datatype with convenient functions that map over all of the elements.

### Reading & counting
The sotu/ directory contains text files for each of the 45 US Presidents as of 2017. These text files contain the State of the Union addresses delivered by each president. The texts of the speeches were obtained from the American Presidency Project.

The entire speech for each State of the Union address is on a single line in each text file; that is, individual addresses are separated by '\n'. For example, the 33rd US president Truman delivered 9 State of the Union speeches, so the file sotu/33Truman.txt has 9 lines). Distinct files, then have distinct numbers of lines according to the number of State of the Union addresses each president delivered during their presidency.

In [3]:
# import module
import glob
import dask.bag as db

# Glob filenames matching 'sotu/*.txt' and sort
filenames = glob.glob('sotu/*.txt')
filenames = sorted(filenames)

# Load filenames as Dask bag with db.read_text(): speeches
speeches = db.read_text(filenames)

# Print number of speeches with .count()
print(speeches.count().compute())

237


### Taking one element
Let's start using data from the Dask bag speeches of State of the Union addresses. For large datasets, the .take() method is useful for inspecting data & data types while processing dask bags. The value returned by .take(n) is a tuple of the first n elements of the dask.bag.

The speeches bag from the previous exercise is provided for you. Your task is to extract the first element of the bag using .take(), to determine its datatype, and to print the words of the first speech.

In [4]:
# Call .take(1): one_element
one_element = speeches.take(1)

# Extract first element of one_element: first_speech
first_speech = one_element[0]

# Print type of first_speech and first 60 characters
print(type(first_speech))
print(first_speech[:60])

<class 'str'>
 Fellow-Citizens of the Senate and House of Representatives:


### Splitting by word & count
Using the speeches bag from earlier exercises let's examine some statistics about the State of the Union addresses.

Your job is to split each speech into a list of words using a single space ' ' as the separator. At this point the Dask Bag can be considered a list-of-lists. You'll then map the len() function over each of the inner lists to compute the number of words in each speech and then compute the mean() of the lengths to get the average word count.

In [6]:
# Call .str.split(' ') from speeches and assign it to by_word
by_word = speeches.str.split(' ')

# Map the len function over by_word and compute its mean
n_words = by_word.map(len)
avg_words = n_words.mean()

# Print the type of avg_words and value of avg_words.compute()
print(type(avg_words))
print(avg_words.compute())

<class 'dask.bag.core.Item'>
8239.084388185654


### Filtering on a phrase
In this exercise you'll make use of the filter function to take the 226 State of the Union addresses and find the addresses where the phrase health care was mentioned. In order to do this you must first standardize the capitalization of all words in each speech.

Your job is to convert all speeches to lower case and write a lambda function that returns true if the substring 'health care' is contained in each speech and filter with it. Finally, you'll count the number of speeches retained by the filter.

In [7]:
# Convert speeches to lower case: lower
lower = speeches.str.lower()

# Filter lower for the presence of 'health care': health
health = lower.filter(lambda s:'health care' in s)

# Count the number of entries : n_health
n_health = health.count()

# Compute and print the value of n_health
print(n_health.compute())

47


### Loading & mapping from JSON
A collection of JSON files for Congressional bills have been downloaded from GovTrack.us. All bills presented during each congressional session from 1973 - 2017 are provided in separate files.

Your job is to read the bills*.json files into a Dask Bag. You'll then use the JSON module to map the text of each file into Python dictionaries. You'll then inspect the first element of the Dask Bag.

In [9]:
import json

# Call db.read_text with congress/bills*.json: bills_text
bills_text = db.read_text('congress/bills*.json')

# Map the json.loads function over all elements: bills_dicts
bills_dicts = bills_text.map(json.loads)

# Extract the first element with .take(1) and index to the first position: first_bill
first_bill = bills_dicts.take(1)[0]

# Print the keys of first_bill
print(first_bill.keys())


dict_keys(['bill_resolution_type', 'bill_type', 'bill_type_label', 'committee_reports', 'congress', 'current_chamber', 'current_status', 'current_status_date', 'current_status_description', 'current_status_label', 'display_number', 'docs_house_gov_postdate', 'id', 'introduced_date', 'is_alive', 'is_current', 'link', 'lock_title', 'major_actions', 'noun', 'number', 'related_bills', 'scheduled_consideration_date', 'senate_floor_schedule_postdate', 'sliplawnum', 'sliplawpubpriv', 'source', 'source_link', 'sponsor', 'sponsor_role', 'text_incorporation', 'title', 'title_without_number', 'titles'])


### Filtering vetoed bills
Now that you've got a Dask Bag prepared with congressional bills as dictionaries we can use filtering and mapping tools to find all of the bills since the 93rd congress that were vetoed by the sitting President and later overridden by congress.

The bills_dicts Dask Bag from the previous exercise is provided for you. Your job is to filter the bills to retain those where the current_status key is 'enacted_veto_override'. You'll then print the titles of the bills using .pluck. To help you do this, the following function has been defined for you:


In [10]:
# Compare the value of the 'current_status' key to 'enacted_veto_override'
def veto_override(d):
    return d['current_status'] == 'enacted_veto_override'

# Filter the bills: overridden
overridden = bills_dicts.filter(veto_override)

# Print the number of bills retained
print(overridden.count().compute())

# Get the value of the 'title' key
titles = overridden.pluck('title')

# Compute and print the titles
print(titles.compute())

3
['H.R. 12471 (93rd): A bill to amend section 552 of title 5, United States Code, known as the Freedom of Information Act.', 'H.R. 6198 (97th): A bill to amend the manufacturing clause of the copyright law.', 'H.R. 6863 (97th): Supplemental Appropriations Act, 1982']


### Computing the average bill's lifespan
Some congressional bills can take years to get through committees, floor reading, voting and presidential signatures.

Each bill in the bills_dicts Dask Bag has 'current_status_date' and 'introduced_date' keys. Your job is to write a function that returns the number of days that have passed between these two dates. You'll then apply this function over the bills where the 'current_status' is 'enacted_signed'. Finally, you'll compute the average number of days. Pandas has been imported for you as pd and the bills_dicts Dask Bag has been provided.

In [None]:
# Define a function lifespan that takes a dictionary d as input
def lifespan(d):
    # Convert to datetime
    current = pd.to_datetime(d['current_status_date'])
    intro = pd.to_datetime(d['introduced_date'])

    # Return the number of days
    return (current - intro).days

# Filter bills_dicts: days
days = bills_dicts.filter(lambda s:s['current_status']=='enacted_signed').map(lifespan)

# Print the mean value of the days Bag
print(days.mean().compute())