# Prerequisites


## Load 'employee.csv' into a DataFrame

In [0]:
# Read the employee file as pipe-delimited CSV: the first row is the header,
# fields are quoted with single quotes, and column types are inferred.
df = (
    spark.read
    .options(sep="|", header=True, inferSchema=True, quote="'")
    .csv("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/employee.csv")
    .limit(10)  # keep at most ten rows
)

df.printSchema()
df.display()

# Array Type

## Create an array type column

In [0]:
from pyspark.sql.functions import split

# Split the two comma-separated string columns into array columns; the salary
# pieces are additionally cast from strings to integers. The original string
# columns are kept alongside the new ones.
result_df = (
    df
    .withColumn("skills", split("col_skills", ","))
    .withColumn(
        "current_expected_salary",
        split("col_current_expected_salary", ",").cast("array<int>"),
    )
)

result_df.printSchema()
display(result_df)

In [0]:
from pyspark.sql.functions import split

# 1) skills: comma-separated string -> array of strings
result_df = df.withColumn("skills", split("col_skills", ","))

# 2) current_expected_salary: comma-separated string -> array of integers
result_df = result_df.withColumn(
    "current_expected_salary",
    split("col_current_expected_salary", ",").cast("array<int>"),
)

# 3) the raw string columns are no longer needed once the arrays exist
result_df = result_df.drop("col_skills", "col_current_expected_salary")

result_df.printSchema()
display(result_df)
 

In [0]:
from pyspark.sql.functions import col

# Array elements are addressed by zero-based position: element 0 is exposed as
# current_salary and element 1 as expected_salary, next to the full array.
salary_pair = col("current_expected_salary")

display(
    result_df.select(
        salary_pair,
        salary_pair[0].alias("current_salary"),
        salary_pair[1].alias("expected_salary"),
    )
)

In [0]:
from pyspark.sql.functions import col

# Project the name plus the two salary figures taken from the array column.
salaries = col("current_expected_salary")
result_df1 = result_df.select(
    col("name"),
    salaries[0].alias("current_salary"),
    salaries[1].alias("expected_salary"),
)

# Keep only the rows whose current salary is strictly greater than the expected one.
is_above_expected = col("current_salary") > col("expected_salary")
result_df1.where(is_above_expected).display()

# Applying different functions on an `array<string>` column


In [0]:
# Print the schema (column names and data types) of result_df.
result_df.printSchema()

In [0]:
# `col` is imported here as well so the cell does not depend on an import made
# by an earlier cell.
from pyspark.sql.functions import array_contains, array_distinct, col, size

# Per row, show the skills array next to three values derived from it:
#   size(...)            -> number of elements in the array
#   array_contains(...)  -> whether "PySpark" is one of the elements
#   array_distinct(...)  -> the array with duplicate elements removed
result_df.select(
    col("name"),
    col("skills"),
    size("skills"),
    array_contains("skills", "PySpark"),
    array_distinct("skills").alias("distinct_skills"),
).display()

In [0]:
# `col` is imported here as well so the cell does not depend on an import made
# by an earlier cell.
from pyspark.sql.functions import array_contains, array_distinct, col, size

# Name, skills and three values derived from the skills array (element count,
# "PySpark" membership flag, de-duplicated array).
result_df2 = result_df.select(
    col("name"),
    col("skills"),
    size("skills"),
    array_contains("skills", "PySpark"),
    array_distinct("skills").alias("distinct_skills"),
)

# Display only the rows whose skills array contains "PySpark".
result_df2.filter(array_contains("skills", "PySpark")).display()

In [0]:
from pyspark.sql.functions import col, when, array_contains

# Step 1: project name, skills and the two salary figures out of the array.
result_incr = result_df.select(
    col("name"),
    col("skills"),
    col("current_expected_salary")[0].alias("current_salary"),
    col("current_expected_salary")[1].alias("expected_salary"),
)

# Step 2: rows whose skills contain "PySpark" get current_salary * 1.30;
# every other row keeps current_salary unchanged.
has_pyspark = array_contains(col("skills"), "PySpark")
result_incr = result_incr.withColumn(
    "incr_salary",
    when(has_pyspark, col("current_salary") * 1.30).otherwise(col("current_salary")),
)

display(result_incr)


In [0]:
# `col` and `when` are used below, so they are imported here rather than
# relying on an import made by an earlier cell.
from pyspark.sql.functions import array_contains, col, when

# Add "base_salary": element 1 of current_expected_salary multiplied by 1.3
# when the skills array contains "PySpark", otherwise element 1 unchanged.
# The result is only displayed; result_df itself is not reassigned.
result_df.withColumn(
    "base_salary",
    when(
        array_contains("skills", "PySpark"), col("current_expected_salary")[1] * 1.3
    ).otherwise(col("current_expected_salary")[1]),
).display()

In [0]:
from pyspark.sql.functions import col, array_contains, when

# Building blocks of the new column.
second_salary = col("current_expected_salary")[1]
knows_pyspark = array_contains(col("skills"), "PySpark")

# Both branches are cast to decimal(18,2): element 1 times 1.3 when the skills
# array contains "PySpark", element 1 as-is otherwise.
base_salary_expr = when(
    knows_pyspark,
    (second_salary * 1.3).cast("decimal(18,2)"),
).otherwise(second_salary.cast("decimal(18,2)"))

# result_df is rebound to the version that carries the base_salary column.
result_df = result_df.withColumn("base_salary", base_salary_expr)

display(result_df)
 
 

# Explode - it can be applied only to array or map columns

In [0]:
from pyspark.sql.functions import explode

# explode() produces one output row per element of the skills array; the
# resulting column is named "words".
exploded_skills = result_df.select(explode("skills").alias("words"))

# Count how many rows each distinct value of "words" has.
exploded_skills.groupBy("words").count().display()

# Struct Type

## Load 'product_Information_001.json' into a DataFrame


In [0]:
# Read the product JSON file with the multiLine option enabled.
# Note: this rebinds `df`, which previously held the employee DataFrame.
df = (
    spark.read
    .option("multiLine", True)
    .json("/Volumes/quickstart_catalog/quickstart_schema/sandbox/dataset/product_Information_001.json")
)

df.printSchema()

In [0]:
from pyspark.sql.functions import col

# "details.screen.size" is a dotted path that selects a field nested inside
# the details column.
display(
    df.select(
        col("name"),
        col("details.screen.size"),
    )
)

In [0]:
from pyspark.sql.functions import col

# details.screen.size and details.memory.size would otherwise both surface as
# a column called "size", leaving two indistinguishable columns in the output.
# Each nested field is therefore given an explicit, unique alias.
df.select(
    col("product_id"),
    col("name"),
    col("details.screen.size").alias("screen_size"),
    col("details.memory.size").alias("memory_size"),
    col("details.storage.capacity").alias("storage_capacity"),
).display()