Spark Session

In [1]:
from pyspark.sql import SparkSession

# Get the active SparkSession, or create one if none exists yet (no builder options are set).
spark = (
    SparkSession.builder
    .getOrCreate()
)

24/03/19 14:23:43 WARN Utils: Your hostname, devsidd resolves to a loopback address: 127.0.1.1; using 192.168.1.8 instead (on interface wlp2s0)
24/03/19 14:23:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/19 14:23:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Dummy Data Creation

In [2]:
# Nine employee records. Each tuple is expanded into a dict keyed by the
# field names below, in this order: employee, department, state, salary, age, bonus.
sampleData = [
    dict(zip(("employee", "department", "state", "salary", "age", "bonus"), record))
    for record in (
        ("James", "Sales", "NY", 90000, 34, 10000),
        ("Michael", "Sales", "NY", 86000, 56, 20000),
        ("Robert", "Sales", "CA", 81000, 39, 23000),
        ("Maria", "Finance", "CA", 90000, 24, 23000),
        ("Raman", "Finance", "CA", 99000, 40, 24000),
        ("Scott", "Finance", "NY", 83000, 36, 19000),
        ("Jen", "Finance", "NY", 79000, 53, 15000),
        ("Jeff", "Marketing", "CA", 80000, 25, 18000),
        ("Kumar", "Marketing", "NY", 91000, 50, 21000),
    )
]

Create and Show Spark Dataframe

In [12]:
# Alternative source: load the rows from a .csv file instead
# df = spark.read.csv("<file_path>")

# Build the DataFrame from the in-memory list of dicts (no explicit schema is passed)
df = spark.createDataFrame(data=sampleData)
df.show()

+---+-----+----------+--------+------+-----+
|age|bonus|department|employee|salary|state|
+---+-----+----------+--------+------+-----+
| 34|10000|     Sales|   James| 90000|   NY|
| 56|20000|     Sales| Michael| 86000|   NY|
| 39|23000|     Sales|  Robert| 81000|   CA|
| 24|23000|   Finance|   Maria| 90000|   CA|
| 40|24000|   Finance|   Raman| 99000|   CA|
| 36|19000|   Finance|   Scott| 83000|   NY|
| 53|15000|   Finance|     Jen| 79000|   NY|
| 25|18000| Marketing|    Jeff| 80000|   CA|
| 50|21000| Marketing|   Kumar| 91000|   NY|
+---+-----+----------+--------+------+-----+



Show as Pandas Dataframe

In [13]:
# Take the first 5 rows and convert them to a pandas DataFrame for display
df.limit(5).toPandas()

Unnamed: 0,age,bonus,department,employee,salary,state
0,34,10000,Sales,James,90000,NY
1,56,20000,Sales,Michael,86000,NY
2,39,23000,Sales,Robert,81000,CA
3,24,23000,Finance,Maria,90000,CA
4,40,24000,Finance,Raman,99000,CA


Show Schema

In [14]:
# Print each column's name, data type and nullability
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)
 |-- department: string (nullable = true)
 |-- employee: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- state: string (nullable = true)



Rename Columns

In [15]:
# Rename a single column: "employee" becomes "employee_name"
df = df.withColumnRenamed(existing="employee", new="employee_name")
df.printSchema()

# To rename every column at once, pass the complete list of new names to toDF:
# df = df.toDF(*["age_xy", "bonus_xy", "department_xy", "employee_xy", "salary_xy", "state_xy"])
# df.printSchema()

