In [1]:
!pip install pyspark duckdb
!wget -O "duckdb.jar" "https://repo1.maven.org/maven2/org/duckdb/duckdb_jdbc/0.10.1/duckdb_jdbc-0.10.1.jar"
import pyspark
from pyspark.sql import SparkSession
import duckdb
from pyspark.sql import functions as F
from pyspark.sql.window import Window

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=ea575f322779c07662f2834f5d77942a373674a21ed5a49856c49c2af3aec65e
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
--2024-04-25 16:00:06--  https://repo1.maven.org/maven2/org/duckdb/duckdb_jdbc/0.10.1/duckdb_jdbc-0.10.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.20

In [2]:
# Open and immediately close the quality database with the native DuckDB client
# (presumably to make sure the file is present before Spark accesses it over JDBC).
conn = duckdb.connect("quality_database.duckdb")
conn.close()

# Spark session with the DuckDB JDBC driver jar on its classpath.
spark = (
    SparkSession.builder
    .config("spark.jars", "duckdb.jar")
    .getOrCreate()
)

We read the three datasets

In [3]:
# Load the whole `weather` table from the quality database through the DuckDB JDBC driver.
DF_weather = (
    spark.read
    .format("jdbc")
    .options(
        url="jdbc:duckdb:quality_database.duckdb",
        driver="org.duckdb.DuckDBDriver",
        query="SELECT * FROM weather",
    )
    .load()
)

DF_weather.show()

+----------+------+-----------------+------------------+------------------+------------------------+-----------------------------+-------------------+--------------------+--------------------+-------------------+--------------------+---------+
|      date|  city|         latitude|         longitude|avg_temperature_2m|avg_relative_humidity_2m|avg_precipitation_probability|total_precipitation|     avg_cloud_cover| avg_cloud_cover_low|avg_cloud_cover_mid|avg_cloud_cover_high|estat_cel|
+----------+------+-----------------+------------------+------------------+------------------------+-----------------------------+-------------------+--------------------+--------------------+-------------------+--------------------+---------+
|2024-03-19|Athens|       33.8797677|-83.42271378947368| 7.222916675576319|                  31.625|                          0.0|                0.0|                 0.0|                 0.0|                0.0|                 0.0|Despejado|
|2024-03-19|Athens|     

In [4]:
# Load the whole `airbnb` table from the quality database through the DuckDB JDBC driver.
DF_airbnb = (
    spark.read
    .format("jdbc")
    .options(
        url="jdbc:duckdb:quality_database.duckdb",
        driver="org.duckdb.DuckDBDriver",
        query="SELECT * FROM airbnb",
    )
    .load()
)

DF_airbnb.show()

+------------------+---------------+-----------+------------+---------------+-----------------+-----+---+------------------+--------------------------+--------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+---------+--------+--------------------+-------+
|           realSum|      room_type|room_shared|room_private|person_capacity|host_is_superhost|multi|biz|cleanliness_rating|guest_satisfaction_overall|bedrooms|              dist|        metro_dist|        attr_index|   attr_index_norm|        rest_index|   rest_index_norm|              lng|               lat|     city|day_type|cleanliness_category|cluster|
+------------------+---------------+-----------+------------+---------------+-----------------+-----+---+------------------+--------------------------+--------+------------------+------------------+------------------+------------------+------------------+------------------+------

In [5]:
# Load the whole `flights` table from the quality database through the DuckDB JDBC driver.
DF_flights = (
    spark.read
    .format("jdbc")
    .options(
        url="jdbc:duckdb:quality_database.duckdb",
        driver="org.duckdb.DuckDBDriver",
        query="SELECT * FROM flights",
    )
    .load()
)

DF_flights.show()

+---------------+---------+---------+---------+---------------------+--------------------+--------------------+--------------+-----------+---------------+----------------+---------+------------+--------------+-----------+
|airport_acronym|longitude| latitude|     city|flight_number_default|          owner_name|          owner_logo|origin_airport|origin_city|origin_latitude|origin_longitude|has_delay|arrival_date|departure_time|time_of_day|
+---------------+---------+---------+---------+---------------------+--------------------+--------------------+--------------+-----------+---------------+----------------+---------+------------+--------------+-----------+
|           EHAM| 4.763889|52.308609|Amsterdam|                OS373|   Austrian Airlines|https://images.fl...|           VIE|     Vienna|      48.110271|       16.569719|        0|  2024-03-21|      13:50:00|  afternoon|
|           EHAM| 4.763889|52.308609|Amsterdam|               KL1920|      KLM Cityhopper|https://images.fl...| 

We write the two datasets that we won't modify (flights and weather) to the new DuckDB database.

In [6]:
# Copy the unmodified flights dataset into the exploitation database.
# It is stored in its own `flights` table: writing it to `airbnb` (as before) made it
# collide with the other writes to that table in this notebook, which use mode
# "overwrite" and would therefore replace it.
DF_flights.write \
    .format("jdbc") \
    .option("url", "jdbc:duckdb:exploition_database.duckdb") \
    .option("dbtable", "flights") \
    .option("driver", "org.duckdb.DuckDBDriver") \
    .mode("overwrite") \
    .save()

