## Bulk Upload Test Data

Some sample Twitter data with what looks like Civil Rights-related tweets.

Sources:  
[Python Elasticsearch client API reference](https://elasticsearch-py.readthedocs.io/en/master/api.html)

In [1]:
import pandas as pd
import os
import json

from elasticsearch import (
    Elasticsearch,
    helpers
)
from datetime import datetime
from io import StringIO
from configparser import ConfigParser
from sqlalchemy import create_engine

# Both locations are resolved relative to the parent of the working directory.
parent_dir = os.pardir
data_dir = os.path.join(parent_dir, 'data')
config_file = os.path.join(parent_dir, 'config', 'config.ini')

def get_ini_vals(ini_file, section):
    """Return one section of an INI file as a dict-like object.

    Parameters
    ----------
    ini_file : str
        Path to the INI file.
    section : str
        Name of the section to return.

    Returns
    -------
    configparser.SectionProxy
        Mapping of option name to its (string) value for ``section``.

    Raises
    ------
    FileNotFoundError
        If ``ini_file`` could not be read. ``ConfigParser.read`` skips
        unreadable files silently, which would otherwise surface as a
        misleading ``KeyError`` for the section name.
    KeyError
        If the file was read but contains no such section.
    """
    config = ConfigParser()
    # read() returns the list of files it actually parsed; empty means failure
    if not config.read(ini_file):
        raise FileNotFoundError(
            "could not read config file: {}".format(ini_file)
        )
    return config[section]

# Load the connection settings for each backing service from the config file.
es_creds, mysql_creds = (
    get_ini_vals(config_file, section_name)
    for section_name in ('elasticsearch', 'mysql')
)

In [2]:
# Open an Elasticsearch client using the host/port from the config file
# (plain HTTP, empty username/password).
es_hosts = [es_creds['host']]
es = Elasticsearch(es_hosts, http_auth=('', ''), port=es_creds['port'], use_ssl=False)

# Printing the cluster info doubles as a connectivity check.
print(es.info())

{'version': {'build_date': '2017-02-09T22:05:32.386Z', 'lucene_version': '6.4.1', 'build_hash': 'db0d481', 'number': '5.2.1', 'build_snapshot': False}, 'cluster_uuid': 'lf55i5JCSBq8nk_ZrTyTKQ', 'tagline': 'You Know, for Search', 'cluster_name': 'elasticsearch', 'name': 'pvnM2mO'}


In [3]:
# List every index on the cluster (the alias mapping is keyed by index name).
alias_map = es.indices.get_alias()
indices = alias_map.keys()
print(indices)

dict_keys(['17.03.10', '17.03.13', '17.03.09', '17.03.15', '17.03.17', '17.03.14', '17.03.07', '17.03.12', '17.03.06', '.kibana', 'shakespeare', '17.03.08', '17.03.11', 's-17.03.08', 's-17.03.09', 'twitter-ross', '17.03.16', 'twitter', 's-17.03.10'])


In [15]:
# read sample data: one JSON document per line
data = os.path.join(data_dir, 'static_data', 'data4.txt')

# Explicit UTF-8 so parsing does not depend on the platform's default encoding;
# blank lines are skipped because json.loads rejects empty input.
with open(data, 'r', encoding='utf-8') as f:
    content = [json.loads(line) for line in f if line.strip()]

df = pd.DataFrame(content)
df.head(3)

Unnamed: 0,keywords,search_date,source,text,url
0,"[donation, civil rights]",02/19/2017,Google,We are the individual donor's first source for...,https://www.charitynavigator.org/index.cfm?bay...
1,"[donation, civil rights]",02/19/2017,Google,Find ratings and read reviews of Civil Rights ...,http://greatnonprofits.org/categories/view/civ...
2,"[donation, civil rights]",02/19/2017,Google,"Nov 15, 2016 ... Traffic to the site was so he...",http://time.com/money/4566160/trump-election-c...


## Helper Functions for ES

It's safer to use a generator function for bulk uploads of large files, because documents are produced one at a time instead of all being held in memory at once (although here we've already read in all the data). Within the generator function, we can parse each item/line of the dataset to build the index and document in the format we want. For now, the data is uploaded almost as-is for testing.

In this example, the documents are uploaded to the shared ES cluster under the index 'twitter'.

In [26]:
def tweet_to_es(tweet_collection):
    """Generator pairing each document with an id for Elasticsearch.

    Parameters
    ----------
    tweet_collection : iterable of dict
        Documents to upload; each one is passed through unchanged.

    Yields
    ------
    (str, dict)
        ``(idx, tweet_dict)`` where ``idx`` is the current ISO-8601 timestamp
        followed by ``_`` and a zero-padded sequence number.

    Notes
    -----
    The timestamp alone is not a safe id: when the clock is coarser than the
    loop, consecutive documents get the same value and overwrite each other
    on upload. The sequence suffix keeps ids distinct within one call.
    """
    for seq, tweet_dict in enumerate(tweet_collection):
        # timestamp keeps ids roughly time-ordered; seq guarantees uniqueness
        idx = '{}_{:06d}'.format(datetime.now().isoformat(), seq)

        # Keep dictionary as-is
        yield idx, tweet_dict
        
def es_bulk_add(es, collection: list, index: str = "twitter", doc_type: str = "tweet"):
    """Bulk-upload every document in ``collection`` to Elasticsearch.

    Parameters
    ----------
    es : Elasticsearch
        Connected client the bulk request is sent through.
    collection : list
        Documents to upload. It is only iterated once, so anything that
        yields one document per iteration is handled the same way.
    index : str, optional
        Target index name (default ``"twitter"``).
    doc_type : str, optional
        Document type written to ``_type`` (default ``"tweet"``).

    Any exception raised by ``helpers.bulk`` propagates to the caller.
    """
    # Lazily build one bulk action per document; ids come from tweet_to_es.
    actions = (
        {
            "_index": index,
            "_type": doc_type,
            "_id": idx,
            "_source": tweet_d,
        }
        for idx, tweet_d in tweet_to_es(collection)
    )

    helpers.bulk(es, actions)
        
# Upload the parsed sample documents. Ids are derived from the current time,
# so re-running this cell indexes the same documents again under new ids.
es_bulk_add(es, content)

## Sample Query

Inspired by: https://qbox.io/blog/python-scripts-interact-elasticsearch-examples

Note the quoting: the whole query string is wrapped in single quotes, while the phrases inside it are wrapped in double quotes. The search also works without the `index` parameter.

In [23]:
# Query-string search against a single day's index; only the top hit is requested.
search_params = dict(
    index='s-17.03.10',
    q='text: "womens" OR "san jose" AND "march"',
    size=1,
    request_timeout=30,
)
results = es.search(**search_params)
print(results)

# Scratch note: draft of a match-query body. It is not sent to Elasticsearch;
# as the last expression in the cell it is simply echoed in the output.
"""
{
  "match": {
    "text": {
      "analyzer": "womens rights",
      "fuzziness": "2",
      "fuzzy_transpositions": "true",
      "operator": "AND"
    }
  }
}

"""

{'timed_out': False, 'hits': {'hits': [{'_index': 's-17.03.10', '_id': 'AVq1jkyuMNAj8foZ2nBB', '_score': 21.79158, '_source': {'@version': '1', 'in_reply_to_screen_name': None, 'id': 839990891800645633, 'in_reply_to_user_id_str': None, 'truncated': False, 'created_at': 'Fri Mar 10 00:06:49 +0000 2017', 'retweeted': False, 'coordinates': None, 'entities': {'user_mentions': [], 'hashtags': [], 'symbols': [], 'urls': [{'display_url': 'bit.ly/2mk08LK', 'expanded_url': 'http://bit.ly/2mk08LK', 'url': 'https://t.co/znZ2xWrqIR', 'indices': [73, 96]}]}, 'timestamp_ms': '1489104409538', 'favorite_count': 0, 'favorited': False, 'possibly_sensitive': False, 'lang': 'en', 'is_quote_status': False, '@timestamp': '2017-03-10T00:06:49.000Z', 'in_reply_to_status_id': None, 'user': {'profile_text_color': '000000', 'notifications': None, 'profile_background_tile': False, 'id': 3253630807, 'location': 'San Jose, CA', 'follow_request_sent': None, 'profile_background_image_url_https': 'https://abs.twimg.co

'\n{\n  "match": {\n    "text": {\n      "analyzer": "womens rights",\n      "fuzziness": "2",\n      "fuzzy_transpositions": "true",\n      "operator": "AND"\n    }\n  }\n}\n\n'

In [22]:
# Show score, full source document and id for each returned hit.
hits = results['hits']['hits']
for hit in hits:
    score, source, doc_id = hit['_score'], hit['_source'], hit['_id']
    print(score, source, doc_id)

18.07563 {'@version': '1', 'in_reply_to_screen_name': None, 'id': 840038570911973376, 'in_reply_to_user_id_str': None, 'truncated': False, 'created_at': 'Fri Mar 10 03:16:17 +0000 2017', 'retweeted': False, 'coordinates': None, 'entities': {'user_mentions': [], 'hashtags': [], 'symbols': [], 'urls': [{'display_url': 'washingtonpost.com/news/energy-en…', 'expanded_url': 'https://www.washingtonpost.com/news/energy-environment/wp/2017/03/09/this-climate-lawsuit-could-change-everything-no-wonder-the-trump-administration-doesnt-want-it-going-to-trial/?tid=sm_fb', 'url': 'https://t.co/upATPnYHKi', 'indices': [0, 23]}]}, 'timestamp_ms': '1489115777124', 'favorite_count': 0, 'favorited': False, 'possibly_sensitive': False, 'lang': 'und', 'is_quote_status': False, '@timestamp': '2017-03-10T03:16:17.000Z', 'in_reply_to_status_id': None, 'user': {'profile_text_color': '3C3940', 'notifications': None, 'profile_background_tile': True, 'id': 123157926, 'location': 'San Jose, California', 'follow_req

In [6]:
# Candidate query strings to try against one day's index.
queries = [
    'text: "black lives" donation',   # donation not field-specific
    'text: "#BLM" march',
    #'text: "womens rights" AND "march"',
    #'text: "climate" AND "donate"',
    'text: "climate" AND ("march" OR "protest")',
    'text: "climate" AND ("sign" OR "petition")',
    'text: "#adaywithoutwomen" AND "protest"',
    'text: "immigration" OR "dreamer" AND "petition"',
    'text: "call representative" OR "call senator"',
    'text: "#globalwarming"',
]

# Options shared by every search; `size` is the result set limit.
search_opts = dict(index="s-17.03.10", size=3, request_timeout=30)

for query in queries:
    results = es.search(q=query, **search_opts)
    print("query %s results" % query)
    for hit in results['hits']['hits']:
        print(len(hit))
        # The full _source is printed; hit['_source']['text'] and
        # hit['_source']['created_at'] are the narrower alternative.
        print(hit['_id'], hit['_score'], hit['_source'])
    print("\n")

query text: "black lives" donation results
5
AVq1xg61MNAj8foZ3Aj- 12.334102 {'contributors': None, 'source': '<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>', 'in_reply_to_screen_name': None, 'in_reply_to_status_id_str': None, 'timestamp_ms': '1489108063683', 'entities': {'user_mentions': [], 'urls': [{'display_url': 'donate.climaterealityproject.org/checkout/donat…', 'expanded_url': 'https://donate.climaterealityproject.org/checkout/donation?eid=122106', 'indices': [110, 133], 'url': 'https://t.co/8cVJBMEMTd'}], 'hashtags': [], 'symbols': []}, 'created_at': 'Fri Mar 10 01:07:43 +0000 2017', '@timestamp': '2017-03-10T01:07:43.000Z', 'possibly_sensitive': False, 'in_reply_to_user_id': None, 'id_str': '840006218395467776', 'favorite_count': 0, 'filter_level': 'low', 'in_reply_to_status_id': None, 'truncated': False, 'coordinates': None, 'text': 'I just made a donation to The Climate Reality Project for Q1 2017-100 Days Campaign-Email 3 Low Dollar-Classy https://t.co/8

## MySQL

For practice, insert the following tweet:

> \#ClimateMarch
>
> Get ready for another big March.
>
> The People’s Climate March on DC:…  Fri Mar 10 02:15:38 +0000 2017



In [44]:
# connect to openatrium database
# NOTE(review): credentials are interpolated into the URL verbatim; values
# containing URL-reserved characters would need escaping — confirm.
mysql_url = "mysql+pymysql://{user}:{password}@{host}:{port}/{db}".format(
    user=mysql_creds['user'],
    password=mysql_creds['password'],
    host=mysql_creds['host'],
    port=mysql_creds['port'],
    db=mysql_creds['database'],
)
engine = create_engine(mysql_url)
conn = engine.connect()

# table fields
# NOTE(review): the last two names use *_action_type, while the rows built
# further down use pri_issue_type / sec_issue_type — confirm against the table.
rzst_event_header = [
    'elasticsearch_id',
    'title',
    'body_value',
    'location_text',
    'event_datetime_from',
    'event_datetime_to',
    'score',
    'insert_dt',
    'event_type',
    'pri_action_type',
    'sec_action_type',
]

# need to create a flow that parses out data we need first...
# Blank template row: every header field mapped to an empty string.
insert_vals = {field: "" for field in rzst_event_header}

#def parse_es_result(result):
#    from pandas.io.json import json_normalize
#    
#    for hit in result['hits']['hits']:
#        print()

In [67]:
from datetime import datetime


# Two hand-picked search hits, written out as rows for the `rzst_event` table.
selected_result1 = dict(
    elasticsearch_id='AVq1_7F8MNAj8foZ3ZKg',   # result['hits']['hits']['_id']
    title='Sign the Petition!',                # not sure what we'd parse
    # result['hits']['hits']['_source']['text']
    body_value='Join the fight to defend climate progress! - Sign the Petition! https://t.co/oM9NDmoN8S via @Change',
    location_text='',
    event_datetime_from='',
    event_datetime_to='',
    score=16.670967,                           # result['hits']['hits']['_score']
    insert_dt=datetime.now().date(),
    event_type='petition',
    pri_issue_type='climate',
    sec_issue_type='',
)

selected_result2 = dict(
    elasticsearch_id='AVq15Dw5MNAj8foZ3Nfy',   # result['hits']['hits']['_id']
    title='Climate March DC',                  # not sure what we'd parse
    # result['hits']['hits']['_source']['text']
    body_value='#ClimateMarch\nGet ready for another big March.\n The People\'s Climate March on DC:',
    location_text='DC',
    event_datetime_from='April 29th 2017',
    event_datetime_to='April 29th 2017',
    score=10.8778515,                          # result['hits']['hits']['_score']
    insert_dt=datetime.now().date(),
    event_type='protest',
    pri_issue_type='climate',
    sec_issue_type='',
)

# Append both rows to the existing table; the DataFrame index is not written.
event_rows = pd.DataFrame([selected_result1, selected_result2])
event_rows.to_sql('rzst_event', conn, if_exists='append', index=False)

  result = self._query(query)
  result = self._query(query)
  result = self._query(query)
  result = self._query(query)
