# Práctica SDPD T2. Restaurantes Europeos de Tripadvisor

Autor: Marina Kurmanova

## Introducción

El trabajo de esta práctica consiste en procesar el dataset de Kaggle sobre información de restaurantes procedente de Tripadvisor en 31 ciudades europeas.

El dataset contiene 125527 entradas sobre restaurantes y en esta primera aproximación asumiremos que los datos de cada restaurante son únicos. En cuanto a datos faltantes, se conoce que estos aparecen en el dataset como NaN, sin embargo, no vamos a emplear ninguna técnica de imputación, pero en cambio tendremos en cuenta su existencia para evitar errores.

El objetivo de la práctica es realizar un estudio de los datos de restaurantes utilizando la API de programación de datos estructurados de Apache Spark haciendo consultas sobre datos bien mediante sentencias SQL o mediante operaciones con objetos DataFrame. Por tanto, en primer lugar nuestro objetivo es convertir los datos de entrada en formato csv a un dataframe.

## Preparación de datos

Comenzamos creando una sesión de Spark

In [142]:
import pyspark
from pyspark.sql import SparkSession
spark = (SparkSession.builder
    .master("local[*]")
    .config("spark.driver.cores", 1)
    .appName("Practica Restaurantes")
    .getOrCreate() )
sc = spark.sparkContext

Importamos los tipos y las funciones del paquete SQL de Pyspark para definir el schema de los datos y para realizar un tratamiento de los datos si fuese necesario. A continuación crearemos el esquema que permitirá importar los datos evitando errores y a la vez optimizando el espacio que ocupen los datos en memoria. A pesar de que Spark es un entorno de programación distribuida y orientado a Big Data, cuando los datos son grandes el impacto de sobredimensionamiento de cada elemento que puede producir un tipado incorrecto crece de forma considerable con cada dato que se cree.

El formato de los datos de entrada es csv, con lo cual todas variables se encuentran en la cabecera del fichero de datos. Realizando una inspección visual de los datos y tras varias pruebas, se obtiene el siguiente esquema:

In [143]:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import functions as func
from pyspark.sql.functions import rank, dense_rank, split, regexp_replace, col, explode, count, desc, asc

# Name,City,Cuisine Style,Ranking,Rating,Price Range,Number of Reviews,Reviews,URL_TA,ID_TA
restSchema = StructType([
    StructField("id", IntegerType(), True), #0
    StructField("Name", StringType(), True), #1
    StructField("City", StringType(), True), #2
    StructField("CuisineStyle", StringType(), True), #3
    StructField("Ranking", DecimalType(), True), #4
    StructField("Rating", DecimalType(), True), #5
    StructField("PriceRange", StringType(), True), #6
    StructField("NumReviews", FloatType(), True), #7
    StructField("Reviews", StringType(), True), #8
    StructField("URL_TA", StringType(), True), #9
    StructField("ID_TA", StringType(), False), #10
    ])



Inicialmente se ha intentado realizar el import de la misma forma que se hizo en clase con el ejemplo de la comunidad italiana de StackExchange, es decir, importando los datos como líneas de texto a las que se les aplicaría una función lambda para preformatear los datos de manera que coincidan con el esquema que se haya definido y una vez preformateados obtener un RDD que se convertiría en DataFrame. 

Sin embargo, en algún punto de este proceso ha habido fallos por lo que se ha optado por este otro método que consiste en usar la clase *sqlContext* y la librería databricks Spark csv https://github.com/databricks/spark-csv para parseo y consultas de datos csv en Apache Spark para Spark SQL y DataFrames. Un DataFrame es equivalente a tabla relacional en Spark SQL y el método *read* permite crear uno automáticamente. En nuestro caso se le han pasado tales opciones como la indicación de que nuestros datos contienen cabeceras, que la librería no infiera un esquema sino use el que le pasamos como parámetro, que nuestros datos contienen doble comilla (en el case de las variables *Cuisine Style* y *Reviews*) y que el delimitador de csv es una coma.


In [144]:
sqlContext = SparkSession(sc)

