In [None]:
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import desc, asc, max, sum, avg, col, round

In [None]:
# Reuse the active Spark session if one exists, otherwise start one named "test".
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)

In [None]:
def last_n_days_df(df, n_days, today=None):
    """Return the rows of `df` dated on or after `n_days` days before `today`.

    Rows are matched on their `year`, `month` and `day` columns (assumed to be
    the parts of a calendar date, integer-valued -- TODO confirm against the
    parquet schema). The cutoff is inclusive: a row dated exactly `n_days`
    before `today` is kept. There is no upper bound.

    Args:
        df: DataFrame with `year`, `month` and `day` columns.
        n_days: How many days back from `today` the window starts.
        today: Reference datetime/date. Defaults to `datetime.today()`;
            exposed so callers (and tests) can pin the window.

    Returns:
        `df` filtered to rows on or after the cutoff date.
    """
    if today is None:
        today = datetime.today()
    start = today - timedelta(days=n_days)

    # Lexicographic (year, month, day) >= (start.year, start.month, start.day).
    # Month/day may only be compared within the same year: a later year with an
    # earlier month (e.g. January after a December cutoff) must still be kept.
    is_later_year = df.year > start.year
    is_same_year_later_month = (df.year == start.year) & (df.month > start.month)
    is_same_month_same_or_later_day = (
        (df.year == start.year) & (df.month == start.month) & (df.day >= start.day)
    )
    return df.filter(is_later_year | is_same_year_later_month | is_same_month_same_or_later_day)

In [None]:
def latest_week_df(df):
    """Return only the rows of `df` whose `date` equals the newest `date` present."""
    newest_date = df.select(max(df.date)).first()[0]
    is_newest = df.date == newest_date
    return df.where(is_newest)

In [None]:
def filter_by_thresholds(df, min_points=30, min_commits=1):
    """Split `df` into at/above- and below-threshold views for points and commits.

    Each view keeps only `email` plus the metric it was split on, and is
    sorted by that metric in descending order.

    Args:
        df: DataFrame with `email`, `points` and `commits` columns.
        min_points: Inclusive lower bound for the "above" points view.
        min_commits: Inclusive lower bound for the "above" commits view.

    Returns:
        Tuple ``(above_points, below_points, above_commits, below_commits)``.
    """
    def _view(metric, keep):
        # Filter on the full frame first, then project and rank.
        return df.filter(keep).select(["email", metric]).sort(desc(metric))

    def _split(metric, threshold):
        metric_col = getattr(df, metric)
        return _view(metric, metric_col >= threshold), _view(metric, metric_col < threshold)

    return (*_split("points", min_points), *_split("commits", min_commits))

In [None]:
def print_latest_week_stats(df, min_points=30, min_commits=1):
    """Print threshold reports for the most recent week and return that week's rows.

    Keeps only the rows with the newest `date` (via `latest_week_df`), splits
    them with `filter_by_thresholds`, and shows each view (up to 100 rows,
    untruncated).

    Args:
        df: Full DataFrame with at least `date`, `email`, `points`, `commits`.
        min_points: Inclusive points threshold for the "high" group.
        min_commits: Inclusive commits threshold for the "high" group.

    Returns:
        The latest-week DataFrame (all columns).
    """
    latest = latest_week_df(df)
    above_points, below_points, above_commits, below_commits = filter_by_thresholds(
        latest, min_points=min_points, min_commits=min_commits
    )

    # The thresholds are inclusive (>=), so the headings say ">=".
    print(f"=== >= {min_points} POINTS LAST WEEK ===")
    above_points.show(100, False)

    print(f"=== >= {min_commits} COMMIT LAST WEEK ===")
    above_commits.show(100, False)

    # Each view already arrives sorted descending by its own metric from
    # filter_by_thresholds. below_commits has only `email` and `commits`, so it
    # must not be re-sorted by `points`.
    print("=== LOW POINTS LAST WEEK ===")
    below_points.show(100, False)

    print("=== LOW COMMITS LAST WEEK ===")
    below_commits.show(100, False)

    return latest

