#Sentence extractor for The New York Times

In [1]:
import logging
import math
import os
from collections import defaultdict
from datetime import timedelta, date, datetime
from dateutil import parser
from time import sleep, time

import requests
import yaml
from joblib import Parallel, delayed
from pymongo import MongoClient
from pymongo.errors import BulkWriteError, DuplicateKeyError

##Logging

In [2]:
try:
    os.remove('search_ft.log')
except:
    pass

In [3]:
logging.getLogger().handlers = []
logging.getLogger('requests.packages.urllib3').setLevel(logging.WARNING)
logging.basicConfig(filename='search_ft.log', level=logging.INFO, format='%(asctime)s %(message)s')

In [4]:
def write_log(*args, status=None):
    record = '{} ==> {}'.format(args, status)
    logging.info(record)

##MongoDB

In [5]:
client = MongoClient()
client.drop_database('ft')
db = client.ft

In [6]:
def insert_documents(docs, q):
    try:
        inserted = db.articles.insert_many(docs, ordered=False)
        total_inserted = len(inserted.inserted_ids)
        write_log(q, status='INSERTION OK {}'.format(total_inserted))
    except BulkWriteError as e:
        for err in e.details['writeErrors']:
            write_log(q, status='INSERTION EXCEPTION {}'.format(err['errmsg']))
        write_log(q, status='INSERTION OK {}'.format(e.details['nInserted']))
    except Exception as e:
        write_log(q, status='INSERTION EXCEPTION {}'.format(e))

##Search terms

In [7]:
def preprocess_term(term):
    terms = []
    
    curated_term = term.lower()
    curated_term = curated_term.replace(' & ', ' ')
    curated_term = curated_term.replace(' and ', ' ')
    terms.append(curated_term)
    if '-' in curated_term:
        terms.append(curated_term.replace('-', ''))
#         terms.append(curated_term.replace('-', ' ')) # same result

    for term in terms:
        if term.endswith('corporation'):
            terms.append(term[:-12])
        elif term.endswith('corp'):
            terms.append(term[:-5])
        elif term.endswith('company'):
            terms.append(term[:-8])
        elif term.endswith('inc.'):
            terms.append(term[:-5])
        elif term.endswith('inc'):
            terms.append(term[:-4])
        elif term.endswith('.com'):
            terms.append(term[:-4])
    
#     terms = list(map(lambda x: '"{}"'.format(x), terms))
    return terms

In [8]:
term_dict = {}
for i in range(1, 4):
    term_dict[i] = {}
    st_file = open('search_terms_{}.txt'.format(i))
    term_list = map(lambda x: x.strip(), st_file.readlines())
    for term in term_list:
        term_dict[i][term] = preprocess_term(term)

In [9]:
with open('search_terms.yml', 'w') as search_term_file:
    search_term_file.write(yaml.dump(term_dict, default_flow_style=False))

In [10]:
with open('search_terms.yml') as search_term_file:
    term_yaml = yaml.load(search_term_file.read())
    #
    for key in term_yaml:
        d = term_yaml[key]
        for k in d:
            d[k] = list(map(lambda x: '"{}"'.format(x), d[k]))

##NYTimes API keys

In [11]:
# One API key for each core
api_keys = [
        "mtqjgqpd63c5aky6ujz7mfe4",
]

In [12]:
def next_multiple(n, m):
    # 4, 17 ==> 20
    rest = m % n
    return m if rest == 0 else m + n - rest

def chunks(l, n_chunks):
    l = list(l)
    size = len(l)
    n = next_multiple(n_chunks, size) // n_chunks
    for i in range(0, size, n):
        yield l[i:i + n]

In [13]:
# [(search_term, original_term, term_type)]
search_terms = [(t, k2, k1)
                   for k1 in term_yaml
                       for k2 in term_yaml[k1]
                           for t in term_yaml[k1][k2]
              ]

In [14]:
# TEST

# api_keys = [
#     "3439a9084efa80c4f5fb1d290dfc1b44:11:70233981", # my api key
#     "a5c709f3168b829711241b243457e9d6:13:70235641", # the other api key
# ]

# search_terms = [
#     ('"entrepreneur"', 'entrepreneur', 1),
#     ('"executive"', 'executive', 1),
#     ('"google"', 'google', 2),
#     ('"amazon"', 'amazon.com', 2),
#     ('"ebay"', 'e-bay', 3),
#     ('"facebook"', 'facebook', 3),
# ]

In [15]:
search_terms_by_api_key = defaultdict(list)
for i in range(1, 4):
    search_terms_i = list(filter(lambda x: x[2] == i, search_terms))
    for t in zip(api_keys, chunks(search_terms_i, len(api_keys))):
        search_terms_by_api_key[t[0]].extend(t[1])

