In [1]:
import os
import posixpath
import subprocess
from datetime import datetime
from decimal import Decimal

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from dotenv import load_dotenv
from s3fs import S3FileSystem

In [2]:
# Load variables from a local .env file into the process environment.
# NOTE(review): presumably this supplies the AWS settings used by boto3/pyathena below — confirm.
load_dotenv()

True


# 1. Create Bucket

In [3]:
# Create a boto3 session bound to the "mentha" named profile and derive an S3 client from it.
session = boto3.Session(profile_name="mentha")
s3_client = session.client("s3")

In [4]:
# s3_client.list_buckets()["Buckets"]

# Bucket creation call, left commented out for reference.
# NOTE(review): ACL="public-read" is the S3 canned ACL that lets anyone read the
# bucket; confirm that is really intended before re-enabling this call.
# s3_client.create_bucket(Bucket="mentha-athena-datastore",
#                         ACL="public-read",
#                         CreateBucketConfiguration={
#                             "LocationConstraint": "ap-northeast-2"
#                         }
# )

# List the buckets visible to this client; the value is displayed as the cell output.
s3_client.list_buckets()["Buckets"]

[{'Name': 'aws-glue-assets-305045122135-ap-northeast-2',
  'CreationDate': datetime.datetime(2022, 5, 19, 14, 23, 19, tzinfo=tzutc())},
 {'Name': 'mentha-athena-datastore',
  'CreationDate': datetime.datetime(2022, 10, 2, 5, 52, 13, tzinfo=tzutc())},
 {'Name': 'mentha-athena-query-result',
  'CreationDate': datetime.datetime(2022, 5, 22, 4, 57, 56, tzinfo=tzutc())},
 {'Name': 'mentha-datastore',
  'CreationDate': datetime.datetime(2022, 10, 2, 6, 33, tzinfo=tzutc())},
 {'Name': 'mentha-sample-datastore',
  'CreationDate': datetime.datetime(2022, 5, 22, 5, 40, 57, tzinfo=tzutc())}]

# 2. Convert csv(local) to parquet(s3)

## 2.1. Read csv

In [5]:
def decimal_from_value(value):
    """Convert one raw CSV field into a ``Decimal``, mapping missing markers to ``pd.NA``.

    Used as a ``pandas.read_csv`` converter for the ``price`` column, so it is
    called once per row with the field's raw text.

    Parameters
    ----------
    value : str or None
        Raw field content. ``None``, ``pd.NA``, an empty or whitespace-only
        string, and the literal ``"null"`` are all treated as missing.

    Returns
    -------
    decimal.Decimal or pandas.NA
        The exact decimal value, or ``pd.NA`` when the field is missing.

    Raises
    ------
    decimal.InvalidOperation
        If the text is neither a missing marker nor a valid decimal literal.
    """
    if value is None or value is pd.NA:
        return pd.NA

    # Decimal() already ignores surrounding whitespace, so stripping only
    # affects how the missing markers are recognised.
    text = value.strip() if isinstance(value, str) else value
    if text == "" or text == "null":
        # An empty field would otherwise make Decimal("") raise InvalidOperation.
        return pd.NA

    return Decimal(text)

# Read-time schema for the event CSV: column order, per-column dtypes, and
# per-column converters, keyed the way the read_csv call below looks them up.
event_metadata = {
    "column_list": [
        "identity_adid", "os", "model", "country", "event_name",
        "log_id", "server_datetime", "quantity", "price",
    ],
    "convert_dict": {"price": decimal_from_value},
}
# Every column is read as text except "quantity" (nullable integer). "price"
# gets no dtype entry because its converter produces the values instead.
event_metadata["dtype_dict"] = {
    column: ("Int64" if column == "quantity" else "str")
    for column in event_metadata["column_list"]
    if column != "price"
}

# Read-time schema for the attribution CSV: column order and per-column dtypes.
attribution_metadata = {
    "column_list": [
        "partner", "campaign", "server_datetime", "tracker_id",
        "log_id", "attribution_type", "identity_adid",
    ],
}
# Every column is read as text except "attribution_type" (nullable integer).
attribution_metadata["dtype_dict"] = {
    column: ("Int64" if column == "attribution_type" else "str")
    for column in attribution_metadata["column_list"]
}

In [6]:
# Load the raw event log. Because `names` is supplied, pandas treats the first
# line of the file as data rather than as a header row.
df_event = pd.read_csv(
    "data/event.csv",
    converters=event_metadata["convert_dict"],
    dtype=event_metadata["dtype_dict"],
    names=event_metadata["column_list"],
)

In [7]:
# Load the raw attribution log. Because `names` is supplied, pandas treats the
# first line of the file as data rather than as a header row.
df_attribution = pd.read_csv(
    "data/attribution.csv",
    dtype=attribution_metadata["dtype_dict"],
    names=attribution_metadata["column_list"],
)

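### Check server_datetime error cases
- In the event table, `server_datetime` values of "0001-01-01 00:00:00.0" are replaced with null: that year is outside the range pandas `datetime64[ns]` can represent, so `errors="coerce"` turns it into `NaT`.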

In [8]:
# Parse the text timestamps of both frames in place. errors="coerce" turns any
# value pandas cannot convert into NaT instead of raising.
for frame in (df_event, df_attribution):
    frame["server_datetime"] = pd.to_datetime(frame["server_datetime"], errors="coerce")

In [9]:
# Derive the calendar date of each row; this column is later used as the
# partition column when the frames are written as Parquet datasets.
for frame in (df_event, df_attribution):
    frame["date"] = frame["server_datetime"].dt.date

In [10]:
# Print the event frame's row count, column dtypes and memory usage.
df_event.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 17966166 entries, 0 to 17966165
Data columns (total 10 columns):
 #   Column           Dtype         
---  ------           -----         
 0   identity_adid    object        
 1   os               object        
 2   model            object        
 3   country          object        
 4   event_name       object        
 5   log_id           object        
 6   server_datetime  datetime64[ns]
 7   quantity         Int64         
 8   price            object        
 9   date             object        
dtypes: Int64(1), datetime64[ns](1), object(8)
memory usage: 1.4+ GB


In [11]:
# Preview the first five event rows.
df_event.head(5)

Unnamed: 0,identity_adid,os,model,country,event_name,log_id,server_datetime,quantity,price,date
0,984549936,8.9,8.9,jp,abx:login,c21efdb8-b6e5-4ccc-a474-aff72a62c248,2018-05-18 12:23:15.303,,,2018-05-18
1,885033552,8.9,8.9,gb,abx:login,b4470f3b-4bb9-43ef-9248-25b503fa5660,2018-05-18 12:32:46.395,,,2018-05-18
2,768602461,7.1,7.1,ge,abx:firstopen,372dfecc-a27f-4a16-8e31-eccf34b8855f,2018-05-18 12:34:55.196,,,2018-05-18
3,1666798466,3.4,3.4,gb,abx:end_session,08730bdc-2895-4061-8399-f45df94d3fd0,2018-05-18 12:30:23.945,,,2018-05-18
4,683694696,7.1,7.1,kr,abx:start_session,a9556df7-f6ee-4600-af5b-89a44f18673c,2018-05-18 12:31:14.824,,,2018-05-18


In [12]:
# Print the attribution frame's row count, column dtypes and memory usage.
df_attribution.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3760185 entries, 0 to 3760184
Data columns (total 8 columns):
 #   Column            Dtype         
---  ------            -----         
 0   partner           object        
 1   campaign          object        
 2   server_datetime   datetime64[ns]
 3   tracker_id        object        
 4   log_id            object        
 5   attribution_type  Int64         
 6   identity_adid     object        
 7   date              object        
dtypes: Int64(1), datetime64[ns](1), object(6)
memory usage: 233.1+ MB


In [13]:
# Preview the first five attribution rows.
df_attribution.head(5)

Unnamed: 0,partner,campaign,server_datetime,tracker_id,log_id,attribution_type,identity_adid,date
0,,,2018-05-03 07:19:24.813,,bdb8fc95-4f66-4d1d-8186-d10e86fe6433,0,764796223,2018-05-03
1,,,2018-05-03 10:25:11.034,,67c41325-a700-4f98-ad72-108025e9af8d,0,2126194985,2018-05-03
2,,,2018-05-03 10:26:08.081,,0e41af66-3f17-4bde-91db-806296209ad1,0,738518810,2018-05-03
3,,,2018-05-03 22:38:15.378,,a5f7ed1f-5d4e-4adf-96ce-e94f6820c2c2,0,595719449,2018-05-03
4,,,2018-05-03 04:14:55.453,,1e1aae33-282d-4dc2-9267-22fbd4ee2798,0,302402748,2018-05-03


## 2.2. Convert csv to parquet

In [15]:
# Convert both pandas frames into Arrow tables, the input type that the
# Parquet dataset writer below expects.
event_table, attribution_table = (
    pa.Table.from_pandas(frame) for frame in (df_event, df_attribution)
)
# event_output_path = "s3://mentha-athena-datastore/sample/event"
# attribution_output_path = "s3://mentha-athena-datastore/sample/attribution"

In [16]:
# Write each Arrow table as a local Parquet dataset with one sub-directory per
# distinct value of the "date" column.
# NOTE(review): re-running this cell may add files next to existing ones rather
# than replace them — confirm against the installed pyarrow version.
for arrow_table, dataset_root in (
    (event_table, "data/event/"),
    (attribution_table, "data/attribution/"),
):
    pq.write_to_dataset(arrow_table, root_path=dataset_root, partition_cols=["date"])

In [None]:
# pq.write_table(event_table, "data/event.parquet")
# pq.write_table(attribution_table, "data/attribution.parquet")
# # https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_metadata.html

In [None]:
# pq.read_metadata("data/event.parquet")
# read_event_schema = pq.read_schema("data/event.parquet")
# read_event_schema

## 2.3. Upload parquet files to S3
- ref: https://stackoverflow.com/questions/25380774/upload-a-directory-to-s3-with-boto

In [17]:
def upload_folder_to_s3(s3bucket, inputDir, s3Path):
    """Recursively upload every file under a local directory to an S3 bucket.

    The directory layout below ``inputDir`` is reproduced under the ``s3Path``
    key prefix, so ``<inputDir>/a/b.parquet`` is uploaded as
    ``<s3Path>/a/b.parquet``.

    Parameters
    ----------
    s3bucket : object
        Anything exposing ``upload_file(local_path, key)``; this notebook
        passes a boto3 ``Bucket`` resource.
    inputDir : str
        Local directory to walk. A trailing separator is accepted.
    s3Path : str
        Key prefix inside the bucket that receives the files.

    Raises
    ------
    Exception
        Any error raised while walking or uploading is printed and re-raised
        unchanged. Files uploaded before the failure are not rolled back.
    """
    print("Uploading results to s3 initiated...")
    print("Local Source:", inputDir)
    # Informational listing only. Passing an argument list (no shell) keeps
    # directory names containing spaces or shell metacharacters from being
    # interpreted, and "--" stops a leading "-" being read as an option.
    try:
        subprocess.run(["ls", "-ltR", "--", inputDir], check=False)
    except OSError:
        # `ls` is not available on this platform; the listing is cosmetic.
        pass

    print("Dest  S3path:", s3Path)

    try:
        for path, subdirs, files in os.walk(inputDir):
            # Strip only the leading inputDir. A plain str.replace would also
            # delete later occurrences (e.g. a nested folder of the same name).
            rel_dir = os.path.relpath(path, inputDir)
            rel_parts = [] if rel_dir == os.curdir else rel_dir.split(os.sep)
            for file in files:
                # S3 keys always use "/", independent of the local OS separator.
                s3_key = posixpath.normpath(posixpath.join(s3Path, *rel_parts, file))
                local_file = os.path.join(path, file)
                print("upload : ", local_file, " to Target: ", s3_key, end="")
                s3bucket.upload_file(local_file, s3_key)
                print(" ...Success")
    except Exception as e:
        print(" ... Failed!! Quitting Upload!!")
        print(e)
        # Bare raise keeps the original traceback intact.
        raise

In [18]:
# Resource-style handle for the bucket that receives the Parquet files.
# NOTE(review): this goes through boto3's default credentials rather than the
# "mentha" profile session created for s3_client earlier — confirm that is intended.
s3 = boto3.resource("s3")
s3bucket = s3.Bucket("mentha-athena-datastore")

In [None]:
# Push both local Parquet datasets to their prefixes in the bucket,
# attribution first and then event.
for local_dir, key_prefix in (
    ("data/attribution", "sample/attribution"),
    ("data/event", "sample/event"),
):
    upload_folder_to_s3(s3bucket, local_dir, key_prefix)

In [None]:
# s3_client.upload_file(Filename="data/event.parquet",
#                       Bucket="mentha-athena-datastore",
#                       Key="sample/event/event.parquet"
# )

In [None]:
# s3_client.upload_file(Filename="data/attribution.parquet",
#                       Bucket="mentha-athena-datastore",
#                       Key="sample/attribution/attribution.parquet"
# )

In [None]:
# Alternative that writes the Arrow tables straight to S3 through s3fs; kept
# for reference only and disabled as a whole, because as written it cannot run:
#   * event_output_path / attribution_output_path are only defined in
#     commented-out lines, so the call raises NameError;
#   * `s3` is bound to the boto3 resource at this point, not to an S3FileSystem;
#   * it passes no partition_cols, unlike the local write used for the upload.
# The Parquet files are uploaded by upload_folder_to_s3 instead.
# s3 = S3FileSystem()
# pq.write_to_dataset(event_table, root_path=event_output_path, filesystem=s3)
# pq.write_to_dataset(attribution_table, root_path=attribution_output_path, filesystem=s3)

# 3. Athena

## 3.1. Create Athena tables

In [19]:
from pyathena import connect

In [20]:
# Open an Athena connection whose query results are staged under the given S3
# prefix, with "data_store" as the default schema, and take a cursor from it.
athena_connection = connect(
    s3_staging_dir="s3://mentha-athena-query-result/sample",
    region_name="ap-northeast-2",
    schema_name="data_store",
)
cursor = athena_connection.cursor()

In [21]:
# DDL for the Athena external table over the event Parquet files.
# The columns follow the event CSV column order, and `date` is declared as the
# partition key to match the date-partitioned layout under sample/event/.
# NOTE(review): `date` is unquoted in PARTITIONED BY — confirm Athena accepts it unescaped.
# NOTE(review): price is declared decimal(10, 1) — confirm it matches the decimal
# type pyarrow inferred when writing the Parquet files.
create_event_table = """
CREATE EXTERNAL TABLE IF NOT EXISTS `data_store`.`event` (
  `identity_adid` string,
  `os` string,
  `model` string,
  `country` string,
  `event_name` string,
  `log_id` string,
  `server_datetime` timestamp,
  `quantity` int,
  `price` decimal(10, 1)
)
PARTITIONED BY (date date)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES ('serialization.format' = '1')
LOCATION 's3://mentha-athena-datastore/sample/event/'
TBLPROPERTIES ('has_encrypted_data' = 'false');
"""

# Statement that makes Athena scan the table location and register the
# partition directories it finds there.
create_event_partition = """
MSCK REPAIR TABLE `data_store`.`event`;
"""

In [22]:
# DDL for the Athena external table over the attribution Parquet files.
# The columns follow the attribution CSV column order, and `date` is declared
# as the partition key to match the date-partitioned layout under sample/attribution/.
# NOTE(review): `date` is unquoted in PARTITIONED BY — confirm Athena accepts it unescaped.
create_attribution_table = """
CREATE EXTERNAL TABLE IF NOT EXISTS `data_store`.`attribution` (
  `partner` string,
  `campaign` string,
  `server_datetime` timestamp,
  `tracker_id` string,
  `log_id` string,
  `attribution_type` int,
  `identity_adid` string
)
PARTITIONED BY (date date)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES ('serialization.format' = '1')
LOCATION 's3://mentha-athena-datastore/sample/attribution/'
TBLPROPERTIES ('has_encrypted_data' = 'false');
"""

# Statement that makes Athena scan the table location and register the
# partition directories it finds there.
create_attribution_partition = """
MSCK REPAIR TABLE `data_store`.`attribution`;
"""

In [26]:
# cursor.execute("drop table event")
# cursor.execute("drop table attribution")

In [27]:
# Create both external tables; IF NOT EXISTS in the DDL makes re-running this a no-op
# when the tables are already defined.
cursor.execute(create_event_table)
cursor.execute(create_attribution_table)

<pyathena.cursor.Cursor at 0x7fd368985580>

In [28]:
# Register the date partitions of both tables so their data becomes queryable.
cursor.execute(create_event_partition)
cursor.execute(create_attribution_partition)

<pyathena.cursor.Cursor at 0x7fd368985580>

In [29]:
# List the tables in the connection's default schema; execute() hands back the
# cursor itself, so the rows can be fetched in the same expression.
cursor.execute("show tables").fetchall()

[('attribution',), ('event',), ('timestamp_error',), ('whether',)]

In [31]:
# Smoke test: read two rows back from the event table; execute() hands back
# the cursor itself, so the rows can be fetched in the same expression.
cursor.execute("select * from event limit 2").fetchall()

[('825810027',
  '7.1',
  '7.1',
  'ge',
  'abx:firstopen',
  '4239bfd7-d073-4e57-b4bf-059e90a7fc0f',
  datetime.datetime(2018, 5, 22, 23, 55, 59, 325000),
  None,
  None,
  datetime.date(2018, 5, 22)),
 ('1530000398',
  '3.4',
  '3.4',
  'kr',
  'custom:battle',
  '7624f64e-bab3-46b2-a970-e89abfd36d3f',
  datetime.datetime(2018, 5, 22, 23, 56, 42, 933000),
  None,
  None,
  datetime.date(2018, 5, 22))]

In [32]:
# Smoke test: read two rows back from the attribution table; execute() hands
# back the cursor itself, so the rows can be fetched in the same expression.
cursor.execute("select * from attribution limit 2").fetchall()

[('8MCosUQMik2Muvd-MU0lew',
  'campaign2',
  datetime.datetime(2018, 6, 13, 0, 1, 33, 733000),
  'LI7IlqzGk0uq3JdkfpKWZg',
  '60f2cad9-7572-492a-b25e-367b0cb7e704',
  1,
  '1360564381',
  datetime.date(2018, 6, 13)),
 ('8MCosUQMik2Muvd-MU0lew',
  'campaign2',
  datetime.datetime(2018, 6, 13, 0, 1, 3, 894000),
  'LI7IlqzGk0uq3JdkfpKWZg',
  '450683da-5fe5-4c79-8984-1e59f2269e85',
  1,
  '536133847',
  datetime.date(2018, 6, 13))]