In [7]:
# Copy the unmodified weather dataset into the exploitation database.
# It is stored in its own `weather` table: writing it to `airbnb` (as before) made it
# collide with the other writes to that table in this notebook, which use mode
# "overwrite" and would therefore replace it.
DF_weather.write \
    .format("jdbc") \
    .option("url", "jdbc:duckdb:exploition_database.duckdb") \
    .option("dbtable", "weather") \
    .option("driver", "org.duckdb.DuckDBDriver") \
    .mode("overwrite") \
    .save()

Now we will create a table that maps every airport to the coordinates of the point in the weather dataset that lies at the minimum Euclidean distance from it.

In [8]:
# Build a lookup from each airport to the coordinates of its nearest weather point.
# Steps: (1) keep one row per airport, (2) keep one row per weather coordinate pair,
# (3) cross join both and compute the distance of every pair, (4) keep the closest pair
# per airport. The aliases let later expressions tell the two `latitude`/`longitude`
# column pairs apart via the "flights." / "weather." prefixes.
flights = DF_flights.alias("flights")
weather = DF_weather.alias("weather")

# Make a window to select a single row for each airport_acronym
# (the ordering key equals the partition key, so which row gets row_num == 1 is arbitrary;
# assumes every row of an airport carries the same coordinates — TODO confirm)
window_spec = Window.partitionBy("flights.airport_acronym").orderBy("flights.airport_acronym")

# Use row_number to select a single row for each airport_acronym
flights = flights.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")
# Only the airport identifier and its coordinates are needed from here on
flights = flights.select("airport_acronym", "latitude", "longitude")

# Make a window to select a single row for each latitude and longitude
window_spec = Window.partitionBy("weather.latitude", "weather.longitude").orderBy("weather.latitude", "weather.longitude")

# Use row_number to select a single row for each latitude and longitude
# (shrinks the right-hand side of the cross join below to distinct coordinate pairs)
weather = weather.withColumn("row_num", F.row_number().over(window_spec)).filter(F.col("row_num") == 1).drop("row_num")

# Calculate the Euclidean distance between each airport and all weather points
# NOTE(review): the distance is taken directly on the raw latitude/longitude values
# (presumably degrees), so it is a planar approximation, not a geodesic distance.
distance_df = flights.crossJoin(weather).withColumn(
    "distance",
    F.sqrt(
        (F.col("flights.latitude") - F.col("weather.latitude")) ** 2 +
        (F.col("flights.longitude") - F.col("weather.longitude")) ** 2
    )
)

# Make a window to select the row with the minimum distance for each airport
windowSpec = Window.partitionBy("flights.airport_acronym").orderBy("distance")

# Select the row with the minimum distance for each airport
# (on an exact distance tie, row_number picks one of the tied rows arbitrarily)
min_distance_df = distance_df.withColumn("row_num", F.row_number().over(windowSpec)).filter(F.col("row_num") == 1)

# Select the columns to show
# (the output latitude/longitude are those of the matched weather point, not of the airport)
final_df_airports = min_distance_df.select(
    F.col("flights.airport_acronym").alias("airport_acronym"),
    F.col("weather.latitude").alias("latitude"),
    F.col("weather.longitude").alias("longitude")
)

# Show the final dataframe
final_df_airports.show()

+---------------+------------------+--------------------+
|airport_acronym|          latitude|           longitude|
+---------------+------------------+--------------------+
|           CYXU|        34.0397677|          -83.296398|
|           EDDB|52.437036500000005|          13.4688599|
|           EGGW|51.587445599999995|-0.20776529999999999|
|           EGKB|        51.4274456|          -0.0477653|
|           EGKK|        51.4274456| -0.1909231947368421|
|           EGLC| 51.50323507368421|          -0.0477653|
|           EGLL| 51.46955086315789|-0.20776529999999999|
|           EGMC| 51.57060349473684|          -0.0477653|
|           EGSS|51.587445599999995|          -0.0477653|
|           EGTK|51.587445599999995|-0.20776529999999999|
|           EHAM| 52.30992170526316|           4.8124534|
|           FAEL|        41.8133203|          12.5629321|
|           KAHN| 33.95555717368421|  -83.32166115789474|
|           KGON|        34.0397677|          -83.296398|
|           KL

In [9]:
# Persist the airport -> nearest-weather-point lookup as the `airports` table.
# An explicit "overwrite" mode is required for re-runs: without it Spark uses its default
# save mode, which raises an error as soon as the table already exists.
# NOTE(review): this is written to quality_database.duckdb, while the other outputs of this
# notebook go to exploition_database.duckdb — confirm this target is intentional.
final_df_airports.write \
    .format("jdbc") \
    .option("url", "jdbc:duckdb:quality_database.duckdb") \
    .option("dbtable", "airports") \
    .option("driver", "org.duckdb.DuckDBDriver") \
    .mode("overwrite") \
    .save()

Now we will add to the airbnb dataset the coordinates of the weather-dataset point at the minimum Euclidean distance from each listing. We will not create a separate dataset, because the result has exactly the same rows as the airbnb dataset.

