In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import expr
from pyspark.sql.types import TimestampType, StructType, StructField, StringType

# Length of each tumbling window used to bucket both tweets and word counts.
window_size = "10 seconds"

# Create (or reuse) the Spark session for this notebook.
spark = SparkSession.builder \
    .appName("Tweet sentiment analysis") \
    .getOrCreate()

# Read the per-tweet CSV into a Spark DataFrame. Presumably it has at least
# `time`, `candidate` and a numeric `tweet` score column — TODO confirm schema.
df = spark.read \
    .options(header=True, inferSchema=True) \
    .csv("gs://6893_bigdata_isaac/largescale_data_processing/6889project/tweet1.csv")

# Convert the 'time' column into a TimestampType. Values that cannot be cast
# become null; they are excluded from the window grid built below.
df = df.withColumn("time", F.col("time").cast(TimestampType()))

# Candidates to report on: every one gets a row for every window, even when it
# had no tweets in that window. NOTE(review): the joins below are case-sensitive,
# so these names must match the casing of the CSV's `candidate` values.
candidate_array = ["biden", "nikki haley", "ramaswamy", "trump", "asa hutchinson", "marianne williamson"]

data = [(candidate,) for candidate in candidate_array]

schema = StructType([
    StructField("candidate", StringType(), True)
])

# One-column DataFrame holding the candidate list.
distinct_candidates = spark.createDataFrame(data, schema)

# Every tumbling window that contains at least one timestamped tweet. A null
# `time` would yield a null window, which can never match in the equi-join below
# and would otherwise add a spurious all-zero "window" row for every candidate.
distinct_windows = (
    df.where(F.col("time").isNotNull())
    .select(F.window(F.col("time"), window_size).alias("window"))
    .distinct()
)

# Full (candidate x window) grid so that windows without tweets are still reported.
all_combinations = distinct_candidates.crossJoin(distinct_windows)

# Group by 'candidate' and tumbling window based on the 'time' column.
grouped_df = df.groupBy(
    F.col("candidate"),
    F.window(F.col("time"), window_size)
)

# Per group: mean of the `tweet` column (assumed to be the per-tweet sentiment
# score — TODO confirm) and the number of non-null `tweet` values.
sentiment = grouped_df.agg(
    F.mean("tweet").alias("sentiment"),
    F.count("tweet").alias("tweet_count")
)

# Attach the aggregates to the grid; grid cells without tweets get null aggregates.
result = all_combinations.join(sentiment, on=["candidate", "window"], how="left_outer")

# Empty cells are reported as sentiment 0 with tweet_count 0. Use tweet_count to
# distinguish "no tweets" from a genuinely neutral average of 0.
result = result.fillna({"sentiment": 0, "tweet_count": 0})

# Read the per-window word counts. Presumably the CSV has `time`, `candidate`,
# `word` and `count` columns — TODO confirm against its header.
df2 = spark.read \
    .options(header=True, inferSchema=True) \
    .csv("gs://6893_bigdata_isaac/largescale_data_processing/6889project/wordcount1.csv")

# Keep only the needed columns (renaming `count` to `word_count`), parse `time`
# as a timestamp, and bucket each row into the same tumbling windows used for
# the sentiment aggregates.
df2 = (
    df2.select(
        "time",
        "candidate",
        F.col("word"),
        F.col("count").alias("word_count"),
    )
    .withColumn("time", F.col("time").cast(TimestampType()))
    .withColumn("window", F.window(F.col("time"), window_size))
)

# Attach words to every (candidate, window) row. A cell that matches several
# words is repeated once per word; cells with no words keep null word/word_count.
final_result = result.join(df2, on=["candidate", "window"], how="left_outer")

# Sort while `window` is still a column (newest window first, then candidate
# A-Z), and only then drop the helper columns. The original sorted on a column it
# had already dropped, relying on Spark to re-resolve the missing reference.
# word_count/word act as tie-breakers so the row order is deterministic, which
# matters because later cells slice the result by row position.
result = (
    final_result
    .orderBy(
        F.col("window.start").desc(),
        F.col("candidate").asc(),
        F.col("word_count").desc(),
        F.col("word").asc(),
    )
    .drop("window", "time")
)

result.show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/03 00:01:56 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/05/03 00:01:56 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/05/03 00:01:56 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/05/03 00:01:56 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator
                                                                                

In [None]:
# Split the first rows of `result` into chunks of ROWS_PER_CHUNK rows and write
# each chunk to its own CSV.
# NOTE(review): 6 matches the number of candidates, i.e. this assumes one row per
# candidate per window. After the word-count join a (candidate, window) pair can
# span several rows, so a chunk may not line up with exactly one window — TODO confirm.
ROWS_PER_CHUNK = 6
MAX_CHUNKS = 5

# Count the total number of rows in the result DataFrame.
total_rows = result.count()

# Never create more chunks than there are rows to fill (ceiling division),
# capped at MAX_CHUNKS.
num_dfs = min(MAX_CHUNKS, (total_rows + ROWS_PER_CHUNK - 1) // ROWS_PER_CHUNK)

# Fetch every needed row in a single Spark job and slice locally. Calling
# `take()` once per chunk re-runs the whole query each time. It also lets
# `take(n)[-6:]` re-return rows already used by an earlier chunk whenever fewer
# than n rows exist.
head_rows = result.take(num_dfs * ROWS_PER_CHUNK)

smaller_dfs = []
for start_index in range(0, len(head_rows), ROWS_PER_CHUNK):
    rows = head_rows[start_index:start_index + ROWS_PER_CHUNK]
    # Create a smaller DataFrame with the sliced rows, keeping result's schema.
    smaller_dfs.append(spark.createDataFrame(rows, schema=result.schema))

# Write each chunk to window_<n>.csv (n starts at 1) and display it.
# Writing gs:// paths through pandas presumably relies on gcsfs being installed.
for index, small_df in enumerate(smaller_dfs, start=1):
    output_file = f"gs://6893_bigdata_isaac/largescale_data_processing/6889project/result1/window_{index}.csv"
    small_df.toPandas().to_csv(output_file, index=False)
    small_df.show()


In [None]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# One row per (candidate, window) carrying that window's mean sentiment.
# `final_result` repeats each (candidate, window) once per matched word, so it is
# de-duplicated first; otherwise the row_number "index" below would count word
# rows rather than windows. Sentiment is identical across those repeats because it
# comes from the one-row-per-(candidate, window) sentiment grid.
sentiment_df = final_result.select(
    "candidate",
    "window",
    "sentiment"
).dropDuplicates(["candidate", "window"])

# Number each candidate's windows 1, 2, 3, ... in ascending window-start order.
window_spec = Window.partitionBy("candidate").orderBy(F.col("window.start"))

sentiment_df = sentiment_df.withColumn("index", row_number().over(window_spec))

# The integer index replaces the struct-typed window column in the output.
sentiment_df = sentiment_df.drop("window")

sentiment_df.show()


In [None]:
# Export the per-candidate, per-window sentiment series. It is collected to the
# driver via pandas, and writing a gs:// path presumably relies on gcsfs.
# `index` is a documented bool, so pass False rather than None.
sentiment_output_path = "gs://6893_bigdata_isaac/largescale_data_processing/6889project/result1/sentiment.csv"
sentiment_df.toPandas().to_csv(sentiment_output_path, index=False)