# Expedia Kaggle competition
### Author: [Andrey Vykhodtsev](https://www.kaggle.com/vykhand)
### Date: 09.05.2016


## Exploiting information leak with Apache Spark and Pandas 

This script was prepared for the [Expedia Kaggle Competition](https://www.kaggle.com/c/expedia-hotel-recommendations).

Details about the information leak can be found here: [Data leak forum post](https://www.kaggle.com/c/expedia-hotel-recommendations/forums/t/20345/data-leak)

In terms of logic, this is a direct copy of [ZFTurbo's](https://www.kaggle.com/zfturbo) script:

https://www.kaggle.com/zfturbo/expedia-hotel-recommendations/leakage-solution/run/231363


## Preparation

In order to run this notebook, you need to have Apache Spark and Hive installed.

I use the [IBM Apache Hadoop](https://www.ibm.com/support/knowledgecenter/SSPT3X_4.0.0/com.ibm.swg.im.infosphere.biginsights.install.doc/doc/bi_install_iop_biginsights.html) distribution and standalone [Apache Spark 1.6.1](http://spark.apache.org).

This notebook assumes that Spark is located in `/opt/spark`.

This notebook also relies on the [Spark-CSV](https://github.com/databricks/spark-csv) package developed by Databricks.

## Setting up Spark

I set Spark up manually from a plain vanilla Python kernel:

In [1]:
import sys
import os
import pandas as pd
import numpy as np
#sys.path.append('/opt/spark/python')
#os.environ['SPARK_HOME']='/opt/spark'
#os.environ['HADOOP_CONF_DIR']='/etc/hadoop/conf'
#os.environ['HIVE_CONF_DIR']='/etc/hive/conf'
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[8] --packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell'

In [2]:
sys.path

[u'/tmp/spark-c89bc58e-4f18-4c4b-89ac-21e276102e58/userFiles-5344e583-c1dc-41cc-9230-6f0ac9ad9a91',
 '',
 '/home/jiahong/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip',
 '/home/jiahong/spark-2.1.0-bin-hadoop2.7/python',
 '/home/jiahong/Expedia-kaggle',
 '/home/jiahong/anaconda2/lib/python27.zip',
 '/home/jiahong/anaconda2/lib/python2.7',
 '/home/jiahong/anaconda2/lib/python2.7/plat-linux2',
 '/home/jiahong/anaconda2/lib/python2.7/lib-tk',
 '/home/jiahong/anaconda2/lib/python2.7/lib-old',
 '/home/jiahong/anaconda2/lib/python2.7/lib-dynload',
 '/home/jiahong/anaconda2/lib/python2.7/site-packages',
 '/home/jiahong/anaconda2/lib/python2.7/site-packages/Sphinx-1.5.1-py2.7.egg',
 '/home/jiahong/anaconda2/lib/python2.7/site-packages/setuptools-27.2.0-py2.7.egg',
 '/home/jiahong/anaconda2/lib/python2.7/site-packages/IPython/extensions',
 '/home/jiahong/.ipython']

In [3]:
from pyspark import SparkContext, SparkConf, HiveContext, SQLContext

In [4]:
conf = SparkConf()
conf.setAppName('Expedia')

<pyspark.conf.SparkConf at 0x7fc3f849b710>

In [5]:
conf

<pyspark.conf.SparkConf at 0x7fc3f849b710>

In [6]:
conf.set("spark.driver.memory", "12g")
conf.set("spark.executor.memory", "8g")

<pyspark.conf.SparkConf at 0x7fc3f849b710>

In [7]:
sc

<pyspark.context.SparkContext at 0x7fc4042dfc50>

In [8]:
#sc = SparkContext(conf=conf)
sqlContext  = HiveContext(sc)

In [9]:
sqlContext

<pyspark.sql.context.HiveContext at 0x7fc3f849b5d0>

sc = SparkContext(conf=conf)
sqlContext  = HiveContext(sc)

conf = SparkConf()
"""conf.setAppName('Expedia')
conf.set("spark.driver.memory", "12g")
conf.set("spark.executor.memory", "8g")
"""
sc = SparkContext(conf=conf)
sqlContext  = HiveContext(sc)
#sqlContext  = SQLContext(sc)

You can also install a separate kernel that uses the pyspark shell settings. Instructions [can be found here](http://thepowerofdata.io/configuring-jupyteripython-notebook-to-work-with-pyspark-1-4-0). Note that some library names have changed; here is my config:

```
{
 "display_name": "pySpark (Spark 1.6.1)",
 "language": "python",
 "argv": [
  "python",
  "-m",
  "IPython.kernel",
  "-f",
  "{connection_file}"
 ],
 "env": {
  "SPARK_HOME": "/opt/spark",
  "PYTHONPATH": "/opt/spark/python/:/opt/spark/python/lib/py4j-0.9-src.zip",
  "PYTHONSTARTUP": "/opt/spark/python/pyspark/shell.py",
  "PYSPARK_SUBMIT_ARGS": "--master local --packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell"
 }
}
```

### Checking contexts

In [10]:
sc.version

u'2.1.0'

In [11]:
sqlContext

<pyspark.sql.context.HiveContext at 0x7fc3f849b5d0>

## Loading data

I put the files into HDFS, but it is also possible to read local files: use a `file:///<your filepath>` URI.

In [15]:
%%time
train = (sqlContext
         .read
         .format('com.databricks.spark.csv')
         .options(header='true', inferschema='true')
         .load('file:///home/jiahong/expedia_data/train.csv'))

CPU times: user 64 ms, sys: 24 ms, total: 88 ms
Wall time: 7min 2s


In [18]:
train.rdd.getNumPartitions()

31

Inferring the schema is a lengthy process (about 2.5 min on my machine; the run recorded above took about 7 min). You can monitor it in the Spark web UI at `<address>:4040`.

In [19]:
train.printSchema()

root
 |-- date_time: timestamp (nullable = true)
 |-- site_name: integer (nullable = true)
 |-- posa_continent: integer (nullable = true)
 |-- user_location_country: integer (nullable = true)
 |-- user_location_region: integer (nullable = true)
 |-- user_location_city: integer (nullable = true)
 |-- orig_destination_distance: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- is_mobile: integer (nullable = true)
 |-- is_package: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- srch_ci: timestamp (nullable = true)
 |-- srch_co: timestamp (nullable = true)
 |-- srch_adults_cnt: integer (nullable = true)
 |-- srch_children_cnt: integer (nullable = true)
 |-- srch_rm_cnt: integer (nullable = true)
 |-- srch_destination_id: integer (nullable = true)
 |-- srch_destination_type_id: integer (nullable = true)
 |-- is_booking: integer (nullable = true)
 |-- cnt: integer (nullable = true)
 |-- hotel_continent: integer (nullable = true)
 |-- hotel_country: 

#### As you can see, many methods are similar to Pandas

In [20]:
train.dtypes

[('date_time', 'timestamp'),
 ('site_name', 'int'),
 ('posa_continent', 'int'),
 ('user_location_country', 'int'),
 ('user_location_region', 'int'),
 ('user_location_city', 'int'),
 ('orig_destination_distance', 'double'),
 ('user_id', 'int'),
 ('is_mobile', 'int'),
 ('is_package', 'int'),
 ('channel', 'int'),
 ('srch_ci', 'timestamp'),
 ('srch_co', 'timestamp'),
 ('srch_adults_cnt', 'int'),
 ('srch_children_cnt', 'int'),
 ('srch_rm_cnt', 'int'),
 ('srch_destination_id', 'int'),
 ('srch_destination_type_id', 'int'),
 ('is_booking', 'int'),
 ('cnt', 'int'),
 ('hotel_continent', 'int'),
 ('hotel_country', 'int'),
 ('hotel_market', 'int'),
 ('hotel_cluster', 'int')]

In [21]:
train.schema

StructType(List(StructField(date_time,TimestampType,true),StructField(site_name,IntegerType,true),StructField(posa_continent,IntegerType,true),StructField(user_location_country,IntegerType,true),StructField(user_location_region,IntegerType,true),StructField(user_location_city,IntegerType,true),StructField(orig_destination_distance,DoubleType,true),StructField(user_id,IntegerType,true),StructField(is_mobile,IntegerType,true),StructField(is_package,IntegerType,true),StructField(channel,IntegerType,true),StructField(srch_ci,TimestampType,true),StructField(srch_co,TimestampType,true),StructField(srch_adults_cnt,IntegerType,true),StructField(srch_children_cnt,IntegerType,true),StructField(srch_rm_cnt,IntegerType,true),StructField(srch_destination_id,IntegerType,true),StructField(srch_destination_type_id,IntegerType,true),StructField(is_booking,IntegerType,true),StructField(cnt,IntegerType,true),StructField(hotel_continent,IntegerType,true),StructField(hotel_country,IntegerType,true),StructF

#### Saving the schema in case we need it later, so we don't have to run inferschema again

In [22]:
import pickle
# the with-block closes the file handle once the schema is written
with open('../train.schema.p', 'wb') as f:
    pickle.dump(train.schema, f)
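
A saved schema is only useful if it can be read back. The sketch below shows the round trip with `pickle`. The list of `(name, type)` tuples is a hypothetical stand-in for `train.schema` so that the snippet runs without Spark, and the temporary path is an assumption. With Spark, the reloaded `StructType` could presumably be handed to the reader via `.schema(...)` instead of setting `inferschema='true'`.

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for train.schema; the notebook pickles the real StructType.
schema_stub = [('date_time', 'timestamp'), ('site_name', 'int'), ('hotel_cluster', 'int')]

# Assumed location: a fresh temporary directory instead of '../train.schema.p'.
schema_path = os.path.join(tempfile.mkdtemp(), 'train.schema.p')

# Save: the with-block closes the file even if pickling fails.
with open(schema_path, 'wb') as f:
    pickle.dump(schema_stub, f)

# Reload, for example in a later session.
with open(schema_path, 'rb') as f:
    restored = pickle.load(f)
```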

#### Actual caching happens the first time we access the data

In [23]:
train.cache()

DataFrame[date_time: timestamp, site_name: int, posa_continent: int, user_location_country: int, user_location_region: int, user_location_city: int, orig_destination_distance: double, user_id: int, is_mobile: int, is_package: int, channel: int, srch_ci: timestamp, srch_co: timestamp, srch_adults_cnt: int, srch_children_cnt: int, srch_rm_cnt: int, srch_destination_id: int, srch_destination_type_id: int, is_booking: int, cnt: int, hotel_continent: int, hotel_country: int, hotel_market: int, hotel_cluster: int]

In [24]:
%%time 
train.limit(5).toPandas()

CPU times: user 16 ms, sys: 0 ns, total: 16 ms
Wall time: 33.6 s


Unnamed: 0,date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,is_package,...,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,is_booking,cnt,hotel_continent,hotel_country,hotel_market,hotel_cluster
0,2014-08-11 07:46:59,2,3,66,348,48862,2234.2641,12,0,1,...,0,1,8250,1,0,3,2,50,628,1
1,2014-08-11 08:22:12,2,3,66,348,48862,2234.2641,12,0,1,...,0,1,8250,1,1,1,2,50,628,1
2,2014-08-11 08:24:33,2,3,66,348,48862,2234.2641,12,0,0,...,0,1,8250,1,0,1,2,50,628,1
3,2014-08-09 18:05:16,2,3,66,442,35390,913.1932,93,0,0,...,0,1,14984,1,0,1,2,50,1457,80
4,2014-08-09 18:08:18,2,3,66,442,35390,913.6259,93,0,0,...,0,1,14984,1,0,1,2,50,1457,21


#### Let's save train as a Parquet file for later reuse

In [25]:
train.write.parquet('/home/jiahong/expedia_data/train.parquet')
train.unpersist()

DataFrame[date_time: timestamp, site_name: int, posa_continent: int, user_location_country: int, user_location_region: int, user_location_city: int, orig_destination_distance: double, user_id: int, is_mobile: int, is_package: int, channel: int, srch_ci: timestamp, srch_co: timestamp, srch_adults_cnt: int, srch_children_cnt: int, srch_rm_cnt: int, srch_destination_id: int, srch_destination_type_id: int, is_booking: int, cnt: int, hotel_continent: int, hotel_country: int, hotel_market: int, hotel_cluster: int]

In [12]:
train = sqlContext.read.load('/home/jiahong/expedia_data/train.parquet')
train.cache()

DataFrame[date_time: timestamp, site_name: int, posa_continent: int, user_location_country: int, user_location_region: int, user_location_city: int, orig_destination_distance: double, user_id: int, is_mobile: int, is_package: int, channel: int, srch_ci: timestamp, srch_co: timestamp, srch_adults_cnt: int, srch_children_cnt: int, srch_rm_cnt: int, srch_destination_id: int, srch_destination_type_id: int, is_booking: int, cnt: int, hotel_continent: int, hotel_country: int, hotel_market: int, hotel_cluster: int]

In [13]:
train.unpersist()

DataFrame[date_time: timestamp, site_name: int, posa_continent: int, user_location_country: int, user_location_region: int, user_location_city: int, orig_destination_distance: double, user_id: int, is_mobile: int, is_package: int, channel: int, srch_ci: timestamp, srch_co: timestamp, srch_adults_cnt: int, srch_children_cnt: int, srch_rm_cnt: int, srch_destination_id: int, srch_destination_type_id: int, is_booking: int, cnt: int, hotel_continent: int, hotel_country: int, hotel_market: int, hotel_cluster: int]

In [14]:
%%time
train.limit(5).toPandas()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 697 ms


Unnamed: 0,date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,is_package,...,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,is_booking,cnt,hotel_continent,hotel_country,hotel_market,hotel_cluster
0,2014-05-19 06:34:45,2,3,66,442,30104,5170.5864,1003865,0,0,...,0,1,8788,1,0,1,6,77,2,11
1,2014-05-19 06:55:14,2,3,66,442,30104,5118.3778,1003865,0,0,...,0,1,8741,1,0,1,6,144,13,5
2,2014-05-27 01:31:49,2,3,55,12,40448,,1003865,0,0,...,0,1,8741,1,0,1,6,144,13,36
3,2014-05-27 01:32:29,2,3,55,12,40448,,1003865,0,0,...,0,1,8741,1,0,1,6,144,13,2
4,2014-05-27 01:38:39,2,3,55,12,40448,,1003865,0,0,...,0,1,8741,1,0,2,6,144,13,46


#### Now reading takes half as much time (1.2 min vs 2.6 min), and the data is much better compressed:

In [15]:
!hadoop fs -du -h /home/jiahong/expedia_data/input

/bin/sh: 1: hadoop: not found


## Now let's get to work 

Our plan is simple: we will build 3 aggregates with Spark SQL, then merge them together, keeping only the 5 most important rows for each test id. We will use windowed analytic functions to achieve this.
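
To make the windowing idea concrete, here is a minimal plain-Python sketch of what `row_number()` over a partition followed by a `rn <= 5` filter computes. The function name `top_n_per_group` and the toy rows are made up for illustration; the notebook itself does this with Spark window functions.

```python
from collections import defaultdict

def top_n_per_group(rows, n=5):
    """rows: iterable of (group_key, item, score).
    Returns (group_key, item, score, rn) tuples, where rn numbers the items
    1..n within each group by descending score."""
    groups = defaultdict(list)
    for key, item, score in rows:
        groups[key].append((item, score))
    ranked = []
    for key in sorted(groups):
        # Sort by descending score; ties are broken by item to stay deterministic.
        best = sorted(groups[key], key=lambda pair: (-pair[1], pair[0]))[:n]
        for rn, (item, score) in enumerate(best, start=1):
            ranked.append((key, item, score, rn))
    return ranked

# Invented rows: (destination, hotel_cluster, score).
toy_rows = [
    ('dest_a', 91, 40), ('dest_a', 48, 25), ('dest_a', 41, 7),
    ('dest_b', 5, 3),
]
top2 = top_n_per_group(toy_rows, n=2)
```

The tie-break on the item is a choice made for this sketch only; as far as I know Spark does not promise any particular order among rows with equal sort keys.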

In [16]:
from pyspark.sql.functions import desc, row_number

In [17]:
from pyspark.sql.functions import  dense_rank

In [18]:
import pyspark.sql.types as typs
from pyspark.sql import Window
from pyspark.sql.functions import desc

In [19]:
w1 = Window.partitionBy("user_location_city", "orig_destination_distance").orderBy(desc('cnt'))

#### Hotel cluster counts grouped by user_location_city and orig_destination_distance, ranked by descending count

In [20]:
agg_ulc_odd_hc = (train
                  # omitting empty orig_destination_distance
                  .filter(train['orig_destination_distance'].isNotNull())
                  # changing the datatype to integer as I *think* join on integer should be faster
                  # though this is something to be tested
                  .withColumn('orig_destination_distance',
                              ( train['orig_destination_distance']*1e5).cast(typs.IntegerType()))
                  .select(['user_location_city', 'orig_destination_distance', 'hotel_cluster', 'is_booking'])
                  .groupBy('user_location_city', 'orig_destination_distance', 'hotel_cluster')
                  .count()
                  #rename count to cnt
                  .withColumnRenamed('count', 'cnt')
                  #windowed aggregate - assigns numbers in order of descending count
                  .select('*', row_number().over(w1).alias('rn'))
                  .filter('rn <= 5')
                  )
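
The counting step of this aggregate can be sketched on a few toy rows in plain Python. This is only an illustration: the rows are invented, `int()` stands in for the Spark cast to `IntegerType`, and the helper name `leak_counts` is hypothetical.

```python
from collections import Counter

def leak_counts(rows):
    """rows: iterable of (user_location_city, orig_destination_distance, hotel_cluster).
    Counts rows per (city, integer distance key, cluster), skipping missing distances."""
    counts = Counter()
    for city, distance, cluster in rows:
        if distance is None:
            continue  # mirrors the isNotNull() filter
        distance_key = int(distance * 1e5)  # integer join key instead of a float
        counts[(city, distance_key, cluster)] += 1
    return counts

toy_rows = [
    (3, 543.5, 25), (3, 543.5, 25), (3, 543.5, 82),
    (7, None, 11),   # dropped: no distance
    (7, 12.25, 11),
]
counts = leak_counts(toy_rows)
```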

In [21]:
#agg_ulc_odd_hc.cache()

In [22]:
%%time
agg_ulc_odd_hc.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 25.6 s


10105676

In [23]:
%%time
agg_ulc_odd_hc.limit(10).toPandas()

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 12.9 s


Unnamed: 0,user_location_city,orig_destination_distance,hotel_cluster,cnt,rn
0,0,3655430,18,1,1
1,0,5646600,23,2,1
2,0,11296200,9,2,1
3,0,689798510,17,2,1
4,0,791848730,57,1,1
5,3,54330160,25,5,1
6,3,54330160,82,2,2
7,3,57978850,11,3,1
8,3,58088210,86,1,1
9,3,67399560,30,1,1


In [24]:
train.filter(train['srch_destination_id'].isNull()).count()

0

In [25]:
train.filter(train['hotel_country'].isNull()).count()

0

In [26]:
train.filter(train['hotel_market'].isNull()).count()

0

In [27]:
formula1 = 3+17*train['is_booking']
formula2 = 2+5*train['is_booking']
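
These two expressions weight each row by whether it was a booking: with `formula1` a click (`is_booking = 0`) counts 3 and a booking counts 3 + 17 = 20, while `formula2` gives 2 and 7. A small plain-Python sketch on invented rows shows how the weighted sums come out; `weighted_scores` is a hypothetical helper, not part of the notebook.

```python
from collections import defaultdict

def weighted_scores(rows, base, bonus):
    """rows: iterable of (hotel_cluster, is_booking).
    Sums base + bonus * is_booking per cluster."""
    totals = defaultdict(int)
    for cluster, is_booking in rows:
        totals[cluster] += base + bonus * is_booking
    return dict(totals)

# Invented rows: cluster 91 has one click and one booking, cluster 48 has three clicks.
toy_rows = [(91, 0), (91, 1), (48, 0), (48, 0), (48, 0)]

scores_f1 = weighted_scores(toy_rows, base=3, bonus=17)  # same weights as formula1
scores_f2 = weighted_scores(toy_rows, base=2, bonus=5)   # same weights as formula2
```

In this toy data cluster 91 outranks cluster 48 under both weightings, even though cluster 48 has more rows.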

In [28]:
w2 = Window.partitionBy('srch_destination_id', 'hotel_country', 'hotel_market').orderBy(desc('sum_wb'))

In [29]:
agg_best_search_dest_ctry = (train
                        .filter('year(date_time) = 2014')
                        .select('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster', (formula1).alias('wb'))
                        .groupby('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster')
                        .sum('wb')
                        .withColumnRenamed('sum(wb)', 'sum_wb')
                        .orderBy(desc('sum_wb'))
                        .select('*', row_number().over(w2).alias("rn"))
                        .filter('rn <= 5')
                        )

In [30]:
#agg_best_search_dest_ctry.cache()

In [31]:
%%time
agg_best_search_dest_ctry.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 11.2 s


174673

In [32]:
agg_best_search_dest_ctry.limit(20).toPandas()

Unnamed: 0,srch_destination_id,hotel_country,hotel_market,hotel_cluster,sum_wb,rn
0,43,50,429,76,53,1
1,43,50,429,60,3,2
2,698,130,1384,36,176,1
3,698,130,1384,62,162,2
4,698,130,1384,43,41,3
5,698,130,1384,61,33,4
6,698,130,1384,14,32,5
7,1220,50,694,33,6,1
8,1220,50,694,13,3,2
9,1220,50,694,14,3,3


In [33]:
w3 = Window.partitionBy('srch_destination_id').orderBy(desc('sum_wb'))

In [34]:
agg_best_search_dest_2 = (train
                        .select('srch_destination_id',  'hotel_cluster', (formula1).alias('wb'))
                        .groupby('srch_destination_id', 'hotel_cluster')
                        .sum('wb')
                        .withColumnRenamed('sum(wb)', 'sum_wb')
                        .orderBy(desc('sum_wb'))
                        .select('*', row_number().over(w3).alias("rn"))
                        .filter('rn <= 5'))
#agg_best_search_dest_2.cache()

In [35]:
%%time
agg_best_search_dest_2.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 8.35 s


184373

In [36]:
agg_best_search_dest_2.limit(20).toPandas()

Unnamed: 0,srch_destination_id,hotel_cluster,sum_wb,rn
0,148,6,10105,1
1,148,59,7579,2
2,148,91,5277,3
3,148,16,4958,4
4,148,47,4885,5
5,463,30,6,1
6,463,78,6,2
7,463,36,3,3
8,463,62,3,4
9,463,3,3,5


In [37]:
agg_popular_hotel_cluster = (train
                            .select('hotel_cluster', (formula2).alias('wb'))
                            .groupby('hotel_cluster')
                            .sum('wb')
                            .orderBy(desc('sum(wb)'))
                            .withColumnRenamed('sum(wb)', 'sum_wb'))
#agg_popular_hotel_cluster.cache()

In [38]:
%%time
agg_popular_hotel_cluster.count()

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 2.76 s


100

In [39]:
agg_popular_hotel_cluster.limit(20).toPandas()

Unnamed: 0,hotel_cluster,sum_wb
0,91,2692300
1,48,1934951
2,41,1794361
3,64,1649553
4,59,1473627
5,42,1468825
6,5,1454058
7,65,1449310
8,98,1423161
9,18,1368863


In [40]:
%%time
test = (sqlContext
         .read
         .format('com.databricks.spark.csv')
         .options(header='true', inferschema='true')
         .load('file:///home/jiahong/expedia_data/test.csv'))

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 26.2 s


In [None]:
%%time
test.write.parquet('/home/jiahong/expedia_data/test.parquet')

#### Next time we can load it like this

In [42]:
%%time
test = sqlContext.read.load('/home/jiahong/expedia_data/test.parquet')

CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 163 ms


In [43]:
test = (test.withColumn('orig_destination_distance', (test['orig_destination_distance']*1e5).cast(typs.IntegerType())))
#test.cache()
test.limit(10).toPandas()

Unnamed: 0,id,date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,...,srch_ci,srch_co,srch_adults_cnt,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,hotel_continent,hotel_country,hotel_market
0,0,2015-09-03 17:09:54,2,3,66,174,37449,553905670,1,1,...,2016-05-19,2016-05-23,2,0,1,12243,6,6,204,27
1,1,2015-09-24 17:38:35,2,3,66,174,37449,587329230,1,1,...,2016-05-12,2016-05-15,2,0,1,14474,7,6,204,1540
2,2,2015-06-07 15:53:02,2,3,66,142,17440,397597760,20,0,...,2015-07-26,2015-07-27,4,0,1,11353,1,2,50,699
3,3,2015-09-14 14:49:10,2,3,66,258,34156,150859750,28,0,...,2015-09-14,2015-09-16,2,0,1,8250,1,2,50,628
4,4,2015-07-17 09:32:04,2,3,66,467,36345,6679130,50,0,...,2015-07-22,2015-07-23,2,0,1,11812,1,2,50,538
5,5,2015-07-21 11:58:45,2,3,66,311,48189,35985210,51,0,...,2015-07-22,2015-07-24,4,0,2,11827,1,2,50,447
6,6,2015-07-29 07:58:39,2,3,66,311,48189,23734650,51,0,...,2015-08-02,2015-08-03,2,0,1,8271,1,2,50,696
7,7,2015-08-01 20:13:15,2,3,66,348,24811,21657850,51,0,...,2015-08-03,2015-08-04,2,0,1,8291,1,2,50,191
8,8,2015-11-07 12:29:09,2,3,66,311,48189,233767540,51,0,...,2015-12-30,2015-12-31,2,0,1,8250,1,2,50,628
9,9,2015-11-08 16:21:37,2,3,66,311,48189,253979950,51,0,...,2016-01-02,2016-01-03,2,0,1,9145,1,2,50,364


#### Some stats about test

In [44]:
test.count()

2528243

#### all ids are unique

In [45]:
n_test_ids = test.select('id').distinct().count()
n_test_ids

2528243

In [46]:
test.groupby('srch_destination_id').count().orderBy(desc('count')).limit(5).toPandas()

Unnamed: 0,srch_destination_id,count
0,8250,62718
1,8267,41098
2,8279,23141
3,8745,21162
4,8268,20906


#### Joining the first aggregate, which represents the data leak

In [47]:
test_join_1 = (test.join(agg_ulc_odd_hc, ['user_location_city', 'orig_destination_distance'], 'inner')
                   .select('id', 'hotel_cluster', 'rn')
                   .orderBy('id', 'rn')
              )
#test_join_1.cache()

In [48]:
%%time
test_join_1.limit(10).toPandas()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 22.3 s


Unnamed: 0,id,hotel_cluster,rn
0,2,91,1
1,3,1,1
2,4,51,1
3,4,50,2
4,8,88,1
5,25,98,1
6,25,95,2
7,26,31,1
8,27,39,1
9,29,71,1


#### How many IDs are matched by the first join

In [49]:
n_ids_j1 = test_join_1.select('id').distinct().count()
n_ids_j1

852711

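#### Percentage of test records affected by the data leak: about one third

In [50]:
# Under Python 2, n_ids_j1 / n_test_ids is integer division and truncates to 0,
# so force float arithmetic; for the counts above this gives about 33.7
100.0 * n_ids_j1 / n_test_ids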

#### joining 2nd aggregate

In [51]:
test_join_2 = (test.join(agg_best_search_dest_ctry, ['srch_destination_id', 'hotel_country', 'hotel_market'], 'inner')
                   .select('id', 'hotel_cluster', (10*agg_best_search_dest_ctry['rn']).alias('rn'))
                   .orderBy('id', 'rn')
              )
#test_join_2.cache()

In [52]:
test_join_2.count()

12326935

#### Almost all test ids have some match with the second aggregate

In [53]:
n_ids_j2 = test_join_2.select('id').distinct().count()
n_ids_j2

2510819

#### The majority of IDs have 5 matches (the maximum, since the aggregate keeps only rn <= 5)

In [54]:
test_join_2.groupby('id').count().groupby('count').count().toPandas()

Unnamed: 0,count,count.1
0,5,2415057
1,1,20625
2,3,23621
3,2,22951
4,4,28565


#### a quick look:

In [55]:
test_join_2.limit(10).toPandas()

Unnamed: 0,id,hotel_cluster,rn
0,0,5,10
1,0,37,20
2,0,55,30
3,0,22,40
4,0,11,50
5,1,5,10
6,2,0,10
7,2,31,20
8,2,96,30
9,2,91,40


#### joining with 3rd aggregate

In [56]:
test_join_3 = (test.join(agg_best_search_dest_2, ['srch_destination_id'])
                   .select('id', 'hotel_cluster',  (100*agg_best_search_dest_2['rn']).alias('rn'))
                   .orderBy('id', 'rn')
              )
#test_join_3.cache()

In [57]:
%%time
test_join_3.count()

CPU times: user 0 ns, sys: 4 ms, total: 4 ms
Wall time: 20.7 s


12380153

#### how many ids matched

In [58]:
n_ids_3 = test_join_3.select('id').distinct().count()
n_ids_3

2514207

In [59]:
test_join_3.limit(10).toPandas()

Unnamed: 0,id,hotel_cluster,rn
0,0,5,100
1,0,37,200
2,0,55,300
3,0,11,400
4,0,22,500
5,1,5,100
6,2,0,100
7,2,31,200
8,2,96,300
9,2,91,400
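
The three joins scale `rn` by 1, 10 and 100, so after a union a single sort on `rn` puts leak matches (1 to 5) ahead of destination/country/market matches (10 to 50), which in turn come ahead of destination-only matches (100 to 500). The sketch below illustrates that ordering in plain Python on invented candidates. Dropping a cluster that already appeared with a better rank is my assumption about the final merge, and `merge_candidates` is a hypothetical name.

```python
def merge_candidates(candidates, n=5):
    """candidates: iterable of (id, hotel_cluster, rn).
    Returns {id: [clusters]} with up to n distinct clusters per id,
    ordered by ascending rn."""
    merged = {}
    for row_id, cluster, rn in sorted(candidates, key=lambda c: (c[0], c[2])):
        picks = merged.setdefault(row_id, [])
        # Assumption: keep only the best-ranked occurrence of each cluster.
        if cluster not in picks and len(picks) < n:
            picks.append(cluster)
    return merged

toy_candidates = [
    (2, 91, 1),                             # leak match
    (2, 0, 10), (2, 31, 20), (2, 91, 40),   # second aggregate, rn * 10
    (2, 0, 100), (2, 96, 300),              # third aggregate, rn * 100
]
merged = merge_candidates(toy_candidates)
```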


#### Looking at the IDs that did not match any of the aggregates

In [60]:
not_matched_ids = (test.select('id')
                     .subtract(test_join_1.select('id').distinct())
                     .subtract(test_join_2.select('id').distinct())
                     .subtract(test_join_3.select('id').distinct())
                     )
#not_matched_ids.cache()

In [61]:
not_matched_ids.count()

11960
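
`subtract` behaves like a set difference on the `id` column. In plain Python the same bookkeeping looks like this (the ids are invented for illustration):

```python
all_test_ids = {0, 1, 2, 3, 4, 5}
ids_join_1 = {2, 3}          # matched by the leak aggregate
ids_join_2 = {0, 1, 2}       # matched by the second aggregate
ids_join_3 = {0, 1, 2, 4}    # matched by the third aggregate

# Whatever survives all three subtractions matched nothing.
not_matched = all_test_ids - ids_join_1 - ids_join_2 - ids_join_3
```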

#### a glimpse at those ids that have not matched

In [62]:
pd.options.display.max_columns = 500
test.join(not_matched_ids, on = 'id', how='left_semi').limit(20).toPandas()

Unnamed: 0,id,date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,is_package,channel,srch_ci,srch_co,srch_adults_cnt,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,hotel_continent,hotel_country,hotel_market
0,286,2015-09-13 08:37:02,2,3,215,780,38000,,1038,0,0,10,2015-11-05,2015-11-06,2,0,1,65671,6,3,106,781
1,357,2015-08-30 13:54:04,24,2,3,51,9527,,1243,0,0,5,2015-12-19,2015-12-21,1,0,1,13679,4,3,126,232
2,445,2015-06-01 01:12:57,28,1,66,442,18617,,1462,0,0,10,2015-05-31,2015-06-01,2,0,1,44373,6,2,50,1654
3,458,2015-06-21 01:22:42,28,1,66,442,467,,1462,0,0,1,2015-06-20,2015-06-21,2,0,1,65106,6,2,50,1640
4,627,2015-09-14 19:23:45,24,2,167,51,11261,,1939,0,0,10,2015-10-27,2015-10-28,2,0,1,51983,7,3,106,160
5,748,2015-12-01 11:58:37,11,3,205,354,13320,7883610.0,2471,0,0,0,2015-12-01,2015-12-02,2,0,1,41557,6,2,198,395
6,913,2015-11-02 15:59:39,24,2,3,50,5224,,3011,0,0,5,2016-01-14,2016-01-15,5,0,5,65450,5,0,63,1687
7,1055,2015-07-24 12:19:40,8,4,77,824,22219,3914870.0,3373,0,0,2,2015-12-13,2015-12-14,2,0,1,34219,4,0,63,1473
8,1110,2015-04-18 11:41:56,2,3,66,174,28950,596498030.0,3606,0,0,9,2015-06-02,2015-06-03,2,0,1,18380,7,6,105,892
9,1222,2015-06-10 16:42:24,28,1,68,480,2861,,3962,0,0,10,2015-10-14,2015-10-15,4,0,2,58246,4,5,39,803


#### Generating the remainder

In [63]:
not_matched_ids.limit(5).show()

+-----+
|   id|
+-----+
|15957|
|20497|
|26708|
|47084|
|65867|
+-----+



In [64]:
not_matched_ids.count()

11960

In [65]:
agg_popular_hotel_cluster.limit(5).show()

+-------------+-------+
|hotel_cluster| sum_wb|
+-------------+-------+
|           91|2692300|
|           48|1934951|
|           41|1794361|
|           64|1649553|
|           59|1473627|
+-------------+-------+



In [66]:
agg_popular_hotel_cluster.count()

100

In [67]:
test_remainder = not_matched_ids.join(agg_popular_hotel_cluster).selectExpr('id', 'hotel_cluster', '999 as rn')
test_remainder.limit(10).toPandas()

AnalysisException: u'Detected cartesian product for INNER join between logical plans\nAggregate [id#536], [id#536]\n+- Join LeftAnti, (id#536 <=> id#909)\n   :- Aggregate [id#536], [id#536]\n   :  +- Join LeftAnti, (id#536 <=> id#882)\n   :     :- Aggregate [id#536], [id#536]\n   :     :  +- Join LeftAnti, (id#536 <=> id#855)\n   :     :     :- Project [id#536]\n   :     :     :  +- Relation[id#536,date_time#537,site_name#538,posa_continent#539,user_location_country#540,user_location_region#541,user_location_city#542,orig_destination_distance#543,user_id#544,is_mobile#545,is_package#546,channel#547,srch_ci#548,srch_co#549,srch_adults_cnt#550,srch_children_cnt#551,srch_rm_cnt#552,srch_destination_id#553,srch_destination_type_id#554,hotel_continent#555,hotel_country#556,hotel_market#557] parquet\n   :     :     +- Aggregate [id#855], [id#855]\n   :     :        +- Project [id#855]\n   :     :           +- Sort [id#855 ASC NULLS FIRST, rn#221 ASC NULLS FIRST], true\n   :     :              +- Project [id#855, rn#221]\n   :     :                 +- Join Inner, ((user_location_city#861 = user_location_city#5) && (orig_destination_distance#581 = orig_destination_distance#171))\n   :     :                    :- Project [id#855, user_location_city#861, cast((orig_destination_distance#862 * 100000.0) as int) AS orig_destination_distance#581]\n   :     :                    :  +- Filter (isnotnull(orig_destination_distance#862) && (isnotnull(cast((orig_destination_distance#862 * 100000.0) as int)) && isnotnull(user_location_city#861)))\n   :     :                    :     +- 
Relation[id#855,date_time#856,site_name#857,posa_continent#858,user_location_country#859,user_location_region#860,user_location_city#861,orig_destination_distance#862,user_id#863,is_mobile#864,is_package#865,channel#866,srch_ci#867,srch_co#868,srch_adults_cnt#869,srch_children_cnt#870,srch_rm_cnt#871,srch_destination_id#872,srch_destination_type_id#873,hotel_continent#874,hotel_country#875,hotel_market#876] parquet\n   :     :                    +- Project [user_location_city#5, orig_destination_distance#171, rn#221]\n   :     :                       +- Filter (isnotnull(rn#221) && (rn#221 <= 5))\n   :     :                          +- Window [row_number() windowspecdefinition(user_location_city#5, orig_destination_distance#171, cnt#214L DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#221], [user_location_city#5, orig_destination_distance#171], [cnt#214L DESC NULLS LAST]\n   :     :                             +- Aggregate [user_location_city#5, orig_destination_distance#171, hotel_cluster#23], [user_location_city#5, orig_destination_distance#171, count(1) AS cnt#214L]\n   :     :                                +- Project [user_location_city#5, cast((orig_destination_distance#6 * 100000.0) as int) AS orig_destination_distance#171, hotel_cluster#23]\n   :     :                                   +- Filter (isnotnull(orig_destination_distance#6) && (isnotnull(user_location_city#5) && isnotnull(cast((orig_destination_distance#6 * 100000.0) as int))))\n   :     :                                      +- Relation[date_time#0,site_name#1,posa_continent#2,user_location_country#3,user_location_region#4,user_location_city#5,orig_destination_distance#6,user_id#7,is_mobile#8,is_package#9,channel#10,srch_ci#11,srch_co#12,srch_adults_cnt#13,srch_children_cnt#14,srch_rm_cnt#15,srch_destination_id#16,srch_destination_type_id#17,is_booking#18,cnt#19,hotel_continent#20,hotel_country#21,hotel_market#22,hotel_cluster#23] parquet\n   :     +- Aggregate [id#882], 
[id#882]\n   :        +- Project [id#882]\n   :           +- Sort [id#882 ASC NULLS FIRST, rn#747 ASC NULLS FIRST], true\n   :              +- Project [id#882, (rn#371 * 10) AS rn#747]\n   :                 +- Join Inner, (((srch_destination_id#899 = srch_destination_id#16) && (hotel_country#902 = hotel_country#21)) && (hotel_market#903 = hotel_market#22))\n   :                    :- Project [id#882, srch_destination_id#899, hotel_country#902, hotel_market#903]\n   :                    :  +- Filter ((isnotnull(hotel_country#902) && isnotnull(hotel_market#903)) && isnotnull(srch_destination_id#899))\n   :                    :     +- Relation[id#882,date_time#883,site_name#884,posa_continent#885,user_location_country#886,user_location_region#887,user_location_city#888,orig_destination_distance#889,user_id#890,is_mobile#891,is_package#892,channel#893,srch_ci#894,srch_co#895,srch_adults_cnt#896,srch_children_cnt#897,srch_rm_cnt#898,srch_destination_id#899,srch_destination_type_id#900,hotel_continent#901,hotel_country#902,hotel_market#903] parquet\n   :                    +- Project [srch_destination_id#16, hotel_country#21, hotel_market#22, rn#371]\n   :                       +- Filter (isnotnull(rn#371) && (rn#371 <= 5))\n   :                          +- Window [row_number() windowspecdefinition(srch_destination_id#16, hotel_country#21, hotel_market#22, sum_wb#362L DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#371], [srch_destination_id#16, hotel_country#21, hotel_market#22], [sum_wb#362L DESC NULLS LAST]\n   :                             +- Sort [sum_wb#362L DESC NULLS LAST], true\n   :                                +- Aggregate [srch_destination_id#16, hotel_country#21, hotel_market#22, hotel_cluster#23], [srch_destination_id#16, hotel_country#21, hotel_market#22, sum(cast(wb#341 as bigint)) AS sum_wb#362L]\n   :                                   +- Project [srch_destination_id#16, hotel_country#21, hotel_market#22, hotel_cluster#23, 
((is_booking#18 * 17) + 3) AS wb#341]\n   :                                      +- Filter ((year(cast(date_time#0 as date)) = 2014) && ((isnotnull(hotel_market#22) && isnotnull(srch_destination_id#16)) && isnotnull(hotel_country#21)))\n   :                                         +- Relation[date_time#0,site_name#1,posa_continent#2,user_location_country#3,user_location_region#4,user_location_city#5,orig_destination_distance#6,user_id#7,is_mobile#8,is_package#9,channel#10,srch_ci#11,srch_co#12,srch_adults_cnt#13,srch_children_cnt#14,srch_rm_cnt#15,srch_destination_id#16,srch_destination_type_id#17,is_booking#18,cnt#19,hotel_continent#20,hotel_country#21,hotel_market#22,hotel_cluster#23] parquet\n   +- Aggregate [id#909], [id#909]\n      +- Project [id#909]\n         +- Sort [id#909 ASC NULLS FIRST, rn#822 ASC NULLS FIRST], true\n            +- Project [id#909, (rn#418 * 100) AS rn#822]\n               +- Join Inner, (srch_destination_id#926 = srch_destination_id#16)\n                  :- Project [id#909, srch_destination_id#926]\n                  :  +- Filter isnotnull(srch_destination_id#926)\n                  :     +- Relation[id#909,date_time#910,site_name#911,posa_continent#912,user_location_country#913,user_location_region#914,user_location_city#915,orig_destination_distance#916,user_id#917,is_mobile#918,is_package#919,channel#920,srch_ci#921,srch_co#922,srch_adults_cnt#923,srch_children_cnt#924,srch_rm_cnt#925,srch_destination_id#926,srch_destination_type_id#927,hotel_continent#928,hotel_country#929,hotel_market#930] parquet\n                  +- Project [srch_destination_id#16, rn#418]\n                     +- Filter (isnotnull(rn#418) && (rn#418 <= 5))\n                        +- Window [row_number() windowspecdefinition(srch_destination_id#16, sum_wb#411L DESC NULLS LAST, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn#418], [srch_destination_id#16], [sum_wb#411L DESC NULLS LAST]\n                           +- Sort [sum_wb#411L DESC NULLS LAST], 
true\n                              +- Aggregate [srch_destination_id#16, hotel_cluster#23], [srch_destination_id#16, sum(cast(wb#396 as bigint)) AS sum_wb#411L]\n                                 +- Project [srch_destination_id#16, hotel_cluster#23, ((is_booking#18 * 17) + 3) AS wb#396]\n                                    +- Filter isnotnull(srch_destination_id#16)\n                                       +- Relation[date_time#0,site_name#1,posa_continent#2,user_location_country#3,user_location_region#4,user_location_city#5,orig_destination_distance#6,user_id#7,is_mobile#8,is_package#9,channel#10,srch_ci#11,srch_co#12,srch_adults_cnt#13,srch_children_cnt#14,srch_rm_cnt#15,srch_destination_id#16,srch_destination_type_id#17,is_booking#18,cnt#19,hotel_continent#20,hotel_country#21,hotel_market#22,hotel_cluster#23] parquet\nand\nProject [hotel_cluster#23]\n+- Sort [sum(wb)#447L DESC NULLS LAST], true\n   +- Aggregate [hotel_cluster#23], [hotel_cluster#23, sum(cast(wb#439 as bigint)) AS sum(wb)#447L]\n      +- Project [hotel_cluster#23, ((is_booking#18 * 5) + 2) AS wb#439]\n         +- Relation[date_time#0,site_name#1,posa_continent#2,user_location_country#3,user_location_region#4,user_location_city#5,orig_destination_distance#6,user_id#7,is_mobile#8,is_package#9,channel#10,srch_ci#11,srch_co#12,srch_adults_cnt#13,srch_children_cnt#14,srch_rm_cnt#15,srch_destination_id#16,srch_destination_type_id#17,is_booking#18,cnt#19,hotel_continent#20,hotel_country#21,hotel_market#22,hotel_cluster#23] parquet\nJoin condition is missing or trivial.\nUse the CROSS JOIN syntax to allow cartesian products between these relations.;'

In [None]:
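# Spark 2.x rejects a join without a condition, so the Cartesian product is requested explicitly
test_remainder = not_matched_ids.crossJoin(agg_popular_hotel_cluster.limit(5)).selectExpr('id', 'hotel_cluster', '999 as rn')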
test_remainder.limit(10).toPandas()
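
The intent of this cell is a Cartesian product: every id that matched none of the three joins is paired with each of the five most popular clusters and given the lowest priority (`rn = 999`). A minimal plain-Python sketch of that pairing, assuming made-up ids and an illustrative cluster list (the helper name `build_remainder` is hypothetical and not part of the notebook):

```python
from itertools import product

def build_remainder(unmatched_ids, top_clusters, fallback_rank=999):
    """Pair every unmatched id with every popular cluster (a cross join)."""
    return [
        {"id": i, "hotel_cluster": c, "rn": fallback_rank}
        for i, c in product(unmatched_ids, top_clusters)
    ]

# Hypothetical sample data: two unmatched ids, five popular clusters.
rows = build_remainder([7, 42], [91, 48, 41, 64, 59])
print(len(rows))  # 2 ids x 5 clusters = 10 rows
```

Because the right-hand side is limited to five rows, the product grows only linearly with the number of unmatched ids.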

#### Now merging all the records together

In [69]:
w4 = Window.partitionBy('id').orderBy('rn')
test_union = (test_join_1
              .unionAll(test_join_2)
              .unionAll(test_join_3)
              #.unionAll(test_remainder)
              .select('*', row_number().over(w4).alias('rn_all'))
              .filter('rn_all <= 5')
              .orderBy('id', 'rn_all'))
#test_union.cache()
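
The `rn` column doubles as a priority: the leak match keeps its rank as is, the second join multiplies it by 10 and the third by 100, so numbering the union by `rn` within each `id` prefers the more specific source. A small plain-Python sketch of the same idea, assuming hypothetical `(id, hotel_cluster, rn)` tuples and a hypothetical helper `merge_top5`:

```python
from itertools import groupby
from operator import itemgetter

def merge_top5(sources, limit=5):
    """Union (id, hotel_cluster, rn) rows, then keep the `limit` lowest-rn rows per id."""
    rows = sorted((r for s in sources for r in s), key=itemgetter(0, 2))
    merged = []
    for _, group in groupby(rows, key=itemgetter(0)):
        for rn_all, (id_, cluster, rn) in enumerate(group, start=1):
            if rn_all > limit:
                break
            merged.append((id_, cluster, rn, rn_all))
    return merged

# Hypothetical sample rows from the three joins.
join_1 = [(2, 91, 1)]
join_2 = [(1, 5, 10), (2, 0, 10), (2, 31, 20)]
join_3 = [(1, 5, 100)]
merged = merge_top5([join_1, join_2, join_3])
for row in merged:
    print(row)
```

Like the Spark version, this sketch does not deduplicate clusters, so the same cluster can appear twice for one id when two sources agree.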

In [70]:
%%time
test_union.count()

CPU times: user 12 ms, sys: 8 ms, total: 20 ms
Wall time: 1min 43s


12506694

In [71]:
test_union.limit(10).toPandas()

Unnamed: 0,id,hotel_cluster,rn,rn_all
0,0,5,10,1
1,0,37,20,2
2,0,55,30,3
3,0,22,40,4
4,0,11,50,5
5,1,5,10,1
6,1,5,100,2
7,2,91,1,1
8,2,0,10,2
9,2,31,20,3


#### Testing how many ids have fewer than 5 hotel_clusters

In [72]:
test_union.groupBy('id').count().filter('count < 5').count()

35465

In [73]:
test_union.groupBy('id').count().filter('count < 5').limit(10).toPandas()

Unnamed: 0,id,count
0,2366,4
1,33722,3
2,46994,2
3,63964,4
4,90019,1
5,118989,3
6,144991,3
7,145504,4
8,153555,2
9,155255,4


## Generating submission

### To concatenate the clusters, I could not come up with a better idea than converting the result to an RDD, appending the most popular clusters, and converting back to a DataFrame

This operation is also slow; I need to think about how to do it differently.
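
For reference, the logic of the RDD step is small: collect each id's clusters in rank order, append the globally popular clusters, cut the list to five, and join with spaces. A plain-Python sketch of just that logic, assuming hypothetical sample rows (the helper name `format_predictions` is hypothetical, and the Spark job itself is not reproduced here):

```python
from collections import OrderedDict

def format_predictions(rows, top5, limit=5):
    """rows: (id, hotel_cluster) pairs already sorted by id and rank."""
    by_id = OrderedDict()
    for id_, cluster in rows:
        by_id.setdefault(id_, []).append(cluster)
    # Pad short lists with the popular clusters, truncate, and format.
    return {
        id_: " ".join(str(c) for c in (clusters + top5)[:limit])
        for id_, clusters in by_id.items()
    }

top5 = [91, 48, 41, 64, 59]
rows = [(0, 5), (0, 37), (0, 55), (0, 22), (0, 11), (1, 5), (1, 5)]
predictions = format_predictions(rows, top5)
print(predictions)
```

An id that already has five clusters is left unchanged, while a shorter list is topped up from the popular clusters.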

In [74]:
top5_hotels = (agg_popular_hotel_cluster.limit(5).rdd.keys().collect())
top5_hotels

[91, 48, 41, 64, 59]

#### A "broadcast" variable is a read-only value that is transferred to all the nodes

In [75]:
top5_bc = sc.broadcast(top5_hotels)
top5_bc

<pyspark.broadcast.Broadcast at 0x7fc3f39ac8d0>

In [76]:
from pyspark.sql import Row

In [78]:
%%time
submission = (test_union
  #  .filter('id in (0,1,2)')
    .orderBy('id', 'rn_all')
    .rdd
    .map(lambda x: (x.id, [x.hotel_cluster,]))
    .reduceByKey(lambda a, b: a + b)  # concatenate the per-id cluster lists (`_ ++ _` is Scala syntax)
    .mapValues(lambda x: (x + top5_bc.value)[:5])
    .mapValues(lambda x: " ".join([str(i) for i in x]))
    .map(lambda x: Row(id = x[0], hotel_cluster = x[1]))
    .toDF()
#    .orderBy('id')
    ).toPandas()

In [None]:
submission.set_index('id', inplace=True)
submission.sort_index(inplace=True)

In [None]:
submission.shape

In [None]:
submission.head()

In [None]:
submission.to_csv('/home/jiahong/expedia_data/submissions-leakage_spark.csv')

In [358]:
!rm ../submissions/leakage_spark.csv.gz
!gzip ../submissions/leakage_spark.csv

#### The submission scores 0.49604 on Public LB, just on par with the source script

## All-in-one script
#### You can play with formula1 and formula2 to achieve better results
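
Both formulas turn each training row into a weight: a click contributes the constant term, and a booking contributes the constant plus the booking coefficient. The weights are then summed per cluster and ranked. A plain-Python sketch, assuming made-up events (the function `rank_clusters` and its parameter names are hypothetical), shows how the coefficients change the ranking:

```python
from collections import Counter

def rank_clusters(events, base=3, booking_bonus=12):
    """events: (hotel_cluster, is_booking) pairs; weight = base + booking_bonus * is_booking."""
    scores = Counter()
    for cluster, is_booking in events:
        scores[cluster] += base + booking_bonus * is_booking
    # Highest summed weight first; ties broken by cluster id for determinism.
    return sorted(scores, key=lambda c: (-scores[c], c))

# Hypothetical events: four clicks on 91, one booking of 48, one click on 41.
events = [(91, 0), (91, 0), (91, 0), (91, 0), (48, 1), (41, 0)]
weighted = rank_clusters(events)                      # bookings count 15, clicks 3
unweighted = rank_clusters(events, booking_bonus=0)   # bookings and clicks both count 3
print(weighted)
print(unweighted)
```

With the booking bonus, a single booking outranks four clicks; without it, raw event counts decide the order.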

In [1]:
import sys
import os
import pandas as pd
import numpy as np
sys.path.append('/opt/spark/python')
os.environ['SPARK_HOME']='/opt/spark'
os.environ['HADOOP_CONF_DIR']='/etc/hadoop/conf'
os.environ['HIVE_CONF_DIR']='/etc/hive/conf'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[8] --packages com.databricks:spark-csv_2.11:1.4.0 pyspark-shell'

from pyspark import SparkContext, SparkConf, HiveContext, SQLContext
import pyspark.sql.types as typs
from pyspark.sql import Window, Row
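# rowNumber/denseRank were removed in Spark 2.x; the snake_case names exist since 1.6
from pyspark.sql.functions import desc, row_number as rowNumber, dense_rank as denseRank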
from operator import add

In [2]:
conf = SparkConf()
conf.setAppName('Expedia')
conf.set("spark.driver.memory", "12g")
conf.set("spark.executor.memory", "8g")

sc = SparkContext(conf=conf)
sqlContext  = HiveContext(sc)

In [3]:
def run_solution():
    train = sqlContext.read.load('/projects/kaggle-expedia/input/train.parquet')
    train.cache()
    
    ### changed according to best performing script
    formula1 = 3+12*train['is_booking']
    formula2 = 3+5*train['is_booking']
    
    
    w1 = Window.partitionBy("user_location_city", "orig_destination_distance").orderBy(desc('cnt'))
    ## aggregate 1
    agg_ulc_odd_hc = (train
                  # omitting empty orig_destination_distance
                  .filter(train['orig_destination_distance'].isNotNull())
                  # changing the datatype to integer as I *think* join on integer should be faster
                  # though this is something to be tested
                  .withColumn('orig_destination_distance',
                              ( train['orig_destination_distance']*1e5).cast(typs.IntegerType()))
                  .select(['user_location_city', 'orig_destination_distance', 'hotel_cluster', 'is_booking'])
                  .groupBy('user_location_city', 'orig_destination_distance', 'hotel_cluster')
                  .count()
                  #rename count to cnt
                  .withColumnRenamed('count', 'cnt')
                  #windowed aggregate - assigns numbers in order of descending count
                  .select('*', rowNumber().over(w1).alias('rn'))
                  .filter('rn <= 5')
                  )
    agg_ulc_odd_hc.cache()
    # aggregate 2
    w2 = Window.partitionBy('srch_destination_id', 'hotel_country', 'hotel_market').orderBy(desc('sum_wb'))
    agg_best_search_dest_ctry = (train
                        .filter('year(date_time) = 2014')
                        .select('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster', (formula1).alias('wb'))
                        .groupby('srch_destination_id', 'hotel_country', 'hotel_market', 'hotel_cluster')
                        .sum('wb')
                        .withColumnRenamed('sum(wb)', 'sum_wb')
                        .orderBy(desc('sum_wb'))
                        .select('*', rowNumber().over(w2).alias("rn"))
                        .filter('rn <= 5')
                        )
    agg_best_search_dest_ctry.cache()
    
    # aggregate 3
    w3 = Window.partitionBy('srch_destination_id').orderBy(desc('sum_wb'))
    agg_best_search_dest_2 = (train
                        .select('srch_destination_id',  'hotel_cluster', (formula1).alias('wb'))
                        .groupby('srch_destination_id', 'hotel_cluster')
                        .sum('wb')
                        .withColumnRenamed('sum(wb)', 'sum_wb')
                        .orderBy(desc('sum_wb'))
                        .select('*', rowNumber().over(w3).alias("rn"))
                        .filter('rn <= 5'))
    agg_best_search_dest_2.cache()
    
    #most popular hotels
    agg_popular_hotel_cluster = (train
                            .select('hotel_cluster', (formula2).alias('wb'))
                            .groupby('hotel_cluster')
                            .sum('wb')
                            .orderBy(desc('sum(wb)'))
                            .withColumnRenamed('sum(wb)', 'sum_wb'))
    agg_popular_hotel_cluster.cache()
    
    # broadcasting top5 hotels
    top5_hotels = (agg_popular_hotel_cluster.limit(5).rdd.keys().collect())
    top5_bc = sc.broadcast(top5_hotels)
    
    #flushing train from cache
    train.unpersist()
    
    test = sqlContext.read.load('/projects/kaggle-expedia/input/test.parquet')
    test = (test.withColumn('orig_destination_distance', (test['orig_destination_distance']*1e5).cast(typs.IntegerType())))
    test.cache()
    
    test_join_1 = (test.join(agg_ulc_odd_hc, ['user_location_city', 'orig_destination_distance'], 'inner')
                   .select('id', 'hotel_cluster', 'rn')
                   .orderBy('id', 'rn')
              )
    test_join_2 = (test.join(agg_best_search_dest_ctry, ['srch_destination_id', 'hotel_country', 'hotel_market'], 'inner')
                   .select('id', 'hotel_cluster', (10*agg_best_search_dest_ctry['rn']).alias('rn'))
                   .orderBy('id', 'rn')
              )
    test_join_3 = (test.join(agg_best_search_dest_2, ['srch_destination_id'])
                   .select('id', 'hotel_cluster',  (100*agg_best_search_dest_2['rn']).alias('rn'))
                   .orderBy('id', 'rn')
              )
    not_matched_ids = (test.select('id')
                     .subtract(test_join_1.select('id').distinct())
                     .subtract(test_join_2.select('id').distinct())
                     .subtract(test_join_3.select('id').distinct())
                     )
    test_remainder = not_matched_ids.join(agg_popular_hotel_cluster.limit(5)).selectExpr('id', 'hotel_cluster', '999 as rn')
    # merging results together
    w4 = Window.partitionBy('id').orderBy('rn')
    test_union = (test_join_1
                  .unionAll(test_join_2)
                  .unionAll(test_join_3)
                  .unionAll(test_remainder)
                  .select('*', rowNumber().over(w4).alias('rn_all'))
                  .filter('rn_all <= 5')
                  .orderBy('id', 'rn_all'))
    

    submission = (test_union
      #  .filter('id in (0,1,2)')
        .orderBy('id', 'rn_all')
        .rdd
        .map(lambda x: (x.id, [x.hotel_cluster,]))
        .reduceByKey(add)
        .mapValues(lambda x: (x + top5_bc.value)[:5])
        .mapValues(lambda x: " ".join([str(i) for i in x]))
        .map(lambda x: Row(id = x[0], hotel_cluster = x[1]))
        .toDF()
    #    .orderBy('id')
        ).toPandas()
    
    # generating submission file
    submission.set_index('id', inplace=True)
    submission.sort_index(inplace=True)
    
    return submission

#### It takes more than 7 minutes to run the whole solution. I am sure this performance is not optimal; it is something to work on.
#### As mentioned earlier, you can monitor jobs on [spark_machine:4040]() by default.

In [4]:
%%time
sub = run_solution()

CPU times: user 31.9 s, sys: 840 ms, total: 32.7 s
Wall time: 7min 48s


In [5]:
sub.head()

id,hotel_cluster
0,5 37 55 22 11
1,5 5 91 48 41
2,91 0 31 96 91
3,1 1 45 79 24
4,50 51 91 2 59


In [None]:
# run_solution() returned its result into `sub`; `submission` is local to the function
sub.to_csv('../submissions/leakage_spark.csv')

In [None]:
!rm ../submissions/leakage_spark.csv.gz
!gzip ../submissions/leakage_spark.csv