# Expedia 호텔 예약 예측 
- kaggle, https://www.kaggle.com/c/expedia-hotel-recommendations

In [1]:
# Display the pre-defined SparkContext as a sanity check that Spark is available.
sc

<pyspark.context.SparkContext at 0x7fb1c59ba240>

In [1]:
import collections
import time

from pyspark import StorageLevel
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [3]:
# Create (or reuse) the SparkSession for this notebook: app name "project", local master using all cores.
spark = (
    SparkSession.builder
    .appName("project")
    .master("local[*]")
    .getOrCreate()
)

In [3]:
# Load the three CSV files. No `header` option is passed, so Spark names the columns
# _c0, _c1, ... and the header line arrives as the first data row (see the rename step below).
train_data = spark.read.csv("file:///home/ubuntu/Session/180602_project/data/train.csv")
test_data = spark.read.csv("file:///home/ubuntu/Session/180602_project/data/test.csv")
destination_data = spark.read.csv("file:///home/ubuntu/Session/180602_project/data/destinations.csv")

## 전처리

### rename
- column이름 바꾸기

In [4]:
# Column names as assigned by Spark (_c0 .. _c23); kept for the rename below.
col_name = train_data.columns
print(col_name)

['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13', '_c14', '_c15', '_c16', '_c17', '_c18', '_c19', '_c20', '_c21', '_c22', '_c23']


In [5]:
# Peek at the first row: it holds the real column names (the CSV header line).
real_name = train_data.take(1)
real_name

[Row(_c0='date_time', _c1='site_name', _c2='posa_continent', _c3='user_location_country', _c4='user_location_region', _c5='user_location_city', _c6='orig_destination_distance', _c7='user_id', _c8='is_mobile', _c9='is_package', _c10='channel', _c11='srch_ci', _c12='srch_co', _c13='srch_adults_cnt', _c14='srch_children_cnt', _c15='srch_rm_cnt', _c16='srch_destination_id', _c17='srch_destination_type_id', _c18='is_booking', _c19='cnt', _c20='hotel_continent', _c21='hotel_country', _c22='hotel_market', _c23='hotel_cluster')]

In [6]:
# Use the values of the first row (the CSV header line) as the real column names.
# The row is fetched once; calling take(1) inside the loop launched one Spark job per column.
header_row = train_data.take(1)[0]
new_col_name = list(header_row)

print(new_col_name)

['date_time', 'site_name', 'posa_continent', 'user_location_country', 'user_location_region', 'user_location_city', 'orig_destination_distance', 'user_id', 'is_mobile', 'is_package', 'channel', 'srch_ci', 'srch_co', 'srch_adults_cnt', 'srch_children_cnt', 'srch_rm_cnt', 'srch_destination_id', 'srch_destination_type_id', 'is_booking', 'cnt', 'hotel_continent', 'hotel_country', 'hotel_market', 'hotel_cluster']


In [7]:
# One aliased column expression per raw column (_cN -> real name), applied in a single select.
command = [col(col_name[pos]).alias(new_col_name[pos]) for pos in range(24)]

train_data = train_data.select(command)

In [8]:
# Remove the header line, which was loaded as an ordinary row (its date_time cell is the literal 'date_time').
train_data = train_data.filter(train_data.date_time != 'date_time')
train_data.show(3)

+-------------------+---------+--------------+---------------------+--------------------+------------------+-------------------------+-------+---------+----------+-------+----------+----------+---------------+-----------------+-----------+-------------------+------------------------+----------+---+---------------+-------------+------------+-------------+
|          date_time|site_name|posa_continent|user_location_country|user_location_region|user_location_city|orig_destination_distance|user_id|is_mobile|is_package|channel|   srch_ci|   srch_co|srch_adults_cnt|srch_children_cnt|srch_rm_cnt|srch_destination_id|srch_destination_type_id|is_booking|cnt|hotel_continent|hotel_country|hotel_market|hotel_cluster|
+-------------------+---------+--------------+---------------------+--------------------+------------------+-------------------------+-------+---------+----------+-------+----------+----------+---------------+-----------------+-----------+-------------------+------------------------+--

In [4]:
## 함수 정의
def making_dataframe(df) :
    """Promote the first row of a header-less CSV DataFrame to column names.

    The frame is expected to come straight from spark.read.csv without a header
    option, so that its first row holds the real column names.

    Args:
        df: DataFrame whose first row contains the column names.

    Returns:
        A DataFrame with the columns renamed to the first row's values and the
        header row itself removed. An empty input is returned unchanged.
    """
    first_rows = df.take(1)
    if not first_rows :
        # Nothing to promote: an empty frame has no header row.
        return df

    header = first_rows[0]
    col_name = df.columns
    new_col_name = [header[idx] for idx in range(len(col_name))]

    command = [col(old).alias(new) for old, new in zip(col_name, new_col_name)]
    renamed = df.select(command)

    first_name = new_col_name[0]
    first_col = renamed[first_name]
    # `column != value` evaluates to NULL for NULL cells and filter() discards those rows,
    # so NULLs are kept explicitly; only the header line itself is removed.
    return renamed.filter(first_col.isNull() | (first_col != first_name))

