In [1]:
from pyspark import SparkContext as sc
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession, functions, types
from pyspark.sql.types import *
from IPython.display import display
import pandas as pd
import math, re, urllib, requests
from datetime import datetime as dt

In [2]:
sc = sc(appName="attraction")
sqlContext = SQLContext(sc)
spark = SparkSession.builder.appName('attraction analysis').getOrCreate()



In [3]:
det_path = 'outputs/attraction_details/part-00000-24d79c41-ab21-4f52-bb5e-fed77bf823f8-c000.snappy.parquet'
rev_path = 'outputs/attraction_reviews/*'

In [4]:
det_df = spark.read.parquet(det_path)
det_df.createOrReplaceTempView('det_df')

In [5]:
display(det_df.count())
display(det_df.orderBy('attraction_id').toPandas().head(11))

1986

Unnamed: 0,attraction_id,city,country,location,name,price,province,rating
0,0,vancouver,canada,"{""lat"":49.1978322,""lng"":-123.0649959}",vancouver_city_sightseeing_tour,80.0,british_columbia,4.5
1,1,vancouver,canada,"{""lat"":49.1978322,""lng"":-123.0649959}",vancouver_to_victoria_and_butchart_gardens_tou...,210.0,british_columbia,5.0
2,2,montreal,canada,"{""lat"":45.5001458,""lng"":-73.5720264}",quebec_city_and_montmorency_falls_day_trip_fro...,115.0,quebec,4.5
3,3,toronto,canada,"{""lat"":43.6561507,""lng"":-79.3842642}",niagara_falls_day_trip_from_toronto,169.0,ontario,5.0
4,4,niagara_falls,canada,"{""lat"":43.0857136,""lng"":-79.0824311}","best_of_niagara_falls_tour_from_niagara_falls,...",158.0,ontario,5.0
5,5,niagara_falls,canada,"{""lat"":43.102436,""lng"":-78.961638}",niagara_falls_in_one_day:_deluxe_sightseeing_t...,204.42,ontario,5.0
6,6,vancouver,canada,"{""lat"":49.1978322,""lng"":-123.0649959}",whistler_small-group_day_trip_from_vancouver,145.0,british_columbia,5.0
7,7,niagara_falls,canada,"{""lat"":43.0857136,""lng"":-79.0824311}",ultimate_niagara_falls_tour_plus_helicopter_ri...,317.42,ontario,5.0
8,8,vancouver_island,canada,nil,"local_food,_craft_beverage_and_estate_winery_t...",150.0,british_columbia,5.0
9,9,vancouver,canada,"{""lat"":49.2869235,""lng"":-123.12216}",private_tour:_vancouver_to_victoria_island,670.0,british_columbia,5.0


In [6]:
clean_loc_udf = functions.udf(lambda x: re.sub('things_to_do_in_','',x),StringType())
det_df = det_df.withColumn('city',clean_loc_udf(det_df.city))
det_df = det_df.withColumn('country',clean_loc_udf(det_df.country))

In [7]:
rat_nonull = det_df.where(det_df.rating.isNull()).withColumn("rating", functions.lit(-1))
out_df = det_df.where(det_df.rating.isNotNull()).union(rat_nonull)
display(out_df.count())
out_df.createOrReplaceTempView('out_df')

1986

In [8]:
def myround(x, base=.5):
    return float(round(x/base)*base)

avg_rat_df = spark.sql("SELECT province, category, AVG(rating) as avg_rating FROM out_df WHERE rating != -1 GROUP BY province, category")
round_udf = functions.udf(lambda x: myround(x), FloatType())
avg_rat_df = avg_rat_df.withColumn('updated_rating',round_udf(avg_rat_df.avg_rating)).drop('avg_rating')

out_df = out_df.join(avg_rat_df, ['province','category'],'left').orderBy('attraction_id')
out_df = out_df.withColumn("rating", functions.when(out_df["rating"]== -1, out_df["updated_rating"]).otherwise(out_df["rating"])).drop('updated_rating')

