### create external table over json data

In [0]:
%sql
create table nation_population_tbl(
        id_nation varchar(20), 
        nation varchar(20),
        id_year int,
        year varchar(10),
        population int,
        slug_nation varchar(30)
)
USING json
location "/FileStore/rearc_data/nation_population-3.json"


In [0]:
%sql
select * from nation_population_tbl

id_nation,nation,id_year,year,population,slug_nation
01000US,United States,2023,2023,332387540,united-states
01000US,United States,2022,2022,331097593,united-states
01000US,United States,2021,2021,329725481,united-states
01000US,United States,2020,2020,326569308,united-states
01000US,United States,2019,2019,324697795,united-states
01000US,United States,2018,2018,322903030,united-states
01000US,United States,2017,2017,321004407,united-states
01000US,United States,2016,2016,318558162,united-states
01000US,United States,2015,2015,316515021,united-states
01000US,United States,2014,2014,314107084,united-states


In [0]:
# %sql
# create table pr_data_current_tbl(
#         series_id   varchar(20),
#         year varchar(5),
#         period varchar(5),
#         value varchar(5),
#         footnote_codes varchar(10)
# )
# USING csv
# location '/dbfs/FileStore/rearc_data/pr_data_0_Current.csv'


In [0]:
pr_schema = "series_id string,year int,period string,value decimal,footnote_codes string"

pr_data_df = spark.read.format('csv')\
                .option('header',True)\
                .schema(pr_schema)\
                .load('/FileStore/rearc_data/pr_data_0_Current.csv')

display(pr_data_df)

series_id,year,period,value,footnote_codes
PRS30006011,1995,Q01,3,
PRS30006011,1995,Q02,2,
PRS30006011,1995,Q03,1,
PRS30006011,1995,Q04,0,
PRS30006011,1995,Q05,1,
PRS30006011,1996,Q01,0,
PRS30006011,1996,Q02,0,
PRS30006011,1996,Q03,0,
PRS30006011,1996,Q04,0,
PRS30006011,1996,Q05,0,


### 1. Using the dataframe from the population data API (Part 2), generate the mean and the standard deviation of the annual US population across the years [2013, 2018] inclusive.

In [0]:
from pyspark.sql.functions import *

In [0]:
population_data_df = spark.read.table('nation_population_tbl')

population_data_mean_std_df = population_data_df\
                                .filter(expr("year in ('2013','2018')"))\
                                .agg(mean(col('population')).alias('mean'),stddev(col('population')).alias('std_deviation'))
display(population_data_mean_std_df)

mean,std_deviation
317219812.0,8037283.973522897


### 2. Using the dataframe from the time-series (Part 1), For every series_id, find the best year: the year with the max/largest sum of "value" for all quarters in that year. Generate a report with each series id, the best year for that series, and the summed value for that year.

In [0]:
from pyspark.sql.window import Window

In [0]:
Windowspec = Window.partitionBy("series_id").orderBy(desc("sum_of_value"))

series_id_max_value_year_for_quarters_df = pr_data_df.groupBy(['series_id','year'])\
                                            .agg(sum(col('value')).alias('sum_of_value'))\
                                            .withColumn("row_number", row_number().over(Windowspec))\
                                            .filter(col('row_number')==1)\
                                            .drop(col('row_number'))       

display(series_id_max_value_year_for_quarters_df)






series_id,year,sum_of_value
PRS30006011,2022,20
PRS30006012,2022,18
PRS30006013,1998,707
PRS30006021,2010,18
PRS30006022,2010,13
PRS30006023,2014,504
PRS30006031,2022,21
PRS30006032,2021,17
PRS30006033,1998,703
PRS30006061,2022,37


### 3. Using both dataframes from Part 1 and Part 2, generate a report that will provide the value for series_id = PRS30006032 and period = Q01 and the population for that given year (if available in the population dataset).

In [0]:
filter_pr_data_df = pr_data_df.filter((col("period") == 'Q01') & (trim(col("series_id")) == 'PRS30006032'))

result_data_df = filter_pr_data_df[['series_id','year','period','value']]\
                        .join(population_data_df[['id_year','population']]\
                              ,filter_pr_data_df.year==population_data_df.id_year\
                              ,how='inner')\
                        .drop(col('id_year'))

display(result_data_df)

series_id,year,period,value,population
PRS30006032,2013,Q01,1,311536594
PRS30006032,2014,Q01,0,314107084
PRS30006032,2015,Q01,-2,316515021
PRS30006032,2016,Q01,-1,318558162
PRS30006032,2017,Q01,1,321004407
PRS30006032,2018,Q01,1,322903030
PRS30006032,2019,Q01,-2,324697795
PRS30006032,2020,Q01,-7,326569308
PRS30006032,2021,Q01,1,329725481
PRS30006032,2022,Q01,5,331097593
