# Setup

In [9]:
#!docker-compose up

In [36]:
!pip install boto3 pandas sqlalchemy sqlalchemy-access pyhive thrift requests sasl thrift_sasl



# Example query

1. Uploading the sample data on s3
2. Creating a table and update metadata with MSCK REPAIR
3. Retrieving results and save them in a dataframe.

In [1]:
import sqlalchemy_access as sa_a
from sqlalchemy import *
from sqlalchemy.engine import create_engine
import boto3
import botocore
import pandas as pd

# 0. Global set up
mock_s3_client = boto3.client("s3", region_name="eu-west-1", endpoint_url="http://localhost:5000")
mock_hive_engine = create_engine('hive://localhost:10000')

# 1. Create mothena bucket 
try:
    mock_s3_client.create_bucket(
        Bucket="mothena", CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}
    )
except botocore.exceptions.ClientError:
    pass

# 2. Upload file
file = open(
        "sample_data/ny_data.parquet",
        "rb",
    )
mock_s3_client.put_object(
        Bucket="mothena",
        Key="ny/downloaded_at=2020-10-15/properties.parquet",
        Body=file.read(),
    )

# 3. Create table
mock_hive_engine.execute("DROP TABLE IF EXISTS ny_data")
mock_hive_engine.execute(
    """
    CREATE EXTERNAL TABLE ny_data(
        address string, 
        residential_units int, 
        sale_price int, 
        sale_date string
    )
    PARTITIONED BY ( 
        downloaded_at string
    )
    STORED AS PARQUET LOCATION 's3a://mothena/ny'
    """
)

# 4. Update metadata
mock_hive_engine.execute("MSCK REPAIR TABLE ny_data")

# 5. Query results
df = pd.read_sql_query("SELECT * FROM ny_data", mock_hive_engine)
df.head()

Unnamed: 0,address,residential_units,sale_price,sale_date,downloaded_at
0,231 EAST 7TH,2,0,02/10/2020,2020-10-15
1,243 EAST 7TH STREET,3,4350000,16/07/2020,2020-10-15
2,262 EAST 7TH STREET,4,600000,12/12/2019,2020-10-15
3,272 EAST 7TH STREET,24,1000000,26/06/2020,2020-10-15


# 1. Quering apache logs

Athena monitoruje wybrany folder s3 z logami serwera Apache. Pliki logów są rozproszone (wiele plików). Athena saknuje je w poszukiwaniu zdefiniowanych wzorów i pozwala na relacyjne wyszukiwania.

1. Tworzymy definicje schematu
2. Uzupełniamy logi
3. Tworzymy zapytanie

In [2]:
!head sample_data/apache_logs

