In [1]:
# Install pyspark
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=873e87752912fe52e5f2127fb9d8655fe0bd6569090948920430a78597a389dc
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
print(spark)
# Import a Spark function from library
from pyspark.sql.functions import col

<pyspark.sql.session.SparkSession object at 0x7d8faed64c70>


In [3]:
# Check the pyspark version
import pyspark
print(pyspark.__version__)

3.5.0


In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Test the spark
df = spark.createDataFrame([{"hello": "world"} for x in range(1000)])
df.show(3, False)

+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



**PySpark reduceByKey usage with example**

PySpark *reduceByKey()* transformation is used to merge the values of each key using an associative reduce function on PySpark RDD. It is a wider transformation as it shuffles data across multiple partitions and It operates on pair RDD (key/value pair).

When *reduceByKey()* performs, the output will be partitioned by either numPartitions or the default parallelism level. The Default partitioner is hash-partition.

First, let’s create an RDD from the list.







In [5]:
data = [('Project', 1),
('Gutenberg’s', 1),
('Alice’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1),
('Adventures', 1),
('in', 1),
('Wonderland', 1),
('Project', 1),
('Gutenberg’s', 1)]

rdd=spark.sparkContext.parallelize(data)

**reduceByKey() Example**

In our example, we use PySpark reduceByKey() to reduces the word string by applying the sum function on value. The result of our RDD contains unique words and their count.


In [6]:
rdd2=rdd.reduceByKey(lambda a,b: a+b)
for element in rdd2.collect():
    print(element)

('Project', 3)
('Gutenberg’s', 3)
('Alice’s', 1)
('in', 2)
('Adventures', 2)
('Wonderland', 2)


**groupByKey**

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.

If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.




In [7]:
sorted(rdd.groupByKey().mapValues(len).collect())

[('Adventures', 2),
 ('Alice’s', 1),
 ('Gutenberg’s', 3),
 ('Project', 3),
 ('Wonderland', 2),
 ('in', 2)]

In [8]:
sorted(rdd.groupByKey().mapValues(list).collect())

[('Adventures', [1, 1]),
 ('Alice’s', [1]),
 ('Gutenberg’s', [1, 1, 1]),
 ('Project', [1, 1, 1]),
 ('Wonderland', [1, 1]),
 ('in', [1, 1])]

**combineByKey**

Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

Users provide three functions:

createCombiner, which turns a V into a C (e.g., creates a one-element list)

mergeValue, to merge a V into a C (e.g., adds it to the end of a list)

mergeCombiners, to combine two C’s into a single one (e.g., merges the lists)

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.

Notes

V and C can be different – for example, one might group an RDD of type
(Int, Int) into an RDD of type (Int, List[Int]).



In [14]:
x = spark.sparkContext.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
    return [a]

def append(a, b):
    a.append(b)
    return a

def extend(a, b):
    a.extend(b)
    return a

sorted(x.combineByKey(to_list, append, extend).collect())

[('a', [1, 2]), ('b', [1])]

**sortByKey**

Sorts this RDD, which is assumed to consist of (key, value) pairs.



In [15]:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
spark.sparkContext.parallelize(tmp).sortByKey().first()

('1', 3)

In [16]:
spark.sparkContext.parallelize(tmp).sortByKey(True, 1).collect()

[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [17]:
spark.sparkContext.parallelize(tmp).sortByKey(True, 2).collect()


[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

In [18]:
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
spark.sparkContext.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()

[('a', 3),
 ('fleece', 7),
 ('had', 2),
 ('lamb', 5),
 ('little', 4),
 ('Mary', 1),
 ('was', 8),
 ('white', 9),
 ('whose', 6)]

**Intersection**

Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.

Notes

This method performs a shuffle internally.

In [19]:
rdd1 = spark.sparkContext.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = spark.sparkContext.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()

[1, 2, 3]

## **cogroup**

For each key k in self or other, return a resulting RDD that contains a tuple with the list of values for that key in self as well as other.



In [20]:
x = spark.sparkContext.parallelize([("a", 1), ("b", 4)])
y = spark.sparkContext.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]

[('a', ([1], [2])), ('b', ([4], []))]

## **groupWith**

Alias for cogroup but with support for multiple RDDs.



In [21]:
w = spark.sparkContext.parallelize([("a", 5), ("b", 6)])
x = spark.sparkContext.parallelize([("a", 1), ("b", 4)])
y = spark.sparkContext.parallelize([("a", 2)])
z = spark.sparkContext.parallelize([("b", 42)])
[(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]

[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

## **distinct**

Return a new RDD containing the distinct elements in this RDD.



In [22]:
sorted(spark.sparkContext.parallelize([1, 1, 2, 3]).distinct().collect())

[1, 2, 3]

## **repartition**

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.


In [None]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6,7], 4)
sorted(rdd.glom().collect())

[[1], [2, 3], [4, 5], [6, 7]]

In [None]:
len(rdd.repartition(2).glom().collect())

2

In [None]:
len(rdd.repartition(10).glom().collect())

10

## **coalesce**

Return a new RDD that is reduced into numPartitions partitions.

In [None]:
spark.sparkContext.parallelize([1, 2, 3, 4, 5], 3).glom().collect()

[[1], [2, 3], [4, 5]]

In [None]:
spark.sparkContext.parallelize([1, 2, 3, 4, 5], 3).coalesce(2).glom().collect()

[[1], [2, 3, 4, 5]]

**PySpark Join Types | Join Two DataFrames**

PySpark Join is used to combine two DataFrames and by chaining these you can join multiple DataFrames; it supports all basic join type operations available in traditional SQL like INNER, LEFT OUTER, RIGHT OUTER, LEFT ANTI, LEFT SEMI, CROSS, SELF JOIN. PySpark Joins are wider transformations that involve data shuffling across the network.

PySpark SQL Joins comes with more optimization by default (thanks to DataFrames) however still there would be some performance issues to consider while using.


*join(self, other, on=None, how=None)*
*join() *operation takes parameters as below and returns DataFrame.

param other: Right side of the join
param on: a string for the join column name
param how: default inner. Must be one of *inner, cross, outer,full, full_outer, left, left_outer, right, right_outer,left_semi, and left_anti*.
You can also write Join expression by adding where() and filter() methods on DataFrame and can have Join on multiple columns.

Before we jump into PySpark SQL Join examples, first, let’s create an "emp" and "dept" DataFrames. here, column "emp_id" is unique on emp and "dept_id" is unique on the dept dataset’s and emp_dept_id from emp has a reference to dept_id on dept dataset.



In [None]:
emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)

dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

Inner join is the default join in PySpark and it’s mostly used. This joins two datasets on key columns, where keys don’t match the rows get dropped from both datasets (emp & dept).



In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner").show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



**PySpark Full Outer Join**
Outer a.k.a full, fullouter join returns all rows from both datasets, where join expression doesn’t match it returns null on respective record columns.



In [None]:
print("Outer Join")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer").show(truncate=False)

Outer Join
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|NULL  |NULL    |NULL           |NULL       |NULL       |NULL  |NULL  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |NULL     |NULL   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

In [None]:
print("Full join")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full").show(truncate=False)

Full join
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|NULL  |NULL    |NULL           |NULL       |NULL       |NULL  |NULL  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |NULL     |NULL   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+


In [None]:
print("Full outer")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter").show(truncate=False)

Full outer
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|NULL  |NULL    |NULL           |NULL       |NULL       |NULL  |NULL  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
|6     |Brown   |2              |2010       |50         |      |-1    |NULL     |NULL   |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

**PySpark Left Outer Join**

*Left* a.k.a *Leftouter* join returns all rows from the left dataset regardless of match found on the right dataset when join expression doesn’t match, it assigns null for that record and drops records from right where match not found.


In [None]:
print("Left join")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left").show(truncate=False)

Left join
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|6     |Brown   |2              |2010       |50         |      |-1    |NULL     |NULL   |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [None]:
print("Leftouter join")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter").show(truncate=False)

Leftouter join
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|6     |Brown   |2              |2010       |50         |      |-1    |NULL     |NULL   |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



**Right Outer Join**

Right a.k.a Rightouter join is opposite of left join, here it returns all rows from the right dataset regardless of math found on the left dataset, when join expression doesn’t match, it assigns null for that record and drops records from left where match not found.



In [None]:
print("Right join")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right").show(truncate=False)

Right join
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|NULL  |NULL    |NULL           |NULL       |NULL       |NULL  |NULL  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



In [None]:
print("Right outer join")
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter").show(truncate=False)

Right outer join
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|NULL  |NULL    |NULL           |NULL       |NULL       |NULL  |NULL  |Sales    |30     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+



**Left Semi Join**

*leftsemi* join is similar to inner join difference being leftsemi join returns all columns from the left dataset and ignores all columns from the right dataset. In other words, this join returns columns from the only left dataset for the records match in the right dataset on join expression, records not matched on join expression are ignored from both left and right datasets.

The same result can be achieved using select on the result of the inner join however, using this join would be efficient.

In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi").show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+



**Left Anti Join**

*leftanti* join does the exact opposite of the leftsemi, leftanti join returns only columns from the left dataset for non-matched records.



In [None]:
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti").show(truncate=False)

+------+-----+---------------+-----------+-----------+------+------+
|emp_id|name |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+-----+---------------+-----------+-----------+------+------+
|6     |Brown|2              |2010       |50         |      |-1    |
+------+-----+---------------+-----------+-----------+------+------+



**Self Join**

Joins are not complete without a *self join*, Though there is no self-join type available, we can use any of the above-explained join types to join DataFrame to itself. below example use inner self join.



In [None]:
empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

+------+--------+---------------+-----------------+
|emp_id|name    |superior_emp_id|superior_emp_name|
+------+--------+---------------+-----------------+
|2     |Rose    |1              |Smith            |
|3     |Williams|1              |Smith            |
|4     |Jones   |2              |Rose             |
|5     |Brown   |2              |Rose             |
|6     |Brown   |2              |Rose             |
+------+--------+---------------+-----------------+



### **Complete join example**

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

emp = [(1,"Smith",-1,"2018","10","M",3000), \
    (2,"Rose",1,"2010","20","M",4000), \
    (3,"Williams",1,"2010","10","M",1000), \
    (4,"Jones",2,"2005","10","F",2000), \
    (5,"Brown",2,"2010","40","",-1), \
      (6,"Brown",2,"2010","50","",-1) \
  ]
empColumns = ["emp_id","name","superior_emp_id","year_joined", \
       "emp_dept_id","gender","salary"]

empDF = spark.createDataFrame(data=emp, schema = empColumns)
empDF.printSchema()
empDF.show(truncate=False)


dept = [("Finance",10), \
    ("Marketing",20), \
    ("Sales",30), \
    ("IT",40) \
  ]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"inner") \
     .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"outer") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"full") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"fullouter") \
    .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"left") \
    .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftouter") \
   .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"right") \
   .show(truncate=False)
empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"rightouter") \
   .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftsemi") \
   .show(truncate=False)

empDF.join(deptDF,empDF.emp_dept_id ==  deptDF.dept_id,"leftanti") \
   .show(truncate=False)

empDF.alias("emp1").join(empDF.alias("emp2"), \
    col("emp1.superior_emp_id") == col("emp2.emp_id"),"inner") \
    .select(col("emp1.emp_id"),col("emp1.name"), \
      col("emp2.emp_id").alias("superior_emp_id"), \
      col("emp2.name").alias("superior_emp_name")) \
   .show(truncate=False)

empDF.createOrReplaceTempView("EMP")
deptDF.createOrReplaceTempView("DEPT")

joinDF = spark.sql("select * from EMP e, DEPT d where e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

joinDF2 = spark.sql("select * from EMP e INNER JOIN DEPT d ON e.emp_dept_id == d.dept_id") \
  .show(truncate=False)

root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+-----

**Assignment Questions:**

**Task** **1** : count of distinct incomes – The number of distinct incomes in the dataset

In [1]:
# import all module
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import median

# import sparksession from pyspark.sql module
from pyspark.sql import SparkSession
from pyspark import SparkContext

# initializing a spark session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# reading trial and test dataset as a text file and converting them to data frame
trial_rdd = spark.sparkContext.textFile("trial_incomes.csv",32)
trial_df = trial_rdd.map(lambda line: line.split(",")).toDF()

test_rdd = spark.sparkContext.textFile("test_incomes.csv",32)
test_df = test_rdd.map(lambda line: line.split(",")).toDF()


In [2]:
# calculating the distinct incomes for trial data
print("Distinct income in trial dataset :", trial_rdd.distinct().count())

# calculating the distinct incomes for test data
print("Distinct inccome in test dataset :", test_rdd.distinct().count())


Distinct income in trial dataset : 79
Distinct inccome in test dataset : 20872


**Task** **2** : median – The median of all incomes in the dataset: the income at which there is an equal number of values greater than the income as there are values less than the income.

In [11]:
# Function to calculate median of the dataset

def calculate_median(dataset_name, df):
  df = df.withColumnRenamed("_c0", "Income").withColumn("_1", F.col("_1").cast("double"))
  print("Median of the " + dataset_name + " dataset :")
  df.agg(median('_1')).show()

In [13]:
# Calculate median of the trial dataset
calculate_median('trial', trial_df)

# Calculate median of the test dataset
calculate_median('test', test_df)

Median of the trial dataset :
+----------+
|median(_1)|
+----------+
|       4.0|
+----------+

Median of the test dataset :
+----------+
|median(_1)|
+----------+
|       3.0|
+----------+



**Task 3:** Mode - The mode of all incomes in the dataset: the most frequently seen
income.

In [48]:
# Function to calculate the mode of the dataset
def calculate_mode(dataset_name, data_df):
  # Renaming column name to Income
  df = data_df.withColumnRenamed("_c0", "Income").withColumn("Income", F.col("Income").cast("double"))

  # Group data and calculate the mode income
  grouped_data = df.groupBy("Income").agg(F.count("Income").alias("count"))
  max_count = grouped_data.agg(F.max("count")).collect()[0]["max(count)"]
  mode_data = grouped_data.filter(F.col("count") == max_count)

  # Print the result
  print("The mode of the "+ dataset_name +" dataset is " + str(mode_data.first()['Income']))


In [49]:
# Calculate mode of the trial dataset
calculate_mode('trial', trial_df)

# Calculate mode of the test dataset
calculate_mode('test', test_df)

The mode of the trial dataset is 4.0
The mode of the test dataset is 3.0


**Task 4:** count per 10power – counting the incomes by powers of 10. That is, for each integer round it down to its nearest power of 10 (for example 3 map to 1 = 100 ; 30 would map to 10 = 101 . 87 would map to 10 = 101 ; 870 would map to 100 = 102 , 100 would map to 100 = 102 etc….). Your goal is to count the number of integers between each power of 10.

In [66]:
# Function to count per 10 power for income data

def count_per_10_power(dataset_name, data_df):
    # Renaming column name to Income and casting it to integer to perform math operations
    df = data_df.withColumnRenamed("_c0", "Income").withColumn("Income", F.col("Income").cast("integer"))

    # Calculate the nearest power of 10 using a logarithmic approach
    df = df.withColumn("PowerOf10", F.pow(10, F.floor(F.log10("Income"))))

    # Use reduceByKey to count occurrences for each PowerOf10
    result = df.rdd.map(lambda x: (x['PowerOf10'], 1)).reduceByKey(lambda a, b: a + b).toDF(['PowerOf10', 'Count']).sort("PowerOf10")

    # Display the result
    print("Results for " + dataset_name + " dataset:")
    result.show(truncate=False)

In [67]:
# Calculate count per 10 power of the trial dataset
count_per_10_power('trial', trial_df)

# Calculate count per 10 power of the test dataset
count_per_10_power('test', test_df)

Results for trial dataset:
+---------+-----+
|PowerOf10|Count|
+---------+-----+
|1.0      |853  |
|10.0     |118  |
|100.0    |19   |
|1000.0   |8    |
|10000.0  |2    |
+---------+-----+

Results for test dataset:
+---------+-------+
|PowerOf10|Count  |
+---------+-------+
|1.0      |8245447|
|10.0     |1414970|
|100.0    |271544 |
|1000.0   |54505  |
|10000.0  |10876  |
|100000.0 |2137   |
|1000000.0|415    |
|1.0E7    |89     |
|1.0E8    |13     |
|1.0E9    |4      |
+---------+-------+