##Dates

In [16]:
def month_duration(d):
    if d.month in [1, 3, 5, 7, 8, 10, 12]:
        ndays = 31
    elif d.month in [4, 6, 9, 11]:
        ndays = 30
    else: # d.month == 2
        if d.year % 400 == 0 or d.year % 4 == 0 and d.year % 100 != 0: # lap-year
            ndays = 29
        else:
            ndays = 28
    return ndays

def n_days(d, n_months):
    ndays = 0
    new_d = d
    for _ in range(n_months):
        m_duration = month_duration(d)
        d += timedelta(m_duration)
        ndays += m_duration
    return ndays - 1

def date_ranges(begin_date, end_date, n_months=1):
    aux_date = begin_date
    while aux_date < end_date:
        ndays = n_days(aux_date, n_months)
        yield (aux_date, min(aux_date + timedelta(ndays), end_date))
        aux_date += timedelta(ndays + 1)

##Timer

In [17]:
LAST_REQUEST = time()

In [18]:
def wait(f, *args, t=9):
    global LAST_REQUEST
    now = time()
    elapsed_time = now - LAST_REQUEST
    if elapsed_time < t:
        sleep(t - elapsed_time)
    LAST_REQUEST = time()
    return f(*args)

##Query

In [19]:
class Query:
    def __init__(self, term, begin_date, end_date, page, api_key):
        self.term = term
        self.begin_date = begin_date
        self.end_date = end_date
        self.page = page
        self.api_key =api_key
    
    def __repr__(self):
        return 'Q<{}, {}, {}, {}, {}>'.format(self.term, self.begin_date, self.end_date, self.page, self.api_key)

##Downloader

In [347]:
import pytz
from datetime import timedelta, date, datetime, tzinfo
BEGIN_DATE = datetime(1999, 1, 1)
END_DATE = date(2014, 12, 31)

In [350]:
datetime.combine(END_DATE, datetime.min.time()).isoformat() + 'Z'

'2014-12-31T00:00:00Z'

In [349]:
print(type(BEGIN_DATE))
print(type(END_DATE))

<class 'datetime.datetime'>
<class 'datetime.date'>


In [348]:
str(BEGIN_DATE)

'1999-01-01 00:00:00'

In [41]:
url = 'http://api.ft.com/content/search/v1?apiKey=mtqjgqpd63c5aky6ujz7mfe4'
r1 = requests.post(url, json={'queryString': 'executive'})
r1

<Response [200]>

In [62]:
url = 'http://api.ft.com/content/search/v1'
r2 = requests.post(url, params={'apiKey': 'mtqjgqpd63c5aky6ujz7mfe4'}, json={'queryString': 'entrepreneur'})
r2

<Response [200]>

In [368]:
url = 'http://api.ft.com/content/search/v1'
r3 = requests.post(url, params={'apiKey': 'mtqjgqpd63c5aky6ujz7mfe4'}, json={
        'queryString': '"entrepreneur" AND initialPublishDateTime:>2014-01-01T00:00:00Z AND initialPublishDateTime:<2014-01-04T00:00:00Z',
#         'queryString': '"executive"',
        'resultContext': {
            'aspects': ["title","lifecycle","location","summary","editorial"],
            'offset': 0,
        }
    })
r3

<Response [200]>

In [369]:
r3.reason

'OK'

In [370]:
r3.json()['results'][0]['results']

