In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import  pyspark.sql.functions as fun
import numpy as np
from pyspark.sql.types import *

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
sc = spark.sparkContext

In [8]:
rdd = sc.parallelize(np.arange(1, 51))
rdd.collect()

[np.int64(1),
 np.int64(2),
 np.int64(3),
 np.int64(4),
 np.int64(5),
 np.int64(6),
 np.int64(7),
 np.int64(8),
 np.int64(9),
 np.int64(10),
 np.int64(11),
 np.int64(12),
 np.int64(13),
 np.int64(14),
 np.int64(15),
 np.int64(16),
 np.int64(17),
 np.int64(18),
 np.int64(19),
 np.int64(20),
 np.int64(21),
 np.int64(22),
 np.int64(23),
 np.int64(24),
 np.int64(25),
 np.int64(26),
 np.int64(27),
 np.int64(28),
 np.int64(29),
 np.int64(30),
 np.int64(31),
 np.int64(32),
 np.int64(33),
 np.int64(34),
 np.int64(35),
 np.int64(36),
 np.int64(37),
 np.int64(38),
 np.int64(39),
 np.int64(40),
 np.int64(41),
 np.int64(42),
 np.int64(43),
 np.int64(44),
 np.int64(45),
 np.int64(46),
 np.int64(47),
 np.int64(48),
 np.int64(49),
 np.int64(50)]

In [10]:
#sum
total_sum = rdd.sum()
print(f"Sum: {total_sum}")


Sum: 1275


In [15]:
#Count
count = rdd.count()
print(f"Count: {count}")

Count: 50


In [16]:
#avg
average = total_sum / count if count > 0 else 0
print(f"Average: {average}")

Average: 25.5


In [17]:
#Min
minimum = rdd.min()
print(f"Minimum: {minimum}")

Minimum: 1


In [18]:
#Max
maximum = rdd.max()
print(f"Maximum: {maximum}")

Maximum: 50


In [19]:
# count the number of even and odd numbers in the RDD
even_numbers = rdd.filter(lambda x: x % 2 == 0).count()
odd_numbers = rdd.filter(lambda x: x % 2 != 0).count()
print(f"ODD Numbers: {odd_numbers} EVEN Numbers: {even_numbers}")

ODD Numbers: 25 EVEN Numbers: 25


In [21]:
people_data = [("Nada ", 25), ("Mona", 30), ("Ahmed", 35), ("Khaled", 40),("Ahmed", 35), ('Nada ', 25)]
rdd_people = sc.parallelize(people_data)
rdd_people.collect()

[('Nada ', 25),
 ('Mona', 30),
 ('Ahmed', 35),
 ('Khaled', 40),
 ('Ahmed', 35),
 ('Nada ', 25)]

In [22]:
# Find the oldest person
oldest_person = rdd_people.max(key=lambda x: x[1])
print(oldest_person)

('Khaled', 40)


In [23]:
# compute average age
ages = rdd_people.map(lambda x: x[1])
total_sum = ages.sum()
count = ages.count()
average = total_sum / count
print(average)

31.666666666666668


In [24]:
# Group all the names by their age
grouped = rdd_people.keyBy(lambda x: x[1]).groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
print(grouped)

[(30, [('Mona', 30)]), (40, [('Khaled', 40)]), (25, [('Nada ', 25), ('Nada ', 25)]), (35, [('Ahmed', 35), ('Ahmed', 35)])]


In [29]:
import os

# Write the text to a file
os.makedirs("data", exist_ok=True)  # Create the data directory if it doesn't exist
with open("data/russia.txt", "w") as file:
    file.write("Russia is the largest country in the world by land area\n")
    file.write("Moscow is the capital city of Russia\n")
    file.write("The Russian language is one of the most widely spoken languages in the world\n")
    file.write("Russia is known for its rich history and culture\n")
    file.write("The Trans-Siberian Railway is the longest railway line in the world\n")
    file.write("Russia has a strong tradition in literature, music and ballet\n")
    file.write("The country is famous for its cold winters and vast landscapes\n")
    file.write("Russia is a major player in global energy production")

# Load the file into an RDD
rdd_russia = sc.textFile("data/russia.txt")

# Collect and display the RDD content
print(rdd_russia.collect())

['Russia is the largest country in the world by land area', 'Moscow is the capital city of Russia', 'The Russian language is one of the most widely spoken languages in the world', 'Russia is known for its rich history and culture', 'The Trans-Siberian Railway is the longest railway line in the world', 'Russia has a strong tradition in literature, music and ballet', 'The country is famous for its cold winters and vast landscapes', 'Russia is a major player in global energy production']


In [30]:
# Count the total number of lines
total_lines = rdd_russia.count()
print("Total number of lines:", total_lines)

Total number of lines: 8


In [31]:
# Count how many lines contain the word "Russia"
lines_with_russia = rdd_russia.filter(lambda line: "Russia" in line).count()
print("Lines containing the word 'Russia':", lines_with_russia)

Lines containing the word 'Russia': 6


In [32]:
# Find the 5 most frequent words
words = rdd_russia.flatMap(lambda line: line.lower().split()) \
                 .map(lambda word: (word.strip(".,!?:;\"'"), 1)) \
                 .reduceByKey(lambda a, b: a + b) \
                 .sortBy(lambda x: x[1], ascending=False) \
                 .take(5)
print("Top 5 frequent words:", words)

Top 5 frequent words: [('the', 10), ('is', 7), ('russia', 5), ('in', 5), ('world', 3)]


In [33]:
# Tokenize words
tokenized_words = rdd_russia.flatMap(lambda line: line.lower().split())
print("Tokenized words:")
print(tokenized_words.collect())

