In [1]:
from pprint import pprint

In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName('ddam_project')
         .config('spark.some.config.option','some-value')
         .getOrCreate()
         )

In [2]:
df_cleaned = spark.read.csv("hdfs://masterbig-1.itc.unipi.it:54310/user/student18/df_cleaned.csv", header = True, inferSchema = True)
df_cleaned.printSchema()

root
 |-- id: long (nullable = true)
 |-- Hotel_Address: string (nullable = true)
 |-- Additional_Number_of_Scoring: integer (nullable = true)
 |-- Review_Date: string (nullable = true)
 |-- Average_Score: double (nullable = true)
 |-- Hotel_Name: string (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Review_Total_Negative_Word_Counts: integer (nullable = true)
 |-- Total_Number_of_Reviews: integer (nullable = true)
 |-- Review_Total_Positive_Word_Counts: integer (nullable = true)
 |-- Total_Number_of_Reviews_Reviewer_Has_Given: integer (nullable = true)
 |-- Reviewer_Score: double (nullable = true)
 |-- Tags: string (nullable = true)
 |-- days_since_review: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)
 |-- Review: string (nullable = true)



In [3]:
df_cleaned.take(3)

[Row(id=14, Hotel_Address='s Gravesandestraat 55 Oost 1092 AA Amsterdam Netherlands', Additional_Number_of_Scoring=194, Review_Date='7/4/2017', Average_Score=7.7, Hotel_Name='Hotel Arena', Reviewer_Nationality='Canada', Review_Total_Negative_Word_Counts=35, Total_Number_of_Reviews=1403, Review_Total_Positive_Word_Counts=15, Total_Number_of_Reviews_Reviewer_Has_Given=1, Reviewer_Score=8.8, Tags="[' Leisure trip ', ' Family with young children ', ' Large King Room ', ' Stayed 5 nights ', ' Submitted from a mobile device ']", days_since_review='30 days', lat='52.3605759', lng='4.9159683', Review='the staff in the restaurant could of been more pleasant we only visited once but that wouldn t stop us from booking again it might of just been an off night for him .  it was very good very historic building that s why I chose it'),
 Row(id=374, Hotel_Address='s Gravesandestraat 55 Oost 1092 AA Amsterdam Netherlands', Additional_Number_of_Scoring=194, Review_Date='11/3/2015', Average_Score=7.7, H

## Hotel City-Nationality Extraction

In [4]:
import reverse_geocode

In [5]:
# check na value for lat/lng
df_cleaned.select('lat','lng').rdd.filter(lambda x: x['lng']== 'NA').count()

3267

In [6]:
coord =[ (43.6753176,10.5408628) ]
reverse_geocode.search(coord)

[{'city': 'Cascina', 'country': 'Italy', 'country_code': 'IT'}]

In [7]:
   
df_coord = ( df_cleaned.select('id', 'lat', 'lng').rdd
            .filter(lambda x: x['lat']!='NA' and x['lng']!='NA')
             .map(lambda x: (x['id'], [ [float(x['lat']), float(x['lng'])] ] )) 
             .map(lambda x: (x[0], reverse_geocode.search(x[1])))
             .map(lambda x: (x[0] , x[1][0]['city'], x[1][0]['country'], x[1][0]['country_code']))
            ).toDF(['id','city','country','country_code'])

In [7]:
pprint(df_coord.take(5))

[Row(id=14, city='Amsterdam', country='Netherlands', country_code='NL'),
 Row(id=374, city='Amsterdam', country='Netherlands', country_code='NL'),
 Row(id=681, city='Kensington', country='United Kingdom', country_code='GB'),
 Row(id=860, city='Kensington', country='United Kingdom', country_code='GB'),
 Row(id=1014, city='Clerkenwell', country='United Kingdom', country_code='GB')]


In [8]:
df_coord.groupby('country').count().orderBy('count', ascending = False).show()

+--------------+------+
|       country| count|
+--------------+------+
|United Kingdom|262194|
|         Spain| 59907|
|        France| 59514|
|   Netherlands| 57190|
|         Italy| 37188|
|       Austria| 36239|
+--------------+------+



In [59]:
df_coord.groupby('country_code', 'country', 'city').count().orderBy('country','count',ascending = False).show(100)

+------------+--------------+--------------------+-----+
|country_code|       country|                city|count|
+------------+--------------+--------------------+-----+
|          GB|United Kingdom|              London|42532|
|          GB|United Kingdom|          Kensington|36917|
|          GB|United Kingdom|           Bayswater|33520|
|          GB|United Kingdom|  West End of London|31523|
|          GB|United Kingdom|      City of London|20081|
|          GB|United Kingdom| City of Westminster|10565|
|          GB|United Kingdom|        Canary Wharf|10238|
|          GB|United Kingdom|            Barbican| 9897|
|          GB|United Kingdom|             Chelsea| 9731|
|          GB|United Kingdom|              Poplar| 9629|
|          GB|United Kingdom|             Wembley| 6535|
|          GB|United Kingdom|         Camden Town| 6227|
|          GB|United Kingdom|        Belsize Park| 5173|
|          GB|United Kingdom|         Clerkenwell| 4452|
|          GB|United Kingdom|  

In [23]:
hotel_countries = df_coord.select('country').distinct().rdd.map(lambda x: x['country']).collect()

In [20]:
import re 

In [16]:
hotel_countries

['France', 'Italy', 'Spain', 'Austria', 'United Kingdom', 'Netherlands']

In [24]:
# estrarre la nazionalità dall'indirizzo
def extractCountry(row):
    for country in hotel_countries:
        if country.lower() in row['Hotel_Address'].lower():
            return (row['id'], country )
    return 'EMPTY'
        
            
rdd_hotel_countries = df_cleaned.select('id', 'Hotel_Address').rdd.map(extractCountry)

In [39]:
# check if all reviews have an associated hotel_country 
rdd_hotel_countries.filter(lambda x: x[1] == 'EMPTY').count()


0

In [41]:
rdd_hotel_countries.take(1)

[(14, 'Netherlands')]

In [25]:
df_hotel_countries = rdd_hotel_countries.toDF(['id', 'Hotel_Country'])

In [54]:
df_hotel_countries.head(10)

[Row(id=14, Hotel_Country='Netherlands'),
 Row(id=374, Hotel_Country='Netherlands'),
 Row(id=681, Hotel_Country='United Kingdom'),
 Row(id=860, Hotel_Country='United Kingdom'),
 Row(id=1014, Hotel_Country='United Kingdom'),
 Row(id=1123, Hotel_Country='United Kingdom'),
 Row(id=1162, Hotel_Country='United Kingdom'),
 Row(id=1318, Hotel_Country='United Kingdom'),
 Row(id=1375, Hotel_Country='United Kingdom'),
 Row(id=1705, Hotel_Country='United Kingdom')]

In [55]:
df_cleaned.select('Reviewer_Nationality').distinct().count()

227

In [11]:
df_cleaned.select('Reviewer_Nationality').distinct()


227

In [26]:
df_cleaned.createTempView('Reviewer_Nationality')

AnalysisException: "Temporary table 'Reviewer_Nationality' already exists;"

In [20]:
spark.sql("SELECT Reviewer_Nationality, COUNT(*) AS N \
                                                FROM Reviewer_Nationality \
                                                GROUP BY Reviewer_Nationality \
                                                ORDER BY N DESC\
                                                ").take(50)


[Row(Reviewer_Nationality='United Kingdom', N=245189),
 Row(Reviewer_Nationality='United States of America', N=35433),
 Row(Reviewer_Nationality='Australia', N=21683),
 Row(Reviewer_Nationality='Ireland', N=14821),
 Row(Reviewer_Nationality='United Arab Emirates', N=10228),
 Row(Reviewer_Nationality='Saudi Arabia', N=8947),
 Row(Reviewer_Nationality='Netherlands', N=8768),
 Row(Reviewer_Nationality='Switzerland', N=8672),
 Row(Reviewer_Nationality='Germany', N=7937),
 Row(Reviewer_Nationality='Canada', N=7893),
 Row(Reviewer_Nationality='France', N=7281),
 Row(Reviewer_Nationality='Israel', N=6609),
 Row(Reviewer_Nationality='Italy', N=6100),
 Row(Reviewer_Nationality='Belgium', N=6022),
 Row(Reviewer_Nationality='Turkey', N=5441),
 Row(Reviewer_Nationality='Kuwait', N=4917),
 Row(Reviewer_Nationality='Spain', N=4732),
 Row(Reviewer_Nationality='Romania', N=4547),
 Row(Reviewer_Nationality='Russia', N=3899),
 Row(Reviewer_Nationality='South Africa', N=3819),
 Row(Reviewer_Nationality='

In [13]:
df_cleaned.select('Reviewer_Nationality').groupby('Reviewer_Nationality').count().take(20)

[Row(Reviewer_Nationality='Turks Caicos Islands', count=14),
 Row(Reviewer_Nationality='Russia', count=3899),
 Row(Reviewer_Nationality='Paraguay', count=28),
 Row(Reviewer_Nationality='Anguilla', count=1),
 Row(Reviewer_Nationality='Yemen', count=16),
 Row(Reviewer_Nationality='St Maarten', count=11),
 Row(Reviewer_Nationality='Senegal', count=24),
 Row(Reviewer_Nationality='Sweden', count=3336),
 Row(Reviewer_Nationality='Kiribati', count=2),
 Row(Reviewer_Nationality='Guyana', count=5),
 Row(Reviewer_Nationality='Philippines', count=1073),
 Row(Reviewer_Nationality='Jersey', count=863),
 Row(Reviewer_Nationality='Eritrea', count=2),
 Row(Reviewer_Nationality='Djibouti', count=2),
 Row(Reviewer_Nationality='Singapore', count=3091),
 Row(Reviewer_Nationality='Malaysia', count=1882),
 Row(Reviewer_Nationality='Fiji', count=12),
 Row(Reviewer_Nationality='Turkey', count=5441),
 Row(Reviewer_Nationality='Malawi', count=10),
 Row(Reviewer_Nationality='Iraq', count=290)]

In [22]:
df_nationality_tmp = df_hotel_countries.join(df_cleaned.select('id','Reviewer_Nationality', 'Reviewer_Score'), ['id'] )  

NameError: name 'df_hotel_countries' is not defined

In [58]:
df_nationality_tmp.explain()

== Physical Plan ==
*Project [id#251L, Hotel_Country#252, Reviewer_Nationality#6, Reviewer_Score#11]
+- *SortMergeJoin [id#251L], [id#0L], Inner
   :- *Sort [id#251L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#251L, 200)
   :     +- *Filter isnotnull(id#251L)
   :        +- Scan ExistingRDD[id#251L,Hotel_Country#252]
   +- *Sort [id#0L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#0L, 200)
         +- *Project [id#0L, Reviewer_Nationality#6, Reviewer_Score#11]
            +- *Filter isnotnull(id#0L)
               +- *FileScan csv [id#0L,Reviewer_Nationality#6,Reviewer_Score#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[hdfs://masterbig-1.itc.unipi.it:54310/user/student18/df_cleaned.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,Reviewer_Nationality:string,Reviewer_Score:double>


In [64]:
# adding column with Abroad feature: 1 if reviewer was abroad, 0 otherwise

df_abroad_temp = df_nationality_tmp.rdd.map(lambda row: (row['id'], 0 if row['Reviewer_Nationality'] == row['Hotel_Country'] else 1 )).toDF(['id','Abroad'])

df_nationality = df_nationality_tmp.join(df_abroad_temp, ['id'])

df_nationality.take(3)




[Row(id=26, Hotel_Country='Netherlands', Reviewer_Nationality='United Kingdom', Reviewer_Score=9.6, Abroad=1),
 Row(id=29, Hotel_Country='Netherlands', Reviewer_Nationality='Hungary', Reviewer_Score=9.2, Abroad=1),
 Row(id=474, Hotel_Country='United Kingdom', Reviewer_Nationality='United States of America', Reviewer_Score=6.3, Abroad=1)]

In [70]:
df_nationality.rdd.filter(lambda x: x['Abroad']== 0).count()

178357

In [72]:
df_nationality.printSchema()

root
 |-- id: long (nullable = true)
 |-- Hotel_Country: string (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Reviewer_Score: double (nullable = true)
 |-- Abroad: long (nullable = true)



In [71]:
df_nationality.createTempView('nationality')

In [84]:
# 
df_mean_score_by_Hotel_nationality =  spark.sql("SELECT Reviewer_Nationality, AVG(Reviewer_Score) AS AVG_SCORE \
                                                FROM nationality \
                                                GROUP BY Reviewer_Nationality \
                                                ORDER BY AVG_SCORE DESC\
                                                ")
df_mean_score_by_Hotel_nationality.show()

+--------------------+-----------------+
|Reviewer_Nationality|        AVG_SCORE|
+--------------------+-----------------+
|              Crimea|             10.0|
|  Svalbard Jan Mayen|             10.0|
|   Equatorial Guinea|             10.0|
|             Comoros|             10.0|
|          Cape Verde|              9.6|
|          Tajikistan|          9.35625|
|Central Africa Re...|              9.3|
|        Saint Martin|            9.275|
|              Gambia|             9.26|
|             Burundi|9.200000000000001|
|         South Sudan|              9.2|
|               Benin|9.166666666666666|
|               Congo|9.166666666666666|
|               Niger|             9.15|
|             Vanuatu|9.075000000000001|
|Bonaire St Eustat...|9.066666666666666|
|         Saint Barts|9.033333333333333|
|          Kyrgyzstan|8.995238095238093|
|St Pierre and Miq...|             8.95|
|           Cocos K I|             8.95|
+--------------------+-----------------+
only showing top

In [21]:
df_mean_score_by_Hotel_Country =  spark.sql("SELECT Hotel_Country, AVG(Reviewer_Score) AS AVG_SCORE \
                                                FROM nationality \
                                                GROUP BY Hotel_Country \
                                                ORDER BY AVG_SCORE DESC\
                                                ")
df_mean_score_by_Hotel_Country.show()

AnalysisException: 'Table or view not found: nationality; line 1 pos 108'

#### Quant'è la differenza tra media aritmetica e media geografica sferica/ellissoide?