## Mining Massive Datasets
### Homework 1
### Anastasiia Kasprova

In [3]:
import os

import findspark

# Point findspark at the Spark installation. SPARK_HOME wins when it is set, so the
# notebook is not tied to one machine; otherwise fall back to the original local install.
findspark.init(os.environ.get('SPARK_HOME') or '/home/anastasiia/spark-2.1.0-bin-hadoop2.7')

# Imported only after findspark.init(), which is what makes the Spark install's
# pyspark package importable from this kernel.
from pyspark.sql import SparkSession

# One shared session for the whole notebook (reused if it already exists).
spark = SparkSession.builder.appName('HW1').getOrCreate()

In [4]:
# Load the two Amsterdam CSV files (listings and reviews).
# The first line of each file is the header; rows Spark cannot parse are dropped
# (DROPMALFORMED). No schema is inferred, so every column is read as a string.
df1 = spark.read.options(mode='DROPMALFORMED', header=True).csv('spark_data/amsterdam_listings.csv')
df2 = spark.read.options(mode='DROPMALFORMED', header=True).csv('spark_data/amsterdam_reviews.csv')

In [11]:
# Check which columns each frame has (listings first, then reviews)
for frame in (df1, df2):
    frame.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)

root
 |-- listing_id: string (nullable = true)
 |-- date: string (nullable = true)



In [9]:
# Register both frames as temporary views so they can be queried with SQL
for view_name, frame in (('listings', df1), ('reviews', df2)):
    frame.createOrReplaceTempView(view_name)

In [12]:
# Peek at a single row of each view to see what the data looks like
for view_name in ('listings', 'reviews'):
    spark.sql("SELECT * FROM {} limit 1".format(view_name)).show()

+--------+--------------------+-------+---------+-------------------+--------------------+----------------+-----------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|      id|                name|host_id|host_name|neighbourhood_group|       neighbourhood|        latitude|        longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+--------+--------------------+-------+---------+-------------------+--------------------+----------------+-----------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|14831696|Luxurious & space...|5476119|   Arthur|               null|De Baarsjes - Oud...|52.3611730497892|4.866754581739126|Entire home/apt|  145|             3|                4| 2016-12-31|             0.68|             

In [15]:
# Inner-join listings with reviews on the listing id:
# the result has one row per matching (listing, review) pair.
join_query = "SELECT * FROM listings l JOIN reviews r on l.id = r.listing_id"
df = spark.sql(join_query)

# Make the joined frame queryable from SQL under the name 'combined'
df.createOrReplaceTempView('combined')

# Look at one joined row to check the result
spark.sql("SELECT * FROM combined limit 1").show()

+--------+--------------------+-------+---------+-------------------+--------------------+----------------+-----------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+----------+----------+
|      id|                name|host_id|host_name|neighbourhood_group|       neighbourhood|        latitude|        longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|listing_id|      date|
+--------+--------------------+-------+---------+-------------------+--------------------+----------------+-----------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+----------+----------+
|14831696|Luxurious & space...|5476119|   Arthur|               null|De Baarsjes - Oud...|52.3611730497892|4.866754581739126|Entire home/apt|  145|          

In [16]:
# Checking what 'rooms, flats, everything together :)' means in the data:
# list the distinct room_type values.
room_types = spark.sql("SELECT distinct room_type FROM combined")
room_types.show()

+---------------+
|      room_type|
+---------------+
|    Shared room|
|Entire home/apt|
|   Private room|
+---------------+



In [17]:
# Average price for each room type, rounded to a whole number.
# NOTE(review): the average is taken over `combined`, i.e. over joined
# (listing, review) rows, so a listing presumably counts once per review it has --
# confirm that this weighting is intended.
avg_price_query = "SELECT room_type, cast(round(avg(price),0) as int) as avg_price FROM combined GROUP BY 1"
spark.sql(avg_price_query).show()

+---------------+---------+
|      room_type|avg_price|
+---------------+---------+
|    Shared room|      116|
|Entire home/apt|      147|
|   Private room|       89|
+---------------+---------+



In [22]:
# Add two columns derived from the average price of the row's room_type:
#   price_deviation - price divided by that average, rounded to 2 decimals
#   price_bucket    - 'H' when price is above the rounded average, otherwise 'L'
# The adjacent string literals below concatenate into one single-line query.
price_bucket_query = (
    "SELECT a.*, "
    "round(a.price/b.avg_price,2) as price_deviation, "
    "case when a.price>round(b.avg_price) then 'H' else 'L' end as price_bucket "
    "FROM combined a "
    "JOIN (SELECT room_type, avg(price) as avg_price FROM combined GROUP BY 1) b "
    "ON a.room_type=b.room_type"
)
spark.sql(price_bucket_query).show()

+--------+--------------------+--------+---------+-------------------+-------------+------------------+-----------------+-----------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+----------+----------+---------------+------------+
|      id|                name| host_id|host_name|neighbourhood_group|neighbourhood|          latitude|        longitude|  room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|listing_id|      date|price_deviation|price_bucket|
+--------+--------------------+--------+---------+-------------------+-------------+------------------+-----------------+-----------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+----------+----------+---------------+------------+
|15537309|10 minutes from c...|99956555|  Touseef|               null|   Noord-Oost| 52.393160

In [56]:
# Export the rows enriched with price_deviation / price_bucket (per room_type) to CSV.
df3 = spark.sql("SELECT a.*, round(a.price/b.avg_price,2) as price_deviation, case when a.price>round(b.avg_price) then 'H' else 'L' end as price_bucket FROM combined a JOIN (SELECT room_type, avg(price) as avg_price FROM combined GROUP BY 1) b ON a.room_type=b.room_type")

# repartition(1) collects everything into one partition so a single part file is written.
# mode("overwrite") makes the cell re-runnable: with the default save mode the write
# raises as soon as the target path exists from a previous run.
(df3.repartition(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save('spark_data/amsterdam_by_room_type.csv'))

In [27]:
# Check which neighbourhoods occur in the data and how many distinct ones there are
neighbourhood_queries = (
    "SELECT distinct neighbourhood FROM combined",
    "SELECT count(distinct neighbourhood) FROM combined",
)
for query in neighbourhood_queries:
    spark.sql(query).show()

+--------------------+
|       neighbourhood|
+--------------------+
|           Oud-Noord|
|De Baarsjes - Oud...|
|       Bos en Lommer|
|              Osdorp|
|        Centrum-Oost|
|De Pijp - Riviere...|
|          Westerpark|
|            Oud-Oost|
|Geuzenveld - Slot...|
|Oostelijk Havenge...|
|                Zuid|
|        Centrum-West|
|          Noord-Oost|
|          Noord-West|
|IJburg - Zeeburge...|
|De Aker - Nieuw S...|
|     Watergraafsmeer|
|        Bijlmer-Oost|
|     Bijlmer-Centrum|
|Gaasperdam - Drie...|
+--------------------+
only showing top 20 rows

+-----------------------------+
|count(DISTINCT neighbourhood)|
+-----------------------------+
|                           22|
+-----------------------------+



In [33]:
# Average price (2 decimals) for every neighbourhood / room_type combination,
# sorted by neighbourhood and then room_type.
# The adjacent string literals below concatenate into one single-line query.
avg_by_neighbourhood_query = (
    "SELECT neighbourhood, room_type, round(avg(price),2) as avg_price "
    "FROM combined "
    "GROUP BY neighbourhood, room_type "
    "ORDER BY neighbourhood, room_type"
)
spark.sql(avg_by_neighbourhood_query).show()

+--------------------+---------------+---------+
|       neighbourhood|      room_type|avg_price|
+--------------------+---------------+---------+
|     Bijlmer-Centrum|Entire home/apt|    77.95|
|     Bijlmer-Centrum|   Private room|    51.51|
|        Bijlmer-Oost|Entire home/apt|    98.64|
|        Bijlmer-Oost|   Private room|    51.86|
|       Bos en Lommer|Entire home/apt|    106.3|
|       Bos en Lommer|   Private room|    67.46|
|       Bos en Lommer|    Shared room|    45.67|
|Buitenveldert - Z...|Entire home/apt|   117.44|
|Buitenveldert - Z...|   Private room|    69.07|
|Buitenveldert - Z...|    Shared room|     40.0|
|        Centrum-Oost|Entire home/apt|   171.04|
|        Centrum-Oost|   Private room|   109.29|
|        Centrum-Oost|    Shared room|    180.2|
|        Centrum-West|Entire home/apt|   179.25|
|        Centrum-West|   Private room|   108.99|
|        Centrum-West|    Shared room|    160.4|
|De Aker - Nieuw S...|Entire home/apt|   106.76|
|De Aker - Nieuw S..

In [58]:
# Export the average price per neighbourhood / room_type to CSV.
df4 = spark.sql("SELECT neighbourhood, room_type, round(avg(price),2) as avg_price FROM combined GROUP BY neighbourhood, room_type ORDER BY neighbourhood, room_type")

# repartition(1) shuffles the rows into one partition, which is not guaranteed to keep
# the ORDER BY ordering, so the single partition is sorted again before it is written.
# mode("overwrite") makes the cell re-runnable: with the default save mode the write
# raises as soon as the target path exists from a previous run.
(df4.repartition(1)
    .sortWithinPartitions('neighbourhood', 'room_type')
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .mode("overwrite")
    .save('spark_data/amsterdam_avgprice_by_neighbourhood.csv'))