## Install Pyspark

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Hit:1 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [                                                                               Get:2 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
                                                                               Ign:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
                                                                               0% [Waiting for headers] [Waiting for headers] [Waiting for headers]0% [1 InRelease gpgv 242 kB] [Waiti

## Set Pyspark Environment

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

## Import Pyspark Package

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Upload File

In [4]:
from google.colab import files
uploaded = files.upload()

Saving Northwind.zip to Northwind (1).zip


In [5]:
!unzip Northwind.zip

Archive:  Northwind.zip
   creating: Northwind/
  inflating: Northwind/orders.csv    
  inflating: Northwind/order_details.csv  
  inflating: Northwind/products.csv  
  inflating: Northwind/suppliers.csv  


## Initiate Spark

In [6]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

# Dataframe

## Read Data

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Dataset Orders

In [8]:
df_products = spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Northwind/products.csv")

In [9]:
df_products.show(2)
df_products.printSchema()
df_products.dtypes

+----------+------------+-----------+-----------+------------------+----------+--------------+--------------+-------------+------------+
|product_id|product_name|supplier_id|category_id| quantity_per_unit|unit_price|units_in_stock|units_on_order|reorder_level|discontinued|
+----------+------------+-----------+-----------+------------------+----------+--------------+--------------+-------------+------------+
|         1|        Chai|          8|          1|10 boxes x 30 bags|      18.0|            39|             0|           10|           1|
|         2|       Chang|          1|          1|24 - 12 oz bottles|      19.0|            17|            40|           25|           1|
+----------+------------+-----------+-----------+------------------+----------+--------------+--------------+-------------+------------+
only showing top 2 rows

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- supplier_id: integer (nullable = true)
 |-- category_id:

[('product_id', 'int'),
 ('product_name', 'string'),
 ('supplier_id', 'int'),
 ('category_id', 'int'),
 ('quantity_per_unit', 'string'),
 ('unit_price', 'double'),
 ('units_in_stock', 'int'),
 ('units_on_order', 'int'),
 ('reorder_level', 'int'),
 ('discontinued', 'int')]

In [10]:
df_products = df_products \
  .withColumn("product_id" ,
              df_products["product_id"]
              .cast(StringType()))   \
  .withColumn("supplier_id",
              df_products["supplier_id"]
              .cast(StringType()))    \
  .withColumn("category_id"  ,
              df_products["category_id"]
              .cast(StringType())) \

In [11]:
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- supplier_id: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- quantity_per_unit: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- units_in_stock: integer (nullable = true)
 |-- units_on_order: integer (nullable = true)
 |-- reorder_level: integer (nullable = true)
 |-- discontinued: integer (nullable = true)



### Dataset Orders

In [12]:
df_orders = spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Northwind/orders.csv")

In [13]:
df_orders.show(2)
df_orders.printSchema()
df_orders.dtypes

+--------+-----------+-----------+-------------------+-------------------+-------------------+--------+-------+--------------------+------------------+---------+-----------+----------------+------------+
|order_id|customer_id|employee_id|         order_date|      required_date|       shipped_date|ship_via|freight|           ship_name|      ship_address|ship_city|ship_region|ship_postal_code|ship_country|
+--------+-----------+-----------+-------------------+-------------------+-------------------+--------+-------+--------------------+------------------+---------+-----------+----------------+------------+
|   10248|      VINET|          5|1996-07-04 00:00:00|1996-08-01 00:00:00|1996-07-16 00:00:00|       3|  32.38|Vins et alcools C...|59 rue de l'Abbaye|    Reims|       null|           51100|      France|
|   10249|      TOMSP|          6|1996-07-05 00:00:00|1996-08-16 00:00:00|1996-07-10 00:00:00|       1|  11.61|  Toms Spezialitäten|     Luisenstr. 48|  Münster|       null|           

[('order_id', 'int'),
 ('customer_id', 'string'),
 ('employee_id', 'int'),
 ('order_date', 'timestamp'),
 ('required_date', 'timestamp'),
 ('shipped_date', 'timestamp'),
 ('ship_via', 'int'),
 ('freight', 'double'),
 ('ship_name', 'string'),
 ('ship_address', 'string'),
 ('ship_city', 'string'),
 ('ship_region', 'string'),
 ('ship_postal_code', 'string'),
 ('ship_country', 'string')]

In [14]:
df_orders = df_orders \
  .withColumn("order_id" ,
              df_orders["order_id"]
              .cast(StringType()))   \
  .withColumn("employee_id"  ,
              df_orders["employee_id"]
              .cast(StringType())) \

In [15]:
df_orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- employee_id: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- required_date: timestamp (nullable = true)
 |-- shipped_date: timestamp (nullable = true)
 |-- ship_via: integer (nullable = true)
 |-- freight: double (nullable = true)
 |-- ship_name: string (nullable = true)
 |-- ship_address: string (nullable = true)
 |-- ship_city: string (nullable = true)
 |-- ship_region: string (nullable = true)
 |-- ship_postal_code: string (nullable = true)
 |-- ship_country: string (nullable = true)



### Dataset Order Details

In [16]:
df_order_details = spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Northwind/order_details.csv")

In [17]:
df_order_details.show(2)
df_order_details.printSchema()
df_order_details.dtypes

+--------+----------+----------+--------+--------+
|order_id|product_id|unit_price|quantity|discount|
+--------+----------+----------+--------+--------+
|   10248|        11|      14.0|      12|     0.0|
|   10248|        42|       9.8|      10|     0.0|
+--------+----------+----------+--------+--------+
only showing top 2 rows

root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)



[('order_id', 'int'),
 ('product_id', 'int'),
 ('unit_price', 'double'),
 ('quantity', 'int'),
 ('discount', 'double')]

In [18]:
df_order_details = df_order_details \
  .withColumn("order_id" ,
              df_order_details["order_id"]
              .cast(StringType()))   \
  .withColumn("product_id"  ,
              df_order_details["product_id"]
              .cast(StringType())) \

In [19]:
df_order_details.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)



