In [31]:
import pyspark
import pyspark.sql
import io
import pandas as pd
import requests
import psycopg2
from pyspark.sql.functions import col, translate, element_at, split, to_date

In [2]:
spark = pyspark.sql.SparkSession.builder \
                    .appName("LocalSparkProject") \
                    .master("local[4]") \
                    .config("spark.jars", "./psql_adapter/postgresql-42.7.4.jar") \
                    .config("spark.executor.memory", "6g") \
                    .config("spark.executor.memory", "4g") \
                    .getOrCreate()

24/11/05 16:01:59 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.10.229.176 instead (on interface en0)
24/11/05 16:01:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/11/05 16:01:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
def load_data_from_url_as_spark_df(url, **params):
    """Loads data from url. Optional keyword arguments are passed to pandas.read_csv."""
    response = requests.get(url)
    dx = pd.read_csv(io.BytesIO(response.content), **params)  
    return spark.createDataFrame(dx)

In [4]:
listings_url = "https://data.insideairbnb.com/france/ile-de-france/paris/2024-09-06/data/listings.csv.gz"

In [14]:
listings_df = load_data_from_url_as_spark_df(listings_url, sep=',', index_col=0, quotechar='"', compression='gzip')

In [6]:
listings_df.show()

24/11/05 16:02:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/11/05 16:02:23 WARN TaskSetManager: Stage 0 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.
24/11/05 16:02:27 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+--------------------+--------------+------------+---------------+--------------------+--------------------+---------------------+--------------------+-------+--------------------+-----------------+----------+-------------+--------------------+------------------+------------------+--------------------+-----------------+--------------------+--------------------+--------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+--------------------+----------------------+----------------------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-------+--------------+--------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------+----------------+---------------+---------------+---------------+----------------+-----------

## Craete table "paris" in Postgres

In [7]:
p_url = "jdbc:postgresql://localhost:5445/postgres"

properties = {
    "user": "postgres",
    "password": "1234",
    "driver": "org.postgresql.Driver"
}

listings_df.write \
    .format("jdbc") \
    .option("url", p_url) \
    .option("dbtable", "paris") \
    .option("user", properties["user"]) \
    .option("password", properties["password"]) \
    .option("driver", properties["driver"]) \
    .mode("append") \
    .save()

24/11/05 16:02:33 WARN TaskSetManager: Stage 1 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

## Check that the table exists

In [9]:
connection = psycopg2.connect(
    dbname="postgres",
    user="postgres",
    password="1234",
    host="localhost",
    port="5445"
)

cursor = connection.cursor()

cursor.execute(
    """
    SELECT *
    FROM "public"."paris"
    LIMIT 1
    """
)

exists = cursor.fetchone()

print(exists)

cursor.close()
connection.close()

del cursor
del connection

