feat(analytics): python setup & read model sync & logger
jkrumm committed Jan 22, 2024
1 parent 68aeb0e commit 70819fb
Showing 6 changed files with 288 additions and 0 deletions.
95 changes: 95 additions & 0 deletions .gitignore
@@ -47,3 +47,98 @@ yarn-error.log*

# Sentry Auth Token
.sentryclirc

### Python ###
*.parquet

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt
*logs.txt
*logs_*

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

#pdm.lock
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Environments
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
*idea/
35 changes: 35 additions & 0 deletions analytics/app.py
@@ -0,0 +1,35 @@
import os
import time
from dotenv import load_dotenv

from flask import Flask, request, abort

from scripts.update_read_model import update_read_model
from util.log_util import logger

app = Flask(__name__)

load_dotenv()
ANALYTICS_SECRET_TOKEN = os.getenv("ANALYTICS_SECRET_TOKEN")


@app.route("/")
def hello_world():
    token = request.headers.get('Authorization')

    # Reject the request unless a secret is configured and the header matches it
    if ANALYTICS_SECRET_TOKEN is None or token != ANALYTICS_SECRET_TOKEN:
        abort(401)

    start_time = time.time()

    try:
        update_read_model()
    except Exception as e:
        logger.error("Script update_read_model failed", {"error": e})

    logger.flush()

    return {
        "message": 'Script executed successfully!',
        "time_taken": f"{time.time() - start_time:.2f}s"
    }
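
A quick way to exercise this endpoint once the Flask dev server is running — a hypothetical smoke test, where the local URL and port are assumptions and `requests` must be installed:

import os

import requests
from dotenv import load_dotenv

load_dotenv()

# Trigger the read-model sync with the shared secret from .env
resp = requests.get(
    "http://127.0.0.1:5000/",
    headers={"Authorization": os.getenv("ANALYTICS_SECRET_TOKEN")},
)
print(resp.status_code, resp.json())  # e.g. 200 {'message': ..., 'time_taken': '0.42s'}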
Empty file added analytics/data/.gitkeep
Empty file added analytics/logs/.gitkeep
101 changes: 101 additions & 0 deletions analytics/scripts/update_read_model.py
@@ -0,0 +1,101 @@
import os

import MySQLdb
import pandas as pd
from dotenv import load_dotenv

from util.log_util import logger

# Load .env before connecting: this module opens the connection at import
# time, before app.py gets a chance to call load_dotenv() itself.
load_dotenv()

connection = MySQLdb.connect(
    host=os.getenv("DB_HOST"),
    user=os.getenv("DB_USERNAME"),
    passwd=os.getenv("DB_PASSWORD"),
    db=os.getenv("DB_NAME"),
    autocommit=True,
    ssl_mode="VERIFY_IDENTITY",
    ssl={
        "ca": "/etc/ssl/cert.pem"
    }
)


def upsert_table(cursor, table_name, dtypes_def):
    parquet_file = f"./data/{table_name}.parquet"

    # Load the existing Parquet file (if it exists)
    if os.path.isfile(parquet_file):
        df_parquet = pd.read_parquet(parquet_file)
        if not df_parquet.empty:
            last_id = df_parquet.index.max()  # The index is the 'id' column
        else:
            last_id = 0
    else:
        df_parquet = pd.DataFrame()
        last_id = 0

    # TODO: improve user update logic
    # Read new data from MySQL; fpp_users is always re-read in full
    if table_name != "fpp_users":
        cursor.execute(f"SELECT * FROM {table_name} WHERE id > {last_id}")
    else:
        cursor.execute("SELECT * FROM fpp_users ORDER BY created_at DESC")

    missing_records = cursor.fetchall()
    missing_records_column_names = [i[0] for i in cursor.description]
    df_mysql = pd.DataFrame(missing_records, columns=missing_records_column_names)

    amount_of_new_records = len(df_mysql)

    if amount_of_new_records == 0:
        logger.debug("No new records for table", {"table_name": table_name})
        return

    if table_name != "fpp_users":
        df_mysql.set_index('id', inplace=True)

    df_mysql = df_mysql.astype(dtypes_def)

    if table_name == "fpp_votes":
        df_mysql['was_auto_flip'] = df_mysql['was_auto_flip'].map({0: False, 1: True})

    # Debug info
    # logger.debug(df_mysql.head())
    # logger.debug(df_mysql.dtypes)
    # logger.debug({
    #     "df_parquet": len(df_parquet),
    #     "df_mysql": len(df_mysql)
    # })

    # Merge the new MySQL rows with the existing Parquet data;
    # fpp_users is replaced wholesale instead of appended to
    if table_name != "fpp_users":
        df = pd.concat([df_mysql, df_parquet])
        df.to_parquet(parquet_file)
    else:
        df_mysql.to_parquet(parquet_file)

    logger.info("Upserted records in read model", {
        "table_name": table_name,
        "amount_of_new_records": amount_of_new_records
    })


def update_read_model():
    logger.debug("update_read_model called!")

    # Create a cursor and use it to verify the connection
    cursor = connection.cursor()
    cursor.execute("select @@version")
    version = cursor.fetchone()

    if version:
        logger.debug(f"Running version: {version}")
    else:
        logger.debug('Not connected to db')

    upsert_table(cursor, "fpp_estimations",
                 {'user_id': 'str', 'room_id': 'int16', 'estimation': 'int16', 'spectator': 'int16'})
    upsert_table(cursor, "fpp_events", {'user_id': 'str', 'event': 'category'})
    upsert_table(cursor, "fpp_page_views", {'user_id': 'str', 'route': 'category', 'room_id': 'Int16'})
    upsert_table(cursor, "fpp_rooms", {'number': 'int16', 'name': 'str'})
    upsert_table(cursor, "fpp_votes", {'room_id': 'int16', 'min_estimation': 'int16', 'max_estimation': 'int16',
                                       'amount_of_estimations': 'int16', 'amount_of_spectators': 'int16',
                                       'duration': 'int16'})
    upsert_table(cursor, "fpp_users", {})
57 changes: 57 additions & 0 deletions analytics/util/log_util.py
@@ -0,0 +1,57 @@
import glob
import json
import os
import time
import traceback

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")


def delete_old_logs():
    one_week_ago = time.time() - 7 * 24 * 60 * 60  # Timestamp one week ago
    for log_file in glob.glob('./logs/logs_*.txt'):
        if os.path.getmtime(log_file) < one_week_ago:
            os.remove(log_file)


class LoggerClass:
    def __init__(self):
        self.logs = []

    def debug(self, msg, properties=None):
        if LOG_LEVEL == "DEBUG":
            self._log('DEBUG', msg, properties)

    def info(self, msg, properties=None):
        self._log('INFO', msg, properties)

    def warn(self, msg, properties=None):
        self._log('WARN', msg, properties)

    def error(self, msg, properties=None):
        # Replace exception objects with their formatted traceback so they survive json.dumps
        if properties and isinstance(properties.get("error"), BaseException):
            properties["error"] = self._serialize_exception(properties["error"])
        self._log('ERROR', msg, properties)

    def _serialize_exception(self, e):
        return "".join(traceback.format_exception(None, e, e.__traceback__))

    def _log(self, level, msg, properties=None):
        log_entry = {"level": level, "msg": msg, "properties": properties,
                     "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())}
        self.logs.append(log_entry)

    def flush(self):
        log_filename = './logs/logs_' + time.strftime("%Y%m%d") + '.txt'  # Creates a name like 'logs_20220101.txt'
        with open(log_filename, 'a') as log_file:
            for log in self.logs:
                log_line = f"[{log['timestamp']}][{log['level']}]: {log['msg']}"
                if log['properties']:
                    log_line += f" - {json.dumps(log['properties'])}"
                print(log_line)  # Print to console
                log_file.write(log_line + '\n')  # Write to file
        self.logs = []  # Clear the in-memory buffer
        delete_old_logs()


logger = LoggerClass()
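
Typical usage of the buffered logger — a sketch, assuming only that the ./logs directory exists, which the .gitkeep added in this commit guarantees:

from util.log_util import logger

logger.info("sync started", {"tables": 6})
try:
    raise ValueError("boom")
except ValueError as e:
    logger.error("sync failed", {"error": e})  # exception is serialized to a traceback string

# Nothing is printed or written until flush(); it also prunes logs older than a week
logger.flush()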
