In [3]:
import sparknlp
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType, StructField
from sparknlp.base import DocumentAssembler, Finisher, TokenAssembler
from sparknlp.annotator import Tokenizer, Normalizer, StopWordsCleaner, SentenceDetector, Stemmer, Lemmatizer

In [4]:
# Create (or reuse) the Spark session with the Kafka connector and Spark NLP jars.
# NOTE: jar/package settings such as `spark.jars.packages` only take effect when this
# call actually launches the JVM. If a SparkSession already exists in the kernel,
# getOrCreate() hands back that session and these packages are not loaded, so
# restart the kernel before re-running this cell with different packages.
spark = (
    SparkSession.builder
    .appName("KafkaRead")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0,com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.2",
    )
    .getOrCreate()
)

In [9]:
import os
import warnings

# PYSPARK_SUBMIT_ARGS is read only when PySpark launches the JVM gateway, i.e. when
# the *first* SparkSession/SparkContext of the kernel is created. Setting it after a
# session exists changes nothing.
#
# When it does take effect, its `--packages` flag takes precedence over the builder's
# `spark.jars.packages` setting. It must therefore list every package the notebook
# needs: the Structured Streaming Kafka source/sink and Spark NLP. The DStream
# connector (spark-streaming-kafka) is not used by readStream/writeStream and is
# omitted.
SPARK_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0",
    "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.2",
])
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {SPARK_PACKAGES} pyspark-shell"

if SparkSession.getActiveSession() is not None:
    # Surface the silent no-op instead of letting it show up later as
    # "Failed to find data source: kafka".
    warnings.warn(
        "A SparkSession is already running, so PYSPARK_SUBMIT_ARGS has no effect. "
        "Restart the kernel and run this cell before creating the SparkSession."
    )

In [14]:
# Kafka connection settings: source topic and broker list.
kafka_topic_name = "reddit_topic_gaza"
kafka_bootstrap_servers = "localhost:9092"

# Subscribe to the topic as an unbounded streaming DataFrame.
# This needs the spark-sql-kafka-0-10 package on the session's classpath; without it,
# load() raises "Failed to find data source: kafka" (the error this cell produced).
kafka_source_options = {
    "kafka.bootstrap.servers": kafka_bootstrap_servers,
    "subscribe": kafka_topic_name,
}
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of Structured Streaming + Kafka Integration Guide.

In [None]:
# Schema of the JSON payload carried in each Kafka message's `value`.
# The field list mirrors the tweet schema printed by the inspection cell further
# down, where every field is a nullable string.
# NOTE(review): confirm these names against what the producer actually sends.
# The NLP pipeline reads a `text` column (DocumentAssembler input), so the payload
# must at least contain `text`.
tweet_json_fields = [
    "tweet_avatar", "tweet_id", "url", "query", "text", "username", "fullname",
    "timestamp", "replies", "retweets", "quotes", "images", "likes",
    "tweet_links", "tweet_mentions", "tweet_hashtags", "in_reply_to",
]
json_schema = StructType(
    [StructField(field_name, StringType(), True) for field_name in tweet_json_fields]
)

In [None]:
# Decode the Kafka `value` bytes as a string, then parse them as JSON using json_schema.
# Flatten the parsed struct so its fields become top-level columns. Without this they
# stay nested under `data`, and the DocumentAssembler's `text` input column would not
# exist.
parsed_df = (
    df
    .select(from_json(col("value").cast("string"), json_schema).alias("data"))
    .select("data.*")
)

In [23]:
# Inspect the incoming frame.
# show() is an action, and Spark rejects actions on streaming DataFrames ("Queries with
# streaming sources must be executed with writeStream.start()"). For a stream, only
# the schema is printed.
df.printSchema()
if df.isStreaming:
    print("df is a streaming DataFrame; inspect rows via a 'console' or 'memory' writeStream sink instead of show().")
else:
    df.show(5, truncate=30)

root
 |-- tweet_avatar: string (nullable = true)
 |-- tweet_id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- query: string (nullable = true)
 |-- text: string (nullable = true)
 |-- username: string (nullable = true)
 |-- fullname: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- replies: string (nullable = true)
 |-- retweets: string (nullable = true)
 |-- quotes: string (nullable = true)
 |-- images: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- tweet_links: string (nullable = true)
 |-- tweet_mentions: string (nullable = true)
 |-- tweet_hashtags: string (nullable = true)
 |-- in_reply_to: string (nullable = true)

+------------------------------+-------------------+------------------------------+------------------------------+------------------------------+----------------+-----------------------+-------------------------+-------+--------+------+------------------------------+-----+------------------------------+-------

In [24]:
# Document Assembler: wraps the raw `text` column into DOCUMENT annotations
# (column `document`), which every later Spark NLP stage builds on.
# Cleanup mode "shrink" normalises whitespace in the input text (see the
# DocumentAssembler `cleanupMode` docs for the exact rules).
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
    .setCleanupMode("shrink")
)

In [17]:
  # Tokenizer: splits each DOCUMENT annotation into TOKEN annotations (column `token`).
