In [1]:
%reload_kedro
from pyspark.sql import SparkSession

2019-06-29 21:08:48,178 - root - INFO - ** Kedro project prospa
2019-06-29 21:08:48,182 - anyconfig - INFO - Loading: /Users/Emil_Pastor/Documents/Projects/Startup/myanswer/interview-test-data-engineer/prospa/conf/base/logging.yml
2019-06-29 21:08:48,189 - anyconfig - INFO - Loading: /Users/Emil_Pastor/Documents/Projects/Startup/myanswer/interview-test-data-engineer/prospa/conf/base/catalog.yml
2019-06-29 21:08:48,196 - anyconfig - INFO - Loading: /Users/Emil_Pastor/Documents/Projects/Startup/myanswer/interview-test-data-engineer/prospa/conf/base/credentials.yml
2019-06-29 21:08:48,198 - anyconfig - INFO - Loading: /Users/Emil_Pastor/Documents/Projects/Startup/myanswer/interview-test-data-engineer/prospa/conf/base/parameters.yml
2019-06-29 21:08:48,204 - root - INFO - Defined global variables proj_dir, proj_name, conf and io


# Load data sets

In [2]:
customer_dimension = io.load("customer_dimension")
part_dimension = io.load("part_dimension")
supplier_dimension = io.load("supplier_dimension")
order_fact = io.load("order_fact")

2019-06-29 21:08:48,212 - kedro.io.data_catalog - INFO - Loading data from `customer_dimension` (SparkDataSet)...
2019-06-29 21:08:57,151 - kedro.io.data_catalog - INFO - Loading data from `part_dimension` (SparkDataSet)...
2019-06-29 21:08:57,360 - kedro.io.data_catalog - INFO - Loading data from `supplier_dimension` (SparkDataSet)...
2019-06-29 21:08:57,547 - kedro.io.data_catalog - INFO - Loading data from `order_fact` (SparkDataSet)...


In [3]:
customer_dimension.registerTempTable("customer_dimension")
part_dimension.registerTempTable("part_dimension")
supplier_dimension.registerTempTable("supplier_dimension")
order_fact.registerTempTable("order_fact")

# Initiate Spark Session

In [4]:
spark = SparkSession \
    .builder \
    .getOrCreate()

# What are the top 5 nations in terms of revenue?

In [5]:
df=spark.sql("SELECT SD.Supplier_Country_Name\
                    ,SUM(OR.Order_Line_Revenue) AS Supplier_Country_Revenue\
                FROM order_fact OR \
          INNER JOIN supplier_dimension SD\
                  ON OR.Order_Supplier_Key=SD.Supplier_Key\
            GROUP BY SD.Supplier_Country_Name\
            ORDER BY Supplier_Country_Revenue DESC").limit(5)
df.show(5)

+---------------------+------------------------+
|Supplier_Country_Name|Supplier_Country_Revenue|
+---------------------+------------------------+
|        UNITED STATES|    1.7130294535100004E8|
|                CHINA|     1.527625987020998E8|
|           MOZAMBIQUE|        1.458187652671E8|
|              VIETNAM|    1.2863593749619988E8|
|                EGYPT|     1.270949754020999E8|
+---------------------+------------------------+



# From the top 5 nations, what is the most common shipping mode?

In [6]:
df.registerTempTable("Top_5_Nations")

In [7]:
df_ship=spark.sql("SELECT OR.Order_Ship_Mode\
                    ,COUNT(DISTINCT OR.Order_Customer_Key) AS Shipping_Count \
                FROM order_fact OR \
          INNER JOIN supplier_dimension SD\
                  ON OR.Order_Supplier_Key=SD.Supplier_Key\
               WHERE SD.Supplier_Country_Name IN (SELECT Supplier_Country_Name FROM Top_5_Nations) \
            GROUP BY OR.Order_Ship_Mode\
            ORDER BY Shipping_Count DESC").limit(1)
df_ship.show(1)

+---------------+--------------+
|Order_Ship_Mode|Shipping_Count|
+---------------+--------------+
|            FOB|           911|
+---------------+--------------+