###Dataset Supplier

In [20]:
df_suppliers = spark.read. \
  format("csv"). \
  option("inferSchema","true"). \
  option("header","true"). \
  load("Northwind/suppliers.csv")

In [21]:
df_suppliers.show(2)
df_suppliers.printSchema()
df_suppliers.dtypes

+-----------+--------------------+----------------+-------------------+--------------+-----------+------+-----------+-------+--------------+----+-----------+
|supplier_id|        company_name|    contact_name|      contact_title|       address|       city|region|postal_code|country|         phone| fax|   homepage|
+-----------+--------------------+----------------+-------------------+--------------+-----------+------+-----------+-------+--------------+----+-----------+
|          1|      Exotic Liquids|Charlotte Cooper| Purchasing Manager|49 Gilbert St.|     London|  null|    EC1 4SD|     UK|(171) 555-2222|null|       null|
|          2|New Orleans Cajun...|   Shelley Burke|Order Administrator|P.O. Box 78934|New Orleans|    LA|      70117|    USA|(100) 555-4822|null|#CAJUN.HTM#|
+-----------+--------------------+----------------+-------------------+--------------+-----------+------+-----------+-------+--------------+----+-----------+
only showing top 2 rows

root
 |-- supplier_id: inte

[('supplier_id', 'int'),
 ('company_name', 'string'),
 ('contact_name', 'string'),
 ('contact_title', 'string'),
 ('address', 'string'),
 ('city', 'string'),
 ('region', 'string'),
 ('postal_code', 'string'),
 ('country', 'string'),
 ('phone', 'string'),
 ('fax', 'string'),
 ('homepage', 'string')]

In [22]:
df_suppliers = df_suppliers \
  .withColumn("supplier_id" ,
              df_suppliers["supplier_id"]
              .cast(StringType()))   \
  

### Create Alias in Suppliers table

In [23]:
 df_suppliers2 = df_suppliers.select(col("supplier_id").alias("Supplier_id"), col("company_name").alias("Supplier_name"))
 df_suppliers2.show()

