In [2]:
# !pip install pyspark

In [1]:
import os, sys

# SparkContext - entry point to core Spark functionality (the RDD API)
# SparkSession - unified entry point to the DataFrame and SQL APIs
# - introduced in version 2.0
# - replaces SQLContext and HiveContext

from pyspark.sql import SparkSession

In [2]:
# Make Spark use the interpreter that is running this notebook, both for the
# driver and for the Python workers, so the two cannot end up on different
# Python installations.
os.environ.update(
    {
        "PYSPARK_PYTHON": sys.executable,
        "PYSPARK_DRIVER_PYTHON": sys.executable,
    }
)

In [3]:
# master() selects where the application runs:
#   - on a cluster, pass the cluster manager's master URL (e.g. "yarn")
#   - "local[N]" runs Spark in-process on this machine with N worker threads;
#     N is also the default parallelism, i.e. how many partitions
#     parallelize() creates when no count is given
# getOrCreate() returns the already-active session if one exists.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("MyFirstApp")
    .getOrCreate()
)

In [4]:
# print() shows the plain-text repr of the session object.
print(spark)

<pyspark.sql.session.SparkSession object at 0x0000024B581FF2D0>


In [5]:
# Evaluate the session as the cell's last expression so the notebook displays it.
spark

In [6]:
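# SparkSession.newSession() - creates a new session that shares the same SparkContext
# SparkSession.builder.getOrCreate() - returns the existing session, or creates one if none exists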

# enable hive support with SparkSession
# SparkSession.builder.master().appName().enableHiveSupport().getOrCreate()

In [7]:
# Sample employee records, one tuple per employee: (id, name, salary).
dataset = [
    (101, "John", 56000),
    (102, "Max", 50000),
    (103, "Shawn", 656000),
    (104, "Jack", 45000),
    (105, "Nick", 89000),
    (106, "Smith", 25000),
    (107, "Suma", 75000),
    (108, "Alex", 85000),
    (109, "Raman", 100000)
]

In [8]:
# Distribute the local Python list as an RDD through the session's SparkContext.
rdd = spark.sparkContext.parallelize(dataset)
# Printing an RDD shows its description, not its elements.
print(rdd)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:289


In [9]:
# Number of partitions the RDD is split into.
rdd.getNumPartitions()

1

In [10]:
# collect() brings every element back to the driver as a Python list,
# so it is only appropriate for small datasets like this one.
rdd.collect()

[(101, 'John', 56000),
 (102, 'Max', 50000),
 (103, 'Shawn', 656000),
 (104, 'Jack', 45000),
 (105, 'Nick', 89000),
 (106, 'Smith', 25000),
 (107, 'Suma', 75000),
 (108, 'Alex', 85000),
 (109, 'Raman', 100000)]

In [11]:
# Convert the RDD of tuples to a DataFrame; with no names given,
# the columns get the default names _1, _2, _3.
df = rdd.toDF()

In [12]:
# Print the DataFrame's rows as a text table.
df.show()

+---+-----+------+
| _1|   _2|    _3|
+---+-----+------+
|101| John| 56000|
|102|  Max| 50000|
|103|Shawn|656000|
|104| Jack| 45000|
|105| Nick| 89000|
|106|Smith| 25000|
|107| Suma| 75000|
|108| Alex| 85000|
|109|Raman|100000|
+---+-----+------+



In [13]:
# Build the DataFrame again from the same RDD, this time passing explicit
# column names (one per tuple position) instead of the _1/_2/_3 defaults.
columns = ["ID", "Name", "Salary"]
df_2 = rdd.toDF(columns)

In [14]:
# Same rows as before, now shown under the ID / Name / Salary headers.
df_2.show()

+---+-----+------+
| ID| Name|Salary|
+---+-----+------+
|101| John| 56000|
|102|  Max| 50000|
|103|Shawn|656000|
|104| Jack| 45000|
|105| Nick| 89000|
|106|Smith| 25000|
|107| Suma| 75000|
|108| Alex| 85000|
|109|Raman|100000|
+---+-----+------+



In [15]:
# Inspect the inferred schema: the Python ints were inferred as long.
df_2.printSchema()

root
 |-- ID: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- Salary: long (nullable = true)



In [16]:
# Create a DataFrame with an explicit schema

In [17]:
from pyspark.sql.types import StringType, StructType, IntegerType, StructField

In [18]:
# Explicit schema for the employee DataFrame, built field by field.
# Each add() appends one field; fields are nullable by default.
schema = (
    StructType()
    .add("EmpID", IntegerType())
    .add("EmpName", StringType())
    .add("EmpSalary", IntegerType())
    .add("EmpLeaves", IntegerType())
)

In [19]:
# Display the schema object; every field was created as nullable (True).
schema

StructType([StructField('EmpID', IntegerType(), True), StructField('EmpName', StringType(), True), StructField('EmpSalary', IntegerType(), True), StructField('EmpLeaves', IntegerType(), True)])

In [21]:
# Employee records with a fourth field, matching the four-field schema:
# (EmpID, EmpName, EmpSalary, EmpLeaves).
# NOTE: this rebinds `dataset`; the RDD created earlier was built from the
# previous three-field list and is not affected by this assignment.
dataset = [
    (101, "John", 56000, 2),
    (102, "Max", 50000, 0),
    (103, "Shawn", 656000, 1),
    (104, "Jack", 45000, 5),
    (105, "Nick", 89000, 3),
    (106, "Smith", 25000, 1),
    (107, "Suma", 75000, 0),
    (108, "Alex", 85000, 0),
    (109, "Raman", 100000, 2)
]

# Build the DataFrame directly from the local list using the explicit schema
# (no intermediate RDD, no type inference).
new_df = spark.createDataFrame(data=dataset, schema=schema)

In [22]:
# Print the rows under the column names defined by the schema.
new_df.show()

+-----+-------+---------+---------+
|EmpID|EmpName|EmpSalary|EmpLeaves|
+-----+-------+---------+---------+
|  101|   John|    56000|        2|
|  102|    Max|    50000|        0|
|  103|  Shawn|   656000|        1|
|  104|   Jack|    45000|        5|
|  105|   Nick|    89000|        3|
|  106|  Smith|    25000|        1|
|  107|   Suma|    75000|        0|
|  108|   Alex|    85000|        0|
|  109|  Raman|   100000|        2|
+-----+-------+---------+---------+



In [23]:
# With the explicit schema the numeric columns are integer, not the inferred long.
new_df.printSchema()

root
 |-- EmpID: integer (nullable = true)
 |-- EmpName: string (nullable = true)
 |-- EmpSalary: integer (nullable = true)
 |-- EmpLeaves: integer (nullable = true)



In [24]:
# dtypes gives the same information as a list of (column name, type name) pairs.
new_df.dtypes

[('EmpID', 'int'),
 ('EmpName', 'string'),
 ('EmpSalary', 'int'),
 ('EmpLeaves', 'int')]

In [26]:
# Number of rows in the DataFrame.
new_df.count()

9

In [27]:
# limit(3) yields a DataFrame of at most 3 rows, which show() then prints.
new_df.limit(3).show()

+-----+-------+---------+---------+
|EmpID|EmpName|EmpSalary|EmpLeaves|
+-----+-------+---------+---------+
|  101|   John|    56000|        2|
|  102|    Max|    50000|        0|
|  103|  Shawn|   656000|        1|
+-----+-------+---------+---------+



In [29]:
# Convert the 3-row slice to a pandas DataFrame so the notebook renders it as a table.
# toPandas() collects the rows to the driver, so keep the input small.
new_df.limit(3).toPandas()

Unnamed: 0,EmpID,EmpName,EmpSalary,EmpLeaves
0,101,John,56000,2
1,102,Max,50000,0
2,103,Shawn,656000,1


In [30]:
# Select specific columns by name and print only the first 3 rows.
new_df.select("EmpName", "EmpSalary").show(3)

+-------+---------+
|EmpName|EmpSalary|
+-------+---------+
|   John|    56000|
|    Max|    50000|
|  Shawn|   656000|
+-------+---------+
only showing top 3 rows



In [31]:
# Same selection, referring to the columns as attributes of the DataFrame
# instead of by name string.
new_df.select(new_df.EmpName, new_df.EmpSalary).show(3)

+-------+---------+
|EmpName|EmpSalary|
+-------+---------+
|   John|    56000|
|    Max|    50000|
|  Shawn|   656000|
+-------+---------+
only showing top 3 rows



In [33]:
# Select columns by position: new_df.columns is a plain list of column names,
# so slicing [1:4] picks the 2nd through 4th columns.
new_df.select(new_df.columns[1:4]).show(5)

+-------+---------+---------+
|EmpName|EmpSalary|EmpLeaves|
+-------+---------+---------+
|   John|    56000|        2|
|    Max|    50000|        0|
|  Shawn|   656000|        1|
|   Jack|    45000|        5|
|   Nick|    89000|        3|
+-------+---------+---------+
only showing top 5 rows



In [34]:
from pyspark.sql.functions import col, lit

In [35]:
# Add a derived column: each employee's salary increased by 5000.
# withColumn returns a new DataFrame; new_df itself is not modified.
new_df.withColumn("IncrementedSalary", col("EmpSalary") + 5000).show()

+-----+-------+---------+---------+-----------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|IncrementedSalary|
+-----+-------+---------+---------+-----------------+
|  101|   John|    56000|        2|            61000|
|  102|    Max|    50000|        0|            55000|
|  103|  Shawn|   656000|        1|           661000|
|  104|   Jack|    45000|        5|            50000|
|  105|   Nick|    89000|        3|            94000|
|  106|  Smith|    25000|        1|            30000|
|  107|   Suma|    75000|        0|            80000|
|  108|   Alex|    85000|        0|            90000|
|  109|  Raman|   100000|        2|           105000|
+-----+-------+---------+---------+-----------------+



In [40]:
# EmpSalary / 22 is the per-day rate (22 is presumably the number of working
# days in the month — confirm), which is then multiplied by EmpLeaves.
# NOTE(review): despite the column name "SalaryPerDay", the value is the
# per-day rate times the number of leave days, not a daily salary — consider
# renaming the column.
new_df.withColumn(
    "SalaryPerDay",
    col("EmpSalary") / 22 * col("EmpLeaves"),
).show()

+-----+-------+---------+---------+------------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|      SalaryPerDay|
+-----+-------+---------+---------+------------------+
|  101|   John|    56000|        2| 5090.909090909091|
|  102|    Max|    50000|        0|               0.0|
|  103|  Shawn|   656000|        1| 29818.18181818182|
|  104|   Jack|    45000|        5|10227.272727272728|
|  105|   Nick|    89000|        3|12136.363636363636|
|  106|  Smith|    25000|        1|1136.3636363636363|
|  107|   Suma|    75000|        0|               0.0|
|  108|   Alex|    85000|        0|               0.0|
|  109|  Raman|   100000|        2|  9090.90909090909|
+-----+-------+---------+---------+------------------+



In [41]:
# Salary after subtracting the leave-day amount: EmpSalary minus
# (EmpSalary / 22) * EmpLeaves. The division makes the result a double.
# 22 is presumably the number of working days in the month — confirm.
new_df.withColumn(
    "SepMonthSalary",
    col("EmpSalary") - (col("EmpSalary") / 22 * col("EmpLeaves")),
).show()

+-----+-------+---------+---------+------------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|    SepMonthSalary|
+-----+-------+---------+---------+------------------+
|  101|   John|    56000|        2| 50909.09090909091|
|  102|    Max|    50000|        0|           50000.0|
|  103|  Shawn|   656000|        1| 626181.8181818182|
|  104|   Jack|    45000|        5| 34772.72727272727|
|  105|   Nick|    89000|        3| 76863.63636363637|
|  106|  Smith|    25000|        1|23863.636363636364|
|  107|   Suma|    75000|        0|           75000.0|
|  108|   Alex|    85000|        0|           85000.0|
|  109|  Raman|   100000|        2| 90909.09090909091|
+-----+-------+---------+---------+------------------+



In [45]:
# Keep the DataFrame with the SepMonthSalary column (EmpSalary minus the
# per-day rate times EmpLeaves) in temp_df so the following cells can use it.
# 22 is presumably the number of working days in the month — confirm.
temp_df = new_df.withColumn(
    "SepMonthSalary",
    col("EmpSalary") - (col("EmpSalary") / 22 * col("EmpLeaves")),
)

In [46]:
# Replace SepMonthSalary with an integer version of itself (withColumn with an
# existing name overwrites that column).
# NOTE: the cast drops the fractional part rather than rounding,
# e.g. 34772.727... becomes 34772.
temp_df = temp_df.withColumn("SepMonthSalary", col("SepMonthSalary").cast("Integer"))

In [47]:
# Print the result: SepMonthSalary is now shown as whole numbers.
temp_df.show()

+-----+-------+---------+---------+--------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|SepMonthSalary|
+-----+-------+---------+---------+--------------+
|  101|   John|    56000|        2|         50909|
|  102|    Max|    50000|        0|         50000|
|  103|  Shawn|   656000|        1|        626181|
|  104|   Jack|    45000|        5|         34772|
|  105|   Nick|    89000|        3|         76863|
|  106|  Smith|    25000|        1|         23863|
|  107|   Suma|    75000|        0|         75000|
|  108|   Alex|    85000|        0|         85000|
|  109|  Raman|   100000|        2|         90909|
+-----+-------+---------+---------+--------------+



In [48]:
# Drop a column. drop() returns a new DataFrame; temp_df is not reassigned
# here, so it still contains SepMonthSalary afterwards.
temp_df.drop("SepMonthSalary").show(4)

+-----+-------+---------+---------+
|EmpID|EmpName|EmpSalary|EmpLeaves|
+-----+-------+---------+---------+
|  101|   John|    56000|        2|
|  102|    Max|    50000|        0|
|  103|  Shawn|   656000|        1|
|  104|   Jack|    45000|        5|
+-----+-------+---------+---------+
only showing top 4 rows



In [50]:
# Add a constant column: lit("IT") wraps the Python value as a Column so every
# row gets the same EmpDept value.
temp_df.withColumn("EmpDept", lit("IT")).show()

+-----+-------+---------+---------+--------------+-------+
|EmpID|EmpName|EmpSalary|EmpLeaves|SepMonthSalary|EmpDept|
+-----+-------+---------+---------+--------------+-------+
|  101|   John|    56000|        2|         50909|     IT|
|  102|    Max|    50000|        0|         50000|     IT|
|  103|  Shawn|   656000|        1|        626181|     IT|
|  104|   Jack|    45000|        5|         34772|     IT|
|  105|   Nick|    89000|        3|         76863|     IT|
|  106|  Smith|    25000|        1|         23863|     IT|
|  107|   Suma|    75000|        0|         75000|     IT|
|  108|   Alex|    85000|        0|         85000|     IT|
|  109|  Raman|   100000|        2|         90909|     IT|
+-----+-------+---------+---------+--------------+-------+



In [51]:
# Keep only the rows whose EmpSalary is at least 60000.
temp_df.filter(temp_df.EmpSalary >= 60000).show()

+-----+-------+---------+---------+--------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|SepMonthSalary|
+-----+-------+---------+---------+--------------+
|  103|  Shawn|   656000|        1|        626181|
|  105|   Nick|    89000|        3|         76863|
|  107|   Suma|    75000|        0|         75000|
|  108|   Alex|    85000|        0|         85000|
|  109|  Raman|   100000|        2|         90909|
+-----+-------+---------+---------+--------------+



In [52]:
# Sorting: order the rows by EmpName (ascending is the default).
temp_df.sort(col("EmpName")).show()

+-----+-------+---------+---------+--------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|SepMonthSalary|
+-----+-------+---------+---------+--------------+
|  108|   Alex|    85000|        0|         85000|
|  104|   Jack|    45000|        5|         34772|
|  101|   John|    56000|        2|         50909|
|  102|    Max|    50000|        0|         50000|
|  105|   Nick|    89000|        3|         76863|
|  109|  Raman|   100000|        2|         90909|
|  103|  Shawn|   656000|        1|        626181|
|  106|  Smith|    25000|        1|         23863|
|  107|   Suma|    75000|        0|         75000|
+-----+-------+---------+---------+--------------+



In [53]:
# Same sort in descending order, requested via .desc() on the column.
temp_df.sort(col("EmpName").desc()).show()

+-----+-------+---------+---------+--------------+
|EmpID|EmpName|EmpSalary|EmpLeaves|SepMonthSalary|
+-----+-------+---------+---------+--------------+
|  107|   Suma|    75000|        0|         75000|
|  106|  Smith|    25000|        1|         23863|
|  103|  Shawn|   656000|        1|        626181|
|  109|  Raman|   100000|        2|         90909|
|  105|   Nick|    89000|        3|         76863|
|  102|    Max|    50000|        0|         50000|
|  101|   John|    56000|        2|         50909|
|  104|   Jack|    45000|        5|         34772|
|  108|   Alex|    85000|        0|         85000|
+-----+-------+---------+---------+--------------+



In [54]:
# Group the rows by EmpLeaves and total EmpSalary within each group;
# the result column gets the default name sum(EmpSalary).
temp_df.groupBy("EmpLeaves").sum("EmpSalary").show()

+---------+--------------+
|EmpLeaves|sum(EmpSalary)|
+---------+--------------+
|        1|        681000|
|        3|         89000|
|        5|         45000|
|        2|        156000|
|        0|        210000|
+---------+--------------+



In [57]:
# NOTE: this import rebinds sum, min and max in the notebook namespace to the
# Spark column functions, shadowing the Python builtins of the same name in
# every cell that runs afterwards. count is imported but not used in this cell.
from pyspark.sql.functions import sum, avg, min, max, count

# Compute several aggregates per EmpLeaves group in a single agg() call.
# Only the sum is given an explicit column name; the other aggregates keep
# Spark's default names such as max(EmpSalary).
temp_df.groupBy("EmpLeaves").agg(
    sum("EmpSalary").alias("Total Salary"),
    max("EmpSalary"),
    min("EmpSalary"),
    avg("EmpSalary"),
).show()

+---------+------------+--------------+--------------+--------------+
|EmpLeaves|Total Salary|max(EmpSalary)|min(EmpSalary)|avg(EmpSalary)|
+---------+------------+--------------+--------------+--------------+
|        1|      681000|        656000|         25000|      340500.0|
|        3|       89000|         89000|         89000|       89000.0|
|        5|       45000|         45000|         45000|       45000.0|
|        2|      156000|        100000|         56000|       78000.0|
|        0|      210000|         85000|         50000|       70000.0|
+---------+------------+--------------+--------------+--------------+

