In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

# Obtain (or reuse) the Spark session that drives the streaming job
spark = SparkSession.builder.appName('appname').getOrCreate()

# Unbounded DataFrame of the text lines received from the socket at localhost:9999
streamingDF = (
    spark.readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# One output row per space-separated token of every incoming line
words = streamingDF.select(explode(split(streamingDF['value'], ' ')).alias('word'))

# Running number of occurrences of each distinct word
wordCounts = words.groupBy('word').count()

In [None]:
# Streaming DataFrame fed by the TCP socket at localhost:9999
df = (
    spark.readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Step 1: Initialize Spark Session
spark = SparkSession.builder \
    .appName("Streaming Transformation Example") \
    .getOrCreate()

# Step 2: Define the Streaming Source
# The stream is read from a directory that continuously receives JSON files.
# File-based streaming sources require an explicit schema (unless
# spark.sql.streaming.schemaInference is enabled), so one is declared here.
# NOTE(review): only the column used by the filter below is listed; extend
# this DDL string with the remaining fields of your JSON records.
input_schema = "someColumn BIGINT"

input_stream = spark.readStream \
    .format("json") \
    .schema(input_schema) \
    .option("path", "path/to/input/directory") \
    .option("maxFilesPerTrigger", 1) \
    .load()

# Step 3: Define the Transformation
# Example transformation: keep only rows whose someColumn is greater than 10
transformed_stream = input_stream.filter(col("someColumn") > 10)

# Step 4: Output the Transformed Data
# start() launches the query, which writes each new batch to the console
query = transformed_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# The query is already running; block here until it terminates
query.awaitTermination()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Obtain (or reuse) the Spark session for this example
spark = (
    SparkSession.builder
    .appName("Filter Streaming Data")
    .getOrCreate()
)

# Streaming DataFrame fed by the TCP socket at localhost:9999
streamingDF = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

# Keep only the rows whose `value` column compares greater than 100.
# NOTE(review): the socket source delivers `value` as a string, so this
# comparison presumably relies on Spark's implicit cast -- confirm.
filteredDF = streamingDF.where(col("value") > 100)

# Launch the query: every new batch of matching rows is printed to the console
query = (
    filteredDF.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block until the streaming query terminates
query.awaitTermination()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create a Spark session
spark = SparkSession.builder \
    .appName("Aggregate Streaming Data") \
    .getOrCreate()

# Read the streaming data from a source
streamingDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Define aggregation
# The socket source exposes a single string column named `value`, so grouping
# by any other column name fails during analysis. The received line is
# therefore exposed under the grouping column's name before counting.
# NOTE(review): replace this rename with real parsing of `value` if the
# grouping key is only a part of each incoming line.
aggregatedDF = streamingDF \
    .withColumnRenamed("value", "someGroupingColumn") \
    .groupBy("someGroupingColumn") \
    .count()

# Write the results to the console
# "complete" mode re-emits the full table of counts on every trigger
query = aggregatedDF.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Block until the streaming query terminates
query.awaitTermination()


If you want to read streaming data from a website, the website generally needs to expose its data in a stream-friendly form, such as a WebSocket or a continuously open HTTP connection. However, Spark does not natively support reading directly from WebSockets or similar protocols; doing so requires an additional connector or custom code.

A common approach within Spark’s ecosystem is to use Apache Kafka as an intermediary, where the website pushes data to a Kafka topic, and Spark reads from this Kafka topic. However, Spark can directly read data streams over TCP sockets, which is useful for simple streaming applications or for learning purposes.

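The next cell shows an example of how you might set up a simple stream reader in Spark to read data from a TCP socket.
In this setup, you need a server running on localhost, port 9999, that sends data. To read from a different source, replace "localhost" and "9999" with the IP address and port of that source. Note that Spark's socket source is intended for testing and learning only, since it provides no end-to-end fault-tolerance guarantees; for production use, prefer a source such as Kafka.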

In [None]:
from pyspark.sql import SparkSession

# Initialize Spark Session for Structured Streaming.
# The builder method that selects the cluster master is `master(...)`;
# `getMaster` does not exist on SparkSession.builder and raises AttributeError.
spark = SparkSession.builder \
    .appName("Socket Stream Example") \
    .master("local") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to localhost:9999
# This DataFrame represents an unbounded table containing the streaming text data
lines = spark.readStream.format("socket")\
    .option("host", "localhost")\
    .option("port", 9999).load()

# Start running the query that prints each new batch of lines to the console
query = lines.writeStream.outputMode("append").format("console").start()

# Block until the streaming query terminates
query.awaitTermination()



In [None]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("Join Stream with Static Data") \
    .getOrCreate()

# Static DataFrame
# NOTE(review): assumes the static JSON data contains a `key` column -- confirm
staticDF = spark.read.json("path/to/static/data.json")

# Streaming DataFrame
# The socket source exposes a single string column named `value`; it is
# renamed to `key` so the join column below exists on the streaming side.
# NOTE(review): replace the rename with real parsing of `value` if the join
# key is only a part of each incoming line.
streamingDF = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load() \
    .withColumnRenamed("value", "key")

# Join operation: inner join of each streamed row with the static data on `key`
joinedDF = streamingDF.join(staticDF, "key")

# Write the results to the console
query = joinedDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Block until the streaming query terminates
query.awaitTermination()


In [None]:
from pyspark.sql import SparkSession

# Obtain (or reuse) the Spark session for this example
spark = SparkSession.builder \
    .appName("File Streaming") \
    .getOrCreate()

# Read streaming data from a directory
# The chain is wrapped in parentheses instead of using backslash
# continuations: a comment after an unterminated line ends the statement and
# makes the following `.load(...)` line a syntax error.
# `your_schema` is a placeholder that must be defined before this cell runs
# (a StructType or a DDL string describing the CSV columns).
streamingDF = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(your_schema)  # Define the schema of the CSV files
    .load("path_to_directory_containing_csv_files")
)

# Query to test the stream: print every new batch of rows to the console
query = streamingDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Block until the streaming query terminates
query.awaitTermination()


In [None]:
from pyspark.sql import SparkSession

# Obtain (or reuse) the Spark session for this example
spark = (
    SparkSession.builder
    .appName("Kafka Streaming")
    .getOrCreate()
)

# Streaming DataFrame subscribed to a Kafka topic; the broker list and the
# topic name below are placeholders to be replaced with real values
kafkaDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic_name")
    .load()
)

# Launch a test query that prints every new batch of records to the console
query = (
    kafkaDF.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Block until the streaming query terminates
query.awaitTermination()


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Parquet Streaming") \
    .getOrCreate()

# Define the schema of the Parquet files. Streaming file sources require an
# explicit schema unless spark.sql.streaming.schemaInference is enabled.
# Placeholder: replace `...` with the StructField entries matching your files.
schema = StructType([...])

# Directory containing incoming Parquet files
input_directory = 'path_to_input_directory/'

# Read streaming data from Parquet files
# The chain is wrapped in parentheses instead of using backslash
# continuations: a comment after an unterminated line ends the statement and
# makes the following `.load(...)` line a syntax error.
parquetStream = (
    spark.readStream
    .format("parquet")
    .schema(schema)
    .load(input_directory)
)

# Define a simple query to process and output the streamed data
# Outputs to the console for demonstration; replace with other sinks as needed
query = (
    parquetStream.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

try:
    # Block until the streaming query terminates
    query.awaitTermination()
finally:
    # Stop the Spark session even if the query fails or is interrupted
    spark.stop()