In [10]:
# For every airbnb listing, find the closest weather point located in the same city.
# Result: `final_df` with columns (id, latitude_w, longitude_w), one row per listing that
# has at least one weather point in its city.

# Add an id column to the airbnb DataFrame so each listing can be matched back later.
# NOTE(review): monotonically_increasing_id() produces 64-bit values that depend on how the
# data is partitioned; joining on it later assumes the ids are regenerated identically
# whenever DF_airbnb is re-evaluated — confirm, or materialise DF_airbnb first.
DF_airbnb = DF_airbnb.withColumn("id", F.monotonically_increasing_id())

airbnb = DF_airbnb.alias("airbnb")
weather = DF_weather.alias("weather")

# Only the coordinates of the weather points matter here, and the weather table can hold
# the same coordinates in many rows. Reducing it once to the distinct
# (city, latitude, longitude) triples keeps every cross join below as small as possible
# without changing which coordinates end up being the closest ones.
weather_points = (
    DF_weather
    .select("city", "latitude", "longitude")
    .distinct()
    .alias("weather")
)

# Get all the cities in both DataFrames
cities = airbnb.select("city").distinct().union(weather_points.select("city").distinct()).distinct().collect()

# List to store the DataFrames with the results for each city
results_per_city = []

# Rank weather points by distance within each listing (same spec for every city)
window_spec_airbnb = Window.partitionBy("airbnb.id").orderBy("distance")

for city_row in cities:
    # For each city
    city = city_row.city
    print(f"City: {city}")

    # Filter the DataFrames by the city
    airbnb_city = airbnb.filter(F.col("city") == city)
    weather_city = weather_points.filter(F.col("city") == city)

    # Calculate the Euclidean distance between each airbnb and all weather points of the city
    # (computed directly on the raw lat/lng values, i.e. a planar approximation)
    distance_df = airbnb_city.crossJoin(weather_city).withColumn(
        "distance",
        F.sqrt(
            (F.col("airbnb.lat") - F.col("weather.latitude")) ** 2 +
            (F.col("airbnb.lng") - F.col("weather.longitude")) ** 2
        )
    )

    # Keep, for each airbnb, only the weather point with the minimum distance
    closest_weather = distance_df.withColumn("row_num", F.row_number().over(window_spec_airbnb))\
                                .filter(F.col("row_num") == 1)\
                                .select(
                                    F.col("airbnb.id").alias("id"),
                                    F.col("weather.latitude").alias("latitude_w"),
                                    F.col("weather.longitude").alias("longitude_w")
                                )

    # Append the results to the list
    results_per_city.append(closest_weather)

# Union all the DataFrames in the list
final_df = results_per_city[0]
for df in results_per_city[1:]:
    final_df = final_df.union(df)

City: Lisbon
City: Berlin
City: London
City: Vienna
City: Paris
City: Athens
City: Barcelona
City: Amsterdam
City: Rome
City: Budapest


In [11]:
# Attach the nearest weather coordinates to every listing via the shared `id` column.
# A left join keeps listings that have no matching weather point.
result_df = DF_airbnb.join(final_df, on="id", how="left")

In [12]:
# Column types used when Spark creates the target table.
# `id` is declared BIGINT because it comes from monotonically_increasing_id(), which
# yields 64-bit values: as soon as the data spans more than one partition the ids exceed
# the 32-bit INT range.
column_types = {
    "id": "BIGINT",
    "realSum": "DOUBLE",
    "room_type": "STRING",
    "room_shared": "BOOLEAN",
    "room_private": "BOOLEAN",
    "person_capacity": "DOUBLE",
    "host_is_superhost": "BOOLEAN",
    "multi": "STRING",
    "biz": "STRING",
    "cleanliness_rating": "DOUBLE",
    "guest_satisfaction_overall": "DOUBLE",
    "bedrooms": "INT",
    "dist": "DOUBLE",
    "metro_dist": "DOUBLE",
    "attr_index": "DOUBLE",
    "attr_index_norm": "DOUBLE",
    "rest_index": "DOUBLE",
    "rest_index_norm": "DOUBLE",
    "lng": "DOUBLE",
    "lat": "DOUBLE",
    "city": "STRING",
    "day_type": "STRING",
    "cleanliness_category": "STRING",
    "cluster": "INT",
    "latitude_w": "DOUBLE",
    "longitude_w": "DOUBLE"
}

# Render the mapping as the "name TYPE, name TYPE, ..." string that the JDBC writer's
# createTableColumnTypes option expects
column_types_str = ", ".join([f"{col} {data_type}" for col, data_type in column_types.items()])

# Save the enriched airbnb DataFrame to the exploitation database, replacing any
# previous version of the `airbnb` table
result_df.write \
    .format("jdbc") \
    .option("url", "jdbc:duckdb:exploition_database.duckdb") \
    .option("dbtable", "airbnb") \
    .option("driver", "org.duckdb.DuckDBDriver") \
    .option("createTableColumnTypes", column_types_str) \
    .mode("overwrite") \
    .save()