In [1]:
import os

import pandas as pd
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql import SparkSession
from statsmodels.tsa.statespace.sarimax import SARIMAX

### Initialize a Spark session connected to MongoDB

In [None]:
# MongoDB connection string in "mongodb://host:port/database.collection" form.
# Override it with the MONGO_URI environment variable; the default is the local
# TwitterAnalysisDB.RawData collection.
# NOTE(review): the output URI also points at RawData. Every write in this
# notebook passes an explicit "collection" option, but an overwrite-mode write
# without one would target the raw tweets.
MONGO_URI = os.environ.get(
    "MONGO_URI", "mongodb://127.0.0.1:27017/TwitterAnalysisDB.RawData"
)

# NOTE(review): spark.jars.packages is likely ignored if a SparkSession already
# exists in this kernel (getOrCreate() then reuses it). Restart the kernel after
# changing the connector version.
spark = (
    SparkSession.builder
    .appName("SentimentTimeSeries")
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

### Reading data from MongoDB

In [None]:
# Load the collection configured via spark.mongodb.input.uri through the
# MongoDB Spark connector's "mongo" data source.
df = spark.read.load(format="mongo")

### Sentiment analysis pipeline stages (tokenizer → TF-IDF features → logistic regression)

In [None]:
# Feature extraction: tweet text -> tokens ("words") -> hashed term frequencies
# ("rawFeatures") -> TF-IDF weighted vectors ("features"). Each stage reads the
# column the previous stage writes.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol="features")

# Classifier over the TF-IDF vectors. featuresCol/labelCol are written out
# (they equal LogisticRegression's defaults) so the training input is explicit.
# NOTE(review): fitting requires a numeric "label" column in df; nothing visible
# in this notebook creates one. Confirm the MongoDB documents carry it.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.001,
)

### Assemble pipeline

In [None]:
# Chain the feature stages and the classifier, in execution order.
pipeline = Pipeline().setStages([tokenizer, hashingTF, idf, lr])

### Train the sentiment analysis model

In [None]:
# Fit every pipeline stage on the MongoDB data.
# NOTE(review): the fitted `model` is never applied (no model.transform(df)).
# The time-series cells below read a pre-existing "sentiment" column instead.
model = pipeline.fit(dataset=df)

### Build the sentiment time series
#### Assumes `df` already has `timestamp` and `sentiment` columns (the model trained above is not applied). Select them, rename to `ds`/`y`, and convert to pandas for SARIMAX

In [None]:
# Keep only the timestamp and sentiment columns, aliased to ds / y, then collect.
# Backticks guard `timestamp`, which is also a Spark SQL type keyword.
# NOTE(review): toPandas() brings every selected row into driver memory.
pandas_df = df.selectExpr("`timestamp` AS ds", "`sentiment` AS y").toPandas()

### Give the pandas DataFrame a sorted, datetime-typed index

In [None]:
# Turn the collected rows into a regular daily series for SARIMAX:
#  1. parse 'ds' to datetime and move it into the index;
#  2. sort by time and average all rows that fall on the same calendar day.
#     This yields one observation per day with a 'D' frequency.
# The seasonal period of 7 and the 7/30/90-step forecast horizons used below
# assume daily spacing, which raw rows with duplicate or irregular timestamps
# do not provide. Days with no rows become NaN; statsmodels' state-space
# SARIMAX treats those as missing observations.
# Safe to re-run: once 'ds' is already the index, step 1 is skipped (the old
# in-place version raised KeyError), and re-averaging a daily series is a no-op.
# NOTE(review): assumes 'y' (sentiment) is numeric. Confirm this; string labels
# would need encoding before the mean.
if 'ds' in pandas_df.columns:
    pandas_df = pandas_df.assign(ds=pd.to_datetime(pandas_df['ds'])).set_index('ds')
pandas_df = pandas_df.sort_index().resample('D')[['y']].mean()

### SARIMAX Model

In [None]:
# Non-seasonal (p, d, q) = (1, 0, 0): one autoregressive lag, no differencing,
# no moving-average term.
# Seasonal (P, D, Q, s) = (1, 1, 0, 7): seasonal AR(1) with one seasonal
# difference over a 7-step cycle, i.e. weekly seasonality when the series is daily.
sarimax_model = SARIMAX(
    endog=pandas_df['y'],
    order=(1, 0, 0),
    seasonal_order=(1, 1, 0, 7),
)
# disp=False silences the optimizer's convergence output.
sarimax_result = sarimax_model.fit(disp=False)

#### Perform forecasts

In [None]:
# Out-of-sample forecasts 7, 30 and 90 steps past the end of the fitted series
# (a week, a month and a quarter when the series is daily).
forecast_week, forecast_month, forecast_3month = (
    sarimax_result.get_forecast(steps=horizon) for horizon in (7, 30, 90)
)

### Convert forecasts to Spark DataFrames and save to MongoDB

In [None]:
def save_forecast_to_mongo(forecast, collection):
    """Write a SARIMAX forecast's summary table to a MongoDB collection.

    Parameters
    ----------
    forecast : statsmodels PredictionResults
        Result of ``sarimax_result.get_forecast(...)``.
    collection : str
        Target collection name. It overrides the collection named in
        spark.mongodb.output.uri.

    Returns
    -------
    pyspark.sql.DataFrame
        The Spark DataFrame that was written (one row per forecast step, with a
        "ds" column plus the summary_frame() columns).
    """
    # summary_frame() indexes its rows by forecast step (dates when the fitted
    # series has a DatetimeIndex), but spark.createDataFrame() discards a pandas
    # index. Move it into an explicit "ds" column so each saved document keeps
    # the date it forecasts.
    summary = forecast.summary_frame().rename_axis("ds").reset_index()
    spark_df = spark.createDataFrame(summary)
    # Overwrite mode: the collection's previous contents are replaced on each run.
    spark_df.write.format("mongo").mode("overwrite").option("collection", collection).save()
    return spark_df


forecast_week_df = save_forecast_to_mongo(forecast_week, "forecast_week_collection")
forecast_month_df = save_forecast_to_mongo(forecast_month, "forecast_month_collection")
forecast_3month_df = save_forecast_to_mongo(forecast_3month, "forecast_3month_collection")

### Close the Spark session

In [None]:
# Shut down the Spark session created at the top of the notebook. Any cell that
# uses `spark` must not run after this one unless the session cell is re-run.
spark.stop()