<a href="https://colab.research.google.com/github/maxdelvecchyo/Deep-Learning/blob/master/day1_Ortega.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Set up CoLab environment: Install spark and configure python to work with it

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from IPython.display import clear_output

# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
# sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.config("spark.ui.port","4050").getOrCreate()
clear_output()
spark

In [None]:
# As you will notice, Spark UI does not work. So please run this cell.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
clear_output()
!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

http://ec89e69fa4fe.ngrok.io


## Move the kaggle dataset from drive to Colab file system
Run cell

In [None]:

from google.colab import drive
drive.mount('/content/drive')

# Download files from Vantage Drive to local colab drive!
!cp /content/drive/Shared\ drives/Vantage\ AI/Kennismanagement/Vantage\ Program/2020-07-12\ Spark\ Guido\ +\ Joris/Data\ Handson/data-handson.zip /content/
!unzip /content/data-handson.zip -d /content/data/
!rm /content/data-handson.zip
!rm -rf /content/sample_data/

clear_output()
print('Setup is all done! Back to class')

Setup is all done! Back to class


# Hands-on - DAY 1

## Load in the data files (ETL)

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
aisles = spark.read.format('csv').load('data/aisles.csv')
aisles.show(5)

+--------+--------------------+
|     _c0|                 _c1|
+--------+--------------------+
|aisle_id|               aisle|
|       1|prepared soups sa...|
|       2|   specialty cheeses|
|       3| energy granola bars|
|       4|       instant foods|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Indicate that the data has a header
aisles = spark.read.format('csv').option('header','true').load('data/aisles.csv')
aisles.show(5)

+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
|       3| energy granola bars|
|       4|       instant foods|
|       5|marinades meat pr...|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Or with less code
aisles = spark.read.csv('data/aisles.csv', header=True)
aisles.show(5)
print('Schema: ')
aisles.printSchema()

+--------+--------------------+
|aisle_id|               aisle|
+--------+--------------------+
|       1|prepared soups sa...|
|       2|   specialty cheeses|
|       3| energy granola bars|
|       4|       instant foods|
|       5|marinades meat pr...|
+--------+--------------------+
only showing top 5 rows

Schema: 
root
 |-- aisle_id: string (nullable = true)
 |-- aisle: string (nullable = true)



In [None]:
from pyspark.sql import functions as F
# Cast columns to a typ
aisles.withColumn('aisle_id',F.col('aisle_id').cast('integer')).printSchema()
aisles.selectExpr('cast(aisle_id as int) as aisle_id','aisle').printSchema()
spark.read.csv('data/aisles.csv', header=True, schema='aisle_id int, aisle string', mode='FAILFAST').printSchema()
# NOTE, these lines of code are all lazy, so nothing is actually loaded.

root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)

root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)

root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)



## 1. Load in `aisles.csv`, cast data to correct types and write to parquet:

`df.write.format('parquet').save('<folder-name>')`

In [None]:
aisles = spark.read.csv('data/aisles.csv', header=True)
aisles = aisles.withColumn('aisle_id',F.col('aisle_id').cast('integer'))
aisles.printSchema()
aisles.write.format('parquet').save('./data/aisles')

root
 |-- aisle_id: integer (nullable = true)
 |-- aisle: string (nullable = true)



AnalysisException: ignored

## 2. Load in `departments.csv`, cast data to correct types and write to parquet


In [None]:
departments = spark.read.csv('data/departments.csv', header=True)
departments = departments.withColumn('department_id',F.col('department_id').cast('integer'))
departments.write.format('parquet').save('./data/departments')

## 3. Load in `orders.csv`, cast data to correct types and write to spark storage:
`df.write.mode('overwrite').saveAsTable('<table-name>') `

Now you can also query the data with sql! 

df2 = `spark.sql('SELECT * FROM <table-name>')`


In [None]:
 # Noob alert
 orders = spark.read.format('json').load('./data/orders.txt')
 orders = orders.withColumn('days_since_prior_order',F.col('days_since_prior_order').cast('integer'))
 orders = orders.withColumn('order_dow',F.col('order_dow').cast('integer'))
 orders = orders.withColumn('order_hour_of_day',F.col('order_hour_of_day').cast('integer'))
 orders = orders.withColumn('order_id',F.col('order_id').cast('integer'))
 orders = orders.withColumn('order_number',F.col('order_number').cast('integer'))
 orders = orders.withColumn('user_id',F.col('user_id').cast('integer'))
 orders.printSchema()
 orders.show(5)
 orders.write.saveAsTable('orders')


