In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml import PipelineModel
from pyspark.ml.tuning import CrossValidatorModel
from cassandra.cluster import Cluster
from pyspark.sql.functions import col, when
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

In [2]:
spark = SparkSession.builder.getOrCreate()

# Load the saved pipelineFit model
pipelineFit = PipelineModel.load("pipelineFit")

# Load the saved cvModel model
cvModel = CrossValidatorModel.load("cvModel")

In [3]:
# Cassandra configuration
cassandra_contact_points = ['192.168.1.22']
cassandra_keyspace = 'stock_market' 
cassandra_table = 'coins_news2'

# Connect to the Cassandra cluster
cluster = Cluster(cassandra_contact_points)
session = cluster.connect()

# Switch to the keyspace
session.set_keyspace(cassandra_keyspace)

In [4]:
# Query to retrieve data from the table
query = f"SELECT id, timestamp, symbol, headline FROM {cassandra_table} LIMIT 250;"

# Execute the query
result_set = session.execute(query)

In [5]:

# Create an empty list to store the rows
result_rows = []

# Define the schema for the result DataFrame
schema = StructType([
    StructField("id", StringType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=True),
    StructField("symbol", StringType(), nullable=True),
    StructField("prediction", StringType(), nullable=True),
])

# Define a mapping for the prediction values
prediction_mapping = {0.0: "Positive", 1.0: "Neutral", 2.0: "Negative"}

# Iterate over the result_set
for row in result_set:
    # Access the values of 'id', 'timestamp', and 'symbol' columns from the row
    id_value = row.id
    timestamp_value = row.timestamp
    symbol_value = row.symbol
    
    # Create a DataFrame with the new text
    new_text_df = spark.createDataFrame([Row(clean_text=row.headline)])
    
    # Use the loaded pipelineFit to transform the new text data
    new_dataset = pipelineFit.transform(new_text_df)
    
    # Use the loaded cvModel to make predictions on the new text data
    new_predictions = cvModel.transform(new_dataset)
    
    # Access the 'prediction' column value from the new_predictions DataFrame
    prediction_value = new_predictions.select(col("prediction")).first()[0]
    
    # Map the prediction value to the corresponding label
    prediction_label = prediction_mapping.get(prediction_value, "Unknown")
    
    # Create a new row with 'id', 'timestamp', 'symbol', and 'prediction' values
    new_row = Row(id=id_value, timestamp=timestamp_value, symbol=symbol_value, prediction=prediction_label)
    print(new_row)
    # Add the new row to the result_rows list
    result_rows.append(new_row)

# Create a DataFrame from the result_rows list
new_dataframe = spark.createDataFrame(result_rows, schema)

# Display the new DataFrame
new_dataframe.show()

Row(id=UUID('e0cfff36-a30e-4f0c-b38b-ff9e8f72a888'), timestamp=datetime.datetime(2023, 12, 29, 0, 33), symbol='CAT', prediction='Positive')
Row(id=UUID('c8d69e98-0a52-4a25-aa34-8dab4c16bafb'), timestamp=datetime.datetime(2023, 12, 28, 17, 0, 10), symbol='CVX', prediction='Neutral')
Row(id=UUID('c5ca8f08-5b9f-4c9d-b214-c0abc9f84986'), timestamp=datetime.datetime(2023, 12, 30, 16, 46), symbol='CVX', prediction='Positive')
Row(id=UUID('9d7e1d26-0b08-431a-b94e-fa96fac773ca'), timestamp=datetime.datetime(2023, 12, 29, 16, 55), symbol='GOOGL', prediction='Positive')
Row(id=UUID('0cdc32d0-c42e-4a1f-9561-7949bf33fd69'), timestamp=datetime.datetime(2023, 12, 28, 16, 26, 11), symbol='GOOGL', prediction='Positive')
Row(id=UUID('12f20ee2-6262-4486-ba61-e1fd831f2cff'), timestamp=datetime.datetime(2023, 12, 29, 18, 56), symbol='AMD', prediction='Negative')
Row(id=UUID('e2a95b5d-1e68-4a4b-8e90-f7234a3aad13'), timestamp=datetime.datetime(2023, 12, 28, 19, 18, 28), symbol='GOOGL', prediction='Positive'

In [6]:
table_name_pred = "prednews"

create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {table_name_pred} (
        timestamp TIMESTAMP PRIMARY KEY,
        symbol TEXT,
        prediction TEXT
    )
"""

session.execute(create_table_query)

for row in new_dataframe.rdd.collect():
    
    insert_query = f"""
        INSERT INTO {table_name_pred} (timestamp, symbol, prediction)
        VALUES ('{row.timestamp}', '{row.symbol}', '{row.prediction}')
    """
    session.execute(insert_query)

session.shutdown()
cluster.shutdown()

In [57]:
new_dataframe.select("id").first()

Row(id='{__class__=uuid.UUID, int=302316883395429105585825085843689189883}')

In [None]:
session.shutdown()
cluster.shutdown()