### Spark Setup

In [54]:
import sys
print(sys.executable)

/home/lap15383/anaconda3/envs/hello-spark/bin/python


In [55]:
!conda info | grep "active environment"

     active environment : hello-spark


In [56]:
!pyspark --version # 2.12.15

22/08/24 08:47:27 WARN Utils: Your hostname, lap15383-ThinkPad-T14-Gen-2i resolves to a loopback address: 127.0.1.1; using 172.25.37.44 instead (on interface wlp0s20f3)
22/08/24 08:47:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.0
      /_/
                        
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 11.0.16
Branch HEAD
Compiled by user ubuntu on 2022-06-09T19:58:58Z
Revision f74867bddfbcdd4d08076db36851e88b15e66556
Url https://github.com/apache/spark
Type --help for more information.


In [57]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

In [58]:
spark = SparkSession.builder.appName("HelloPySpark").getOrCreate()

In [59]:
spark # only 1 cluster. On cloud => Cluster 1 2 3

### Data Proprocessing

In [60]:
IMPORT_DATA = "data/2018-2010_import.csv"
EXPORT_DATA = "data/2018-2010_export.csv"

In [61]:
df_import = spark.read.csv(IMPORT_DATA, header=True, inferSchema=True)
df_export = spark.read.options(header=True, inferSchema=True). \
                        csv(EXPORT_DATA)

In [62]:
type(df_import)

pyspark.sql.dataframe.DataFrame

In [63]:
df_import

DataFrame[HSCode: int, Commodity: string, value: double, country: string, year: int]

In [64]:
df_export

DataFrame[HSCode: int, Commodity: string, value: double, country: string, year: int]

In [65]:
df_import.show()

+------+--------------------+------+---------------+----+
|HSCode|           Commodity| value|        country|year|
+------+--------------------+------+---------------+----+
|     5|PRODUCTS OF ANIMA...|   0.0|AFGHANISTAN TIS|2018|
|     7|EDIBLE VEGETABLES...| 12.38|AFGHANISTAN TIS|2018|
|     8|EDIBLE FRUIT AND ...| 268.6|AFGHANISTAN TIS|2018|
|     9|COFFEE, TEA, MATE...| 35.48|AFGHANISTAN TIS|2018|
|    11|PRODUCTS OF THE M...|  null|AFGHANISTAN TIS|2018|
|    12|OIL SEEDS AND OLE...|  8.32|AFGHANISTAN TIS|2018|
|    13|LAC; GUMS, RESINS...|108.78|AFGHANISTAN TIS|2018|
|    20|PREPARATIONS OF V...|  0.65|AFGHANISTAN TIS|2018|
|    25|SALT; SULPHUR; EA...|  0.05|AFGHANISTAN TIS|2018|
|    27|MINERAL FUELS, MI...|   0.0|AFGHANISTAN TIS|2018|
|    39|PLASTIC AND ARTIC...|  null|AFGHANISTAN TIS|2018|
|    41|RAW HIDES AND SKI...|   0.0|AFGHANISTAN TIS|2018|
|    49|PRINTED BOOKDS, N...|  null|AFGHANISTAN TIS|2018|
|    51|WOOL, FINE OR COA...|  0.17|AFGHANISTAN TIS|2018|
|    52|      

In [66]:
df_export.show(vertical=True)

-RECORD 0-------------------------
 HSCode    | 2                    
 Commodity | MEAT AND EDIBLE M... 
 value     | 0.18                 
 country   | AFGHANISTAN TIS      
 year      | 2018                 
-RECORD 1-------------------------
 HSCode    | 3                    
 Commodity | FISH AND CRUSTACE... 
 value     | 0.0                  
 country   | AFGHANISTAN TIS      
 year      | 2018                 
-RECORD 2-------------------------
 HSCode    | 4                    
 Commodity | DAIRY PRODUCE; BI... 
 value     | 12.48                
 country   | AFGHANISTAN TIS      
 year      | 2018                 
-RECORD 3-------------------------
 HSCode    | 6                    
 Commodity | LIVE TREES AND OT... 
 value     | 0.0                  
 country   | AFGHANISTAN TIS      
 year      | 2018                 
-RECORD 4-------------------------
 HSCode    | 7                    
 Commodity | EDIBLE VEGETABLES... 
 value     | 1.89                 
 country   | AFGHANI

In [67]:
print(df_import.count())
print(df_export.count())

76124
137023


In [68]:
df_import.printSchema()
df_export.printSchema()

root
 |-- HSCode: integer (nullable = true)
 |-- Commodity: string (nullable = true)
 |-- value: double (nullable = true)
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- HSCode: integer (nullable = true)
 |-- Commodity: string (nullable = true)
 |-- value: double (nullable = true)
 |-- country: string (nullable = true)
 |-- year: integer (nullable = true)



In [69]:
df_import.describe().show()



+-------+------------------+--------------------+------------------+---------------+------------------+
|summary|            HSCode|           Commodity|             value|        country|              year|
+-------+------------------+--------------------+------------------+---------------+------------------+
|  count|             76124|               76124|             64536|          76124|             76124|
|   mean| 53.90502338290158|                null|62.361424941119346|           null|2014.0182990909568|
| stddev|27.546852275859635|                null| 666.3271770956131|           null| 2.579384776170248|
|    min|                 1|AIRCRAFT, SPACECR...|               0.0|AFGHANISTAN TIS|              2010|
|    max|                99|ZINC AND ARTICLES...|          32781.57|       ZIMBABWE|              2018|
+-------+------------------+--------------------+------------------+---------------+------------------+



                                                                                

In [70]:
df_export.describe().show()



