In [293]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession,Window
from pyspark.sql.functions import col,lit,explode,count,when,min,max,avg,trim,sum,struct,round,broadcast,udf,split
from pyspark.sql.types import StringType


# Obtain the Spark session used by every cell below (getOrCreate reuses one if it exists).
spark = (
    SparkSession.builder
    .appName('Yagro Interview')
    .getOrCreate()
)

In [280]:
# Read the raw CSV with default options: no header row is declared, so columns get
# positional names (_c0, _c1, ...) and the file's header lines arrive as ordinary data rows.
df=spark.read.csv('/home/jovyan/data/YAGRO-fuel-sales.csv')

In [281]:
df.show()

+--------------+-----------+--------+------------+--------------+----------+-----------+--------+------------+--------------+---------------+-----------+--------+------------+--------------+------------+
|           _c0|        _c1|     _c2|         _c3|           _c4|       _c5|        _c6|     _c7|         _c8|           _c9|           _c10|       _c11|    _c12|        _c13|          _c14|        _c15|
+--------------+-----------+--------+------------+--------------+----------+-----------+--------+------------+--------------+---------------+-----------+--------+------------+--------------+------------+
|     Segment>>|       Farm|    NULL|        NULL|          NULL|Farm Total|  Corporate|    NULL|        NULL|          NULL|Corporate Total|     Retail|    NULL|        NULL|          NULL|Retail Total|
|   Ship Mode>>|First Class|Same Day|Second Class|Standard Class|      NULL|First Class|Same Day|Second Class|Standard Class|           NULL|First Class|Same Day|Second Class|Standard 

In [282]:
def remove_spaces(s):
    """Return ``s`` with every space character turned into an underscore."""
    return '_'.join(s.split(' '))
remove_spaces('Farm Total')

'Farm_Total'

In [283]:
# Bring every row to the driver as a list of Row objects; the header-parsing cells
# below index into rows[0] and rows[1].
rows = df.collect()

In [284]:
# Segment names: cells of the first row (skipping column 0) that are populated
# and are not one of the "... Total" columns.
main_columns = [
    cell
    for cell in rows[0][1:]
    if cell is not None and 'Total' not in cell
]
print(main_columns)

['Farm', 'Corporate', 'Retail']


In [285]:
# For each sub-column, record (segment, sub-column name, position in the raw file).
# Segments are walked with an iterator so main_columns is left intact and the cell can
# be re-run; popping from the list emptied it and made a second run fail.
col_location = []
segments = iter(main_columns)
current_main_col = next(segments)
for i in range(1, len(rows[1])):
    if rows[1][i] is None:
        # No value in row 1 here, so the sub-column name comes from row 0
        # (e.g. "Farm Total").
        col_location.append((current_main_col, remove_spaces(rows[0][i]), i))
        # Move on to the next segment; keep the current one when none are left.
        current_main_col = next(segments, current_main_col)
    else:
        col_location.append((current_main_col, remove_spaces(rows[1][i]), i))

In [286]:
# Show each (segment, sub-column, position) entry on its own line.
for location in col_location:
    print(location)

('Farm', 'First_Class', 1)
('Farm', 'Same_Day', 2)
('Farm', 'Second_Class', 3)
('Farm', 'Standard_Class', 4)
('Farm', 'Farm_Total', 5)
('Corporate', 'First_Class', 6)
('Corporate', 'Same_Day', 7)
('Corporate', 'Second_Class', 8)
('Corporate', 'Standard_Class', 9)
('Corporate', 'Corporate_Total', 10)
('Retail', 'First_Class', 11)
('Retail', 'Same_Day', 12)
('Retail', 'Second_Class', 13)
('Retail', 'Standard_Class', 14)
('Retail', 'Retail_Total', 15)


In [287]:
## Cut the first 3 rows, drop the last row, and rename the first column

In [288]:
# Attach each row's position so rows can be removed by where they sit in the file.
rdd_with_index = df.rdd.zipWithIndex()
df_with_index = rdd_with_index.map(lambda pair: (*pair[0], pair[1])).toDF([*df.columns, "index"])

