In [1]:
!pip install pyspark==3.5.2

Collecting pyspark==3.5.2
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.3/317.3 MB 5.2 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812364 sha256=a284f0ebedd0ffb2a1ec49f59a3fe297662e2911c357e52ea7249f2f9a18c913
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [2]:
# Print the installed pyspark package metadata (version, location, dependencies).
!pip show pyspark

Name: pyspark
Version: 3.5.2
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /opt/conda/lib/python3.10/site-packages
Requires: py4j
Required-by: 


In [None]:
import logging
import os

import torch
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from transformers import pipeline

# Only ERROR-level records and above are emitted, with a timestamp and level prefix.
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')

# Directory passed to the streaming writer as its checkpoint location.
# exist_ok=True makes creation idempotent (safe to re-run the cell) and avoids
# the race inherent in a separate exists-then-create check.
checkpoint_dir = "/kaggle/working/checkpoints/kafka_to_mongo"
os.makedirs(checkpoint_dir, exist_ok=True)
    
# Connection strings and credentials are fetched by name through
# UserSecretsClient.get_secret so they are not hard-coded in the notebook source.
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
# MongoDB connection URI, used as the sink's connection string.
MONGO_URI = user_secrets.get_secret("MONGO_URI")
# Kafka bootstrap server address and the key/secret pair used as the SASL
# username and password.
CONFLUENT_BOOTSTRAP_SERVER = user_secrets.get_secret("CONFLUENT_BOOTSTRAP_SERVER")
CONFLUENT_API_KEY = user_secrets.get_secret("CONFLUENT_API_KEY")
CONFLUENT_API_SECRET = user_secrets.get_secret("CONFLUENT_API_SECRET")

    
# Central connection settings, grouped by external system.
config = {
    # Kafka client settings; the streaming reader looks these up by key.
    "kafka": {
        "bootstrap.servers": CONFLUENT_BOOTSTRAP_SERVER,
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",
        "sasl.username": CONFLUENT_API_KEY,
        "sasl.password": CONFLUENT_API_SECRET,
        "client.id": "json-serial-producer",
    },
    # MongoDB sink: connection URI plus target database and collection.
    "mongodb": {
        "uri": MONGO_URI,
        "database": "reviewsdb",
        "collection": "enriched_reviews_collection",
    },
}

# Text-classification pipeline used to label review sentiment.
# Run on the first GPU when CUDA is available; otherwise fall back to CPU
# (device=-1) so the cell does not fail on a CPU-only session.
sentiment_pipeline = pipeline(
    "text-classification",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=0 if torch.cuda.is_available() else -1,
)

def analyze_sentiment(text):
    """Classify the sentiment of a single review text.

    Args:
        text: The review body. Any value that is falsy or not a ``str`` is
            not sent to the model.

    Returns:
        The ``label`` field of the first result returned by
        ``sentiment_pipeline``; ``"Error"`` if the pipeline call raised
        (the exception is logged); or ``"Empty or Invalid"`` when ``text``
        is falsy or not a string.
    """
    if not text or not isinstance(text, str):
        return "Empty or Invalid"
    try:
        # Truncate to the model's maximum sequence length. Without this, a
        # review longer than the limit makes the pipeline raise, and the row
        # would be stored as "Error" instead of getting a real label.
        return sentiment_pipeline(text, truncation=True)[0]['label']
    except Exception as e:
        # Best effort: one bad row must not take down the streaming job.
        logging.error(f"Error in sentiment analysis: {e}")
        return "Error"

# Expose the Python classifier as a Spark UDF that yields a string column.
sentiment_udf = udf(f=analyze_sentiment, returnType=StringType())

def read_from_kafka_and_write_to_mongo(spark):
    """Stream reviews from Kafka, add a sentiment label, and append them to MongoDB.

    Subscribes to the ``raw_topic`` topic using the settings in the
    module-level ``config["kafka"]``, parses each message value as JSON
    against the review schema defined below, adds a ``sentiment`` column
    computed by ``sentiment_udf`` from the ``text`` column, and writes the
    rows in append mode to the database/collection named in
    ``config["mongodb"]``. The writer's checkpoint location is the
    module-level ``checkpoint_dir``.

    Args:
        spark: The ``SparkSession`` used to create the stream. The Kafka
            source and the ``mongodb`` sink format must be available to it.

    Returns:
        None. The call blocks until the streaming query terminates.
    """
    topic = "raw_topic"
    kafka_conf = config['kafka']
    mongo_conf = config['mongodb']

    # Expected shape of the JSON payload carried in each Kafka message value.
    schema = StructType([
        StructField("review_id", StringType()),
        StructField("user_id", StringType()),
        StructField("business_id", StringType()),
        StructField("stars", FloatType()),
        StructField("useful", IntegerType()),
        StructField("funny", IntegerType()),
        StructField("cool", IntegerType()),
        StructField("text", StringType()),
        StructField("date", StringType())
    ])

    # JAAS login entry for SASL/PLAIN, built from the configured username/password.
    jaas_config = (
        f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{kafka_conf["sasl.username"]}" '
        f'password="{kafka_conf["sasl.password"]}";'
    )

    stream_df = (spark.readStream
                 .format("kafka")
                 .option("kafka.bootstrap.servers", kafka_conf['bootstrap.servers'])
                 .option("subscribe", topic)
                 .option("kafka.security.protocol", kafka_conf['security.protocol'])
                 .option("kafka.sasl.mechanism", kafka_conf['sasl.mechanisms'])
                 .option("kafka.sasl.jaas.config", jaas_config)
                 .option("failOnDataLoss", "false")
                 .load()
                 )

    # The Kafka value column is cast to string, parsed as JSON, then flattened
    # so each schema field becomes a top-level column.
    parsed_df = (stream_df
                 .select(from_json(col('value').cast("string"), schema).alias("data"))
                 .select("data.*"))

    enriched_df = parsed_df.withColumn("sentiment", sentiment_udf(col('text')))

    # Keep the StreamingQuery handle returned by start(); chaining
    # awaitTermination() onto it would bind `query` to None.
    query = (enriched_df.writeStream
             .format("mongodb")
             .option("spark.mongodb.connection.uri", mongo_conf['uri'])
             .option("spark.mongodb.database", mongo_conf['database'])
             .option("spark.mongodb.collection", mongo_conf['collection'])
             .option("checkpointLocation", checkpoint_dir)
             .outputMode("append")
             .start()
             )
    try:
        query.awaitTermination()
    finally:
        # Stop the query if the wait is interrupted (e.g. the cell is stopped)
        # or fails, so the stream is not left running in the background.
        query.stop()
    
if __name__ == "__main__":
    # Build (or reuse) the session; spark.jars.packages requests the Kafka
    # source and the MongoDB connector packages.
    spark = SparkSession.builder.appName("KafkaStreamToMongo").config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.mongodb.spark:mongo-spark-connector_2.12:10.4.0",
    ).getOrCreate()

    # Run the Kafka -> sentiment -> MongoDB stream.
    read_from_kafka_and_write_to_mongo(spark)

config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

  self.pid = _posixsubprocess.fork_exec(


:: loading settings :: url = jar:file:/opt/conda/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c1d50c6d-a348-41a5-8b46-2784e886e94c;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.2 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.5 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
	found or