Skip to content

Commit

Permalink
feat(analytics): all scripts for analytics calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
jkrumm committed Jan 22, 2024
1 parent 70819fb commit d269027
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 12 deletions.
48 changes: 43 additions & 5 deletions analytics/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

from flask import Flask, request, abort

from scripts.calc_behaviour import calc_behaviour
from scripts.calc_historical import calc_historical
from scripts.calc_location_and_user_agent import calc_location_and_user_agent
from scripts.calc_traffic import calc_traffic
from scripts.calc_votes import calc_votes
from scripts.update_read_model import update_read_model
from util.log_util import logger
from util.number_util import r

app = Flask(__name__)

Expand All @@ -14,22 +20,54 @@


@app.route("/")
def run_script():
    """Run the full analytics pipeline and return the collected results.

    Requires the shared secret in the ``Authorization`` header; responds 401
    otherwise. Each script runs in isolation so a single failure is logged
    but does not abort the remaining calculations.

    Returns:
        dict: script name -> script result, for every script that succeeded.
    """
    token = request.headers.get('Authorization')

    # Reject when the token is wrong or the secret was never configured.
    if token != ANALYTICS_SECRET_TOKEN or ANALYTICS_SECRET_TOKEN is None:
        abort(401)

    start_time = time.time()

    results = {}

    # Refresh the local parquet read model first — all calc steps read from it.
    try:
        update_read_model()
    except Exception as e:
        logger.error("Script update_read_model failed", {"error": e})

    # Run every calculation independently and collect whatever succeeds.
    calc_scripts = [
        ("calc_traffic", calc_traffic),
        ("calc_votes", calc_votes),
        ("calc_behaviour", calc_behaviour),
        ("calc_historical", calc_historical),
        ("calc_location_and_user_agent", calc_location_and_user_agent),
    ]
    for name, script in calc_scripts:
        try:
            results[name] = script()
        except Exception as e:
            logger.error(f"Script {name} failed", {"error": e})

    duration = r(time.time() - start_time)
    # bytes / 1024 / 1024 is megabytes — the previous *_in_gb naming was wrong.
    data_size_in_mb = r(sum(
        os.path.getsize(f"./data/{f}") for f in os.listdir("./data") if os.path.isfile(f"./data/{f}")) / 1024 / 1024)
    logs_size_in_mb = r(sum(
        os.path.getsize(f"./logs/{f}") for f in os.listdir("./logs") if os.path.isfile(f"./logs/{f}")) / 1024 / 1024)
    logger.info("Script executed successfully!",
                {"duration": duration, "data_size_in_mb": data_size_in_mb, "logs_size_in_mb": logs_size_in_mb})

    logger.flush()

    return results
39 changes: 39 additions & 0 deletions analytics/scripts/calc_behaviour.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import pandas as pd

from util.log_util import logger


def calc_behaviour():
    """Calculate usage behaviour: page views per route, events per type, votes per room.

    Returns:
        dict: {"page_views": {route: count}, "events": {event: count},
               "votes": {room_name: count}}
    """
    # amount of page views for each route
    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["route"])
    page_views = df_page_views.groupby("route", observed=False).size().to_dict()

    # amount of events for each event type
    df_events = pd.read_parquet("./data/fpp_events.parquet", columns=["event"])
    events = df_events.groupby("event", observed=False).size().to_dict()

    # load vote data with column 'room_id' and rooms data with 'id' and 'name'
    df_votes = pd.read_parquet("./data/fpp_votes.parquet", columns=["room_id"])
    df_rooms = pd.read_parquet("./data/fpp_rooms.parquet", columns=["id", "name"])

    # join votes to rooms on the rooms' 'id' COLUMN — DataFrame.join matches
    # `on` against the other frame's index, so without set_index("id") the
    # votes would be matched against the positional RangeIndex (wrong rooms).
    df_votes = df_votes.join(df_rooms.set_index("id"), on="room_id", how="left")

    # amount of votes for each room name
    votes = df_votes.groupby("name").size().to_dict()

    behaviour = {
        "page_views": page_views,
        "events": events,
        "votes": votes
    }

    logger.debug("Behaviour calculated", behaviour)

    return behaviour
46 changes: 46 additions & 0 deletions analytics/scripts/calc_historical.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pandas as pd

from util.log_util import logger


