feat(analytics): use env config for data dir
jkrumm committed May 27, 2024
1 parent 39808bb commit 16af339
Showing 10 changed files with 48 additions and 31 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -24,6 +24,6 @@ All "personal" data is stored only in the visitors local storage.
 7. Run `doppler run -- npm run dev`
 
 ### Run analytics locally
-1. Activate venv with `source venv/bin/activate`
+1. Activate venv with `source .venv/bin/activate`
 2. Install packages `python3 -m pip install -r requirements.txt`
 3. Run Flask app in dev mode `doppler run -- flask run --debug`
10 changes: 6 additions & 4 deletions analytics/app.py
@@ -5,6 +5,7 @@
 from flask import Flask, request, abort
 from flask_wtf.csrf import CSRFProtect
 
+from config import ANALYTICS_SECRET_TOKEN, DATA_DIR
 from room.calc_room_stats import calc_room_stats
 from scripts.calc_behaviour import calc_behaviour
 from scripts.calc_historical import calc_historical
@@ -19,7 +20,6 @@
 csrf = CSRFProtect(app)
 
 load_dotenv()
-ANALYTICS_SECRET_TOKEN = os.getenv("ANALYTICS_SECRET_TOKEN")
 
 
 @app.route("/")
@@ -31,7 +31,6 @@ def run_script():
         logger.error("Unauthorized request", {"token": token})
         abort(401)
 
-
     results = {}
     failed_reason = None
 
@@ -81,8 +80,11 @@ def run_script():
         logger.flush()
         return {"error": failed_reason, "duration": duration}
 
-    data_size_in_gb = r(sum(
-        os.path.getsize(f"./data/{f}") for f in os.listdir("./data") if os.path.isfile(f"./data/{f}")) / 1024 / 1024)
+    data_size_in_gb = round(sum(
+        os.path.getsize(os.path.join(DATA_DIR, f))
+        for f in os.listdir(DATA_DIR)
+        if os.path.isfile(os.path.join(DATA_DIR, f))
+    ) / 1024 / 1024 / 1024, 2)
     logs_size_in_gb = r(sum(
         os.path.getsize(f"./logs/{f}") for f in os.listdir("./logs") if os.path.isfile(f"./logs/{f}")) / 1024 / 1024)
     logger.info("Script executed successfully!",
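Note on the size calculation above: the old code divided by 1024 twice, so `data_size_in_gb` actually held mebibytes; the replacement divides by 1024 three times and rounds to two decimals. The unchanged `logs_size_in_gb` line still uses the old divisor and would benefit from the same fix. A minimal shared helper could look like this (a sketch, not part of the commit; `dir_size_gb` is a hypothetical name):

import os


def dir_size_gb(path: str) -> float:
    """Total size in GiB of the regular files directly inside `path`."""
    total_bytes = sum(
        os.path.getsize(os.path.join(path, f))
        for f in os.listdir(path)
        if os.path.isfile(os.path.join(path, f))
    )
    return round(total_bytes / 1024 ** 3, 2)


# e.g. data_size_in_gb = dir_size_gb(DATA_DIR); logs_size_in_gb = dir_size_gb("./logs")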
4 changes: 4 additions & 0 deletions analytics/config.py
@@ -0,0 +1,4 @@
+import os
+
+ANALYTICS_SECRET_TOKEN = os.getenv("ANALYTICS_SECRET_TOKEN")
+DATA_DIR = os.getenv('DATA_DIR', './data')
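Note: `os.getenv` runs once, at import time, so `DATA_DIR` must be set in the environment before `config` is first imported. A quick sketch of the behaviour (assumed usage, not part of the commit; run from the analytics directory so `config` is importable):

import importlib
import os

os.environ.pop("DATA_DIR", None)
import config
assert config.DATA_DIR == "./data"          # unset -> falls back to the default

os.environ["DATA_DIR"] = "/srv/fpp/data"    # hypothetical production path
importlib.reload(config)                    # re-runs the module-level os.getenv
assert config.DATA_DIR == "/srv/fpp/data"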
5 changes: 4 additions & 1 deletion analytics/room/calc_room_stats.py
@@ -1,5 +1,8 @@
+import os
+
 import pandas as pd
 
+from config import DATA_DIR
 from scripts.update_read_model import update_votes_read_model
 from util.log_util import logger
 from util.number_util import r
@@ -13,7 +16,7 @@ def calc_room_stats(room_id):
         logger.error("calc_room_stats -> update_votes_read_model failed", {"error": e})
 
     # find votes for room and filter by room_id
-    votes = pd.read_parquet(f"./data/fpp_votes.parquet",
+    votes = pd.read_parquet(os.path.join(DATA_DIR, "fpp_votes.parquet"),
                             columns=["room_id", "avg_estimation", "max_estimation", "min_estimation",
                                      "amount_of_estimations", "duration", "amount_of_spectators"],
                             filters=[("room_id", "==", int(room_id))])
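Beyond the path change, this call is a good example of parquet predicate pushdown in pandas: `columns=` prunes columns, and with the default pyarrow engine `filters=` lets the reader skip non-matching rows instead of loading the whole file. A self-contained illustration with made-up data:

import pandas as pd

# Write a tiny parquet file, then read back only one room's rows.
pd.DataFrame({"room_id": [1, 1, 2], "duration": [30, 45, 60]}).to_parquet("votes_demo.parquet")
votes = pd.read_parquet(
    "votes_demo.parquet",
    columns=["room_id", "duration"],
    filters=[("room_id", "==", 1)],  # pushed down to the parquet engine
)
print(votes)  # only the two room_id == 1 rows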
11 changes: 7 additions & 4 deletions analytics/scripts/calc_behaviour.py
@@ -1,26 +1,29 @@
+import os
+
 import pandas as pd
 
+from config import DATA_DIR
 from util.log_util import logger
 
 
 def calc_behaviour():
     # load page view data with columns 'user_id' and 'route'
-    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["route", "room_id"])
+    df_page_views = pd.read_parquet(os.path.join(DATA_DIR, "fpp_page_views.parquet"), columns=["route", "room_id"])
 
     # amount of page views for each route
     routes = df_page_views.groupby("route", observed=False).size().to_dict()
 
     # load event data with column 'event'
-    df_events = pd.read_parquet("./data/fpp_events.parquet", columns=["event"])
+    df_events = pd.read_parquet(os.path.join(DATA_DIR, "fpp_events.parquet"), columns=["event"])
 
     # amount of events for each event
     events = df_events.groupby("event", observed=False).size().to_dict()
 
     # load vote data with column 'room_name'
-    df_votes = pd.read_parquet("./data/fpp_votes.parquet", columns=["room_id"])
+    df_votes = pd.read_parquet(os.path.join(DATA_DIR, "fpp_votes.parquet"), columns=["room_id"])
 
     # load rooms data with columns 'id' and 'name'
-    df_rooms = pd.read_parquet("./data/fpp_rooms.parquet", columns=["id", "name"])
+    df_rooms = pd.read_parquet(os.path.join(DATA_DIR, "fpp_rooms.parquet"), columns=["id", "name"])
 
     # join votes and rooms data on df_votes.'room_id' and df_rooms.'id'
     df_votes = df_votes.join(df_rooms, on="room_id", how="left")
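The `groupby(...).size().to_dict()` idiom used throughout these scripts turns a column into a `{value: count}` mapping; `observed=False` matters for categorical columns, where it keeps zero-count categories in the result. A quick illustration with made-up data:

import pandas as pd

routes = pd.Categorical(["/", "/room", "/"], categories=["/", "/room", "/imprint"])
df = pd.DataFrame({"route": routes})
print(df.groupby("route", observed=False).size().to_dict())
# {'/': 2, '/room': 1, '/imprint': 0}  <- '/imprint' kept despite having no rows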
11 changes: 7 additions & 4 deletions analytics/scripts/calc_historical.py
@@ -1,20 +1,23 @@
+import os
+
 import pandas as pd
 
+from config import DATA_DIR
 from util.log_util import logger
 
 
 def calc_historical():
     # load user data with columns 'created_at'
-    df_users = pd.read_parquet("./data/fpp_users.parquet", columns=["created_at"])
+    df_users = pd.read_parquet(os.path.join(DATA_DIR, "fpp_users.parquet"), columns=["created_at"])
 
     # load page view data with columns 'viewed_at'
-    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["viewed_at"])
+    df_page_views = pd.read_parquet(os.path.join(DATA_DIR, "fpp_page_views.parquet"), columns=["viewed_at"])
 
     # load estimation data with columns 'estimated_at'
-    df_estimations = pd.read_parquet("./data/fpp_estimations.parquet", columns=["estimated_at"])
+    df_estimations = pd.read_parquet(os.path.join(DATA_DIR, "fpp_estimations.parquet"), columns=["estimated_at"])
 
     # load votes data with columns 'voted_at'
-    df_votes = pd.read_parquet("./data/fpp_votes.parquet", columns=["voted_at"])
+    df_votes = pd.read_parquet(os.path.join(DATA_DIR, "fpp_votes.parquet"), columns=["voted_at"])
 
     # create a list of dates from 19th of January 2024 until today
     start_date = pd.to_datetime("2024-01-19")
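The hunk is cut off after `start_date`; for reference, a daily date list like the comment describes is usually built with `pd.date_range` (an illustrative sketch, not the repository's actual continuation):

import pandas as pd

start_date = pd.to_datetime("2024-01-19")
dates = pd.date_range(start_date, pd.Timestamp.today().normalize(), freq="D")
print(dates[:3])  # DatetimeIndex(['2024-01-19', '2024-01-20', '2024-01-21'], ...)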
8 changes: 5 additions & 3 deletions analytics/scripts/calc_location_and_user_agent.py
@@ -1,16 +1,18 @@
+import os
 import pandas as pd
 
+from config import DATA_DIR
 from util.log_util import logger
 
 
 def calc_location_and_user_agent():
     # load user data with columns 'user_id' and 'location'
-    df_users = pd.read_parquet("./data/fpp_users.parquet",
+    df_users = pd.read_parquet(os.path.join(DATA_DIR, "fpp_users.parquet"),
                                columns=["device", "os", "browser", "country", "region", "city"])
 
     device = df_users.groupby("device", observed=False).size().to_dict()
 
-    os = df_users.groupby("os", observed=False).size().to_dict()
+    operating_system = df_users.groupby("os", observed=False).size().to_dict()
 
     browser = df_users.groupby("browser", observed=False).size().to_dict()
 
@@ -26,7 +28,7 @@ def calc_location_and_user_agent():
 
     location_and_user_agent = {
         "device": device,
-        "os": os,
+        "os": operating_system,
         "browser": browser,
         "country": country,
         "country_region": country_region,
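The `os` to `operating_system` rename in this file is more than cosmetic: with `import os` now at the top, assigning to `os` anywhere in the function makes the name function-local, so even the `os.path.join` call earlier in the same function would fail. A minimal reproduction of the hazard:

import os


def broken():
    path = os.path.join("data", "f.parquet")  # raises UnboundLocalError when called,
    os = {"mac": 3}                           # because this later assignment makes `os` local
    return path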
8 changes: 6 additions & 2 deletions analytics/scripts/calc_traffic.py
@@ -1,12 +1,16 @@
+import os
+
 import pandas as pd
 
+from config import DATA_DIR
 from util.log_util import logger
 from util.number_util import r
 
 
 def calc_traffic():
     # load page view data with columns 'user_id' and 'viewed_at'
-    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["user_id", "viewed_at"])
+    df_page_views = pd.read_parquet(os.path.join(DATA_DIR, "fpp_page_views.parquet"), columns=["user_id", "viewed_at"])
 
     # rename 'viewed_at' to 'activity_at'
     df_page_views.rename(columns={"viewed_at": "activity_at"}, inplace=True)
 
@@ -18,7 +22,7 @@ def calc_traffic():
 
     # BOUNCE RATE
     # load estimation data with columns 'user_id' and 'estimated_at'
-    df_estimations = pd.read_parquet("./data/fpp_estimations.parquet", columns=["user_id", "estimated_at"])
+    df_estimations = pd.read_parquet(os.path.join(DATA_DIR, "fpp_estimations.parquet"), columns=["user_id", "estimated_at"])
     # rename 'viewed_at' to 'activity_at'
     df_estimations.rename(columns={"estimated_at": "activity_at"}, inplace=True)
 
5 changes: 4 additions & 1 deletion analytics/scripts/calc_votes.py
@@ -1,12 +1,15 @@
+import os
+
 import pandas as pd
 
+from config import DATA_DIR
 from util.log_util import logger
 from util.number_util import r
 
 
 def calc_votes():
     # load votes data
-    df_votes = pd.read_parquet("./data/fpp_votes.parquet",
+    df_votes = pd.read_parquet(os.path.join(DATA_DIR, "fpp_votes.parquet"),
                                columns=["avg_estimation", "max_estimation", "min_estimation", "amount_of_estimations",
                                         "amount_of_spectators", "duration"])
 
15 changes: 4 additions & 11 deletions analytics/scripts/update_read_model.py
@@ -2,6 +2,7 @@
 import MySQLdb
 import pandas as pd
 
+from config import DATA_DIR
 from util.log_util import logger
 
 
@@ -33,9 +34,9 @@ def upsert_table(cursor, table_name, dtypes_def):
         upsert_users(cursor)
         return
 
-    parquet_file = f"./data/{table_name}.parquet"
+    parquet_file = os.path.join(DATA_DIR, f"{table_name}.parquet")
 
-    # Load existing Parquet file (if exists)
+    # Load existing Parquet file (if it exists)
     if os.path.isfile(parquet_file):
         df_parquet = pd.read_parquet(parquet_file)
         if not df_parquet.empty:
@@ -65,14 +66,6 @@ def upsert_table(cursor, table_name, dtypes_def):
     if table_name == "fpp_votes":
         df_mysql['was_auto_flip'] = df_mysql['was_auto_flip'].map({0: False, 1: True})
 
-    # Debug info
-    # logger.debug(df_mysql.head())
-    # logger.debug(df_mysql.dtypes)
-    # logger.debug({
-    #     "df_parquet": len(df_parquet),
-    #     "df_mysql": len(df_mysql)
-    # })
-
     # Merge new data from MySQL with existing data in Parquet
     df = pd.concat([df_mysql, df_parquet])
     df.to_parquet(parquet_file)
@@ -84,7 +77,7 @@ def upsert_table(cursor, table_name, dtypes_def):
 
 
 def upsert_users(cursor):
-    parquet_file = f"./data/fpp_users.parquet"
+    parquet_file = os.path.join(DATA_DIR, "fpp_users.parquet")
 
     # Load existing Parquet file (if exists)
     if os.path.isfile(parquet_file):
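Stripped of the MySQL specifics, the read-model update in this file follows a simple append pattern: load the existing parquet file if there is one, fetch only the rows that are new, concatenate, and write back. A condensed sketch under assumed names (`upsert_parquet` and `df_new` are hypothetical):

import os

import pandas as pd


def upsert_parquet(parquet_file: str, df_new: pd.DataFrame) -> None:
    """Append new rows to a parquet-backed read model, keeping existing ones."""
    if os.path.isfile(parquet_file):
        df_old = pd.read_parquet(parquet_file)
        if not df_old.empty:
            df_new = pd.concat([df_new, df_old])  # new rows first, as in upsert_table
    df_new.to_parquet(parquet_file)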