In [None]:
def print_last_n_days_stats(df, n_days, min_points=30, min_commits=1):
    """Print per-student averages over the last `n_days` days, split by threshold.

    Rows in the window (via `last_n_days_df`) are grouped by `email`. `points`
    and `commits` are averaged and rounded to 2 decimals. The averages are then
    split with `filter_by_thresholds`, and each view is shown (up to 100 rows,
    untruncated).

    Args:
        df: Full DataFrame with `year`, `month`, `day`, `email`, `points`, `commits`.
        n_days: Size of the look-back window in days.
        min_points: Inclusive threshold on average points for the "high" group.
        min_commits: Inclusive threshold on average commits for the "high" group.

    Returns:
        DataFrame with columns `email`, `points`, `commits` (rounded averages).
    """
    recent = last_n_days_df(df, n_days)
    # Explicit aliases instead of a dict agg spec, so nothing depends on Spark's
    # auto-generated "avg(points)" / "avg(commits)" column names.
    recent_avgs = recent.groupBy("email").agg(
        round(avg("points"), 2).alias("points"),
        round(avg("commits"), 2).alias("commits"),
    )

    above_points, below_points, above_commits, below_commits = filter_by_thresholds(
        recent_avgs, min_points=min_points, min_commits=min_commits
    )

    # The thresholds are inclusive (>=), so the headings say ">=".
    print(f"=== >= {min_points} POINTS / WEEK LAST {n_days} DAYS ===")
    above_points.show(100, False)

    print(f"=== >= {min_commits} COMMIT / WEEK LAST {n_days} DAYS ===")
    above_commits.show(100, False)

    print(f"=== LOW POINTS LAST {n_days} DAYS ===")
    below_points.show(100, False)

    print(f"=== LOW COMMITS LAST {n_days} DAYS ===")
    below_commits.show(100, False)

    return recent_avgs.select(["email", "points", "commits"])

In [None]:
def averages(df):
    """Return a one-row DataFrame with the mean `points` and `commits` over all of `df`.

    Both means are rounded to 2 decimal places; the output columns are named
    `points` and `commits`.
    """
    mean_points = round(avg("points"), 2).alias("points")
    mean_commits = round(avg("commits"), 2).alias("commits")
    return df.agg(mean_points, mean_commits)

In [None]:
def totals_as_of_date(df, end_date):
    """Sum each student's `points` and `commits` over all rows dated on or before `end_date`.

    The cutoff is inclusive: rows whose (year, month, day) equals `end_date`'s
    are counted. Comparison is lexicographic on the `year`, `month`, `day`
    columns, so `end_date` can be any object exposing `.year`, `.month` and
    `.day` (e.g. `datetime` or `date`).

    Args:
        df: DataFrame with `year`, `month`, `day`, `email`, `points`, `commits`.
        end_date: Inclusive cutoff date.

    Returns:
        DataFrame grouped by `email` with the raw `sum(points)` / `sum(commits)`
        aggregates plus `points` / `commits` rounded to 2 decimals.
    """
    columns = {
        "points": round(col("sum(points)"), 2),
        "commits": round(col("sum(commits)"), 2),
    }
    is_earlier_year = df.year < end_date.year
    is_same_year_earlier_month = (df.year == end_date.year) & (df.month < end_date.month)
    is_same_year_month_earlier_day = (df.year == end_date.year) & (df.month == end_date.month) & (df.day <= end_date.day)
    return df.filter(is_earlier_year | is_same_year_earlier_month | is_same_year_month_earlier_day) \
             .groupBy('email') \
             .agg({ "points": "sum", "commits": "sum" }) \
             .withColumns(columns)

In [None]:
# Load the full dataset; every analysis cell below starts from this frame.
df = spark.read.format("parquet").load("codetrack/data")

In [None]:
# Print the latest-week threshold reports; keep that week's rows for inspection.
latest = print_latest_week_stats(df=df)

In [None]:
# Print 30-day per-student averages; keep the (email, points, commits) averages.
last_month = print_last_n_days_stats(df, n_days=30)

In [None]:
# Class-wide mean points/commits for the latest week and for the last 30 days.
# TODO: filter by who is attending
last_week = latest_week_df(df)
last_30 = last_n_days_df(df, 30)

print("=== CLASS AVERAGES, LAST WEEK ===")
averages(last_week).show()

print("=== CLASS AVERAGES, LAST 30 DAYS ===")
averages(last_30).show()

In [None]:
# Show each student's total points/commits up to and including `end_date`,
# and save the same table as CSV.
# Totals are sums of weekly scores, which are updated at 12:00 midnight on Mondays.
# If `end_date` falls after a Monday, the total will not include points earned
# after 11:59 pm on the most recent Sunday.
end_date = datetime(2024, 5, 1)
totals = totals_as_of_date(df, end_date)
totals_report = totals.select(["email", "points", "commits"]).sort(asc("email"))
totals_report.show(10000, False)
# mode="overwrite": Spark's default save mode raises if the output path already
# exists, which would make this cell fail on every re-run.
totals_report.write.csv('./temp.csv', mode="overwrite")