In [None]:
# Import necessary modules
from pyspark.sql import SparkSession
from src.data_pipeline.ingestion import load_airbnb_data, load_geojson_data, load_rentals_data
from src.data_pipeline.cleaning import clean_airbnb_data
from src.data_pipeline.geo_enrichment import enrich_airbnb_data
from src.data_pipeline.transformations import (
    impute_review_scores, 
    transform_room_type, 
    calculate_investment_potential
)
from src.data_pipeline.utils import save_to_delta
from src.data_pipeline.visualization import generate_visualizations

# Build (or reuse) a SparkSession with the Delta Lake SQL extension and the
# Delta catalog registered, so DataFrames can be written in Delta format.
# NOTE(review): if a SparkSession already exists when this runs (for example,
# the one a Databricks notebook provides), getOrCreate() hands back that
# session, and static settings such as spark.jars.packages may not take
# effect. Confirm that Delta is available in that environment.
spark = (
    SparkSession.builder
    .appName("Databricks Pipeline")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Input files and the root directory that Delta outputs are written under.
# NOTE(review): "/dbfs/..." is the local-filesystem (FUSE) form of a DBFS path.
# Spark readers and writers usually expect "dbfs:/..." instead. Which form is
# right depends on how the loaders and save_to_delta use these strings, so
# verify before running.
airbnb_path = "/dbfs/FileStore/tables/airbnb.csv"
geojson_path = "/dbfs/FileStore/tables/post_codes.geojson"
rentals_path = "/dbfs/FileStore/tables/rentals.json"
output_path = "/dbfs/delta"

# Run the pipeline end to end.
# The steps sit inside try/finally so the Spark session is stopped even when a
# step raises. On failure the exception still propagates, so the success
# message at the bottom is printed only when every step completed.
# Intermediate DataFrames stay module-level so later notebook cells can use them.
# NOTE(review): on Databricks the SparkSession is provided and shared by the
# platform. Stopping it may break subsequent cells or other notebooks attached
# to the cluster, so confirm spark.stop() is wanted when running there.
try:
    # Step 1: Load Airbnb and GeoJSON data
    airbnb_df = load_airbnb_data(spark, airbnb_path)
    geojson_df = load_geojson_data(spark, geojson_path)

    # Step 2: Clean the Airbnb data, then enrich it using the GeoJSON data
    airbnb_cleaned = clean_airbnb_data(airbnb_df)
    airbnb_enriched = enrich_airbnb_data(airbnb_cleaned, geojson_df)

    # Step 3: Apply transformations (review-score imputation, then room type)
    airbnb_transformed = impute_review_scores(airbnb_enriched)
    airbnb_transformed = transform_room_type(airbnb_transformed)

    # Step 4: Load rental data
    rentals_df = load_rentals_data(spark, rentals_path)

    # Step 5: Calculate investment potential from transformed listings + rentals
    investment_metrics = calculate_investment_potential(airbnb_transformed, rentals_df)

    # Step 6: Save outputs to Delta tables under output_path
    save_to_delta(airbnb_transformed, f"{output_path}/airbnb_gold")
    save_to_delta(investment_metrics, f"{output_path}/investment_metrics")

    # Step 7: Generate visualizations
    generate_visualizations(airbnb_transformed, f"{output_path}/visualizations")
finally:
    # End Spark Session (runs on both success and failure)
    spark.stop()

# Only reached if none of the steps above raised.
print("Pipeline executed successfully. Outputs saved to Delta tables.")