In [124]:
#importing necessary libraries and starting spark session
from os import environ
import yaml
import findspark
findspark.init()
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName('solutions_final')\
        .config('spark.driver.extraClassPath','/usr/lib/jvm/java-11-openjdk-amd64/lib/postgresql-42.6.0.jar')\
        .getOrCreate()


In [125]:
# Define the path to your YAML file
yaml_file_path = 'config.yaml'

# Read the YAML file and parse it into a Python dictionary
with open(yaml_file_path, 'r') as yaml_file:
    config = yaml.safe_load(yaml_file)


In [106]:
#importing necessary funcitons from spark
from pyspark.sql.functions import col,row_number, sum ,format_number,desc,year,month,lag,when ,cast,avg,substring,lit, length,expr,concat_ws, to_date,udf
from pyspark.sql.window import Window
import pyspark.sql.functions as f



## CLEANING PART


In [108]:
#reading csv from local folder

data_path = "./data/revised_final.csv"  
df = spark.read.csv(data_path, header=True, inferSchema=True)
#loading goods_classifiation 
data_path = "./data/goods_classification.csv"  
df_goods = spark.read.csv(data_path, header=True, inferSchema=True)

data_path = "./data/country_classification.csv"  
df_country = spark.read.csv(data_path, header=True, inferSchema=True)
data_path = "./data/services_classification.csv"  
df_services = spark.read.csv(data_path, header=True, inferSchema=True)


                                                                                

In [109]:
##exploring the main df
df.show()
df.printSchema()
df.count()

+--------+-------+----+------------+------------+-------------+------+
|time_ref|account|code|country_code|product_type|        value|status|
+--------+-------+----+------------+------------+-------------+------+
|  202306|Exports|  00|          AE|       Goods| 2.82028909E8|     F|
|  202306|Exports|  00|          AG|       Goods|     351919.0|     F|
|  202306|Exports|  00|          AI|       Goods|      84762.0|     F|
|  202306|Exports|  00|          AL|       Goods|       3463.0|     F|
|  202306|Exports|  00|          AM|       Goods|     679586.0|     F|
|  202306|Exports|  00|          AO|       Goods|     464583.0|     F|
|  202306|Exports|  00|          AR|       Goods|    5943055.0|     F|
|  202306|Exports|  00|          AS|       Goods|  1.1622992E7|     F|
|  202306|Exports|  00|          AT|       Goods|  1.1202334E7|     F|
|  202306|Exports|  00|          AU|       Goods|2.272665701E9|     F|
|  202306|Exports|  00|          AW|       Goods|     362396.0|     F|
|  202

2111267

In [110]:
#changing time_ref column to string appeding month and changing it into date format 

df = df.withColumn("time_ref", col("time_ref").cast("string"))

df = df.withColumn("time_ref", concat_ws("/", substring(col("time_ref"), 1, 4), substring(col("time_ref"), 5, 2), lit('30')))
df = df.withColumn("time_ref", to_date(col("time_ref"), "yyyy/MM/dd"))



In [111]:

## filtering unnecessary data and  changing the length of goods column for ease in join

df = df.filter(df['country_code'] != 'TOT')

df = df.filter(df['country_code'] != 'TOT (BoP basis)')

df = df.filter(df['country_code'] != 'TOT (OMT FOB)')
df = df.filter(df['country_code'] != 'TOT (OMT CIF)')
df = df.filter(df['country_code'] != 'TOT (OMT VFD)')



df = df.withColumn("code",
    when((col("product_type") == "Goods") & (length(col("code")) == 4), substring(col("code"), 1, 2))
    .when((col("product_type") == "Goods") & (length(col("code")) == 3), substring(col("code"), 1, 1))
    .otherwise(col("code")))

In [112]:
## renaming column from df_goods for ease 

df_goods = df_goods.withColumnRenamed('Level_1','code')\
        .withColumnRenamed('Level_1_desc','goods_category')

In [113]:
#writing file in parquet format in local  
df.write.format("parquet")\
    .mode('overwrite')\
    .options(compression="snappy")\
    .save("./data_cleaned/revised_final_cleaned/cleaned_revised.parquet")



df_goods.coalesce(1).write.format("parquet").mode("overwrite").\
options(compression='snappy').\
save("./data_cleaned/goods_cleaned/cleaned_goods.parquet")
df_services.coalesce(1).write.format("parquet").mode("overwrite").\
options(compression='snappy').\
save("./data_cleaned/services_cleaned/cleaned_sevices.parquet")
df_country.coalesce(1).write.format("parquet").mode("overwrite").\
options(compression='snappy').\
save("./data_cleaned/countries_cleaned/cleaned_countries.parquet")


