In [70]:
from pyspark.sql.types import StructType, StructField, FloatType, BooleanType
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark import SQLContext

import pandas as pd
import pyspark
from pathlib import Path

import pickle
import sklearn


In [11]:
conf = pyspark.SparkConf()
spark_session = SparkSession.builder.config(conf=conf).getOrCreate()
sqlcontext = SQLContext(sc)

schema = StructType([
    StructField("tweet",StringType(),True),
    StructField("timestamp",DateType(),True),
])

Spark NLP version
4.2.8


# Build NLP Pipeline

In [76]:
CLEAN_TEXT_REGEX = r'[^\w\s]|https\S+|www\S+https\S+|\@w+|\#'

MODELS_PATH = Path('models')


with open(SENTIMENT_MODEL, 'rb') as f, open(VECTORS_MODEL, 'rb') as ff:
    model = pickle.load(f)
    vectors = pickle.load(ff)

model_bc = sc.broadcast(model)
vectors_bc = sc.broadcast(vectors)


@F.udf(StringType())
def predict(text):
    vector = vectors_bc.value.transform(pd.Series(text))
    return 0 if str(model_bc.value.predict(vector)[0]) == 'Positive' else 1

In [71]:
df = spark.read.format('org.apache.spark.sql.json') \
                .schema(schema) \
                .option("multiline","true") \
                .load("tweets.json")


In [80]:
df_clean = df.select('tweet', (lower(regexp_replace('tweet', CLEAN_TEXT_REGEX, "")).alias('tweet_clean')))


df_pred = df_clean.withColumn("score", predict(df_clean['tweet_clean']))
# df_pred.show()

df_pred.select(avg("score")).show()

[Stage 55:>                                                         (0 + 1) / 1]

+----------+
|avg(score)|
+----------+
|       0.5|
+----------+



                                                                                

In [9]:
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "finance") \
    .option("startingOffsets", "earliest") \
    .load()

In [10]:
kafka_df.show()

AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka