# **Set up and install**



## *Set up*

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.0.3-bin-hadoop2.7"

In [2]:
print(os.environ["SPARK_HOME"])

spark-3.0.3-bin-hadoop2.7


In [3]:
print(os.environ["JAVA_HOME"])

/usr/lib/jvm/java-11-openjdk-amd64


In [4]:
import findspark
findspark.init()

In [5]:
findspark.find()

'spark-3.0.3-bin-hadoop2.7'

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Test")\
        .config("spark.driver.extraClassPath", "elasticsearch-hadoop-7.6.2/dist/elasticsearch-spark-20_2.11-7.6.2.jar") \
        .config("spark.executor.memory", "4g")\
        .config("spark.driver.memory", "4g")\
        .config("spark.cores.max", "2")\
        .getOrCreate()

In [7]:
spark

In [8]:
!curl -X GET "localhost:9200/"

{
  "name" : "ctr730583",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "5QKgMmS8QNm-Td-GAv_0Vw",
  "version" : {
    "number" : "7.6.2",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "ef48eb35cf30adf4db14086e8aabd07ef6fb113f",
    "build_date" : "2020-03-26T06:34:37.794943Z",
    "build_snapshot" : false,
    "lucene_version" : "8.4.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}


In [9]:
import json

es_write_conf = { "es.nodes": "localhost", "es.batch.size.entries": "1" ,"es.port" : "9200", "es.resource" : "testindex/testdoc", "es.input.json" : "yes", "es.mapping.id": "doc_id" }

data = [ {'some_key': 'some_value', 'doc_id': 123}, {'some_key': 'some_value', 'doc_id': 456}, {'some_key': 'some_value', 'doc_id': 789} ]
rdd = spark.sparkContext.parallelize(data)

def format_data(x):
    return (x['doc_id'], json.dumps(x))

rdd = rdd.map(lambda x: format_data(x))

# rdd.saveAsNewAPIHadoopFile( \
#     path='-', \
#     outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", \
#     keyClass="org.apache.hadoop.io.NullWritable", \
#     valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", \
#     conf=es_write_conf)

In [10]:
rdd.collect()

[(123, '{"some_key": "some_value", "doc_id": 123}'),
 (456, '{"some_key": "some_value", "doc_id": 456}'),
 (789, '{"some_key": "some_value", "doc_id": 789}')]

In [11]:
rdd.saveAsNewAPIHadoopFile( \
    path='-', \
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", \
    keyClass="org.apache.hadoop.io.NullWritable", \
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", \
    conf=es_write_conf)

In [12]:
!curl -X GET 'localhost:9200/testindex/_search'

{"took":58,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"testindex","_type":"testdoc","_id":"123","_score":1.0,"_source":{"some_key": "some_value", "doc_id": 123}},{"_index":"testindex","_type":"testdoc","_id":"456","_score":1.0,"_source":{"some_key": "some_value", "doc_id": 456}},{"_index":"testindex","_type":"testdoc","_id":"789","_score":1.0,"_source":{"some_key": "some_value", "doc_id": 789}}]}}

In [13]:
!curl -X DELETE localhost:9200/testindex

{"acknowledged":true}

## *Install sample data*

In [14]:
#!wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
#!unzip ml-latest-small.zip
PATH_TO_DATA = "ml-latest-small"

## *Install elasticsearch from pypi to use a client instance*

In [15]:
#!pip install elasticsearch

In [16]:
import time
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}], timeout=60, max_retries=10, retry_on_timeout=True)

In [17]:
es.info()

{'name': 'ctr730583',
 'cluster_name': 'elasticsearch',
 'cluster_uuid': '5QKgMmS8QNm-Td-GAv_0Vw',
 'version': {'number': '7.6.2',
  'build_flavor': 'default',
  'build_type': 'tar',
  'build_hash': 'ef48eb35cf30adf4db14086e8aabd07ef6fb113f',
  'build_date': '2020-03-26T06:34:37.794943Z',
  'build_snapshot': False,
  'lucene_version': '8.4.0',
  'minimum_wire_compatibility_version': '6.8.0',
  'minimum_index_compatibility_version': '6.0.0-beta1'},
 'tagline': 'You Know, for Search'}

