In [1]:
from pyspark.sql.session import SparkSession
import config as c
import pyspark.sql.functions as fn
from pyspark.sql import Window

In [2]:
# Obtain (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Load the staged temperature CSV; the first row is the header and column
# types are inferred from the data.
temperatureDF = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(c.staging + "/temperature")
)

In [4]:
# Derive a `year` column from the first four characters of `dt`.
# Spark's `substring` is 1-based, so the slice starts at position 1 rather than
# relying on how position 0 happens to be treated.
# The result is cast to an integer so that `year`, which is later used as a
# join key, has a numeric type instead of string.
# NOTE(review): assumes `dt` begins with a four-digit year (e.g. yyyy-MM-dd) and
# that the other staged frames expose `year` as an inferred integer — confirm.
temperatureDF_with_year = temperatureDF.withColumn(
    "year",
    fn.substring("dt", 1, 4).cast("int"),
)

In [5]:
# Collapse the temperature readings to one row per (country, year), storing the
# mean of `AverageTemperature` under the name `temperature`.
temperatureDF_final = (
    temperatureDF_with_year
    .groupBy("country", "year")
    .agg(fn.avg(fn.col("AverageTemperature")).alias("temperature"))
)

In [6]:
# Window spanning all rows of the same country; used by the null-filling cells
# below to compute per-country averages.
w = Window.partitionBy(fn.col("country"))

In [7]:
# Where a (country, year) row has no temperature, substitute the average
# temperature computed over window `w`; non-null values are kept as they are.
temperatureDF_final = temperatureDF_final.withColumn(
    "temperature",
    fn.coalesce(fn.col("temperature"), fn.avg("temperature").over(w)),
)

In [8]:
# Load the staged per-country CO2 CSV (header row, inferred column types).
carbonPerCountry = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(c.staging + "/carbonPerCountry")
)

In [9]:
# Fill missing annual emissions with the average over window `w`;
# rows that already carry a value are left untouched.
carbonPerCountry = carbonPerCountry.withColumn(
    "annual_co2_emission_tonnes",
    fn.coalesce(
        fn.col("annual_co2_emission_tonnes"),
        fn.avg("annual_co2_emission_tonnes").over(w),
    ),
)

In [10]:
# Load the staged CO2-per-GDP CSV (header row, inferred column types).
carbonPerEconomy = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(c.staging + "/carbonPerEconomy")
)

In [11]:
# Fill missing emissions-per-GDP values with the average over window `w`;
# rows that already carry a value are left untouched.
carbonPerEconomy = carbonPerEconomy.withColumn(
    "annual_co2_emission_gdp",
    fn.coalesce(
        fn.col("annual_co2_emission_gdp"),
        fn.avg("annual_co2_emission_gdp").over(w),
    ),
)

In [12]:
# Load the staged CO2-per-capita CSV (header row, inferred column types).
carbonPerCapita = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(c.staging + "/carbonPerCapita")
)

In [13]:
# Fill missing per-capita emissions with the average over window `w`;
# rows that already carry a value are left untouched.
carbonPerCapita = carbonPerCapita.withColumn(
    "annual_co2_emission_capita",
    fn.coalesce(
        fn.col("annual_co2_emission_capita"),
        fn.avg("annual_co2_emission_capita").over(w),
    ),
)

In [14]:
# Load the staged annual-CO2-share CSV (header row, inferred column types).
carbonAnnualShare = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(c.staging + "/carbonAnnualShare")
)

In [15]:
# Fill missing annual-share values with the average over window `w`;
# rows that already carry a value are left untouched.
carbonAnnualShare = carbonAnnualShare.withColumn(
    "annual_co2_share",
    fn.coalesce(
        fn.col("annual_co2_share"),
        fn.avg("annual_co2_share").over(w),
    ),
)

In [16]:
# Load the staged gas-per-sector CSV (header row, inferred column types).
gasPerSector = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(c.staging + "/gasPerSector")
)

In [17]:
# Sector columns whose missing values are filled, listed in the order in which
# they are processed.
sector_columns = [
    "source_others",
    "source_bunkers",
    "source_waste",
    "source_industry",
    "source_res_com",
    "source_transport",
    "source_agriculture",
    "source_forestry",
    "source_land",
    "source_energy",
]

# Apply the same rule to every sector column: keep the value when present,
# otherwise fall back to that column's average over window `w`.
# A single loop replaces ten hand-copied `withColumn` clauses, so adding or
# removing a sector only means editing the list above.
for sector_column in sector_columns:
    gasPerSector = gasPerSector.withColumn(
        sector_column,
        fn.coalesce(fn.col(sector_column), fn.avg(sector_column).over(w)),
    )

In [18]:
# Key columns on which the frames below are joined.
join_list = [
    "country",
    "year",
]

In [19]:
# Combine all emission frames into one, matching rows on the `join_list` keys
# with Spark's default join type.
gasDF = (
    carbonPerCountry
    .join(carbonPerEconomy, on=join_list)
    .join(carbonPerCapita, on=join_list)
    .join(carbonAnnualShare, on=join_list)
    .join(gasPerSector, on=join_list)
)

In [20]:
# Attach the combined emission data to the yearly temperatures; only
# key combinations present on both sides are kept (inner join).
data_final = temperatureDF_final.join(gasDF, on=join_list, how="inner")

In [21]:
# Persist the final dataset as comma-separated CSV with a header row,
# replacing any previous output at the target location.
(
    data_final.write
    .mode("overwrite")
    .options(header=True, sep=",")
    .csv(c.input + "/dataset")
)