23/09/13 14:05:07 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [114]:
#writing files to the prostgres table 

df.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres',driver = 'org.postgresql.Driver', dbtable = 'cleaned_data', user=config['postgres']["user"],password=config['postgres']["password"] ).mode('overwrite')
df_goods.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres',driver = 'org.postgresql.Driver', dbtable = 'cleaned_goods', user=config['postgres']["user"],password=pg_password ).mode('overwrite')
df_services.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres',driver = 'org.postgresql.Driver', dbtable = 'cleaned_services', user=config['postgres']["user"],password=pg_password ).mode('overwrite')
df_country.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres',driver = 'org.postgresql.Driver', dbtable = 'cleaned_countries', user=config['postgres']["user"],password=pg_password ).mode('overwrite')

                                                                                

## SOLUTIONS PART 

In [115]:
#reading the cleaned tables from the postgres database

jdbc_url = "jdbc:postgresql://localhost:5432/postgres"
connection_properties = {
    "user": pg_user,
    "password": pg_password,
    "driver": "org.postgresql.Driver"
}
table_name = "cleaned_data"
table_goods = "cleaned_goods"
table_services = "cleaned_services"
table_countries = "cleaned_countries"

df = spark.read \
    .jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
df_goods = spark.read \
    .jdbc(url=jdbc_url, table=table_goods, properties=connection_properties)

df_services = spark.read \
    .jdbc(url=jdbc_url, table=table_services, properties=connection_properties)

df_countries = spark.read \
    .jdbc(url=jdbc_url, table=table_countries, properties=connection_properties)

### Question 1:
#### Trend analysis of the two countries which has the highest transactIons with new zealand. Increasing and decreasing trends of the sum of the transaction in each quarter month.

In [116]:
# caching for faster loads
# only cached the main df because that is the largest file

df.cache()


DataFrame[time_ref: date, account: string, code: string, country_code: string, product_type: string, value: double, status: string]

In [117]:
## top 2 countries with the highest number of transactions 
df_highest = df.groupBy('country_code').agg(sum(col('value')).alias('sum($)'))
df_highest= df_highest.orderBy(desc(col('sum($)')))
df_highest = df_highest.withColumn('sum($)', format_number(col('sum($)'),2)).limit(2)
print("the top 2 countries are : ",df_highest.collect()[0][0],":",df_highest.collect()[0][1],"and" , df_highest.collect()[1][0],':',df_highest.collect()[1][1] )

## only taking the transaction of the top 2 countires (filter first)
df_cn = df.filter(col('country_code')=='CN')
df_au = df.filter(col('country_code')=='AU')

##renaming columns
columns = df_au.columns
for i in range(len(columns)):
    df_au = df_au.withColumnRenamed(columns[i], columns[i]+'_au')

## combining both countries transactions horizontally
df_joined = df_cn.join(df_au,df_cn['time_ref']==df_au['time_ref_au'],'inner')

#extracting month and year 
df_joined = df_joined.withColumn("year", year("time_ref")).withColumn("month", month("time_ref"))\
                     .withColumnRenamed('value','value($)')\
                     .withColumnRenamed('value_au','value($)_au')


## selecting only the required columns 
df_joined_req_col = df_joined.select('year','month','value($)','value($)_au')



df_joined_req_col = df_joined_req_col.groupBy('year','month')\
                        .agg(sum(col('value($)')).alias('sum($)'),sum(col('value($)_au')).alias('sum($)_au'))


#window
window_spec = Window.orderBy("year","month")

df_joined_req_col = df_joined_req_col.withColumn('lag($)',lag(col('sum($)'),1).over(window_spec))\
                                     .withColumn('lag_au($)',lag(col('sum($)_au'),1).over(window_spec))
df_joined_req_col = df_joined_req_col.na.drop()

#difference and trend analysis
df_final = df_joined_req_col.withColumn('difference($)',col('sum($)')-col('lag($)'))\
                            .withColumn('difference_au($)',col('sum($)_au')-col('lag_au($)'))


df_final = df_final.withColumn('change', when(col('difference($)')>0,"Increase").otherwise('Decrease'))\
                    .withColumn('change_au',when(col('difference_au($)')>0,'Increase').otherwise('Decrease'))


df_final_formatted = df_final.select('*')

## to show in human readable form
for i in range(len(df_final.columns)-4):
    df_final_formatted = df_final_formatted.withColumn(df_final.columns[i+2],format_number(col(df_final.columns[i+2]),0))