In [5]:
# NOTE: making_dataframe expects the frames exactly as returned by spark.read.csv
# (columns _c0.., header line as first row). The exploratory cells above reassign
# train_data with renamed columns, so re-run the read cell before this one if they were executed.
train_df, test_df, destination_df = [
    making_dataframe(raw_df)
    for raw_df in (train_data, test_data, destination_data)
]

In [20]:
# Show the first three rows of each renamed frame, separated by blank lines.
train_df.show(3)
print("\n\n\n")
test_df.show(3)
print("\n\n\n")
destination_df.show(3)

+-------------------+---------+--------------+---------------------+--------------------+------------------+-------------------------+-------+---------+----------+-------+----------+----------+---------------+-----------------+-----------+-------------------+------------------------+----------+---+---------------+-------------+------------+-------------+
|          date_time|site_name|posa_continent|user_location_country|user_location_region|user_location_city|orig_destination_distance|user_id|is_mobile|is_package|channel|   srch_ci|   srch_co|srch_adults_cnt|srch_children_cnt|srch_rm_cnt|srch_destination_id|srch_destination_type_id|is_booking|cnt|hotel_continent|hotel_country|hotel_market|hotel_cluster|
+-------------------+---------+--------------+---------------------+--------------------+------------------+-------------------------+-------+---------+----------+-------+----------+----------+---------------+-----------------+-----------+-------------------+------------------------+--

### NA 처리1

In [6]:
def drop_na(df) :
    """Return `df` without the rows that contain a null value (delegates to df.na.drop())."""
    return df.na.drop()

In [7]:
def fill_na(df, ctg) :
    """Fill null date strings with a fixed placeholder that depends on the dataset.

    Args:
        df: DataFrame with the string columns date_time, srch_ci and srch_co.
        ctg: "train" selects the train placeholders; any other value selects the 2015 ones.

    Returns:
        The result of df.na.fill(...) with the selected placeholders.
    """
    train_defaults = {"date_time":"2013.5-08-00 00:00:00", "srch_ci":"2013.5-09-00","srch_co":"2013.5-09-00"}
    other_defaults = {"date_time":"2015-08-00 00:00:00", "srch_ci":"2015-09-00","srch_co":"2015-09-00"}

    return df.na.fill(train_defaults if ctg == "train" else other_defaults)

In [8]:
# Two train variants: 1_1 drops every row containing a null, 1_2 fills the null date strings instead.
train_df1_1 = drop_na(train_df)
train_df1_2 = fill_na(train_df, "train")
# The test set only gets the fill treatment.
test_df1 = fill_na(test_df, "test")

### schema
- 데이터 타입 바꿔주기

In [11]:
# Inspect the train schema: every column is still a string at this point (see the output).
train_df.schema

