# In [None]:
import json
import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import col, year, avg, stddev, sum as _sum, max as _max
from pyspark.sql.functions import trim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.window import Window

# Reuse the active SparkContext if there is one (e.g. when this notebook cell
# is re-run in an interactive Glue session). Constructing SparkContext()
# directly raises when a context is already active in the process.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

s3_csv_path = "s3://rearc-data-quest-s3/bls-data/pr.data.0.Current"
s3_json_path = "s3://rearc-data-quest-s3/population-data/datausa_20250618143145.json"


# Column layout of the tab-delimited BLS time-series file.
schema_csv = StructType([
    StructField("series_id", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("period", StringType(), True),
    StructField("value", DoubleType(), True),
    StructField("footnote_codes", StringType(), True),
])

# NOTE(review): the BLS flat file is assumed to begin with a column-header
# line and to pad its text columns with whitespace -- confirm against the
# current file. header=true skips that line instead of loading it as a junk
# data row. series_id/period are trimmed so exact-match filters (e.g.
# series_id == "PRS30006032") can succeed.
df_bls = (
    spark.read
    .option("delimiter", "\t")
    .option("header", "true")
    .schema(schema_csv)
    .csv(s3_csv_path)
    .withColumn("series_id", trim(col("series_id")))
    .withColumn("period", trim(col("period")))
)

# Keep only rows whose period code does not start with "M"; everything below
# works on the remaining (non-"M") periods.
df_bls = df_bls.filter(~col("period").like("M%"))


# Population table: one row per element of the JSON document's "data" array,
# keeping the year and the population figure.
df_json_raw = spark.read.option("multiline", "true").json(s3_json_path)
df_pop = (
    df_json_raw
    .selectExpr("explode(data) as row")
    .select(
        col("row.ID Year").cast("int").alias("year"),
        col("row.Population").cast("long").alias("population"),
    )
)


# Mean and sample standard deviation (Spark's stddev == stddev_samp) of the
# annual population over 2013-2018 inclusive. Computed explicitly so the
# results are numeric columns rather than describe()'s string summary.
df_pop_filtered = df_pop.filter(col("year").between(2013, 2018))
df_pop_stats = df_pop_filtered.agg(
    avg("population").alias("mean_population"),
    stddev("population").alias("stddev_population"),
)
df_pop_stats.show()

# Total of all values per (series_id, year).
df_best = df_bls.groupBy("series_id", "year") \
    .agg(_sum("value").alias("year_sum"))

# Best year per series: the year(s) whose total equals that series' maximum
# total; tied years are all kept. The window max is computed over the very
# rows it is compared against, so the exact float equality is safe. The
# previous groupBy + self-join evaluated the double-valued sums twice, and
# the two evaluations may round differently.
series_window = Window.partitionBy("series_id")
df_best_year = (
    df_best
    .withColumn("max_sum", _max("year_sum").over(series_window))
    .filter(col("year_sum") == col("max_sum"))
    .select("series_id", "year", "year_sum")
)

df_best_year.show()


# Value of one series/period for each year, next to that year's population
# when df_pop has it (the left join keeps years with no population row).
target_series = "PRS30006032"
target_period = "Q01"

# Compare trimmed values: BLS text columns may carry padding whitespace,
# which would make a plain equality filter match nothing.
df_target = (
    df_bls
    .filter(
        (trim(col("series_id")) == target_series)
        & (trim(col("period")) == target_period)
    )
    .select(
        trim(col("series_id")).alias("series_id"),
        "year",
        trim(col("period")).alias("period"),
        "value",
    )
)

# df_pop is presumably small (about one row per year), so it is broadcast
# to avoid a shuffle.
df_joined = df_target.join(broadcast(df_pop), on="year", how="left")
df_joined.show()


print("Analysis complete.")
