In [0]:
spark.conf.set(
  "fs.azure.account.key.practicemyazure.dfs.core.windows.net",
  dbutils.secrets.get(scope="adf-secrets", key="blob-storage-key")
)

In [0]:
# from pyspark.sql.functions import split, trim, col


# df_raw_pr = spark.read.json("abfss://questblsdata@practicemyazure.dfs.core.windows.net/pr/pr.data.0.Current")
# df_raw_pr.show(truncate = False)
# # Split the tab-separated string into an array
# df_cleaned = df_raw_pr.select(
#     split(col(df_raw_pr.columns[0]), r'\t').alias("columns")
# )

# # Then explode it into proper columns
# df_final = df_cleaned.select(
#     trim(col("columns")[0]).alias("series_id"),
#     trim(col("columns")[1]).alias("year"),
#     trim(col("columns")[2]).alias("period"),
#     trim(col("columns")[3]).alias("value"),
#     trim(col("columns")[4]).alias("footnote_codes")
# )

# df_final.show()

In [0]:
from pyspark.sql.functions import trim, regexp_replace, col

df_json = spark.read.json("abfss://questblsdata@practicemyazure.dfs.core.windows.net/pr/pr.data.0.Current")

# Clean up column names and trim values
df_final = df_json.select(
    trim(col("Prop_0")).alias("series_id"),
    trim(col("Prop_1")).alias("year"),
    trim(col("Prop_2")).alias("period"),
    trim(col("Prop_3")).alias("value"),
    trim(regexp_replace(col("Prop_4"), r"\r", "")).alias("footnote_codes")
)

display(df_final) #.show(truncate=False)

series_id,year,period,value,footnote_codes
series_id,year,period,value,footnote_codes
PRS30006011,1995,Q01,2.6,
PRS30006011,1995,Q02,2.1,
PRS30006011,1995,Q03,0.9,
PRS30006011,1995,Q04,0.1,
PRS30006011,1995,Q05,1.4,
PRS30006011,1996,Q01,-0.2,
PRS30006011,1996,Q02,-0.3,
PRS30006011,1996,Q03,-0.1,
PRS30006011,1996,Q04,0.2,


In [0]:
from pyspark.sql.functions import split, trim, col

from pyspark.sql.functions import explode, col

df_raw_datausa = spark.read.json("abfss://questblsdata@practicemyazure.dfs.core.windows.net/datausa/")
df_raw_datausa.show(truncate = False)

from pyspark.sql.functions import explode, col

# Step 1: Explode the 'data' array
df_exploded = df_raw_datausa.select(explode(col("data")).alias("population_data"))

# Step 2: Select the named fields from the struct
df_clean = df_exploded.select(
    col("population_data.`ID Nation`").alias("geo_id"),
    col("population_data.`ID Year`").alias("year_id"),
    col("population_data.Nation").alias("nation"),
    col("population_data.Population").alias("population"),
    col("population_data.`Slug Nation`").alias("slug"),
    col("population_data.Year").alias("year")
)

df_clean.show()

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|data     

In [0]:
from pyspark.sql.functions import col, mean, stddev

# Filter years 2013 through 2018
filtered_df = df_clean.filter((col("year") >= 2013) & (col("year") <= 2018))

# Cast population to integer if needed
filtered_df = filtered_df.withColumn("population", col("population").cast("long"))

# Compute mean and standard deviation
stats_df = filtered_df.select(
    mean("population").alias("mean_population"),
    stddev("population").alias("stddev_population")
)

stats_df.show()

+---------------+------------------+
|mean_population| stddev_population|
+---------------+------------------+
|   3.17437383E8|3922663.9759295187|
+---------------+------------------+



In [0]:
from pyspark.sql.functions import year, sum as _sum, col, row_number
from pyspark.sql.window import Window

# Step 1: Convert year to integer and value to float
df = df_final.withColumn("year", col("year").cast("int")) \
             .withColumn("value", col("value").cast("float"))

# Step 2: Group by series_id and year and sum the value
df_grouped = df.groupBy("series_id", "year") \
               .agg(_sum("value").alias("yearly_value"))

# Step 3: Use a window function to rank years by total value per series
window_spec = Window.partitionBy("series_id").orderBy(col("yearly_value").desc())

df_ranked = df_grouped.withColumn("rank", row_number().over(window_spec))

# Step 4: Filter for best year (rank = 1)
df_best_years = df_ranked.filter(col("rank") == 1).select(
    "series_id", "year", "yearly_value"
)

df_best_years.show()

+-----------+----+------------------+
|  series_id|year|      yearly_value|
+-----------+----+------------------+
|PRS30006011|2022| 20.49999976158142|
|PRS30006012|2022| 17.09999990463257|
|PRS30006013|1998| 705.8950042724609|
|PRS30006021|2010|17.699999570846558|
|PRS30006022|2010| 12.40000033378601|
|PRS30006023|2014|503.21600341796875|
|PRS30006031|2022|              20.5|
|PRS30006032|2021|17.100000202655792|
|PRS30006033|1998| 702.6719970703125|
|PRS30006061|2022|              37.0|
|PRS30006062|2021|31.600000023841858|
|PRS30006063|2024| 646.7480010986328|
|PRS30006081|2021|24.399999618530273|
|PRS30006082|2021|24.399999618530273|
|PRS30006083|2021|110.74199676513672|
|PRS30006091|2002|43.299999713897705|
|PRS30006092|2002| 44.39999961853027|
|PRS30006093|2013|  514.156005859375|
|PRS30006101|2020| 33.50000047683716|
|PRS30006102|2020| 36.20000076293945|
+-----------+----+------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import col

# 1. Filter df_final for the target series_id and period
df_filtered = df_final.filter(
    (col("series_id") == "PRS30006011") & (col("period") == "Q01")
).withColumn("year", col("year").cast("int")) \
 .withColumn("value", col("value").cast("float"))

# 2. Prepare df_clean (population data) with matching year
df_population = df_clean.select(
    col("year").cast("int").alias("year"),
    col("population").cast("long").alias("Population")
).dropDuplicates(["year"])

# 3. Join the two DataFrames on year
df_report = df_filtered.join(df_population, on="year", how="left") \
    .select("series_id", "year", "period", "value", "Population")

display(df_report) #.show()

series_id,year,period,value,Population
PRS30006011,1995,Q01,2.6,
PRS30006011,1996,Q01,-0.2,
PRS30006011,1997,Q01,0.3,
PRS30006011,1998,Q01,1.6,
PRS30006011,1999,Q01,-1.4,
PRS30006011,2000,Q01,-0.6,
PRS30006011,2001,Q01,-1.4,
PRS30006011,2002,Q01,-8.5,
PRS30006011,2003,Q01,-4.6,
PRS30006011,2004,Q01,-3.5,
