# BD Assignment - PySpark
## Prachi Mehta (202318008)

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=83a6bd599f173b9b72d4348876bcd6409faf8e7a5b4a578b977624bb697aa56a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [None]:
import numpy as np
from pyspark.sql import SparkSession

In [None]:
# Count how often each value occurs in a seeded random sample using the RDD API
# (map to (value, 1) pairs -> reduceByKey -> sortByKey).

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("RDD and dataframe manipulation") \
    .getOrCreate()

# try/finally guarantees the session is stopped even when a step below raises;
# without it a failed run leaves the session alive and a later getOrCreate()
# would pick up this session instead of building a fresh one.
try:
    # Set seed for reproducibility
    np.random.seed(10)

    # Generate 100 random integers in the range 0 to 10
    # (the upper bound passed to randint is exclusive, hence 11)
    random_numbers = np.random.randint(0, 11, 100)
    print(random_numbers)

    # Create the RDD from plain Python ints rather than NumPy scalars so the
    # keys that travel through the job are ordinary built-in integers.
    rdd = spark.sparkContext.parallelize(random_numbers.tolist())

    # Map each number to a tuple of (number, 1)
    mapped_rdd = rdd.map(lambda x: (x, 1))

    # Reduce by key to sum up the occurrences of each number.
    # A plain addition is used instead of int.__add__, which returns
    # NotImplemented (rather than raising) when handed a non-int operand.
    reduced_rdd = mapped_rdd.reduceByKey(lambda a, b: a + b)

    # Sort by key so the report is printed in ascending numeric order
    sorted_rdd = reduced_rdd.sortByKey()

    # Print frequency of each number
    for num, freq in sorted_rdd.collect():
        print(f"Number {num}: {freq} occurrences")
finally:
    # Stop SparkSession
    spark.stop()


[ 9  4  0  1  9  0  1 10  8  9  0 10  8  6  4  3  0  4  6  8 10  1  8  4
  1  3  6  5  3  9  6  9  1  9  4  2  6  7  8 10  8  9  2  0  6  7  8  1
  7  1  4 10  0  8  5  4  7  8  8  2  6  2  8  8  6  6  5 10  6  0  0  6
  9  1  8 10  9  1  2  8  9  9  5  0  2  7  3  0  4  2  0  3  3  1  2  5
  9  0 10  1]
Number 0: 12 occurrences
Number 1: 11 occurrences
Number 2: 8 occurrences
Number 3: 6 occurrences
Number 4: 8 occurrences
Number 5: 5 occurrences
Number 6: 11 occurrences
Number 7: 5 occurrences
Number 8: 14 occurrences
Number 9: 12 occurrences
Number 10: 8 occurrences


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, lower, split

# Count the occurrences of every word that contains the letter 'a' in the
# text8 dataset, using the DataFrame API.

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Word Frequency Calculation") \
    .getOrCreate()

# try/finally guarantees the session is stopped even when a step below raises
# (for example while reading the input file); otherwise the session would stay
# alive and be reused by a later getOrCreate().
try:
    # Load text8 dataset into a DataFrame (one row per line, column "value")
    text8_df = spark.read.text("text8.unknown")

    # Lowercase each line, split it on single spaces, and explode the
    # resulting array so that every word becomes its own row.
    words_df = text8_df.select(explode(split(lower(text8_df.value), " ")).alias("word"))

    # Keep only words containing the letter 'a'; this also discards the empty
    # strings that splitting on consecutive spaces would produce.
    words_with_a_df = words_df.filter(words_df.word.like("%a%"))

    # Calculate word frequencies
    word_frequencies_df = words_with_a_df.groupBy("word").count()

    # Show the frequencies of words containing the letter 'a'.
    # No ordering is applied, so the rows displayed are not sorted.
    word_frequencies_df.show()
finally:
    # Stop SparkSession
    spark.stop()


+-------------+-----+
|         word|count|
+-------------+-----+
|  interaction|   43|
|      marxism|    8|
|     everyday|   21|
|    indicator|   20|
|socialization|    4|
|  handicapped|    3|
|     cautious|    7|
|       ransom|   24|
|      barrier|   26|
|  unequivocal|    1|
|       travel|  143|
|          art|  631|
|     didactic|    1|
|       lamian|    1|
|        trail|   25|
|    arguments|  105|
|        oscar|   65|
|    librarian|    1|
|  mccarthyism|    6|
|    standards|   92|
+-------------+-----+
only showing top 20 rows



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, corr

# Iris analysis: Pearson correlation between petal length and petal width,
# followed by a filtered view of the sepal measurements and species.

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Iris Data Analysis") \
    .getOrCreate()

# try/finally guarantees the session is stopped even when a step below raises
# (for example while reading the input file); otherwise the session would stay
# alive and be reused by a later getOrCreate().
try:
    # Load iris JSON data into a DataFrame
    iris_df = spark.read.json("iris.json")

    # Calculate Pearson Correlation between petalLength and petalWidth.
    # The aggregate yields a single row with a single column, hence [0][0].
    pearson_correlation = iris_df.select(corr("petalLength", "petalWidth")).collect()[0][0]
    print(f"Pearson Correlation between petalLength and petalWidth: {pearson_correlation}")

    # Show sepalLength, sepalWidth, and species for rows with petalLength >= 1.4
    filtered_df = iris_df.filter(col("petalLength") >= 1.4).select("sepalLength", "sepalWidth", "species")
    filtered_df.show()
finally:
    # Stop SparkSession
    spark.stop()


Pearson Correlation between petalLength and petalWidth: 0.962865431402796
+-----------+----------+-------+
|sepalLength|sepalWidth|species|
+-----------+----------+-------+
|        5.1|       3.5| setosa|
|        4.9|       3.0| setosa|
|        4.6|       3.1| setosa|
|        5.0|       3.6| setosa|
|        5.4|       3.9| setosa|
|        4.6|       3.4| setosa|
|        5.0|       3.4| setosa|
|        4.4|       2.9| setosa|
|        4.9|       3.1| setosa|
|        5.4|       3.7| setosa|
|        4.8|       3.4| setosa|
|        4.8|       3.0| setosa|
|        5.7|       4.4| setosa|
|        5.1|       3.5| setosa|
|        5.7|       3.8| setosa|
|        5.1|       3.8| setosa|
|        5.4|       3.4| setosa|
|        5.1|       3.7| setosa|
|        5.1|       3.3| setosa|
|        4.8|       3.4| setosa|
+-----------+----------+-------+
only showing top 20 rows

