# OTA Demo
## Imports

In [None]:
import os

import pyspark.sql
from pyspark.sql import functions as F
from pyspark.sql import types as T

## Configuration
### Paths

In [None]:
# Environment archive created with `conda pack`. The `#cluster_venv` suffix is the
# directory name the archive is unpacked to on the cluster (see PYSPARK_PYTHON).
ARCHIVES_PATH = "hdfs:///user/metareview/ota_demo_support/ota_demo_env.tar.gz#cluster_venv"

# Snapshot timestamps of the S3 dumps: set these to the latest dates available
# in the S3 bucket before running the extraction cells.
META_REVIEW_SNAPSHOT = "2021-02-22_23-34-19"
HOTEL_SNAPSHOT = "2021-02-21_00-00-00"
META_REVIEW_S3_URI = "s3a://trustyou-api/meta-review/{}/*.jsonl.gz".format(META_REVIEW_SNAPSHOT)
HOTEL_S3_URI = "s3a://trustyou-api/hotels/{}.jsonl.gz".format(HOTEL_SNAPSHOT)

# HDFS locations of the ORC cache files that are written once from the S3 dumps.
META_REVIEW_DUMP_PATH = "hdfs:///user/metareview/ota_demo/meta_review_dump.orc"
HOTEL_DUMP_PATH = "hdfs:///user/metareview/ota_demo/hotel_dump.orc"

### Environment Variables

In [None]:
# Point PySpark at the interpreter inside the conda-pack archive, which is
# unpacked under ./cluster_venv (the alias given in ARCHIVES_PATH).
os.environ.update(PYSPARK_PYTHON="./cluster_venv/bin/python")

## Create Spark Session

In [None]:
# Spark session on YARN: dynamic allocation up to 4 executors with 3 cores / 3g
# each, and the conda-pack environment shipped to the cluster as an archive.
spark_builder = (
    pyspark.sql.SparkSession.builder.master("yarn")
    .appName("OTA Demo Sample")
    .config("spark.executor.memory", "3g")
    .config("spark.executor.cores", "3")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.dynamicAllocation.maxExecutors", "4")
    .config("spark.sql.orc.filterPushdown", "false")
    .config("spark.sql.shuffle.partitions", "400")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.yarn.dist.archives", ARCHIVES_PATH)
)
# Only pass S3A credentials that are actually set in the environment. A missing
# variable makes os.getenv() return None, which Builder.config() may turn into
# the literal string "None". That value would then be sent as the access/secret
# key instead of letting S3A fall back to its other credential providers.
for conf_key, env_var in (
    ("spark.hadoop.fs.s3a.access.key", "AWS_ACCESS_KEY"),
    ("spark.hadoop.fs.s3a.secret.key", "AWS_SECRET_KEY"),
):
    credential = os.getenv(env_var)
    if credential:
        spark_builder = spark_builder.config(conf_key, credential)
spark = spark_builder.getOrCreate()

In [None]:
from urllib.parse import quote

# PostgreSQL JDBC connection URL; fill in the server host and password before
# running. User name and password are query parameters of the URL, so they are
# percent-encoded: characters like `&`, `=`, `#` or `%` in a password would
# otherwise break the URL. The PostgreSQL JDBC driver decodes them again.
jdbc_url = "jdbc:postgresql://{server}:{port}/{db}?user={user}&password={pw}".format(
    server="",
    port="5432",
    db="ota_demo",
    user=quote("ota_demo", safe=""),
    pw=quote("", safe=""),
)

## Extract Meta-Review Data
### Run the dump once; afterwards, use the cache file
#### Dump from S3, restricted to one localisation version only (here `en`)

In [None]:
# Infer the JSON schema from a 0.01% sample of the records to keep the scan cheap;
# FAILFAST raises on the first corrupted record instead of silently nulling it.
meta_review_df = spark.read.options(samplingRatio=0.0001, mode="FAILFAST").json(META_REVIEW_S3_URI)

In [None]:
# Keep only the English localisation of each meta-review.
meta_review_df = meta_review_df.filter(F.col("lang") == "en")

