In [2]:
# %pip install pyspark==3.4.1

In [3]:
import datetime
import re

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# from pyspark.sql.functions import array, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [4]:
import os

# Absolute path of the directory the kernel was started in; the dataset path
# used when loading the CSV is built on top of it.
cwd = os.path.abspath(os.curdir)

In [5]:
# Spark master URL. Defaults to the local standalone cluster; set the SPARK_MASTER
# environment variable (e.g. "local[1]") to run elsewhere without editing this cell.
SPARK_MASTER = os.environ.get("SPARK_MASTER", "spark://127.0.0.1:7077")

spark = (
    SparkSession.builder
    .master(SPARK_MASTER)
    .appName('ConsumerPriceIndices')
    .getOrCreate()
)

In [6]:
# Load the raw CPI export with its first line as the header. No schema or
# inferSchema option is given, so every column arrives as a string and is cast
# explicitly further down.
cpi_df = spark.read.csv(
    "/".join([
        cwd,
        "dataset",
        "ConsumerPriceIndices_E_All_Data",
        "ConsumerPriceIndices_E_All_Data.csv",
    ]),
    header=True,
)

In [7]:
# Column names of the raw frame, in schema order (dtypes yields (name, type) pairs).
col_name = [field_name for field_name, _field_type in cpi_df.dtypes]

In [8]:
# Reporting years present in the wide export. Per-year columns come in triples:
# "Y<year>" (value), "Y<year>F" (flag) and "Y<year>N" (note). Only the bare
# value columns are matched, with an anchored pattern, so that any other header
# containing a capital "Y" can neither be picked up nor crash the int() parse.
# Column order is preserved.
year_list = [
    int(match.group(1))
    for match in (re.fullmatch(r"Y(\d+)", name) for name in col_name)
    if match is not None
]

In [9]:
# Item dimension: one row per CPI item code.
# dropDuplicates shuffles the data and does not preserve a prior orderBy, so the
# sort must come *after* de-duplication for the result to be ordered by id.
# NOTE(review): if one item code appears with differing item/unit text,
# dropDuplicates keeps an arbitrary one of those rows - confirm that is acceptable.
# Must run before the unpivot cell below, which replaces cpi_df.
ItemCPI_df = (
    cpi_df.select(
        cpi_df['Item Code'].cast('int').alias('id'),
        cpi_df['Item'].cast('string').alias('item'),
        cpi_df['Unit'].cast('string').alias('unit'),
    )
    .dropDuplicates(['id'])
    .orderBy('id')
)
ItemCPI_df.show()

+-----+--------------------+----+
|   id|                item|unit|
+-----+--------------------+----+
|23012|Consumer Prices, ...|null|
|23013|Consumer Prices, ...|null|
|23014|Food price inflation|   %|
+-----+--------------------+----+



In [10]:
# Unpivot the wide per-year columns into one fact row per
# (country, item, month, year).
#
# Each year is packed into a struct rather than an array: F.array coerces all of
# its elements to one common type, which turned the integer year literal into a
# string. A struct keeps every field's own type.
#
# NOTE: this cell replaces cpi_df, so it only works once per kernel session;
# re-run the CSV load cell before re-running it.
assert year_list, "no Y<year> columns found - cannot unpivot"

transformed_date = datetime.datetime.now()

per_year = F.array(*[
    F.struct(
        F.col(f"Y{year}").alias('value'),
        F.col(f"Y{year}F").alias('flag'),
        # The "N" column holds a note; sample output shows text like "base year is 2015".
        F.col(f"Y{year}N").alias('base_year'),
        F.lit(year).alias('year'),
    )
    for year in year_list
])

cpi_df = (
    cpi_df.select(
        cpi_df['Area Code'].cast('int').alias('country_id'),
        cpi_df['Item Code'].cast('int').alias('item_cpi_id'),
        cpi_df['Months Code'].cast('int').alias('month_id'),
        F.explode(per_year).alias('year_col'),
    )
    .select(
        'country_id',
        'item_cpi_id',
        'month_id',
        # Raw CSV values are strings; store them as numbers like the id columns.
        F.col('year_col.value').cast('double').alias('value'),
        F.col('year_col.flag').alias('flag'),
        F.col('year_col.base_year').alias('base_year'),
        F.col('year_col.year').cast('int').alias('year'),
        F.lit(transformed_date).alias('transformed_date'),
    )
)
cpi_df.show()

+----------+-----------+--------+----------+----+-----------------+----+--------------------+
|country_id|item_cpi_id|month_id|     value|flag|        base_year|year|    transformed_date|
+----------+-----------+--------+----------+----+-----------------+----+--------------------+
|         2|      23013|    7001| 24.356332|   I|base year is 2015|2000|2023-07-26 19:59:...|
|         2|      23013|    7001| 29.944592|   I|base year is 2015|2001|2023-07-26 19:59:...|
|         2|      23013|    7001| 33.421952|   I|base year is 2015|2002|2023-07-26 19:59:...|
|         2|      23013|    7001| 39.967661|   I|base year is 2015|2003|2023-07-26 19:59:...|
|         2|      23013|    7001| 43.401939|   I|base year is 2015|2004|2023-07-26 19:59:...|
|         2|      23013|    7001| 48.779789|   X|base year is 2015|2005|2023-07-26 19:59:...|
|         2|      23013|    7001| 53.967956|   X|base year is 2015|2006|2023-07-26 19:59:...|
|         2|      23013|    7001| 56.689401|   X|base year i

In [11]:
# Persist the item dimension to the HDFS warehouse, replacing any previous output.
ItemCPI_df.write.parquet(
    "hdfs://127.0.0.1:9000/FAOSTAT_prj/DataWarehouse/Item_CPI",
    mode="overwrite",
)

In [12]:
# Persist the unpivoted CPI facts to the HDFS warehouse, replacing any previous output.
cpi_df.write.parquet(
    "hdfs://127.0.0.1:9000/FAOSTAT_prj/DataWarehouse/CPI",
    mode="overwrite",
)

In [13]:
# Shut down the SparkSession (and its underlying SparkContext) now that the
# parquet outputs have been written; the session cannot be used after this.
spark.stop()