# **Insert**

## *Spark insert to ES*

### Load rating data

In [18]:
ratings = spark.read.csv(PATH_TO_DATA + "/ratings.csv", header=True, inferSchema=True)
ratings.cache()
print("Number of ratings: {}".format(ratings.count()))
print("Sample of ratings:")
ratings.show(5)

Number of ratings: 100836
Sample of ratings:
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



### Insert by saveAsNewAPIHadoopFile()

In [19]:
rddtest2 = ratings.rdd.map(lambda x : json.dumps({'userId': x[0], 'movieId': x[1], 'rating': x[2], 'timestamp': x[3]}))

In [20]:
rddtest3 = rddtest2.zipWithIndex().map(lambda x: (x[1], x[0]))

In [21]:
es_write_conf = { "es.nodes": "localhost","es.port" : "9200", "es.resource" : "testindex3/testdoc", "es.input.json" : "yes" }
current_time = time.time()
rddtest3.saveAsNewAPIHadoopFile( \
    path='-', \
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", \
    keyClass="org.apache.hadoop.io.NullWritable", \
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", \
    conf=es_write_conf)
print("Time spark insert to es by saveAsNewAPIHadoopFIle(): {}".format(time.time() - current_time))

Time spark insert to es by saveAsNewAPIHadoopFIle(): 7.358740568161011


In [22]:
es.search(index="testindex3", q="*", size=3)

{'took': 43,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': 1.0,
  'hits': [{'_index': 'testindex3',
    '_type': 'testdoc',
    '_id': 'Fhnj430BX_kIPVutskDL',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 1,
     'rating': 4.0,
     'timestamp': 964982703}},
   {'_index': 'testindex3',
    '_type': 'testdoc',
    '_id': 'Fxnj430BX_kIPVutskDL',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 3,
     'rating': 4.0,
     'timestamp': 964981247}},
   {'_index': 'testindex3',
    '_type': 'testdoc',
    '_id': 'GBnj430BX_kIPVutskDL',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 6,
     'rating': 4.0,
     'timestamp': 964982224}}]}}

## *Normally insert to ES*

In [23]:
import pandas as pd
df = pd.read_csv(PATH_TO_DATA + '/ratings.csv')
print(df)

        userId  movieId  rating   timestamp
0            1        1     4.0   964982703
1            1        3     4.0   964981247
2            1        6     4.0   964982224
3            1       47     5.0   964983815
4            1       50     5.0   964982931
...        ...      ...     ...         ...
100831     610   166534     4.0  1493848402
100832     610   168248     5.0  1493850091
100833     610   168250     5.0  1494273047
100834     610   168252     5.0  1493846352
100835     610   170875     3.0  1493846415

[100836 rows x 4 columns]


In [24]:
documents = df.to_dict(orient='records')

In [25]:
from elasticsearch.helpers import bulk

current_time = time.time()
print(bulk(es, documents, index='test-ratings', doc_type='test-ratings-type', raise_on_error=True))
normally_insert_time = time.time() - current_time
print(normally_insert_time)



(100836, [])
8.93069839477539


In [26]:
es.search(index="test-ratings", q="*", size=3)

{'took': 11,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 10000, 'relation': 'gte'},
  'max_score': 1.0,
  'hits': [{'_index': 'test-ratings',
    '_type': 'test-ratings-type',
    '_id': '-hrj430BX_kIPVut0Mnf',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 1,
     'rating': 4.0,
     'timestamp': 964982703}},
   {'_index': 'test-ratings',
    '_type': 'test-ratings-type',
    '_id': '-xrj430BX_kIPVut0Mng',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 3,
     'rating': 4.0,
     'timestamp': 964981247}},
   {'_index': 'test-ratings',
    '_type': 'test-ratings-type',
    '_id': '_Brj430BX_kIPVut0Mng',
    '_score': 1.0,
    '_source': {'userId': 1,
     'movieId': 6,
     'rating': 4.0,
     'timestamp': 964982224}}]}}