#### Write to HDFS cache file

In [None]:
# Cache the filtered meta-reviews on HDFS as ORC, replacing any previous dump.
meta_review_df.write.mode("overwrite").orc(META_REVIEW_DUMP_PATH)

## Extract Hotel Data
### Run the dump once; afterwards, use the cache file
#### Dump from S3

In [None]:
# Infer the JSON schema from a 0.1% sample of the hotel records; FAILFAST raises
# on the first corrupted record.
hotel_df = spark.read.options(samplingRatio=0.001, mode="FAILFAST").json(HOTEL_S3_URI)

#### Write to HDFS cache file

In [None]:
# Cache the hotel dump on HDFS as ORC, replacing any previous dump.
hotel_df.write.mode("overwrite").orc(HOTEL_DUMP_PATH)

## Read in the DataFrames from the HDFS cache file

In [None]:
# Load the cached meta-reviews from the HDFS ORC dump.
meta_review_df = spark.read.format("orc").load(META_REVIEW_DUMP_PATH)

In [None]:
# Load the cached hotels from the HDFS ORC dump.
hotel_df = spark.read.format("orc").load(HOTEL_DUMP_PATH)

In [None]:
# TODO: Works around WIDGET-3334, remove once it is fixed
# Keep a single (arbitrary) record per ty_id. Presumably the hotel dump can
# contain several records for the same property, which would otherwise multiply
# the meta-review rows in the join below.
hotel_df = hotel_df.dropDuplicates(["ty_id"])

## Prepare Hotel Search Database

### Add city and coordinates to meta-review DFs by joining with hotel DF
In practice, it's advisable to use your own hotel database instead. You could, for example, use the address data in the hotel dump to map between your hotel database and our portfolio, and then store the `ty_id` in your database as a reference between the two.

In [None]:
# Attach the hotel columns (city, country, lat_lng, ...) to every meta-review.
# This is an inner join, so meta-reviews without a matching hotel are dropped.
# The result is cached because several of the DataFrames below are built from it.
meta_review_with_hotel_df = meta_review_df.join(hotel_df, "ty_id").cache()

### Flatten the data
We are aiming for a schema with the following columns:
* `ty_id` - the unique identifier for each property
* `trip_type` - indicating an applied filter by trip type, one of `all` (for unfiltered data), `solo`, `couple`, `family`, `business`. The filtered Meta-Reviews can be found in `trip_type_meta_review_list`.
* `language` - indicating an applied filter by language, either `all` (for unfiltered data) or a two-character ISO language code. The filtered Meta-Reviews can be found in `language_meta_review_list`.
* `city` - we got this by joining with the hotel bucket
* `country` - we got this by joining with the hotel bucket
* `latitude` - we got this by joining with the hotel bucket (in the `lat_lng` field)
* `longitude` - we got this by joining with the hotel bucket (in the `lat_lng` field)
* `datapoint` - the category ID for categories from `category_list` (including their sub-categories) or for hotel types from `hotel_type_list`, or the value `oall` for the overall score from `summary`
* `score` - the score of the `datapoint`
* `review_count` - the number of reviews providing data for the `score` of the `datapoint` (set to `0` for `oall` and hotel type datapoints)

That means, we have to explode the dataframe on the different lists of filtered meta-reviews and then on `category_list` and `hotel_type_list` to get the different datapoints all in separate rows. For `category_list` we additionally want to extract `sub_category_list`, treating them the same as top-level categories for this purpose.

#### Unnest the different filtered meta-reviews

In [None]:
# Trip type-filtered meta-reviews: one row per filtered meta-review, trimmed to
# the filter values, the category list and the overall score.
hotel_cols = ["ty_id", "city", "country", "lat_lng"]
meta_review_df_tt_filter = (
    meta_review_with_hotel_df
    .select(*hotel_cols, F.explode("trip_type_meta_review_list").alias("filtered_mr"))
    .select(
        *hotel_cols,
        "filtered_mr.filter.trip_type",
        "filtered_mr.filter.language",
        "filtered_mr.category_list",
        "filtered_mr.summary.score",
    )
)