out_df.createOrReplaceTempView('out_df')
display(out_df.count())

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `category` cannot be resolved. Did you mean one of the following? [`city`, `country`, `location`, `name`, `rating`].; line 1 pos 17;
'Aggregate [province#6, 'category], [province#6, 'category, avg(rating#7) AS avg_rating#98]
+- Filter NOT (rating#7 = cast(-1 as double))
   +- SubqueryAlias out_df
      +- View (`out_df`, [attraction_id#0L,city#31,country#41,location#3,name#4,price#5,province#6,rating#7])
         +- Union false, false
            :- Filter isnotnull(rating#7)
            :  +- Project [attraction_id#0L, city#31, <lambda>(country#2)#40 AS country#41, location#3, name#4, price#5, province#6, rating#7]
            :     +- Project [attraction_id#0L, <lambda>(city#1)#30 AS city#31, country#2, location#3, name#4, price#5, province#6, rating#7]
            :        +- Relation [attraction_id#0L,city#1,country#2,location#3,name#4,price#5,province#6,rating#7] parquet
            +- Project [attraction_id#67L, city#68, country#69, location#70, name#71, price#72, province#73, cast(rating#74 as double) AS rating#75]
               +- Project [attraction_id#59L AS attraction_id#67L, city#31 AS city#68, country#41 AS country#69, location#62 AS location#70, name#63 AS name#71, price#64 AS price#72, province#65 AS province#73, rating#50 AS rating#74]
                  +- Project [attraction_id#59L, city#31, country#41, location#62, name#63, price#64, province#65, -1 AS rating#50]
                     +- Filter isnull(rating#66)
                        +- Project [attraction_id#59L, city#31, <lambda>(country#61)#40 AS country#41, location#62, name#63, price#64, province#65, rating#66]
                           +- Project [attraction_id#59L, <lambda>(city#60)#30 AS city#31, country#61, location#62, name#63, price#64, province#65, rating#66]
                              +- Relation [attraction_id#59L,city#60,country#61,location#62,name#63,price#64,province#65,rating#66] parquet


In [None]:
avg_price_df = spark.sql("SELECT province, category, AVG(price) as avg_price FROM out_df WHERE price > 0 GROUP BY province, category")
round_price_udf = functions.udf(lambda x: round(x,2), FloatType())
avg_price_df = avg_price_df.withColumn('updated_price', round_price_udf(avg_price_df["avg_price"]))

upd_price_df = out_df.join(avg_price_df, ['province','category'],'left_outer').orderBy('attraction_id')
det_df = upd_price_df.withColumn("price", functions.when(det_df["price"] == -1.00, upd_price_df["updated_price"]).otherwise(upd_price_df["price"])).drop('updated_price')

det_df.createOrReplaceTempView('det_df')
display(det_df.count())

In [None]:
def find_loc(x):
    toOut = re.findall('[+,-]*\d+\.\d+',x)
    if len(toOut) == 0:
        return [None,None]
    else:
        return [float(x) for x in toOut]

loc_udf = functions.udf(lambda x: find_loc(x), ArrayType(FloatType()))
det_loc_df = det_df.withColumn('location', loc_udf(det_df.location)).orderBy('attraction_id')
det_loc_df = det_loc_df.withColumn('latitude',det_loc_df.location[0]).withColumn('longitude',det_loc_df.location[1]).drop('location')

display(det_loc_df.count())
det_loc_df.createOrReplaceTempView('det_loc_df')

In [None]:
det_loc_df.createOrReplaceTempView('det_loc_df')
avg_cc_loc = spark.sql("SELECT city, category, AVG(latitude) as cc_lat, AVG(longitude) as cc_lon FROM det_loc_df WHERE ISNULL(latitude) = false AND ISNULL(longitude) = false GROUP BY city, category")

det_avgloc_df = det_loc_df.join(avg_cc_loc, ['city','category'],'left_outer')
det_avgloc_df = det_avgloc_df.withColumn('latitude', functions.when(det_avgloc_df['latitude'].isNull(),det_avgloc_df['cc_lat']).otherwise(det_avgloc_df['latitude'])).withColumn('longitude', functions.when(det_avgloc_df['longitude'].isNull(),det_avgloc_df['cc_lon']).otherwise(det_avgloc_df['longitude'])).drop(det_avgloc_df['cc_lat']).drop(det_avgloc_df['cc_lon']).orderBy('attraction_id')

display(det_avgloc_df.count())
display(det_avgloc_df.toPandas().head(11))

In [None]:
# insert your google maps api key below
def get_loc(address, position, maps_key='----'):
    maps_api_url = 'https://maps.googleapis.com/maps/api/geocode/json'
    request_url = maps_api_url + '?' + urllib.parse.urlencode({'address':address,'key':maps_key})
    response = requests.get(request_url)
    resp_json_payload = response.json()
    out = resp_json_payload['results'][0]['geometry']['location']
    if position == 'latitude':
        return float(out['lat'])
    elif position == 'longitude':
        return float(out['lng'])

get_lat_udf = functions.udf(lambda x: get_loc(x,'latitude'), FloatType())
get_lon_udf = functions.udf(lambda x: get_loc(x,'longitude'), FloatType())
det_avgloc_df = det_avgloc_df.withColumn('latitude',functions.when(det_avgloc_df['latitude'].isNull(),get_lat_udf(det_avgloc_df['city']+','+det_avgloc_df['province'])).otherwise(det_avgloc_df['latitude']))
det_avgloc_df = det_avgloc_df.withColumn('longitude',functions.when(det_avgloc_df['longitude'].isNull(),get_lon_udf(det_avgloc_df['city']+','+det_avgloc_df['province'])).otherwise(det_avgloc_df['longitude']))

display(det_avgloc_df.count())

In [None]:
det_avgloc_df.coalesce(8).write.parquet('etl/attractions',mode='overwrite')
det_avg_log.toPandas().to_json('etl/attractions.json',orient='records',index=True)

In [None]:
rev_df = spark.read.parquet(rev_path).repartition(160)
display(rev_df.count())
display(rev_df.show())

In [None]:
def convert_date(ip_date):
    try:
        op_date = dt.strptime(ip_date, "%B %d, %Y").strftime("%d-%m-%Y")
        return op_date
    except:
        return ip_date
convert_df_udf = functions.udf(lambda x: convert_date(x),StringType())
out_df = rev_df.withColumn('review_date',convert_df_udf(rev_df['review_date']))
out_df.createOrReplaceTempView('rev_df')

In [None]:
out_df.show()

In [None]:
user_rev_count = spark.sql("SELECT user, COUNT(*) as rev_count FROM rev_df GROUP BY user ORDER BY rev_count DESC")
user_rev_count.show()

In [None]:
print( "Reviews are available for {att_no} attractions.".format(att_no = len(rev_df.select('attraction_id').distinct().collect())))
print( "Matrix will be higly sparse as the maximum number of reviews provided by an user is {val}.".format(val=user_rev_count.select('rev_count').limit(1).collect()[0][0]))

In [None]:
user_df = user_rev_count.toPandas()
user_df['user_id'] = user_df.index
user_rev = spark.createDataFrame(user_df)
rev_etled = out_df.join(user_rev.drop('rev_count'),'user')

In [None]:
rev_etled.coalesce(8).write.parquet('etl/attraction_reviews', mode = 'overwrite')
rev_etled.toPandas().to_json('etl/attraction_reviews.json',orient='records',index=True)