In [1]:
'''
Run the below cmd in cmd prompt
conda install -c conda-forge findspark
'''

import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'C:\\spark'

# PySpark StructType & StructField

In [45]:
#To create a StructType & StructField on DataFrame 

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType #--> To define the struct of DF

#Sparksession link:https://sparkbyexamples.com/pyspark/pyspark-what-is-sparksession/#:~:text=getOrCreate()%20%E2%80%93%20This%20returns%20a,SparkSession%20using%20newSession()%20method.&text=This%20always%20creates%20new%20SparkSession%20object.

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),\
    ("Michael","Rose","","40288","M",4000),\
    ("Robert","","Williams","42114","M",4000),\
    ("Maria","Anne","Jones","39192","F",4000),\
    ("Jen1234567890987654321","Mary","Brown","","F",-1)\
     ]

#Create StructType
schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])

df = spark.createDataFrame(data=data,schema=schema,)

#printSchema() method on the DataFrame shows StructType columns as “struct”.

df.printSchema()

df.show(truncate=False) 
# if truncate=True --> will not show the whole value of a column
# if truncate=False --> will show the whole value of a column

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+----------------------+----------+--------+-----+------+------+
|firstname             |middlename|lastname|id   |gender|salary|
+----------------------+----------+--------+-----+------+------+
|James                 |          |Smith   |36636|M     |3000  |
|Michael               |Rose      |        |40288|M     |4000  |
|Robert                |          |Williams|42114|M     |4000  |
|Maria                 |Anne      |Jones   |39192|F     |4000  |
|Jen1234567890987654321|Mary      |Brown   |     |F     |-1    |
+----------------------+----------+--------+-----+------+------+



In [15]:
# Defining nested StructType object struct

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()


structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+



In [29]:
#Creating StructType object struct from JSON file
prit(df2.schema.json())

In [26]:
df.schema.simpleString()

'struct<firstname:string,middlename:string,lastname:string,id:string,gender:string,salary:int>'

In [52]:
#Load the json file and use it to create a DataFrame.
import pyspark  
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import json


spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)

#Load the json file and use it to create a DataFrame.
df2_schem_json = df2.schema.json()
schemaFromJson = StructType.fromJson(json.loads(df2_schem_json))

#here we converted RDD to DF(check 'data' args). Note:parallelize() to create RDD
df3 = spark.createDataFrame(data=spark.sparkContext.parallelize(structureData),schema=schemaFromJson)
df3.show(truncate=False)


+--------------------+-----+------+------+
|name                |id   |gender|salary|
+--------------------+-----+------+------+
|[James, , Smith]    |36636|M     |3100  |
|[Michael, Rose, ]   |40288|M     |4300  |
|[Robert, , Williams]|42114|M     |1400  |
|[Maria, Anne, Jones]|39192|F     |5500  |
|[Jen, Mary, Brown]  |     |F     |-1    |
+--------------------+-----+------+------+



In [56]:
#Adding & Changing struct of the DataFrame

import pyspark  
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col,struct,when
import json

#spark-sql-functions link : https://sparkbyexamples.com/spark/spark-sql-functions/

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)

#withcolumn() link: https://sparkbyexamples.com/pyspark/pyspark-withcolumn/
        
updatedDF = df2.withColumn("OtherInfo", \
    struct(col("id").alias("identifier"),\
    col("gender").alias("gender"),\
    col("salary").alias("salary"),\
    when(col("salary").cast(IntegerType()) < 2000,"Low")\
      .when(col("salary").cast(IntegerType()) < 4000,"Medium")\
      .otherwise("High").alias("Salary_Grade")\
  )).drop("id","gender","salary")

updatedDF.printSchema()
updatedDF.show(truncate=False)


root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- OtherInfo: struct (nullable = false)
 |    |-- identifier: string (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- salary: integer (nullable = true)
 |    |-- Salary_Grade: string (nullable = false)

+--------------------+------------------------+
|name                |OtherInfo               |
+--------------------+------------------------+
|[James, , Smith]    |[36636, M, 3100, Medium]|
|[Michael, Rose, ]   |[40288, M, 4300, High]  |
|[Robert, , Williams]|[42114, M, 1400, Low]   |
|[Maria, Anne, Jones]|[39192, F, 5500, High]  |
|[Jen, Mary, Brown]  |[, F, -1, Low]          |
+--------------------+------------------------+



In [11]:
#Using SQL ArrayType and MapType


'''
Getting error need to check "arr_map_structureData"

ArrayType link : http://nadbordrozd.github.io/blog/2016/05/22/one-weird-trick-that-will-fix-your-pyspark-schemas/

'''
import pyspark  
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, ArrayType, MapType
from pyspark.sql.functions import col,struct,when
import json

#spark-sql-functions link : https://sparkbyexamples.com/spark/spark-sql-functions/

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

arr_map_structureData = [\
    (("James","","Smith"),["cricket","chess"],{'batsman':'bat'})\
  ]


arrayStructureSchema = StructType([
    StructField('name', StructType([
       StructField('firstname', StringType(), True),
       StructField('middlename', StringType(), True),
       StructField('lastname', StringType(), True)
       ])),
       StructField('hobbies', ArrayType(StringType()), True),
       StructField('properties', MapType(StringType(),StringType()), True)
    ])

arr_map_df = spark.createDataFrame(data=arr_map_structureData,schema=arrayStructureSchema)
arr_map_df.printSchema()
arr_map_df.show()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- hobbies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

+----------------+----------------+----------------+
|            name|         hobbies|      properties|
+----------------+----------------+----------------+
|[James, , Smith]|[cricket, chess]|[batsman -> bat]|
+----------------+----------------+----------------+



In [40]:
#Checking if a field exists in a DataFrame
from pyspark.sql.functions import col,struct,when

print(df.dtypes)


if ('firstname','string') in df.dtypes:
    print(f"both column and dtype present in {df.dtypes}")

    
if 'firstname' in df.columns:
    print(f"column present in {df.columns}")
    
'''
Below is for scala
print(df.schema.fieldNames.contains("firstname"))
print(df.schema.contains(StructField("firstname",StringType,true)))
'''

[('firstname', 'string'), ('middlename', 'string'), ('lastname', 'string'), ('id', 'string'), ('gender', 'string'), ('salary', 'int')]
both column and dtype present in [('firstname', 'string'), ('middlename', 'string'), ('lastname', 'string'), ('id', 'string'), ('gender', 'string'), ('salary', 'int')]
column present in ['firstname', 'middlename', 'lastname', 'id', 'gender', 'salary']


'\nBelow is for scala\nprint(df.schema.fieldNames.contains("firstname"))\nprint(df.schema.contains(StructField("firstname",StringType,true)))\n'

In [41]:
#To compare two schema of df 
if df.printSchema() == df.printSchema():
    print("yes")


root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)

yes