df = (sqlContext.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "false")
        .option("delimiter", ",")
        .option("quote", "\"")
        .schema(restSchema)
        .load("./krakow-ta-restaurans-data-raw/TA_restaurants_curated.csv"))


Una vez obtenido el DataFrame hacemos una inspección de los tipos, del número de registros y del esquema de los datos. Podemos observar que el número de entradas es el que tiene nuestro dataset original y que los tipos son los que hemos predefinido con el esquema. Sin embargo, las variables *Cuisine Style* y *Reviews* presentan una cadena de caracteres y no un array, cosa que puede impedir hacer consultas sobre los datos que traen consigo esas variables.

In [145]:
print(df.count())
print(df.dtypes)
print(df.printSchema())

125527
[('id', 'int'), ('Name', 'string'), ('City', 'string'), ('CuisineStyle', 'string'), ('Ranking', 'decimal(10,0)'), ('Rating', 'decimal(10,0)'), ('PriceRange', 'string'), ('NumReviews', 'float'), ('Reviews', 'string'), ('URL_TA', 'string'), ('ID_TA', 'string')]
root
 |-- id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- CuisineStyle: string (nullable = true)
 |-- Ranking: decimal(10,0) (nullable = true)
 |-- Rating: decimal(10,0) (nullable = true)
 |-- PriceRange: string (nullable = true)
 |-- NumReviews: float (nullable = true)
 |-- Reviews: string (nullable = true)
 |-- URL_TA: string (nullable = true)
 |-- ID_TA: string (nullable = true)

None


Haremos una conversión de los tipos de las variables *Cuisine Style* y *Reviews* y comprobaremos que se ha realizado correctamente. Hacemos también una breve inspección visual de los datos.

In [146]:
df2 = df.withColumn('CuisineStyle',split(regexp_replace(col('CuisineStyle'), '\[\'|\'\]|\'',''),',').cast('array<string>'))
df3 = df2.withColumn('Reviews',split(regexp_replace(col('Reviews'), '\[\'|\'\]|\'',''),',').cast('array<string>'))
print(df3.count())
df3.na.drop()
print(df3.count())
print(df3.printSchema())
df3.show()

125527
125527
root
 |-- id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- CuisineStyle: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- Ranking: decimal(10,0) (nullable = true)
 |-- Rating: decimal(10,0) (nullable = true)
 |-- PriceRange: string (nullable = true)
 |-- NumReviews: float (nullable = true)
 |-- Reviews: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- URL_TA: string (nullable = true)
 |-- ID_TA: string (nullable = true)

None
+---+--------------------+---------+--------------------+-------+------+----------+----------+--------------------+--------------------+---------+
| id|                Name|     City|        CuisineStyle|Ranking|Rating|PriceRange|NumReviews|             Reviews|              URL_TA|    ID_TA|
+---+--------------------+---------+--------------------+-------+------+----------+----------+--------------------+--------------------+---------+
|

## Consultas

Una vez creado el DataFrame puede usarse para hacer consultas sobre él mediante varias funciones DSL (domain-specific-language) definidas en la clase DataFrame o Column.

#1. Obtener el número total de restaurantes que aparecen en la muestra en cada una de las
31 ciudades

In [147]:
df.groupBy("City").count().collect()