In [None]:
# Language-filtered meta-reviews, one row each. They are deliberately not trimmed
# yet: each one nests its own `trip_type_meta_review_list` (meta-reviews filtered
# by language AND trip type), which still has to be unpacked from this DataFrame.
hotel_cols = ["ty_id", "city", "country", "lat_lng"]
meta_review_df_lang_filter = meta_review_with_hotel_df.select(
    *hotel_cols,
    F.explode("language_meta_review_list").alias("filtered_mr"),
)

In [None]:
# Meta-reviews filtered by both language and trip type: explode the trip type
# list nested inside every language-filtered meta-review, then trim to the same
# fields as the other filter combinations.
hotel_cols = ["ty_id", "city", "country", "lat_lng"]
meta_review_df_both_filters = (
    meta_review_df_lang_filter
    .select(*hotel_cols, F.explode("filtered_mr.trip_type_meta_review_list").alias("filtered_mr"))
    .select(
        *hotel_cols,
        "filtered_mr.filter.trip_type",
        "filtered_mr.filter.language",
        "filtered_mr.category_list",
        "filtered_mr.summary.score",
    )
)

In [None]:
# With the nested trip type lists extracted, trim the language-only filtered
# meta-reviews to the common layout as well.
hotel_cols = ["ty_id", "city", "country", "lat_lng"]
meta_review_df_lang_filter = meta_review_df_lang_filter.select(
    *hotel_cols,
    "filtered_mr.filter.trip_type",
    "filtered_mr.filter.language",
    "filtered_mr.category_list",
    "filtered_mr.summary.score",
)

In [None]:
# Unfiltered meta-reviews in the common layout; the value `all` marks that no
# trip type or language filter was applied.
hotel_cols = ["ty_id", "city", "country", "lat_lng"]
unfiltered_df_categories = meta_review_with_hotel_df.select(
    *hotel_cols,
    F.lit("all").alias("trip_type"),
    F.lit("all").alias("language"),
    "category_list",
    "summary.score",
)

In [None]:
# Every filter combination now has the same columns, so stack them into one
# DataFrame. `unionByName` matches columns by name instead of by position, so a
# reordered select in one of the inputs cannot silently shift values into the
# wrong column. Cached because both the category and the overall-score
# datapoints are derived from it.
all_filters_df = (
    meta_review_df_tt_filter
    .unionByName(meta_review_df_lang_filter)
    .unionByName(meta_review_df_both_filters)
    .unionByName(unfiltered_df_categories)
    .cache()
)

#### Unnest the different datapoints we are interested in

In [None]:
# Explode `category_list` so that every top-level category of every (filtered)
# meta-review gets its own row, under the struct column `category`.
filter_key_cols = ["ty_id", "city", "country", "lat_lng", "trip_type", "language"]
category_df_exploded = all_filters_df.select(
    *filter_key_cols,
    F.explode("category_list").alias("category"),
)

In [None]:
# Explode the sub-categories nested in each top-level category and flatten them
# into datapoint rows (datapoint / score / review_count).
filter_key_cols = ["ty_id", "city", "country", "lat_lng", "trip_type", "language"]
sub_category_df = (
    category_df_exploded
    .select(*filter_key_cols, F.explode("category.sub_category_list").alias("category"))
    .select(
        *filter_key_cols,
        F.col("category.category_id").alias("datapoint"),
        F.col("category.score").alias("score"),
        F.col("category.review_count").alias("review_count"),
    )
)

In [None]:
# A sub-category can belong to several top-level categories, so the explode
# yields duplicates; keep one row per (property, trip type, language, datapoint).
distinct_sub_category_df = sub_category_df.dropDuplicates(
    ["ty_id", "trip_type", "language", "datapoint"]
)

In [None]:
# Flatten the top-level categories into the same datapoint layout as the
# sub-categories.
filter_key_cols = ["ty_id", "city", "country", "lat_lng", "trip_type", "language"]
category_df = category_df_exploded.select(
    *filter_key_cols,
    F.col("category.category_id").alias("datapoint"),
    F.col("category.score").alias("score"),
    F.col("category.review_count").alias("review_count"),
)