root
 |-- age: long (nullable = true)
 |-- bonus: long (nullable = true)
 |-- department: string (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- state: string (nullable = true)



Select columns

In [16]:
# Project only the name, age and state columns into a separate DataFrame
df1 = df.select(
    "employee_name",
    "age",
    "state",
)
df1.show()

+-------------+---+-----+
|employee_name|age|state|
+-------------+---+-----+
|        James| 34|   NY|
|      Michael| 56|   NY|
|       Robert| 39|   CA|
|        Maria| 24|   CA|
|        Raman| 40|   CA|
|        Scott| 36|   NY|
|          Jen| 53|   NY|
|         Jeff| 25|   CA|
|        Kumar| 50|   NY|
+-------------+---+-----+



Sort

In [20]:
## Sort by age, lowest first. Ascending is the default order, so this also works:
# df.sort('age').show()

# OR spell the direction out with a column expression
from pyspark.sql import functions as F

df.orderBy(F.col('age').asc()).show()

+---+-----+----------+-------------+------+-----+
|age|bonus|department|employee_name|salary|state|
+---+-----+----------+-------------+------+-----+
| 24|23000|   Finance|        Maria| 90000|   CA|
| 25|18000| Marketing|         Jeff| 80000|   CA|
| 34|10000|     Sales|        James| 90000|   NY|
| 36|19000|   Finance|        Scott| 83000|   NY|
| 39|23000|     Sales|       Robert| 81000|   CA|
| 40|24000|   Finance|        Raman| 99000|   CA|
| 50|21000| Marketing|        Kumar| 91000|   NY|
| 53|15000|   Finance|          Jen| 79000|   NY|
| 56|20000|     Sales|      Michael| 86000|   NY|
+---+-----+----------+-------------+------+-----+



In [19]:
## Sort by age, highest first
from pyspark.sql import functions as F

df.orderBy(F.col('age').desc()).show()

+---+-----+----------+-------------+------+-----+
|age|bonus|department|employee_name|salary|state|
+---+-----+----------+-------------+------+-----+
| 56|20000|     Sales|      Michael| 86000|   NY|
| 53|15000|   Finance|          Jen| 79000|   NY|
| 50|21000| Marketing|        Kumar| 91000|   NY|
| 40|24000|   Finance|        Raman| 99000|   CA|
| 39|23000|     Sales|       Robert| 81000|   CA|
| 36|19000|   Finance|        Scott| 83000|   NY|
| 34|10000|     Sales|        James| 90000|   NY|
| 25|18000| Marketing|         Jeff| 80000|   CA|
| 24|23000|   Finance|        Maria| 90000|   CA|
+---+-----+----------+-------------+------+-----+



Create a new column using Spark UDF

In [22]:
# Plain Python function; it is wrapped as a Spark UDF below
def salary_in_k(x):
    """Convert a salary amount to thousands.

    Args:
        x: Salary as a number, or None for a missing value.

    Returns:
        ``x / 1000`` as a float, or None when ``x`` is None.
    """
    # The salary column is nullable, and a UDF receives None for null cells;
    # dividing None would raise a TypeError, so pass missing values through.
    if x is None:
        return None
    return x / 1000

# Wrap the Python function as a Spark UDF and declare its return type as double
from pyspark.sql import types as T

salary_in_k_udf = F.udf(salary_in_k, returnType=T.DoubleType())

# Add the derived column by applying the UDF to the salary column
df = df.withColumn(
    'salary_in_k',
    salary_in_k_udf(F.col('salary')),
)

df.show()
            

+---+-----+----------+-------------+------+-----+-----------+
|age|bonus|department|employee_name|salary|state|salary_in_k|
+---+-----+----------+-------------+------+-----+-----------+
| 34|10000|     Sales|        James| 90000|   NY|       90.0|
| 56|20000|     Sales|      Michael| 86000|   NY|       86.0|
| 39|23000|     Sales|       Robert| 81000|   CA|       81.0|
| 24|23000|   Finance|        Maria| 90000|   CA|       90.0|
| 40|24000|   Finance|        Raman| 99000|   CA|       99.0|
| 36|19000|   Finance|        Scott| 83000|   NY|       83.0|
| 53|15000|   Finance|          Jen| 79000|   NY|       79.0|
| 25|18000| Marketing|         Jeff| 80000|   CA|       80.0|
| 50|21000| Marketing|        Kumar| 91000|   NY|       91.0|
+---+-----+----------+-------------+------+-----+-----------+



Filter

In [23]:
# Keep rows where age is at least 50 and salary_in_k is at least 80.0
df.where(
    (df['age'] >= 50) & (df['salary_in_k'] >= 80.0)
).show()

+---+-----+----------+-------------+------+-----+-----------+
|age|bonus|department|employee_name|salary|state|salary_in_k|
+---+-----+----------+-------------+------+-----+-----------+
| 56|20000|     Sales|      Michael| 86000|   NY|       86.0|
| 50|21000| Marketing|        Kumar| 91000|   NY|       91.0|
+---+-----+----------+-------------+------+-----+-----------+



GroupBy

In [24]:
# Group on a single column and sum the salaries within each department
(
    df.groupBy('department')
    .sum('salary')
    .show()
)

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|     Sales|     257000|
|   Finance|     351000|
| Marketing|     171000|
+----------+-----------+



In [29]:
# Aggregate several columns per group.
# Grouping on more than one column:
# df.groupBy('department', 'state').sum('salary', 'bonus').show()
# Summing several columns with the shortcut method:
# df.groupBy('department').sum('salary', 'bonus').show()
# Same sums expressed as a {column: aggregate} mapping:
df.groupBy('department').agg(dict(salary='sum', bonus='sum')).show()

+----------+----------+-----------+
|department|sum(bonus)|sum(salary)|
+----------+----------+-----------+
|     Sales|     53000|     257000|
|   Finance|     81000|     351000|
| Marketing|     39000|     171000|
+----------+----------+-----------+



In [30]:
# Compute several different aggregates per department in one pass
df.groupBy('department').agg(
    F.sum('salary'),
    F.avg('salary'),
    F.min('bonus'),
    F.max('bonus'),
).show()

+----------+-----------+-----------------+----------+----------+
|department|sum(salary)|      avg(salary)|min(bonus)|max(bonus)|
+----------+-----------+-----------------+----------+----------+
|     Sales|     257000|85666.66666666667|     10000|     23000|
|   Finance|     351000|          87750.0|     15000|     24000|
| Marketing|     171000|          85500.0|     18000|     21000|
+----------+-----------+-----------------+----------+----------+



In [31]:
# Alias each aggregate, then filter the aggregated rows:
# keep only departments whose minimum bonus is at least 15000
df.groupBy('department').agg(
    F.sum('salary').alias('sum_salary'),
    F.avg('salary').alias('avg_salary'),
    F.min('bonus').alias('min_bonus'),
    F.max('bonus').alias('max_bonus'),
).filter(F.col('min_bonus') >= 15000).show()

+----------+----------+----------+---------+---------+
|department|sum_salary|avg_salary|min_bonus|max_bonus|
+----------+----------+----------+---------+---------+
|   Finance|    351000|   87750.0|    15000|    24000|
| Marketing|    171000|   85500.0|    18000|    21000|
+----------+----------+----------+---------+---------+



In [33]:

## Collect all states per department into a list (duplicates kept)
# df.groupBy('department').agg(F.collect_list('state').alias('state_list')).show()

## Collect the unique states per department
df.groupBy('department').agg(
    F.collect_set('state').alias('unique_states')
).show()

+----------+-------------+
|department|unique_states|
+----------+-------------+
|     Sales|     [NY, CA]|
|   Finance|     [CA, NY]|
| Marketing|     [CA, NY]|
+----------+-------------+