83.149.9.216 - - [17/May/2015:10:05:03 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
83.149.9.216 - - [17/May/2015:10:05:43 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-dashboard3.png HTTP/1.1" 200 171717 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
83.149.9.216 - - [17/May/2015:10:05:47 +0000] "GET /presentations/logstash-monitorama-2013/plugin/highlight/highlight.js HTTP/1.1" 200 26185 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"
83.149.9.216 -

In [3]:
first_file = open(
        "sample_data/apache_logs",
        "rb",
)
mock_s3_client.put_object(
        Bucket="mothena",
        Key="apache-log-folder/first_logs",
        Body=first_file.read(),
)

second_file = open(
        "sample_data/single_log",
        "rb",
)
mock_s3_client.put_object(
        Bucket="mothena",
        Key="apache-log-folder/second_logs",
        Body=second_file.read(),
)

{'ResponseMetadata': {'RequestId': '4QLVPQU1ZO5H05ITLHN5V4ZBWP9GDCO57LIM2DNTWLHJ6KNIW3LW',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'etag': '"52d2d7a35c6d2b1310511e2c0393ed73"',
   'last-modified': 'Sat, 08 Jan 2022 12:11:30 GMT',
   'content-length': '0',
   'x-amzn-requestid': '4QLVPQU1ZO5H05ITLHN5V4ZBWP9GDCO57LIM2DNTWLHJ6KNIW3LW',
   'content-type': 'text/html; charset=utf-8',
   'access-control-allow-origin': '*',
   'server': 'Werkzeug/2.0.2 Python/3.7.12',
   'date': 'Sat, 08 Jan 2022 12:11:30 GMT'},
  'RetryAttempts': 0},
 'ETag': '"52d2d7a35c6d2b1310511e2c0393ed73"'}

In [4]:
mock_hive_engine.execute("DROP TABLE IF EXISTS apache_logs")
mock_hive_engine.execute(
    """
    CREATE EXTERNAL TABLE apache_logs(
        client_ip string,
        request_received_time string,
        client_request string,
        server_status string,
        returned_obj_size string,
        metadata string
    )
    ROW FORMAT SERDE 
        'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
       'input.regex'='(.*) - - \\\\[(.*)\\\\] "(.*)" (\\\\d*) (\\\\d*) (.*)'
    )
    STORED AS INPUTFORMAT
       'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
       'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
        's3a://mothena/apache-log-folder'
    """
)
mock_hive_engine.execute("MSCK REPAIR TABLE apache_logs")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1227adb20>

In [5]:
df = pd.read_sql_query("SELECT * FROM apache_logs", mock_hive_engine)
df.head(10)

Unnamed: 0,client_ip,request_received_time,client_request,server_status,returned_obj_size,metadata
0,83.149.9.216,17/May/2015:10:05:03 +0000,GET /presentations/logstash-monitorama-2013/im...,200,203023,"""http://semicomplete.com/presentations/logstas..."
1,83.149.9.216,17/May/2015:10:05:43 +0000,GET /presentations/logstash-monitorama-2013/im...,200,171717,"""http://semicomplete.com/presentations/logstas..."
2,83.149.9.216,17/May/2015:10:05:47 +0000,GET /presentations/logstash-monitorama-2013/pl...,200,26185,"""http://semicomplete.com/presentations/logstas..."
3,83.149.9.216,17/May/2015:10:05:12 +0000,GET /presentations/logstash-monitorama-2013/pl...,200,7697,"""http://semicomplete.com/presentations/logstas..."
4,83.149.9.216,17/May/2015:10:05:07 +0000,GET /presentations/logstash-monitorama-2013/pl...,200,2892,"""http://semicomplete.com/presentations/logstas..."
5,83.149.9.216,17/May/2015:10:05:34 +0000,GET /presentations/logstash-monitorama-2013/im...,200,430406,"""http://semicomplete.com/presentations/logstas..."
6,83.149.9.216,17/May/2015:10:05:57 +0000,GET /presentations/logstash-monitorama-2013/cs...,200,38720,"""http://semicomplete.com/presentations/logstas..."
7,83.149.9.216,17/May/2015:10:05:50 +0000,GET /presentations/logstash-monitorama-2013/cs...,200,41820,"""http://semicomplete.com/presentations/logstas..."
8,83.149.9.216,17/May/2015:10:05:24 +0000,GET /presentations/logstash-monitorama-2013/im...,200,52878,"""http://semicomplete.com/presentations/logstas..."
9,83.149.9.216,17/May/2015:10:05:50 +0000,GET /presentations/logstash-monitorama-2013/im...,200,321631,"""http://semicomplete.com/presentations/logstas..."


## 404 requests

In [55]:
df = pd.read_sql_query(
    """
    SELECT request_received_time, client_ip, server_status
    FROM apache_logs
    WHERE server_status = '404'
    """, mock_hive_engine)
df.head(10)

Unnamed: 0,request_received_time,client_ip,server_status
0,17/May/2015:10:05:22 +0000,66.249.73.185,404
1,17/May/2015:11:05:05 +0000,208.91.156.11,404
2,17/May/2015:13:05:25 +0000,111.199.235.239,404
3,17/May/2015:13:05:32 +0000,111.199.235.239,404
4,17/May/2015:13:05:04 +0000,208.91.156.11,404
5,17/May/2015:13:05:28 +0000,144.76.194.187,404
6,17/May/2015:13:05:37 +0000,144.76.194.187,404
7,17/May/2015:15:05:00 +0000,208.91.156.11,404
8,17/May/2015:16:05:23 +0000,208.91.156.11,404
9,17/May/2015:16:05:50 +0000,74.208.180.23,404


## distributed files search

In [7]:
!head sample_data/single_log

0.0.0.0 - - [17/May/2015:10:05:03 +0000] "GET /test/request" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

In [8]:
df = pd.read_sql_query("SELECT * FROM apache_logs WHERE client_ip = '0.0.0.0'", mock_hive_engine)
df.head()

Unnamed: 0,client_ip,request_received_time,client_request,server_status,returned_obj_size,metadata
0,0.0.0.0,17/May/2015:10:05:03 +0000,GET /test/request,200,203023,"""http://semicomplete.com/presentations/logstas..."


# 2. Analyzing Wikipedia Clickstreams

Amazon Athena pozwala na scalanie wielu plików przchowywanych w buckecie S3 i traktowanie ich jako jednej tabeli w relacyjnej bazie daych oraz pozwala na wykonywanie na niej zapytań SQL.

In [27]:
!wget -O sample_data/7563832.tsv.gz https://figshare.com/ndownloader/files/7563832

--2022-01-08 12:38:51--  https://figshare.com/ndownloader/files/7563832
Resolving figshare.com (figshare.com)... 54.76.172.109, 52.210.36.187
Connecting to figshare.com (figshare.com)|54.76.172.109|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://s3-eu-west-1.amazonaws.com/pfigshare-u-files/7563832/2017_01_en_clickstream.tsv.gz?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIYCQYOYV5JSSROOA/20220108/eu-west-1/s3/aws4_request&X-Amz-Date=20220108T113851Z&X-Amz-Expires=10&X-Amz-SignedHeaders=host&X-Amz-Signature=31c15907f4f08b0017ccfafe6ec141c52675ceeded44774cc51528774398b6af [following]
--2022-01-08 12:38:51--  https://s3-eu-west-1.amazonaws.com/pfigshare-u-files/7563832/2017_01_en_clickstream.tsv.gz?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIYCQYOYV5JSSROOA/20220108/eu-west-1/s3/aws4_request&X-Amz-Date=20220108T113851Z&X-Amz-Expires=10&X-Amz-SignedHeaders=host&X-Amz-Signature=31c15907f4f08b0017ccfafe6ec141c52675ceeded44774cc515287

In [28]:
!gzip -d sample_data/7563832.tsv.gz

This project contains data sets containing counts of (**referer**, **resource**) pairs extracted from the request logs of Wikipedia. A referer is an HTTP header field that identifies the address of the webpage that linked to the resource being requested. The data shows how people get to a Wikipedia article and what links they click on. In other words, it gives a weighted network of articles, where each edge weight corresponds to how often people navigate from one page to another.

## Format

The current data includes the following 4 fields:

* prev: the result of mapping the referrer URL to the fixed set of values described above
* curr: the title of the article the client requested
* type: describes (prev, curr)
    * link: if the referrer and request are both articles and the referrer links to the request
    * external: if the referrer host is not en(.m)?.wikipedia.org
    * other: if the referrer and request are both articles but the referrer does not link to the request. This can happen when clients search or spoof their refer.
* n: the number of occurrences of the (referrer, resource) pair

In [29]:
!head -n 1000000 sample_data/7563832.tsv | tail

other-search	Monica_Attard	external	171
other-empty	Monica_Attard	external	104
Jacques-Louis_David	Collège_des_Quatre-Nations	link	33
other-empty	Collège_des_Quatre-Nations	external	193
Louis_Le_Vau	Collège_des_Quatre-Nations	link	21
Adrien-Marie_Legendre	Collège_des_Quatre-Nations	link	12
Paris	Collège_des_Quatre-Nations	link	49
other-search	Collège_des_Quatre-Nations	external	206
other-internal	Collège_des_Quatre-Nations	external	39
Charles-Augustin_de_Coulomb	Collège_des_Quatre-Nations	link	32


## simulating data lake

In [15]:
!mkdir data_lake
!head -n 1000000 sample_data/7563832.tsv | tail -n +2 > sample_data/7563832_chunk.tsv
!split -l 10000 sample_data/7563832_chunk.tsv data_lake/chunk
!ls data_lake

chunkaa chunkak chunkau chunkbe chunkbo chunkby chunkci chunkcs chunkdc chunkdm
chunkab chunkal chunkav chunkbf chunkbp chunkbz chunkcj chunkct chunkdd chunkdn
chunkac chunkam chunkaw chunkbg chunkbq chunkca chunkck chunkcu chunkde chunkdo
chunkad chunkan chunkax chunkbh chunkbr chunkcb chunkcl chunkcv chunkdf chunkdp
chunkae chunkao chunkay chunkbi chunkbs chunkcc chunkcm chunkcw chunkdg chunkdq
chunkaf chunkap chunkaz chunkbj chunkbt chunkcd chunkcn chunkcx chunkdh chunkdr
chunkag chunkaq chunkba chunkbk chunkbu chunkce chunkco chunkcy chunkdi chunkds
chunkah chunkar chunkbb chunkbl chunkbv chunkcf chunkcp chunkcz chunkdj chunkdt
chunkai chunkas chunkbc chunkbm chunkbw chunkcg chunkcq chunkda chunkdk chunkdu
chunkaj chunkat chunkbd chunkbn chunkbx chunkch chunkcr chunkdb chunkdl chunkdv


In [16]:
from pathlib import Path

for child in Path('./data_lake/').iterdir():
    if child.is_file():
        key = "wiki/" + child.name
        mock_s3_client.put_object(
                Bucket="mothena",
                Key=key,
                Body=child.read_text(),
        )

In [46]:
mock_hive_engine.execute("DROP TABLE IF EXISTS wikipedia")
mock_hive_engine.execute(
    """
    CREATE EXTERNAL TABLE wikipedia(
        prev string,
        curr string,
        type string,
        n int
    )
    ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    LOCATION
        's3a://mothena/wiki'
    """
)
mock_hive_engine.execute("MSCK REPAIR TABLE wikipedia")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x122792040>

In [48]:
df = pd.read_sql_query("SELECT * FROM wikipedia", mock_hive_engine)
df.head(10)

Unnamed: 0,prev,curr,type,n
0,Greek_language,Aristotle,link,59
1,Exaggeration,Aristotle,link,11
2,Euclid,Aristotle,link,39
3,Lucid_dream,Aristotle,link,36
4,Oceanography,Aristotle,link,19
5,Thespis,Aristotle,link,34
6,Animal_rights,Aristotle,link,20
7,History_of_economic_thought,Aristotle,link,33
8,Greek_words_for_love,Aristotle,link,14
9,Praxis_(process),Aristotle,link,19


Jak widzimy nna powyszym przykładzie udało się wczytać 100 plików przechowywanych w buckecie S3 do jednej tabelii i wykonać na nich podstawowe zapytanie w języku SQL.

# 3. Splunk-like search engine

Kolejnym przykładem zostosowania Amazon Atheny jest analiza logów aplikacji, mozliwe jest przeszukiwanie oraz filtrowanie logów podobne do funkcjonalności jakie daje nam wykorzystanie swerwisu Splunk.

Splunk Inc. that produces software for searching, monitoring, and analyzing machine-generated data via a Web-style interface.

Its software helps capture, index and correlate real-time data in a searchable repository, from which it can generate graphs, reports, alerts, dashboards and visualizations. Splunk uses machine data for identifying data patterns, providing metrics, diagnosing problems and providing intelligence for business operations. Splunk is a horizontal technology used for application management, security and compliance, as well as business and web analytics.

## splunk search example

![image.png](attachment:image.png)

## our infrastructre

![image.png](attachment:image.png)

1. Lambda pracuje we własnym środowisku. Logi zapisywane są za pomocą serwisu Cloud Watch
2. Uruchomiony rule zapisuje wszystkie logi pod wskazany adres s3
3. AWS Glue i Athena definiują schemat i budują data lake dla logów
4. Search API pozwala na wyszukiwanie wybranych faz w logach (np błędów)

Takie rozwiązanie jest natywnym chmurowym rozwiązaniem dla silnika logującego, po ktorym można łatwo wyszukiwać logów. 

In [73]:
!head sample_data/lambda_log

2021-12-06T16:35:23.243+01:00	INFO main - ApiFactory:47 - Endpoint configuration: EndpointConfig[name='abc', competitions=[e8648b99-xxxx-xxxx-xxxx-ca44f3372da6], props=https://test.api.market/adjusted?balanced=true&lineType=fraction, stats=https://test.api/results?dataConfirmed=all]	2021/12/06/[$LATEST]2e9e360e83e472ebbb383a01bd1b04a
2021-12-06T16:35:26.073+01:00	INFO parallel-1 - Connector:41 - Received heartbeat for fixture: c5ede0d0-93fe-xxxx-xxxx-33861c12884e	2021/12/06/[$LATEST]2e9e360e83ec472ebbb383a01bd1b04a
2021-12-06T16:35:23.285+01:00	INFO parallel-2 - HttpApiClient:58 - Http GET request from: abc, for endpoint: https://api/188	021/12/06/[$LATEST]e6b1cad446d047adac5e8b348340f802
2021-12-07T03:43:19.848+01:00	ERROR reactor-http-epoll-2 - Connector:46 - Request failed from: abc, reason: internal server error, for endpoint: https://api/props/alternates?2C112%2C113%2C104%2C106%2C114%2C108%2C109&apikey=test-f6a48a75c5d014071	2021/12/07/[$LATEST]4fdea933975a46e59fae1c77a2d4e87


In [74]:
lambda_file = open(
        "sample_data/lambda_log",
        "rb",
)
mock_s3_client.put_object(
        Bucket="mothena",
        Key="lambda/lambda_log",
        Body=lambda_file.read(),
)

{'ResponseMetadata': {'RequestId': 'B409IOB8MGCBOX1OZ6SKM3BFKIXA9BSTGHYRE1LKRG1O51KXWPXR',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'etag': '"a2df5624707ef5438e2fe7bcad0f187f"',
   'last-modified': 'Sat, 08 Jan 2022 13:15:15 GMT',
   'content-length': '0',
   'x-amzn-requestid': 'B409IOB8MGCBOX1OZ6SKM3BFKIXA9BSTGHYRE1LKRG1O51KXWPXR',
   'content-type': 'text/html; charset=utf-8',
   'access-control-allow-origin': '*',
   'server': 'Werkzeug/2.0.2 Python/3.7.12',
   'date': 'Sat, 08 Jan 2022 13:15:15 GMT'},
  'RetryAttempts': 0},
 'ETag': '"a2df5624707ef5438e2fe7bcad0f187f"'}

In [75]:
mock_hive_engine.execute("DROP TABLE IF EXISTS lambda_logs")
mock_hive_engine.execute(
    """
    CREATE EXTERNAL TABLE lambda_logs(
        time_stamp string,
        message string,
        log_stream_name string
    )
    ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE
    LOCATION
        's3a://mothena/lambda'
    """
)
mock_hive_engine.execute("MSCK REPAIR TABLE lambda_logs")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1449e02e0>

In [79]:
df = pd.read_sql_query("SELECT * FROM lambda_logs", mock_hive_engine)
df.head(10)

Unnamed: 0,time_stamp,message,log_stream_name
0,2021-12-06T16:35:19.097+01:00,WARNING: sun.reflect.Reflection.getCallerClass...,2021/12/06/[$LATEST]2e9e360e83ec472ebb383a01bd...
1,2021-12-06T16:35:19.105+01:00,WARNING: sun.reflect.Reflection.getCallerClass...,2021/12/06/[$LATEST]fe5e5f3e00a44dc7b54592fcf9...
2,2021-12-06T16:35:19.126+01:00,WARNING: sun.reflect.Reflection.getCallerClass...,2021/12/06/[$LATEST]03cb804820d24969abb28c0b72...
3,2021-12-06T16:35:19.140+01:00,WARNING: sun.reflect.Reflection.getCallerClass...,2021/12/06/[$LATEST]e6b1cad446d047dac5e8b34834...
4,2021-12-06T16:35:23.243+01:00,INFO main - ApiFactory:47 - Endpoint configura...,2021/12/06/[$LATEST]2e9e360e83e472ebbb383a01bd...
5,2021-12-06T16:35:26.073+01:00,INFO parallel-1 - Connector:41 - Received hear...,2021/12/06/[$LATEST]2e9e360e83ec472ebbb383a01b...
6,2021-12-06T16:35:23.285+01:00,INFO parallel-2 - HttpApiClient:58 - Http GET ...,021/12/06/[$LATEST]e6b1cad446d047adac5e8b34834...
7,2021-12-07T03:43:19.848+01:00,ERROR reactor-http-epoll-2 - Connector:46 - Re...,2021/12/07/[$LATEST]4fdea933975a46e59fae1c77a2...
8,2021-12-06T16:35:32.271+01:00,INFO parallel-2 - DynamoReader:58 - Dynamo get...,2021/12/06/[$LATEST]232c2897e7ef4250bead3c94c8...
9,2021-12-06T16:35:32.498+01:00,INFO parallel-2 - DynamoReader:58 - Dynamo get...,2021/12/06/[$LATEST]2e9e360e83ec472ebbb383a01b...


Teraz mamy dostęp do logów z  naszej aplikacji w postacji tabelii na której mozemy wykonywać zapytania SQL.
Przykładem zastosowania jest wyszukiwanie logów konkretnego typu jak np. ERROR, WARN, które następnie mogą zostać poddane dogłębnej analizie

## looking for ERRORS

In [80]:
df = pd.read_sql_query(
    """
    SELECT * FROM lambda_logs 
    WHERE message 
    LIKE '%%ERROR%%'
    """, mock_hive_engine)
df.head(10)

Unnamed: 0,time_stamp,message,log_stream_name
0,2021-12-07T03:43:19.848+01:00,ERROR reactor-http-epoll-2 - Connector:46 - Re...,2021/12/07/[$LATEST]4fdea933975a46e59fae1c77a2...


# teardown

In [59]:
#!docker-compose down

# Wnioski

Amazon Athena ma szerokie zastosowanie w środowisku chmurowym. 
Jej głównymi zaletami są:
* mozliwość scalania danych(zarówno nieustrukturyzowanych jak i ustrukturyzowanych) z wielu plików i traktowanie ich jako danych z jednej tabeli
* mozliwość wykonywania zapytań SQL bez konieczności wczytywania tych danych
* integracja z Amazon QuickSight w celu wizualizacji
* mozliwość integracji z AWS Glue (ETL and data discovery features)