In [18]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Start (or reuse an existing) local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)


In [21]:
# Load the commodity trade statistics CSV into a Spark DataFrame.
# header=True takes column names from the first row; inferSchema=True lets Spark
# infer column types (see the printSchema output below) instead of reading all as strings.
DATA_PATH = "dataset/commodity_trade_statistics_data.csv"
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
# show() must be called: a bare `df.show` only echoes the bound method, not the rows.
df.show()

+---------------+----+---------+--------------------+------+---------+---------+---------------+-----------+---------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|  quantity_name|   quantity|       category|
+---------------+----+---------+--------------------+------+---------+---------+---------------+-----------+---------------+
|    Afghanistan|2016|   010410|         Sheep, live|Export|     6088|     2339|Number of items|       51.0|01_live_animals|
|    Afghanistan|2016|   010420|         Goats, live|Export|     3958|      984|Number of items|       53.0|01_live_animals|
|    Afghanistan|2008|   010210|Bovine animals, l...|Import|  1026804|      272|Number of items|     3769.0|01_live_animals|
|        Albania|2016|   010290|Bovine animals, l...|Import|  2414533|  1114023|Number of items|     6853.0|01_live_animals|
|        Albania|2016|   010392|Swine, live excep...|Import| 14265937|  9484953|Number of items|    96040.0|01_live_animals|


In [22]:
# Print the column names and the types Spark inferred when the CSV was loaded.
df.printSchema()

root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- comm_code: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- flow: string (nullable = true)
 |-- trade_usd: long (nullable = true)
 |-- weight_kg: long (nullable = true)
 |-- quantity_name: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- category: string (nullable = true)



In [25]:
# Project just the country/area and commodity columns.
df.select(df.country_or_area, df.commodity).show()

+---------------+--------------------+
|country_or_area|           commodity|
+---------------+--------------------+
|    Afghanistan|         Sheep, live|
|    Afghanistan|         Goats, live|
|    Afghanistan|Bovine animals, l...|
|        Albania|Bovine animals, l...|
|        Albania|Swine, live excep...|
|        Albania|Fowls, live domes...|
|        Albania|Fowls, live domes...|
|        Albania|Poultry, live exc...|
|        Albania|Fowls, live domes...|
|        Albania|Poultry, live exc...|
|        Albania|Animals, live, ex...|
|        Albania|Bovine animals, l...|
|        Albania|Swine, live excep...|
|        Albania|Swine, live excep...|
|        Albania|Fowls, live domes...|
|        Albania|Poultry, live exc...|
|        Albania|Fowls, live domes...|
|        Albania|Poultry, live exc...|
|        Albania|Animals, live, ex...|
|        Albania|Bovine animals, l...|
+---------------+--------------------+
only showing top 20 rows



In [31]:
# Keep only rows whose trade_usd value is above 1,000,000.
df.filter(df["trade_usd"] > 1_000_000).show()

+---------------+----+---------+--------------------+------+---------+---------+---------------+---------+---------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|  quantity_name| quantity|       category|
+---------------+----+---------+--------------------+------+---------+---------+---------------+---------+---------------+
|    Afghanistan|2008|   010210|Bovine animals, l...|Import|  1026804|      272|Number of items|   3769.0|01_live_animals|
|        Albania|2016|   010290|Bovine animals, l...|Import|  2414533|  1114023|Number of items|   6853.0|01_live_animals|
|        Albania|2016|   010392|Swine, live excep...|Import| 14265937|  9484953|Number of items|  96040.0|01_live_animals|
|        Albania|2016|   010511|Fowls, live domes...|Import|  2671732|   254652|Number of items|5629138.0|01_live_animals|
|        Albania|2016|   010591|Fowls, live domes...|Import|  2421513|  1926850|Number of items|1006990.0|01_live_animals|
|        Albania

In [30]:
# Keep only rows whose country_or_area is Indonesia.
df.filter(df["country_or_area"] == "Indonesia").show()

+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|  quantity_name|quantity|       category|
+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
|      Indonesia|2016|   010119|Horses, live exce...|Import|   580194|    30941|Number of items|    38.0|01_live_animals|
|      Indonesia|2016|   010120|Asses, mules and ...|Import|     1525|     1162|    No Quantity|    null|01_live_animals|
|      Indonesia|2016|   010210|Bovine animals, l...|Import|  2313879|   488410|Number of items|  1121.0|01_live_animals|
|      Indonesia|2016|   010290|Bovine animals, l...|Import|599149354|195275703|Number of items|654129.0|01_live_animals|
|      Indonesia|2016|   010310|Swine, live pure-...|Export|     5500|     1760|Number of items|    11.0|01_live_animals|
|      Indonesia|2016|  

In [28]:
# Average trade_usd per flow value; the result column is named avg(trade_usd).
# Rows with a null flow form their own group (shown as the null row in the output).
df.groupBy("flow").avg("trade_usd").show()

+---------+--------------------+
|     flow|      avg(trade_usd)|
+---------+--------------------+
|   Import| 8.006771839206687E7|
|   Export| 1.321502995639191E8|
|Re-Export| 3.918781493411192E7|
|Re-Import|1.2326870310037987E7|
|     null|                null|
+---------+--------------------+



In [37]:
# Distinct country/area values (rows come back in no particular order).
df.select(df["country_or_area"]).distinct().show()

+------------------+
|   country_or_area|
+------------------+
|     Côte d'Ivoire|
|              Chad|
|   Rep. of Moldova|
|          Anguilla|
|          Paraguay|
|             Yemen|
|State of Palestine|
|           Senegal|
|        Cabo Verde|
|            Sweden|
|          Kiribati|
|         Fmr Sudan|
|         Cook Isds|
|            Guyana|
|           Eritrea|
|       Philippines|
|          Djibouti|
|             Tonga|
|          Malaysia|
|         Singapore|
+------------------+
only showing top 20 rows

