## find Spark 

In [77]:
import findspark
findspark.init() # Locate the Spark installation so that pyspark can be imported in the cells below

## import 

In [78]:
from pyspark.sql import SparkSession
import re
import findspark
import json
import pandas as pd 
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from __future__ import print_function

## 함수 정의 

### 1. Log Data Parsing 함수 

In [79]:
# log data parsing 
def parsing(row):
    """Parse one Airflow log line into its timestamp, level and message.

    A parsable line looks like::

        [2022-02-06 08:05:25,949] {processor.py:163} INFO - Started process

    Parameters
    ----------
    row : str
        A single raw log line.

    Returns
    -------
    dict
        ``{"Timestamp": "YYYY-MM-DD HH:MM:SS", "Status": <level>, "Message": <text>}``
        where ``<level>`` is ``"INFO"``, ``"WARNING"`` or ``"ERROR"``.
        An empty dict is returned when the line does not start with a
        bracketed timestamp or carries none of those level markers
        (for example traceback continuation lines).
    """
    # Anchored at the start of the line; group(1) is the date and group(3)
    # is the time without the millisecond suffix.
    ts_match = re.match(r"\[(\d+-\d+-\d+) ((\d+:\d+:\d+),\d+)\]", row)

    # The leftmost "<LEVEL> - " marker is the level of the line itself.
    # Later markers belong to the message (nested log records such as
    # "... INFO - [ts] {dagbag.py:334} ERROR - Failed to import"), so they
    # must neither decide the status nor cut the message short.
    level_match = re.search(r"\b(INFO|WARNING|ERROR) - ", row)

    if ts_match is None or level_match is None:
        return {}

    return {
        "Timestamp": ts_match.group(1) + " " + ts_match.group(3),
        "Status": level_match.group(1),
        # Everything after the first marker, kept intact.
        "Message": row[level_match.end():],
    }


### 2. Elastic Search에 bulk로 저장하는 함수 

In [80]:
def bulk_insert(host, port, df, index):
    """Bulk-index the parsed log rows of ``df`` into Elasticsearch.

    Parameters
    ----------
    host, port
        Connection settings forwarded to the ``Elasticsearch`` client.
    df
        Table providing the ``Datetime``, ``Status`` and ``Message`` columns.
    index
        Name of the target index, written to every action's ``_index``.
    """
    client = Elasticsearch(host=host, port=port)

    # One bulk action per row; the three columns are read in lockstep.
    actions = []
    for when, level, text in zip(df['Datetime'], df['Status'], df['Message']):
        document = {
            "datetime": when,
            "log-level": level,
            "message": text,
        }
        actions.append({"_index": index, "_source": document})

    helpers.bulk(client, actions)

## Main 

### 1. Spark Session 생성 및 Spark Data Frame으로 Data Read

In [81]:
# Create (or reuse) a Spark session running locally under the master 'local[2]'.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('airflow log test')
    .getOrCreate()
)

# Read the scheduler log as text; change the path to load a different log file.
data = spark.read.text(
    "hdfs://localhost:9870//airflow_logs/scheduler/2022-02-06/MySQL_to_CSV.py.log"
)

In [110]:
# Show the type of the object returned by spark.read.text.
type(data)

pyspark.sql.dataframe.DataFrame

In [111]:
# Pull every row to the driver and print it, one Row per output line.
collected_rows = data.collect()
for record in collected_rows:
    print(record)

Row(value='[2022-02-06 08:05:25,949] {processor.py:163} INFO - Started process (PID=688) to work on /opt/airflow/dags/MySQL_to_CSV.py')
Row(value='[2022-02-06 08:05:26,052] {processor.py:620} INFO - Processing file /opt/airflow/dags/MySQL_to_CSV.py for tasks to queue')
Row(value='[2022-02-06 08:05:26,098] {logging_mixin.py:109} INFO - [2022-02-06 08:05:26,063] {dagbag.py:500} INFO - Filling up the DagBag from /opt/airflow/dags/MySQL_to_CSV.py')
Row(value='[2022-02-06 08:05:56,267] {logging_mixin.py:109} INFO - [2022-02-06 08:05:56,229] {timeout.py:36} ERROR - Process timed out, PID: 688')
Row(value='[2022-02-06 08:05:56,702] {logging_mixin.py:109} INFO - [2022-02-06 08:05:56,296] {dagbag.py:334} ERROR - Failed to import: /opt/airflow/dags/MySQL_to_CSV.py')
Row(value='Traceback (most recent call last):')
Row(value='  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file')
Row(value='    loader.exec_module(new_module)')
Row