# Position of the final row.
last_row = df_with_index.agg(max("index").alias("max_index")).first()["max_index"]

# Keep everything after the first three rows, except the final row.
is_data_row = (col("index") > 2) & (col("index") != last_row)
df_new = df_with_index.where(is_data_row).drop("index")

df_new.count(), df.count()

(822, 826)

In [289]:
df_with_index.show()

+--------------+-----------+--------+------------+--------------+----------+-----------+--------+------------+--------------+---------------+-----------+--------+------------+--------------+------------+-----+
|           _c0|        _c1|     _c2|         _c3|           _c4|       _c5|        _c6|     _c7|         _c8|           _c9|           _c10|       _c11|    _c12|        _c13|          _c14|        _c15|index|
+--------------+-----------+--------+------------+--------------+----------+-----------+--------+------------+--------------+---------------+-----------+--------+------------+--------------+------------+-----+
|     Segment>>|       Farm|    NULL|        NULL|          NULL|Farm Total|  Corporate|    NULL|        NULL|          NULL|Corporate Total|     Retail|    NULL|        NULL|          NULL|Retail Total|    0|
|   Ship Mode>>|First Class|Same Day|Second Class|Standard Class|      NULL|First Class|Same Day|Second Class|Standard Class|           NULL|First Class|Same Da

In [290]:
# rebuild it with structs
# test
# df_new = df_new.withColumn("Farm", struct(col("_c1").alias("First_Class")))
#df_new = df_new.withColumn("Farm", 
#    col("Farm").withField("Extra", struct(
#        col("_c4").alias("First_Class2")))
#)
#df_new.show()

In [291]:
# The first raw column becomes order_id.
df_new = df_new.withColumnRenamed('_c0', 'order_id')

# Fold each raw positional column into a struct named after its segment: the first
# sub-column of a segment creates the struct, later ones are added to it as fields.
for segment, field_name, position in col_location:
    raw_name = f"_c{position}"
    field_value = col(raw_name).alias(field_name)
    if segment in df_new.columns:
        nested = col(segment).withField(field_name, field_value)
    else:
        nested = struct(field_value)
    df_new = df_new.withColumn(segment, nested).drop(raw_name)

In [267]:
df_new.show()

