In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Local Spark using all cores; the StreamingContext cuts the stream into 1-second batches
sc = SparkContext("local[*]", "PySparkStreamingExample")
ssc = StreamingContext(sc, 1)

# Text source: one DStream record per line received on hostname:port
hostname = "localhost"
port = 9999
lines = ssc.socketTextStream(hostname, port)

# split() with no argument splits on any run of whitespace and drops empty
# strings, so repeated spaces, tabs or blank lines are not counted as a "" word
words = lines.flatMap(lambda line: line.split())

# Per-batch word count (counts are not carried across batches)
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()
try:
    # Blocks until the stream stops or the kernel is interrupted
    ssc.awaitTermination()
finally:
    # Stop streaming AND the SparkContext, so re-running this cell or running a
    # later cell that creates its own SparkContext does not hit an active context
    ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Local Spark using all cores; the StreamingContext cuts the stream into 1-second batches
sc = SparkContext("local[*]", "LogMonitoringExample")
ssc = StreamingContext(sc, 1)

# Log source: one DStream record per line received on hostname:port
hostname = "localhost"
port = 9999
lines = ssc.socketTextStream(hostname, port)

# Keep only lines that contain the literal (case-sensitive) marker "ERROR"
errors = lines.filter(lambda line: "ERROR" in line)

# Per-batch count of each distinct error line; countByValue() is the built-in
# equivalent of map(lambda e: (e, 1)).reduceByKey(lambda x, y: x + y)
errorCounts = errors.countByValue()

# Print the first ten elements of each RDD generated in this DStream to the console
errorCounts.pprint()

ssc.start()
try:
    # Blocks until the stream stops or the kernel is interrupted
    ssc.awaitTermination()
finally:
    # Stop streaming AND the SparkContext, so re-running this cell or running a
    # later cell that creates its own SparkContext does not hit an active context
    ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json

def extract_text_and_metadata(raw_tweet):
    """Parse one raw JSON tweet into a ``(text, language)`` pair.

    The language is read from the ``"language"`` key when present, otherwise
    from ``"lang"`` (the field name used by Twitter tweet objects). Malformed
    input yields ``("", None)`` instead of raising. This covers a line that is
    not valid JSON and JSON that is not an object. One bad line on the socket
    therefore cannot fail the whole streaming batch. Downstream, ``is_english``
    rejects the ``None`` language.

    Args:
        raw_tweet: One line of text received from the socket.

    Returns:
        tuple: ``(text, language)``. ``text`` defaults to ``""`` and
        ``language`` to ``None`` when the field is missing.
    """
    try:
        tweet = json.loads(raw_tweet)
    except (TypeError, ValueError):
        # json.JSONDecodeError subclasses ValueError; TypeError covers non-str input
        return "", None
    if not isinstance(tweet, dict):
        return "", None
    language = tweet.get("language", tweet.get("lang"))
    return tweet.get("text", ""), language

def is_english(tweet):
    """Tell whether a ``(text, language)`` pair is tagged as English.

    Only the element at index 1 (the language code) is inspected; it must
    equal the exact string ``'en'``.
    """
    language = tweet[1]
    return language == 'en'

def analyze_sentiment(tweet, positive_words=("love", "great"), negative_words=("error", "wrong")):
    """Score a ``(text, language)`` pair with a toy keyword lexicon (illustration only).

    Matching is case-insensitive substring matching. "Love it!" scores
    positive, and "ERROR", "Error" and "error" all score negative. Being
    substring-based, "glove" also matches "love". Positive keywords are checked
    first, so text containing both kinds of keyword scores ``1``.

    Args:
        tweet: Sequence whose first element is the tweet text; ``None`` text
            is treated as empty.
        positive_words: Keywords that mark a tweet as positive.
        negative_words: Keywords that mark a tweet as negative.

    Returns:
        int: ``1`` for positive, ``-1`` for negative, ``0`` for neutral.
    """
    text = (tweet[0] or "").lower()
    if any(word.lower() in text for word in positive_words):
        return 1
    if any(word.lower() in text for word in negative_words):
        return -1
    return 0

# Local Spark using all cores; the StreamingContext cuts the stream into 1-second batches
sc = SparkContext("local[*]", "SentimentAnalysis")
ssc = StreamingContext(sc, 1)

# Source: one raw JSON tweet per line received on hostname:port
hostname = "localhost"
port = 9999
lines = ssc.socketTextStream(hostname, port)

# raw line -> (text, language) -> English tweets only -> per-tweet score
tweets = lines.map(extract_text_and_metadata)
english_tweets = tweets.filter(is_english)
sentiment_scores = english_tweets.map(analyze_sentiment)

