<a href="https://colab.research.google.com/github/Niv0902/Shablool/blob/main/Tirgul10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Step 1 - Installation**

In [None]:
# Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the latest Apache Spark version
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz

# Install findspark to connect Python with Spark
!pip install -q findspark



gzip: stdin: unexpected end of file
tar: Unexpected EOF in archive
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now


# **Step 2 - Environment Setup**

In [None]:
# Import the os module to interact with the operating system
import os
# Import findspark to locate the Spark installation
import findspark

# Set the environment variable for Java home directory (required for Spark to run)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Set the environment variable for Spark home directory to the downloaded Spark path
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"

# Initialize findspark to make pyspark importable within Python
findspark.init()


# **Step 3 - Create SparkSession**

In [None]:
# Import SparkSession class from PySpark SQL module
from pyspark.sql import SparkSession

# Create a SparkSession object, which is the entry point to use Spark functionality
  # Set the name of the Spark application to be "Big Data Example"
  # Create a new SparkSession or return an existing one
spark = SparkSession.builder.appName("Big Data Example").getOrCreate()

# **Step 4 – Continue with DataFrame operations**

In [None]:
# Define a list of tuples, each containing a name and a price
data = [("Tal", 120), ("Uri", 90), ("Dina", 150)]
# Define the column names for the DataFrame
columns = ["name", "price"]
# Create a DataFrame from the data and column names using the SparkSession
df = spark.createDataFrame(data, columns)
# Filter the DataFrame to include only rows where the price is greater than 100
df.filter(df["price"] > 100).show()


+----+-----+
|name|price|
+----+-----+
| Tal|  120|
|Dina|  150|
+----+-----+



# **Real Dataset Example of Amazon Reviews**

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("datafiniti/consumer-reviews-of-amazon-products")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/consumer-reviews-of-amazon-products


In [None]:
df = spark.read.csv("/kaggle/input/consumer-reviews-of-amazon-products", header=True, inferSchema=True)
df.printSchema()
df.show(50)

