## Initializing the Spark

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


## Spark Installation

In [4]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 64kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 32.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=78130cc332e336138fdcb021cd3138da964c91f0f579d9a001e4895d0f084d9b
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


## Importing the Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd


## 5 Types of configurations in SparkSession

In [0]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Bonus_Assignment_6") \
    .config("logConf","false") \
    .config("driver.supervise","false") \
    .config("driver.userClassPathFirst","false") \
    .config("checkpoint.compress", "false") \
    .config("streaming.backpressure.enabled", "false") \
    .getOrCreate()

## Loading the Data

In [0]:
# Reading the data as dataframe
df=spark.read.csv('/content/sample_data/hotel_bookings.csv',header=True)

In [11]:
df.show()

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          0|      342|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|      737|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|        7|             2015|              July|                        1|     1|    GBR|        Direct|              Direct|
|Resort Hotel|          0|       13|             2015|              July|                     

# Data Set  = "Hotel_Bookings.csv"

## Showing Data as RDD

In [0]:
RDD = spark.sparkContext.textFile("/content/sample_data/hotel_bookings.csv")

In [14]:
RDD.take(10)

['hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_day_of_month,adults,country,market_segment,distribution_channel',
 'Resort Hotel,0,342,2015,July,1,2,PRT,Direct,Direct',
 'Resort Hotel,0,737,2015,July,1,2,PRT,Direct,Direct',
 'Resort Hotel,0,7,2015,July,1,1,GBR,Direct,Direct',
 'Resort Hotel,0,13,2015,July,1,1,GBR,Corporate,Corporate',
 'Resort Hotel,0,14,2015,July,1,2,GBR,Online TA,TA/TO',
 'Resort Hotel,0,14,2015,July,1,2,GBR,Online TA,TA/TO',
 'Resort Hotel,0,0,2015,July,1,2,PRT,Direct,Direct',
 'Resort Hotel,0,9,2015,July,1,2,PRT,Direct,Direct',
 'Resort Hotel,1,85,2015,July,1,2,PRT,Online TA,TA/TO']

## RDD TO SPARK DATAFRAME

### Importing the Libraries 

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import Row
from pyspark.sql.types import *

In [16]:
df.show(2)

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          0|      342|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|      737|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
only showing top 2 rows



In [18]:
dataset = RDD.map(lambda x : x.split(","))
data = dataset.map(lambda p: Row(hotel=p[0],is_canceled=p[1],lead_time=p[2],arrival_date_year=p[3],arrival_date_day_of_month=p[4],adults=p[5],country=p[6],market_segment=p[7],distribution_channel=p[8]))
RDD_DF = spark.createDataFrame(data)
RDD_DF.show(10)

+--------------------+-------------------------+-----------------+-------+--------------------+------------+-----------+---------+--------------+
|              adults|arrival_date_day_of_month|arrival_date_year|country|distribution_channel|       hotel|is_canceled|lead_time|market_segment|
+--------------------+-------------------------+-----------------+-------+--------------------+------------+-----------+---------+--------------+
|arrival_date_day_...|       arrival_date_month|arrival_date_year| adults|      market_segment|       hotel|is_canceled|lead_time|       country|
|                   1|                     July|             2015|      2|              Direct|Resort Hotel|          0|      342|           PRT|
|                   1|                     July|             2015|      2|              Direct|Resort Hotel|          0|      737|           PRT|
|                   1|                     July|             2015|      1|              Direct|Resort Hotel|          0|    



## Spark Data to RDD Converstion

In [74]:
SDF_TO_RDD = df.rdd.map(tuple)
SDF_TO_RDD.take(20)

