In [42]:
import timeit

start_time = timeit.default_timer()

import random

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.utils import AnalysisException

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ",elapsed_time)

Elapsed time :  0.0001572499986650655


In [43]:
# Create the Spark session used by every later cell and time how long that takes.
start_time = timeit.default_timer()

spark = (
    SparkSession.builder
    .appName("IcebergLocalDevelopment")
    .master("local[*]")
    # Iceberg runtime jar plus the SQL extensions it ships with.
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Catalog named "local": a Hadoop-type Iceberg catalog whose warehouse is a
    # path relative to the working directory.
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "spark-warehouse/iceberg")
    .getOrCreate()
)

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ", elapsed_time)

Elapsed time :  0.0035901520004699705


In [44]:
# Load the raw listings file, report its row and column counts, and preview it.
start_time = timeit.default_timer()

listings_path = "expedia-lodging-listings-en_us-1-all.jsonl"

try:
    df = spark.read.json(listings_path)

    # Row count first, then the column count, each reported on its own line.
    num_rows = df.count()
    print(f"Toal number of samples in this dataset: {num_rows}")

    num_columns = len(df.columns)
    print(f"The dataset contains {num_columns} columns.")

    df.show(10)
    print("DataFrame read and displayed successfully.")
except AnalysisException as err:
    print(f"An error occurred during DataFrame reading or displaying: {err}")
except Exception as err:
    # Failures are reported and the cell keeps going, so `df` may be undefined
    # for later cells when this branch runs.
    print(f"An unexpected error occurred: {err}")

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ", elapsed_time)