In [None]:
# Stack the top-level and the deduplicated sub-level category datapoints. They
# share the same columns; `unionByName` matches them by name rather than by
# position, so a column-order drift cannot silently mix up values.
combined_category_df = category_df.unionByName(distinct_sub_category_df)

In [None]:
# Hotel type datapoints. They are shaped like category datapoints but only exist
# on the unfiltered meta-review, so trip_type and language are `all`. No review
# count is taken over for them; the column is set to 0.
hotel_cols = ["ty_id", "city", "country", "lat_lng"]
hotel_type_df = (
    meta_review_with_hotel_df
    .select(*hotel_cols, F.explode("hotel_type_list").alias("htype"))
    .select(
        *hotel_cols,
        F.lit("all").alias("trip_type"),
        F.lit("all").alias("language"),
        F.col("htype.category_id").alias("datapoint"),
        F.col("htype.score").alias("score"),
        F.lit(0).alias("review_count"),
    )
)

In [None]:
# The overall score of every (filtered) meta-review becomes the `oall` datapoint.
# No review count is extracted for it, so the column is set to 0.
filter_key_cols = ["ty_id", "city", "country", "lat_lng", "trip_type", "language"]
overall_score_df = all_filters_df.select(
    *filter_key_cols,
    F.lit("oall").alias("datapoint"),
    "score",
    F.lit(0).alias("review_count"),
)

In [None]:
# All datapoints are collected. Stack the three datapoint DataFrames (matched by
# column name, not position, so a column-order drift cannot mix up values).
# As the final step, split `lat_lng` into separate latitude/longitude columns
# (element 0 / element 1) and cast the score to FloatType.
merged_df = (
    combined_category_df
    .unionByName(overall_score_df)
    .unionByName(hotel_type_df)
    .select(
        F.col("ty_id"), F.col("trip_type"), F.col("language"),
        F.col("city"), F.col("country"),
        F.col("lat_lng").getItem(0).alias("latitude"),
        F.col("lat_lng").getItem(1).alias("longitude"),
        F.col("datapoint"), F.col("score").cast(T.FloatType()), F.col("review_count"),
    )
)

In [None]:
# Write the datapoints to the PostgreSQL table `cluster_search`, which was created
# beforehand with a finer-grained schema than Spark would generate. With mode
# "overwrite" and truncate=true, Spark empties the existing table instead of
# dropping and re-creating it, so that hand-written schema is kept.
# `stringtype=unspecified` is a PostgreSQL JDBC driver option: string parameters
# are sent untyped and the server infers/casts them to the column types.
(
    merged_df.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        url=jdbc_url,
        dbtable="cluster_search",
        truncate="true",
        stringtype="unspecified",
    )
    .save()
)

### City Search Backend
#### Preparing a backend for searching the different cities. We group the hotels joined with the meta-reviews by city and country, counting the number of distinct clusters (`ty_id`s) in each.
Note: This is just for demo purposes. In practice, you will want to use your own hotel database for this instead.

In [None]:
# Number of distinct properties (`ty_id`s) per (city, country), counted over the
# hotels that have an English meta-review. The count is cast to a 32-bit integer:
# with ANSI mode off, a ShortType cast would silently wrap around for a city with
# more than 32767 properties. If the target column is SMALLINT, PostgreSQL instead
# rejects such a value on insert rather than storing a wrong count.
grouped_hotel_df = meta_review_with_hotel_df.groupby("city", "country").agg(
    F.countDistinct(F.col("ty_id")).cast(T.IntegerType()).alias("count")
)

In [None]:
# Replace the contents of the PostgreSQL table `city_search`. truncate=true makes
# Spark empty an existing table instead of dropping and re-creating it.
(
    grouped_hotel_df.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        url=jdbc_url,
        dbtable="city_search",
        truncate="true",
    )
    .save()
)

## Shutdown Spark Session

In [None]:
# Stop the Spark session (and its underlying SparkContext) so the YARN
# resources acquired above are released.
spark.stop()