### 2. Log Data Parsing 후 Pandas Data Frame 형태로 변형 

In [112]:
dict_list = []
ts_pattern = re.compile(r'\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})]') # timestamp

# Collect once. Each data.collect() call is a separate Spark action, so calling
# it inside the loop re-read the whole file for every row.
rows = data.collect()

# Iterate over every row, including the last one (the previous
# range(len(...) - 1) silently skipped the final log line).
for spark_row in rows:
    row = spark_row['value']

    # Only lines starting with a bracketed timestamp are log records;
    # traceback continuation lines are skipped.
    if ts_pattern.match(row):
        parsed = parsing(row)
        # parsing() returns an empty dict for lines it cannot classify;
        # dropping them avoids all-NaN rows in the DataFrame.
        if parsed:
            dict_list.append(parsed)

# Convert to a pandas DataFrame. The explicit column list keeps the frame
# well-formed even when no line could be parsed.
df = pd.DataFrame(dict_list, columns=["Timestamp", "Status", "Message"])
df["Datetime"] = pd.to_datetime(df["Timestamp"],format="%Y-%m-%d %H:%M:%S", errors = 'coerce')

### 3. es에 데이터 적재 

In [2]:
# Name of the Elasticsearch index that receives the parsed log records.
index_name = "airflow_log_2022-02-06"
index_name

'airflow_log_2022-02-06'

In [114]:
# Load the parsed log rows into the target index.
bulk_insert(host="localhost", port="9200", df=df, index=index_name)

### 4.  es 데이터 저장 확인 

In [107]:
# Connect to the same Elasticsearch node the bulk load wrote to.
es = Elasticsearch(host = 'localhost',port = '9200')

# Search the index that was just loaded to confirm the documents were stored.
# No query is supplied, so the server applies its default search.
# (The unused `index` and `body` variables were removed: they named a different
# index and an SQL string that was never sent to Elasticsearch.)
res = es.search(index=index_name)

In [108]:
# Display the raw search response returned by es.search.
res

{'took': 2,
 'timed_out': False,
 '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0},
 'hits': {'total': {'value': 1500, 'relation': 'eq'},
  'max_score': 1.0,
  'hits': [{'_index': 'airflow_log_2022-02-06',
    '_type': '_doc',
    '_id': 'VUBj0H4BbTliU3W3TL5T',
    '_score': 1.0,
    '_source': {'datetime': '2022-02-05T06:41:59',
     'log-level': 'INFO',
     'message': 'Started process (PID=218) to work on /opt/airflow/dags/MySQL_to_CSV.py'}},
   {'_index': 'airflow_log_2022-02-06',
    '_type': '_doc',
    '_id': 'VkBj0H4BbTliU3W3TL5T',
    '_score': 1.0,
    '_source': {'datetime': '2022-02-05T06:41:59',
     'log-level': 'INFO',
     'message': 'Processing file /opt/airflow/dags/MySQL_to_CSV.py for tasks to queue'}},
   {'_index': 'airflow_log_2022-02-06',
    '_type': '_doc',
    '_id': 'V0Bj0H4BbTliU3W3TL5T',
    '_score': 1.0,
    '_source': {'datetime': '2022-02-05T06:41:59',
     'log-level': 'INFO',
     'message': '[2022-02-05 06:41:59,141] {dagbag.py:500

### 5. spark stop 

In [76]:
 spark.stop()  # Stop the Spark session created earlier in the notebook