[('Resort Hotel',
  '0',
  '342',
  '2015',
  'July',
  '1',
  '2',
  'PRT',
  'Direct',
  'Direct'),
 ('Resort Hotel',
  '0',
  '737',
  '2015',
  'July',
  '1',
  '2',
  'PRT',
  'Direct',
  'Direct'),
 ('Resort Hotel',
  '0',
  '7',
  '2015',
  'July',
  '1',
  '1',
  'GBR',
  'Direct',
  'Direct'),
 ('Resort Hotel',
  '0',
  '13',
  '2015',
  'July',
  '1',
  '1',
  'GBR',
  'Corporate',
  'Corporate'),
 ('Resort Hotel',
  '0',
  '14',
  '2015',
  'July',
  '1',
  '2',
  'GBR',
  'Online TA',
  'TA/TO'),
 ('Resort Hotel',
  '0',
  '14',
  '2015',
  'July',
  '1',
  '2',
  'GBR',
  'Online TA',
  'TA/TO'),
 ('Resort Hotel',
  '0',
  '0',
  '2015',
  'July',
  '1',
  '2',
  'PRT',
  'Direct',
  'Direct'),
 ('Resort Hotel',
  '0',
  '9',
  '2015',
  'July',
  '1',
  '2',
  'PRT',
  'Direct',
  'Direct'),
 ('Resort Hotel',
  '1',
  '85',
  '2015',
  'July',
  '1',
  '2',
  'PRT',
  'Online TA',
  'TA/TO'),
 ('Resort Hotel',
  '1',
  '75',
  '2015',
  'July',
  '1',
  '2',
  'PRT',
  'O

## Spark to Pandas converstion

In [23]:
SDF_TO_DF = df.toPandas()
SDF_TO_DF

Unnamed: 0,hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_day_of_month,adults,country,market_segment,distribution_channel
0,Resort Hotel,0,342,2015,July,1,2,PRT,Direct,Direct
1,Resort Hotel,0,737,2015,July,1,2,PRT,Direct,Direct
2,Resort Hotel,0,7,2015,July,1,1,GBR,Direct,Direct
3,Resort Hotel,0,13,2015,July,1,1,GBR,Corporate,Corporate
4,Resort Hotel,0,14,2015,July,1,2,GBR,Online TA,TA/TO
...,...,...,...,...,...,...,...,...,...,...
119385,City Hotel,0,23,2017,August,30,2,BEL,Offline TA/TO,TA/TO
119386,City Hotel,0,102,2017,August,31,3,FRA,Online TA,TA/TO
119387,City Hotel,0,34,2017,August,31,2,DEU,Online TA,TA/TO
119388,City Hotel,0,109,2017,August,31,2,GBR,Online TA,TA/TO


## (Spark) Select first 10 rows of dataset

In [29]:
df.show(10)

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          0|      342|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|      737|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|        7|             2015|              July|                        1|     1|    GBR|        Direct|              Direct|
|Resort Hotel|          0|       13|             2015|              July|                     

## (SQL) Select first 10 rows of dataset 

In [0]:
df.createOrReplaceTempView("temp_sql")

In [33]:
lim_df_10 = spark.sql("""
select * 
from temp_sql 
LIMIT 10
""")

lim_df_10.show()

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          0|      342|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|      737|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|        7|             2015|              July|                        1|     1|    GBR|        Direct|              Direct|
|Resort Hotel|          0|       13|             2015|              July|                     

## Spark DataSet Schema/

In [34]:
df.printSchema()

root
 |-- hotel: string (nullable = true)
 |-- is_canceled: string (nullable = true)
 |-- lead_time: string (nullable = true)
 |-- arrival_date_year: string (nullable = true)
 |-- arrival_date_month: string (nullable = true)
 |-- arrival_date_day_of_month: string (nullable = true)
 |-- adults: string (nullable = true)
 |-- country: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)



## SQL Dataset Schema and Describing the Dataset

In [37]:
desc_df = """
describe temp_sql
"""
spark.sql(desc_df).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|               hotel|   string|   null|
|         is_canceled|   string|   null|
|           lead_time|   string|   null|
|   arrival_date_year|   string|   null|
|  arrival_date_month|   string|   null|
|arrival_date_day_...|   string|   null|
|              adults|   string|   null|
|             country|   string|   null|
|      market_segment|   string|   null|
|distribution_channel|   string|   null|
+--------------------+---------+-------+



## (Spark) Group by and get max, min, count of a column in the dataset

In [40]:
df.groupBy('adults').count().show()

+------+-----+
|adults|count|
+------+-----+
|     3| 6202|
|     0|  403|
|     5|    2|
|    27|    2|
|    26|    5|
|     6|    1|
|    55|    1|
|    40|    1|
|     1|23027|
|    20|    2|
|    10|    1|
|     4|   62|
|     2|89680|
|    50|    1|
+------+-----+



In [42]:
df.select(max('adults')).show()

+-----------+
|max(adults)|
+-----------+
|          6|
+-----------+



In [43]:
df.select(min('adults')).show()

+-----------+
|min(adults)|
+-----------+
|          0|
+-----------+



## (SQL) Group by and get max, min, count of a column in the dataset 

In [44]:
t3 = """
select adults, count(adults) 
from temp_sql 
group by adults
"""
spark.sql(t3).show()

+------+-------------+
|adults|count(adults)|
+------+-------------+
|     3|         6202|
|     0|          403|
|     5|            2|
|    27|            2|
|    26|            5|
|     6|            1|
|    55|            1|
|    40|            1|
|     1|        23027|
|    20|            2|
|    10|            1|
|     4|           62|
|     2|        89680|
|    50|            1|
+------+-------------+



In [48]:
t3_1 = """
select max(adults) as old_person 
from temp_sql
"""
spark.sql(t3_1).show()

+----------+
|old_person|
+----------+
|         6|
+----------+



In [46]:
t3_2 = """
select min(adults) as young_person 
from temp_sql
"""
spark.sql(t3_2).show()

+------------+
|young_person|
+------------+
|           0|
+------------+



## Spark - Filter your dataset by some conditions based on your column 

In [49]:
# Only two members are over age 40
old_adult = df.filter(df['adults'] > 40)
old_adult.show()

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          1|      336|             2015|         September|                        7|    50|    PRT|        Direct|              Direct|
|Resort Hotel|          1|      338|             2015|           October|                        4|    55|    PRT|        Direct|              Direct|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+



## SQL  Filter your dataset by some conditions based on your column

In [50]:
t4 = """
select * 
from temp_sql where adults > 40
"""
spark.sql(t4).show()

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          1|      336|             2015|         September|                        7|    50|    PRT|        Direct|              Direct|
|Resort Hotel|          1|      338|             2015|           October|                        4|    55|    PRT|        Direct|              Direct|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+



## (Spark) Apply order by

In [52]:
order_by_set = df.select('hotel','is_canceled','lead_time','arrival_date_year','arrival_date_month','arrival_date_day_of_month','country','market_segment','adults').orderBy('adults')
order_by_set.show()

+------------+-----------+---------+-----------------+------------------+-------------------------+-------+--------------+------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|country|market_segment|adults|
+------------+-----------+---------+-----------------+------------------+-------------------------+-------+--------------+------+
|Resort Hotel|          0|        1|             2015|           October|                        6|    PRT|     Corporate|     0|
|  City Hotel|          0|       15|             2015|            August|                       28|    PRT| Complementary|     0|
|Resort Hotel|          0|        0|             2015|           October|                       12|    PRT|     Corporate|     0|
|Resort Hotel|          0|       36|             2015|          November|                       20|    ESP|        Groups|     0|
|Resort Hotel|          0|      165|             2015|          December|                 

## Apply order by - SQL

In [58]:
t5 = """
select hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,arrival_date_day_of_month,country,market_segment,adults 
from temp_sql 
order by adults
"""
spark.sql(t5).show()

+------------+-----------+---------+-----------------+------------------+-------------------------+-------+--------------+------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|country|market_segment|adults|
+------------+-----------+---------+-----------------+------------------+-------------------------+-------+--------------+------+
|Resort Hotel|          0|        1|             2015|           October|                        6|    PRT|     Corporate|     0|
|  City Hotel|          0|       15|             2015|            August|                       28|    PRT| Complementary|     0|
|Resort Hotel|          0|        0|             2015|           October|                       12|    PRT|     Corporate|     0|
|Resort Hotel|          0|       36|             2015|          November|                       20|    ESP|        Groups|     0|
|Resort Hotel|          0|      165|             2015|          December|                 

## Spark -  Select distinct records by a column 

In [59]:
df.select('adults').distinct().show()

+------+
|adults|
+------+
|     3|
|     0|
|     5|
|    27|
|    26|
|     6|
|    55|
|    40|
|     1|
|    20|
|    10|
|     4|
|     2|
|    50|
+------+



## Select distinct records by a column 

In [61]:
t6 = """
select distinct adults 
from temp_sql
"""
spark.sql(t6).show()

+------+
|adults|
+------+
|     3|
|     0|
|     5|
|    27|
|    26|
|     6|
|    55|
|    40|
|     1|
|    20|
|    10|
|     4|
|     2|
|    50|
+------+



## Transform the data type of column from string to float 

In [62]:
adults_df_schema = df.withColumn('adults', col('adults').cast((FloatType())))
adults_df_schema.printSchema()

root
 |-- hotel: string (nullable = true)
 |-- is_canceled: string (nullable = true)
 |-- lead_time: string (nullable = true)
 |-- arrival_date_year: string (nullable = true)
 |-- arrival_date_month: string (nullable = true)
 |-- arrival_date_day_of_month: string (nullable = true)
 |-- adults: float (nullable = true)
 |-- country: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)