[Row(City='Madrid', count=9543),
 Row(City='Prague', count=4859),
 Row(City='Edinburgh', count=1865),
 Row(City='Lisbon', count=3986),
 Row(City='Stockholm', count=2705),
 Row(City='Oslo', count=1213),
 Row(City='Dublin', count=2082),
 Row(City='Berlin', count=7078),
 Row(City='London', count=18212),
 Row(City='Vienna', count=3724),
 Row(City='Paris', count=14874),
 Row(City='Hamburg', count=3131),
 Row(City='Athens', count=1938),
 Row(City='Lyon', count=2930),
 Row(City='Ljubljana', count=501),
 Row(City='Zurich', count=1667),
 Row(City='Krakow', count=1354),
 Row(City='Milan', count=6687),
 Row(City='Barcelona', count=8425),
 Row(City='Warsaw', count=2352),
 Row(City='Amsterdam', count=3434),
 Row(City='Brussels', count=3204),
 Row(City='Helsinki', count=1228),
 Row(City='Copenhagen', count=2109),
 Row(City='Oporto', count=1580),
 Row(City='Bratislava', count=1067),
 Row(City='Rome', count=5949),
 Row(City='Geneva', count=1572),
 Row(City='Luxembourg', count=657),
 Row(City='Budapest

In [148]:
df.groupBy("City").agg({"*": "count"}).collect()

[Row(City='Madrid', count(1)=9543),
 Row(City='Prague', count(1)=4859),
 Row(City='Edinburgh', count(1)=1865),
 Row(City='Lisbon', count(1)=3986),
 Row(City='Stockholm', count(1)=2705),
 Row(City='Oslo', count(1)=1213),
 Row(City='Dublin', count(1)=2082),
 Row(City='Berlin', count(1)=7078),
 Row(City='London', count(1)=18212),
 Row(City='Vienna', count(1)=3724),
 Row(City='Paris', count(1)=14874),
 Row(City='Hamburg', count(1)=3131),
 Row(City='Athens', count(1)=1938),
 Row(City='Lyon', count(1)=2930),
 Row(City='Ljubljana', count(1)=501),
 Row(City='Zurich', count(1)=1667),
 Row(City='Krakow', count(1)=1354),
 Row(City='Milan', count(1)=6687),
 Row(City='Barcelona', count(1)=8425),
 Row(City='Warsaw', count(1)=2352),
 Row(City='Amsterdam', count(1)=3434),
 Row(City='Brussels', count(1)=3204),
 Row(City='Helsinki', count(1)=1228),
 Row(City='Copenhagen', count(1)=2109),
 Row(City='Oporto', count(1)=1580),
 Row(City='Bratislava', count(1)=1067),
 Row(City='Rome', count(1)=5949),
 Row(Ci

#2. Obtener el número total de restaurantes en cada rango de precio en Barcelona.

In [149]:
df_2 = df.filter(df.City == "Barcelona").groupBy("PriceRange").count().alias('count')
df_2.collect()

[Row(PriceRange='$$$$', count=263),
 Row(PriceRange=None, count=3018),
 Row(PriceRange='$$ - $$$', count=3680),
 Row(PriceRange='$', count=1464)]

#3. Obtener nombre, ID y URL de los 5 restaurantes que tienen el mayor número de reviews
en las ciudades: Amsterdam, Berlín, Lisboa, Viena y Zurich.

In [150]:
from pyspark.sql.window import Window

window = Window.partitionBy(df['City']).orderBy(df['NumReviews'].desc())

((df.filter(df.City.isin("Amsterdam", "Berlin", "Lisbon", "Vienna", "Zurich")))
            .select(df['id'], 
                    df['City'], 
                    df['Name'], 
                    df['URL_TA'], 
                    rank().over(window).alias('rank')).filter(col('rank') <= 5).show(25, truncate = 35)
)


+---+---------+----------------------------+-----------------------------------+----+
| id|     City|                        Name|                             URL_TA|rank|
+---+---------+----------------------------+-----------------------------------+----+
|111|   Lisbon|           Cervejaria Ramiro|/Restaurant_Review-g189158-d2703...|   1|
|157|   Lisbon|         Solar dos Presuntos|/Restaurant_Review-g189158-d1058...|   2|
|122|   Lisbon|             Time Out Market|/Restaurant_Review-g189158-d1076...|   3|
| 76|   Lisbon|      Restaurante Sacramento|/Restaurant_Review-g189158-d1124...|   4|
|183|   Lisbon|       Hard Rock Cafe Lisboa|/Restaurant_Review-g189158-d1047...|   5|
|137|   Berlin|      Hofbrau Munchen Berlin|/Restaurant_Review-g187323-d3208...|   1|
|180|   Berlin|Augustiner am Gendarmenmarkt|/Restaurant_Review-g187323-d2005...|   2|
|  9|   Berlin|               Burgermeister|/Restaurant_Review-g187323-d9470...|   3|
| 40|   Berlin|     Mustafa's Gemuese Kebab|/Restauran

#4. Obtener los 5 estilos de cocina más frecuentes en los restaurantes de las ciudades:
Amsterdam, Berlín, Lisboa, Viena y Zurich.

Lo primero, he probado a responder a la pregunta para los restaurantes únicamente de Amsterdam.

In [151]:
df4 = ((df3.filter(df3.City.isin("Amsterdam")))
            .select(df3['City'], 
                    df3['Name'], 
                    explode(df3.CuisineStyle).alias("CuisineUniqueStyle"))
)

w = Window.partitionBy('CuisineUniqueStyle')
df5 = (df4.select('City','CuisineUniqueStyle', 
                 func.count('CuisineUniqueStyle').over(w).alias('n'))
        .dropDuplicates(["CuisineUniqueStyle", "n"])
        .sort(desc("n"))
      )

w = Window.partitionBy('City').orderBy(df5['n'].desc())
(df5.select(df5['City'], 
            df5['CuisineUniqueStyle'],
            dense_rank().over(w).alias('rank')).
     filter(col('rank') <= 5)
     .show(truncate=35)
)

+---------+--------------------+----+
|     City|  CuisineUniqueStyle|rank|
+---------+--------------------+----+
|Amsterdam|            European|   1|
|Amsterdam| Vegetarian Friendly|   2|
|Amsterdam|               Dutch|   3|
|Amsterdam|       Vegan Options|   4|
|Amsterdam| Gluten Free Options|   5|
+---------+--------------------+----+



Lo siguiente he procedido a realizar la consulta por ventanas de las 5 ciudades, pero me di cuenta de que al hacer dropDuplicates en el segundo paso, no lo hace por "City", "CuisineUniqueStyle", "n" sino probablemente por "CuisineUniqueStyle", "n" y por tanto me salen que en todas las ciudades los top 5 cocinas son las mismas. Es decir, en realidad obtengo las 5 top cocinas entre las 5 ciudades, cuando mi objetivo era mostrar el resultado en cada ciudad.

Por tanto he modificado la consulta para que el count lo haga agrupando en dos niveles, *City* y *CuisineUniqueStyle*, y luego ya hago el rank en ventana de cada ciudad, ordenando los valores en orden descendente según lo pide la función de *rank*.

La consulta se realiza siguiendo el siguiente orden de los pasos. Cada paso se realiza sobre el DataFrame creado en el paso anterior.

- Crear un DataFrame que contenga info de las 5 ciudades de interés separando los datos del array de la columna CuisineStyle en strings separados y creando una fila por cada estilo de cocina.
- Crear un DataFrame obtenido haciendo un count de valores de estilos de cocina dentro de cada una de las 5 ciudades
- Crear una ventana por ciudad ordenando el valor de count obtenido en orden descendente.
- Seleccionar los 5 primeros valores de cada ventana que coinciden con los 5 restaurantes que tienen en mayor número de counts, es decir que tienen los 5 estilos más frecuentes.

In [152]:
df4 = ((df3.filter(df3.City.isin("Amsterdam", "Berlin", "Lisbon", "Vienna", "Zurich")))
            .select(df3['City'], 
                    df3['Name'], 
                    explode(df3.CuisineStyle).alias("CuisineUniqueStyle"))
)

df5 = (df4.select(df4['City'], df4['CuisineUniqueStyle'])
                    .groupBy("City", "CuisineUniqueStyle")
                    .agg(func.count("CuisineUniqueStyle").alias("n"))
      )

w2 = Window.partitionBy('City').orderBy(df5['n'].desc())
(df5.select(df5['City'], 
            df5['CuisineUniqueStyle'],
            rank().over(w2).alias('rank')).
     filter(col('rank') <= 5)
     .show(25, truncate=35)
)

+---------+--------------------+----+
|     City|  CuisineUniqueStyle|rank|
+---------+--------------------+----+
|   Lisbon|          Portuguese|   1|
|   Lisbon|            European|   2|
|   Lisbon|            European|   3|
|   Lisbon| Vegetarian Friendly|   4|
|   Lisbon|       Mediterranean|   5|
|   Berlin| Vegetarian Friendly|   1|
|   Berlin|            European|   2|
|   Berlin|             Italian|   3|
|   Berlin|              German|   4|
|   Berlin|       Vegan Options|   5|
|   Vienna|            European|   1|
|   Vienna| Vegetarian Friendly|   2|
|   Vienna|            Austrian|   3|
|   Vienna|    Central European|   4|
|   Vienna|             Italian|   5|
|   Zurich| Vegetarian Friendly|   1|
|   Zurich|            European|   2|
|   Zurich|             Italian|   3|
|   Zurich|               Swiss|   4|
|   Zurich|       Mediterranean|   5|
|Amsterdam|            European|   1|
|Amsterdam| Vegetarian Friendly|   2|
|Amsterdam|               Dutch|   3|
|Amsterdam| 

#5. Obtener nombre, ID y URL de los 5 restaurantes que tengan el mayor número de
reviews, que incluyan estilo de cocina francesa y estén en el rango de precios más caro
en las ciudades: Amsterdam, Berlín y Lisboa.

La consulta se realiza siguiendo el siguiente orden de los pasos. Cada paso se realiza sobre el DataFrame creado en el paso anterior.

- Crear un nuevo DataFrame separando los datos del array de la columna CuisineStyle en strings separados y creando una fila por cada estilo de cocina.
- Hacer un drop de los datos faltantes (NaN o Null) para eliminarlos
- Crear un nuevo DataFrame que filtre los datos de 3 ciudades, estilo de cocina francesa y que estén en el rango de precios más caro.
- Crear una ventana por ciudad ordenando el número de reviews en orden descendente.
- Seleccionar los 5 primeros valores de cada ciudad que coinciden con los 5 restaurantes que tienen en mayor número de reviews.

In [153]:
df_5 = (df3.select(df3['id'], 
                    df3['City'],
                    df3['Name'],
                    df3['URL_TA'],
                    df3['PriceRange'],
                    df3['NumReviews'],
                    explode(df3.CuisineStyle).alias("CuisineUniqueStyle"))
)

print(df_5.count())
clean_df = df_5.na.drop()
print(clean_df.count())

df_6 = clean_df.filter(clean_df.City.isin("Amsterdam", "Berlin","Lisbon") & 
                clean_df.CuisineUniqueStyle.isin("French") &
                clean_df.PriceRange.isin("$$$$"))

window = Window.partitionBy(df['City']).orderBy(df['NumReviews'].desc())
(df_6.select(df_6['id'], 
              df_6['City'],
              df_6['Name'], 
              df_6['URL_TA'], 
              df_6['PriceRange'],
              df_6['NumReviews'],
              rank().over(window).alias('rank')).filter(col('rank') <= 5).show(25, truncate = 35)
)


286649
251602
+----+---------+-------------------+-----------------------------------+----------+----------+----+
|  id|     City|               Name|                             URL_TA|PriceRange|NumReviews|rank|
+----+---------+-------------------+-----------------------------------+----------+----------+----+
|1659|   Lisbon|              Local|              'Love Is In The Air']|      $$$$|       3.0|   1|
| 608|   Berlin|          borchardt|/Restaurant_Review-g187323-d6925...|      $$$$|    1399.0|   1|
|  26|   Berlin|            Heising|/Restaurant_Review-g187323-d9587...|      $$$$|    1024.0|   2|
| 101|   Berlin|          Entrecote|/Restaurant_Review-g187323-d6956...|      $$$$|     868.0|   3|
|1192|   Berlin|              Grosz|/Restaurant_Review-g187323-d3727...|      $$$$|     367.0|   4|
|  83|   Berlin|Belmondo Restaurant|/Restaurant_Review-g187323-d1526...|      $$$$|     355.0|   5|
|  14|Amsterdam| Restaurant Daalder|/Restaurant_Review-g188590-d1408...|      $$$$|   

In [154]:
# Eliminamos el contexto de Spark.
sc.stop()