
This notebook samples the data. It keeps only the three most frequent countries, retains all of their positive click-through instances, and samples negative instances to roughly match that number, which addresses the class imbalance. Because a transaction can only happen after a click, keeping every positive click-through instance also keeps every positive transaction instance.
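
The claim that the clicked rows cover every transaction row can be checked directly. The following is a minimal sketch in plain Python using made-up toy rows; the values are illustrative assumptions, not taken from the dataset, and only the field names `num_clicks` and `is_trans` come from the impression fields parsed later in this notebook. Note that the notebook stores these fields as strings, while the toy rows use integers for brevity.

```python
# Toy impression rows; values are hypothetical, for illustration only.
rows = [
    {"prop_id": "a", "num_clicks": 0, "is_trans": 0},
    {"prop_id": "b", "num_clicks": 2, "is_trans": 0},
    {"prop_id": "c", "num_clicks": 1, "is_trans": 1},
]

def clicked(row):
    """A row is a positive click-through instance if it has at least one click."""
    return row["num_clicks"] > 0

# Keep every clicked row (the positive class).
positives = [r for r in rows if clicked(r)]

# Transactions that would be lost by keeping only clicked rows.
lost_transactions = [r for r in rows if r["is_trans"] == 1 and not clicked(r)]

print(len(positives), len(lost_transactions))
```

If `lost_transactions` is empty on the real data, filtering on clicks does not drop any transaction.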

# **Part I: PySpark installation and session setup**

In [1]:
!pip install pyspark py4j findspark



In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import SQLContext
from pyspark.sql.types import IntegerType

In [3]:
MAX_MEMORY = "45g"

spark = SparkSession \
    .builder \
    .appName("expedia_analytics") \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .config('spark.driver.maxResultSize', '10G')\
    .getOrCreate()

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# **Part II: Data preprocessing**

In [5]:
# load the tab-separated source file; without schema inference every column is read as a string
main = spark.read.format("csv").option("header", "true").load("drive/MyDrive/expedia/source/main.tsv", sep="\t")

In [6]:
# derive extra date columns for potential later use
# year_month is the first 7 characters of the raw 'yyyy-MM-dd' string, i.e. 'yyyy-MM'
main = main.withColumn('year_month', F.substring('checkin_date', 1, 7))
main = main.withColumn('checkin_date', F.to_timestamp(main.checkin_date, 'yyyy-MM-dd'))
main = main.withColumn('year', F.year(main.checkin_date))
main = main.withColumn('month', F.month(main.checkin_date))


In [10]:
# keep one DataFrame per top-3 country (country ids are strings)
top1 = main.filter(F.col('geo_location_country') == '1')
top2 = main.filter(F.col('geo_location_country') == '2')
top3 = main.filter(F.col('geo_location_country') == '3')

In [None]:
# show top destinations of country 1
top1.groupby("destination_id").count().sort(F.col("count").desc()).show() 

+--------------+-----+
|destination_id|count|
+--------------+-----+
|             2|48020|
|             3|33069|
|            71|24508|
|             4|18974|
|             1|16872|
|            10|16470|
|             6|15298|
|            18|14503|
|             5|14307|
|             8|12927|
|             9|12601|
|             7|11874|
|            11|11626|
|            12|11326|
|            15|11315|
|            33|10494|
|            13|10156|
|            28| 9459|
|            31| 9092|
|            16| 9077|
+--------------+-----+
only showing top 20 rows



In [None]:
# show top destinations of country 2
top2.groupby("destination_id").count().sort(F.col("count").desc()).show() 

+--------------+-----+
|destination_id|count|
+--------------+-----+
|            60| 4091|
|            69| 3567|
|            58| 3436|
|            51| 3358|
|           100| 2523|
|             2| 2357|
|           111| 2251|
|           121| 2156|
|           104| 2121|
|           163| 1872|
|           161| 1774|
|           155| 1672|
|           803| 1654|
|           165| 1654|
|           160| 1607|
|           196| 1601|
|             3| 1515|
|           232| 1333|
|           202| 1296|
|           493| 1045|
+--------------+-----+
only showing top 20 rows



In [None]:
# show top destinations of country 3
top3.groupby("destination_id").count().sort(F.col("count").desc()).show() 

+--------------+-----+
|destination_id|count|
+--------------+-----+
|            57| 2497|
|            47| 1871|
|           256|  989|
|           311|  981|
|           345|  865|
|           364|  853|
|           405|  774|
|           404|  767|
|           349|  757|
|           377|  721|
|           491|  682|
|           492|  565|
|           117|  558|
|           614|  540|
|           361|  540|
|           499|  514|
|           656|  478|
|           663|  470|
|           682|  399|
|           745|  397|
+--------------+-----+
only showing top 20 rows



In [7]:
# expand the pipe-separated impression list into one row per impression
# field order inside each comma-separated impression entry
IMPR_FIELDS = ["rank", "prop_id", "is_travel_ad", "review_rating", "review_count",
               "star_rating", "is_free_cancellation", "is_drr", "price_bucket",
               "num_clicks", "is_trans"]

def get_impr(df):
  # one row per impression entry
  impr = df.select("user_id", "search_id",
                   F.explode(F.split("impressions", "\\|")).alias("impression"))
  # split each entry once and name the resulting fields
  parts = F.split(F.col("impression"), ",")
  impr = impr.select("user_id", "search_id",
                     *[parts.getItem(i).alias(name) for i, name in enumerate(IMPR_FIELDS)])
  # binary click label: any num_clicks other than '0' counts as clicked
  impr = impr.withColumn("is_clicked", F.when(F.col("num_clicks") == "0", 0).otherwise(1))
  return df.join(impr, ["user_id", "search_id"], how="inner").distinct()
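
To make the parsing logic concrete, here is a small plain-Python sketch of what `get_impr` does to a single `impressions` value: split on `|` to get one entry per impression, then split each entry on `,` into the eleven fields. The sample string is invented for illustration; only the field order is taken from the function above.

```python
# Field order as used in get_impr; the sample string below is hypothetical.
FIELDS = ["rank", "prop_id", "is_travel_ad", "review_rating", "review_count",
          "star_rating", "is_free_cancellation", "is_drr", "price_bucket",
          "num_clicks", "is_trans"]

def parse_impressions(impressions):
    """Return one dict per impression, mirroring explode + split in Spark."""
    parsed = []
    for entry in impressions.split("|"):
        record = dict(zip(FIELDS, entry.split(",")))
        # Same rule as the notebook: any num_clicks other than "0" counts as a click.
        record["is_clicked"] = 0 if record["num_clicks"] == "0" else 1
        parsed.append(record)
    return parsed

sample = "1,101,0,4.5,120,4,1,0,3,0,0|2,102,1,4.0,80,3,0,1,2,1,1"
for rec in parse_impressions(sample):
    print(rec["rank"], rec["prop_id"], rec["is_clicked"], rec["is_trans"])
```

All values stay strings here, as they do in the Spark version, which is why the click test compares against `"0"` rather than the integer `0`.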

In [11]:
top1_impr = get_impr(top1)
top2_impr = get_impr(top2)
top3_impr = get_impr(top3)

Sample the data and export it as CSV. The sampling fractions for the negative instances are estimates based on an earlier preliminary analysis.
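
The fraction for the negative class could also be derived from the class counts instead of being set by hand. The sketch below is one possible way to do that, not what this notebook actually does: `balanced_negative_fraction` is a hypothetical helper and the counts passed to it are made-up numbers. In practice the counts could come from something like `df.groupBy("is_clicked").count()`.

```python
def balanced_negative_fraction(n_pos, n_neg):
    """Fraction of negatives to keep so that they roughly match the positives."""
    if n_neg <= 0:
        raise ValueError("n_neg must be positive")
    # Cap at 1.0: sampling fractions must lie in [0, 1].
    return min(1.0, n_pos / n_neg)

# Hypothetical counts, for illustration only.
n_pos, n_neg = 5_000, 1_000_000
fractions = {1: 1.0, 0: balanced_negative_fraction(n_pos, n_neg)}
print(fractions)
```

The resulting dict has the same shape as the `fractions` argument passed to `sampleBy` in the cells below. Because `sampleBy` samples each row independently, the number of negatives kept only approximately matches the number of positives.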

In [None]:
# keep every clicked row and roughly 1% of the non-clicked rows
top3_sample = top3_impr.sampleBy("is_clicked",
                                 fractions={1: 1.0, 0: 0.01},
                                 seed=40)

In [None]:
top3_sample = top3_sample.toPandas()

In [None]:
top3_sample.to_csv('drive/MyDrive/expedia/source/top3_sample.csv',header=True,index=False)

In [None]:
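# strata missing from `fractions` get fraction 0, so this keeps only a ~0.4% sample of the negatives
top2_sample_0 = top2_impr.sampleBy("is_clicked",
                                   fractions={0: 0.004},
                                   seed=41)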

In [None]:
top2_sample_1 = top2_impr.filter(F.col('is_clicked')==1)

In [None]:
top2_sample_1 = top2_sample_1.toPandas()

In [None]:
top2_sample_1.to_csv('drive/MyDrive/expedia/source/top2_1_sample.csv',header=True,index=False)

In [None]:
# coalesce(1) produces a single part file; Spark writes a directory at this path, not one CSV file
top2_sample_0.coalesce(1).write.csv('drive/MyDrive/expedia/source/top2_0_sample.csv', header=True)

In [None]:
top1_sample_1 = top1_impr.filter(F.col('is_clicked')==1)

In [19]:
# only stratum 0 is listed, so this keeps a ~0.5% sample of the negatives and no positives
top1_sample_0 = top1_impr.sampleBy("is_clicked",
                                   fractions={0: 0.005},
                                   seed=42)

In [None]:
# write all positives for country 1 with Spark's built-in CSV writer (output is a directory)
top1_sample_1.coalesce(1).write.csv('drive/MyDrive/expedia/source/top1_1_sample.csv', header=True)

In [21]:
# write the sampled negatives for country 1 (output is a directory)
top1_sample_0.coalesce(1).write.csv('drive/MyDrive/expedia/source/1_0_sample_data_test.csv', header=True)