<a href="https://colab.research.google.com/github/jinyang628/pyspark-practice/blob/main/pyspark_practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [36]:
!pip install pyspark



In [37]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.sql.functions import lit


In [38]:
spark=SparkSession.builder.appName("Practice").getOrCreate() # Start session

In [39]:
df_pyspark=spark.read.csv("test1.csv")

In [40]:
df_pyspark=spark.read.csv('test1.csv', header=True, inferSchema=True) # Initialises the entire df with the original header names

In [41]:
df_pyspark.columns # Get all column names in a list

['name', 'age', 'experience', 'salary']

In [42]:
df_pyspark.select(['name', 'experience']) # Select 2 particular columns

DataFrame[name: string, experience: int]

In [43]:
df_appended=df_pyspark.withColumn("experience after 2 years", df_pyspark['experience']+2) # Add a new column

In [44]:
df_appended.drop('experience after 2 years') # drop column

DataFrame[name: string, age: int, experience: int, salary: int]

In [45]:
df_renamed=df_pyspark.withColumnRenamed('name', 'new name') # rename column


In [46]:
df_pyspark.na.drop() # drop all rows that contain a null value

DataFrame[name: string, age: int, experience: int, salary: int]

In [47]:
df_pyspark.na.drop(how='all') # drop all rows that are fully null

DataFrame[name: string, age: int, experience: int, salary: int]

In [48]:
df_pyspark.na.drop(how='any', subset=['experience']) # only drop the row if experience column is null

DataFrame[name: string, age: int, experience: int, salary: int]

In [49]:
# df_replaced = df_pyspark.na.fill({"age": "missing values"}) # will fail to replace null with the string because the column is not of string type
df_replaced = df_pyspark.na.fill({"age": 10, "experience": 24}) # replace null values in age column with -1

df_replaced.show()

+-------+---+----------+------+
|   name|age|experience|salary|
+-------+---+----------+------+
|   john| 10|        24|   100|
| thomas| 12|        24|   200|
|jinyang| 10|        23|   300|
+-------+---+----------+------+



In [50]:
df_replaced.filter(df_replaced["salary"]<200).select(["name","age"]).show()

+----+---+
|name|age|
+----+---+
|john| 10|
+----+---+



In [51]:
df_replaced.filter(
    (df_replaced["salary"]<=200) &
    (df_replaced["experience"]>=22)
  ).select(["name","age"]).show()

+------+---+
|  name|age|
+------+---+
|  john| 10|
|thomas| 12|
+------+---+



In [52]:
df_replaced.filter(
    (df_replaced["salary"] <= 200) &
    ~(df_replaced["experience"] <= 22) # ~ is inverse
 ).select(["name"]).show()

+------+
|  name|
+------+
|  john|
|thomas|
+------+



In [61]:

df_replaced = df_replaced.withColumn("department", lit("sales"))
df_replaced.withColumn("department", when(col("name")=="john", "education").otherwise(col("department"))).show()

+-------+---+----------+------+----------+
|   name|age|experience|salary|department|
+-------+---+----------+------+----------+
|   john| 10|        24|   100| education|
| thomas| 12|        24|   200|     sales|
|jinyang| 10|        23|   300|     sales|
+-------+---+----------+------+----------+



In [65]:
df_replaced.drop("age", "department", "experience").groupBy("name").count().show()

+-------+-----+
|   name|count|
+-------+-----+
|jinyang|    1|
|   john|    1|
| thomas|    1|
+-------+-----+



In [67]:
df_replaced.agg({"salary": "sum"}).show()

+-----------+
|sum(salary)|
+-----------+
|        600|
+-----------+