+-------+------------------+--------------------+------------------+---------------+------------------+
|summary|            HSCode|           Commodity|             value|        country|              year|
+-------+------------------+--------------------+------------------+---------------+------------------+
|  count|            137023|              137023|            122985|         137023|            137023|
|   mean|51.330302212037395|                null|21.567829166160198|           null| 2014.056304416047|
| stddev|28.018025954244337|                null|229.70127859467328|           null|2.5801603419089547|
|    min|                 1|AIRCRAFT, SPACECR...|               0.0|AFGHANISTAN TIS|              2010|
|    max|                99|ZINC AND ARTICLES...|          19805.17|       ZIMBABWE|              2018|
+-------+------------------+--------------------+------------------+---------------+------------------+



                                                                                

#### Remove duplicate records

In [71]:
print("Import Df distinct values:",df_import.distinct().count())
print("Import Df total values:",df_import.count())
print("Export Df distinct values:",df_export.distinct().count())
print("Export Df total values:",df_export.count())

Import Df distinct values: 75093
Import Df total values: 76124
Export Df distinct values: 137023
Export Df total values: 137023


In Import Df, there are some duplicate records

In [72]:
df_import = df_import.dropDuplicates()

In [73]:
print(df_import.dropDuplicates(['Commodity', 'country', 'year']).count() == df_import.count())
print(df_export.dropDuplicates(['Commodity', 'country', 'year']).count() == df_export.count())

True
True


(commodity, country, year) is the primary keys of Import and Export Dataframe

#### Clean Data

In [74]:
# Null, Empty
def check_na(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    return df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns])

print("Number of record in Import DataFrame:", df_import.count())
check_na(df_import).show()
# df_import.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_import.columns]).show()

print("Number of record in Export DataFrame:", df_export.count())
check_na(df_export).show()
# df_export.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df_export.columns]).show()

Number of record in Import DataFrame: 75093
+------+---------+-----+-------+----+
|HSCode|Commodity|value|country|year|
+------+---------+-----+-------+----+
|     0|        0|11416|      0|   0|
+------+---------+-----+-------+----+

Number of record in Export DataFrame: 137023
+------+---------+-----+-------+----+
|HSCode|Commodity|value|country|year|
+------+---------+-----+-------+----+
|     0|        0|14038|      0|   0|
+------+---------+-----+-------+----+



In [75]:
from pyspark.ml.feature import Imputer

na_columns = ['value'] #int type
def fill_na_with_median(df: pyspark.sql.dataframe.DataFrame, na_columns: list) -> pyspark.sql.dataframe.DataFrame:
    imputer = Imputer(inputCols=na_columns, outputCols=[c for c in na_columns])
    tmp_df =  imputer.setStrategy("median").fit(df).transform(df)
    # tmp_df = tmp_df.drop(*na_columns)
    # for item in na_columns:
    #     tmp_df = tmp_df.withColumnRenamed('{}_imputed'.format(item), item)
    return tmp_df

# fill_na_with_median(df_import, na_columns).show()

In [76]:
df_import = fill_na_with_median(df_import, na_columns)
df_export = fill_na_with_median(df_export, na_columns)

In [77]:
check_na(df_import).show()
check_na(df_export).show()

+------+---------+-----+-------+----+
|HSCode|Commodity|value|country|year|
+------+---------+-----+-------+----+
|     0|        0|    0|      0|   0|
+------+---------+-----+-------+----+





+------+---------+-----+-------+----+
|HSCode|Commodity|value|country|year|
+------+---------+-----+-------+----+
|     0|        0|    0|      0|   0|
+------+---------+-----+-------+----+



                                                                                

#### Setup function Longest Consecutive Year

In [78]:
# Write unittest
def longest_consecutive_year(l: list) -> int:
    s = set(l)
    lst = []
    for i in s:
        lst.append(i)
    lst.sort()
    n = len(lst)
    consecutive_year = 1
    ans = 1
    for i in range(1,n):
        if lst[i] == lst[i-1] + 1:
            consecutive_year += 1
        else:
            ans = max(ans, consecutive_year)
            consecutive_year = 1
    ans = max(ans, consecutive_year)
    return ans

lst = [2014, 2017, 2018, 2015, 2013, 2010, 2012, 2012, 2012, 2010] 
print(longest_consecutive_year(lst))
# [2014, 2017, 2016, 2018, 2015, 2013, 2010, 2012, 2011]

4


In [79]:
# Convert a Python unction to Pyspark UDF (User Defined Function)
from pyspark.sql.types import IntegerType, StringType, ArrayType, FloatType, StructField, StructType
longest_consecutive_year_udf = F.udf(lambda lst: longest_consecutive_year(lst))

#### Setup rank of country by value

In [80]:
df_import = df_import.withColumn("rank_value", F.rank().over(Window.partitionBy("Commodity").orderBy(F.col("value").desc()))) \
        .withColumn("country_rank", F.when(F.col("rank_value") == "1", F.col("country")) \
        .otherwise(F.lit(None))) \

df_import.show()

+------+--------------------+-------+----------+----+----------+------------+
|HSCode|           Commodity|  value|   country|year|rank_value|country_rank|
+------+--------------------+-------+----------+----+----------+------------+
|    88|AIRCRAFT, SPACECR...|4237.56|     U S A|2016|         1|       U S A|
|    88|AIRCRAFT, SPACECR...|3382.94|    FRANCE|2018|         2|        null|
|    88|AIRCRAFT, SPACECR...| 3297.8|    FRANCE|2017|         3|        null|
|    88|AIRCRAFT, SPACECR...|3017.42|    FRANCE|2016|         4|        null|
|    88|AIRCRAFT, SPACECR...|2333.57|     U S A|2014|         5|        null|
|    88|AIRCRAFT, SPACECR...|2049.61|     U S A|2017|         6|        null|
|    88|AIRCRAFT, SPACECR...|1973.15|     U S A|2013|         7|        null|
|    88|AIRCRAFT, SPACECR...|1857.04|     U S A|2015|         8|        null|
|    88|AIRCRAFT, SPACECR...| 1662.5|     U S A|2012|         9|        null|
|    88|AIRCRAFT, SPACECR...|1594.39|     U S A|2010|        10|

