In [12]:
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("CS504_Project").getOrCreate()
from pyspark.sql.functions import *
from datetime import date, datetime
import time
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

DataFrame[key: string, value: string]

In [13]:
####################################################################################################################################################################
# Read all files
####################################################################################################################################################################
# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","
file_type_csv = "csv"

# War Events File.
war_events = "data/war_events.csv"

war = spark.read.format(file_type_csv) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(war_events)

# display(war)

war.createOrReplaceTempView("WAR_EVENTS")

# Commodity Export File.
commodity_exports = "data/commodity_exports.csv"

exports_raw = spark.read.format(file_type_csv) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(commodity_exports)

exports = exports_raw.withColumn("Commodity", when(exports_raw.Commodity == "Dog or cat food; (not put up for retail sale), used in animal feeding","AnimalFodder") \
      .when(exports_raw.Commodity == "Wheat and meslin","Wheat") \
      .when(exports_raw.Commodity == "Iron ores and concentrates; including roasted iron pyrites","IronOre") \
      .when(exports_raw.Commodity == "Slag, dross; (other than granulated slag), scalings and other waste from the manufacture of iron or steel","Slag") \
      .when(exports_raw.Commodity == "Base metals, silver or gold, clad with platinum; not further worked than semi-manufactured","GoldOre") \
      .when(exports_raw.Commodity == "Cereal groats and meal; of maize (corn)","Corn") \
     .when(exports_raw.Commodity == "Vegetable oils; sunflower seed or safflower oil and their fractions, other than crude, whether or not refined, but not chemically modified","SunflowerOil") \
     .when(exports_raw.Commodity == "Vegetable oils; low erucic acid rape or colza oil and its fractions, crude","RapeseedOil") \
     .when(exports_raw.Commodity == "Copper ores and concentrates","Copper") \
     .when(exports_raw.Commodity == "Coal; anthracite, whether or not pulverised, but not agglomerated","Coal") \
     .when(exports_raw.Commodity == "Oils; petroleum oils and oils obtained from bituminous minerals, crude","CrudePetroleum") \
     .when(exports_raw.Commodity == "Petroleum gases and other gaseous hydrocarbons; liquefied, natural gas","RefinedPetroleum") \
     .when(exports_raw.Commodity == "Iron or steel, pig iron, spiegeleisen; granules thereof","Steel") \
     .when(exports_raw.Commodity == "Steel, stainless; cold-drawn or cold-rolled, tubes and pipes of circular cross-section","RolledSteel") \
     .when(exports_raw.Commodity == "Cooking appliances and plate warmers; for gas fuel or for both gas and other fuels, of iron or steel","Gas") \
     .when(exports_raw.Commodity == "Steel, stainless; table, kitchen and other household articles and parts thereof","StainlessSteel") \
     .when(exports_raw.Commodity == "Iron or steel; cast articles, excluding grinding balls and similar articles for mills, other than of non-malleable cast iron","CastIron") \
     .when(exports_raw.Commodity == "Nickel; unwrought, not alloyed","Nickel") \
     .when(exports_raw.Commodity == "Aluminium; unwrought, alloys","Aluminium") \
     .when(exports_raw.Commodity == "Barley","Barley") \
      )


# display(exports)
exports.createOrReplaceTempView("EXPORTS")

# Commodity Prices File.
commodity_prices = "data/commodity_prices.csv"

prices = spark.read.format(file_type_csv) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(commodity_prices)


prices=prices.withColumnRenamed("Date","Starting_Date")
# display(prices)
prices.createOrReplaceTempView("PRICES")

In [14]:
####################################################################################################################################################################
# Calculating the Year-Month for all Files
####################################################################################################################################################################
transform_war_events=spark.sql(""" 
                               select  *,date_format(TO_DATE(EVENT_DATE,'dd-MMMM-yyyy'),'yyyy-MM') as EVENT_YEAR_MONTH from WAR_EVENTS
                              """)
group_war_events=transform_war_events.groupBy("EVENT_YEAR_MONTH").agg(count("EVENT_TYPE").alias("COUNT_OF_EVENTS"))

# group_war_events.show()
group_war_events.createOrReplaceTempView("GROUP_EVENTS")

transform_exports=spark.sql(""" 
                               select  *,date_format(TO_DATE(Period,'yyyyMM'),'yyyy-MM') as PERIOD_YEAR_MONTH from EXPORTS
                              """)
# transform_exports.show(3)
transform_exports.createOrReplaceTempView("GROUP_EXPORTS")

transform_prices=spark.sql(""" 
                               select  *,date_format(TO_DATE(Starting_Date,'MM/dd/yyyy'),'yyyy-MM') as DATE_YEAR_MONTH from PRICES
                              """)
# transform_prices.show(3)
transform_prices.createOrReplaceTempView("GROUP_PRICES")

In [15]:
####################################################################################################################################################################
# Joining of all Files Based on Date
####################################################################################################################################################################
join_events_prices_exports=spark.sql(""" 
                               select  EVENT_YEAR_MONTH, 
                                       COUNT_OF_EVENTS,
                                       Reporter as REPORTER,
                                       Partner as PARTNER,
                                       GROUP_EXPORTS.Commodity,
                                       'Netweight (kg)'  as NETWEIGHT,
                                       'Trade Value (US$)' as TRADEVALUEINUSD,
                                       Amount as PRODUCERPRICE
                                       from 
                                       GROUP_EVENTS,
                                       GROUP_PRICES,
                                       GROUP_EXPORTS
                                       where
                                           EVENT_YEAR_MONTH = PERIOD_YEAR_MONTH
                                       AND PERIOD_YEAR_MONTH = DATE_YEAR_MONTH
                                       AND GROUP_EXPORTS.Commodity = GROUP_PRICES.Commodity
                                     
                              """)
display(join_events_prices_exports)

DataFrame[EVENT_YEAR_MONTH: string, COUNT_OF_EVENTS: bigint, REPORTER: string, PARTNER: string, Commodity: string, NETWEIGHT: string, TRADEVALUEINUSD: string, PRODUCERPRICE: string]

In [16]:
join_events_prices_exports.coalesce(1).write.format("parquet").save("data/Joined_Events_Prices_Exports.parquet")

                                                                                

In [17]:
join_events_prices_exports.show(4)

[Stage 20:>                                                         (0 + 8) / 8]

+----------------+---------------+------------------+------------------+---------+--------------+-----------------+-------------+
|EVENT_YEAR_MONTH|COUNT_OF_EVENTS|          REPORTER|           PARTNER|Commodity|     NETWEIGHT|  TRADEVALUEINUSD|PRODUCERPRICE|
+----------------+---------------+------------------+------------------+---------+--------------+-----------------+-------------+
|         2020-06|            993|         Lithuania|Russian Federation|    Steel|Netweight (kg)|Trade Value (US$)|        200.7|
|         2020-06|            993|Russian Federation|           Ukraine|    Steel|Netweight (kg)|Trade Value (US$)|        200.7|
|         2020-06|            993|          Slovenia|Russian Federation|    Wheat|Netweight (kg)|Trade Value (US$)|  169.7223367|
|         2020-06|            993|           Germany|           Ukraine|    Steel|Netweight (kg)|Trade Value (US$)|        200.7|
+----------------+---------------+------------------+------------------+---------+--------

                                                                                