In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import *

In [2]:
# Initialize spark session
spark = SparkSession.builder \
.appName("Test") \
.master("local[*]") \
.enableHiveSupport() \
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/03 01:50:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# load data set
df = spark.read.format("csv").option("header",True) \
.option("inferSchema",True)\
.load("data.csv")

                                                                                

In [4]:
df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- originName: string (nullable = true)
 |-- originTypeName: string (nullable = true)
 |-- destinationName: string (nullable = true)
 |-- destinationTypeName: string (nullable = true)
 |-- gradeName: string (nullable = true)
 |-- quantity: integer (nullable = true)



### 1. What are the top 5 destinations for oil produced in Albania?

In [5]:
top5_albania_destinations = df.filter(upper(col("originName"))=='ALBANIA') \
.groupBy("destinationName") \
.agg(sum("quantity").alias("Total_Qty")) \
.orderBy("Total_Qty",ascending=False).limit(5)

In [6]:
top5_albania_destinations.show()



+--------------------+---------+
|     destinationName|Total_Qty|
+--------------------+---------+
|  PADD1 (East Coast)|     9926|
|          New Jersey|     9926|
|       United States|     4963|
|       Paulsboro, NJ|     4963|
|AXEON SPECIALTY P...|     4963|
+--------------------+---------+



                                                                                


### 2. For UK, which destinations have a total quantity greater than 100,000?


In [7]:
uk_destinations = df.filter(col("originName")=='United Kingdom') \
.groupBy("destinationName") \
.agg(sum("quantity").alias("Total_Qty")) \
.filter(col("Total_Qty") > 100000)

In [8]:
uk_destinations.show()



+------------------+---------+
|   destinationName|Total_Qty|
+------------------+---------+
|             Texas|   137125|
|     United States|   233095|
|PADD3 (Gulf Coast)|   354908|
|         Louisiana|   112342|
|       Mississippi|   101885|
+------------------+---------+



                                                                                

### 3) What was the most exported grade for each year and origin?

In [9]:
exported_grades = df.groupBy("year","originName","gradeName") \
.agg(sum("quantity").alias("Total_Qty")) \
.withColumn("rn",dense_rank().over(Window.partitionBy("year","originName").orderBy(col("Total_Qty").desc()))) \
.filter(col("rn")==1) \
.drop("rn")

In [10]:
exported_grades.show()



+----+-----------------+-----------+---------+
|year|       originName|  gradeName|Total_Qty|
+----+-----------------+-----------+---------+
|2009|           Africa|Light Sweet|  2559851|
|2009|          Algeria|Light Sweet|   698243|
|2009|           Angola|     Medium|   840693|
|2009|        Argentina|Heavy Sweet|   115633|
|2009|     Asia-Pacific|Light Sweet|   129815|
|2009|        Australia|Light Sweet|    18074|
|2009|       Azerbaijan|Light Sweet|   223433|
|2009|           Belize| Light Sour|     4277|
|2009|          Bolivia|Light Sweet|     9611|
|2009|           Brazil| Heavy Sour|   638687|
|2009|         Cameroon|Heavy Sweet|    81445|
|2009|           Canada| Heavy Sour|  2716028|
|2009|  Canada (Region)| Heavy Sour|  2716028|
|2009|             Chad|Heavy Sweet|   165186|
|2009|            China|Heavy Sweet|    13734|
|2009|         Colombia| Heavy Sour|   510496|
|2009|Congo-Brazzaville|Light Sweet|   146342|
|2009|   Congo-Kinshasa|     Medium|    23884|
|2009|    Cot

                                                                                

### BONUS Question: In order to save output from Question.1 dataframe to Inceberg file format below code can be used:

top5_albania_destinations.write \
    .format("iceberg") \
    .mode("overwrite") \
    .save("spark_catalog.default.top5_albania_destinations")


## Additionally, while initializing the Spark Session few additional configurations would need to be added!!


.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
.config("spark.sql.catalog.spark_catalog.type", "hive") 


## However, while running the PySpark code, it's throwing errors for Python version and spark catalog.

In [7]:
spark.stop()