# What are the top selling months?

In [8]:
df_sell=spark.sql("SELECT YEAR(Order_Date)*100 + MONTH(Order_Date) AS Year_Month\
                         ,SUM(Order_Line_Revenue) AS Monthly_Revenue\
                     FROM order_fact\
                 GROUP BY Year_Month\
                 ORDER BY Monthly_Revenue DESC")
df_sell.show(10)

+----------+--------------------+
|Year_Month|     Monthly_Revenue|
+----------+--------------------+
|    199312|     3.09095660532E7|
|    199310|3.0758079172800012E7|
|    199201|3.0537091363399997E7|
|    199608|3.0231565316999994E7|
|    199512|3.0143667112199996E7|
|    199401| 2.997313480920004E7|
|    199309|2.9957340914299995E7|
|    199405| 2.974829256199999E7|
|    199612| 2.957026777099999E7|
|    199409|2.9489398987300012E7|
+----------+--------------------+
only showing top 10 rows



# Who are the top customer in terms of revenue and/or quantity?

In [9]:
spark.sql("SELECT CD.Customer_Name\
                 ,SUM(OR.Order_Line_Revenue) AS Customer_Revenue\
             FROM order_fact OR \
       INNER JOIN customer_dimension CD\
               ON OR.Order_Customer_Key=CD.Customer_Key\
         GROUP BY CD.Customer_Name\
         ORDER BY Customer_Revenue DESC").show(5)

+------------------+-----------------+
|     Customer_Name| Customer_Revenue|
+------------------+-----------------+
|Customer#000001489|5418543.599999999|
|Customer#000000214|     4684271.0263|
|Customer#000001396|4655099.209099999|
|Customer#000001246|     4651060.5863|
|Customer#000000073|4648501.807700001|
+------------------+-----------------+
only showing top 5 rows



In [10]:
spark.sql("SELECT CD.Customer_Name\
                 ,SUM(OR.Order_Line_Quantity) AS Customer_Quantity\
             FROM order_fact OR \
       INNER JOIN customer_dimension CD\
               ON OR.Order_Customer_Key=CD.Customer_Key\
         GROUP BY CD.Customer_Name\
         ORDER BY Customer_Quantity DESC").show(5)

+------------------+-----------------+
|     Customer_Name|Customer_Quantity|
+------------------+-----------------+
|Customer#000001489|             3868|
|Customer#000001396|             3408|
|Customer#000000073|             3384|
|Customer#000000214|             3369|
|Customer#000000898|             3309|
+------------------+-----------------+
only showing top 5 rows



# Compare the sales revenue of on current period against previous period

In [11]:
df_revn=spark.sql("SELECT YEAR(Order_Date)*100 + MONTH(Order_Date) AS Year_Month\
                         ,SUM(Order_Line_Revenue) AS Monthly_Revenue\
                     FROM order_fact\
                 GROUP BY Year_Month")
df_revn.registerTempTable("Revenue")

In [12]:
spark.sql("SELECT *\
                 ,LAG(Monthly_Revenue,12)OVER(ORDER BY Year_Month) AS Last_12_Monthly_Revenue\
                 ,Monthly_Revenue-LAG(Monthly_Revenue,12)OVER(ORDER BY Year_Month) AS Difference_Between_Previous_Year_Monthly_Revenue\
             FROM Revenue\
          ").show()

+----------+--------------------+-----------------------+------------------------------------------------+
|Year_Month|     Monthly_Revenue|Last_12_Monthly_Revenue|Difference_Between_Previous_Year_Monthly_Revenue|
+----------+--------------------+-----------------------+------------------------------------------------+
|    199201|3.0537091363399997E7|                   null|                                            null|
|    199202| 2.473271612919999E7|                   null|                                            null|
|    199203|2.9474009758200005E7|                   null|                                            null|
|    199204|2.7840816030399997E7|                   null|                                            null|
|    199205|2.7553435227700002E7|                   null|                                            null|
|    199206|2.4983270502499998E7|                   null|                                            null|
|    199207|2.6444198584999986E7|    