diff --git a/analytics/app.py b/analytics/app.py
index 0fa3a83..6cf9992 100644
--- a/analytics/app.py
+++ b/analytics/app.py
@@ -4,8 +4,14 @@
 from flask import Flask, request, abort
 
+from scripts.calc_behaviour import calc_behaviour
+from scripts.calc_historical import calc_historical
+from scripts.calc_location_and_user_agent import calc_location_and_user_agent
+from scripts.calc_traffic import calc_traffic
+from scripts.calc_votes import calc_votes
 from scripts.update_read_model import update_read_model
 from util.log_util import logger
+from util.number_util import r
 
 app = Flask(__name__)
 
@@ -14,7 +20,7 @@
 
 
 @app.route("/")
-def hello_world():
+def run_script():
     token = request.headers.get('Authorization')
 
     if token != ANALYTICS_SECRET_TOKEN or ANALYTICS_SECRET_TOKEN is None:
@@ -22,14 +28,46 @@ def hello_world():
 
     start_time = time.time()
 
+    results = {}
+
     try:
         update_read_model()
     except Exception as e:
         logger.error(f"Script update_read_model failed", {"error": e})
 
+    try:
+        results["calc_traffic"] = calc_traffic()
+    except Exception as e:
+        logger.error("Script calc_traffic failed", {"error": str(e)})
+
+    try:
+        results["calc_votes"] = calc_votes()
+    except Exception as e:
+        logger.error("Script calc_votes failed", {"error": str(e)})
+
+    try:
+        results["calc_behaviour"] = calc_behaviour()
+    except Exception as e:
+        logger.error("Script calc_behaviour failed", {"error": str(e)})
+
+    try:
+        results["calc_historical"] = calc_historical()
+    except Exception as e:
+        logger.error("Script calc_historical failed", {"error": str(e)})
+
+    try:
+        results["calc_location_and_user_agent"] = calc_location_and_user_agent()
+    except Exception as e:
+        logger.error("Script calc_location_and_user_agent failed", {"error": str(e)})
+
+    duration = r(time.time() - start_time)
+    data_size_in_gb = r(sum(
+        os.path.getsize(f"./data/{f}") for f in os.listdir("./data") if os.path.isfile(f"./data/{f}")) / 1024 / 1024 / 1024)
+    logs_size_in_gb = r(sum(
+        os.path.getsize(f"./logs/{f}") for f in os.listdir("./logs") if os.path.isfile(f"./logs/{f}")) / 1024 / 1024 / 1024)
+    logger.info("Script executed successfully!",
+                {"duration": duration, "data_size_in_gb": data_size_in_gb, "logs_size_in_gb": logs_size_in_gb})
+    logger.flush()
 
-    return {
-        "message": 'Script executed successfully!',
-        "time_taken": f"{time.time() - start_time:.2f}s"
-    }
+    return results
diff --git a/analytics/scripts/calc_behaviour.py b/analytics/scripts/calc_behaviour.py
new file mode 100644
index 0000000..d86634f
--- /dev/null
+++ b/analytics/scripts/calc_behaviour.py
@@ -0,0 +1,39 @@
+import pandas as pd
+
+from util.log_util import logger
+
+
+def calc_behaviour():
+    # load page view data with column 'route'
+    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["route"])
+
+    # amount of page views for each route
+    page_views = df_page_views.groupby("route", observed=False).size().to_dict()
+
+    # load event data with column 'event'
+    df_events = pd.read_parquet("./data/fpp_events.parquet", columns=["event"])
+
+    # amount of events for each event
+    events = df_events.groupby("event", observed=False).size().to_dict()
+
+    # load vote data with column 'room_id'
+    df_votes = pd.read_parquet("./data/fpp_votes.parquet", columns=["room_id"])
+
+    # load rooms data with columns 'id' and 'name'
+    df_rooms = pd.read_parquet("./data/fpp_rooms.parquet", columns=["id", "name"])
+
+    # join votes and rooms on df_votes.'room_id' and df_rooms.'id' (join matches the other frame's index)
+    df_votes = df_votes.join(df_rooms.set_index("id"), on="room_id", how="left")
+
+    # amount of votes for each room
+    votes = df_votes.groupby("name").size().to_dict()
+
+    behaviour = {
+        "page_views": page_views,
+        "events": events,
+        "votes": votes
+    }
+
+    logger.debug("Behaviour calculated", behaviour)
+
+    return behaviour
\ No newline at end of file
diff --git a/analytics/scripts/calc_historical.py b/analytics/scripts/calc_historical.py
new file mode 100644
index 0000000..de599a7
--- /dev/null
+++ b/analytics/scripts/calc_historical.py
@@ -0,0 +1,46 @@
+import pandas as pd
+
+from util.log_util import logger
+
+
+def calc_historical():
+    # load user data with column 'created_at'
+    df_users = pd.read_parquet("./data/fpp_users.parquet", columns=["created_at"])
+
+    # load page view data with column 'viewed_at'
+    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["viewed_at"])
+
+    # load estimation data with column 'estimated_at'
+    df_estimations = pd.read_parquet("./data/fpp_estimations.parquet", columns=["estimated_at"])
+
+    # create a list of dates from the 19th of January 2024 until today
+    start_date = pd.to_datetime("2024-01-19")
+    end_date = pd.to_datetime("today")
+    date_range = pd.date_range(start_date, end_date)
+
+    # create a list of dicts with the date and the amount of users, page views and estimations per date
+    historical = []
+    for date in date_range:
+        # find the amount of users created on this date
+        users = len(df_users[df_users["created_at"].dt.date == date.date()])
+
+        # find the amount of page views on this date
+        page_views = len(df_page_views[df_page_views["viewed_at"].dt.date == date.date()])
+
+        # find the amount of estimations on this date
+        estimations = len(df_estimations[df_estimations["estimated_at"].dt.date == date.date()])
+
+        # parse the date to an iso date string (yyyy-mm-dd)
+        date = date.date().isoformat()
+
+        # add the date and the amount of users, page views and estimations to the list
+        historical.append({
+            "date": date,
+            "users": users,
+            "page_views": page_views,
+            "estimations": estimations
+        })
+
+    logger.debug("Historical calculated", historical)
+
+    return historical
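Note: the loop in calc_historical rescans all three frames once per calendar day. A minimal sketch of a single-pass alternative, not part of the patch (daily_counts is a hypothetical helper):

    import pandas as pd

    def daily_counts(df: pd.DataFrame, col: str) -> dict:
        # one pass: map each ISO date string ('yyyy-mm-dd') to its row count
        return df[col].dt.date.astype(str).value_counts().to_dict()

    # e.g. users_by_day = daily_counts(df_users, "created_at"), then look up
    # users_by_day.get(date.date().isoformat(), 0) inside the date loop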
diff --git a/analytics/scripts/calc_location_and_user_agent.py b/analytics/scripts/calc_location_and_user_agent.py
new file mode 100644
index 0000000..1a8bf06
--- /dev/null
+++ b/analytics/scripts/calc_location_and_user_agent.py
@@ -0,0 +1,34 @@
+import pandas as pd
+
+from util.log_util import logger
+
+
+def calc_location_and_user_agent():
+    # load user agent and location data with columns 'device', 'os', 'browser', 'country', 'region' and 'city'
+    df_users = pd.read_parquet("./data/fpp_users.parquet",
+                               columns=["device", "os", "browser", "country", "region", "city"])
+
+    device = df_users.groupby("device", observed=False).size().to_dict()
+
+    os = df_users.groupby("os", observed=False).size().to_dict()
+
+    browser = df_users.groupby("browser", observed=False).size().to_dict()
+
+    country = df_users.groupby("country", observed=False).size().to_dict()
+
+    region = df_users.groupby("region", observed=False).size().to_dict()
+
+    city = df_users.groupby("city", observed=False).size().to_dict()
+
+    location_and_user_agent = {
+        "device": device,
+        "os": os,
+        "browser": browser,
+        "country": country,
+        "region": region,
+        "city": city
+    }
+
+    logger.debug("Location and user agent calculated", location_and_user_agent)
+
+    return location_and_user_agent
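Note: the six near-identical groupby calls in calc_location_and_user_agent could be collapsed into one comprehension. A minimal sketch, not part of the patch, assuming df_users is the frame loaded above:

    location_and_user_agent = {
        col: df_users.groupby(col, observed=False).size().to_dict()
        for col in ["device", "os", "browser", "country", "region", "city"]
    }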
diff --git a/analytics/scripts/calc_traffic.py b/analytics/scripts/calc_traffic.py
new file mode 100644
index 0000000..652636b
--- /dev/null
+++ b/analytics/scripts/calc_traffic.py
@@ -0,0 +1,73 @@
+import pandas as pd
+
+from util.log_util import logger
+from util.number_util import r
+
+
+def calc_traffic():
+    # load page view data with columns 'user_id' and 'viewed_at'
+    df_page_views = pd.read_parquet("./data/fpp_page_views.parquet", columns=["user_id", "viewed_at"])
+    # rename 'viewed_at' to 'activity_at'
+    df_page_views.rename(columns={"viewed_at": "activity_at"}, inplace=True)
+
+    # count unique users
+    unique_users = df_page_views["user_id"].nunique()
+
+    # count total page views
+    page_views = len(df_page_views)
+
+    # BOUNCE RATE
+    # load estimation data with columns 'user_id' and 'estimated_at'
+    df_estimations = pd.read_parquet("./data/fpp_estimations.parquet", columns=["user_id", "estimated_at"])
+    # rename 'estimated_at' to 'activity_at'
+    df_estimations.rename(columns={"estimated_at": "activity_at"}, inplace=True)
+
+    # calculate bounce rate as the share of unique users who never estimated
+    bounce_rate = r(1 - (df_estimations["user_id"].nunique() / unique_users))
+
+    # DURATION
+
+    # Step 1: Join data
+    df_joined = pd.concat([df_page_views, df_estimations])
+
+    # Step 2: Sort data
+    df_joined.sort_values(by=["user_id", "activity_at"], inplace=True)
+
+    # Step 3: Reset index
+    df_joined.reset_index(inplace=True, drop=True)
+
+    # Step 4: Calculate time difference between each row and the previous row
+    df_joined["time_diff"] = df_joined["activity_at"].diff()
+
+    # Step 5: Calculate if we need to start a new session
+    # A new session starts if the time difference is more than 10 minutes or if the user_id is different
+    df_joined["new_session"] = (df_joined["time_diff"] > pd.Timedelta(minutes=10)) | \
+                               (df_joined["user_id"] != df_joined["user_id"].shift(1))
+
+    # Step 6: Calculate session number
+    df_joined["session"] = df_joined["new_session"].cumsum()
+
+    # Step 7: Calculate duration of each session
+    df_joined["session_start"] = df_joined.groupby("session")["activity_at"].transform("min")
+    df_joined["session_end"] = df_joined.groupby("session")["activity_at"].transform("max")
+    df_joined["session_duration"] = df_joined["session_end"] - df_joined["session_start"]
+
+    # Step 8: Add 10 seconds to the duration of each session
+    df_joined["adjusted_duration"] = df_joined["session_duration"] + pd.Timedelta(seconds=10)
+
+    # Step 9: Ensure each session is counted once
+    unique_sessions = df_joined.drop_duplicates(subset="session")
+
+    # Step 10: Calculate average session duration in minutes
+    average_duration = r(unique_sessions["adjusted_duration"].mean().total_seconds() / 60)
+
+    traffic = {
+        "unique_users": unique_users,
+        "page_views": page_views,
+        "bounce_rate": bounce_rate,
+        "average_duration": average_duration
+    }
+
+    logger.debug("Traffic calculated", traffic)
+
+    return traffic
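Note: a worked toy example of the sessionization in steps 4-6, using the same 10-minute inactivity threshold as the script:

    import pandas as pd

    df = pd.DataFrame({
        "user_id": [1, 1, 1, 2],
        "activity_at": pd.to_datetime([
            "2024-01-19 10:00", "2024-01-19 10:05",
            "2024-01-19 11:00", "2024-01-19 10:02",
        ]),
    }).sort_values(["user_id", "activity_at"]).reset_index(drop=True)

    df["time_diff"] = df["activity_at"].diff()
    df["new_session"] = (df["time_diff"] > pd.Timedelta(minutes=10)) | \
                        (df["user_id"] != df["user_id"].shift(1))
    df["session"] = df["new_session"].cumsum()

    # [1, 1, 2, 3]: the 55-minute gap and the user switch each open a new session
    print(df["session"].tolist())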
diff --git a/analytics/scripts/calc_votes.py b/analytics/scripts/calc_votes.py
new file mode 100644
index 0000000..8cd146a
--- /dev/null
+++ b/analytics/scripts/calc_votes.py
@@ -0,0 +1,50 @@
+import pandas as pd
+
+from util.log_util import logger
+from util.number_util import r
+
+
+def calc_votes():
+    # load votes data
+    df_votes = pd.read_parquet("./data/fpp_votes.parquet",
+                               columns=["avg_estimation", "max_estimation", "min_estimation", "amount_of_estimations",
+                                        "amount_of_spectators", "duration"])
+
+    # count total votes
+    total_votes = len(df_votes)
+
+    # count total estimations
+    total_estimations = int(df_votes["amount_of_estimations"].sum())
+
+    # avg amount of estimations per vote
+    avg_estimations_per_vote = r(total_estimations / total_votes)
+
+    # avg amount of spectators per vote
+    avg_spectators_per_vote = r(df_votes["amount_of_spectators"].mean())
+
+    # avg duration per vote
+    avg_duration_per_vote = r(df_votes["duration"].mean())
+
+    # avg estimation
+    avg_estimation = r(df_votes["avg_estimation"].mean())
+
+    # avg min estimation
+    avg_min_estimation = r(df_votes["min_estimation"].mean())
+
+    # avg max estimation
+    avg_max_estimation = r(df_votes["max_estimation"].mean())
+
+    votes = {
+        "total_votes": total_votes,
+        "total_estimations": total_estimations,
+        "avg_estimations_per_vote": avg_estimations_per_vote,
+        "avg_spectators_per_vote": avg_spectators_per_vote,
+        "avg_duration_per_vote": avg_duration_per_vote,
+        "avg_estimation": avg_estimation,
+        "avg_min_estimation": avg_min_estimation,
+        "avg_max_estimation": avg_max_estimation
+    }
+
+    logger.debug("Votes calculated", votes)
+
+    return votes
diff --git a/analytics/scripts/update_read_model.py b/analytics/scripts/update_read_model.py
index 63e849a..a7456b7 100644
--- a/analytics/scripts/update_read_model.py
+++ b/analytics/scripts/update_read_model.py
@@ -68,8 +68,8 @@ def upsert_table(cursor, table_name, dtypes_def):
     if table_name != "fpp_users":
         df = pd.concat([df_mysql, df_parquet])
         df.to_parquet(parquet_file)
-
-    df_mysql.to_parquet(parquet_file)
+    else:
+        df_mysql.to_parquet(parquet_file)
 
     logger.info(f"Upserted records for in read model", {
         "table_name": table_name,
diff --git a/analytics/util/log_util.py b/analytics/util/log_util.py
index 1da19bb..704507d 100644
--- a/analytics/util/log_util.py
+++ b/analytics/util/log_util.py
@@ -45,11 +45,14 @@ def flush(self):
         log_filename = './logs/logs_' + time.strftime("%Y%m%d") + '.txt'  # Creates file name like 'logs_20220101.txt'
         with open(log_filename, 'a') as log_file:
             for log in self.logs:
-                log_line = f"[{log['timestamp']}][{log['level']}]: {log['msg']}"
-                if log['properties']:
-                    log_line += f" - {json.dumps(log['properties'])}"
-                print(log_line)  # Print to console
-                log_file.write(log_line + '\n')  # Write to file
+                try:
+                    log_line = f"[{log['timestamp']}][{log['level']}]: {log['msg']}"
+                    if log['properties']:
+                        log_line += f" - {json.dumps(log['properties'])}"
+                    print(log_line)  # Print to console
+                    log_file.write(log_line + '\n')  # Write to file
+                except Exception as e:
+                    print(f"Failed to write log to file: {e}")
 
         self.logs = []  # Clear logs in memory
         delete_old_logs(self)
diff --git a/analytics/util/number_util.py b/analytics/util/number_util.py
new file mode 100644
index 0000000..8731b32
--- /dev/null
+++ b/analytics/util/number_util.py
@@ -0,0 +1,2 @@
+def r(number):
+    return float(round(number, 2))
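Note: r() rounds to two decimals and coerces to a built-in float, which presumably keeps pandas/NumPy scalars JSON-friendly for the logger. A quick sanity check, not part of the patch:

    import numpy as np

    from util.number_util import r

    assert r(3.14159) == 3.14
    assert type(r(np.float64(2.5))) is float

    # the byte -> GB conversion in app.py needs three divisions by 1024
    assert r(3 * 1024 ** 3 / 1024 / 1024 / 1024) == 3.0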