root
 |-- days_since_prior_order: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- user_id: integer (nullable = true)

+----------------------+--------+---------+-----------------+--------+------------+-------+
|days_since_prior_order|eval_set|order_dow|order_hour_of_day|order_id|order_number|user_id|
+----------------------+--------+---------+-----------------+--------+------------+-------+
|                  null| history|        6|               12| 2618231|           0|     13|
|                     8| history|        0|               11| 2560699|           1|     13|
|                     6| history|        6|               21| 2288946|           2|     13|
|                     9| history|        1|               12|   19256|           3|     13|
|                     6| history|    

## 4. Load in `orderContents`, set the right column names and save the table

Maybe register the resulting dataframe as a temporary sql table.

`df.createOrReplaceTempView('<table-name')`

Now it is also possible to query this table!

In [None]:
ordercontents = spark.read.parquet('./data/orderContents')
ordercontents = ordercontents.withColumnRenamed("_c0","order_id")
ordercontents = ordercontents.withColumnRenamed("_c1","product_id")
ordercontents = ordercontents.withColumnRenamed("_c2","add_to_cart_order")
ordercontents = ordercontents.withColumnRenamed("_c3","reordered")
ordercontents.show(5)
orders.printSchema()

ordercontents.write.saveAsTable('ordercontents')

+--------+----------+-----------------+---------+
|order_id|product_id|add_to_cart_order|reordered|
+--------+----------+-----------------+---------+
|  161263|     18376|                4|        1|
|   36981|     17024|                4|        0|
|   86528|     35752|                4|        1|
|  575540|     33956|                3|        0|
|  185986|     21637|                6|        0|
+--------+----------+-----------------+---------+
only showing top 5 rows

root
 |-- days_since_prior_order: integer (nullable = true)
 |-- eval_set: string (nullable = true)
 |-- order_dow: integer (nullable = true)
 |-- order_hour_of_day: integer (nullable = true)
 |-- order_id: integer (nullable = true)
 |-- order_number: integer (nullable = true)
 |-- user_id: integer (nullable = true)



AnalysisException: ignored

## 5. Find out datatype of products.txt, load the data, cast types and save (in a format keeping the types you just set)

In [None]:
products = spark.read.csv('data/products.csv', header=True)
products = products.withColumn('product_id',F.col('product_id').cast('integer'))
products = products.withColumn('aisle_id',F.col('aisle_id').cast('integer'))
products = products.withColumn('department_id',F.col('department_id').cast('integer'))
products.printSchema()
products.show(5)
products.write.format('parquet').save('./data/products')
#products = departments.withColumn('department_id',F.col('department_id').cast('integer'))
#departments.write.format('parquet').save('./data/departments')

root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- aisle_id: integer (nullable = true)
 |-- department_id: integer (nullable = true)

+----------+--------------------+--------+-------------+
|product_id|        product_name|aisle_id|department_id|
+----------+--------------------+--------+-------------+
|         1|Chocolate Sandwic...|      61|           19|
|         2|    All-Seasons Salt|     104|           13|
|         3|Robust Golden Uns...|      94|            7|
|         4|Smart Ones Classi...|      38|            1|
|         5|Green Chile Anyti...|       5|           13|
+----------+--------------------+--------+-------------+
only showing top 5 rows



# Label Engineering
Goal: manipulate the data in such a way that we obtain labels.

|user_id|product_id|product_bought_in_last_visit|
|---|---|---|
| A | 1 | 1 |
|A  | 5  |0  |
| ... | ... | ... |
| B | 2 | 0 |
|B| 3| 1|
|...|...|...|



# Feature Engineering
Goal: Create both user and product features

## User features:
1. Average basket size
2. Whether the user ordered organic, gluten-free, or Asian items in the past
3. Number of distinct items
4. Average time between orders

## Product Features
1. Probability of a product being bought on a certain day
2. Average time between buys
3. Number of times being bought
4. Probability of an item being reordered?

## UserxProduct Features
1. How often has the user bought an item in the past
2. Average time for a given user between being a product

#  DAY 2