In [0]:
from pyspark.sql import SparkSession

# Build the notebook's Spark session, or reuse the active one if it already exists.
spark = (
    SparkSession.builder
    .appName("Rearc Data Analysis")
    .getOrCreate()
)


### Load CSV


In [0]:
# Read the time-series file, using its first line as the column names.
time_series_reader = spark.read.option("header", True)
df_time_series = time_series_reader.csv("pr.data.0.Current.csv")

# Preview the first five rows.
df_time_series.show(5)


### Load JSON

In [0]:
# Parse the population file as multi-line JSON rather than one record per line.
population_reader = spark.read.option("multiline", True)
df_population = population_reader.json("population.json")

# Print the schema so the nested `data` field used below can be inspected.
df_population.printSchema()


In [0]:
# Turn each element of the `data` array into its own row, then keep only the
# two nested fields this analysis needs.
population_records = df_population.selectExpr("explode(data) as data")
population_columns = population_records.select("data.Year", "data.Population")

# Cast both fields to numeric types so they can be compared and aggregated.
df_population_flat = (
    population_columns
    .withColumn("Year", population_columns["Year"].cast("int"))
    .withColumn("Population", population_columns["Population"].cast("long"))
)
df_population_flat.show(5)


### Filter Years [2013–2018] and Compute Mean & Std Dev

In [0]:
from pyspark.sql.functions import col, avg, stddev

# Keep only the rows whose Year falls in 2013–2018, both endpoints included.
in_year_range = col("Year").between(2013, 2018)
df_filtered_pop = df_population_flat.filter(in_year_range)

# Collapse the filtered rows into a single row of summary statistics.
population_stats = df_filtered_pop.select(
    avg("Population").alias("Mean_Population"),
    stddev("Population").alias("StdDev_Population"),
)
population_stats.show()


###  Clean and Prepare Time Series Data

In [0]:
from pyspark.sql.functions import trim

# Trim surrounding whitespace from every column used downstream and cast the
# year and value columns to numeric types.
df_time_series_clean = (
    df_time_series
    .select(
        trim(col("series_id")).alias("series_id"),
        trim(col("year")).cast("int").alias("year"),
        trim(col("period")).alias("period"),
        # Cast to double, not float: a 32-bit float rounds decimal readings
        # before they are summed later, which leaks single-precision
        # artefacts into the yearly totals.
        trim(col("value")).cast("double").alias("value"),
    )
    # Drop rows whose (trimmed) period code starts with "M", i.e. monthly
    # observations; every other period code is kept.
    .filter(~col("period").like("M%"))
)
df_time_series_clean.show(5)


### Find Best Year per Series ID

In [0]:
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Total of `value` for each (series_id, year) pair.
df_yearly_sum = df_time_series_clean.groupBy("series_id", "year").agg(_sum("value").alias("yearly_value"))

# Rank the years within each series from the largest yearly total down.
# `year` is a secondary sort key so that ties on `yearly_value` are broken
# deterministically (earliest year wins); ordering by the total alone lets
# row_number() pick an arbitrary row among tied years, which can change
# between runs.
windowSpec = Window.partitionBy("series_id").orderBy(
    col("yearly_value").desc(),
    col("year").asc(),
)

# Keep only the top-ranked year per series and drop the helper rank column.
df_best_year = (
    df_yearly_sum
    .withColumn("rn", row_number().over(windowSpec))
    .filter(col("rn") == 1)
    .drop("rn")
)
df_best_year.show()


In [0]:
# Narrow the cleaned time series to one series and one period.
is_target_series = col("series_id") == "PRS30006032"
is_first_quarter = col("period") == "Q01"
df_target = df_time_series_clean.filter(is_target_series & is_first_quarter)

# Match each target row to the population row for the same year. The left
# join keeps target rows that have no population match (Population is null).
same_year = df_target.year == df_population_flat.Year
joined = df_target.join(df_population_flat, same_year, "left")

# Project the series columns plus the matched population figure.
df_combined = joined.select(
    df_target["series_id"],
    df_target["year"],
    df_target["period"],
    df_target["value"],
    df_population_flat["Population"],
)

df_combined.show()