Tokenized words:
['russia', 'is', 'the', 'largest', 'country', 'in', 'the', 'world', 'by', 'land', 'area', 'moscow', 'is', 'the', 'capital', 'city', 'of', 'russia', 'the', 'russian', 'language', 'is', 'one', 'of', 'the', 'most', 'widely', 'spoken', 'languages', 'in', 'the', 'world', 'russia', 'is', 'known', 'for', 'its', 'rich', 'history', 'and', 'culture', 'the', 'trans-siberian', 'railway', 'is', 'the', 'longest', 'railway', 'line', 'in', 'the', 'world', 'russia', 'has', 'a', 'strong', 'tradition', 'in', 'literature,', 'music', 'and', 'ballet', 'the', 'country', 'is', 'famous', 'for', 'its', 'cold', 'winters', 'and', 'vast', 'landscapes', 'russia', 'is', 'a', 'major', 'player', 'in', 'global', 'energy', 'production']


In [34]:
# Remove stopwords (a, the, is, to, in, of)
stopwords = {"a", "the", "is", "to", "in", "of"}
filtered_words = tokenized_words.filter(lambda word: word not in stopwords)
print("Remove stopwords (a, the, is, to, in, of):")
print(filtered_words.collect())

Remove stopwords (a, the, is, to, in, of):
['russia', 'largest', 'country', 'world', 'by', 'land', 'area', 'moscow', 'capital', 'city', 'russia', 'russian', 'language', 'one', 'most', 'widely', 'spoken', 'languages', 'world', 'russia', 'known', 'for', 'its', 'rich', 'history', 'and', 'culture', 'trans-siberian', 'railway', 'longest', 'railway', 'line', 'world', 'russia', 'has', 'strong', 'tradition', 'literature,', 'music', 'and', 'ballet', 'country', 'famous', 'for', 'its', 'cold', 'winters', 'and', 'vast', 'landscapes', 'russia', 'major', 'player', 'global', 'energy', 'production']


In [35]:
# Count the frequency of each word
word_counts = filtered_words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).collect()
print("Count the frequency of each word:")
print(word_counts)

Count the frequency of each word:
[('largest', 1), ('country', 2), ('world', 3), ('by', 1), ('land', 1), ('area', 1), ('capital', 1), ('russian', 1), ('language', 1), ('most', 1), ('widely', 1), ('known', 1), ('for', 2), ('history', 1), ('and', 3), ('line', 1), ('literature,', 1), ('music', 1), ('famous', 1), ('cold', 1), ('winters', 1), ('landscapes', 1), ('player', 1), ('energy', 1), ('production', 1), ('russia', 5), ('moscow', 1), ('city', 1), ('one', 1), ('spoken', 1), ('languages', 1), ('its', 2), ('rich', 1), ('culture', 1), ('trans-siberian', 1), ('railway', 2), ('longest', 1), ('has', 1), ('strong', 1), ('tradition', 1), ('ballet', 1), ('vast', 1), ('major', 1), ('global', 1)]


In [36]:
schema = 'id integer, name string, age integer, salary integer'
data = [
    (1, "Ali", 25, 4000),
    (2, "Mariam", 30, 6000),
    (3, "Omar", 35, 7000),
    (4, "Sara", 28, 5000),
    (5, "Omar", 25, 6500),
    (6, "Mariam", 26, 7500)
]

df = spark.createDataFrame(data,schema)

In [37]:
# Show schema and first 2 rows
df.printSchema()
df.show(2)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  1|   Ali| 25|  4000|
|  2|Mariam| 30|  6000|
+---+------+---+------+
only showing top 2 rows



In [38]:
# Select only name and salary columns
name_salary_df = df.select("name", "salary")
name_salary_df.show()

+------+------+
|  name|salary|
+------+------+
|   Ali|  4000|
|Mariam|  6000|
|  Omar|  7000|
|  Sara|  5000|
|  Omar|  6500|
|Mariam|  7500|
+------+------+



In [44]:
# Find the average salary
average_salary = df.groupBy().avg("salary").collect()[0][0]
print("Average salary:", average_salary)

Average salary: 6000.0


In [40]:
# Filter employees older than 28
older_than_28 = df.filter(df.age > 28)
older_than_28.show()

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  2|Mariam| 30|  6000|
|  3|  Omar| 35|  7000|
+---+------+---+------+



In [41]:
# Count distinct values in the name column
distinct_names_count = df.select("name").distinct().count()
print("Distinct values in the name column:", distinct_names_count)

Distinct values in the name column: 4


In [42]:
# Group by name column and find average salary
avg_salary_by_name = df.groupBy("name").avg("salary")
avg_salary_by_name.show()

+------+-----------+
|  name|avg(salary)|
+------+-----------+
|  Omar|     6750.0|
|Mariam|     6750.0|
|   Ali|     4000.0|
|  Sara|     5000.0|
+------+-----------+



In [54]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ColabSpark").getOrCreate()
data = [
    ("emp1", "John", None),
    ("emp2", None, None),
    ("emp3", None, 345.0),
    ("emp4", "Cindy", 456.0)
]
columns = ["Id", "Name", "Sales"]
df1 = spark.createDataFrame(data, columns)

# Show the DataFrame
df1.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [55]:
# Find the average sales
avg_sales = df1.agg({"Sales": "avg"}).collect()[0][0]
print("Average sales:", avg_sales)

Average sales: 400.5


In [57]:
# Replace null name with 'Unknown' and null sales with the average sales
df_replaced = df1.na.fill({"Name": "Unknown", "Sales": avg_sales})
df_replaced.show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|400.5|
|emp2|Unknown|400.5|
|emp3|Unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+

