In [98]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as fun 
import numpy as np
from pyspark.sql.types import *

In [99]:
spark = SparkSession.builder.getOrCreate()

In [100]:
sc = spark.sparkContext

In [101]:
data = np.arange(1,50)
rdd = sc.parallelize(data)
print(rdd.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49]


In [102]:
print(rdd.sum())

1225


In [103]:
print(rdd.mean())

25.0


In [104]:
print(rdd.count())

49


In [105]:
print(rdd.min())

1


In [106]:
print(rdd.max())

49


In [107]:
rdd_even = rdd.filter(lambda x: x % 2 == 0)
rdd_odd = rdd.filter(lambda x: x % 2 != 0)
print(f"ODD Numbers: {rdd_odd.count()} EVEN Numbers: {rdd_even.count()}" )

ODD Numbers: 25 EVEN Numbers: 24


In [108]:
people_data = [("Nada", 25), ("Mona", 30), ("Ahmed", 35), ("Khaled", 40),("Ahmed", 35), ("Nada", 25)]
rdd_people = sc.parallelize(people_data)
rdd_people.collect()

[('Nada', 25),
 ('Mona', 30),
 ('Ahmed', 35),
 ('Khaled', 40),
 ('Ahmed', 35),
 ('Nada', 25)]

In [109]:
oldest = rdd_people.max(key=lambda x: x[1])
print(oldest)

('Khaled', 40)


In [110]:
ages = rdd_people.map(lambda x: x[1])
print(ages.mean())

31.666666666666668


In [111]:
rdd_grouped = rdd_people.map(lambda x: (x[1], f"{x[0]}({x[1]})")).groupByKey().mapValues(list)
rdd_grouped.collect()

[(25, ['Nada(25)', 'Nada(25)']),
 (40, ['Khaled(40)']),
 (30, ['Mona(30)']),
 (35, ['Ahmed(35)', 'Ahmed(35)'])]

In [112]:
rdd_russia = sc.textFile("/data/russia.txt")
rdd_russia.collect()

['Russia is the largest country in the world by land area',
 'Moscow is the capital city of Russia',
 'The Russian language is one of the most widely spoken languages in the world',
 'Russia is known for its rich history and culture',
 'The Trans-Siberian Railway is the longest railway line in the world',
 'Russia has a strong tradition in literature, music and ballet',
 'The country is famous for its cold winters and vast landscapes',
 'Russia is a major player in global energy production']

In [113]:
rdd_russia.count()

8

In [114]:
rdd_russia_count = rdd_russia.filter(lambda x: 'Russia' in x).count()
rdd_russia_count

6

In [115]:
rdd_russia_split = rdd_russia.flatMap(lambda line: line.split())
rdd_pairs = rdd_russia_split.map(lambda word: (word, 1))
word_count = rdd_pairs.reduceByKey(lambda a, b: a + b)

rdd_most_frequent = word_count.sortBy(lambda x: x[1], ascending=False)
print(rdd_most_frequent.take(5))

[('is', 7), ('the', 7), ('Russia', 5), ('in', 5), ('world', 3)]


In [116]:
print(rdd_russia_split.collect())

['Russia', 'is', 'the', 'largest', 'country', 'in', 'the', 'world', 'by', 'land', 'area', 'Moscow', 'is', 'the', 'capital', 'city', 'of', 'Russia', 'The', 'Russian', 'language', 'is', 'one', 'of', 'the', 'most', 'widely', 'spoken', 'languages', 'in', 'the', 'world', 'Russia', 'is', 'known', 'for', 'its', 'rich', 'history', 'and', 'culture', 'The', 'Trans-Siberian', 'Railway', 'is', 'the', 'longest', 'railway', 'line', 'in', 'the', 'world', 'Russia', 'has', 'a', 'strong', 'tradition', 'in', 'literature,', 'music', 'and', 'ballet', 'The', 'country', 'is', 'famous', 'for', 'its', 'cold', 'winters', 'and', 'vast', 'landscapes', 'Russia', 'is', 'a', 'major', 'player', 'in', 'global', 'energy', 'production']


In [117]:
stop_word = ['a', 'the', 'is' , 'to', 'in', 'of']
rdd_russia_stop_words = rdd_russia.flatMap(lambda line: line.split()).filter(lambda word: word not in stop_word)
print(rdd_russia_stop_words.collect())