Toal number of samples in this dataset: 1000000
The dataset contains 10 columns.
+--------------------+--------------------+-------------+---------------+-------------------+--------------------+--------------------+--------------+----------+----------------+
|            bookable|       chainAndBrand|      country|inventorySource|        lastUpdated|          propertyId|        propertyType|referencePrice|starRating|vrboPropertyType|
+--------------------+--------------------+-------------+---------------+-------------------+--------------------+--------------------+--------------+----------+----------------+
|{false, false, true}|                NULL|United States|           vrbo|03-04-2023 15:35:05|{33555268, 107476...|          {7, Condo}|  {USD, 455.0}|      NULL|         {false}|
|{false, false, true}|                NULL|United States|           vrbo|03-04-2023 15:35:05|{33555279, 107476...|          {7, Condo}|  {USD, 521.0}|      NULL|         {false}|
|{false, false, true}|  

                                                                                

In [45]:
import timeit
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, IntegerType

# Declare the intended layout of the curated listings table.
# NOTE(review): no cell visible in this notebook passes `target_schema` to Spark,
# and the schema printed further down reports property_modified_date and
# usd_price as string -- confirm whether this schema is meant to be enforced.
start_time = timeit.default_timer()

try:
    # Nested struct describing the chain/brand of a listing; every member is nullable.
    chain_and_brand_type = (
        StructType()
        .add("brand_id", IntegerType(), True)
        .add("chain_id", IntegerType(), True)
        .add("brand_name", StringType(), True)
        .add("chain_name", StringType(), True)
    )

    # Third argument is the nullable flag: False marks a required column.
    target_schema = (
        StructType()
        .add("listing_source_site", StringType(), False)
        .add("property_modified_date", TimestampType(), False)
        .add("star_rating", StringType(), True)
        .add("currency", StringType(), False)
        .add("usd_price", DoubleType(), True)
        .add("chain_and_brand", chain_and_brand_type, True)
        .add("country_code", StringType(), False)
    )
    print("Schema defined successfully.")
except Exception as err:
    print(f"An error occurred while defining the schema: {err}")

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time:", elapsed_time)

Schema defined successfully.
Elapsed time: 0.0008199949988920707


In [46]:
start_time = timeit.default_timer()

# Country display name -> ISO 3166-1 alpha-2 code.
# The code becomes the `country_code` column, which the Iceberg write uses as its
# partition column, so every country must map to its own distinct code.
# Fourteen entries previously held non-ISO values; seven of those duplicated another
# country's code (e.g. Brunei and Brazil were both "BR") and so merged unrelated
# countries into one partition. Each corrected entry is marked with "was ...".
country_code_mapping = {
    "Bonaire Saint Eustatius and Saba": "BQ",  # was "BSES"
    "Paraguay": "PY",
    "Anguilla": "AI",
    "Macao": "MO",
    "U.S. Virgin Islands": "VI",  # was "USV"
    "Senegal": "SN",
    "Sweden": "SE",
    "Guyana": "GY",
    "Philippines": "PH",
    "Jersey": "JE",
    "Eritrea": "ER",
    "Djibouti": "DJ",
    "Norfolk Island": "NF",
    "Tonga": "TO",
    "Singapore": "SG",
    "Malaysia": "MY",
    "Fiji": "FJ",
    "Turkey": "TR",
    "Malawi": "MW",
    "Germany": "DE",
    "Northern Mariana Islands": "MP",
    "Comoros": "KM",
    "Cambodia": "KH",
    "Maldives": "MV",
    "Ivory Coast": "CI",  # was "IC"
    "Jordan": "JO",
    "Rwanda": "RW",
    "Palau": "PW",
    "France": "FR",
    "Turks and Caicos Islands": "TC",
    "Greece": "GR",
    "Sri Lanka": "LK",
    "Montserrat": "MS",
    "Taiwan": "TW",
    "Dominica": "DM",
    "British Virgin Islands": "VG",
    "Algeria": "DZ",
    "Togo": "TG",
    "Equatorial Guinea": "GQ",
    "Slovakia": "SK",
    "Reunion": "RE",
    "Argentina": "AR",
    "Belgium": "BE",
    "Angola": "AO",
    "San Marino": "SM",
    "Ecuador": "EC",
    "Qatar": "QA",
    "Lesotho": "LS",
    "Albania": "AL",
    "Madagascar": "MG",
    "Finland": "FI",
    "New Caledonia": "NC",
    "Ghana": "GH",
    "Myanmar": "MM",
    "Nicaragua": "NI",
    "Guernsey": "GG",
    "Peru": "PE",
    "Benin": "BJ",
    "Sierra Leone": "SL",
    "United States": "US",
    "India": "IN",
    "Bahamas": "BS",
    "China": "CN",
    "Curacao": "CW",  # was "CUR"
    "Belarus": "BY",
    "Malta": "MT",
    "Kuwait": "KW",
    "Sao Tome and Principe": "ST",
    "Palestinian Territory": "PS",  # was "PT" (Portugal)
    "Puerto Rico": "PR",
    "Chile": "CL",
    "Tajikistan": "TJ",
    "Martinique": "MQ",
    "Cayman Islands": "KY",
    "Isle of Man": "IM",
    "Croatia": "HR",
    "Burundi": "BI",
    "Nigeria": "NG",
    "Andorra": "AD",
    "Bolivia": "BO",
    "Gabon": "GA",
    "Italy": "IT",
    "Suriname": "SR",
    "Lithuania": "LT",
    "Norway": "NO",
    "Turkmenistan": "TM",
    "Spain": "ES",
    "Cuba": "CU",
    "Mauritania": "MR",
    "Guadeloupe": "GP",
    "Denmark": "DK",
    "Barbados": "BB",
    "Bangladesh": "BD",
    "Ireland": "IE",
    "Liechtenstein": "LI",
    "Swaziland": "SZ",  # was "SL" (Sierra Leone)
    "Thailand": "TH",
    "Laos": "LA",
    "Christmas Island": "CX",
    "Bhutan": "BT",
    "Democratic Republic of the Congo": "CD",  # was "DRC"
    "Morocco": "MA",
    "Monaco": "MC",
    "Panama": "PA",
    "Cape Verde": "CV",
    "Hong Kong": "HK",
    "Israel": "IL",
    "Iceland": "IS",
    "Saint Barthelemy": "BL",  # was "SB" (Solomon Islands)
    "Saint Kitts and Nevis": "KN",
    "Oman": "OM",
    "French Polynesia": "PF",
    "South Korea": "KR",
    "Cyprus": "CY",
    "Gibraltar": "GI",
    "Uruguay": "UY",
    "Mexico": "MX",
    "Aruba": "AW",
    "Montenegro": "ME",
    "Georgia": "GE",
    "Zimbabwe": "ZW",
    "Estonia": "EE",
    "Indonesia": "ID",
    "Saint Vincent and the Grenadines": "VC",
    "Guatemala": "GT",
    "Guam": "GU",
    "Mongolia": "MN",
    "Republic of the Congo": "CG",
    "Azerbaijan": "AZ",
    "Sint Maarten": "SX",  # was "SIM"
    "Grenada": "GD",
    "Armenia": "AM",
    "Tunisia": "TN",
    "Liberia": "LR",
    "Honduras": "HN",
    "Trinidad and Tobago": "TT",
    "Saudi Arabia": "SA",
    "Uganda": "UG",
    "Wallis and Futuna": "WF",
    "French Guiana": "GF",
    "Namibia": "NA",
    "Mayotte": "YT",
    "Switzerland": "CH",
    "Zambia": "ZM",
    "Ethiopia": "ET",
    "Jamaica": "JM",
    "Latvia": "LV",
    "United Arab Emirates": "AE",
    "Brunei": "BN",  # was "BR" (Brazil)
    "Saint Lucia": "LC",
    "Saint Martin": "MF",  # was "SAM"
    "Aland Islands": "AX",  # was "AI" (Anguilla)
    "Guinea": "GN",
    "Canada": "CA",
    "Seychelles": "SC",
    "Kyrgyzstan": "KG",
    "Uzbekistan": "UZ",
    "Macedonia": "MK",  # was "MD" (Moldova)
    "Faroe Islands": "FO",
    "Samoa": "WS",
    "Czech Republic": "CZ",
    "Mozambique": "MZ",
    "Cook Islands": "CK",
    "Brazil": "BR",
    "Belize": "BZ",
    "Kenya": "KE",
    "Gambia": "GM",
    "Lebanon": "LB",
    "Slovenia": "SI",
    "Antigua and Barbuda": "AG",
    "Dominican Republic": "DO",
    "Japan": "JP",
    "Tanzania": "TZ",
    "Botswana": "BW",
    "Luxembourg": "LU",
    "New Zealand": "NZ",
    "United States Minor Outlying Islands": "UM",
    "Bosnia and Herzegovina": "BA",
    "Greenland": "GL",
    "Haiti": "HT",
    "Poland": "PL",
    "Portugal": "PT",
    "Australia": "AU",
    "Cameroon": "CM",
    "Papua New Guinea": "PG",
    "Romania": "RO",
    "Guinea-Bissau": "GW",
    "Bulgaria": "BG",
    "Austria": "AT",
    "Nepal": "NP",
    "Egypt": "EG",
    "Costa Rica": "CR",
    "El Salvador": "SV",
    "Kazakhstan": "KZ",
    "Serbia": "RS",
    "South Africa": "ZA",
    "Burkina Faso": "BF",
    "Bermuda": "BM",
    "Bahrain": "BH",
    "Micronesia": "FM",  # was "MC" (Monaco)
    "Colombia": "CO",
    "Hungary": "HU",
    "Pakistan": "PK",
    "Vanuatu": "VU",
    "Mauritius": "MU",
    "United Kingdom": "GB",
    "Moldova": "MD",
    "Vietnam": "VN",
    "Netherlands": "NL",
    "Mali": "ML",
    "Chad": "TD",
    "Svalbard and Jan Mayen": "SJ",
    "Sudan": "SD",
    "Niue": "NU",
    "Kiribati": "KI",
    "Iraq": "IQ",
    "American Samoa": "AS",
    "Saint Pierre and Miquelon": "PM",
    "Niger": "NE",
    "Solomon Islands": "SB",
}


def get_country_code(country_name):
    """Translate a country display name into its two-letter code.

    Args:
        country_name: Name exactly as it is spelled in `country_code_mapping`
            (the lookup is case-sensitive), or None.

    Returns:
        The mapped code, or the string "Unknown" when the name is None or is
        not a key of `country_code_mapping`.
    """
    # None is never a key, so the same default covers missing values too.
    return country_code_mapping.get(country_name, "Unknown")


try:
    # Wrap the plain-Python lookup so it can be applied to a DataFrame column.
    country_udf = F.udf(get_country_code, StringType())

    print("UDF registered successfully.")
except Exception as e:
    print(f"An error occurred while defining or registering the UDF: {e}")


end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ", elapsed_time)

UDF registered successfully.
Elapsed time :  0.003499652999380487


In [47]:
start_time = timeit.default_timer()


def _or_blank(source, value):
    """Return `value`, or the literal "' '" placeholder when column `source` is null."""
    return F.when(F.col(source).isNull(), F.lit("' '")).otherwise(value)


def _labelled_or_blank(source, label, dtype):
    """Build the struct member `label` as the text '"<label>": ' followed by a value.

    The value is column `source` cast to `dtype`, or the literal "' '" placeholder
    when `source` is null. The resulting column is aliased to `label`.
    """
    prefix = F.lit(f'"{label}": ')
    return (
        F.when(F.col(source).isNull(), F.concat(prefix, F.lit("' '")))
        .otherwise(F.concat(prefix, F.col(source).cast(dtype)))
        .alias(label)
    )


# Project the raw listing columns onto the curated column names.
# NOTE(review): each column mixes the string placeholder "' '" with a typed value;
# the schema printed later in this notebook shows property_modified_date and
# usd_price as string -- confirm that the placeholder is really wanted here.
try:
    transformed_df = df.select(
        F.lit("expedia").alias("listing_source_site"),
        _or_blank(
            "lastUpdated", F.to_timestamp("lastUpdated", "dd-MM-yyyy HH:mm:ss")
        ).alias("property_modified_date"),
        _or_blank(
            "starRating", F.col("starRating").cast(StringType())
        ).alias("star_rating"),
        _or_blank(
            "referencePrice.currency", F.col("referencePrice.currency")
        ).alias("currency"),
        _or_blank(
            "referencePrice.value", F.col("referencePrice.value").cast(DoubleType())
        ).alias("usd_price"),
        F.struct(
            _labelled_or_blank("chainAndBrand.brandId", "brand_id", IntegerType()),
            _labelled_or_blank("chainAndBrand.chainId", "chain_id", IntegerType()),
            _labelled_or_blank("chainAndBrand.brandName", "brand_name", StringType()),
            _labelled_or_blank("chainAndBrand.chainName", "chain_name", StringType()),
        ).alias("chain_and_brand"),
        # Kept under its original name for now; a later cell derives the code from it.
        F.col("country"),
    )

    print("DataFrame transformation successful.")
except Exception as err:
    print(f"An error occuured during DataFrame transformation: {err}")

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ", elapsed_time)

DataFrame transformation successful.
Elapsed time :  0.16818683400015288


In [48]:
# Swap the country name for its code and preview the first rows untruncated.
start_time = timeit.default_timer()

try:
    # Apply country_udf to the name column, then drop the name itself.
    with_country_code = transformed_df.withColumn(
        "country_code", country_udf(F.col("country"))
    )
    final_df = with_country_code.drop("country")

    final_df.show(20, truncate=False)
    print("DataFrame transformation and display successful.")
except AnalysisException as err:
    print(f"An error occurred during DataFrame transformation or display: {err}")
except Exception as err:
    print(f"An unexpected error occurred: {err}")

end_time = timeit.default_timer()
elapsed_time = end_time - start_time

print("Elapsed time : ", elapsed_time)

+-------------------+----------------------+-----------+--------+---------+----------------------------------------------------------------------------------------------------------------+------------+
|listing_source_site|property_modified_date|star_rating|currency|usd_price|chain_and_brand                                                                                                 |country_code|
+-------------------+----------------------+-----------+--------+---------+----------------------------------------------------------------------------------------------------------------+------------+
|expedia            |2023-04-03 15:35:05   |' '        |USD     |455.0    |{"brand_id": ' ', "chain_id": ' ', "brand_name": ' ', "chain_name": ' '}                                        |US          |
|expedia            |2023-04-03 15:35:05   |' '        |USD     |521.0    |{"brand_id": ' ', "chain_id": ' ', "brand_name": ' ', "chain_name": ' '}                                        |US  

In [49]:
# Persist final_df as the table "local.test_db", partitioned by country_code.
try:
    (
        final_df.writeTo("local.test_db")
        .partitionedBy("country_code")
        # createOrReplace() is called on every run, so re-running this cell rewrites the table.
        .createOrReplace()
    )
    print("DataFrame successfully written to the Iceberg table.")
except AnalysisException as err:
    print(f"An error occurred while writing to the Iceberg table: {err}")
except Exception as err:
    print(f"An unexpected error occurred: {err}")

                                                                                

DataFrame successfully written to the Iceberg table.


In [50]:
# Print the column names and types of final_df.
# Note that this inspects the in-memory DataFrame, not the table read back from the catalog.
try:
    final_df.printSchema()
    print("Print the iceberg dataset Schema")
# Previously `except Analysis as a`, which named an undefined class and then
# formatted an unbound `e`; any failure here would have surfaced as a NameError.
except AnalysisException as e:
    print(f"An error occured while printing iceberg table: {e}")
except Exception as e:
    print(f"An unexcepted error occured: {e}")

root
 |-- listing_source_site: string (nullable = false)
 |-- property_modified_date: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- usd_price: string (nullable = true)
 |-- chain_and_brand: struct (nullable = false)
 |    |-- brand_id: string (nullable = true)
 |    |-- chain_id: string (nullable = true)
 |    |-- brand_name: string (nullable = true)
 |    |-- chain_name: string (nullable = true)
 |-- country_code: string (nullable = true)

Print the iceberg dataset Schema


In [51]:

# Read the table back through the Iceberg data source and preview it.
try:
    iceberg_df = (
        spark.read
        .format("iceberg")
        .load("local.test_db")
    )

    iceberg_df.show()
    print("Successfully show table into iceberg format")
# Previously `except Read as r`, which named an undefined class, formatted an
# unbound `e`, and described this read as a write.
except AnalysisException as e:
    print(f"An error occurred while reading the Iceberg table: {e}")
except Exception as e:
    # Same catch-all reporting as the other cells in this notebook.
    print(f"An unexpected error occurred: {e}")


+-------------------+----------------------+-----------+--------+---------+--------------------+------------+
|listing_source_site|property_modified_date|star_rating|currency|usd_price|     chain_and_brand|country_code|
+-------------------+----------------------+-----------+--------+---------+--------------------+------------+
|            expedia|   2023-04-03 15:35:06|          3|     USD|    55.56|{"brand_id": ' ',...|          PL|
|            expedia|   2023-04-03 15:35:06|        ' '|     USD|      0.0|{"brand_id": ' ',...|          PL|
|            expedia|   2023-04-03 15:35:07|          3|     USD|      0.0|{"brand_id": ' ',...|          PL|
|            expedia|   2023-04-03 15:35:07|        ' '|     USD|    61.79|{"brand_id": 9579...|          PL|
|            expedia|   2023-04-03 15:35:07|          3|     USD|    49.44|{"brand_id": 300,...|          PL|
|            expedia|   2023-04-03 15:35:07|          3|     USD|    38.43|{"brand_id": ' ',...|          PL|
|         

In [52]:


# Narrow the Iceberg-backed DataFrame to three columns and preview them.
try:
    summary_columns = ["listing_source_site", "star_rating", "usd_price"]
    selected_df = iceberg_df.select(*summary_columns)

    selected_df.show()
    print("DataFrame selection and display successful.")
except AnalysisException as err:
    print(f"An error occurred during DataFrame selection or display: {err}")
except Exception as err:
    print(f"An unexpected error occurred: {err}")


+-------------------+-----------+---------+
|listing_source_site|star_rating|usd_price|
+-------------------+-----------+---------+
|            expedia|          3|    55.56|
|            expedia|        ' '|      0.0|
|            expedia|          3|      0.0|
|            expedia|        ' '|    61.79|
|            expedia|          3|    49.44|
|            expedia|          3|    38.43|
|            expedia|        3.5|      0.0|
|            expedia|        ' '|   142.04|
|            expedia|          5|   107.21|
|            expedia|          4|    67.37|
|            expedia|          3|    51.93|
|            expedia|        ' '|     68.9|
|            expedia|        3.5|      0.0|
|            expedia|        ' '|    76.32|
|            expedia|        ' '|   107.06|
|            expedia|          4|    88.54|
|            expedia|        ' '|    72.08|
|            expedia|        ' '|   107.06|
|            expedia|        ' '|     58.3|
|            expedia|          4