In [4]:
# Environment setup: a Java runtime, a Spark 3.1.1 distribution and the findspark helper.
# Install the headless OpenJDK 8 JDK quietly (-qq, stdout discarded).
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the Spark 3.1.1 build for Hadoop 3.2 from the Apache archive (-q: no progress output).
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
# Unpack the tarball into the current working directory.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# Install findspark, which a later cell imports before pyspark.
!pip install -q findspark

In [5]:
import os

# Export the Java and Spark install locations for the libraries loaded later.
# NOTE(review): SPARK_HOME assumes the tarball was unpacked under /content — confirm
# the notebook's working directory.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [6]:
import findspark
# Run findspark.init() before importing pyspark.
# NOTE(review): presumably this is what makes the unpacked Spark distribution (SPARK_HOME)
# importable — verify against the findspark documentation.
findspark.init()
from pyspark.sql import SparkSession
# Create a session, or reuse an existing one, with the master set to "local[*]".
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Optional, left disabled: limit on the number of rows shown by eager evaluation.
# NOTE(review): the disabled call below passes True; this setting presumably expects an
# integer row count — confirm before enabling.
#spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', True)
# Enable eager evaluation in the REPL; per the original note, this property is used to
# format output tables better.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)


In [41]:
from pyspark.sql import functions as F
from functools import reduce
from operator import add
from pyspark.sql.functions import col
from pyspark.sql import DataFrame
import plotly.express as px
import pandas as pd
import numpy as np

In [8]:
# Load the three trade tables. Each CSV is read with its first line as the header
# and with column types inferred from the data.
df_imports, df_exports, df_bal = (
    spark.read.csv(csv_path, inferSchema=True, header=True)
    for csv_path in ("./imports.csv", "./exports.csv", "./balance.csv")
)

In [44]:
# Summary statistics for each of the three trade tables.
# describe() already covers every column, so no select("*") projection is needed.
for frame in (df_imports, df_exports, df_bal):
  frame.describe().show()

# Column names/types and the first five rows of the imports table.
df_imports.printSchema()
df_imports.show(5)

+-------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|    Partner|               2002|                2003|                2004|                2005|                2006|                2007|                2008|                2009|                2010|                2011|                2012|                2013|                2014|                2015|                2016|
+-------+-----------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------

In [42]:
#Import vs Export with a certain country Bar Plot
def importvsExport(country):
  """Show a bar chart comparing total imports and total exports for one partner.

  For the rows of ``df_imports`` and ``df_exports`` whose ``Partner`` equals
  ``country``, every column after the first is added together to produce
  ``Total_Import`` and ``Total_Export``. The two totals are then drawn as bars
  with Plotly Express.

  Args:
    country: Value matched against the ``Partner`` column of both tables.

  Returns:
    None. The figure is displayed with ``fig.show()``.

  Raises:
    ValueError: If ``country`` has no row in ``df_imports`` or ``df_exports``.

  Note:
    The totals are built with ``+`` on Spark columns, so a null in any summed
    column yields a null total for that table.
  """
  # Row-wise sum of all columns after the first one, per table.
  total_import = reduce(add, [col(name) for name in df_imports.columns[1:]])
  total_export = reduce(add, [col(name) for name in df_exports.columns[1:]])

  import_total = (
      df_imports.filter(df_imports['Partner'] == country)
      .select(total_import.alias('Total_Import'))
  )
  export_total = (
      df_exports.filter(df_exports['Partner'] == country)
      .select(total_export.alias('Total_Export'))
  )

  # The two one-column frames share no key, so pair them with an explicit
  # Cartesian product rather than a join without a condition.
  totals = import_total.crossJoin(export_total).toPandas()
  if totals.empty:
    raise ValueError(
        "Partner {!r} was not found in both the imports and exports tables".format(country)
    )

  # Transpose so each total becomes a row; the single data column is named 'Total'.
  main = totals.transpose().rename(columns={0: 'Total'})
  fig = px.bar(
      main,
      x=['Total_Import', 'Total_Export'],
      y='Total',
      color='Total',
      title='Total imports vs. exports: {}'.format(country),
  )
  fig.show()

In [45]:
# Example: bar chart of summed imports vs. summed exports for the partner 'Andorra'.
importvsExport('Andorra')

In [309]:
def _flow_by_year(frame, country, flow):
  """Return one partner's values from a wide trade table as a long pandas frame.

  Args:
    frame: Spark DataFrame with a ``Partner`` column followed by year columns.
    country: Value matched against the ``Partner`` column.
    flow: Label stored in the ``Type`` column (for example ``'Import'``).

  Returns:
    pandas DataFrame with the columns ``Year`` (int), ``<country>`` and ``Type``.

  Raises:
    ValueError: If ``country`` has no row in ``frame``.
  """
  wide = frame.filter(frame['Partner'] == country).toPandas()
  if wide.empty:
    raise ValueError(
        "Partner {!r} was not found in the {} table".format(country, flow.lower())
    )
  # Moving 'Partner' into the index before transposing keeps the values numeric
  # and turns the partner name into the column header.
  tall = (
      wide.set_index('Partner')
      .T
      .rename_axis(None, axis=1)
      .reset_index()
      .rename(columns={'index': 'Year'})
  )
  # The year labels come from the column names, so they arrive as strings.
  tall['Year'] = tall['Year'].astype(int)
  tall['Type'] = flow
  return tall


def importVsExport_Line(country):
  """Show a line chart of yearly imports and exports for one partner.

  Args:
    country: Value matched against the ``Partner`` column of ``df_imports``
      and ``df_exports``.

  Returns:
    None. The figure is displayed with ``fig.show()``.

  Raises:
    ValueError: If ``country`` has no row in either table.
  """
  linedata = pd.concat([
      _flow_by_year(df_imports, country, 'Import'),
      _flow_by_year(df_exports, country, 'Export'),
  ])
  fig = px.line(
      linedata,
      x='Year',
      y=country,
      color='Type',
      title='{}: imports vs. exports by year'.format(country),
  )
  fig.show()

In [310]:
# Example: yearly import and export lines for the partner 'Algeria'.
importVsExport_Line('Algeria')

In [None]:
#Column Sum
from pyspark.sql import functions as F
# Sum every column after the first across all rows of the imports table, in a single
# aggregation. Each result keeps its source column name.
# NOTE(review): this cell originally referenced `df_i`, which is not defined at notebook
# scope; `df_imports` is assumed to be the intended table — confirm.
df_imports.agg(
    *[F.sum(df_imports[c_name]).alias(c_name) for c_name in df_imports.columns[1:]]
).show()