In [81]:
df_export = df_export.withColumn("rank_value", F.rank().over(Window.partitionBy("Commodity").orderBy(F.col("value").desc()))) \
        .withColumn("country_rank", F.when(F.col("rank_value") == "1", F.col("country")) \
        .otherwise(F.lit(None))) \

df_export.show()

+------+--------------------+------+--------+----+----------+------------+
|HSCode|           Commodity| value| country|year|rank_value|country_rank|
+------+--------------------+------+--------+----+----------+------------+
|    76|ALUMINIUM AND ART...|909.65|MALAYSIA|2018|         1|    MALAYSIA|
|    76|ALUMINIUM AND ART...| 782.6|   U S A|2018|         2|        null|
|    76|ALUMINIUM AND ART...|775.63|MALAYSIA|2017|         3|        null|
|    76|ALUMINIUM AND ART...|764.63|KOREA RP|2017|         4|        null|
|    76|ALUMINIUM AND ART...|742.75|KOREA RP|2016|         5|        null|
|    76|ALUMINIUM AND ART...|680.27|   U S A|2017|         6|        null|
|    76|ALUMINIUM AND ART...|566.44|KOREA RP|2014|         7|        null|
|    76|ALUMINIUM AND ART...|538.06|KOREA RP|2015|         8|        null|
|    76|ALUMINIUM AND ART...|528.77|KOREA RP|2018|         9|        null|
|    76|ALUMINIUM AND ART...| 468.4|  TURKEY|2018|        10|        null|
|    76|ALUMINIUM AND ART

## Problem 1: Calculate total import and export values for each product

### Processing in Import DataFrame

1. Total import value and number of countries import from

In [82]:
new_df_import = df_import.groupBy("Commodity") \
            .agg(F.sum("value").alias("sum_value_import"))
new_df_import.show()

+--------------------+------------------+
|           Commodity|  sum_value_import|
+--------------------+------------------+
|       LIVE ANIMALS.|149.68999999999994|
|                SILK|2367.4499999999985|
| MAN-MADE FILAMENTS.| 7126.429999999997|
|PROJECT GOODS; SO...| 39179.01999999996|
|SHIPS, BOATS AND ...| 47597.75000000002|
|ARMS AND AMMUNITI...|326.37000000000023|
|PREPARATIONS OF M...|51.449999999999996|
|OTHER MADE UP TEX...|3937.5699999999915|
|MISCELLANEOUS EDI...|           1303.26|
|ARTICLES OF LEATH...| 2865.159999999998|
|ARTICLES OF IRON ...| 36412.63000000003|
|COPPER AND ARTICL...| 30108.51000000003|
|ARTICLES OF APPAR...|2089.7899999999986|
|OTHER VEGETABLE T...| 2737.029999999998|
|MISCELLANEOUS CHE...|38863.820000000036|
|CARPETS AND OTHER...| 833.1100000000004|
|PREPARED FEATHERS...|154.85000000000014|
|RAW HIDES AND SKI...|  5369.93999999999|
|MISCELLANEOUS ART...| 6449.509999999991|
|ALUMINIUM AND ART...|32144.880000000034|
+--------------------+------------

In [83]:
tmp = df_import.dropDuplicates(["Commodity", "country"]) \
        .groupBy("Commodity") \
        .agg(F.count("country").alias("nums_country_import"))

new_df_import = new_df_import.join(tmp, "Commodity", "inner")
new_df_import.show()

+--------------------+------------------+-------------------+
|           Commodity|  sum_value_import|nums_country_import|
+--------------------+------------------+-------------------+
|       LIVE ANIMALS.|149.68999999999994|                 90|
|                SILK|2367.4499999999985|                 68|
| MAN-MADE FILAMENTS.| 7126.429999999997|                114|
|PROJECT GOODS; SO...| 39179.01999999996|                160|
|SHIPS, BOATS AND ...| 47597.75000000002|                 90|
|ARMS AND AMMUNITI...|326.37000000000023|                 52|
|PREPARATIONS OF M...|51.449999999999996|                 39|
|OTHER MADE UP TEX...|3937.5699999999915|                145|
|MISCELLANEOUS EDI...|           1303.26|                121|
|ARTICLES OF LEATH...| 2865.159999999998|                131|
|ARTICLES OF IRON ...| 36412.63000000003|                199|
|COPPER AND ARTICL...| 30108.51000000003|                178|
|ARTICLES OF APPAR...|2089.7899999999986|                134|
|OTHER V

2. Max value and Country that this product has the highest value

In [84]:
from pyspark.sql import Window
tmp_import = df_import.withColumn("rank_value", F.rank().over(Window.partitionBy("Commodity").orderBy(F.col("value").desc()))) 
tmp_import = tmp_import.filter(tmp_import.rank_value == 1).select("Commodity", "country", "value") \
                .withColumnRenamed("value", "max_value_import") \
                .withColumnRenamed("country", "country_with_max_value_import")

tmp_import.show()