('https://www.airbnb.com/rooms/32604779', 20240906025355, '2024-09-06', 'previous scrape', 'Appartement de standing avec double salon', 'Apartment of 84 m2 very bright located on the 8th floor consisting of a double living-room. Living room of 50m2 crossing with 2 large balconies on the living room side and bedroom/kitchen side.<br />Luxury building with concierge.<br />Bathroom renovated in 2019. Room adjoining the bathroom. Separate toilet, clothes dryer.<br />Ideally located 5 minutes from the Eiffel Tower and Mars fields.<br />Nearby transport: metro, RER, bus (69, 42, 82, 87), Air France bus.<br />Many nearby shops and restaurants.', 'NaN', 'https://a0.muscache.com/pictures/33e3c3c6-2069-4556-aa96-f4f12fff2b0f.jpg', 245091812, 'https://www.airbnb.com/users/show/245091812', 'Françoise Bernard', '2019-02-24', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'f', 'https://a0.muscache.com/im/pictures/user/6c9f1137-99ab-461b-8819-38646d9db54f.jpg?aki_policy=profile_small', 'https://a0.muscache.com/i

In [12]:
listings_df.columns

['listing_url',
 'scrape_id',
 'last_scraped',
 'source',
 'name',
 'description',
 'neighborhood_overview',
 'picture_url',
 'host_id',
 'host_url',
 'host_name',
 'host_since',
 'host_location',
 'host_about',
 'host_response_time',
 'host_response_rate',
 'host_acceptance_rate',
 'host_is_superhost',
 'host_thumbnail_url',
 'host_picture_url',
 'host_neighbourhood',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'host_has_profile_pic',
 'host_identity_verified',
 'neighbourhood',
 'neighbourhood_cleansed',
 'neighbourhood_group_cleansed',
 'latitude',
 'longitude',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bathrooms_text',
 'bedrooms',
 'beds',
 'amenities',
 'price',
 'minimum_nights',
 'maximum_nights',
 'minimum_minimum_nights',
 'maximum_minimum_nights',
 'minimum_maximum_nights',
 'maximum_maximum_nights',
 'minimum_nights_avg_ntm',
 'maximum_nights_avg_ntm',
 'calendar_updated',
 'has_availability',
 'availability_30',
 'av

In [16]:
columns_to_keep = [
    'host_id','host_since','host_is_superhost','latitude','longitude','property_type','room_type','accommodates','bathrooms','bathrooms_text','bedrooms','beds','amenities','price','minimum_nights','maximum_nights', 'number_of_reviews','review_scores_rating','license','instant_bookable','reviews_per_month'
]

base_df = listings_df.select(columns_to_keep)
clean_df = base_df.dropna()
clean_df.show()

24/11/05 16:09:53 WARN TaskSetManager: Stage 13 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.
[Stage 13:>                                                         (0 + 1) / 1]

+-------+----------+-----------------+-----------------+-----------------+------------------+---------------+------------+---------+--------------+--------+----+--------------------+---------+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|host_id|host_since|host_is_superhost|         latitude|        longitude|     property_type|      room_type|accommodates|bathrooms|bathrooms_text|bedrooms|beds|           amenities|    price|minimum_nights|maximum_nights|number_of_reviews|review_scores_rating|             license|instant_bookable|reviews_per_month|
+-------+----------+-----------------+-----------------+-----------------+------------------+---------------+------------+---------+--------------+--------+----+--------------------+---------+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|   3631|2008-10-14|                f|        

24/11/05 16:09:57 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 13 (TID 117): Attempting to kill Python Worker
                                                                                

In [17]:
from pyspark.sql.functions import col, translate

fixed_price_df = base_df.withColumn("price", translate(col("price"), "$,", "").cast("float"))

fixed_price_df.show()

24/11/05 16:10:41 WARN TaskSetManager: Stage 14 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.
[Stage 14:>                                                         (0 + 1) / 1]

+-------+----------+-----------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|host_id|host_since|host_is_superhost|         latitude|        longitude|       property_type|      room_type|accommodates|bathrooms|bathrooms_text|bedrooms|beds|           amenities|price|minimum_nights|maximum_nights|number_of_reviews|review_scores_rating|             license|instant_bookable|reviews_per_month|
+-------+----------+-----------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|   3631|2008-10-14|                f|         48.83

24/11/05 16:10:45 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 14 (TID 118): Attempting to kill Python Worker
                                                                                

In [20]:
clean_df = fixed_price_df.dropna()
clean_df.show()

24/11/05 16:12:22 WARN TaskSetManager: Stage 24 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.


+---------+----------+-----------------+-----------------+------------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|  host_id|host_since|host_is_superhost|         latitude|         longitude|       property_type|      room_type|accommodates|bathrooms|bathrooms_text|bedrooms|beds|           amenities|price|minimum_nights|maximum_nights|number_of_reviews|review_scores_rating|             license|instant_bookable|reviews_per_month|
+---------+----------+-----------------+-----------------+------------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
| 13132777|2014-03-14|                f|   

                                                                                

In [21]:
clean_df = clean_df.repartition(100) 
clean_df.select("price").describe().show()

24/11/05 16:12:36 WARN TaskSetManager: Stage 27 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.


+-------+-----------------+
|summary|            price|
+-------+-----------------+
|  count|            45916|
|   mean|212.8508363097831|
| stddev|404.3842515189901|
|    min|              8.0|
|    max|          24200.0|
+-------+-----------------+



In [22]:
clean_df \
    .groupBy("minimum_nights").count() \
    .orderBy(col("count").desc(), col("minimum_nights")) \
    .show()

24/11/05 16:13:23 WARN TaskSetManager: Stage 33 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.


+--------------+-----+
|minimum_nights|count|
+--------------+-----+
|             2|12533|
|             1|11501|
|             3|10124|
|             4| 3309|
|            30| 2857|
|             5| 2272|
|             7|  831|
|             6|  647|
|           365|  444|
|            90|  292|
|            31|  235|
|            10|  150|
|             8|   81|
|            91|   72|
|            14|   71|
|            15|   70|
|            60|   62|
|            28|   47|
|            20|   45|
|            12|   26|
+--------------+-----+
only showing top 20 rows



In [24]:
min_nights_df = clean_df.filter(col("minimum_nights") <= 90)

min_nights_df.show()

24/11/05 16:14:13 WARN TaskSetManager: Stage 39 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.


+---------+----------+-----------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|  host_id|host_since|host_is_superhost|         latitude|        longitude|       property_type|      room_type|accommodates|bathrooms|bathrooms_text|bedrooms|beds|           amenities|price|minimum_nights|maximum_nights|number_of_reviews|review_scores_rating|             license|instant_bookable|reviews_per_month|
+---------+----------+-----------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|  7355630|2013-07-08|                t|      

                                                                                

In [26]:
min_nights_df.select('bathrooms', 'bathrooms_text').show()

24/11/05 16:14:31 WARN TaskSetManager: Stage 42 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.


+---------+----------------+
|bathrooms|  bathrooms_text|
+---------+----------------+
|      3.0|         3 baths|
|      1.0|   1 shared bath|
|      1.0|   1 shared bath|
|      1.0|   1 shared bath|
|      1.0|   1 shared bath|
|      1.5|1.5 shared baths|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
|      1.0|          1 bath|
+---------+----------------+
only showing top 20 rows



In [27]:
from pyspark.sql.functions import element_at, split
bathrooms_df = min_nights_df.withColumn('bathrooms', element_at(split('bathrooms_text', ' '), 1).cast('double')).drop('bathrooms_text')

bathrooms_df.show()

24/11/05 16:15:02 WARN TaskSetManager: Stage 45 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.


+---------+----------+-----------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|  host_id|host_since|host_is_superhost|         latitude|        longitude|       property_type|      room_type|accommodates|bathrooms|bedrooms|beds|           amenities|price|minimum_nights|maximum_nights|number_of_reviews|review_scores_rating|             license|instant_bookable|reviews_per_month|
+---------+----------+-----------------+-----------------+-----------------+--------------------+---------------+------------+---------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|  8450558|2013-08-27|                t|         48.86514|          2.34003|  Entire rental

                                                                                

In [28]:
boolean_df = (
  bathrooms_df
  .withColumn('instant_bookable', col('instant_bookable') == 't')
  .withColumn('host_is_superhost', col('host_is_superhost') == 't')
)

In [30]:
from pyspark.sql.functions import explode, lower

amenities_df = boolean_df.withColumn('amenities', split(translate('amenities', '\\]\\[\\"', ''), ','))


amenities_df \
  .select(explode('amenities')) \
  .withColumn('item', lower('col')) \
  .groupBy('item').count() \
  .sort('count') \
  .show()


24/11/05 16:15:57 WARN TaskSetManager: Stage 48 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+-----+
|                item|count|
+--------------------+-----+
| 56 inch hdtv wit...|    1|
| dr barbara sturm...|    1|
| samsung  stainle...|    1|
| rosiu00e8re stai...|    1|
| 1 inch hdtv with...|    1|
| rosiere.  ne pas...|    1|
| garnier ultra do...|    1|
| timoteu00ef cond...|    1|
| electrical stain...|    1|
| les choses simpl...|    1|
| les choses simpl...|    1|
|   hema  conditioner|    1|
| muzzu produit ar...|    1|
| fast wifi u2013 ...|    1|
|         divers oven|    1|
|   respire body soap|    1|
| fast wifi u2013 ...|    1|
| 39 inch hdtv wit...|    1|
| 85 inch hdtv wit...|    1|
| u00abu00a0 jbl o...|    1|
+--------------------+-----+
only showing top 20 rows



                                                                                

## Ultimate function which combines all previous transformation

In [32]:
def prepare_paris_listings(raw_df):

  columns_to_keep = [
      'host_id','host_since','host_is_superhost','latitude','longitude','property_type','room_type','accommodates','bathrooms','bathrooms_text','bedrooms','beds','amenities','price','minimum_nights','maximum_nights', 'number_of_reviews','review_scores_rating','license','instant_bookable','reviews_per_month'
  ]
  
  df = (
    raw_df.select(columns_to_keep)
    .withColumn("price", translate(col("price"), "$,", "").cast("float"))
    .dropna()
    .filter(col("price") <= 10000)
    .filter(col("minimum_nights") <= 90)
    .withColumn('bathrooms', element_at(split('bathrooms_text', ' '), 1).cast('double')).drop('bathrooms_text')
    .withColumn('host_since', to_date('host_since'))
    .withColumn('instant_bookable', col('instant_bookable') == 't')
    .withColumn('host_is_superhost', col('host_is_superhost') == 't')
    .withColumn('amenities', split(translate('amenities', '\\]\\[\\"', ''), ','))
  )

  return df 

df = prepare_paris_listings(listings_df)
df.show()

24/11/05 16:18:29 WARN TaskSetManager: Stage 54 contains a task of very large size (41281 KiB). The maximum recommended task size is 1000 KiB.
[Stage 54:>                                                         (0 + 1) / 1]

+-------+----------+-----------------+-----------------+-----------------+------------------+---------------+------------+---------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|host_id|host_since|host_is_superhost|         latitude|        longitude|     property_type|      room_type|accommodates|bathrooms|bedrooms|beds|           amenities|price|minimum_nights|maximum_nights|number_of_reviews|review_scores_rating|             license|instant_bookable|reviews_per_month|
+-------+----------+-----------------+-----------------+-----------------+------------------+---------------+------------+---------+--------+----+--------------------+-----+--------------+--------------+-----------------+--------------------+--------------------+----------------+-----------------+
|   3631|2008-10-14|            false|         48.83191|           2.3187|Entire rental unit|Entire hom

24/11/05 16:18:33 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 54 (TID 565): Attempting to kill Python Worker
                                                                                