root
 |-- id: string (nullable = true)
 |-- dateAdded: string (nullable = true)
 |-- dateUpdated: string (nullable = true)
 |-- name: string (nullable = true)
 |-- asins: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- primaryCategories: string (nullable = true)
 |-- imageURLs: string (nullable = true)
 |-- keys: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- manufacturerNumber: string (nullable = true)
 |-- reviews.date: string (nullable = true)
 |-- reviews.dateSeen: string (nullable = true)
 |-- reviews.didPurchase: string (nullable = true)
 |-- reviews.doRecommend: string (nullable = true)
 |-- reviews.id: string (nullable = true)
 |-- reviews.numHelpful: string (nullable = true)
 |-- reviews.rating: string (nullable = true)
 |-- reviews.sourceURLs: string (nullable = true)
 |-- reviews.text: string (nullable = true)
 |-- reviews.title: string (nullable = true)
 |-- reviews.username: string (nul

In [None]:
# Filter positive reviews
positive_reviews = df.filter(df["`reviews.rating`"] >= 4)

# Show examples of positive reviews
positive_reviews.select("`reviews.text`", "`reviews.rating`").show(50, truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Count how many reviews there are per numerical rating
df.filter(df["`reviews.rating`"] >= 0).groupBy("`reviews.rating`").count().orderBy("count", ascending=False).show()

+--------------+-----+
|reviews.rating|count|
+--------------+-----+
|             5|22687|
|             4| 6520|
|             3| 1372|
|             1| 1045|
|             0|  704|
|             2|  670|
|            16|    1|
|            44|    1|
|             8|    1|
|             6|    1|
|            97|    1|
|             9|    1|
|            49|    1|
|            12|    1|
|            13|    1|
+--------------+-----+



# **Map Reduce Example**

In [None]:
logs = [
    "192.168.1.10 - - [10/May/2025:13:00] GET /index.html",
    "172.16.0.5 - - [10/May/2025:13:01] GET /contact.html",
    "192.168.1.10 - - [10/May/2025:13:02] GET /products.html",
    "10.0.0.1 - - [10/May/2025:13:02] GET /index.html",
    "192.168.1.10 - - [10/May/2025:13:03] GET /about.html"
]

In [None]:
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("IP Visit Count") \
    .getOrCreate()

# Create RDD (Resilient Distributed Dataset) from the list of logs
rdd = spark.sparkContext.parallelize(logs)

# Map step: extract the IP and map each one to (IP, 1)
ip_counts = rdd.map(lambda line: (line.split()[0], 1))

# Reduce step: sum all counts per IP
result = ip_counts.reduceByKey(lambda a, b: a + b)

# Collect the results to the driver and print
for ip, count in result.collect():
    print(f"{ip} visited {count} times")

# Stop the Spark session
spark.stop()


172.16.0.5 visited 1 times
192.168.1.10 visited 3 times
10.0.0.1 visited 1 times


In [None]:
add = lambda x, y: x + y
print(add(2, 3))

In [None]:
from pyspark.sql import SparkSession

# Start SparkSession
spark = SparkSession.builder.appName("RDD Example").getOrCreate()

rdd = spark.sparkContext.parallelize([("apple", 1), ("banana", 1), ("apple", 1)])
result = rdd.reduceByKey(lambda a, b: a + b)
print(result.collect())


[('apple', 2), ('banana', 1)]


# **Word Count with MapReduce in PySpark:**

In [None]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder \
    .appName("Big Data Word Count with MapReduce") \
    .getOrCreate()

# Load text data (simulating a large dataset with parallelize)
text = [
    "Big data is transforming the world",
    "Apache Spark is fast and powerful",
    "Big data requires scalable tools",
    "Spark is designed for big data processing"
]

# Create RDD from text lines
rdd = spark.sparkContext.parallelize(text)

# MapReduce steps:
# Step 1: Split each line into words
words = rdd.flatMap(lambda line: line.lower().split())

# Step 2: Map each word to (word, 1)
word_pairs = words.map(lambda word: (word, 1))

# Step 3: Reduce by key (sum counts for each word)
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

# Collect and display the results
for word, count in word_counts.collect():
    print(f"{word}: {count}")


big: 3
world: 1
apache: 1
fast: 1
and: 1
powerful: 1
requires: 1
for: 1
data: 3
is: 3
transforming: 1
the: 1
spark: 2
scalable: 1
tools: 1
designed: 1
processing: 1


# **Map Reduce analysis Big Data**

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("datafiniti/consumer-reviews-of-amazon-products")

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/consumer-reviews-of-amazon-products


In [None]:
from pyspark.sql.functions import col
from pyspark.sql.functions import isnull # Import the isnull function

df = spark.read.csv("/kaggle/input/consumer-reviews-of-amazon-products", header=True, inferSchema=True)
# Select the review column and filter out rows where 'reviews.text' is null
df_clean = df.select(col("`reviews.text`").alias("review")).filter(col("review").isNotNull())
# Create RDD from text lines
rdd = df_clean.select("review").rdd.map(lambda row: row["review"])
# MapReduce steps:
# Step 1: Split each line into words
words = rdd.flatMap(lambda line: line.lower().split())

# Step 2: Map each word to (word, 1)
word_pairs = words.map(lambda word: (word, 1))

# Step 3: Reduce by key (sum counts for each word)
word_counts = word_pairs.reduceByKey(lambda a, b: a + b)

# Collect and display the results
for word, count in word_counts.collect():
    print(f"{word}: {count}")

of: 9337
and: 25632
have: 6302
to: 22710
pcs: 1
battery: 1852
like: 2806
these: 4395
well: 1744
they: 5750
work: 2098
batteries: 5671
at: 2784
long: 2242
great.: 765
noticed: 124
just: 3108
easier: 312
hand.: 70
buy: 2112
again.: 417
past: 86
looking: 491
last: 2223
while: 596
more: 2161
high: 250
power: 287
price.: 1531
done: 159
by: 665
good: 5533
i'll: 222
find: 475
believe: 137
buying: 560
really: 1644
recent: 26
purchases,: 2
mixed-bag: 1
than: 2373
use: 3722
it: 15678
love: 4319
easily: 248
there.: 89
comment: 22
quick: 157
keyboards: 12
three: 198
even: 852
condition: 7
did: 530
what: 1674
would: 2333
seems: 508
(which: 58
trail: 28
cameras.: 25
were: 885
beeding: 1
size: 827
enough: 508
lasted: 305
.: 414
used: 1165
doorbell: 15
remote: 255
control: 319
working: 332
this: 11702
ship: 27
door.: 17
none: 77
exploded: 7
disappointed!: 10
4star: 2
because: 1083
batteries,: 509
times: 330
cause: 32
recommend: 1102
hopefully: 36
remain: 3
under: 140
fit: 139
ok,it: 1
our: 1139
this.: