## Indexing Tweets
* In this activity, students read about some important aspects of Elasticsearch, namely configuring the mappings and settings of ES indices
* Introducing custom analyzers in the settings section of an ES index of tweets
* Preparing settings and mappings for an index
* (Bulk) inserting tweets using the Elasticsearch API

In [2]:
# This command is used to install elasticsearch library.
# Use it once and comment it out again
# !pip install elasticsearch

from elasticsearch import Elasticsearch, helpers
from dateutil import parser
import configparser

In [15]:
config = configparser.ConfigParser()
config.read('config.ini')

es_host = config.get('elasticsearch', 'host')
es_username = config.get('elasticsearch', 'username')
es_password = config.get('elasticsearch', 'password')

es = Elasticsearch(
    [es_host],
    basic_auth=(es_username, es_password),
    verify_certs=False
)

#name of the created index
index_name = "tweets"
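The cell above reads the connection details from a `config.ini` file with an `[elasticsearch]` section. As a minimal sketch of what that file might look like, the snippet below parses an in-memory sample with the same section and key names; the host, username, and password values are placeholders chosen for illustration, not values from this activity.

```python
import configparser

# Hypothetical sample of config.ini; the values are placeholders.
SAMPLE_CONFIG = """
[elasticsearch]
host = https://localhost:9200
username = elastic
password = changeme
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CONFIG)  # config.read('config.ini') would load the real file

es_host = config.get("elasticsearch", "host")
es_username = config.get("elasticsearch", "username")
es_password = config.get("elasticsearch", "password")

print(es_host, es_username)
```

Keeping credentials in a separate file like this lets the notebook be shared without exposing the password.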

#### Custom analyzers in ES
In ES, the text fields of documents are analyzed before being indexed. This analysis can be done by the default `standard` analyzer, or a developer can define a custom analyzer in the `settings` section of the index.
To get a better idea of what an analyzer does with text, try out the following analyzer configurations.

__Example-1__
```json
GET /_analyze
{
  "tokenizer": "whitespace",
  "filter": ["lowercase", "stop"],
  "char_filter": ["html_strip"],
  "text": "text to be analyzed. It contains <html></html>"
}
```
__Example-2__
```json
GET /_analyze
{
  "tokenizer": "standard",
  "filter": [
    "lowercase",
    {
      "type": "ngram",
      "min_gram": 3,
      "max_gram": 4
    }
  ],
  "char_filter": ["html_strip"],
  "text": "text to be analyzed. It contains <html></html>"
}
```
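Example-1 chains an `html_strip` character filter, a `whitespace` tokenizer, and the `lowercase` and `stop` token filters. The order of these stages can be sketched in plain Python as below. This is only a rough approximation for building intuition, not Elasticsearch's implementation: the regular expression is a crude stand-in for `html_strip`, and the stop word set is an assumed copy of the default English list.

```python
import re

# Assumption: approximates the default English stop words of the "stop" filter.
ENGLISH_STOP_WORDS = {
    "a", "an", "and", "are", "as", "at", "be", "but", "by", "for", "if",
    "in", "into", "is", "it", "no", "not", "of", "on", "or", "such", "that",
    "the", "their", "then", "there", "these", "they", "this", "to", "was",
    "will", "with",
}


def analyze(text):
    """Approximate the char_filter -> tokenizer -> filter chain of Example-1."""
    stripped = re.sub(r"<[^>]*>", " ", text)     # char filter: drop HTML tags
    tokens = stripped.split()                    # tokenizer: split on whitespace
    lowered = [t.lower() for t in tokens]        # token filter: lowercase
    return [t for t in lowered if t not in ENGLISH_STOP_WORDS]  # token filter: stop


print(analyze("text to be analyzed. It contains <html></html>"))
# prints ['text', 'analyzed.', 'contains']
```

Note that the whitespace tokenizer keeps punctuation attached to the token, which is why `analyzed.` retains its trailing period in this sketch.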

## [1]  Answer each of the following questions:
In the cell below, we build the settings and mappings of an index (`tweets`). This code snippet should be run once to create and configure the index.
1) Describe the `ngram_filter` filter and the purpose of using it:
    The ngram filter supports partial-word matching. It splits every token produced by the tokenizer into
    all substrings whose length lies between the `min_gram` and `max_gram` values given in the settings,
    so a query that contains only a fragment of a word can still match the indexed text.

    Example of the tokenization done by `ngram_filter` when `min_gram` is 3 and `max_gram` is 4, for the token
    "mohammad":
    length 3 (`min_gram`): "moh", "oha", "ham", "amm", "mma", "mad"
    length 4 (`max_gram`): "moha", "oham", "hamm", "amma", "mmad"

2) The analyzer `text_processing` is used to analyze the `text` field of the tweet. Update the cell below to link this analyzer with the `text` field:
    Setting the `analyzer` property in the mapping of the `text` field links it to `text_processing`:
    "text": {
        "type": "text",
        "analyzer": "text_processing"
    },

3) Describe the purpose of the `ignore_above` property:
    `ignore_above` sets a maximum length for a keyword field. A string longer than the limit (256 characters here) is not indexed
    in that field, so it cannot be searched, sorted, or aggregated on through it. The value is not truncated, and it remains in the document's `_source`.
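The n-gram expansion described in answer 1 can be reproduced with a few lines of plain Python. This is a minimal sketch of the idea only; the function name `ngrams` is made up for illustration, and the order in which Elasticsearch emits the grams may differ.

```python
def ngrams(token, min_gram, max_gram):
    """Return a dict mapping each gram length to the substrings of that length."""
    grams = {}
    for n in range(min_gram, max_gram + 1):
        # Slide a window of width n over the token.
        grams[n] = [token[i:i + n] for i in range(len(token) - n + 1)]
    return grams


result = ngrams("mohammad", 3, 4)
print(result[3])  # prints ['moh', 'oha', 'ham', 'amm', 'mma', 'mad']
print(result[4])  # prints ['moha', 'oham', 'hamm', 'amma', 'mmad']
```

In this sketch, a token shorter than `min_gram` yields empty lists, which illustrates why very short words produce no grams at all.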

In [16]:
# Create index with settings and mapping

# Delete the index if it already exists (done during development only).
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
    

# index settings
configurations = {
        "settings": {
            "analysis": {
                "filter": {
                    "ngram_filter": {
                        "type": "ngram",
                        "min_gram": 3,
                        "max_gram": 4
                    }
                },
                "analyzer": {
                    "text_processing": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "ngram_filter"
                        ]
                    }
                }
            }
        },
        "mappings": {
            "properties": {
                "date": {
                    "type": "date",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "flag": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "id": {
                    "type": "keyword",
                    "ignore_above": 256
                },
                "target": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "text": {
                    "type": "text",
                    "analyzer": "text_processing" 
                },
                "user": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        }
    }


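# create index (a 400 "index already exists" response is ignored)
es.options(ignore_status=400).indices.create(
    index=index_name,
    settings=configurations["settings"],
    mappings=configurations["mappings"],
)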

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'tweets'})
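The mapping above sets `ignore_above` to 256 on every keyword field. As a simplified model of that setting (an illustration written for this activity, not Elasticsearch code), the hypothetical helper below returns the terms that a keyword field would index for a given string.

```python
def keyword_terms(value, ignore_above=256):
    """Simplified model: a keyword field indexes the whole string as one term,
    or nothing at all when the string is longer than ignore_above."""
    return [value] if len(value) <= ignore_above else []


print(keyword_terms("NO_QUERY"))       # prints ['NO_QUERY']
print(len(keyword_terms("x" * 300)))   # prints 0
```

The over-long value is skipped rather than shortened; the original document body is unaffected in this model.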

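## [2] Fixing an error with the date field
The cell below shows how to insert one tweet into the created index. However, it throws an exception when trying to insert the date value. Investigate the reason and fix it. In the cell as shown, the raw date string is passed through `parser.parse`, which produces a `datetime` value that the `date` field accepts.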


In [17]:


# inserting records

# target: the polarity of the tweet (0 = negative, 2 = neutral, 4 = positive)
# id: the id of the tweet (2087)
# date: the date of the tweet (Sat May 16 23:58:44 UTC 2009)
# flag: The query (lyx). If there is no query, then this value is NO_QUERY.
# user: the user that tweeted (robotickilldozr)
# text: the text of the tweet (Lyx is cool)
    

tweet = {
  "target": "4",
  "id": "2193602064",
  "date": parser.parse("Tue Jun 16 08:40:49 PDT 2009"),
  "flag": "NO_QUERY",
  "user": "tinydiamondz",
  "text": "Happy 38th Birthday to my boo of alll time!!! Tupac Amaru Shakur"
}

res = es.index(index=index_name, id=tweet['id'], document=tweet)



print(res)

# Now check http://localhost:9200/tweets/_mappings

{'_index': 'tweets', '_id': '2193602064', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
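The date strings in this dataset look like `Tue Jun 16 08:40:49 PDT 2009`, which is not a format the `date` field accepts by default. Unless a `tzinfos` mapping is supplied, `dateutil` may also not recognise the `PDT` abbreviation and then returns a datetime without a time zone. As an alternative sketch that uses only the standard library, the hypothetical helper `to_es_date` below converts such a string to ISO 8601, under the assumption that every timestamp uses `PDT` (UTC-7).

```python
from datetime import datetime, timedelta, timezone

# Assumption: all timestamps in the dataset carry the "PDT" abbreviation (UTC-7).
PDT = timezone(timedelta(hours=-7), name="PDT")


def to_es_date(raw):
    """Convert 'Tue Jun 16 08:40:49 PDT 2009' to an ISO 8601 string with offset."""
    parts = raw.split()
    if len(parts) != 6 or parts[4] != "PDT":
        raise ValueError(f"unexpected date format: {raw!r}")
    # Drop the time zone token, parse the rest, then attach the fixed offset.
    naive = datetime.strptime(" ".join(parts[:4] + parts[5:]), "%a %b %d %H:%M:%S %Y")
    return naive.replace(tzinfo=PDT).isoformat()


print(to_es_date("Tue Jun 16 08:40:49 PDT 2009"))
# prints 2009-06-16T08:40:49-07:00
```

Keeping the offset in the indexed value means range queries can be written in any time zone without ambiguity.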


## [3] Bulk insertion

1) Download the Twitter dataset file and compress it using the `gzip` command.
2) Read the tweets one by one using the code above, then extend it to perform bulk insertion of tweets (in bulks of 10000 tweets).

Dataset: https://www.kaggle.com/kazanova/sentiment140/data#
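The batching part of this task can be separated from the Elasticsearch calls. The sketch below shows one possible way to split any iterable of actions into fixed-size batches; the helper name `chunked` is an illustrative choice and is not part of the `elasticsearch` library.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return  # the source is exhausted
        yield batch


# 25 dummy actions split into batches of 10
print([len(batch) for batch in chunked(range(25), 10)])
# prints [10, 10, 5]
```

Each batch could then be handed to `helpers.bulk(es, batch)`. As far as I know, `helpers.bulk` also accepts a `chunk_size` argument that does similar splitting internally, so manual batching is mainly useful when progress reporting per batch is wanted.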

In [20]:
import csv, gzip, warnings


# Silence library warnings (for example, the TLS warning caused by verify_certs=False)
warnings.filterwarnings("ignore")

file_path = "d:\\docs-tfidf\\tweets_ds.csv.gz"

batch_size = 10000   # number of tweets sent per bulk request
max_tweets = 24000   # stop once at least this many tweets have been indexed
actions = []

with gzip.open(file_path, 'rt', encoding='utf-8') as f:
    csv_reader = csv.DictReader(f)
    indexed_count = 0

    for i, row in enumerate(csv_reader, start=1):
        try:
            # One bulk action per tweet; the tweet id doubles as the document id
            action = {
                "_op_type": "index",
                "_index": index_name,
                "_id": row["id"],
                "_source": {
                    "target": row["target"],
                    "id": row["id"],
                    "date": parser.parse(row["date"]),
                    "flag": row["flag"],
                    "user": row["user"],
                    "text": row["text"]
                }
            }

            actions.append(action)

            # Send the accumulated actions every batch_size rows
            if i % batch_size == 0:
                try:
                    helpers.bulk(es, actions)
                    print(f"Successfully loaded ({len(actions)}) tweets to index!")
                    indexed_count += len(actions)
                    actions = []

                    # The limit is checked only after a full batch has been sent
                    if indexed_count >= max_tweets:
                        print(f"\nExceeded the maximum size specified {max_tweets}\n")
                        break

                except Exception as e:
                    print(f"Error while indexing batch: {e}")

        except Exception as e:
            print(f"Error processing row {i}: {e}")

# Send whatever is left over after the last full batch
if actions:
    try:
        helpers.bulk(es, actions)
        print(f"Successfully loaded ({len(actions)}) tweets to index!")

    except Exception as e:
        print(f"Error while indexing remaining batch: {e}")

Successfully loaded (10000) tweets to index!
Successfully loaded (10000) tweets to index!
Successfully loaded (10000) tweets to index!

Exceeded the maximum size specified 24000

Successfully loaded (10000) tweets to index!


In [6]:
# Find the names of users who tweeted between 15:00:00 and 16:00:00 on 2009-04-18
# and whose text matches the word "sad"; limit the result to 5 hits

query = {
    "query": {
        "bool": {
            "must": [
                {
                    "range": {
                        "date": {
                            "gte": "2009-04-18T15:00:00",
                            "lte": "2009-04-18T16:00:00"
                        }
                    }
                },
                {
                    "match": {
                        "text": "sad"
                    }
                }
            ]
        }
    },
    "size": 5
}

results = es.search(index=index_name, body=query)

for hit in results["hits"]["hits"]:
    print(hit["_source"]["user"])

alexandraalice
thespiderman86
smartie91
xoxkaylac
kristinmarie521
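To rerun this search for other time windows or words, the query can be generated by a small function. The sketch below is one possible way to do that; `build_tweet_query` is a hypothetical helper name, and the field names `date` and `text` are taken from the mapping defined earlier.

```python
def build_tweet_query(start, end, word):
    """Build a bool query: tweets dated within [start, end] whose text matches `word`."""
    return {
        "bool": {
            "must": [
                {"range": {"date": {"gte": start, "lte": end}}},
                {"match": {"text": word}},
            ]
        }
    }


query_clause = build_tweet_query("2009-04-18T15:00:00", "2009-04-18T16:00:00", "sad")
print(query_clause["bool"]["must"][1])
# prints {'match': {'text': 'sad'}}
```

The returned dict corresponds to the value under the top-level `"query"` key in the cell above. Because the `text` field is analyzed with the n-gram based `text_processing` analyzer, a match on "sad" also hits tweets containing longer words that include the trigram "sad", such as "sadly".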