[{'apiUrl': 'http://api.ft.com/content/items/v1/b7a2a7f2-72d6-11e3-b05b-00144feabdc0',
  'aspectSet': 'article',
  'editorial': {'byline': 'By Henry Mance, Media Correspondent'},
  'id': 'b7a2a7f2-72d6-11e3-b05b-00144feabdc0',
  'lifecycle': {'initialPublishDateTime': '2014-01-01T21:00:25Z',
   'lastPublishDateTime': '2014-01-01T21:00:25Z'},
  'location': {'uri': 'http://www.ft.com/cms/s/0/b7a2a7f2-72d6-11e3-b05b-00144feabdc0.html'},
  'modelVersion': '1',
  'summary': {'excerpt': 'Richard Desmond has asked advisers to work on a possible sale of Channel 5, the British free-to-air TV broadcaster, that'},
  'title': {'title': 'Richard Desmond explores Channel 5 sale options'}},
 {'apiUrl': 'http://api.ft.com/content/items/v1/bde1a771-814a-3e45-8443-220537ed8ba1',
  'aspectSet': 'blogPost',
  'editorial': {'byline': 'Samantha Pearson'},
  'id': 'bde1a771-814a-3e45-8443-220537ed8ba1',
  'lifecycle': {'initialPublishDateTime': '2014-01-02T23:26:40Z',
   'lastPublishDateTime': '2014-01-02T23

In [357]:
len(r3.json()['results'][0]['results'])

11

In [63]:
r1.json() == r2.json()

False

In [67]:
r = r2.json()

In [68]:
r['query']

{'queryContext': {'curations': ['ARTICLES',
   'BLOGS',
   'PAGES',
   'PODCASTS',
   'VIDEOS']},
 'queryString': 'entrepreneur',
 'resultContext': {'contextual': False,
  'highlight': False,
  'maxResults': 100,
  'offset': 0}}

In [21]:
def search(q):
    try:
        base_url = 'http://api.ft.com/content/search/v1'
        payload = {'apiKey': q.api_key}
        data = {'queryString': q.term}
        r = requests.post(base_url, params=payload, json=data)
        response = r.json()
        
        if response['status'] == 'OK':
            write_log(q, status='SEARCH OK {}'.format(response['response']['meta']['hits']))
        elif response['status'] == 'ERROR':
            write_log(q, status='SEARCH ERROR {}'.format(response['errors']))
        else:
            write_log(q, status='SEARCH {}'.format(response['status']))
    except ValueError as e:
        if str(e) == 'Expecting value: line 1 column 1 (char 0)':
            write_log(q, status='SEARCH ERROR api-key')
        else:
            write_log(q, status='SEARCH EXCEPTION {}'.format(e))
        response = {'status': 'ERROR'}
    except Exception as e:
        write_log(q, status='SEARCH EXCEPTION {}'.format(e))
        response = {'status': 'ERROR'}
    return response

In [22]:
def get_documents_by_page(q, term, total_results):
    n_pages = math.ceil(total_results / 10)
    for page in range(n_pages):
        q.page = page
        response = wait(search, q)
        
        if response['status'] == 'OK':
            docs = response['response']['docs']
            for doc in docs:
                doc.update({'q': q.__dict__, 'original_term': term[1], 'term_type': term[2]})
            insert_documents(docs, q)

In [23]:
def get_documents_by_query(q, term):
    response = wait(search, q)
    
    if response['status'] == 'OK':
        total_results = response['response']['meta']['hits']
        if total_results <= 1010:
            get_documents_by_page(q, term, total_results)
        else:
            bd = parser.parse(q.begin_date)
            ed = parser.parse(q.end_date)
            half = (ed - bd) // 2
            
            begin_date1 = q.begin_date
            end_date1 = (bd + timedelta(half.days)).strftime("%Y%m%d")
            q1 = Query(q.term, begin_date1, end_date1, 0, q.api_key)
            get_documents_by_query(q1, term)
            
            begin_date2 = (bd + timedelta(half.days + 1)).strftime("%Y%m%d")
            end_date2 = q.end_date
            q2 = Query(q.term, begin_date2, end_date2, 0, q.api_key)
            get_documents_by_query(q2, term)

In [24]:
def download_by_date_ranges(term, api_key):
    for r in date_ranges(BEGIN_DATE, END_DATE, 1):
        begin_date = r[0].strftime("%Y%m%d")
        end_date = r[1].strftime("%Y%m%d")
        q = Query(term[0], begin_date, end_date, 0, api_key)
        get_documents_by_query(q, term)

In [25]:
def download_by_term(term, api_key):
    begin_date = BEGIN_DATE.strftime("%Y%m%d")
    end_date = END_DATE.strftime("%Y%m%d")
    q = Query(term[0], begin_date, end_date, 0, api_key)
    response = wait(search, q)
    
    if response['status'] == 'OK':
        total_results = response['response']['meta']['hits']
        if total_results <= 1010:
            get_documents_by_page(q, term, total_results)
        else:
            download_by_date_ranges(term, api_key)

In [26]:
def download_by_key(terms, api_key):
    for term in terms:
        try:
            download_by_term(term, api_key)
        except Exception as e:
            write_log(e, status='DOWNLOAD EXCEPTION {} {}'.format(term, api_key))

In [27]:
def downloader(search_terms_by_api_key, api_keys):
    # Version parallel
    Parallel(n_jobs=8)(delayed(download_by_key)(search_terms_by_api_key[api_key], api_key) for api_key in api_keys)

    # Version sequencial
#     for api_key in api_keys:
#         download_by_key(search_terms_by_api_key[api_key], api_key)

In [None]:
downloader(search_terms_by_api_key, api_keys)