In [3]:
#"""
#    If you choose to run this notebook on your local machine,
#    set the PySpark environment variables as shown below.
#"""

In [None]:
import os


def set_env_if_path_exists(name, path):
    """Point environment variable ``name`` at ``path`` only when ``path`` exists.

    Args:
        name: Name of the environment variable to set.
        path: Filesystem path to store in the variable.

    Returns:
        True when the variable was set, False when ``path`` does not exist
        and the variable was left untouched.
    """
    # A machine-specific path that is missing here (another laptop, Google
    # Colab, ...) is skipped instead of being written into the environment.
    if not os.path.exists(path):
        return False
    os.environ[name] = path
    return True


# Local-machine settings: edit these three paths to match your installation.
set_env_if_path_exists("SPARK_HOME", "/opt/homebrew/Cellar/apache-spark/3.5.4/libexec")  # your Spark home dir
set_env_if_path_exists("PYSPARK_PYTHON", "/Users/martin/miniforge3/bin/python")  # your Python executable
set_env_if_path_exists("PYSPARK_DRIVER_PYTHON", "/Users/martin/miniforge3/bin/python")  # your Python executable


In [None]:
# If you choose to open this notebook on Google Colab, comment out the cell above.

In [None]:
# =========================================
# MAPREDUCE & PYTHON FUNCTIONAL EXAMPLES
# =========================================

In [None]:
# Import some useful modules
from functools import reduce
from pyspark.sql import SparkSession
from collections import defaultdict

#==================================================

In [None]:
# Simulate MapReduce word count in pure Python.
# `lines` is the input corpus: one string per input record.
lines = [
    "hello world",
    "hi Spark",
    "hello MapReduce",
]


In [None]:
# Step 1: Map phase - every word of every line is emitted as a (word, 1) pair
mapped = []
for text in lines:
    for token in text.split():
        mapped.append((token, 1))

In [None]:
# Step 2: Shuffle phase - gather the counts emitted for each word under that word
grouped = defaultdict(list)  # an unseen word starts out with an empty list
for pair in mapped:
    key, value = pair
    grouped[key].append(value)

In [None]:
# Step 3: Reduce phase - collapse each word's list of counts into one total
reduced = {}
for word, counts in grouped.items():
    reduced[word] = sum(counts)
print("Word Count Result:", reduced)

In [None]:
# Python higher-order function examples: map, filter and reduce


def square(value):
    """Return ``value`` raised to the power of two."""
    return value ** 2


def is_even(value):
    """Return True when ``value`` is divisible by two."""
    return value % 2 == 0


def add_pair(left, right):
    """Return the sum of the two arguments."""
    return left + right


numbers = [1, 2, 3, 4, 5]

# map: apply `square` to each number
squared = list(map(square, numbers))

# filter: keep only the numbers for which `is_even` holds
evens = list(filter(is_even, numbers))

# reduce: fold the numbers into a single running sum
summed = reduce(add_pair, numbers)

for label, result in (("Squared:", squared), ("Evens:", evens), ("Summed:", summed)):
    print(label, result)

In [None]:
# Another map example: an existing method (str.capitalize) is passed instead of a lambda
names = [
    "alice",
    "bob",
    "charlie",
]
capitalized = [*map(str.capitalize, names)]
print("Capitalized Names:", capitalized)

In [None]:
# Chained map + filter: keep the even numbers first, then square the ones that remain
even_numbers = filter(lambda value: value % 2 == 0, numbers)
chained = list(map(lambda value: value ** 2, even_numbers))
print("Chained Square of Evens:", chained)

In [None]:
# =========================================
# SPARK (PySpark) EXAMPLES [The above were manual implementations]
# =========================================


In [None]:
# Start the Spark session used by the Spark examples below
spark = (
    SparkSession.builder
    .appName("BigDataExamples")
    .getOrCreate()
)

In [None]:
# ----- RDD Examples -----

# Create an RDD holding the integers 1 through 8
rdd = spark.sparkContext.parallelize(list(range(1, 9)))

# Map: replace every value with its square
squared_rdd = rdd.map(lambda value: value ** 2)

# Filter: drop every square that is not greater than 20
filtered_rdd = squared_rdd.filter(lambda square: square > 20)

# Reduce: add up the squares that survived the filter
sum_of_filtered = filtered_rdd.reduce(lambda total, square: total + square)

# collect() brings the RDD contents back to the driver as a Python list
squared_values = squared_rdd.collect()
filtered_values = filtered_rdd.collect()

print("Squared RDD:", squared_values)
print("Filtered RDD (>20):", filtered_values)
print("Sum of Filtered:", sum_of_filtered)

In [None]:
# Map over an RDD of names: pair each name with its length
name_list = ["alice", "bob", "carol"]
names_rdd = spark.sparkContext.parallelize(name_list)
lengths_rdd = names_rdd.map(lambda person: (person, len(person)))
print("Name Lengths:", lengths_rdd.collect())

In [None]:
# MapReduce Examples in PySpark (Real-World Scenarios)
#
# The RDDs in this cell use their own names (`lines_rdd`, `grouped_by_prefix`,
# `fruit_pairs`) so they do not overwrite the pure-Python `lines` list and
# `grouped` dict defined in the earlier word-count cells; those cells can
# therefore still be re-run after this one.

