In [5]:
import os
import pickle
from datetime import date, timedelta
import importlib

from retry import retry
import pandas as pd
from getpass import getpass

from melitk.fda2 import inventory

from app.data.utils.sparksql import SparkSQL
from app.data.utils.load_query import load_format
from app.conf.settings import DEFAULT_PARAMS

# Access to the databases

In [None]:
# Credentials come from the optional MELIDATA_USER / MELIDATA_PASS environment
# variables when set, otherwise they are prompted for interactively, so nothing
# secret has to be typed into (and committed with) the notebook.
melidata_user = os.environ.get('MELIDATA_USER') or input('Melidata user: ')
melidata_pass = os.environ.get('MELIDATA_PASS') or getpass('Melidata password: ')
spark = SparkSQL(user=melidata_user, password=melidata_pass)

# Data

## Read

In [None]:
@retry(EOFError, tries=5, delay=60, backoff=2)
def run_query(query):
    """Execute `query` on the shared `spark` session and return its result.

    EOFError is retried by the `retry` decorator: up to 5 attempts, waiting
    60 s before the first retry and multiplying the wait by 2 each time.
    """
    result = spark.run_query(query)
    return result

In [None]:
# Resume log: the query loop appends the start boundary of every hourly window
# it has fetched (one per line), so a re-run can skip windows already queried.
logs_path = 'logs.txt'

In [None]:
# Set RESET_LOGS = True to delete the resume log and re-query the whole date
# range from scratch; the default leaves any existing log untouched.
RESET_LOGS = False
if RESET_LOGS and os.path.exists(logs_path):
    os.remove(logs_path)

In [None]:
def str2date(value):
    """Parse a 'YYYY-MM-DD' string into a `datetime.date`.

    NOTE(review): this helper (and `date2str`) was used but never defined or
    imported in this notebook; the ISO format is an assumption — confirm it
    matches DEFAULT_PARAMS['start_date'] / ['end_date'] and the SQL template.
    """
    return date.fromisoformat(value)


def date2str(value):
    """Format a `datetime.date` as 'YYYY-MM-DD' (inverse of `str2date`)."""
    return value.isoformat()


# Per-run copy of the default query parameters; start/end are overwritten per
# hourly window by the query loop.
params = DEFAULT_PARAMS.copy()
filename = '../src/app/data/creatives/queries/performance_per_hour.sql'
date_start = str2date(DEFAULT_PARAMS['start_date'])
date_end = str2date(DEFAULT_PARAMS['end_date'])

In [None]:
# Hourly window boundaries labelled "<day> HH", newest first: the end-date
# midnight, then every hour of the days before date_end back to date_start 00.
hour_labels = []
current_day = date_start
while current_day < date_end:
    day_label = date2str(current_day)
    hour_labels.extend(f"{day_label} {hour:02d}" for hour in range(24))
    current_day += timedelta(days=1)
hour_labels.reverse()
dates = [f"{date2str(date_end)} 00"] + hour_labels

In [None]:
# Window starts already fetched by a previous run, one per line in the log.
# On a first run the log does not exist yet, so start from an empty list
# instead of failing with FileNotFoundError.
if os.path.exists(logs_path):
    with open(logs_path, 'r') as file:
        temp = [line.rstrip('\n') for line in file]
else:
    temp = []

In [None]:
# Drop boundaries whose window was already fetched (listed in the log).
logged = set(temp)
dates = [boundary for boundary in dates if boundary not in logged]

In [None]:
# Drop the leading boundary (the end-date midnight, which the loop never logs);
# the next cell prepends the upper bound of the first window still to fetch.
dates.pop(0)

In [None]:
# Restore the upper bound of the first window to fetch (the previous cell always
# drops the leading boundary). When resuming, that bound is the start of the last
# window written to the log; on a fresh run the log is empty, so fall back to the
# end-date midnight instead of raising IndexError on `temp[-1]`.
dates = [temp[-1] if temp else f"{date2str(date_end)} 00"] + dates

In [None]:
# Preview: the first element is the upper bound, the rest are window starts to fetch.
dates[:5]

In [None]:
# Fetch every hourly window still missing, newest first. `dates` is descending,
# so each window goes from dates[i] (start) to dates[i - 1] (end); whether the end
# is inclusive depends on the SQL template — NOTE(review): confirm.
# The original `range(1, len(dates) + 1)` read past the end of `dates` and always
# died with IndexError; pairing neighbours with zip visits each window exactly once.
# Results accumulate onto any `df` left by a previous (interrupted) run of this
# cell, and each finished window's start is appended to the log for resuming.
# NOTE(review): after a kernel restart `df` is gone but the log is not, so logged
# windows would be skipped — clear the log (RESET_LOGS) in that case.
frames = [df] if 'df' in globals() else []
try:
    for window_end, window_start in zip(dates, dates[1:]):
        params.update({'start_date': window_start, 'end_date': window_end})
        query = load_format(filename, params)
        frames.append(run_query(query))
        with open(logs_path, 'a') as file:
            file.write(window_start + '\n')
