In [99]:
#Importing necessary libraries, initiating and starting the spark session 
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("solutions").getOrCreate()

In [100]:
#importing some frequently used functions
from pyspark.sql.functions import sum , col ,length, year,month,to_date,desc,asc,when,lit

In [101]:
#load datasets 

data_path = "./data/revised_final_data1.csv"  
df = spark.read.csv(data_path, header=True, inferSchema=True)

data_path = "./data/goods_classification.csv"  
df_goods = spark.read.csv(data_path, header=True, inferSchema=True)

data_path = "./data/country_classification.csv"  
df_country = spark.read.csv(data_path, header=True, inferSchema=True)
data_path = "./data/services_classification.csv"  
df_services = spark.read.csv(data_path, header=True, inferSchema=True)



In [102]:
df.printSchema()

root
 |-- time_ref: date (nullable = true)
 |-- account: string (nullable = true)
 |-- code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- value: double (nullable = true)
 |-- status: string (nullable = true)



In [103]:
#this has been done for the ease of calculation 
df = df.withColumn('value', col('value').cast('int'))

<h2>Question1 : Calculate the total export value for each country in the table. Show the top 10 countries with the highest total export values.
 </h2>

In [104]:
# here, from the main table , we calulate sum of export values for each countries.
# we then order those values on the basis of the total_export_value 
# then we show the table

result = df.where(col('account')=='Exports').groupBy('country_code').agg(sum('value').alias('total_export_value'))
result = result.join(df_country,result['country_code']==df_country['country_code'],'inner')
result = result.select('country_label',df['country_code'],'total_export_value')
result = result.orderBy(result.total_export_value.desc()).limit(10)
result.show()

+--------------------+------------+------------------+
|       country_label|country_code|total_export_value|
+--------------------+------------+------------------+
|China, People's R...|          CN|      168541418942|
|           Australia|          AU|      124044791511|
|United States of ...|          US|      106261562532|
|               Japan|          JP|       40647876106|
|  Korea, Republic of|          KR|       24767856475|
|      United Kingdom|          GB|       22290125277|
|           Singapore|          SG|       18358723868|
|           Indonesia|          ID|       15861041776|
|              Taiwan|          TW|       15548494077|
|Hong Kong (Specia...|          HK|       12708136935|
+--------------------+------------+------------------+



## Question 2 : Find the year-month and the name of the good which has been least imported. Show bottom 5 . 


In [105]:
# firstly , we extract the month and year column from the dataframe
# we join that dataframe to the df_goods to find about goods 
# then , we see finally operate using just the imports .

df_date = df.withColumn('year', year('time_ref'))\
            .withColumn('month',month('time_ref'))
result = df_date.join(df_goods,df['code']==df_goods['Level_1'],'inner')

result = result.where(col('account') =='Imports')\
                .groupBy('year','month','Level_1_desc').agg(sum('value').alias('total_import_value'))
result = result.orderBy(asc('total_import_value'))\
               .limit(3)
result.show()


+----+-----+--------------------+------------------+
|year|month|        Level_1_desc|total_import_value|
+----+-----+--------------------+------------------+
|2022|   12|Vegetable plaitin...|            647026|
|2021|    9|Vegetable plaitin...|            661692|
|2020|   12|Vegetable plaitin...|            764698|
+----+-----+--------------------+------------------+



## Question 3: Calculate the moving average of the total transactions quarter yearly for each year . 

In [106]:
from pyspark.sql.window import Window
from pyspark.sql.functions import avg

df_date = df.withColumn('year', year('time_ref'))\
            .withColumn('month',month('time_ref'))
df_date = df_date.groupby('year','month').agg(avg(col('value')).alias('average_quarter_yearly'))
window_spec = Window.partitionBy().orderBy(asc('year'),asc('month')).rowsBetween(-1, 0)
result = df_date.withColumn('moving_average', avg('average_quarter_yearly').over(window_spec))
result.show()


23/09/06 13:56:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 13:56:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 13:56:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+----+-----+----------------------+------------------+
|year|month|average_quarter_yearly|    moving_average|
+----+-----+----------------------+------------------+
|2020|    6|     2702956.963312896| 2702956.963312896|
|2020|    9|    1943676.5140735754|2323316.7386932354|
|2020|   12|    2069752.2486702127| 2006714.381371894|
|2021|    3|    2049365.6202395784|2059558.9344548956|
|2021|    6|    2296070.1126439786|2172717.8664417784|
|2021|    9|    2299967.2109367712|2298018.6617903747|
|2021|   12|     2463816.563425926|2381891.8871813486|
|2022|    3|    2483905.2814483894|2473860.9224371575|
|2022|    6|    2692141.3752504727| 2588023.328349431|
|2022|    9|    2846082.0266207154| 2769111.700935594|
|2022|   12|    2959018.1771503347| 2902550.101885525|
|2023|    3|     2896694.179212157| 2927856.178181246|
|2023|    6|     2826342.530288408|2861518.3547502826|
+----+-----+----------------------+------------------+



23/09/06 13:56:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 13:56:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 13:56:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/06 13:56:10 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


## Question 4 : Create an udf to find the trade deficit percent with each  country for  this dataset of new zealand

In [107]:
#here, we use a udf to make a fucntion which calculates the trade deficit percentsage.
#the necessary details are figured out above


from pyspark.sql.types import FloatType,IntegerType
from pyspark.sql.functions import udf


df_imp = df.where(col('account')=='Imports')
df_imp = df_imp.groupBy('country_code').agg(sum('value').alias('Import_sum'))
df_exp  = df.where(col('account')=='Exports')
df_exp = df_exp.groupBy(col('country_code')).agg(sum('value').alias('Export_sum'))
df_joined = df_imp.join(df_exp,df_imp['country_code']==df_exp['country_code'])

df_joined = df_joined.withColumn("Trade_Deficit", df_joined["Import_sum"] - df_joined["Export_sum"])
trade_balance = df_joined.select(sum("Trade_Deficit")).collect()[0][0]

#udf
def deficit_percent(trade_deficit):
    return (trade_deficit/trade_balance)*100

#register
percent_udf = udf(deficit_percent,FloatType())



df_joined = df_joined.withColumn('Deficit_percent',percent_udf(df_joined['Trade_Deficit']))
df_joined.show(500)




+--------------------+------------+--------------------+------------+-------------+---------------+
|        country_code|  Import_sum|        country_code|  Export_sum|Trade_Deficit|Deficit_percent|
+--------------------+------------+--------------------+------------+-------------+---------------+
|                  DZ|   159052484|                  DZ|  5908157791|  -5749105307|      -10.15159|
|                  LT|   360064076|                  LT|   185575927|    174488149|      0.3081057|
|                  MM|   107831191|                  MM|   480284366|   -372453175|    -0.65766615|
|                  CI|    16076381|                  CI|   318535372|   -302458991|     -0.5340726|
|                  TC|     2180495|                  TC|     3770576|     -1590081|  -0.0028077152|
|                  AZ|     5882682|                  AZ|   210357444|   -204474762|    -0.36105514|
|                  FI|  1752053062|                  FI|   235221770|   1516831292|       2.678373|


## Question 5: calculate the average transactions of imports and exports in a year combined in a table and show it in columns. 

In [108]:
#here we find the average transaction of each country and use pivot to have a look at it for each year.
df_date = df.withColumn('year', year('time_ref'))
df_date = df_date.groupby('country_code').pivot('year').agg({'value':'avg'})
df_date.show(100)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|        country_code|                2020|                2021|                2022|                2023|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|                  DZ| 1.050868217142857E7|  7568666.4015748035|2.0291455663934425E7| 3.643892882692308E7|
|                  LT|   88997.74813895782|  134517.66968011126|  148898.63609898108|  123389.72845528455|
|                  MM|   457271.4542682927|   431525.1038461538|  295658.89318600367|           163675.96|
|                  TC|             31224.2|   27754.37037037037|   37051.71428571428|   65172.27777777778|
|                  CI|    850592.782051282|   864866.9108910891|   954465.3407407408|   765604.9558823529|
|                  SC|  30626.930693069306|   44991.71351351351|  55985.956043956045|   47111.80582524272|
|                  PM|              7

## Question 6 : Find out which service  each country has imported the most from new zealand. 

In [109]:
#firstly , we take only those values which are in import account 
#we join the services table and  the main table to use the service_label column
#we group the data on the basis of countries and service labels and calculate the sum of the services 
#after this , we join this table to the country table to get the names of the country and order them on the basis of services imported.
# lastly , we select the country and the stype of service they import the most

# df.show()

df_imp = df.where(col('account')=='Imports')
# df_imp.show()
df_imp_services = df_imp.join(df_services, df_imp['code']==df_services['code'],'inner')
# df_imp_services.show()
df_imp_services = df_imp_services.groupby('country_code','service_label').agg(sum(col('value')).alias('Total_import'))
# df_imp_goods.show()

df_imp_services_country  = df_imp_services.join(df_country,df_imp_services['country_code']==df_country['country_code']).orderBy(desc('Total_import'))


df_imp_services_country.select(col('country_label').alias('Country'),col('service_label').alias('Service'))\
                    .show()

+--------------------+--------------------+
|             Country|             Service|
+--------------------+--------------------+
|           Australia|            Services|
|           Singapore|            Services|
|United States of ...|            Services|
|          Not Stated|            Services|
|           Singapore|      Transportation|
|           Australia|Other business se...|
|         Switzerland|            Services|
|             Denmark|            Services|
|             Denmark|      Transportation|
|             Denmark|       Sea transport|
|             Denmark|             Freight|
|           Australia|Technical, trade-...|
|         Switzerland|      Transportation|
|      United Kingdom|            Services|
|           Australia|Telecommunication...|
|         Switzerland|             Freight|
|         Switzerland|       Sea transport|
|           Australia|         Reinsurance|
|           Australia|Insurance and pen...|
|           Singapore|       Sea

## Question 7 : Find out which  good is related to animal product or not , and find out how many number of transactions(both import and export) of animal related products and non-animal related products


In [110]:
#here, we have used just the Goods category and then joined it to the goods table 
#the descripetion of goods is then searched for the regex pattern that we have provided below 
#on the basis of that , we have classified the animal category into 1 and non-animal into 0 
#we have extracted count of each transaction for each country related to animal products. 


df_good = df.where(col('product_type')=='Goods')
df_good_desc = df_good.join(df_goods,df_good['code']==df_goods['Level_1'],'inner')
result_df = df_good_desc.withColumn("contains_animal_product",
                         when(col("Level_1_desc").rlike("(?i)(?:animal|meat|fish)"), lit(1))
                         .otherwise(lit(0)))
result_df.select('country_code','Level_1_desc','contains_animal_product').show()
result_df_country = result_df.groupBy('country_code').agg(sum('contains_animal_product').alias('Transaction_animal')).orderBy(desc('Transaction_animal'))
result_df_country.show()





+------------+-------------+-----------------------+
|country_code| Level_1_desc|contains_animal_product|
+------------+-------------+-----------------------+
|          AR|Animals; live|                      1|
|          AR|Animals; live|                      1|
|          AR|Animals; live|                      1|
|          AR|Animals; live|                      1|
|          AR|Animals; live|                      1|
|          AR|Animals; live|                      1|
|          AU|Animals; live|                      1|
|          AU|Animals; live|                      1|
|          AU|Animals; live|                      1|
|          AU|Animals; live|                      1|
|          AU|Animals; live|                      1|
|          AU|Animals; live|                      1|
|          CA|Animals; live|                      1|
|          CA|Animals; live|                      1|
|          CA|Animals; live|                      1|
|          CA|Animals; live|                  



+------------+------------------+
|country_code|Transaction_animal|
+------------+------------------+
|          AU|              2425|
|          US|              2395|
|          CN|              2339|
|          GB|              2315|
|          JP|              2296|
|          NL|              2266|
|          CA|              2173|
|          DE|              2164|
|          KR|              2155|
|          TH|              2142|
|          TW|              2110|
|          IT|              2026|
|          MY|              1987|
|          HK|              1970|
|          IN|              1960|
|          FJ|              1949|
|          VN|              1912|
|          ID|              1898|
|          DK|              1890|
|          FR|              1862|
+------------+------------------+
only showing top 20 rows



                                                                                