+--------------+--------------------+--------------------+--------------------+
|      order_id|                Farm|           Corporate|              Retail|
+--------------+--------------------+--------------------+--------------------+
|CA-2011-100293|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-100706|{NULL, NULL, 129....|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-100895|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-100916|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-101266|{NULL, NULL, 13.3...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-101560|{NULL, NULL, NULL...|{NULL, NULL, 542....|{NULL, NULL, NULL...|
|CA-2011-101770|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-102274|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-102673|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|
|CA-2011-102988|{NULL, NULL, NULL...|{NU

In [268]:
# query test
df_new.select('Farm.*').show()

+-----------+--------+------------+--------------+----------+
|First_Class|Same_Day|Second_Class|Standard_Class|Farm_Total|
+-----------+--------+------------+--------------+----------+
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|      129.44|          NULL|    129.44|
|       NULL|    NULL|        NULL|        605.47|    605.47|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|       13.36|          NULL|     13.36|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|     149.95|    NULL|        NULL|          NULL|    149.95|
|       NULL|    NULL|        NULL|          NULL|      NULL|
|       

#### Exercise start
##### How much fuel was ordered by farms?

In [269]:
# Sum of the Farm total column over all orders, rounded to 3 decimals.
total_fuel_farms = round(sum('Farm.Farm_Total'), 3).alias('Total_Fuel_Farms')
df_new.agg(total_fuel_farms).show()

+----------------+
|Total_Fuel_Farms|
+----------------+
|      195580.971|
+----------------+



In [270]:
# Cross-check: does Farm_Total agree with the sum of the four ship-mode columns?
farm_columns = df_new.select("Farm.*").columns
aggregations = {}
for field in farm_columns:
    path = f"Farm.{field}"
    aggregations[path] = round(sum(path), 4).alias(f"Total_{field}")
df_farms = df_new.agg(*aggregations.values())

ship_mode_totals = (
    col('Total_First_Class')
    + col('Total_Same_Day')
    + col('Total_Second_Class')
    + col('Total_Standard_Class')
)
df_farms = df_farms.withColumn('Total_Using_Other_columns', ship_mode_totals)

df_farms.show()

# Compare with the Farm total given in the last row of the original file (_c5).
df_with_index.where(col('index') == last_row).select(col('_c5').alias('Total_last_row')).show()

+-----------------+--------------+------------------+--------------------+----------------+-------------------------+
|Total_First_Class|Total_Same_Day|Total_Second_Class|Total_Standard_Class|Total_Farm_Total|Total_Using_Other_columns|
+-----------------+--------------+------------------+--------------------+----------------+-------------------------+
|        20802.173|      8132.409|        49724.2545|         116714.3145|      195580.971|               195373.151|
+-----------------+--------------+------------------+--------------------+----------------+-------------------------+

+--------------+
|Total_last_row|
+--------------+
|    195580.971|
+--------------+



##### How much fuel was ordered by retail customers?

In [271]:
# Cross-check: does Retail_Total agree with the sum of the four ship-mode columns?
retail_columns = df_new.select("Retail.*").columns
aggregations = {}
for field in retail_columns:
    path = f"Retail.{field}"
    aggregations[path] = round(sum(path), 4).alias(f"Total_{field}")
df_retail = df_new.agg(*aggregations.values())

ship_mode_totals = (
    col('Total_First_Class')
    + col('Total_Same_Day')
    + col('Total_Second_Class')
    + col('Total_Standard_Class')
)
df_retail = df_retail.withColumn('Total_Using_Other_columns', ship_mode_totals)

df_retail.show()

# Compare with the Retail total given in the last row of the original file (_c15).
df_with_index.where(col('index') == last_row).select(col('_c15').alias('Total_last_row')).show()

+-----------------+--------------+------------------+--------------------+------------------+-------------------------+
|Total_First_Class|Total_Same_Day|Total_Second_Class|Total_Standard_Class|Total_Retail_Total|Total_Using_Other_columns|
+-----------------+--------------+------------------+--------------------+------------------+-------------------------+
|         7737.786|      2977.456|          8791.127|          54748.6325|        74255.0015|               74255.0015|
+-----------------+--------------+------------------+--------------------+------------------+-------------------------+

+--------------+
|Total_last_row|
+--------------+
|    74255.0015|
+--------------+



In [272]:
# Same-day orders per segment. Field names are spelled exactly as they were created
# ("Same_Day") so the lookup does not rely on Spark's case-insensitive name resolution.
df_sd = df_new.select(
    col("Farm.Same_Day").alias('farm_sd'),
    col("Corporate.Same_Day").alias('corporate_sd'),
    col("Retail.Same_Day").alias('retail_sd'),
)
df_sd_sum = df_sd.agg(
    round(sum("farm_sd"), 4).alias("Total_Farm_SD"),
    round(sum("corporate_sd"), 4).alias("Total_Corporate_SD"),
    round(sum("retail_sd"), 4).alias("Total_Retail_SD"),
)
df_sd_sum = df_sd_sum.withColumn(
    'Total',
    round(col('Total_Farm_SD') + col('Total_Corporate_SD') + col('Total_Retail_SD'), 4),
)
df_sd_sum.show()

# Compare with the Same Day figures in the last row of the file
# (_c2 = Farm, _c7 = Corporate, _c12 = Retail).
df_sd_last_row = (
    df_with_index
    .filter(df_with_index.index == last_row)
    .select(
        col('_c2').alias('Total_Farm_SD'),
        col('_c7').alias('Total_Corporate_SD'),
        col('_c12').alias('Total_Retail_SD'),
    )
    .withColumn(
        'Total',
        round(col('Total_Farm_SD') + col('Total_Corporate_SD') + col('Total_Retail_SD'), 4),
    )
)
df_sd_last_row.show()

+-------------+------------------+---------------+---------+
|Total_Farm_SD|Total_Corporate_SD|Total_Retail_SD|    Total|
+-------------+------------------+---------------+---------+
|     8132.409|          9907.308|       2977.456|21017.173|
+-------------+------------------+---------------+---------+

+-------------+------------------+---------------+---------+
|Total_Farm_SD|Total_Corporate_SD|Total_Retail_SD|    Total|
+-------------+------------------+---------------+---------+
|     8132.409|            9907.3|        2977.46|21017.169|
+-------------+------------------+---------------+---------+



In [274]:
# Tag each one-row summary so a note describing its origin can be joined on.
df_sd_sum = df_sd_sum.withColumn('index', lit(0))
df_sd_last_row = df_sd_last_row.withColumn('index', lit(1))

# Stack the two summaries; both list their columns in the same order.
df_union = df_sd_sum.union(df_sd_last_row)

# Lookup table: index -> explanatory note.
columns2 = ["index", "notes"]
data2 = [
    (0, "Total using actual data in table"),
    (1, "<- Total using the values in last row"),
]
df_notes = spark.createDataFrame(data2, columns2)

df_union.join(broadcast(df_notes), on='index', how='left').show(truncate=False)

+-----+-------------+------------------+---------------+---------+-------------------------------------+
|index|Total_Farm_SD|Total_Corporate_SD|Total_Retail_SD|Total    |notes                                |
+-----+-------------+------------------+---------------+---------+-------------------------------------+
|0    |8132.409     |9907.308          |2977.456       |21017.173|Total using actual data in table     |
|1    |8132.409     |9907.3            |2977.46        |21017.169|<- Total using the values in last row|
+-----+-------------+------------------+---------------+---------+-------------------------------------+



In [275]:
## How much was ordered in 2012, and how much by US-based clients in 2014?

In [276]:
def extract_year(s):
    """Return the second ``'-'``-separated piece of an order id, e.g. ``'2011'``.

    Args:
        s: Order id such as ``'CA-2011-100895'``, or ``None``.

    Returns:
        The second piece as a string, or ``None`` when ``s`` is ``None`` or has no
        second piece, so a null or malformed id gives a null instead of an exception.
    """
    if s is None:
        return None
    parts = s.split('-')
    return parts[1] if len(parts) > 1 else None
extract_year('CA-2011-100895')

'2011'

In [278]:
def extract_country(s):
    """Return the piece of an order id before its first ``'-'``, e.g. ``'CA'``.

    Args:
        s: Order id such as ``'CA-2011-100895'``, or ``None``.

    Returns:
        The leading piece as a string (the whole string when it contains no ``'-'``),
        or ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return None
    return s.split('-', 1)[0]
extract_country('CA-2011-100895')

'CA'

In [296]:
# Derive year and country from order_id by splitting on '-' with Spark's built-in
# split() instead of Python UDFs: same string results for well-formed ids, no per-row
# round trip through Python, and a null order_id yields nulls rather than an error.
order_id_parts = split(col('order_id'), '-')
df_new = df_new.withColumn('year', order_id_parts.getItem(1))
df_new = df_new.withColumn('country', order_id_parts.getItem(0))
df_new.show()

+--------------+--------------------+--------------------+--------------------+----+-------+
|      order_id|                Farm|           Corporate|              Retail|year|country|
+--------------+--------------------+--------------------+--------------------+----+-------+
|CA-2011-100293|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-100706|{NULL, NULL, 129....|{NULL, NULL, NULL...|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-100895|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-100916|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-101266|{NULL, NULL, 13.3...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-101560|{NULL, NULL, NULL...|{NULL, NULL, 542....|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-101770|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|2011|     CA|
|CA-2011-102274|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL,

In [304]:
# Orders whose id carries the year 2012.
# (A select() whose result was discarded used to follow; it had no effect and is removed.)
df_12 = df_new.filter(col('year') == '2012')

# Per-segment totals for that year.
df_12 = df_12.agg(
    round(sum(col("Farm.Farm_Total")), 4).alias("Total_Farm"),
    round(sum(col("Retail.Retail_Total")), 4).alias("Total_Retail"),
    round(sum(col("Corporate.Corporate_Total")), 4).alias("Total_Corporate"),
)

# Rounded to 4 decimals, matching how Grand_Total_US is computed.
df_12 = df_12.withColumn(
    "Grand_Total_2012",
    round(col("Total_Farm") + col("Total_Retail") + col("Total_Corporate"), 4),
)

df_12.show()

+----------+------------+---------------+----------------+
|Total_Farm|Total_Retail|Total_Corporate|Grand_Total_2012|
+----------+------------+---------------+----------------+
|49206.2265|    4046.424|       18107.33|      71359.9805|
+----------+------------+---------------+----------------+



In [306]:
# Orders from 2014 whose id starts with the US prefix.
# (A select() whose result was discarded used to follow; it had no effect and is removed.)
df_US = df_new.filter((col('year') == '2014') & (col('country') == 'US'))

# Per-segment totals for those orders.
df_US = df_US.agg(
    round(sum(col("Farm.Farm_Total")), 4).alias("Total_Farm"),
    round(sum(col("Retail.Retail_Total")), 4).alias("Total_Retail"),
    round(sum(col("Corporate.Corporate_Total")), 4).alias("Total_Corporate"),
)

df_US = df_US.withColumn(
    "Grand_Total_US",
    round(col("Total_Farm") + col("Total_Retail") + col("Total_Corporate"), 4),
)

df_US.show()

+----------+------------+---------------+--------------+
|Total_Farm|Total_Retail|Total_Corporate|Grand_Total_US|
+----------+------------+---------------+--------------+
| 10036.198|   8147.3605|      11479.617|    29663.1755|
+----------+------------+---------------+--------------+



### Storage

In [309]:
def order_id_null(spark,df):
    """Data-quality check: fail if any row of ``df`` has a null ``order_id``.

    Args:
        spark: Not used by the check; kept so existing calls keep working.
        df: DataFrame with an ``order_id`` column.

    Raises:
        AssertionError: If at least one row has a null ``order_id``.
    """
    null_order_id_count = df.filter(col("order_id").isNull()).count()
    # Raised explicitly rather than with `assert`, which is skipped when Python runs with -O.
    if null_order_id_count != 0:
        raise AssertionError(
            f"order_id null test Failed: {null_order_id_count} row(s) with a null order_id"
        )
    print("order_id null test - Success")

# Validate before persisting.
order_id_null(spark,df_new)

# Overwrite instead of append: this cell writes the complete dataset, so appending would
# store another copy of every row each time the notebook is re-run.
# NOTE(review): run this before the read-back cell; once df_new has been rebound to a read
# of this same path, overwriting it is expected to fail — confirm on your Spark version.
df_new.write.partitionBy('year').mode('overwrite').parquet("/home/jovyan/data/dataset.parquet")

order_id null test - Success


In [310]:
# Read the partitioned dataset back. This rebinds df_new to the stored copy; in the output
# below `year` (the partition column) is listed after `country`.
# NOTE(review): the type of `year` after the round trip is not checked here — confirm it is
# still a string before reusing the string comparisons from earlier cells.
df_new = spark.read.parquet("/home/jovyan/data/dataset.parquet")
df_new.show()

+--------------+--------------------+--------------------+--------------------+-------+----+
|      order_id|                Farm|           Corporate|              Retail|country|year|
+--------------+--------------------+--------------------+--------------------+-------+----+
|CA-2014-100412|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-100426|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-100622|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-100902|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-101042|{NULL, NULL, NULL...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-101210|{NULL, NULL, 13.1...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-101273|{NULL, NULL, 14.3...|{NULL, NULL, NULL...|{NULL, NULL, NULL...|     CA|2014|
|CA-2014-101483|{NULL, NULL, 61.2...|{NULL, NULL, NULL...|{NULL, NULL,