# NOTE(review): tokens are produced from `document`, not from `sentence`; the
# TokenAssembler stage pairs these tokens with `sentence`, so confirm the grouping
# it produces is what's intended.
tokenizer = (
    Tokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

In [32]:
# Sentence detection: splits each DOCUMENT annotation into sentence annotations
# (column `sentence`), which the TokenAssembler stage consumes.
sentenceDetector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

In [28]:
# Normalizer: lowercases each token and removes the characters matched by the cleanup
# patterns (output column `normalized`).
# `[^\w\d\s]` matches any character that is not a word character (letter, digit,
# underscore) or whitespace, i.e. punctuation and symbols are stripped.
# A raw string keeps Python from treating `\w`, `\d` and `\s` as (invalid) string
# escapes, which emit a Deprecation/SyntaxWarning. The pattern value is unchanged.
normalizer = (
    Normalizer()
    .setInputCols(["token"])
    .setOutputCol("normalized")
    .setLowercase(True)
    .setCleanupPatterns([r"[^\w\d\s]"])
)

In [29]:
# Stop-word removal: drops stop words from the raw `token` annotations
# (output column `cleaned_tokens`).
# Matching is case-insensitive because these tokens come straight from the Tokenizer
# and are not lowercased. Spark NLP's default English stop-word list (from Spark ML's
# StopWordsRemover) is lowercase, so case-sensitive matching would keep capitalised
# stop words such as a sentence-initial "The".
stopwordsCleaner = (
    StopWordsCleaner()
    .setInputCols(["token"])
    .setOutputCol("cleaned_tokens")
    .setCaseSensitive(False)
)

In [36]:
# TokenAssembler: reassembles the stop-word-filtered tokens (`cleaned_tokens`),
# together with the `sentence` annotations, back into text annotations written
# to `assembled`.
tokenAssembler = (
    TokenAssembler()
    .setInputCols(["sentence", "cleaned_tokens"])
    .setOutputCol("assembled")
)

In [38]:
# Stemming: reduces each raw token to its stem (output column `stem`).
stemmer = (
    Stemmer()
    .setInputCols(["token"])
    .setOutputCol("stem")
)

In [40]:
# Lemmatization: maps each raw token to its lemma using a dictionary file
# (output column `lemma`).
# In the dictionary, "->" separates the key from its values and "\t" separates the
# individual values. NOTE(review): presumably "lemma -> form<TAB>form..." per line;
# confirm against the file.
# The path defaults to the original Colab location (/content/...). Set the
# LEMMA_DICT_PATH environment variable to point elsewhere when running outside Colab.
import os

lemma_dictionary_path = os.environ.get(
    "LEMMA_DICT_PATH", "/content/AntBNC_lemmas_ver_001.txt"
)
lemmatizer = (
    Lemmatizer()
    .setInputCols(["token"])
    .setOutputCol("lemma")
    .setDictionary(lemma_dictionary_path, value_delimiter="\t", key_delimiter="->")
)

In [19]:
# Finisher: converts the `normalized` annotations into a plain array-of-strings
# column `ntokens`. setCleanAnnotations(False) keeps the annotation columns as well.
# NOTE(review): this stage is not among nlpPipeline's stages, so as written it is
# never applied.
finisher = (
    Finisher()
    .setInputCols(["normalized"])
    .setOutputCols(["ntokens"])
    .setOutputAsArray(True)
    .setCleanAnnotations(False)
)

In [41]:
# Chain the NLP stages into one Spark ML Pipeline.
# Stages run in list order, so each stage must come after the stage that produces its
# input column(s).
# Pipeline is imported from its home module, pyspark.ml, rather than through a
# re-export in sparknlp.base.
from pyspark.ml import Pipeline

nlpPipeline = Pipeline(stages=[
    documentAssembler,  # text -> document
    tokenizer,          # document -> token
    sentenceDetector,   # document -> sentence
    normalizer,         # token -> normalized
    stopwordsCleaner,   # token -> cleaned_tokens
    tokenAssembler,     # sentence + cleaned_tokens -> assembled
    stemmer,            # token -> stem
    lemmatizer,         # token -> lemma
])

In [58]:
# Fit the pipeline once on an empty *static* DataFrame that has the stream's schema,
# then apply the fitted model to the streaming DataFrame.
# NOTE(review): the stages in nlpPipeline are configured from parameters or files
# (e.g. the lemma dictionary) rather than learned from rows, so fitting on an empty
# frame should give the same model. It also guarantees no estimator runs an action
# against the unbounded Kafka source. Re-check this if a trainable stage is added.
nlpModel = nlpPipeline.fit(spark.createDataFrame([], parsed_df.schema))
result = nlpModel.transform(parsed_df)


In [None]:
# Define the Kafka topic to send data to
output_kafka_topic_name = "processed_topic"  # Change this to your output topic

# The Kafka sink requires a checkpoint directory for offsets and progress. Without
# `checkpointLocation`, start() fails with an AnalysisException. Keep the path stable
# across restarts so the query can resume from its last committed progress.
output_checkpoint_location = f"checkpoints/{output_kafka_topic_name}"

# Serialise all output columns of each row into a single JSON string in the `value`
# column, which the Kafka sink writes as the message payload.
query = (
    result
    .selectExpr("to_json(struct(*)) AS value")
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("topic", output_kafka_topic_name)
    .option("checkpointLocation", output_checkpoint_location)
    .start()
)

# Block this cell until the query terminates, either via query.stop() or a failure.
query.awaitTermination()