# Kafka Consumer Setup for Airbnb Data Flow

## Import Necessary Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType, BooleanType, IntegerType, LongType, MapType

## Creating a Spark Session
We'll create a Spark session that connects to both a local Kafka broker and a local MongoDB instance.

In [None]:
# Local Spark session running on a single core. `spark.jars.packages` has Spark
# resolve the Kafka source and the MongoDB connector by Maven coordinates when
# the session starts. The output URI names the MongoDB database
# (project_budt737) and collection (airbnb_data) used as the default write
# target of the "mongo" data source.
# NOTE(review): the spark-sql-kafka version (3.5.1) presumably has to match the
# installed Spark version, and mongo-spark-connector 3.0.1 predates Spark 3.5.
# Confirm that both are compatible with the local Spark install.
spark = (
    SparkSession.builder
    .appName("PySparkKafkaMongoDB")
    .master("local[1]")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
    )
    .config(
        "spark.mongodb.output.uri",
        "mongodb://localhost:27017/project_budt737.airbnb_data",
    )
    .getOrCreate()
)

# Show only ERROR-level log messages so the notebook output stays readable.
spark.sparkContext.setLogLevel("ERROR")

## JSON Schema and Kafka Parameters

In [None]:
# Kafka connection parameters: a broker on localhost and the topic to consume
# Airbnb listing messages from.
kafka_bootstrap_servers = 'localhost:9092'
kafka_topic_name = 'airbnb-test'

# Schema used by from_json to parse each Kafka message value.
# Every field is nullable, so a key missing from a message becomes null.
# Each field name must appear exactly once. `select("data.*")` turns every
# field into a top-level column, so a repeated name produces duplicate columns.
# NOTE(review): the field types (e.g. IntegerType for price and review scores)
# are assumed to match the producer's JSON. Confirm them against the source data.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("listing_url", StringType(), True),
    StructField("scrape_id", StringType(), True),
    StructField("last_scraped", StringType(), True),
    StructField("name", StringType(), True),
    StructField("summary", StringType(), True),
    StructField("space", StringType(), True),
    StructField("description", StringType(), True),
    StructField("experiences_offered", StringType(), True),
    StructField("neighborhood_overview", StringType(), True),
    StructField("notes", StringType(), True),
    StructField("transit", StringType(), True),
    StructField("access", StringType(), True),
    StructField("interaction", StringType(), True),
    StructField("house_rules", StringType(), True),
    StructField("thumbnail_url", StringType(), True),
    StructField("medium_url", StringType(), True),
    StructField("picture_url", StringType(), True),
    StructField("xl_picture_url", StringType(), True),
    StructField("host_id", StringType(), True),
    StructField("host_url", StringType(), True),
    StructField("host_name", StringType(), True),
    StructField("host_since", StringType(), True),
    StructField("host_location", StringType(), True),
    StructField("host_about", StringType(), True),
    StructField("host_response_time", StringType(), True),
    StructField("host_response_rate", IntegerType(), True),
    StructField("host_acceptance_rate", StringType(), True),  # Assuming this can be null or a string
    StructField("host_thumbnail_url", StringType(), True),
    StructField("host_picture_url", StringType(), True),
    StructField("host_neighbourhood", StringType(), True),
    StructField("host_listings_count", IntegerType(), True),
    StructField("host_total_listings_count", IntegerType(), True),
    StructField("host_verifications", ArrayType(StringType()), True),
    StructField("street", StringType(), True),
    StructField("neighbourhood", StringType(), True),
    StructField("neighbourhood_cleansed", StringType(), True),
    StructField("neighbourhood_group_cleansed", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zipcode", StringType(), True),
    StructField("market", StringType(), True),
    StructField("smart_location", StringType(), True),
    StructField("country_code", StringType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("property_type", StringType(), True),
    StructField("room_type", StringType(), True),
    StructField("accommodates", IntegerType(), True),
    StructField("bathrooms", DoubleType(), True),
    StructField("bedrooms", IntegerType(), True),
    StructField("beds", IntegerType(), True),
    StructField("bed_type", StringType(), True),
    StructField("amenities", ArrayType(StringType()), True),
    StructField("square_feet", StringType(), True),  # Assuming this can be null or a string
    StructField("price", IntegerType(), True),
    StructField("weekly_price", StringType(), True),  # Assuming this can be null or a string
    StructField("monthly_price", StringType(), True),  # Assuming this can be null or a string
    StructField("security_deposit", IntegerType(), True),
    StructField("cleaning_fee", IntegerType(), True),
    StructField("guests_included", IntegerType(), True),
    StructField("extra_people", IntegerType(), True),
    StructField("minimum_nights", IntegerType(), True),
    StructField("maximum_nights", IntegerType(), True),
    StructField("calendar_updated", StringType(), True),
    StructField("has_availability", StringType(), True),  # Assuming this can be null or a string
    StructField("availability_30", IntegerType(), True),
    StructField("availability_60", IntegerType(), True),
    StructField("availability_90", IntegerType(), True),
    StructField("availability_365", IntegerType(), True),
    StructField("calendar_last_scraped", StringType(), True),
    StructField("number_of_reviews", IntegerType(), True),
    StructField("first_review", StringType(), True),
    StructField("last_review", StringType(), True),
    StructField("review_scores_rating", IntegerType(), True),
    StructField("review_scores_accuracy", IntegerType(), True),
    StructField("review_scores_cleanliness", IntegerType(), True),
    StructField("review_scores_checkin", IntegerType(), True),
    StructField("review_scores_communication", IntegerType(), True),
    StructField("review_scores_location", IntegerType(), True),
    StructField("review_scores_value", IntegerType(), True),
    StructField("license", StringType(), True),
    StructField("jurisdiction_names", StringType(), True),  # Assuming single value; change if it's an array or a complex type
    StructField("cancellation_policy", StringType(), True),
    StructField("calculated_host_listings_count", IntegerType(), True),
    StructField("reviews_per_month", DoubleType(), True),
    StructField("geolocation", StructType([
        StructField("lon", DoubleType(), True),
        StructField("lat", DoubleType(), True)
    ]), True),
    StructField("features", ArrayType(StringType()), True)
])

## Reading from Kafka and Writing to MongoDB

In [None]:
# Read from Kafka. Subscribe to the topic starting at the latest offsets, so only
# messages produced after the query starts are consumed.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
)

# The Kafka source exposes the message payload as binary; cast it to a string.
df_string = df.selectExpr("CAST(value AS STRING)")

# Parse the JSON string against `schema`, then flatten the struct into columns.
df_structured = df_string.select(from_json(col("value"), schema).alias("data")).select("data.*")

# Use the listing id as the MongoDB document key (_id).
# A message that is not valid JSON, or that has no "id", parses to a row whose
# _id is null. Drop those rows so they are not written as null-keyed documents.
# Several of them would collide on the same _id.
df_transformed = (
    df_structured
    .withColumnRenamed('id', '_id')
    .filter(col("_id").isNotNull())
)


def write_to_mongo(batch_df, epoch_id):
    """Append one micro-batch to MongoDB.

    The target database and collection come from the session's
    ``spark.mongodb.output.uri`` setting.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        epoch_id: Micro-batch id passed in by ``foreachBatch``. It is part of the
            required callback signature but is not used here.
    """
    batch_df.write.format("mongo").mode("append").save()


# Run write_to_mongo on every micro-batch of the stream.
query = df_transformed.writeStream.foreachBatch(write_to_mongo).start()

# Block this cell until the query is stopped or fails with an exception.
query.awaitTermination()