In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import avg
from pyspark.sql.types import FloatType
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
credentials_location = '/home/sal/git/data-engineering-capstone/spark/keys/my-creds.json'

In [3]:
GCS_connector = "/home/sal/git/data-engineering-capstone/spark/lib/gcs-connector-hadoop3-2.2.5.jar" 
GBQ_connector = "/home/sal/git/data-engineering-capstone/spark/lib/spark-3.3-bigquery-0.36.1.jar"
conf_jars = f"{GCS_connector},{GBQ_connector}"

In [4]:
temp_GCS_Bucket = "airbnb-data-lake"
bucket_name = "airbnb-data-lake"

In [5]:
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('airbnb') \
    .set("spark.jars", conf_jars) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location) \
    .set("temporaryGcsBucket",temp_GCS_Bucket)

In [6]:
sc = SparkContext(conf=conf)

24/04/15 19:14:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
hadoop_conf = sc._jsc.hadoopConfiguration()

In [8]:
hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

In [9]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

In [10]:
df = spark.read.format('bigquery') \
  .option('parentProject', 'airbnb-de-project') \
  .option("table", "airbnb-de-project.airbnb_dw.airbnb_data") \
  .load()

In [11]:
df.printSchema()

root
 |-- listing_id: long (nullable = true)
 |-- host_id: long (nullable = true)
 |-- host_since: string (nullable = true)
 |-- host_response_rate: double (nullable = true)
 |-- host_is_superhost: string (nullable = true)
 |-- host_total_listings_count: long (nullable = true)
 |-- host_has_profile_pic: string (nullable = true)
 |-- host_identity_verified: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- district: string (nullable = true)
 |-- city: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: long (nullable = true)
 |-- bedrooms: long (nullable = true)
 |-- price: long (nullable = true)
 |-- review_scores_rating: long (nullable = true)
 |-- review_scores_accuracy: long (nullable = true)
 |-- review_scores_cleanliness: long (nullable = true)
 |-- review_scores_checkin: long (nullable = true)
 |-- 

In [12]:
df.show()

24/04/15 19:16:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+----------+---------+----------------+------------------+-----------------+-------------------------+--------------------+----------------------+-------------+--------+--------------+---------+---------+--------------------+------------+------------+--------+-----+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+---------+---------+-----------+----------------+--------------------+
|listing_id|  host_id|      host_since|host_response_rate|host_is_superhost|host_total_listings_count|host_has_profile_pic|host_identity_verified|neighbourhood|district|          city| latitude|longitude|       property_type|   room_type|accommodates|bedrooms|price|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|price_usd|review_id|reviewer_id|     review_date|listing_review_count|


In [13]:
# Convert price to USD

# Conversion rates
conversion_rates = {
    'Paris': 1.07,
    'New York': 1,
    'Bangkok': 0.027,
    'Rio de Janeiro': 0.20,
    'Sydney': 0.65,
    'Istanbul': 0.031,
    'Rome': 1.07,
    'Hong Kong': 0.13,
    'Mexico City': 0.060,
    'Cape Town': 0.053
}

# Define a UDF (user-defined function) to calculate the converted price
@udf(FloatType())
def convert_currency(city, price):
    return price * conversion_rates.get(city, 1)

# Apply the UDF to create a new column "price_usd"
df = df.withColumn("price_usd", convert_currency(col("city"), col("price")))

# Display the updated dataframe
df.head(1)

                                                                                

[Row(listing_id=37200137, host_id=77212938, host_since='2018-06-12T00:00', host_response_rate=1.0, host_is_superhost='t', host_total_listings_count=2, host_has_profile_pic='t', host_identity_verified='t', neighbourhood='Joa', district=None, city='Rio de Janeiro', latitude=-23.01489, longitude=-43.29168, property_type='Entire condominium', room_type='Entire place', accommodates=6, bedrooms=3, price=480, review_scores_rating=100, review_scores_accuracy=10, review_scores_cleanliness=10, review_scores_checkin=10, review_scores_communication=10, review_scores_location=10, review_scores_value=10, price_usd=96.0, review_id=607725284, reviewer_id=107166188, review_date='2022-02-18T00:00', listing_review_count=44)]

In [14]:
df.show(5)

+----------+--------+----------------+------------------+-----------------+-------------------------+--------------------+----------------------+-------------+--------+--------------+---------+---------+------------------+------------+------------+--------+-----+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+---------+---------+-----------+----------------+--------------------+
|listing_id| host_id|      host_since|host_response_rate|host_is_superhost|host_total_listings_count|host_has_profile_pic|host_identity_verified|neighbourhood|district|          city| latitude|longitude|     property_type|   room_type|accommodates|bedrooms|price|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|price_usd|review_id|reviewer_id|     review_date|listing_review_count|
+-----

In [15]:
# Calculate the average price_usd
avg_price_usd = df.groupBy().agg(avg('price_usd').alias('average_price_usd'))

# Show the result
avg_price_usd.show()



+-----------------+
|average_price_usd|
+-----------------+
|88.78709692960278|
+-----------------+



                                                                                

In [17]:
# # Write transformed data back to BigQuery
df.write \
    .format("bigquery") \
    .option("parentProject", "airbnb-de-project") \
    .option("temporaryGcsBucket", "airbnb-data-lake") \
    .option("table", "airbnb-de-project.airbnb_dw.airbnb_data_spark") \
    .mode("overwrite") \
    .save()


                                                                                

In [18]:
spark.stop()