In [63]:
df.show(2)

+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|       hotel|is_canceled|lead_time|arrival_date_year|arrival_date_month|arrival_date_day_of_month|adults|country|market_segment|distribution_channel|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
|Resort Hotel|          0|      342|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
|Resort Hotel|          0|      737|             2015|              July|                        1|     2|    PRT|        Direct|              Direct|
+------------+-----------+---------+-----------------+------------------+-------------------------+------+-------+--------------+--------------------+
only showing top 2 rows



## Apply group by with having clause

In [65]:
df.groupBy('adults').agg(sum('lead_time')).show()

+------+--------------+
|adults|sum(lead_time)|
+------+--------------+
|     3|      583512.0|
|     0|       34882.0|
|     5|         662.0|
|    27|         701.0|
|    26|        1735.0|
|     6|         328.0|
|    55|         338.0|
|    40|         304.0|
|     1|     1545821.0|
|    20|         662.0|
|    10|         334.0|
|     4|        4772.0|
|     2|   1.0243536E7|
|    50|         336.0|
+------+--------------+



## Apply group by with having clause

In [67]:
task_8 = """
SELECT adults, sum(lead_time) 
from temp_sql 
group by adults 
having sum(lead_time) > 1000"""
spark.sql(task_8).show()

+------+------------------------------+
|adults|sum(CAST(lead_time AS DOUBLE))|
+------+------------------------------+
|     3|                      583512.0|
|     0|                       34882.0|
|    26|                        1735.0|
|     1|                     1545821.0|
|     4|                        4772.0|
|     2|                   1.0243536E7|
+------+------------------------------+



## Apply a min-max normalization on any numerical column.

In [3]:
SELECT
    1.00*(adults-2)/AgeRange,
    1.00*(lead_time-10)/LeadTimeRange,
FROM
    (
    SELECT
       adults,
       MIN(Age) OVER () AS MinAge,
       MAX(Age) OVER () - MIN(Age) OVER () AS AgeRange,
       lead_time,
       MIN(lead_time) OVER () AS MinLeadTime,
       MAX(lead_time) OVER () - MIN(MinLeadTime) OVER () AS LeadTimeRange,
    FROM
       df
    ) X

## Standard Normalization

In [4]:
select distinct
       asin,
       (
       select distinct
              adults
         from asin df2
        where df2.asin = df.asin
       ) as lead_time
  from df

## Transform a categorical column with OneHot Encoding

In [6]:
select hotel,
count(country when market_segment = 'Direct' then 1 end) as stright,
count(country when market_segment = 'Corporate' then 1 end) as rich
from df
group by lead_time