def calc_historical():
    """Calculate daily counts of new users, page views and estimations.

    Covers every date from 2024-01-19 (launch) through today.

    Returns:
        list[dict]: one entry per day with keys "date" (ISO yyyy-mm-dd),
        "users", "page_views" and "estimations".
    """
    # load only the timestamp column of each dataset
    df_users = pd.read_parquet("./data/fpp_users.parquet", columns=["created_at"])
    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["viewed_at"])
    df_estimations = pd.read_parquet("./data/fpp_estimations.parquet", columns=["estimated_at"])

    # count occurrences per calendar day ONCE instead of re-filtering each
    # full DataFrame for every date in the range (was O(days * rows))
    users_per_day = df_users["created_at"].dt.date.value_counts()
    views_per_day = df_page_views["viewed_at"].dt.date.value_counts()
    estimations_per_day = df_estimations["estimated_at"].dt.date.value_counts()

    # list of dates from the 19th of January 2024 (launch) until today
    start_date = pd.to_datetime("2024-01-19")
    end_date = pd.to_datetime("today")
    date_range = pd.date_range(start_date, end_date)

    # one dict per date with the counts for that day (0 when nothing happened)
    historical = []
    for date in date_range:
        day = date.date()
        historical.append({
            "date": day.isoformat(),
            "users": int(users_per_day.get(day, 0)),
            "page_views": int(views_per_day.get(day, 0)),
            "estimations": int(estimations_per_day.get(day, 0))
        })

    logger.debug("Historical calculated", historical)

    return historical
34 changes: 34 additions & 0 deletions analytics/scripts/calc_location_and_user_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pandas as pd

from util.log_util import logger


def calc_location_and_user_agent():
    """Calculate user counts per device, OS, browser, country, region and city.

    Returns:
        dict: dimension name -> {value: count} for each of the six columns.
    """
    # the six user-agent / geo dimensions stored on each user row
    dimensions = ["device", "os", "browser", "country", "region", "city"]

    df_users = pd.read_parquet("./data/fpp_users.parquet", columns=dimensions)

    # one groupby-count per dimension; avoids six copy-pasted statements and
    # a local variable shadowing the stdlib `os` module name
    location_and_user_agent = {
        dimension: df_users.groupby(dimension, observed=False).size().to_dict()
        for dimension in dimensions
    }

    logger.debug("Location and user agent calculated", location_and_user_agent)

    return location_and_user_agent
73 changes: 73 additions & 0 deletions analytics/scripts/calc_traffic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import pandas as pd

from util.log_util import logger
from util.number_util import r


def calc_traffic():
    """Calculate aggregate traffic metrics from page views and estimations.

    A "session" is a run of consecutive activity (page views + estimations)
    by one user with no gap larger than 10 minutes between events.

    Returns:
        dict with keys:
            unique_users: distinct user_ids that viewed a page
            page_views: total number of page views
            bounce_rate: share of page-view users that never estimated (rounded)
            average_duration: mean session length in minutes (rounded)
    """
    # load page view data with columns 'user_id' and 'viewed_at'
    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["user_id", "viewed_at"])
    # rename 'viewed_at' to 'activity_at' so both sources share one schema
    df_page_views.rename(columns={"viewed_at": "activity_at"}, inplace=True)

    # count unique users
    unique_users = df_page_views["user_id"].nunique()

    # count total page views
    page_views = len(df_page_views)

    # BOUNCE RATE
    # load estimation data with columns 'user_id' and 'estimated_at'
    df_estimations = pd.read_parquet("./data/fpp_estimations.parquet", columns=["user_id", "estimated_at"])
    # rename 'estimated_at' to 'activity_at' (same schema as page views)
    df_estimations.rename(columns={"estimated_at": "activity_at"}, inplace=True)

    # calculate bounce rate by counting unique users who did not estimate and dividing by unique users
    # NOTE(review): assumes every estimating user also produced a page view;
    # otherwise the ratio can exceed 1 and the rate goes negative — confirm upstream.
    bounce_rate = r(1 - (df_estimations["user_id"].nunique() / unique_users))

    # DURATION

    # Step 1: Join data (stack page views and estimations into one activity log)
    df_joined = pd.concat([df_page_views, df_estimations])

    # Step 2: Sort data so each user's events are contiguous and chronological
    df_joined.sort_values(by=["user_id", "activity_at"], inplace=True)

    # Step 3: Reset index
    df_joined.reset_index(inplace=True, drop=True)

    # Step 4: Calculate time difference between each row and the previous row
    df_joined["time_diff"] = df_joined["activity_at"].diff()

    # Step 5: Calculate if we need to start a new session
    # A new session starts if the time difference is more than 10 minutes or if the user_id is different
    # (the user_id check also neutralizes the cross-user diff on a user's first row)
    df_joined["new_session"] = (df_joined["time_diff"] > pd.Timedelta(minutes=10)) | \
                               (df_joined["user_id"] != df_joined["user_id"].shift(1))

    # Step 6: Calculate session number (running count of session starts)
    df_joined["session"] = df_joined["new_session"].cumsum()

    # Step 7: Calculate duration of each session (last activity minus first)
    df_joined['session_start'] = df_joined.groupby('session')['activity_at'].transform('min')
    df_joined['session_end'] = df_joined.groupby('session')['activity_at'].transform('max')
    df_joined['session_duration'] = df_joined['session_end'] - df_joined['session_start']

    # Step 8: Add 10 seconds to the duration of each session
    # (so single-event sessions do not count as zero-length)
    df_joined["adjusted_duration"] = df_joined["session_duration"] + pd.Timedelta(seconds=10)

    # Step 9: Ensure each session is counted once
    unique_sessions = df_joined.drop_duplicates(subset='session')

    # Step 10: Calculate average duration of each session in minutes
    average_duration = r(unique_sessions['adjusted_duration'].mean().total_seconds() / 60)

    traffic = {
        "unique_users": unique_users,
        "page_views": page_views,
        "bounce_rate": bounce_rate,
        "average_duration": average_duration
    }

    logger.debug("Traffic calculated", traffic)

    return traffic