+--------------------+-----------------------------+----------------+
|           Commodity|country_with_max_value_import|max_value_import|
+--------------------+-----------------------------+----------------+
|AIRCRAFT, SPACECR...|                        U S A|         4237.56|
|ALBUMINOIDAL SUBS...|                   CHINA P RP|          128.48|
|ALUMINIUM AND ART...|                   CHINA P RP|         1174.62|
|ANIMAL OR VEGETAB...|                    INDONESIA|         5658.82|
|ARMS AND AMMUNITI...|                 SOUTH AFRICA|           41.67|
|ARTICLES OF APPAR...|                   CHINA P RP|          197.89|
|ARTICLES OF APPAR...|                BANGLADESH PR|          285.13|
|ARTICLES OF IRON ...|                   CHINA P RP|         1735.33|
|ARTICLES OF LEATH...|                   CHINA P RP|          377.04|
|ARTICLES OF STONE...|                   CHINA P RP|          539.19|
|BEVERAGES, SPIRIT...|                        U S A|          272.59|
|CARPETS AND OTHER..

In [85]:
# # Note: Uppercase and lowercase when groupBy._df_import groupBy accept and auto convert flexible upper and lower while select column not accept.
# max_value_import = df_import.groupBy("Commodity").max("value").withColumnRenamed("max(value)", "max_value")
# tmp_import = df_import.join(max_value_import, "Commodity", "left")
# tmp_import = tmp_import.filter(tmp_import.value == tmp_import.max_value).select('Commodity', 'country', 'value') \
#                         .withColumnRenamed("country", "country_with_max_value_import") \
#                         .withColumnRenamed("value", "max_value_import")
# # print(tmp_import.count())
# tmp_import.show()

In [86]:
new_df_import = new_df_import.join(tmp_import, 'Commodity', 'inner')
new_df_import.show()

+--------------------+------------------+-------------------+-----------------------------+----------------+
|           Commodity|  sum_value_import|nums_country_import|country_with_max_value_import|max_value_import|
+--------------------+------------------+-------------------+-----------------------------+----------------+
|       LIVE ANIMALS.|149.68999999999994|                 90|                        U S A|            4.07|
|                SILK|2367.4499999999985|                 68|                   CHINA P RP|          358.36|
| MAN-MADE FILAMENTS.| 7126.429999999997|                114|                   CHINA P RP|          447.87|
|PROJECT GOODS; SO...| 39179.01999999996|                160|                   CHINA P RP|         5062.51|
|SHIPS, BOATS AND ...| 47597.75000000002|                 90|                   CHINA P RP|         1454.44|
|ARMS AND AMMUNITI...|326.37000000000023|                 52|                 SOUTH AFRICA|           41.67|
|PREPARATIONS OF M.

3. Longest Consecutive Year

In [87]:
year_import = df_import.select("Commodity", "year") \
                    .distinct() \
                    .groupBy("Commodity") \
                    .agg(F.collect_list('year')) \
                    .withColumnRenamed("collect_list(year)", "year_list")
year_import.show()

