## Define Required Variables
Since I am using Databricks Community Edition, I cannot use `dbutils.secrets` to store the secret key or mount the S3 bucket with `dbutils.fs.mount`. Therefore, I have to declare the following variables explicitly.
- AWS_ACCESS_KEY
- AWS_SECRET_KEY
- BUCKET_NAME
- REGION

In [0]:
AWS_ACCESS_KEY = ""
AWS_SECRET_KEY = ""
BUCKET_NAME = ""
REGION = ""
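
Because these four values are typed in by hand, it can help to fail early when one of them is still empty. The following is a minimal sketch, not part of the original notebook; the helper name `find_missing_settings` and the placeholder values are assumptions made for illustration.

```python
def find_missing_settings(settings):
    """Return the names of settings whose value is empty or only whitespace."""
    return [name for name, value in settings.items() if not str(value).strip()]


# Hypothetical placeholder values for illustration only; use your own.
example_settings = {
    "AWS_ACCESS_KEY": "my-access-key",
    "AWS_SECRET_KEY": "",
    "BUCKET_NAME": "my-bucket",
    "REGION": "us-west-2",
}

missing = find_missing_settings(example_settings)
if missing:
    print(f"Please fill in: {', '.join(missing)}")
```

Running a check like this before the `spark.conf.set` calls turns an empty credential into a clear message instead of a later S3 read failure.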

## Create Spark Instance And Test S3 Connection

In [0]:
spark.conf.set("fs.s3a.access.key", AWS_ACCESS_KEY)
spark.conf.set("fs.s3a.secret.key", AWS_SECRET_KEY)
# spark.conf.set("fs.s3a.endpoint", f"s3.{REGION}.amazonaws.com")

In [0]:
base_path = f"s3a://{BUCKET_NAME}/ucsd"
base_path_raw = f"{base_path}/raw"
base_path_cleaned = f"{base_path}/cleaned"
base_path_final_table = f"{base_path}/final_table"
base_path_final = f"{base_path}/final"

try:
    df = spark.read.csv(f"{base_path_raw}/2024Winter", header=True, inferSchema=True)
    df.show(3)
except Exception as e:
    print(f"Table read failed: {e}")

+-------------+--------------+--------+------+--------------------+---------+--------+-----+-----------+
|         time|subj_course_id|sec_code|sec_id|                prof|available|waitlist|total|enrolled_ct|
+-------------+--------------+--------+------+--------------------+---------+--------+-----+-----------+
|1704976213683|        AAS 10|     A01|303104|Butler; Elizabeth...|        1|       4|   34|         33|
|1704976213683|        AAS 10|     A02|303108|Butler; Elizabeth...|        1|       6|   34|         33|
|1704976214231|        AAS 11|     A01|303127|Butler; Elizabeth...|        1|       0|   34|         33|
+-------------+--------------+--------+------+--------------------+---------+--------+-----+-----------+
only showing top 3 rows



In [0]:
df.select('prof').show(truncate=False)

+----------------------------+
|prof                        |
+----------------------------+
|Butler; Elizabeth Annette   |
|Butler; Elizabeth Annette   |
|Butler; Elizabeth Annette   |
|Butler; Elizabeth Annette   |
|Wade; Jon P                 |
|Staff                       |
|Staff                       |
|Zak; Alexander & Wade; Jon P|
|Vespa; Emanuel Ignacio      |
|Myerston Santana; Jacobo    |
|Rao; Ramesh R               |
|Lovett-Barron; Matthew Rod  |
|Bronstein; Phoebe Malan     |
|Danks; David J              |
|Vespa; Emanuel Ignacio      |
|Theodorakis; Emmanuel       |
|Amir; On                    |
|Schmidt; Thomas Rainer      |
|Treichler; Emily Brockway   |
|Root; Cory Matthew          |
+----------------------------+
only showing top 20 rows



## Import Necessary Libraries

In [0]:
import json
import os

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# Note: sum, max, and min below shadow the Python built-ins of the same name.
from pyspark.sql.functions import (
    col, lit, when, expr, split,
    sum, avg, max, min, count, countDistinct, first, last, mean, stddev,
    collect_list, collect_set, approx_count_distinct,
    from_unixtime, to_timestamp, date_format, row_number,
)




## Check Raw Data

#### Check and analyze the raw data
For this part, I strongly recommend referring to my other data analysis project, which cleans and analyzes the UCSD registration data in detail: 

#### Define and store quarter information.
- `year_quarter`: list of the year-quarter registration periods that we want to clean and store.
- `passtag`: list of the tags for each pass time. Each tag is the key of a pass time and is also used on the horizontal axis of the final graph.
- `passtime`: the actual registration dates of each quarter, each corresponding to a tag in `passtag`.

Generate a json data file:
- key: `year-quarter`
- value: `passtag` : `passtime` for corresponding quarter.
- stored: `s3/ucsd/final/passtimes.json`

In [0]:
year_quarter = ["2024-Winter",
        "2024-Spring",
        "2024-Fall",
        "2025-Winter",
        "2025-Spring"]
passtag = [
    'Prior', \
    'First Pass Priorities & Seniors Start', 'First Pass Juniors Start', 'First Pass Sophmores Start', 'First Pass First-Year Start', \
    'Second Pass Priorities & Seniors Start', 'Second Pass Juniors Start', 'Second Pass Sophmores Start', 'Second Pass First-Year Start', \
    'Quarter Start', 'A Week After Quarter Start']
passtimes = [
  ['2023-11-13', '2023-11-14', '2023-11-16', '2023-11-17', '2023-11-18', \
  '2023-11-21', '2023-11-24', '2023-11-25', '2023-11-27', '2024-01-04', '2024-01-11'],

  ['2024-02-16', '2024-02-17', '2024-02-20', '2024-02-21', '2024-02-22', \
  '2024-02-26', '2024-02-28', '2024-02-29', '2024-03-01', '2024-03-27', '2024-04-04'],

  ['2024-05-23', '2024-05-24', '2024-05-27', '2024-05-28', '2024-05-29', \
  '2024-06-01', '2024-06-04', '2024-06-05', '2024-06-06', '2024-09-23', '2024-09-30'],

  ['2024-11-11', '2024-11-12', '2024-11-14', '2024-11-15', '2024-11-16', \
  '2024-11-19', '2024-11-21', '2024-11-22', '2024-11-23', '2025-01-02', '2025-01-18'],

  ['2025-02-14', '2025-02-15', '2025-02-18', '2025-02-19', '2025-02-20', \
  '2025-02-24', '2025-02-26', '2025-02-27', '2025-02-28', '2025-03-26', '2025-04-02'],
]

# Every quarter must provide exactly one date per pass tag
assert all(len(times) == len(passtag) for times in passtimes)

result = {
  quarter: dict(zip(passtag, times))
  for quarter, times in zip(year_quarter, passtimes)
}

# Final storage path of the JSON file
output_path = f"{base_path_final}/passtimes.json"
# Build the JSON string
json_string = json.dumps(result, indent=2)
# Write the JSON string to the target file
dbutils.fs.put(output_path, json_string, overwrite=True)
file_content = dbutils.fs.head(output_path)
print(file_content)

Wrote 2705 bytes.
{
  "2024-Winter": {
    "Prior": "2023-11-13",
    "First Pass Priorities & Seniors Start": "2023-11-14",
    "First Pass Juniors Start": "2023-11-16",
    "First Pass Sophmores Start": "2023-11-17",
    "First Pass First-Year Start": "2023-11-18",
    "Second Pass Priorities & Seniors Start": "2023-11-21",
    "Second Pass Juniors Start": "2023-11-24",
    "Second Pass Sophmores Start": "2023-11-25",
    "Second Pass First-Year Start": "2023-11-27",
    "Quarter Start": "2024-01-04",
    "A Week After Quarter Start": "2024-01-11"
  },
  "2024-Spring": {
    "Prior": "2024-02-16",
    "First Pass Priorities & Seniors Start": "2024-02-17",
    "First Pass Juniors Start": "2024-02-20",
    "First Pass Sophmores Start": "2024-02-21",
    "First Pass First-Year Start": "2024-02-22",
    "Second Pass Priorities & Seniors Start": "2024-02-26",
    "Second Pass Juniors Start": "2024-02-28",
    "Second Pass Sophmores Start": "2024-02-29",
    "Second Pass First-Year Start":

In [0]:
# Convert passtimes to CSV and store it in S3
# Build a DataFrame and transpose it
df = pd.DataFrame(result)

# Transpose the DataFrame
df_transposed = df.T

# Reset index to make 'year_quarter' a column
df_transposed = df_transposed.reset_index()

# Rename the 'index' column to 'year_quarter'
df_transposed = df_transposed.rename(columns={'index': 'year_quarter'})

# Display the transformed DataFrame
print("Transformed DataFrame:")
print(df_transposed.head(3))

# Melt the DataFrame to have 'year_quarter', 'passtag', and 'pass_time' as columns
df_melted = df_transposed.melt(id_vars=['year_quarter'], var_name='passtag', value_name='pass_time')

# Display the melted DataFrame
print("\nMelted DataFrame:")
print(df_melted.head(20))

# Store the DataFrame in S3
passtimes_path = f"{base_path_final_table}/passtimes/"
columns = ["year_quarter", "passtag", "pass_time"]
df_melted_spark = spark.createDataFrame(df_melted, columns).withColumnRenamed("pass_time", "passtime")
df_melted_spark = df_melted_spark.withColumn(
        "year",
        split(col("year_quarter"), "-").getItem(0)
    ).withColumn(
        "quarter",
        split(col("year_quarter"), "-").getItem(1)
    ).drop("year_quarter")

df_melted_spark.coalesce(1).write.csv(passtimes_path, header=True, mode="overwrite")


Transformed DataFrame:
  year_quarter       Prior First Pass Priorities & Seniors Start  \
0  2024-Winter  2023-11-13                            2023-11-14   
1  2024-Spring  2024-02-16                            2024-02-17   
2    2024-Fall  2024-05-23                            2024-05-24   

  First Pass Juniors Start First Pass Sophmores Start  \
0               2023-11-16                 2023-11-17   
1               2024-02-20                 2024-02-21   
2               2024-05-27                 2024-05-28   

  First Pass First-Year Start Second Pass Priorities & Seniors Start  \
0                  2023-11-18                             2023-11-21   
1                  2024-02-22                             2024-02-26   
2                  2024-05-29                             2024-06-01   

  Second Pass Juniors Start Second Pass Sophmores Start  \
0                2023-11-24                  2023-11-25   
1                2024-02-28                  2024-02-29   
2        
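
The same long-format rows (`passtag`, `passtime`, `year`, `quarter`) can also be built directly from the nested dictionary without the pandas transpose and melt steps. This is a minimal sketch in plain Python; the helper name `flatten_passtimes` is an assumption, and the rows come out ordered by quarter rather than by tag, unlike the melted DataFrame above.

```python
def flatten_passtimes(passtimes_by_quarter):
    """Turn {"2024-Winter": {tag: date, ...}, ...} into a list of row dicts."""
    rows = []
    for year_quarter, tag_to_date in passtimes_by_quarter.items():
        # "2024-Winter" -> year "2024", quarter "Winter"
        year, quarter = year_quarter.split("-", 1)
        for passtag, passtime in tag_to_date.items():
            rows.append({
                "passtag": passtag,
                "passtime": passtime,
                "year": year,
                "quarter": quarter,
            })
    return rows


# A two-quarter, two-tag excerpt of the `result` dictionary built earlier.
sample = {
    "2024-Winter": {"Prior": "2023-11-13", "Quarter Start": "2024-01-04"},
    "2024-Spring": {"Prior": "2024-02-16", "Quarter Start": "2024-03-27"},
}

rows = flatten_passtimes(sample)
for row in rows:
    print(row)
```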

## Data Cleaning

#### Data cleaning process
- Read the raw data for each year-quarter.
- Remove null values.
- Convert the Unix timestamp to a readable time that includes the date.
- Unify the professor name for each course-section.
  - UCSD may change a course's professor during registration, so we keep only the final professor.
- Aggregate the registration values (enrolled, waitlist, etc.) for each course-professor-date. We only need one data point per day.
  - enrolled: take the earliest value of the day
  - waitlist: take the earliest value of the day
  - total: take the earliest value of the day
- Aggregate the sections. A course taught by one professor may have multiple sections. Keeping them separate adds nothing for this analysis, so they are summed.
  - total: sum
  - enrolled: sum
  - waitlist: sum
- Clean the `total` value and keep only reasonable values.
  - `total` is a special value in the raw data and takes the following values:
    - 5 ~ 500 (estimate): a reasonable value that indicates the course size.
    - 9999: an abnormal value meaning the course size has not been determined yet. There are two cases:
      - The course size is later set to a reasonable value. We use that reasonable value as `total` (the maximum value between 5 ~ 1000; the code below uses the range 0 ~ 998).
      - The course's `total` stays at 9999. Some of these courses still show reasonable enrollment changes (increasing over time), so for courses whose `total` never leaves 9999, `total` is set to -1 and left to the backend to handle.
- Add new columns:
  - `year`: year of the course
  - `quarter`: quarter of the course
  - `prof_first_name`: professor first name
  - `prof_last_name`: professor last name
  - `prof_middle_name`: can be null
  - `department`: e.g. `CSE`, `AAS`
  - `course_id`: e.g. `120`, `10`
- Remove the columns `available`, `subj_course_id`, and `prof`.
  - Because `total` contains abnormal values, `available` is also untrustworthy. While it could be recomputed from the other columns (in the sample rows shown earlier, `available` equals `total - enrolled_ct`), it is better to remove it.
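
A minimal sketch of how the name and course columns listed above could be derived from `prof` and `subj_course_id`, written in plain Python for clarity. The helper names are hypothetical, and two behaviors are assumptions rather than the notebook's documented rules: for co-taught courses only the first instructor is used, and a value without a `;` (such as `Staff`) is kept as the last name.

```python
def split_prof_name(prof):
    """Split 'Last; First Middle' into (first_name, middle_name, last_name)."""
    # Assumption: for co-taught courses ("A; B & C; D") only the first instructor is used.
    primary = prof.split("&")[0].strip()
    if ";" not in primary:
        # e.g. "Staff": no separate first name is available.
        return None, None, primary
    last_name, given = (part.strip() for part in primary.split(";", 1))
    given_parts = given.split()
    first_name = given_parts[0] if given_parts else None
    # Everything after the first given name is treated as the middle name.
    middle_name = " ".join(given_parts[1:]) or None
    return first_name, middle_name, last_name


def split_course(subj_course_id):
    """Split 'AAS 10' into (department, course_id)."""
    department, course_id = subj_course_id.split(" ", 1)
    return department, course_id


for value in ["Butler; Elizabeth Annette", "Amir; On", "Staff"]:
    print(value, "->", split_prof_name(value))
print(split_course("AAS 10"))
```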

In [0]:
# Utility functions
# Count the null values in each column
def count_null_values(df):
  null_counts = df.select([
      sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns
  ])
  return null_counts

# Drop every row that contains a null value
def remove_null_values(df):
  df_without_null = df.na.drop()
  return df_without_null

# Convert the time column from a Unix timestamp (milliseconds) to a readable time
def transfer_timestamp(df):
  df_converted = df.withColumn("readable_time", (col("time")/1000).cast("timestamp"))
  df_converted = df_converted.withColumn("date", date_format(col("readable_time"), "yyyy-MM-dd"))
  return df_converted

# Remove unreasonable values. total values of 9999 and 0 are left alone here because the database changes over time
def remove_weird_values(df):
  # df = df.filter(col("available") < 1000)
  return df

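# Eliminate prof=Staff values
# For each course + sec_id combination, UCSD may not assign a professor at first and use Staff as a placeholder,
# or the professor may change midway.
# In both cases the prof column changes within a subj_course_id + sec_id group, i.e. it is inconsistent.
# So for every subj_course_id + sec_id group, order by time and set prof to the last professor's value.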
def uniform_prof(df):
  window_spec = Window.partitionBy("subj_course_id", "sec_id").orderBy("time") \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

  df_with_last_prof = df.withColumn("latest_prof", last("prof", ignorenulls=True).over(window_spec))
  df_result = df_with_last_prof.withColumn("prof", col("latest_prof")).drop("latest_prof")

  return df_result

# Aggregate each course's data per day
# Same course + same professor + same section counts as one course
# Drop the time and readable_time columns
# For each day's registration data, take the day's first record (the one with the smallest timestamp)
# Then:
# aggregate the sections of each course by summing their registration values
# Finally, filter again
def aggerate_value(df):

  # Build the window
  window_spec = Window.partitionBy("subj_course_id", "sec_code", "sec_id", "prof", "date").orderBy(col("time").asc())
  # Add a row number
  df_temp = df.withColumn("row_number", row_number().over(window_spec))
  # Keep only the earliest record of each day
  df_temp = df_temp.filter(col("row_number") == 1).select(
    "subj_course_id", "prof", "date",
    "available", "total", "waitlist", "enrolled_ct"
  )

  # Aggregate the different sections
  df_temp = df_temp.groupBy("subj_course_id", "prof", "date").agg(
    sum("available").alias("available"),
    sum("total").alias("total"),
    sum("waitlist").alias("waitlist"),
    sum("enrolled_ct").alias("enrolled_ct"))


  # Set up the window
  window_spec = Window.partitionBy("subj_course_id", "prof")

  # Compute, per group, the maximum total within the range [0, 998]
  # If a course's total is -1, its total was never normal (always outside 0~998)
  # The course may still have normal registration activity (enrolled_ct keeps growing)
  # Such rows need to be identified and filtered manually in later processing
  df_with_max = df_temp.withColumn(
      "max_total_valid",
      max(
          when(
            (col("total") >= 0) & (col("total") <= 998),
            col("total")
          ).otherwise(-1)
      ).over(window_spec)
  )

  # Replace total with the group's maximum
  df_temp = df_with_max.withColumn("total", col("max_total_valid")).drop("max_total_valid")

  return df_temp

# For each subj_course_id + prof combination, check whether its dates include every value in passtime
# If not, drop that subj_course_id + prof combination
def remove_data_not_in_passtime(df, passtime):

  # Put the dates to check into a set
  required_dates = set(passtime)
  required_dates_count = len(required_dates)

  # Build a literal array column with F.array so it can be compared inside the table
  required_dates_lit = F.array([lit(d) for d in required_dates])

  # Collect the unique dates of each group
  grouped_df = df.groupBy("subj_course_id", "prof").agg(
    F.collect_set("date").alias("existing_dates")
  )

  # Select the combinations that meet the requirement
  valid_groups = grouped_df.filter(
    F.size(F.array_intersect(F.col("existing_dates"), required_dates_lit)) == required_dates_count
  ).select("subj_course_id", "prof")

  # Keep only the rows of the qualifying combinations
  df = df.join(valid_groups, ["subj_course_id", "prof"], "inner")

  return df

# Add the year, quarter, department, and course_id columns and drop the columns that are no longer needed
# year and quarter are passed in explicitly instead of being read from global variables
def rename_split_delete_columns(df, year, quarter):
  df = df.withColumn("year", lit(year))
  df = df.withColumn("quarter", lit(quarter))
  df = df.withColumn("department", split("subj_course_id", " ")[0])
  df = df.withColumn("course_id", split("subj_course_id", " ")[1])
  df = df.drop("subj_course_id")
  df = df.drop("available")
  return df

# Overall cleaning function
def clean_data(df, year, quarter, passtime):
  df = remove_null_values(df)
  df = transfer_timestamp(df)
  df = remove_weird_values(df)
  df = uniform_prof(df)
  df = aggerate_value(df)
  df = remove_data_not_in_passtime(df, passtime)
  df = rename_split_delete_columns(df, year, quarter)

  return df
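
To make two of the rules above easier to check by hand, here is a plain-Python sketch of the millisecond-timestamp-to-date conversion and of the `total` rule (largest value in the range 0 to 998, otherwise -1). The helper names are hypothetical. The sketch assumes UTC, whereas Spark's `cast("timestamp")` and `date_format` use the session time zone, so dates near midnight may differ. `builtins.max` is used because the notebook imports `max` from `pyspark.sql.functions`, which shadows the built-in.

```python
import builtins
from datetime import datetime, timezone


def ms_to_date(ms, tz=timezone.utc):
    """Convert a Unix timestamp in milliseconds to a 'YYYY-MM-DD' string."""
    return datetime.fromtimestamp(ms / 1000, tz=tz).strftime("%Y-%m-%d")


def resolve_total(daily_totals, low=0, high=998):
    """Return the largest total within [low, high], or -1 if none is in range."""
    valid = [t for t in daily_totals if low <= t <= high]
    # builtins.max avoids the PySpark `max` imported in the notebook.
    return builtins.max(valid) if valid else -1


# First timestamp from the raw-data preview shown earlier.
print(ms_to_date(1704976213683))
# A course whose size was fixed later, and one that stayed at 9999.
print(resolve_total([9999, 9999, 34, 34]))
print(resolve_total([9999, 9999]))
```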

#### Clean raw data for each quarter

In [0]:
for i in range(len(year_quarter)):
# for i in [0]:
  year = year_quarter[i].split('-')[0]
  quarter = year_quarter[i].split('-')[1]
  passtime = passtimes[i]
  
  path = f"{base_path_raw}/{year}{quarter}/"
  df = spark.read.csv(
    path,
    header=True,       # Tell Spark that the CSV files contain a header row
    inferSchema=True   # Tell Spark to infer the column data types automatically
  )

  output_path = f"{base_path_cleaned}/{year}{quarter}/"
  df = clean_data(df, year, quarter, passtime)
  df.coalesce(1).write.csv(output_path, mode='overwrite', header=True)
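
The `passtime` list passed into `clean_data` drives the coverage filter: a course-professor group survives only if its dates include every pass date of that quarter. A minimal plain-Python sketch of that subset check follows; the helper name and the example rows are illustrative assumptions, not taken from the real dataset.

```python
def groups_covering_all_dates(rows, required_dates):
    """Return the (subj_course_id, prof) pairs whose dates include every required date."""
    required = set(required_dates)
    dates_by_group = {}
    for row in rows:
        key = (row["subj_course_id"], row["prof"])
        dates_by_group.setdefault(key, set()).add(row["date"])
    # A group qualifies only if the required dates are a subset of its dates.
    return {key for key, dates in dates_by_group.items() if required <= dates}


# Illustrative rows only (not taken from the real dataset).
example_rows = [
    {"subj_course_id": "AAS 10", "prof": "Butler; Elizabeth Annette", "date": "2023-11-13"},
    {"subj_course_id": "AAS 10", "prof": "Butler; Elizabeth Annette", "date": "2023-11-14"},
    {"subj_course_id": "AAS 11", "prof": "Staff", "date": "2023-11-13"},
]

kept = groups_covering_all_dates(example_rows, ["2023-11-13", "2023-11-14"])
print(kept)
```

One consequence of this rule is that a course missing data on even a single pass date is dropped from the quarter entirely.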


#### Combine the cleaned data into a final dataset

In [0]:
# Consolidate the CSV files previously stored in each folder under the cleaned-data path
# into a single CSV (or ORC) file stored under f"{base_path}/final"
def consolidate_data_files(base_path: str, output_path: str, output_format: str = 'csv'):
    """
    Scan all subdirectories under the base path, consolidate the data files, and save them to a new location.

    :param base_path: Root path containing the per-year-quarter subfolders (e.g. './UCSD_output_csv/').
    :param output_path: Root path under which the 'final' directory is written.
    :param output_format: Output format, either 'csv' or 'orc' (recommended).
    """
    print("--- [Function 1: Consolidate Files] ---")

    # 1. Use a wildcard (*) to read the data files in all subdirectories
    #    This pattern matches every directory such as base_path/2023_Q1/, base_path/2023_Q4/
    input_glob_path = os.path.join(base_path, "*", "")
    print(f"Reading data from path pattern: {input_glob_path}")

    # Assume the source files are in CSV format
    # inferSchema=True can be slow on large datasets; in production, define the schema manually
    df = spark.read.csv(input_glob_path, header=True, inferSchema=True)

    # Optional: to record which source folder each row came from, add a column
    # df = df.withColumn("source_directory", input_file_name())

    print(f"Successfully read and consolidated {df.count()} rows.")

    # 2. Define the final output path
    final_output_path = os.path.join(output_path, "final")
    print(f"Preparing to write data to: {final_output_path}")

    # 3. Write the data in the requested format
    #    coalesce(1) merges all partitions into one so the output is a single file (inside the 'final' directory)
    if output_format.lower() == 'csv':
        df.coalesce(1).write.csv(final_output_path, mode='overwrite', header=True)
        print("Successfully wrote consolidated data as CSV.")
    elif output_format.lower() == 'orc':
        # ORC is a columnar format with better performance; it carries its own schema, so no header option is needed
        df.coalesce(1).write.orc(final_output_path, mode='overwrite')
        print("Successfully wrote consolidated data as ORC.")
    else:
        print(f"Error: Unsupported output format '{output_format}'. Please choose 'csv' or 'orc'.")

    print("--- [Function 1: Finished] ---\n")
    return final_output_path

# Sample about 15% of the final dataset and store it as JSON for the frontend
def sample_and_save_as_json(consolidated_data_path: str, base_path: str, input_format: str = 'csv'):
    """
    Read the consolidated data, take a random sample, and save the result as JSON.

    :param consolidated_data_path: Full path of the consolidated data (e.g. './UCSD_output_csv/final').
    :param base_path: Root path under which the 'json_sample' directory is created.
    :param input_format: Format of the input files, 'csv' or 'orc'.
    """
    print("--- [Function 2: Sample and Save as JSON] ---")
    print(f"Reading consolidated data from: {consolidated_data_path}")

    # 1. Read the consolidated data in the given format
    if input_format.lower() == 'csv':
        df = spark.read.csv(consolidated_data_path, header=True, inferSchema=True)
    elif input_format.lower() == 'orc':
        df = spark.read.orc(consolidated_data_path)
    else:
        print(f"Error: Unsupported input format '{input_format}'.")
        return

    # 2. Take a 15% random sample of the DataFrame without replacement
    #    The fraction is approximate, so the resulting row count may vary slightly
    fraction_to_sample = 0.15
    df_sampled = df.sample(withReplacement=False, fraction=fraction_to_sample)

    print(f"Original row count: {df.count()}, Sampled row count: {df_sampled.count()}")

    # 3. Define the JSON output path
    json_output_path = os.path.join(base_path, "json_sample")
    print(f"Preparing to write JSON sample to: {json_output_path}")

    # 4. Write the sampled data as JSON
    #    coalesce(1) is used again to produce a single file
    df_sampled.coalesce(1).write.json(json_output_path, mode='overwrite')

    print("Successfully wrote sampled data as JSON.")
    print("--- [Function 2: Finished] ---\n")

In [0]:
# Consolidate the data of all quarters into a single CSV file stored under base_path_final
consolidate_file_path = consolidate_data_files(base_path_cleaned, base_path_final)

# Randomly sample 15% of the rows and store them as JSON
sample_and_save_as_json(consolidate_file_path, base_path_final)

--- [Function 1: Consolidate Files] ---
Reading data from path pattern: s3a://ucsd-registration-s3-20250609193613565500000001/ucsd/cleaned/*/
Successfully read and consolidated 236880 rows.
Preparing to write data to: s3a://ucsd-registration-s3-20250609193613565500000001/ucsd/final/final
Successfully wrote consolidated data as CSV.
--- [Function 1: Finished] ---

--- [Function 2: Sample and Save as JSON] ---
Reading consolidated data from: s3a://ucsd-registration-s3-20250609193613565500000001/ucsd/final/final
Original row count: 236880, Sampled row count: 35592
Preparing to write JSON sample to: s3a://ucsd-registration-s3-20250609193613565500000001/ucsd/final/json_sample
Successfully wrote sampled data as JSON.
--- [Function 2: Finished] ---
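
The sampled row count above (35592) is close to, but not exactly, 15% of 236880 (which would be 35532), because `fraction` is a per-row probability rather than an exact quota. The plain-Python sketch below illustrates that behavior under the assumption that each row is kept by an independent random draw; the helper name is hypothetical. Spark's `DataFrame.sample` also accepts a `seed` argument if a repeatable sample is needed.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10_000))
sample = bernoulli_sample(rows, fraction=0.15, seed=42)

# The size is near 15% of the input but usually not exactly 1500.
print(len(sample))
# Re-running with the same seed reproduces the same sample.
print(sample == bernoulli_sample(rows, fraction=0.15, seed=42))
```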



## Conclusion
Now we have the following cleaned data:
- Cleaned registration data for each quarter: `{base_path}/cleaned/yearQuarter`
- Cleaned combined registration data for all quarters: `{base_path}/final/final/`
- Sampled (about 15%) combined data in JSON format: `{base_path}/final/json_sample`
- Passtime and tags for each quarter: `{base_path}/final/passtimes.json`