## *Parallel insert*

In [27]:
from elasticsearch.helpers import parallel_bulk
from collections import deque

current_time = time.time()
deque(parallel_bulk(es, documents, index="ratings-test-parallel", thread_count=2, queue_size=2), maxlen = 0)
print("Time parallel insert: {}".format(time.time() - current_time))

Time parallel insert: 4.729877710342407


In [28]:
!curl -XPUT "http://localhost:9200/ratings-test-parallel/_settings" -d '{ "index" : { "max_result_window" : 200000 } }' -H "Content-Type: application/json"

{"acknowledged":true}

In [29]:
res = es.search(index="ratings-test-parallel", size=200000)
len(res['hits']['hits'])

93500

# **Query**

## Spark query

In [30]:
q0 ="""{
  "match": {
      "movieId": 3
      }
}"""

q1 = """
  {
    "match": {
      "rating": 4.0
    }
  }
"""

q = """
  {
    "match_all": {}
  }
"""

### Use newApiHadoopRDD()

In [31]:
es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "testindex3/",
    "es.query" : q0
}
current_time = time.time()
for i in range(1000):
  es_rddtestest = spark.sparkContext.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable", 
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
        conf=es_read_conf)
print("Time run newApiHadoopRDD: {}".format(time.time() - current_time))
es_rddtestest.count()

Time run newApiHadoopRDD: 75.33798003196716


52

## Normally query

In [32]:
!curl -XPUT "http://localhost:9200/testindex3/_settings" -d '{ "index" : { "max_result_window" : 200000 } }' -H "Content-Type: application/json"

{"acknowledged":true}

In [33]:
current_time = time.time()
for i in range(1000):
  restestesetest = es.search(index="testindex3", query={"match": {
        "movieId": 3
        }}, size=52)
print("Time run normally query: {}".format(time.time() - current_time))
len(restestesetest['hits']['hits'])

Time run normally query: 3.1915009021759033


52

## *Multi query*

In [34]:
import json
import logging
from elasticsearch import exceptions as es_exceptions

def msearch(es_conn, queries):
    request = ''
    for each in queries:
        request += '%s \n' %json.dumps(each)
    return es.msearch(body = request)

queries = []
for i in range(1000):
  queries.append(
    {"index": "testindex3"}
  )
  queries.append(
      {"query": {
        "match": {
          "movieId": 3
          }
        },"size": 26818
      }
  )
# queries.append(
#     {"index": "ratings"}
# )
# queries.append(
#     {"query": {
#       "match_all": {}
#       },"size": 200000
#     }
# )
# queries.append(
#     {"index": "ratings"}
# )
# queries.append(
#     {"query": {
#       "match": {
#         "movieId": 3
#         }
#       },"size": 200000
#     }
# )
# queries.append(
#     {"index": "ratings"}
# )
# queries.append(
#   {"query": {
#       "match": {
#         "rating": 4.0
#       }
#     },"size": 200000
#   }
# )
current_time = time.time()
q_results = msearch(es, queries)
print("Test multi queries time: {}".format(time.time() - current_time))

Test multi queries time: 1.078413486480713


In [2]:
current_time = time.time()
q_results_2 = msearch(es, queries)
for i in q_results_2["responses"]:
    rdd_new = spark.sparkContext.parallelize(i['hits']['hits'])
print(time.time() - current_time)

In [35]:
len(q_results["responses"][0]['hits']['hits'])

52

In [36]:
len(q_results["responses"][1]['hits']['hits'])

52

In [37]:
!curl -X DELETE localhost:9200/testindex3

{"acknowledged":true}

In [38]:
!curl -X DELETE localhost:9200/ratings-test-parallel

{"acknowledged":true}

In [39]:
!curl -X DELETE localhost:9200/test-ratings

{"acknowledged":true}