finally:
    # Concatenate once (not inside the loop, which is quadratic), and also when a
    # query fails, so windows already logged are not lost from `df`.
    df = pd.concat(frames) if frames else pd.DataFrame()

In [None]:
# Sort rows by cday and chour, then by content source and the campaign /
# line item / creative IDs.
df = df.sort_values(
    by=['cday', 'chour', 'content_source', 'campaign_id', 'line_item_id', 'creative_id'],
)

## Processing

In [None]:
# Keep only rows where all three ID columns are populated.
df = df.dropna(subset=['campaign_id', 'line_item_id', 'creative_id'])

In [None]:
# Cast the ID columns to int (rows with missing IDs were removed just above).
df = df.astype({'campaign_id': int, 'line_item_id': int, 'creative_id': int})

In [None]:
# (rows, columns) after dropping rows with null IDs and casting the IDs to int.
df.shape

## Correct creative IDs (reload the saved artifact, truncate IDs, re-aggregate)

In [None]:
# Read the Fury token from the optional FURY_TOKEN environment variable, otherwise
# prompt for it, so the token is never hardcoded in (or committed with) the notebook.
my_fury_new_token = os.environ.get('FURY_TOKEN') or getpass('Fury token: ')
inventory.init(token=f"Bearer {my_fury_new_token}")

In [None]:
# Load a previously published DataFrame from the FDA inventory.
# Security note: pickle.loads can execute arbitrary code embedded in the payload,
# so only use it on artifacts from a trusted source.
artifact_id = 'f3de9ae8-ea64-4ed1-aae3-e3116575202c'
source_artifact = inventory.get(artifact_id=artifact_id)
df_bytes = source_artifact.load_to_bytes()
df = pickle.loads(df_bytes)

In [None]:
# First five rows of the reloaded frame.
df.head(5)

In [None]:
# Largest creative_id under line item 3669 — the value flagged as "weird" and
# inspected in the next cell.
weird_case = df.loc[df['line_item_id'] == 3669, 'creative_id'].max()

In [None]:
# Every row carrying that creative_id.
df.loc[df['creative_id'].eq(weird_case)]

In [None]:
# Correction step: keep only the first four characters (digits) of each
# creative_id, then cast the ID columns to int.
# NOTE(review): this assumes every valid creative_id has at most 4 digits — confirm.
# Casting to int *before* slicing matters: str() of a float id yields e.g. '123.0'
# or '1.2345e+16', whose first four characters are not the leading digits (and
# may not parse as an int at all).
df['campaign_id'] = df['campaign_id'].astype(int)
df['line_item_id'] = df['line_item_id'].astype(int)
df['creative_id'] = df['creative_id'].astype(int).astype(str).str[:4].astype(int)

In [None]:
# Re-aggregate after the creative-ID correction: rows that now share the same
# key columns are summed into one row, keys kept as regular columns.
columns = ['cday', 'chour', 'content_source', 'campaign_id', 'line_item_id', 'creative_id']
df = df.groupby(columns, as_index=False).sum()

In [None]:
# (rows, columns) after re-aggregating on the corrected IDs.
df.shape

## Save

In [None]:
# Local CSV copy of the result (without the index), read back in the next cell.
df.to_csv(path_or_buf='data.csv', index=False)

In [None]:
# Reload the CSV copy; low_memory=False parses each column in one pass so
# dtypes are inferred over the whole column rather than chunk by chunk.
df = pd.read_csv(filepath_or_buffer='data.csv', low_memory=False)

In [None]:
# Read the Fury token from the optional FURY_TOKEN environment variable, otherwise
# prompt for it, so the token is never hardcoded in (or committed with) the notebook.
my_fury_new_token = os.environ.get('FURY_TOKEN') or getpass('Fury token: ')
inventory.init(token=f"Bearer {my_fury_new_token}")

In [None]:
# Pickle the final frame and publish it as a new FDA bytes artifact.
# NOTE(review): the artifact name says "per_day", but the rows are keyed by both
# cday and chour — confirm the name/version before publishing.
data_bytes = pickle.dumps(df)

artifact_name = 'clicks_displays_per_day'
version = '1.0.5'
artifact = inventory.create_artifact(
    artifact_name,
    version=version,
    type_='fda.Bytes',
)
artifact.save_from_bytes(data=data_bytes)