# 1. Word count on filtered text (excluding stopwords)
text_rdd = spark.sparkContext.parallelize(["spark is awesome", "pyspark handles big data", "big data tools"])
stopwords = {"is", "big", "data"}
# Split every line into words, then drop the words whose lowercase form is a stopword
words_filtered = (
    text_rdd.flatMap(lambda line: line.split())
            .filter(lambda word: word.lower() not in stopwords)
)
pairs = words_filtered.map(lambda word: (word.lower(), 1))
word_counts = pairs.reduceByKey(lambda a, b: a + b)
print("Filtered Word Count:", word_counts.collect())

# 2. Grouping and summing based on key prefix
fruit_pairs = spark.sparkContext.parallelize([("apple", 2), ("banana", 3), ("apricot", 5), ("berry", 4)])
# Re-key each (fruit, quantity) pair by the first letter of the fruit name, then sum per letter
grouped_by_prefix = fruit_pairs.map(lambda x: (x[0][0], x[1])).reduceByKey(lambda a, b: a + b)
print("Grouped by Prefix:", grouped_by_prefix.collect())

# 3. Calculating average length of lines
lines_rdd = spark.sparkContext.parallelize(["hello spark", "map reduce model", "python is cool"])
# Every line is emitted under the single key "avg" as (character count, 1)
lengths = lines_rdd.map(lambda line: ("avg", (len(line), 1)))
# Sum the character counts and the line counts separately, then divide total by count
avg_length = (
    lengths.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
           .mapValues(lambda x: x[0] / x[1])
)
print("Average Line Length:", avg_length.collect())

# 4. Count words only longer than 5 characters
long_words = (
    text_rdd.flatMap(lambda line: line.split())
            .filter(lambda word: len(word) > 5)
)
long_word_pairs = long_words.map(lambda word: (word.lower(), 1))
long_word_counts = long_word_pairs.reduceByKey(lambda a, b: a + b)
print("Long Word Counts:", long_word_counts.collect())



In [None]:
# =========================================
# LOAD AND SAVE DIFFERENT FILE TYPES IN SPARK
# =========================================

In [None]:
# Input files and output directories used in this cell
CSV_SOURCE = "sample_people.csv"  # replace with your path to csv
JSON_SOURCE = "sample_data.json"  # replace with your path to json file
PARQUET_TARGET = "output/people_parquet"
JSON_TARGET = "output/people_json"
CSV_TARGET = "output/people_csv"

# --- Load CSV File ---
# The first row is read as column names and column types are inferred from the data
csv_df = spark.read.csv(CSV_SOURCE, inferSchema=True, header=True)
csv_df.show()
csv_df.printSchema()
# Note: you can also use 'sc = spark.sparkContext' to load the data, as we did in class

# --- Load JSON File ---
json_df = spark.read.json(JSON_SOURCE)
json_df.show()
json_df.printSchema()
# Note: you can also use 'sc = spark.sparkContext' to load the data, as we did in class

# --- Save to Parquet Format ---
csv_df.write.parquet(PARQUET_TARGET, mode="overwrite")

# --- Save to JSON Format ---
csv_df.write.json(JSON_TARGET, mode="overwrite")

# --- Save to CSV Format with header ---
csv_df.write.csv(CSV_TARGET, mode="overwrite", header=True)

In [None]:
# =========================================
# LOG ANALYSIS USING MAPREDUCE IN PYSPARK
# (Step-by-step example of processing unstructured data,
# similar to what we did in class)
# =========================================

# Sample log data (simulated Apache log entries, similar to the example we did in class)
log_data = [
    '10.0.0.1 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 1043',
    '10.0.0.2 - - [10/Oct/2023:13:55:40 +0000] "GET /login HTTP/1.1" 200 2048',
    '10.0.0.1 - - [10/Oct/2023:13:56:10 +0000] "GET /dashboard HTTP/1.1" 500 512',
    '10.0.0.3 - - [10/Oct/2023:13:56:42 +0000] "GET /profile HTTP/1.1" 200 1337',
    '10.0.0.2 - - [10/Oct/2023:13:57:01 +0000] "GET /index.html HTTP/1.1" 404 321',
]

# Positions of the fields we need once a log line is split on single spaces
IP_FIELD = 0        # first token: client IP address
ENDPOINT_FIELD = 6  # seventh token: requested path
STATUS_FIELD = -2   # second-to-last token: HTTP status code

logs_rdd = spark.sparkContext.parallelize(log_data)

# 1. Count requests per IP address
ip_counts = logs_rdd.map(lambda entry: (entry.split(" ")[IP_FIELD], 1))
ip_summary = ip_counts.reduceByKey(lambda left, right: left + right)
print("Request Count per IP:", ip_summary.collect())

# 2. Count status codes (200, 404, 500)
status_counts = logs_rdd.map(lambda entry: (entry.split(" ")[STATUS_FIELD], 1))
status_summary = status_counts.reduceByKey(lambda left, right: left + right)
print("Status Code Summary:", status_summary.collect())

# 3. Count access frequency for each endpoint
endpoint_counts = (
    logs_rdd.map(lambda entry: (entry.split(" ")[ENDPOINT_FIELD], 1))
            .reduceByKey(lambda left, right: left + right)
)
print("Endpoint Access Counts:", endpoint_counts.collect())

# 4. Filter and count only failed (status != 200) requests
failed_logs = logs_rdd.filter(lambda entry: entry.split(" ")[STATUS_FIELD] != "200")
failed_endpoints = (
    failed_logs.map(lambda entry: (entry.split(" ")[ENDPOINT_FIELD], 1))
               .reduceByKey(lambda left, right: left + right)
)
print("Failed Endpoint Hits:", failed_endpoints.collect())


In [None]:
# Stop the Spark session once you are done (good practice: it keeps Spark from running in the background)
spark.stop()