df_final_formatted.show()




                                                                                

the top 2 countries are :  CN : 847,920,192,569.20 and AU : 761,016,231,517.13


23/09/13 14:05:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:29 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 1

+----+-----+------------------+------------------+------------------+------------------+-------------------+------------------+--------+---------+
|year|month|            sum($)|         sum($)_au|            lag($)|         lag_au($)|      difference($)|  difference_au($)|  change|change_au|
+----+-----+------------------+------------------+------------------+------------------+-------------------+------------------+--------+---------+
|2014|    9|30,149,345,938,005|33,248,377,325,064|32,078,321,144,240|30,005,845,954,049| -1,928,975,206,235| 3,242,531,371,015|Decrease| Increase|
|2014|   12|37,084,530,234,212|35,309,257,882,759|30,149,345,938,005|33,248,377,325,064|  6,935,184,296,207| 2,060,880,557,695|Increase| Increase|
|2015|    3|35,467,973,908,136|31,201,448,464,812|37,084,530,234,212|35,309,257,882,759| -1,616,556,326,076|-4,107,809,417,947|Decrease| Decrease|
|2015|    6|33,182,651,329,464|31,174,360,965,062|35,467,973,908,136|31,201,448,464,812| -2,285,322,578,672|   -27,087

23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [118]:
#writing the table to postgres 
df_final.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres',driver = 'org.postgresql.Driver', dbtable = 'task1', user='postgres',password='postgres').mode('overwrite').save()


23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 14:05:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/09/13 1

### Question 2 : 
#### TO calculate the most sold(Exported) good to each country in each year from NewZealand and  Find how deviated is it from the mean transaction  of that particular good. 


In [119]:

#finding just the sold goods
df_sold = df.filter(col('account')=='Exports')
df_sold = df_sold.filter(col('product_type')=='Goods')

#extracting year
df_each_year = df_sold.withColumn('year', year('time_ref'))


## finding out which country has imported max sum of  good in a year
df_result =df_each_year.groupBy('country_code','year').agg(f.max(col('value')).alias('value'))

#joining the main table with teh grouped one to find out which country imported how much in a particular year along with other details
df_joined = df_each_year.join(df_result , on = ['country_code','value','year'] , how = 'inner')
df_joined  = df_joined.dropDuplicates()
df_joined = df_joined.filter(col('code')!=00)
df_joined = df_joined.orderBy('country_code','year')


## to find out the mean (average value) of each category of goods. 
df_goods_avg = df_each_year.groupBy('code').agg(avg(col('value')).alias('mean_value_goods'))
df_goods_avg = df_goods_avg.withColumn('mean_value_goods', f.round(col('mean_value_goods'),2))

df_goods_unique = df_goods.groupBy('goods_category').agg(f.max('code').alias('code'))
df_joined_avg = df_goods_avg.join(df_goods_unique , on = 'code', how='inner')



## joining the average for each good and previous table

df_joined_final =  df_joined.join(df_joined_avg, on = 'code', how = 'inner')



##percent deviation from mean 

df_deviation = df_joined_final.withColumn('deviation from mean', f.round(col('value')-col('mean_value_goods'),2))

df_deviation.select('year','country_code','value','mean_value_goods','goods_category','deviation from mean').orderBy('year').show()

[Stage 57:>                                                         (0 + 1) / 1]

+----+------------+---------+----------------+--------------------+-------------------+
|year|country_code|    value|mean_value_goods|      goods_category|deviation from mean|
+----+------------+---------+----------------+--------------------+-------------------+
|2014|          VA|  44817.0|       136561.55|Apparel and cloth...|          -91744.55|
|2014|          MW|  36363.0|       542332.18|Optical, photogra...|         -505969.18|
|2014|          PS|  96501.0|       701522.83|Vegetables and ce...|         -605021.83|
|2014|          LI|   1571.0|       542332.18|Optical, photogra...|         -540761.18|
|2014|          CV|   1447.0|       333540.86|Electrical machin...|         -332093.86|
|2014|          SO|   8400.0|       542332.18|Optical, photogra...|         -533932.18|
|2014|          CC|  11088.0|      2167770.96|Natural, cultured...|        -2156682.96|
|2014|          GL|   3344.0|        65929.13|Arms and ammuniti...|          -62585.13|
|2014|          ZB|    6.0E7|   

                                                                                

In [120]:
##writing thi dataframe to postgres
df_deviation.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/postgres',driver = 'org.postgresql.Driver', dbtable = 'task2', user='postgres',password='postgres').mode('overwrite').save()


In [121]:
#stopping the spark session
spark.stop()