In [None]:
pip install pyspark == 3.5.2

In [3]:
# Print the installed pyspark package metadata to confirm which version is in use.
!pip show pyspark

Name: pyspark
Version: 3.5.2
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: 


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf 
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType # For defining the schema of our Kafka messages
from transformers import pipeline
import logging
import os

# Configure logging to only display errors
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
)


# Directory where Spark will store its streaming progress (checkpoints)
checkpoint_dir = "/kaggle/working/checkpoints/kafka_to_mongo"
# exist_ok=True makes the call idempotent and avoids the check-then-create
# race of testing os.path.exists() before creating the directory.
os.makedirs(checkpoint_dir, exist_ok=True)

# Connection settings for Kafka and MongoDB.
# Secrets and endpoints are taken from environment variables when they are
# set, so real credentials do not have to be written into the notebook; when
# a variable is absent the original placeholder value is used.
config = {
    "kafka": {
        'bootstrap.servers': os.environ.get("KAFKA_BOOTSTRAP_SERVERS", ''),
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': os.environ.get("KAFKA_SASL_USERNAME", ''),
        'sasl.password': os.environ.get("KAFKA_SASL_PASSWORD", ''),
        'client.id': 'json-serial-producer',
    },
    "mongodb": {
        # <username> and <password> are placeholders to be filled in.
        "uri": os.environ.get(
            "MONGODB_URI",
            'mongodb+srv://<username>:<password>@yelp-cluster.tp3ga.mongodb.net/?retryWrites=true&w=majority',
        ),
        "database": 'reviewsdb',
        "collection": "enriched_reviews_collection",
    },
}

# Build the Hugging Face text-classification pipeline once at module level;
# analyze_sentiment looks this object up each time it is called.
sentiment_pipeline = pipeline(
    task="text-classification",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)

# analyze_sentiment takes text as input and returns the sentiment label
# reported by the classification pipeline
def analyze_sentiment(text):
    """Classify the sentiment of a single piece of text.

    Args:
        text: The text to classify. Anything that is not a non-empty
            ``str`` (``None``, ``""``, numbers, ...) is treated as invalid.

    Returns:
        The ``'label'`` value of the first pipeline result (presumably
        "POSITIVE"/"NEGATIVE" for this SST-2 checkpoint -- confirm against
        the model card), ``"Error"`` if the pipeline raised, or
        ``"Empty or Invalid"`` for invalid input.
    """
    if not text or not isinstance(text, str):
        return "Empty or Invalid"
    try:
        # truncation=True clips inputs that exceed the model's maximum
        # sequence length; without it long reviews make the pipeline raise
        # and are all labelled "Error".
        result = sentiment_pipeline(text, truncation=True)[0]
        return result['label']
    except Exception as e:
        # Deliberate best-effort: one bad record must not fail the stream.
        logging.error("Error in sentiment analysis: %s", e)
        return "Error"

# Wrap analyze_sentiment in a string-returning UDF, which allows Spark to
# apply it to a column of the streaming data
sentiment_udf = udf(f=analyze_sentiment, returnType=StringType())

def read_from_kafka_and_write_to_mongo(spark, topic="raw_topic"):
    """Stream reviews from Kafka, add a sentiment column, and append them to MongoDB.

    Connection settings are read from the module-level ``config`` dict and
    streaming progress is checkpointed under ``checkpoint_dir``. The call
    blocks until the streaming query terminates.

    Args:
        spark: The SparkSession used to create the streaming reader.
        topic: Name of the Kafka topic to subscribe to.

    Returns:
        None.
    """
    kafka_conf = config['kafka']
    mongo_conf = config['mongodb']

    # Schema of the JSON payload carried in each Kafka message value
    schema = StructType([
        StructField("review_id", StringType()),
        StructField("user_id", StringType()),
        StructField("business_id", StringType()),
        StructField("stars", FloatType()),
        StructField("useful", IntegerType()),
        StructField("funny", IntegerType()),
        StructField("cool", IntegerType()),
        StructField("text", StringType()),
        StructField("date", StringType()),
    ])

    # JAAS login configuration carrying the SASL/PLAIN username and password
    jaas_config = (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{kafka_conf["sasl.username"]}" '
        f'password="{kafka_conf["sasl.password"]}";'
    )

    stream_df = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_conf['bootstrap.servers'])
        .option("subscribe", topic)
        .option("kafka.security.protocol", kafka_conf['security.protocol'])
        # The config key is 'sasl.mechanisms' but the option passed to the
        # Kafka source is the singular 'kafka.sasl.mechanism'.
        .option("kafka.sasl.mechanism", kafka_conf['sasl.mechanisms'])
        .option("kafka.sasl.jaas.config", jaas_config)
        .option("failOnDataLoss", "false")
        .load()
    )

    # Convert the raw Kafka message value into structured columns using from_json
    parsed_df = (
        stream_df
        .select(from_json(col('value').cast("string"), schema).alias("data"))
        .select("data.*")
    )

    # Enrich the parsed data frame by adding a new column called 'sentiment'
    enriched_df = parsed_df.withColumn("sentiment", sentiment_udf(col('text')))

    # Write the enriched data frame to MongoDB. The started query is kept in
    # its own variable; chaining .awaitTermination() onto .start() would bind
    # the name to None instead of the query handle.
    query = (
        enriched_df.writeStream
        .format("mongodb")
        .option("spark.mongodb.connection.uri", mongo_conf['uri'])
        .option("spark.mongodb.database", mongo_conf['database'])
        .option("spark.mongodb.collection", mongo_conf['collection'])
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("append")
        .start()
    )
    query.awaitTermination()
    
if __name__ == "__main__":
    # spark.jars.packages lists the Kafka source and MongoDB connector
    # coordinates the session is configured with.
    session_builder = SparkSession.builder.appName("KafkaStreamToMongo")
    session_builder = session_builder.config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.mongodb.spark:mongo-spark-connector_2.12:10.4.0",
    )
    spark = session_builder.getOrCreate()

    # Run the Kafka -> sentiment -> MongoDB stream
    read_from_kafka_and_write_to_mongo(spark)
