# Data manipulation for ARDL analysis

In this notebook, we will use Pyspark and aggregate monthly data to quarterly data so that we can merge with GDP data. 

In [1]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

In [2]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1684800927608_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Load exchange rate and trade data

In [16]:
ex = spark.read.parquet("s3://trade-final-project-bucket/dataset/exchangerate2.parquet").persist()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
ex = (ex.withColumnRenamed("Code","country")
        .withColumnRenamed("Time","year_month")
        .withColumnRenamed("value","ex")
     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
ex.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Country_name: string (nullable = true)
 |-- country: long (nullable = true)
 |-- Country_name2: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- year_month: string (nullable = true)
 |-- ex: double (nullable = true)

In [19]:
ex.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+-------------+--------------------+----------+------------------+
|        Country_name|country|Country_name2|                Area|year_month|                ex|
+--------------------+-------+-------------+--------------------+----------+------------------+
|Afghanistan, Isla...|    130|  Afghanistan|                Asia|    198801|0.3092632225995748|
|             Albania|    229|      Albania|Central_and_East_...|    198801|              null|
+--------------------+-------+-------------+--------------------+----------+------------------+
only showing top 2 rows

In [20]:
data = spark.read.parquet("s3://trade-final-project-bucket/dataset/trades.parquet")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year_month: long (nullable = true)
 |-- export_import: long (nullable = true)
 |-- country: long (nullable = true)
 |-- hs9: long (nullable = true)
 |-- q1: long (nullable = true)
 |-- q2: long (nullable = true)
 |-- value: long (nullable = true)
 |-- hs6: string (nullable = true)
 |-- hs2: string (nullable = true)

In [22]:
data = data.join(ex, on=['country', 'year_month'], how='inner')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
data = data.withColumn('year_month_str', F.col('year_month').cast('string'))
data = (data.withColumn('quarter', F.quarter(F.to_date('year_month_str', 'yyyyMM')))
            .withColumn('year', F.year(F.to_date('year_month_str', 'yyyyMM')))
       )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- country: long (nullable = true)
 |-- year_month: long (nullable = true)
 |-- export_import: long (nullable = true)
 |-- hs9: long (nullable = true)
 |-- q1: long (nullable = true)
 |-- q2: long (nullable = true)
 |-- value: long (nullable = true)
 |-- hs6: string (nullable = true)
 |-- hs2: string (nullable = true)
 |-- Country_name: string (nullable = true)
 |-- Country_name2: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- ex: double (nullable = true)
 |-- year_month_str: string (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- year: integer (nullable = true)

## Aggregate from monthly data to quarterly data. 

In [46]:
data_g = (data.groupby("year", "quarter", "export_import", "country", "Country_name", "hs2")
          .agg(F.mean("ex").alias("mean_ex"), F.sum("value").alias("sum_value")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [47]:
data_g.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: integer (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- export_import: long (nullable = true)
 |-- country: long (nullable = true)
 |-- Country_name: string (nullable = true)
 |-- hs2: string (nullable = true)
 |-- mean_ex: double (nullable = true)
 |-- sum_value: long (nullable = true)

## Load GDP data and merge with previous dataframe 

In [32]:
gdp = spark.read.parquet("s3://trade-final-project-bucket/dataset/gdp_quarter.parquet").persist()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
gdp = (gdp.withColumn("quarter", F.substring(F.col("Time"), 2, 1).cast("integer"))
          .withColumn("year", F.substring(F.col("Time"), 4, 4).cast("integer"))
          .withColumnRenamed("value", "gdp")
      )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
gdp.head(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(Country_name='Argentina', Code=413, Country_name2='Argentina', Area='Middle_and_South_America', Time='Q1_1988', gdp=None, quarter=1, year=1988), Row(Country_name='Armenia, Rep. of', Code=151, Country_name2='Armenia', Area='Central_and_East_Europe_Russia', Time='Q1_1988', gdp=None, quarter=1, year=1988)]

In [35]:
gdp = gdp.withColumnRenamed("Code","country")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
data_g = data_g.join(gdp, on=['country', 'quarter', 'year'], how='inner')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
data_g = (data_g.withColumnRenamed("mean_ex","ex")
          .withColumnRenamed("sum_value","value")
         )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
data_g = data_g.withColumn('year_quarter', F.concat(F.col("year"), F.lit("_"), F.col("quarter")))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
data_g.head(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(country=205, quarter=1, year=2002, export_import=1, Country_name='United Kingdom', hs2='85', ex=0.005314876791761418, value=80384260, Country_name='United Kingdom', Country_name2='United_Kingdom', Area='Western_Europe', Time='Q1_2002', gdp=292210.0, year_quarter='2002_1'), Row(country=207, quarter=4, year=2003, export_import=1, Country_name='Netherlands, The', hs2='84', ex=None, value=146989643, Country_name='Netherlands, The', Country_name2='Netherlands', Area='Western_Europe', Time='Q4_2003', gdp=129623.89, year_quarter='2003_4')]

In [51]:
data_g.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- country: long (nullable = true)
 |-- quarter: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- export_import: long (nullable = true)
 |-- Country_name: string (nullable = true)
 |-- hs2: string (nullable = true)
 |-- ex: double (nullable = true)
 |-- value: long (nullable = true)
 |-- Country_name: string (nullable = true)
 |-- Country_name2: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- gdp: double (nullable = true)
 |-- year_quarter: string (nullable = true)

In [54]:
data_g = (data_g.withColumn("value_log", F.log(F.col("value")))
                .withColumn("ex_log", F.log(F.col("ex")))
                .withColumn("gdp_log", F.log(F.col("gdp")))
         )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [55]:
data_g = (data_g[["country", "year_quarter", "export_import",  "hs2", "value_log", "ex_log", "gdp_log"]])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
data_g.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- country: long (nullable = true)
 |-- year_quarter: string (nullable = true)
 |-- export_import: long (nullable = true)
 |-- hs2: string (nullable = true)
 |-- value_log: double (nullable = true)
 |-- ex_log: double (nullable = true)
 |-- gdp_log: double (nullable = true)

## Add lags and difference columns

In [59]:
windowSpec = Window.partitionBy("country","hs2","export_import").orderBy("year_quarter")

# Create the lag variable
df_with_lag = (data_g.withColumn("value_log_L1", F.lag("value_log").over(windowSpec))
                 .withColumn("value_log_L2", F.lag("value_log",2).over(windowSpec))
                 .withColumn("value_log_L3", F.lag("value_log",3).over(windowSpec))
                 .withColumn("value_log_L4", F.lag("value_log",4).over(windowSpec))
                 .withColumn("ex_log_L1", F.lag("ex_log").over(windowSpec))
                 .withColumn("ex_log_L2", F.lag("ex_log",2).over(windowSpec))
                 .withColumn("ex_log_L3", F.lag("ex_log",3).over(windowSpec))
                 .withColumn("ex_log_L4", F.lag("ex_log",4).over(windowSpec))
                 .withColumn("gdp_log_L1", F.lag("gdp_log").over(windowSpec))
                 .withColumn("gdp_log_L2", F.lag("gdp_log",2).over(windowSpec))
                 .withColumn("gdp_log_L3", F.lag("gdp_log",3).over(windowSpec))
                 .withColumn("gdp_log_L4", F.lag("gdp_log",4).over(windowSpec))
                )

# Calculate the difference
df_with_diff = (df_with_lag.withColumn("D_value_log_L1", F.col("value_log_L1") - F.col("value_log_L2"))
                           .withColumn("D_value_log_L2", F.col("value_log_L2") - F.col("value_log_L3"))
                           .withColumn("D_value_log_L3", F.col("value_log_L3") - F.col("value_log_L4"))
                           .withColumn("D_ex_log_L1", F.col("ex_log_L1") - F.col("ex_log_L2"))
                           .withColumn("D_ex_log_L2", F.col("ex_log_L2") - F.col("ex_log_L3"))
                           .withColumn("D_ex_log_L3", F.col("ex_log_L3") - F.col("ex_log_L4"))
                           .withColumn("D_gdp_log_L1", F.col("gdp_log_L1") - F.col("gdp_log_L2"))
                           .withColumn("D_gdp_log_L2", F.col("gdp_log_L2") - F.col("gdp_log_L3"))
                           .withColumn("D_gdp_log_L3", F.col("gdp_log_L3") - F.col("gdp_log_L4"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [65]:
df_with_diff = (df_with_diff.withColumn("D_value_log", F.col("value_log") - F.col("value_log_L1"))
                            .withColumn("D_ex_log", F.col("ex_log") - F.col("ex_log_L1"))
                            .withColumn("D_gdp_log", F.col("gdp_log") - F.col("gdp_log_L1"))
               )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [66]:
df_with_diff.filter((df_with_diff.country==413)&(df_with_diff.hs2==72)&(df_with_diff.export_import==1)).show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+------------+-------------+---+------------------+-------------------+-------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------+------------+------------+--------------------+--------------------+---------+
|country|year_quarter|export_import|hs2|         value_log|             ex_log|gdp_log|      value_log_L1|      value_log_L2|      value_log_L3|      value_log_L4|          ex_log_L1|          ex_log_L2|          ex_log_L3|          ex_log_L4|gdp_log_L1|gdp_log_L2|gdp_log_L3|gdp_log_L4|      D_value_log_L1|      D_value_log_L2|      D_value_log_L3|         D_ex_log_L1|         D_ex_log_L2|         D_ex_log_L3|D_gdp_log_L1|D_gdp_log_L2|D_gdp_log_L3|         D_value_log|            D_ex_log|D

In [67]:
df_with_diff.write.parquet("s3://trade-final-project-bucket/dataset/trades_ARDL.parquet", mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…