StructType(List(StructField(date_time,StringType,true),StructField(site_name,StringType,true),StructField(posa_continent,StringType,true),StructField(user_location_country,StringType,true),StructField(user_location_region,StringType,true),StructField(user_location_city,StringType,true),StructField(orig_destination_distance,StringType,true),StructField(user_id,StringType,true),StructField(is_mobile,StringType,true),StructField(is_package,StringType,true),StructField(channel,StringType,true),StructField(srch_ci,StringType,true),StructField(srch_co,StringType,true),StructField(srch_adults_cnt,StringType,true),StructField(srch_children_cnt,StringType,true),StructField(srch_rm_cnt,StringType,true),StructField(srch_destination_id,StringType,true),StructField(srch_destination_type_id,StringType,true),StructField(is_booking,StringType,true),StructField(cnt,StringType,true),StructField(hotel_continent,StringType,true),StructField(hotel_country,StringType,true),StructField(hotel_market,StringTyp

In [10]:
# Inspect the test schema: it has an extra `id` column and no is_booking / cnt / hotel_cluster (see the output).
test_df.schema

StructType(List(StructField(id,StringType,true),StructField(date_time,StringType,true),StructField(site_name,StringType,true),StructField(posa_continent,StringType,true),StructField(user_location_country,StringType,true),StructField(user_location_region,StringType,true),StructField(user_location_city,StringType,true),StructField(orig_destination_distance,StringType,true),StructField(user_id,StringType,true),StructField(is_mobile,StringType,true),StructField(is_package,StringType,true),StructField(channel,StringType,true),StructField(srch_ci,StringType,true),StructField(srch_co,StringType,true),StructField(srch_adults_cnt,StringType,true),StructField(srch_children_cnt,StringType,true),StructField(srch_rm_cnt,StringType,true),StructField(srch_destination_id,StringType,true),StructField(srch_destination_type_id,StringType,true),StructField(hotel_continent,StringType,true),StructField(hotel_country,StringType,true),StructField(hotel_market,StringType,true)))

In [9]:
## 함수 정의
def string_to_double(df) :
    """Cast every column of `df` to DoubleType, keeping column names and order.

    Args:
        df: DataFrame whose columns should all become doubles.

    Returns:
        A new DataFrame with the same column names, each cast to double.
    """
    # A single select keeps the query plan flat; chaining one withColumn per column
    # adds a projection to the plan on every iteration.
    return df.select([df[name].cast(DoubleType()).alias(name) for name in df.columns])

def string_to_double2(df) :
    """Cast the six derived year/month columns of `df` to DoubleType.

    Args:
        df: DataFrame containing reserv_year, reserv_month, check_in_year,
            check_in_month, check_out_year and check_out_month.

    Returns:
        A new DataFrame with those six columns cast to double.
    """
    col_name = ["reserv_year", "reserv_month", "check_in_year", "check_in_month", "check_out_year", "check_out_month"]
    for name in col_name :
        # Read the column from the frame being converted; the previous code read it
        # from the global `test_df`, regardless of which frame was passed in.
        df = df.withColumn(name, df[name].cast(DoubleType()))

    return df

def string_to_date(df, stringlist=["date_time", "srch_ci", "srch_co"]) :
    """Split the three date string columns into year and month string columns.

    Args:
        df: DataFrame holding the columns named in `stringlist`.
        stringlist: column names, in order: reservation timestamp
            ("YYYY-MM-DD HH:MM:SS"), check-in date and check-out date ("YYYY-MM-DD").

    Returns:
        A new DataFrame with reserv_year, reserv_month, check_in_year,
        check_in_month, check_out_year and check_out_month appended (as strings)
        and the three source columns dropped.
    """
    reserv_name, check_in_name, check_out_name = stringlist[0], stringlist[1], stringlist[2]

    def year_month(date_col, prefix) :
        # "YYYY-MM-..." -> [<prefix>_year, <prefix>_month]
        parts = functions.split(date_col, "-")
        return [parts.getItem(0).alias(prefix + "_year"), parts.getItem(1).alias(prefix + "_month")]

    # The reservation timestamp carries a time part: keep only the first whitespace-separated token.
    reserv_date = functions.split(functions.trim(df[reserv_name]), "\\s+").getItem(0)

    # Built-in column functions replace the Python UDFs, which called .split() on the raw
    # value and therefore failed on None; the columns named in `stringlist` are now
    # actually the ones read.
    derived = (
        year_month(reserv_date, "reserv")
        + year_month(df[check_in_name], "check_in")
        + year_month(df[check_out_name], "check_out")
    )
    new_df = df.select("*", *derived)

    return new_df.drop(reserv_name, check_in_name, check_out_name)

In [10]:
# Derive the year/month columns from the date strings, then cast every column to double.
train_df2_1 = string_to_double(string_to_date(train_df1_1))
train_df2_2 = string_to_double(string_to_date(train_df1_2))
test_df2 = string_to_double(string_to_date(test_df1))

In [14]:
# Print the schema of each converted frame, separated by blank lines.
train_df2_1.printSchema()
print("\n\n\n")
train_df2_2.printSchema()
print("\n\n\n")
test_df2.printSchema()

root
 |-- site_name: double (nullable = true)
 |-- posa_continent: double (nullable = true)
 |-- user_location_country: double (nullable = true)
 |-- user_location_region: double (nullable = true)
 |-- user_location_city: double (nullable = true)
 |-- orig_destination_distance: double (nullable = true)
 |-- user_id: double (nullable = true)
 |-- is_mobile: double (nullable = true)
 |-- is_package: double (nullable = true)
 |-- channel: double (nullable = true)
 |-- srch_adults_cnt: double (nullable = true)
 |-- srch_children_cnt: double (nullable = true)
 |-- srch_rm_cnt: double (nullable = true)
 |-- srch_destination_id: double (nullable = true)
 |-- srch_destination_type_id: double (nullable = true)
 |-- is_booking: double (nullable = true)
 |-- cnt: double (nullable = true)
 |-- hotel_continent: double (nullable = true)
 |-- hotel_country: double (nullable = true)
 |-- hotel_market: double (nullable = true)
 |-- hotel_cluster: double (nullable = true)
 |-- reserv_year: double (nulla

### describe
- data의 데이터 분포 확인

In [127]:
def describe_df(df, relative_error=0.25) :
    """Show describe() statistics plus an approximate median for every column.

    Args:
        df: DataFrame to summarize; one table is shown per column.
        relative_error: relative error passed to approxQuantile for the "median" row.
            The default keeps the previously hard-coded value, which is coarse;
            pass a smaller value for a tighter median.
    """
    for name in df.columns :
        try :
            median = df.approxQuantile(name, [0.5], relative_error)[0]
            newRow = spark.createDataFrame([["median", median]])
            df.describe(name).union(newRow).show()
        except Exception :
            # The median row could not be built for this column: fall back to plain describe().
            try :
                df.describe(name).show()
            except Exception :
                print(name + " is no describe because of error")

In [129]:
# Per-column summary of the train variant that had null rows dropped.
describe_df(train_df2_1)

+-------+-----------------+
|summary|        site_name|
+-------+-----------------+
|  count|         37670293|
|   mean|9.795271329585889|
| stddev| 11.9675435665128|
|    min|              2.0|
|    max|             53.0|
| median|             11.0|
+-------+-----------------+

+-------+------------------+
|summary|    posa_continent|
+-------+------------------+
|  count|          37670293|
|   mean|2.6804730188851997|
| stddev|0.7480393482506577|
|    min|               0.0|
|    max|               4.0|
| median|               3.0|
+-------+------------------+

+-------+---------------------+
|summary|user_location_country|
+-------+---------------------+
|  count|             37670293|
|   mean|    86.10880194109454|
| stddev|    59.24310334783878|
|    min|                  0.0|
|    max|                239.0|
| median|                 66.0|
+-------+---------------------+

+-------+--------------------+
|summary|user_location_region|
+-------+--------------------+
|  count|     

In [None]:
# Per-column summary of the train variant that had its date strings filled.
describe_df(train_df2_2)

In [128]:
# Per-column summary of the test set.
describe_df(test_df2)

+-------+-----------------+
|summary|               id|
+-------+-----------------+
|  count|          2528243|
|   mean|        1264121.0|
| stddev|729841.0326509464|
|    min|              0.0|
|    max|        2528242.0|
| median|        1232671.0|
+-------+-----------------+

+-------+------------------+
|summary|         site_name|
+-------+------------------+
|  count|           2528243|
|   mean|  9.70966556616591|
| stddev|12.271834283602988|
|    min|               0.0|
|    max|              53.0|
| median|               2.0|
+-------+------------------+

+-------+-----------------+
|summary|   posa_continent|
+-------+-----------------+
|  count|          2528243|
|   mean|2.697412788248598|
| stddev|0.765128762499465|
|    min|              0.0|
|    max|              4.0|
| median|              3.0|
+-------+-----------------+

+-------+---------------------+
|summary|user_location_country|
+-------+---------------------+
|  count|              2528243|
|   mean|    85.516

### NA 처리2 

In [11]:
def fill_na_as_mean_or_most(df, ctg) :
    """Fill nulls column by column with the column mean, or a fixed fallback if that fails.

    For each column the mean is computed and used as the fill value. When that
    raises, a fallback is used: the dataset's year for check_in_year /
    check_out_year (2015 for "test", 2013.5 for "train"), 9 for the
    check-in / check-out month, and 0 for anything else (reported with an
    "error : " line).

    Args:
        df: DataFrame to fill.
        ctg: "train" or "test"; selects the year fallback.

    Returns:
        A new DataFrame with the fills applied.
    """
    year_fallback = {"test": 2015, "train": 2013.5}
    year_columns = ("check_in_year", "check_out_year")
    month_columns = ("check_in_month", "check_out_month")

    new_df = df.select("*")

    # `name` (not `col`) so the loop variable does not shadow pyspark's col() helper.
    for name in df.columns :
        print(name)
        try :
            col_avg = new_df.agg({name : "mean"}).collect()[0][0]
            new_df = new_df.na.fill({name : col_avg})
        except Exception :
            # Only ordinary errors trigger the fallback; interrupts and system exits propagate.
            if name in year_columns and ctg in year_fallback :
                new_df = new_df.na.fill({name : year_fallback[ctg]})
            elif name in month_columns :
                new_df = new_df.na.fill({name : 9})
            else :
                new_df = new_df.na.fill({name : 0})
                print("error : ", name)

    return new_df

In [12]:
# Variant 1 is carried over unchanged into stage 3 (select("*") keeps every column).
train_df3_1= train_df2_1.select("*")

In [None]:
# Fill nulls with column means / fallbacks first, then drop any row that is still incomplete.
# The drop must run on the filled frame; dropping from train_df2_2 discarded the fill result.
train_df3_2 = fill_na_as_mean_or_most(train_df2_2, "train")
train_df3_2 = train_df3_2.na.drop()

In [13]:
# Fill test nulls with column means / fallbacks, then fill whatever is still null with 0
# (no test rows are dropped).
test_df3 = fill_na_as_mean_or_most(test_df2, "test")
test_df3 = test_df3.na.fill(0)

id
site_name
posa_continent
user_location_country
user_location_region
user_location_city
orig_destination_distance
user_id
is_mobile
is_package
channel
srch_adults_cnt
srch_children_cnt
srch_rm_cnt
srch_destination_id
srch_destination_type_id
hotel_continent
hotel_country
hotel_market
reserv_year
reserv_month
check_in_year
check_in_month
check_out_year
check_out_month


### feature engineering
- 예약한 날짜와 check in한 날짜 사이의 기간
- 숙소에 머무르는 기간
- posa_continent 삭제
- cnt 삭제
- user_id 삭제
- 총 인원수
- 한 방에 몇명이 머물렀는지
- is_booking = 1만 남기기
- hotel_cluster가 target
- year 삭제

In [14]:
def interval(df) :
    """Add approximate day intervals between the dates and drop the year columns.

    Intervals are computed from year and month only, counting 365 days per
    year and 30 days per month.

    Args:
        df: DataFrame with reserv_/check_in_/check_out_ year and month columns.

    Returns:
        A new DataFrame with reserv_check_in_interval and check_in_out_interval
        appended and reserv_year, check_in_year, check_out_year removed.
    """
    print("check")
    days_to_check_in = (df.check_in_year-df.reserv_year)*365 + (df.check_in_month-df.reserv_month)*30
    days_of_stay = (df.check_out_year-df.check_in_year)*365 + (df.check_out_month-df.check_in_month)*30

    with_intervals = (
        df.select("*", days_to_check_in.alias("reserv_check_in_interval"))
          .select("*", days_of_stay.alias("check_in_out_interval"))
    )

    return with_intervals.drop("reserv_year").drop("check_in_year").drop("check_out_year")

In [15]:
def drop_df(df, lst = ["posa_continent", "user_id", "cnt"]) :
    """Drop the columns named in `lst` from `df`.

    Args:
        df: DataFrame to drop columns from.
        lst: names of the columns to drop; any length is accepted
            (previously only the first two or three names were used).

    Returns:
        The result of df.drop(...) with every name in `lst`.
    """
    print("check")
    return df.drop(*lst)

In [16]:
def total_people(df) :
    """Append the total guest count and the guests-per-room ratio.

    Args:
        df: DataFrame with srch_adults_cnt, srch_children_cnt and srch_rm_cnt.

    Returns:
        A new DataFrame with total_cnt (adults + children) and
        avg_room_cnt (total_cnt / srch_rm_cnt) appended.
    """
    print("check")
    head_count = df.srch_adults_cnt + df.srch_children_cnt

    return df.select(
        "*",
        head_count.alias("total_cnt"),
        (head_count / df.srch_rm_cnt).alias("avg_room_cnt"),
    )

In [18]:
def is_booking(df) :
    """Keep only the rows with is_booking == 1, then remove the is_booking column."""
    print("check")
    booked_rows = df.where(df.is_booking == 1)
    without_flag = booked_rows.drop("is_booking")
    return without_flag.select("*")

In [19]:
# Feature pipeline for train variant 1: bookings only -> drop columns -> people counts -> intervals.
train_df4_1 = interval(total_people(drop_df(is_booking(train_df3_1))))
print("col :",len(train_df4_1.columns))
print("row :", train_df4_1.count())
train_df4_1.show(3)

check
check
check
check
col : 24
row : 1985514
+---------+---------------------+--------------------+------------------+-------------------------+---------+----------+-------+---------------+-----------------+-----------+-------------------+------------------------+---------------+-------------+------------+-------------+------------+--------------+---------------+---------+------------+------------------------+---------------------+
|site_name|user_location_country|user_location_region|user_location_city|orig_destination_distance|is_mobile|is_package|channel|srch_adults_cnt|srch_children_cnt|srch_rm_cnt|srch_destination_id|srch_destination_type_id|hotel_continent|hotel_country|hotel_market|hotel_cluster|reserv_month|check_in_month|check_out_month|total_cnt|avg_room_cnt|reserv_check_in_interval|check_in_out_interval|
+---------+---------------------+--------------------+------------------+-------------------------+---------+----------+-------+---------------+-----------------+---------

In [None]:
# Same feature pipeline for train variant 2.
train_df4_2 = interval(total_people(drop_df(is_booking(train_df3_2))))
print("col :",len(train_df4_2.columns))
print("row :", train_df4_2.count())
train_df4_2.show(3)

In [20]:
# Test pipeline: no is_booking filter, and only posa_continent / user_id are dropped.
test_df4 = interval(total_people(drop_df(test_df3, lst = ["posa_continent", "user_id"])))
print("col :",len(test_df4.columns))
print("row :", test_df4.count())
test_df4.show(3)

check
check
check
col : 24
row : 2528243
+---+---------+---------------------+--------------------+------------------+-------------------------+---------+----------+-------+---------------+-----------------+-----------+-------------------+------------------------+---------------+-------------+------------+------------+--------------+---------------+---------+------------+------------------------+---------------------+
| id|site_name|user_location_country|user_location_region|user_location_city|orig_destination_distance|is_mobile|is_package|channel|srch_adults_cnt|srch_children_cnt|srch_rm_cnt|srch_destination_id|srch_destination_type_id|hotel_continent|hotel_country|hotel_market|reserv_month|check_in_month|check_out_month|total_cnt|avg_room_cnt|reserv_check_in_interval|check_in_out_interval|
+---+---------+---------------------+--------------------+------------------+-------------------------+---------+----------+-------+---------------+-----------------+-----------+-------------------

### normalization
- orig_destination_distance
- srch_adults_cnt
- srch_children_cnt
- srch_rm_cnt
- total_cnt
- avg_room_cnt
- check_in_out_interval (주의: 아래 `col_lst`에는 포함되어 있지 않아 현재 코드에서는 정규화되지 않음)

In [21]:
# Columns that normalize() replaces with their "normed_" z-score counterparts.
col_lst = ["orig_destination_distance","srch_adults_cnt","srch_children_cnt","srch_rm_cnt","total_cnt","avg_room_cnt"]

In [22]:
def normalize(df, col_lst) :
    """Replace each column in `col_lst` with its z-score, named "normed_<column>".

    The mean and standard deviation are taken from `df` itself via describe().

    Args:
        df: DataFrame containing the columns in `col_lst`.
        col_lst: names of the columns to standardize.

    Returns:
        A new DataFrame with the untouched columns first (original order),
        followed by the "normed_" columns in `col_lst` order.
    """
    if not col_lst :
        # Nothing to normalize; describe() without arguments would summarize every column.
        return df.select("*")

    # describe() yields its rows in the order count, mean, stddev, ...; a single call covers
    # all requested columns instead of one describe plus two take() jobs per column.
    stats = df.describe(*col_lst).take(3)
    mean_row, stddev_row = stats[1], stats[2]

    normed_cols = []
    for name in col_lst :
        print(name)
        mean = float(mean_row[name])
        stddev = float(stddev_row[name])
        normed_cols.append(((df[name]-mean)/stddev).alias("normed_"+name))

    kept_cols = [name for name in df.columns if name not in col_lst]

    return df.select(*(kept_cols + normed_cols))

In [23]:
# Standardize the count / distance columns of train variant 1.
train_df5_1 = normalize(train_df4_1, col_lst)
train_df5_1.show(3)

orig_destination_distance
srch_adults_cnt
srch_children_cnt
srch_rm_cnt
total_cnt
avg_room_cnt
+---------+---------------------+--------------------+------------------+---------+----------+-------+-------------------+------------------------+---------------+-------------+------------+-------------+------------+--------------+---------------+------------------------+---------------------+--------------------------------+----------------------+------------------------+-------------------+-------------------+-------------------+
|site_name|user_location_country|user_location_region|user_location_city|is_mobile|is_package|channel|srch_destination_id|srch_destination_type_id|hotel_continent|hotel_country|hotel_market|hotel_cluster|reserv_month|check_in_month|check_out_month|reserv_check_in_interval|check_in_out_interval|normed_orig_destination_distance|normed_srch_adults_cnt|normed_srch_children_cnt| normed_srch_rm_cnt|   normed_total_cnt|normed_avg_room_cnt|
+---------+--------------------

In [None]:
# Standardize the count / distance columns of train variant 2.
train_df5_2 = normalize(train_df4_2, col_lst)
train_df5_2.show(3)

In [24]:
# Standardize the same columns of the test set.
# NOTE(review): normalize() takes mean/stddev from the frame it is given, so the test set is
# scaled with its own statistics rather than the train statistics — confirm this is intended.
test_df5 = normalize(test_df4, col_lst)
test_df5.show(3)

orig_destination_distance
srch_adults_cnt
srch_children_cnt
srch_rm_cnt
total_cnt
avg_room_cnt
+---+---------+---------------------+--------------------+------------------+---------+----------+-------+-------------------+------------------------+---------------+-------------+------------+------------+--------------+---------------+------------------------+---------------------+--------------------------------+----------------------+------------------------+--------------------+--------------------+--------------------+
| id|site_name|user_location_country|user_location_region|user_location_city|is_mobile|is_package|channel|srch_destination_id|srch_destination_type_id|hotel_continent|hotel_country|hotel_market|reserv_month|check_in_month|check_out_month|reserv_check_in_interval|check_in_out_interval|normed_orig_destination_distance|normed_srch_adults_cnt|normed_srch_children_cnt|  normed_srch_rm_cnt|    normed_total_cnt| normed_avg_room_cnt|
+---+---------+---------------------+--------

### save

In [24]:
# Base output location for every CSV written below.
outdir = "file:///home/ubuntu/Session/180602_project/preprocessed_data/"

In [29]:
# Save train variant 1 before feature engineering, without the is_booking / cnt columns.
# NOTE(review): no header option or save mode is set — presumably column names are not
# written and a re-run fails if the target exists; confirm.
train_df3_1_saved = train_df3_1.drop('is_booking', 'cnt')
train_df3_1_saved.write.format("csv").save(outdir + "train1")
print("saved")

In [None]:
# Save train variant 2 before feature engineering, without the is_booking / cnt columns.
train_df3_2_saved = train_df3_2.drop('is_booking', 'cnt')
train_df3_2_saved.write.format("csv").save(outdir + "train2")
print("saved")

In [43]:
# Save the test set before feature engineering.
test_df3.write.format("csv").save(outdir + "test")
print("saved")

saved


In [26]:
# Save the fully preprocessed (feature-engineered and normalized) train variant 1.
train_df5_1.write.format("csv").save(outdir + "preprocessed_train1")
print("saved")

saved


In [None]:
# Save the fully preprocessed (feature-engineered and normalized) train variant 2.
train_df5_2.write.format("csv").save(outdir + "preprocessed_train2")
print("saved")

In [44]:
# Save the fully preprocessed (feature-engineered and normalized) test set.
test_df5.write.format("csv").save(outdir + "preprocessed_test")
print("saved")

saved


### columns

In [62]:
# Column names (and count) of the saved pre-feature-engineering train frame.
print(train_df3_1_saved.columns)
print(len(train_df3_1_saved.columns))

['site_name', 'posa_continent', 'user_location_country', 'user_location_region', 'user_location_city', 'orig_destination_distance', 'user_id', 'is_mobile', 'is_package', 'channel', 'srch_adults_cnt', 'srch_children_cnt', 'srch_rm_cnt', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent', 'hotel_country', 'hotel_market', 'hotel_cluster', 'reserv_year', 'reserv_month', 'check_in_year', 'check_in_month', 'check_out_year', 'check_out_month']
25


In [63]:
# Column names (and count) of the fully preprocessed train frame.
print(train_df5_1.columns)
print(len(train_df5_1.columns))

['site_name', 'user_location_country', 'user_location_region', 'user_location_city', 'is_mobile', 'is_package', 'channel', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent', 'hotel_country', 'hotel_market', 'hotel_cluster', 'reserv_month', 'check_in_month', 'check_out_month', 'reserv_check_in_interval', 'check_in_out_interval', 'normed_orig_destination_distance', 'normed_srch_adults_cnt', 'normed_srch_children_cnt', 'normed_srch_rm_cnt', 'normed_total_cnt', 'normed_avg_room_cnt']
24


In [64]:
# Column names (and count) of the pre-feature-engineering test frame.
print(test_df3.columns)
print(len(test_df3.columns))

['id', 'site_name', 'posa_continent', 'user_location_country', 'user_location_region', 'user_location_city', 'orig_destination_distance', 'user_id', 'is_mobile', 'is_package', 'channel', 'srch_adults_cnt', 'srch_children_cnt', 'srch_rm_cnt', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent', 'hotel_country', 'hotel_market', 'reserv_year', 'reserv_month', 'check_in_year', 'check_in_month', 'check_out_year', 'check_out_month']
25


In [57]:
# Column names (and count) of the fully preprocessed test frame.
print(test_df5.columns)
print(len(test_df5.columns))

['id', 'site_name', 'user_location_country', 'user_location_region', 'user_location_city', 'is_mobile', 'is_package', 'channel', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent', 'hotel_country', 'hotel_market', 'reserv_month', 'check_in_month', 'check_out_month', 'reserv_check_in_interval', 'check_in_out_interval', 'normed_orig_destination_distance', 'normed_srch_adults_cnt', 'normed_srch_children_cnt', 'normed_srch_rm_cnt', 'normed_total_cnt', 'normed_avg_room_cnt']
24


## modeling 
- multinomial logistic
- decision tree
- random forest
- gradient boosting tree
- mlp

In [25]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.pipeline import PipelineModel

In [26]:
# Feature columns for the models trained on the pre-feature-engineering frames
# (train_df3_1_saved / test_df3): every column except the target.
train1_features = ['site_name', 'posa_continent', 'user_location_country', 'user_location_region', 'user_location_city', 
                   'orig_destination_distance', 'user_id', 'is_mobile', 'is_package', 'channel', 'srch_adults_cnt', 
                   'srch_children_cnt', 'srch_rm_cnt', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent', 
                   'hotel_country', 'hotel_market', 'reserv_year', 'reserv_month', 'check_in_year', 
                   'check_in_month', 'check_out_year', 'check_out_month']

# Feature columns for the models trained on the fully preprocessed frames (train_df5_1 / test_df5).
train2_features = ['site_name', 'user_location_country', 'user_location_region', 'user_location_city', 'is_mobile'
                   , 'is_package', 'channel', 'srch_destination_id', 'srch_destination_type_id', 'hotel_continent'
                   , 'hotel_country', 'hotel_market', 'reserv_month', 'check_in_month', 'check_out_month'
                   , 'reserv_check_in_interval', 'check_in_out_interval', 'normed_orig_destination_distance', 'normed_srch_adults_cnt'
                   , 'normed_srch_children_cnt', 'normed_srch_rm_cnt', 'normed_total_cnt', 'normed_avg_room_cnt']

# Label column predicted by every model below.
target = "hotel_cluster"

print(len(train1_features))
print(len(train2_features))

24
23


In [27]:
# assembler

# Each assembler packs one feature list into a single "features" vector column.
assembler1 = VectorAssembler(inputCols=train1_features, outputCol="features")
assembler2 = VectorAssembler(inputCols=train2_features, outputCol="features")

In [None]:
# Multinomial logistic regression: model 1 uses the pre-feature-engineering frames,
# model 2 the fully preprocessed frames.

lr1 = LogisticRegression(maxIter=120, regParam=0.01, labelCol=target)
lr2 = LogisticRegression(maxIter=130, regParam=0.01, labelCol=target)

pipeline1 = Pipeline(stages=[assembler1, lr1])
pipeline2 = Pipeline(stages=[assembler2, lr2])

pipelineModel1 = pipeline1.fit(train_df3_1_saved)
pipelineModel2 = pipeline2.fit(train_df5_1)

# Show the first predictions on the matching test frames.
pipelineModel1.transform(test_df3).show()
pipelineModel2.transform(test_df5).show()

In [None]:
# Decision tree: model 3 uses the pre-feature-engineering frames,
# model 4 the fully preprocessed frames.

dt1 = DecisionTreeClassifier(labelCol=target)
dt2 = DecisionTreeClassifier(labelCol=target)

pipeline3 = Pipeline(stages=[assembler1, dt1])
pipeline4 = Pipeline(stages=[assembler2, dt2])

pipelineModel3 = pipeline3.fit(train_df3_1_saved)
pipelineModel4 = pipeline4.fit(train_df5_1)

# Show the first predictions on the matching test frames.
pipelineModel3.transform(test_df3).show()
pipelineModel4.transform(test_df5).show()

In [None]:
# Random forest: model 5 (30 trees) uses the pre-feature-engineering frames,
# model 6 (40 trees) the fully preprocessed frames.

rf1 = RandomForestClassifier(labelCol=target, numTrees=30)
rf2 = RandomForestClassifier(labelCol=target, numTrees=40)

pipeline5 = Pipeline(stages=[assembler1, rf1])
pipeline6 = Pipeline(stages=[assembler2, rf2])

pipelineModel5 = pipeline5.fit(train_df3_1_saved)
pipelineModel6 = pipeline6.fit(train_df5_1)

# Show the first predictions on the matching test frames.
pipelineModel5.transform(test_df3).show()
pipelineModel6.transform(test_df5).show()

In [None]:
# Gradient-boosted trees: model 7 (30 iterations) uses the pre-feature-engineering frames,
# model 8 (40 iterations) the fully preprocessed frames.
# NOTE(review): Spark's GBTClassifier is documented as supporting binary labels only, while
# the other cells treat hotel_cluster as a multiclass target — confirm this cell can run as written.

gb1 = GBTClassifier(labelCol=target, maxIter=30)
gb2 = GBTClassifier(labelCol=target, maxIter=40)

pipeline7 = Pipeline(stages=[assembler1, gb1])
pipeline8 = Pipeline(stages=[assembler2, gb2])

pipelineModel7 = pipeline7.fit(train_df3_1_saved)
pipelineModel8 = pipeline8.fit(train_df5_1)

# Show the first predictions on the matching test frames.
pipelineModel7.transform(test_df3).show()
pipelineModel8.transform(test_df5).show()

In [None]:
# Multilayer perceptron: model 9 uses the pre-feature-engineering frames,
# model 10 the fully preprocessed frames.
# Layer sizes: input width = number of assembled features (24 / 23), two hidden layers,
# 100 output units (presumably one per hotel_cluster value — confirm).

layers1 = [24, 64, 128, 100]
layers2 = [23, 64, 128, 100]

mlp1 = MultilayerPerceptronClassifier(layers=layers1, labelCol=target, maxIter=150, blockSize=128)
mlp2 = MultilayerPerceptronClassifier(layers=layers2, labelCol=target, maxIter=170, blockSize=128)

pipeline9 = Pipeline(stages=[assembler1, mlp1])
pipeline10 = Pipeline(stages=[assembler2, mlp2])

# Fit and score on the same frames as the other model cells; the previously used
# `temp` / `temp2` names are not defined anywhere in this notebook.
pipelineModel9 = pipeline9.fit(train_df3_1_saved)
pipelineModel10 = pipeline10.fit(train_df5_1)

pipelineModel9.transform(test_df3).show()
pipelineModel10.transform(test_df5).show()


## ensemble
- multinomial logistic
- decision tree
- random forest
- gradient boosting tree
- mlp

In [1]:
import pandas as pd

In [4]:
# Load the saved model output for the ensemble step.
# NOTE(review): the recorded run failed with "No FileSystem for scheme: s3", so this session
# has no filesystem registered for s3:// paths; the path spelling ("model1_resilt") should
# also be checked against the real location.
result1 = spark.read.csv("s3://ybigta-spark-180602/data/model1_resilt/*")

Py4JJavaError: An error occurred while calling o38.csv.
: java.io.IOException: No FileSystem for scheme: s3
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:344)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


## data analysis by kernel
- https://www.kaggle.com/dvasyukova/predict-hotel-type-with-pandas
- https://www.kaggle.com/omarelgabry/explore-expedia-search-data

## Reference

- http://spark.apache.org/docs/2.1.0/api/python/index.html