50 changes: 50 additions & 0 deletions analytics/scripts/calc_votes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pandas as pd

from util.log_util import logger
from util.number_util import r


def calc_votes():
    """Calculate aggregate vote statistics from the votes dataset.

    Returns:
        dict with total vote/estimation counts and the rounded averages of
        estimations, spectators, duration and min/avg/max estimation per vote.
    """
    df_votes = pd.read_parquet("./data/fpp_votes.parquet",
                               columns=["avg_estimation", "max_estimation", "min_estimation", "amount_of_estimations",
                                        "amount_of_spectators", "duration"])

    # totals across all votes
    total_votes = len(df_votes)
    total_estimations = int(df_votes["amount_of_estimations"].sum())

    # per-vote averages, rounded via r(); assembled directly into the result
    votes = {
        "total_votes": total_votes,
        "total_estimations": total_estimations,
        "avg_estimations_per_vote": r(total_estimations / total_votes),
        "avg_spectators_per_vote": r(df_votes["amount_of_spectators"].mean()),
        "avg_duration_per_vote": r(df_votes["duration"].mean()),
        "avg_estimation": r(df_votes["avg_estimation"].mean()),
        "avg_min_estimation": r(df_votes["min_estimation"].mean()),
        "avg_max_estimation": r(df_votes["max_estimation"].mean())
    }

    logger.debug("Votes calculated", votes)

    return votes
4 changes: 2 additions & 2 deletions analytics/scripts/update_read_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ def upsert_table(cursor, table_name, dtypes_def):
if table_name != "fpp_users":
df = pd.concat([df_mysql, df_parquet])
df.to_parquet(parquet_file)

df_mysql.to_parquet(parquet_file)
else:
df_mysql.to_parquet(parquet_file)

logger.info(f"Upserted records for in read model", {
"table_name": table_name,
Expand Down
13 changes: 8 additions & 5 deletions analytics/util/log_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,14 @@ def flush(self):
log_filename = './logs/logs_' + time.strftime("%Y%m%d") + '.txt' # Creates file name like 'logs_20220101.txt'
with open(log_filename, 'a') as log_file:
for log in self.logs:
log_line = f"[{log['timestamp']}][{log['level']}]: {log['msg']}"
if log['properties']:
log_line += f" - {json.dumps(log['properties'])}"
print(log_line) # Print to console
log_file.write(log_line + '\n') # Write to file
try:
log_line = f"[{log['timestamp']}][{log['level']}]: {log['msg']}"
if log['properties']:
log_line += f" - {json.dumps(log['properties'])}"
print(log_line) # Print to console
log_file.write(log_line + '\n') # Write to file
except Exception as e:
print(f"Failed to write log to file: {e}")
self.logs = [] # Clear logs in memory
delete_old_logs(self)

Expand Down
2 changes: 2 additions & 0 deletions analytics/util/number_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def r(number, digits=2):
    """Round *number* to *digits* decimal places (default 2) as a plain float.

    The float() cast normalizes non-float numerics (ints, numpy scalars) so
    results serialize consistently to JSON.
    """
    return float(round(number, digits))

0 comments on commit d269027

Please sign in to comment.