+--------------------+--------------------+
|           Commodity|           year_list|
+--------------------+--------------------+
|       LIVE ANIMALS.|[2014, 2017, 2016...|
|                SILK|[2018, 2014, 2017...|
| MAN-MADE FILAMENTS.|[2014, 2015, 2016...|
|SHIPS, BOATS AND ...|[2014, 2018, 2013...|
|PROJECT GOODS; SO...|[2017, 2013, 2016...|
|ARMS AND AMMUNITI...|[2018, 2017, 2016...|
|PREPARATIONS OF M...|[2015, 2018, 2014...|
|OTHER MADE UP TEX...|[2014, 2018, 2017...|
|MISCELLANEOUS EDI...|[2013, 2016, 2018...|
|ARTICLES OF IRON ...|[2017, 2013, 2015...|
|ARTICLES OF LEATH...|[2014, 2017, 2016...|
|COPPER AND ARTICL...|[2017, 2015, 2016...|
|ARTICLES OF APPAR...|[2017, 2015, 2013...|
|OTHER VEGETABLE T...|[2014, 2018, 2017...|
|CARPETS AND OTHER...|[2013, 2014, 2016...|
|PREPARED FEATHERS...|[2016, 2014, 2015...|
|MISCELLANEOUS CHE...|[2017, 2013, 2014...|
|RAW HIDES AND SKI...|[2014, 2013, 2016...|
|MISCELLANEOUS ART...|[2015, 2014, 2013...|
|ALUMINIUM AND ART...|[2018, 201

In [88]:
longest_consecutive_year_import = year_import.withColumn('longest_consecutive_year_import', longest_consecutive_year_udf(F.col("year_list"))) \
                    .drop("year_list")
longest_consecutive_year_import.show()

+--------------------+-------------------------------+
|           Commodity|longest_consecutive_year_import|
+--------------------+-------------------------------+
|       LIVE ANIMALS.|                              9|
|                SILK|                              9|
| MAN-MADE FILAMENTS.|                              9|
|SHIPS, BOATS AND ...|                              9|
|PROJECT GOODS; SO...|                              9|
|ARMS AND AMMUNITI...|                              9|
|PREPARATIONS OF M...|                              9|
|OTHER MADE UP TEX...|                              9|
|MISCELLANEOUS EDI...|                              9|
|ARTICLES OF IRON ...|                              9|
|ARTICLES OF LEATH...|                              9|
|COPPER AND ARTICL...|                              9|
|ARTICLES OF APPAR...|                              9|
|OTHER VEGETABLE T...|                              9|
|CARPETS AND OTHER...|                              9|
|PREPARED 

In [89]:
new_df_import = new_df_import.join(longest_consecutive_year_import, "Commodity", "inner") 
new_df_import.show()

+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+
|           Commodity|  sum_value_import|nums_country_import|country_with_max_value_import|max_value_import|longest_consecutive_year_import|
+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+
|       LIVE ANIMALS.|149.68999999999994|                 90|                        U S A|            4.07|                              9|
|                SILK|2367.4499999999985|                 68|                   CHINA P RP|          358.36|                              9|
| MAN-MADE FILAMENTS.| 7126.429999999997|                114|                   CHINA P RP|          447.87|                              9|
|PROJECT GOODS; SO...| 39179.01999999996|                160|                   CHINA P RP|         5062.51|                              9|
|SHIPS, BOATS

### Processing in Export DataFrame

1. Total export value and number of countries export from

In [90]:
new_df_export = df_export.groupBy("Commodity") \
            .agg(F.sum("value").alias("sum_value_export"))
# print(new_df_export.count())
new_df_export.show()

+--------------------+------------------+
|           Commodity|  sum_value_export|
+--------------------+------------------+
|       LIVE ANIMALS.|386.19000000000005|
|                SILK|1432.0000000000002|
| MAN-MADE FILAMENTS.|20649.139999999996|
|PROJECT GOODS; SO...| 838.4200000000008|
|SHIPS, BOATS AND ...|43183.310000000056|
|ARMS AND AMMUNITI...| 728.0800000000011|
|PREPARATIONS OF M...|           2204.54|
|OTHER MADE UP TEX...|39782.380000000056|
|MISCELLANEOUS EDI...| 5178.789999999991|
|ARTICLES OF LEATH...|20343.830000000024|
|ARTICLES OF IRON ...|61659.530000000064|
|COPPER AND ARTICL...|24569.900000000016|
|ARTICLES OF APPAR...|62335.470000000016|
|OTHER VEGETABLE T...| 3534.729999999995|
|MISCELLANEOUS CHE...|28635.830000000005|
|CARPETS AND OTHER...|14488.630000000025|
|PREPARED FEATHERS...|2408.5899999999997|
|RAW HIDES AND SKI...| 9178.040000000014|
|MISCELLANEOUS ART...|4594.9999999999945|
|ALUMINIUM AND ART...| 25444.20000000004|
+--------------------+------------

In [91]:
tmp = df_export.dropDuplicates(["Commodity", "country"]) \
        .groupBy("Commodity") \
        .agg(F.count("country").alias("nums_country_export"))

new_df_export = new_df_export.join(tmp, "Commodity", "inner")
new_df_export.show()

+--------------------+------------------+-------------------+
|           Commodity|  sum_value_export|nums_country_export|
+--------------------+------------------+-------------------+
|       LIVE ANIMALS.|386.19000000000005|                 95|
|                SILK|1432.0000000000002|                175|
| MAN-MADE FILAMENTS.|20649.139999999996|                210|
|PROJECT GOODS; SO...| 838.4200000000008|                196|
|SHIPS, BOATS AND ...|43183.310000000056|                120|
|ARMS AND AMMUNITI...| 728.0800000000011|                135|
|PREPARATIONS OF M...|           2204.54|                135|
|OTHER MADE UP TEX...|39782.380000000056|                231|
|MISCELLANEOUS EDI...| 5178.789999999991|                205|
|ARTICLES OF LEATH...|20343.830000000024|                227|
|ARTICLES OF IRON ...|61659.530000000064|                229|
|COPPER AND ARTICL...|24569.900000000016|                212|
|ARTICLES OF APPAR...|62335.470000000016|                226|
|OTHER V

2. Max value and Country that this product has the highest value

In [92]:
from pyspark.sql import Window
tmp_export = df_export.withColumn("rank_value", F.rank().over(Window.partitionBy("Commodity").orderBy(F.col("value").desc()))) 
tmp_export = tmp_export.filter(tmp_export.rank_value == 1).select("Commodity", "country", "value") \
                .withColumnRenamed("value", "max_value_export") \
                .withColumnRenamed("country", "country_with_max_value_export")

tmp_export.show()

+--------------------+-----------------------------+----------------+
|           Commodity|country_with_max_value_export|max_value_export|
+--------------------+-----------------------------+----------------+
|ALUMINIUM AND ART...|                     MALAYSIA|          909.65|
|ARMS AND AMMUNITI...|                       FRANCE|           68.16|
|ARTICLES OF APPAR...|                  U ARAB EMTS|         2114.92|
|ARTICLES OF APPAR...|                        U S A|         2231.06|
|ARTICLES OF IRON ...|                        U S A|         1756.05|
|ARTICLES OF LEATH...|                        U S A|          570.26|
|ARTICLES OF STONE...|                        U S A|          538.83|
|CARPETS AND OTHER...|                        U S A|          925.08|
|            CEREALS.|                         IRAN|         1968.73|
|COPPER AND ARTICL...|                   CHINA P RP|         2019.93|
|ESSENTIAL OILS AN...|                        U S A|           320.0|
|        FERTILISERS

In [93]:
# # Note: Uppercase and lowercase when groupBy._df_export groupBy accept and auto convert flexible upper and lower while select column not accept.
# max_value_export = df_export.groupBy("Commodity").max("value").withColumnRenamed("max(value)", "max_value")
# tmp_export = df_export.join(max_value_export, "Commodity", "left")
# tmp_export = tmp_export.filter(tmp_export.value == tmp_export.max_value).select('Commodity', 'country', 'value') \
#                         .withColumnRenamed("country", "country_with_max_value_export") \
#                         .withColumnRenamed("value", "max_value_export")
# # print(tmp_export.count())
# tmp_export.sort("Commodity").show()

In [94]:
new_df_export = new_df_export.join(tmp_export, 'Commodity', 'inner')
new_df_export.show()

+--------------------+------------------+-------------------+-----------------------------+----------------+
|           Commodity|  sum_value_export|nums_country_export|country_with_max_value_export|max_value_export|
+--------------------+------------------+-------------------+-----------------------------+----------------+
|ALUMINIUM AND ART...| 25444.20000000004|                214|                     MALAYSIA|          909.65|
|ARMS AND AMMUNITI...| 728.0800000000011|                135|                       FRANCE|           68.16|
|ARTICLES OF APPAR...|62335.470000000016|                226|                  U ARAB EMTS|         2114.92|
|ARTICLES OF APPAR...| 75154.45999999999|                231|                        U S A|         2231.06|
|ARTICLES OF IRON ...|61659.530000000064|                229|                        U S A|         1756.05|
|ARTICLES OF LEATH...|20343.830000000024|                227|                        U S A|          570.26|
|ARTICLES OF STONE.

3. Longest Consecutive Year

In [95]:
year_export = df_export.select("Commodity", "year") \
                    .distinct() \
                    .groupBy("Commodity") \
                    .agg(F.collect_list('year')) \
                    .withColumnRenamed("collect_list(year)", "year_list")
year_export.show()

+--------------------+--------------------+
|           Commodity|           year_list|
+--------------------+--------------------+
|       LIVE ANIMALS.|[2017, 2016, 2018...|
|                SILK|[2018, 2017, 2015...|
| MAN-MADE FILAMENTS.|[2015, 2016, 2018...|
|PROJECT GOODS; SO...|[2017, 2016, 2018...|
|SHIPS, BOATS AND ...|[2018, 2016, 2015...|
|ARMS AND AMMUNITI...|[2018, 2017, 2016...|
|PREPARATIONS OF M...|[2015, 2018, 2016...|
|OTHER MADE UP TEX...|[2018, 2017, 2015...|
|MISCELLANEOUS EDI...|[2016, 2018, 2015...|
|ARTICLES OF IRON ...|[2017, 2015, 2016...|
|ARTICLES OF LEATH...|[2017, 2016, 2018...|
|COPPER AND ARTICL...|[2017, 2015, 2016...|
|ARTICLES OF APPAR...|[2017, 2015, 2016...|
|OTHER VEGETABLE T...|[2018, 2017, 2016...|
|PREPARED FEATHERS...|[2016, 2015, 2017...|
|CARPETS AND OTHER...|[2016, 2015, 2017...|
|MISCELLANEOUS CHE...|[2017, 2016, 2015...|
|RAW HIDES AND SKI...|[2016, 2017, 2015...|
|MISCELLANEOUS ART...|[2015, 2017, 2018...|
|ALUMINIUM AND ART...|[2018, 201

In [96]:
longest_consecutive_year_export = year_export.withColumn('longest_consecutive_year_export', longest_consecutive_year_udf(F.col("year_list"))) \
                    .drop("year_list")
longest_consecutive_year_export.show()


+--------------------+-------------------------------+
|           Commodity|longest_consecutive_year_export|
+--------------------+-------------------------------+
|       LIVE ANIMALS.|                              9|
|                SILK|                              9|
| MAN-MADE FILAMENTS.|                              9|
|PROJECT GOODS; SO...|                              9|
|SHIPS, BOATS AND ...|                              9|
|ARMS AND AMMUNITI...|                              9|
|PREPARATIONS OF M...|                              9|
|OTHER MADE UP TEX...|                              9|
|MISCELLANEOUS EDI...|                              9|
|ARTICLES OF IRON ...|                              9|
|ARTICLES OF LEATH...|                              9|
|COPPER AND ARTICL...|                              9|
|ARTICLES OF APPAR...|                              9|
|OTHER VEGETABLE T...|                              9|
|PREPARED FEATHERS...|                              9|
|CARPETS A

In [97]:
new_df_export = new_df_export.join(longest_consecutive_year_export, "Commodity", "inner") 
new_df_export.show()

                                                                                

+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+
|           Commodity|  sum_value_export|nums_country_export|country_with_max_value_export|max_value_export|longest_consecutive_year_export|
+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+
|ALUMINIUM AND ART...| 25444.20000000004|                214|                     MALAYSIA|          909.65|                              9|
|ARMS AND AMMUNITI...| 728.0800000000011|                135|                       FRANCE|           68.16|                              9|
|ARTICLES OF APPAR...|62335.470000000016|                226|                  U ARAB EMTS|         2114.92|                              9|
|ARTICLES OF APPAR...| 75154.45999999999|                231|                        U S A|         2231.06|                              9|
|ARTICLES OF 

### Combine 2 DataFrame: Import and Export

1. Make sure the list of commodity of import and export is the same

In [98]:
commodity_import = new_df_import.select("commodity")
commodity_export = new_df_export.select("commodity")
print(commodity_import.subtract(commodity_export).count())
print(commodity_export.subtract(commodity_import).count())
commodity_import.show()

                                                                                

0


                                                                                

0
+--------------------+
|           commodity|
+--------------------+
|       LIVE ANIMALS.|
|                SILK|
| MAN-MADE FILAMENTS.|
|PROJECT GOODS; SO...|
|SHIPS, BOATS AND ...|
|ARMS AND AMMUNITI...|
|PREPARATIONS OF M...|
|OTHER MADE UP TEX...|
|MISCELLANEOUS EDI...|
|ARTICLES OF IRON ...|
|ARTICLES OF LEATH...|
|COPPER AND ARTICL...|
|ARTICLES OF APPAR...|
|OTHER VEGETABLE T...|
|CARPETS AND OTHER...|
|MISCELLANEOUS CHE...|
|PREPARED FEATHERS...|
|RAW HIDES AND SKI...|
|MISCELLANEOUS ART...|
|ALUMINIUM AND ART...|
+--------------------+
only showing top 20 rows



Because subtract = 0 means the list of import is covered the list of export and vice versa. So 2 list is equivalent. We can do this by convert to set (hashmap) or sort it as the same order and compare one the whole dataframe.

2. Combine 2 DataFrame results

In [99]:
combined_df = new_df_import.join(new_df_export, "Commodity", "inner")
combined_df.show(truncate=True)

                                                                                

+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+
|           Commodity|  sum_value_import|nums_country_import|country_with_max_value_import|max_value_import|longest_consecutive_year_import|  sum_value_export|nums_country_export|country_with_max_value_export|max_value_export|longest_consecutive_year_export|
+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+
|ALUMINIUM AND ART...|32144.880000000034|                193|                   CHINA P RP|         1174.62|                              9| 25444.20000000004|                214|                     MALAYSIA|          909.

3. Calculate Balance: export - import

In [100]:
balance_df = combined_df.withColumn("balance", combined_df.sum_value_export - combined_df.sum_value_import)
balance_df.show(truncate=True)

                                                                                

+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+-------------------+
|           Commodity|  sum_value_import|nums_country_import|country_with_max_value_import|max_value_import|longest_consecutive_year_import|  sum_value_export|nums_country_export|country_with_max_value_export|max_value_export|longest_consecutive_year_export|            balance|
+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+-------------------+
|ALUMINIUM AND ART...|32144.880000000034|                193|                   CHINA P RP|         1174.62|                              9| 25444.20000000004|    

4. Write to CSV file

In [101]:
# Positive >= 0, Negative < 0
positive_balance_df = balance_df.filter(balance_df.balance >= 0)
print(positive_balance_df.count())
positive_balance_df.show(truncate=True)

                                                                                

49


                                                                                

+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+
|           Commodity|  sum_value_import|nums_country_import|country_with_max_value_import|max_value_import|longest_consecutive_year_import|  sum_value_export|nums_country_export|country_with_max_value_export|max_value_export|longest_consecutive_year_export|           balance|
+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+
|ARMS AND AMMUNITI...|326.37000000000023|                 52|                 SOUTH AFRICA|           41.67|                              9| 728.0800000000011|       

In [102]:
negative_balance_df = balance_df.filter(balance_df.balance < 0)
print(negative_balance_df.count())
negative_balance_df.show(truncate=False, vertical=True)

49


                                                                                

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Commodity                       | ALUMINIUM AND ARTICLES THEREOF.                                                                                                               
 sum_value_import                | 32144.880000000034                                                                                                                            
 nums_country_import             | 193                                                                                                                                           
 country_with_max_value_import   | CHINA P RP                                                                                                                                    
 max_value_import                | 1174.62                                                                    

In [103]:
# coalesce > repartition. Big data => Out of memory
# negative_balance_df.coalesce(1).write.csv("negativeTradeProduct", header=True)
import os, shutil
# Remove _SUCCESS, _SUCCESS.crc, ...csv.crc
def write_to_one_csv(df: pyspark.sql.dataframe.DataFrame, file_name: str):
    if os.path.exists(file_name):
        shutil.rmtree(file_name)
    # Repartition to 1 partition to write to one csv file => In a folder name as file_name
    df.coalesce(1).write.csv(file_name, header=True)
    # Copy the single csv file to outside
    for file in os.listdir(file_name):
        if file.endswith(".csv"):
            print(file)
            shutil.copy(file_name + "/" + file, file_name + '.csv')
            break
    # Remove the remain folder
    shutil.rmtree(file_name)
    

In [104]:
positive_balance_df.show()

                                                                                

+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+
|           Commodity|  sum_value_import|nums_country_import|country_with_max_value_import|max_value_import|longest_consecutive_year_import|  sum_value_export|nums_country_export|country_with_max_value_export|max_value_export|longest_consecutive_year_export|           balance|
+--------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+-------------------+-----------------------------+----------------+-------------------------------+------------------+
|ARMS AND AMMUNITI...|326.37000000000023|                 52|                 SOUTH AFRICA|           41.67|                              9| 728.0800000000011|       

In [105]:
write_to_one_csv(positive_balance_df, "data/positiveTradeProduct")

[Stage 1150:>                                                       (0 + 1) / 1]

part-00000-e2c09512-bf20-43b7-a82c-375304af294a-c000.csv


                                                                                

In [106]:
write_to_one_csv(negative_balance_df, "data/negativeTradeProduct")

[Stage 1188:>                                                       (0 + 1) / 1]

part-00000-8bc7e2c0-6418-4e31-961d-2c84a744715e-c000.csv


                                                                                

## Problem 2: Accumulated value

### Column format (There is a column year)

In [107]:
column_acc_import = df_import.withColumn("acc_value", F.expr("sum(value) over (partition by commodity, country order by year)")) \
                    .withColumn('acc_value', F.col("acc_value").cast("decimal(38,2)"))
print("Accumulated import value in column format")
column_acc_import.show()

Accumulated import value in column format




+------+--------------------+-----+---------------+----+----------+------------+---------+
|HSCode|           Commodity|value|        country|year|rank_value|country_rank|acc_value|
+------+--------------------+-----+---------------+----+----------+------------+---------+
|    88|AIRCRAFT, SPACECR...| 0.12|AFGHANISTAN TIS|2012|       474|        null|     0.12|
|    88|AIRCRAFT, SPACECR...| 0.37|AFGHANISTAN TIS|2013|       305|        null|     0.49|
|    88|AIRCRAFT, SPACECR...|  0.0|    AMERI SAMOA|2016|       624|        null|     0.00|
|    88|AIRCRAFT, SPACECR...| 0.37|    AMERI SAMOA|2017|       305|        null|     0.37|
|    88|AIRCRAFT, SPACECR...| 0.37|      ANTARTICA|2010|       305|        null|     0.37|
|    88|AIRCRAFT, SPACECR...| 2.88|      AUSTRALIA|2010|       190|        null|     2.88|
|    88|AIRCRAFT, SPACECR...| 1.11|      AUSTRALIA|2011|       237|        null|     3.99|
|    88|AIRCRAFT, SPACECR...|  0.9|      AUSTRALIA|2012|       255|        null|     4.89|

                                                                                

In [108]:
from pyspark.sql import Window
column_acc_export = df_export.withColumn("acc_value", F.sum("value").over(Window.partitionBy("Commodity", "country").orderBy("year"))) \
                    .withColumn('acc_value', F.col("acc_value").cast("decimal(38,2)"))
print("Accumulated export value in column format")
column_acc_export.show()

Accumulated export value in column format
+------+--------------------+-----+---------------+----+----------+------------+---------+
|HSCode|           Commodity|value|        country|year|rank_value|country_rank|acc_value|
+------+--------------------+-----+---------------+----+----------+------------+---------+
|    76|ALUMINIUM AND ART...| 3.51|AFGHANISTAN TIS|2010|       527|        null|     3.51|
|    76|ALUMINIUM AND ART...| 2.63|AFGHANISTAN TIS|2011|       578|        null|     6.14|
|    76|ALUMINIUM AND ART...|  3.6|AFGHANISTAN TIS|2012|       519|        null|     9.74|
|    76|ALUMINIUM AND ART...| 1.81|AFGHANISTAN TIS|2013|       664|        null|    11.55|
|    76|ALUMINIUM AND ART...| 5.01|AFGHANISTAN TIS|2014|       441|        null|    16.56|
|    76|ALUMINIUM AND ART...| 8.86|AFGHANISTAN TIS|2015|       354|        null|    25.42|
|    76|ALUMINIUM AND ART...|  9.4|AFGHANISTAN TIS|2016|       342|        null|    34.82|
|    76|ALUMINIUM AND ART...|40.56|AFGHANISTAN T

### Row format (Each year becomes a column name)

In [109]:
# Pivot function
row_acc_import = df_import.groupBy("Commodity", "country").pivot("year").sum("value").na.fill(0)

for i in range(2011, 2019):
        row_acc_import = row_acc_import.withColumn(str(i), F.col(str(i-1)) + F.col(str(i))) \
                        .withColumn(str(i), F.col(str(i)).cast("decimal(38,2)"))                                                                                                                                                                                
print("Accumulated import value in row format")
row_acc_import.show()

Accumulated import value in row format
+--------------------+---------------+----+-----+-----+-----+-----+-----+-----+-----+------+
|           Commodity|        country|2010| 2011| 2012| 2013| 2014| 2015| 2016| 2017|  2018|
+--------------------+---------------+----+-----+-----+-----+-----+-----+-----+-----+------+
|ELECTRICAL MACHIN...|    BAHARAIN IS|1.08| 3.79| 5.97| 6.92|10.01|10.19|11.40|13.91| 16.17|
|WOOD AND ARTICLES...|  NEW CALEDONIA| 0.0| 0.00| 0.20| 0.57| 0.57| 0.57| 0.57| 0.57|  0.57|
|RAW HIDES AND SKI...|BOSNIA-HRZGOVIN|0.01| 0.38| 0.38| 0.48| 0.85| 0.87| 0.88| 0.96|  0.96|
|            CEREALS.|VIETNAM SOC REP|0.05| 0.42| 0.43| 0.80| 0.80| 0.80| 0.80| 0.81|  0.89|
|PAPER AND PAPERBO...|        GEORGIA|3.16| 3.16| 3.53| 3.56| 3.93| 3.93| 4.22| 4.59|  4.96|
|TANNING OR DYEING...|         UGANDA| 0.0| 0.00| 0.00| 0.00| 0.00| 0.00| 0.00| 0.00|  0.37|
|MANUFACTURES OF S...|BOSNIA-HRZGOVIN| 0.0| 0.00| 0.00| 0.00| 0.00| 0.00| 0.02| 0.04|  0.41|
|ARTICLES OF STONE...|        L

In [110]:
row_acc_export = df_export.groupBy("Commodity", "country").pivot("year").sum("value").na.fill(0)

for i in range(2011, 2019):
        row_acc_export = row_acc_export.withColumn(str(i), F.col(str(i-1)) + F.col(str(i))) \
                        .withColumn(str(i), F.col(str(i)).cast("decimal(38,2)"))
print("Accumulated export value in row format")
row_acc_export.show()

Accumulated export value in row format




+--------------------+---------------+------+------+------+------+-------+-------+-------+-------+-------+
|           Commodity|        country|  2010|  2011|  2012|  2013|   2014|   2015|   2016|   2017|   2018|
+--------------------+---------------+------+------+------+------+-------+-------+-------+-------+-------+
|INORGANIC CHEMICA...|       BARBADOS|  0.03|  0.07|  0.08|  0.09|   0.10|   0.11|   0.12|   0.13|   0.15|
|FISH AND CRUSTACE...|        COMOROS|   0.0|  0.00|  0.00|  0.00|   0.00|   0.04|   0.26|   0.49|   0.67|
|ARTICLES OF LEATH...|        CROATIA|   1.0|  2.16|  3.30|  4.23|   5.20|   5.83|   6.72|   7.68|   8.37|
|ARTICLES OF LEATH...|   KIRIBATI REP|   0.0|  0.00|  0.00|  0.00|   0.00|   0.00|   0.00|   0.36|   0.36|
|ELECTRICAL MACHIN...|    BAHARAIN IS| 24.56| 54.40| 87.21|116.62| 141.95| 165.67| 188.57| 225.65| 259.75|
|TOOLS IMPLEMENTS,...|      MAURITIUS|  0.37|  0.64|  0.79|  0.91|   1.20|   1.56|   1.91|   2.53|   3.13|
|             COTTON.|          SYRIA

                                                                                

In [111]:
# Run main Spark UI at localhost:8081
# Run Spark UI at localhost:4040 or localhost:4041 when running in a cluster
spark.stop()