In [1]:
import copy
import getpass
import json
import os
import time

from datetime import datetime, timezone
from elasticsearch import Elasticsearch
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

In [2]:
# Search-body template: a single range filter on @timestamp. The 0 bounds are
# placeholders; ElasticSearch.extract_data fills them with 'YYYY-MM-DDTHH:MM:SS'
# UTC strings before each search.
SELECT_OTHER_BODY = {"query": {"range": {"@timestamp": {"gte": 0, "lte": 0}}}}

# Index names extracted by default by Etl.run_etl (one DataFrame per entry).
LOG_LIST = (
    'conn',
    'dns',
    'files',
    'http',
    'tds',
    'tds_sqlbatch',
    'ssl',
)
    
class ElasticSearch(object):
    """Thin wrapper around an Elasticsearch client that searches by @timestamp range.

    The underlying client is created lazily, on the first call to ``connect``
    (which ``extract_data`` calls for you).
    """

    def __init__(self, host, port, user, password):
        self._els_host = host
        self._els_port = port
        self._user = user
        self._password = password
        self._es = None  # created lazily by connect()

    def connect(self):
        """Create the Elasticsearch client once; subsequent calls are no-ops."""
        if self._es is None:
            # Credentials are embedded in the URL as user:password@host:port.
            # NOTE(review): a user/password containing ':' or '@' would break this URL.
            elastic = ('http://%s:%s@%s:%s' % (self._user, self._password, self._els_host, self._els_port))
            self._es = Elasticsearch([elastic])

    def __ts2utc(self, src):
        """Format an epoch timestamp (seconds; int or int-like) as 'YYYY-MM-DDTHH:MM:SS' in UTC."""
        utc = datetime.fromtimestamp(int(src), tz=timezone.utc)
        # Drop tzinfo so the string carries no '+00:00' suffix (same format as before).
        return utc.replace(tzinfo=None).isoformat()

    def __local2utcts(self, src):
        """Convert a local-time 'YYYY-MM-DD HH:MM:SS' string into an integer epoch timestamp."""
        # time.mktime interprets the struct_time in the machine's local timezone.
        return int(time.mktime(datetime.strptime(src, '%Y-%m-%d %H:%M:%S').timetuple()))

    def extract_data(self, time_start, time_end, index='', cut=10000):
        """Search ``index`` for documents whose @timestamp lies in [time_start, time_end].

        Args:
            time_start: local-time string 'YYYY-MM-DD HH:MM:SS' (inclusive lower bound).
            time_end: local-time string 'YYYY-MM-DD HH:MM:SS' (inclusive upper bound).
            index: index name passed straight to ``Elasticsearch.search``.
            cut: value passed as ``size``, i.e. the maximum number of hits returned.
                NOTE(review): no scrolling/pagination is done, so hits beyond ``cut``
                are silently dropped.

        Returns:
            The response object returned by ``Elasticsearch.search``.
        """
        self.connect()
        # Deep-copy so the module-level template is never mutated between calls.
        query_body = copy.deepcopy(SELECT_OTHER_BODY)
        time_range = query_body['query']['range']['@timestamp']
        time_range['gte'] = self.__ts2utc(self.__local2utcts(time_start))
        time_range['lte'] = self.__ts2utc(self.__local2utcts(time_end))
        return self._es.search(index=index, body=query_body, size=cut)

In [3]:
class Etl(object):
    """Extract log documents from Elasticsearch and load them into Spark DataFrames."""

    def __init__(self, sc):
        self.sc = sc
        self.spark = SparkSession(sc)

    def _json_to_dataframe(self, json_text, schema=None):
        """Parse one JSON text (here: a JSON array of search hits) into a DataFrame."""
        reader = self.spark.read
        if schema:
            reader = reader.schema(schema)
        # Use this instance's SparkContext, not a notebook-global `sc`.
        return reader.json(self.sc.parallelize([json_text]))

    def run_etl(self, els, start_time, end_time, log_list=LOG_LIST, save=False,
                out_dir='data_test'):
        """Extract each index in ``log_list`` from ``els`` into a Spark DataFrame.

        Args:
            els: object exposing ``extract_data(start, end, index)`` that returns an
                Elasticsearch search response (e.g. the ``ElasticSearch`` wrapper).
            start_time: local-time string "YYYY-MM-DD HH:MM:SS".
            end_time: local-time string "YYYY-MM-DD HH:MM:SS".
            log_list: iterable of index names to extract.
            save: if truthy, write each non-empty DataFrame as parquet to
                ``<out_dir>/<index>``, overwriting existing output.
            out_dir: parent directory for parquet output.

        Returns:
            dict mapping each index name to a DataFrame of the hits' ``_source``
            fields (an empty, column-less DataFrame when the index had no hits).
        """
        result = {}
        for index in log_list:
            hits = els.extract_data(start_time, end_time, index)['hits']['hits']
            if not hits:
                # Parsing '[]' yields no `_source` column, so select("_source.*")
                # would fail; store an empty DataFrame instead (nothing to save).
                result[index] = self.spark.createDataFrame(self.sc.emptyRDD(), StructType([]))
                continue
            hits_df = self._json_to_dataframe(json.dumps(hits))
            source = hits_df.select("_source.*")
            if save:
                source.write.parquet("{}/{}".format(out_dir, index), mode='overwrite')
            result[index] = source

        return result

In [4]:
# Connection settings come from the environment; never commit credentials to a
# notebook. NOTE(review): rotate any password that was ever saved in this file.
ES_HOST = os.environ.get("ES_HOST", "192.168.7.15")
ES_PORT = int(os.environ.get("ES_PORT", "9200"))
ES_USER = os.environ.get("ES_USER", "elastic")
ES_PASSWORD = os.environ.get("ES_PASSWORD") or getpass.getpass("Elasticsearch password: ")

els = ElasticSearch(ES_HOST, ES_PORT, ES_USER, ES_PASSWORD)
# getOrCreate() keeps this cell re-runnable; a second SparkContext() would raise.
sc = SparkContext.getOrCreate()
etl = Etl(sc)

In [5]:
# Extraction window as local-time strings in "YYYY-MM-DD HH:MM:SS" format.
START_TIME = "2018-01-01 00:00:00"
END_TIME = "2018-07-17 00:00:00"

results = etl.run_etl(els, START_TIME, END_TIME)

In [6]:
# Sanity check: one DataFrame per extracted index, in log_list order.
results.keys()

dict_keys(['conn', 'dns', 'files', 'http', 'tds', 'tds_sqlbatch', 'ssl'])