+-----------+--------------------+
|Supplier_id|       Supplier_name|
+-----------+--------------------+
|          1|      Exotic Liquids|
|          2|New Orleans Cajun...|
|          3|Grandma Kelly's H...|
|          4|       Tokyo Traders|
|          5|Cooperativa de Qu...|
|          6|            Mayumi's|
|          7|       Pavlova, Ltd.|
|          8|Specialty Biscuit...|
|          9|    PB Knäckebröd AB|
|         10|Refrescos America...|
|         11|Heli Süßwaren Gmb...|
|         12|Plutzer Lebensmit...|
|         13|Nord-Ost-Fisch Ha...|
|         14|Formaggi Fortini ...|
|         15|     Norske Meierier|
|         16|   Bigfoot Breweries|
|         17|   Svensk Sjöföda AB|
|         18|Aux joyeux ecclés...|
|         19|New England Seafo...|
|         20|        Leka Trading|
+-----------+--------------------+
only showing top 20 rows



In [24]:
df_suppliers2.printSchema()

root
 |-- Supplier_id: string (nullable = true)
 |-- Supplier_name: string (nullable = true)



### Join with table Suppliers

In [25]:
df_denormalized2 = df_orders.join(df_order_details,"order_id").join(df_products,"product_id").join(df_suppliers2,"supplier_id")
df_denormalized2.show()

+-----------+----------+--------+-----------+-----------+-------------------+-------------------+-------------------+--------+-------+--------------------+--------------------+--------------+-----------+----------------+------------+----------+--------+--------+--------------------+-----------+-----------------+----------+--------------+--------------+-------------+------------+--------------------+
|supplier_id|product_id|order_id|customer_id|employee_id|         order_date|      required_date|       shipped_date|ship_via|freight|           ship_name|        ship_address|     ship_city|ship_region|ship_postal_code|ship_country|unit_price|quantity|discount|        product_name|category_id|quantity_per_unit|unit_price|units_in_stock|units_on_order|reorder_level|discontinued|       Supplier_name|
+-----------+----------+--------+-----------+-----------+-------------------+-------------------+-------------------+--------+-------+--------------------+--------------------+--------------+---

# Homework

buatlah satu dataframe yang berisikan report Yearly Supplier Net revenue (group by Year, Supplier ID, Supplier Name):

Supplier_id	| Supplier_name	| Year	| Net_revenue

In [28]:
net_revenue_formula = (1-df_order_details["discount"]) * df_order_details["unit_price"] * df_order_details["quantity"]
df_net_revnue_unsorte2 = df_denormalized2.groupBy("Supplier_id","Supplier_name",year("order_date").alias("Year")).agg(round(sum(net_revenue_formula),2).alias("Net_revenue"))
df_net_revnue_unsorte2.orderBy("supplier_id","Supplier_name","Year").show(100)

+-----------+--------------------+----+-----------+
|Supplier_id|       Supplier_name|Year|Net_revenue|
+-----------+--------------------+----+-----------+
|          1|      Exotic Liquids|1996|    3257.96|
|          1|      Exotic Liquids|1997|    8762.55|
|          1|      Exotic Liquids|1998|    7379.45|
|         10|Refrescos America...|1996|     556.74|
|         10|Refrescos America...|1997|    1630.13|
|         10|Refrescos America...|1998|     2317.5|
|         11|Heli Süßwaren Gmb...|1996|    5409.48|
|         11|Heli Süßwaren Gmb...|1997|   23109.68|
|         11|Heli Süßwaren Gmb...|1998|   10134.27|
|         12|Plutzer Lebensmit...|1996|    21285.9|
|         12|Plutzer Lebensmit...|1997|   66007.99|
|         12|Plutzer Lebensmit...|1998|   58078.51|
|         13|Nord-Ost-Fisch Ha...|1996|    2645.46|
|         13|Nord-Ost-Fisch Ha...|1997|     6034.4|
|         13|Nord-Ost-Fisch Ha...|1998|    4744.34|
|         14|Formaggi Fortini ...|1996|   12094.19|
|         14