['Russia', 'largest', 'country', 'world', 'by', 'land', 'area', 'Moscow', 'capital', 'city', 'Russia', 'The', 'Russian', 'language', 'one', 'most', 'widely', 'spoken', 'languages', 'world', 'Russia', 'known', 'for', 'its', 'rich', 'history', 'and', 'culture', 'The', 'Trans-Siberian', 'Railway', 'longest', 'railway', 'line', 'world', 'Russia', 'has', 'strong', 'tradition', 'literature,', 'music', 'and', 'ballet', 'The', 'country', 'famous', 'for', 'its', 'cold', 'winters', 'and', 'vast', 'landscapes', 'Russia', 'major', 'player', 'global', 'energy', 'production']


In [118]:
print(word_count.collect())

[('Russia', 5), ('largest', 1), ('country', 2), ('world', 3), ('by', 1), ('land', 1), ('area', 1), ('capital', 1), ('of', 2), ('language', 1), ('most', 1), ('widely', 1), ('known', 1), ('for', 2), ('history', 1), ('and', 3), ('Trans-Siberian', 1), ('Railway', 1), ('line', 1), ('literature,', 1), ('music', 1), ('famous', 1), ('cold', 1), ('winters', 1), ('landscapes', 1), ('player', 1), ('energy', 1), ('production', 1), ('is', 7), ('the', 7), ('in', 5), ('Moscow', 1), ('city', 1), ('The', 3), ('Russian', 1), ('one', 1), ('spoken', 1), ('languages', 1), ('its', 2), ('rich', 1), ('culture', 1), ('longest', 1), ('railway', 1), ('has', 1), ('a', 2), ('strong', 1), ('tradition', 1), ('ballet', 1), ('vast', 1), ('major', 1), ('global', 1)]


In [119]:
schema = 'id integer, name string, age integer, salary integer' 
data = [
    (1, "Ali", 25, 4000),
    (2, "Mariam", 30, 6000),
    (3, "Omar", 35, 7000),
    (4, "Sara", 28, 5000),
    (5, "Omar", 25, 6500),
    (6, "Mariam", 26, 7500)
]

df = spark.createDataFrame(data,schema)

In [120]:
df.printSchema()
df.show(2)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- salary: integer (nullable = true)

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  1|   Ali| 25|  4000|
|  2|Mariam| 30|  6000|
+---+------+---+------+
only showing top 2 rows



In [121]:
df_ = df.select('name','salary')
df_.show()

+------+------+
|  name|salary|
+------+------+
|   Ali|  4000|
|Mariam|  6000|
|  Omar|  7000|
|  Sara|  5000|
|  Omar|  6500|
|Mariam|  7500|
+------+------+



In [122]:
df_avg = df.select(fun.avg('salary'))
df_avg.show()

+-----------+
|avg(salary)|
+-----------+
|     6000.0|
+-----------+



In [123]:
df.filter(df.age > 28).show()

+---+------+---+------+
| id|  name|age|salary|
+---+------+---+------+
|  2|Mariam| 30|  6000|
|  3|  Omar| 35|  7000|
+---+------+---+------+



In [124]:
# df.select(fun.countDistinct("name")).show()  --> table
df_distinct = df.select('name').distinct()
print(df_distinct.count())

4


In [125]:
df_salary_avg = df.groupBy('name').avg('salary')
df_salary_avg.show()

+------+-----------+
|  name|avg(salary)|
+------+-----------+
|   Ali|     4000.0|
|Mariam|     6750.0|
|  Omar|     6750.0|
|  Sara|     5000.0|
+------+-----------+



In [126]:
df1 = spark.read.csv("/data/NullData.csv", header=True, inferSchema=True) 
df1.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| NULL|
|emp2| NULL| NULL|
|emp3| NULL|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [127]:
avg_salary = df1.select(fun.avg('sales')).collect()[0][0]
print(avg_salary)

400.5


In [128]:
df1.fillna(
{
    "Name" : 'Unknown',
    "Sales" : avg_salary
}
).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John|400.5|
|emp2|Unknown|400.5|
|emp3|Unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+