# Sum the scores within each batch (a batch with no English tweets yields no value)
total_sentiment = sentiment_scores.reduce(lambda x, y: x + y)

# Print the aggregated sentiment score of each batch
total_sentiment.pprint()

ssc.start()
try:
    # Blocks until the stream stops or the kernel is interrupted
    ssc.awaitTermination()
finally:
    # Stop streaming AND the SparkContext, so re-running this cell or running a
    # later cell that creates its own SparkContext does not hit an active context
    ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [None]:
# Accumulators and Broadcast Variables in Apache Spark

In [None]:
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator Example")

# Shared counter: tasks add to it, and the driver reads the total via .value
accum = sc.accumulator(0)


def process_line(line):
    """Count *line* in ``accum`` if it contains "ERROR"; always return it unchanged."""
    if "ERROR" not in line:
        return line
    # .add() is exactly what `accum += 1` does under the hood, minus the
    # rebinding of the global name (so no `global` statement is needed)
    accum.add(1)
    return line


log_rdd = sc.textFile("logs.txt")
log_rdd.foreach(process_line)

print("Total Errors:", accum.value)

In [None]:
# Stop the SparkContext created by the accumulator cell. Spark allows only one
# active SparkContext per JVM, and the next cell creates a new one.
sc.stop()

In [None]:
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast Example")
try:
    # Small read-only lookup table, shared with tasks as a broadcast variable
    # instead of being captured in every task closure
    lookup_data = {"a": 1, "b": 2, "c": 3}
    broadcast_var = sc.broadcast(lookup_data)

    def transform(line):
        """Map a "key,label" record to the broadcast value for its key (0 if unknown)."""
        key = line.split(",")[0]
        return broadcast_var.value.get(key, 0)

    rdd = sc.parallelize(["a,alpha", "b,beta", "c,gamma"])
    result = rdd.map(transform).collect()

    print(result)  # Output: [1, 2, 3]
finally:
    # This cell owns its SparkContext; stop it (even on failure) so the next
    # cell's SparkContext(...) call does not fail on an already-active context
    sc.stop()

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def updateFunction(new_values, running_count):
    """State-update callback for ``updateStateByKey``.

    Adds this batch's values for a key to the key's previous running total.
    A missing (``None``) or otherwise falsy previous total counts as ``0``.
    """
    prior = running_count or 0
    batch_total = sum(new_values)
    return batch_total + prior

# Local Spark using all cores; the StreamingContext cuts the stream into 1-second batches
sc = SparkContext("local[*]", "StatefulWordCount")
ssc = StreamingContext(sc, 1)

# updateStateByKey keeps state across batches and needs a checkpoint directory
ssc.checkpoint("checkpoint")

# Text source: one DStream record per line received on hostname:port
hostname = "localhost"
port = 9999
lines = ssc.socketTextStream(hostname, port)

# split() with no argument drops empty tokens produced by repeated whitespace
# or blank lines, so "" is never counted as a word
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word, 1))

# Running (cumulative across batches) count per word, maintained by updateFunction
statefulWordCounts = pairs.updateStateByKey(updateFunction)

# Print the first ten elements of each RDD generated in this DStream to the console
statefulWordCounts.pprint()

ssc.start()
try:
    # Blocks until the stream stops or the kernel is interrupted
    ssc.awaitTermination()
finally:
    # Stop streaming AND the SparkContext, so re-running this cell or running a
    # later cell that creates its own SparkContext does not hit an active context
    ssc.stop(stopSparkContext=True, stopGraceFully=False)

In [None]:
pip install tweepy

In [None]:
import tweepy

In [None]:
from tweepy import StreamingClient

In [None]:
from tweepy import OAuthHandler
from tweepy.streaming import StreamingClient

In [None]:
import requests

import os
from getpass import getpass

# Never hardcode credentials in a notebook. Read the bearer token from the
# environment, or prompt for it; getpass input is not echoed or saved in outputs.
# NOTE: the token previously hardcoded here must be treated as leaked and revoked.
BEARER_TOKEN = os.environ.get("TWITTER_BEARER_TOKEN") or getpass("Twitter API bearer token: ")
url = "https://api.twitter.com/2/tweets/search/stream/rules"
headers = {"Authorization": f"Bearer {BEARER_TOKEN}"}

# timeout keeps the cell from hanging indefinitely on a stalled connection
response = requests.get(url, headers=headers, timeout=10)
# Fail loudly on 4xx/5xx instead of printing an error payload as if it were the rule list
response.raise_for_status()
print(response.json())