In [21]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, FloatType,\
                               StructType, StructField, ArrayType
from pyspark.sql.functions import col, array_contains
                            

In [5]:
# Start (or attach to) the Spark session used for the student DataFrame examples.
spark_app = (
    SparkSession.builder
    .appName('sparkdemo')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/09 18:46:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/09 18:46:45 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [7]:
# Five student records, six attributes each:
# roll number, name, age, height, weight, address.
students = [
    ['001', 'john', 23, 5.79, 67, 'NY'],
    ['002', 'James', 18, 3.79, 34, 'NY'],
    ['003', 'Eric', 17, 2.79, 17, 'NJ'],
    ['004', 'Shahparan', 19, 3.69, 28, 'NJ'],
    ['005', 'Flex', 37, 5.59, 54, 'Dallas'],
]
students

[['001', 'john', 23, 5.79, 67, 'NY'],
 ['002', 'James', 18, 3.79, 34, 'NY'],
 ['003', 'Eric', 17, 2.79, 17, 'NJ'],
 ['004', 'Shahparan', 19, 3.69, 28, 'NJ'],
 ['005', 'Flex', 37, 5.59, 54, 'Dallas']]

In [15]:
# Explicit schema for the student records: one nullable StructField per attribute,
# listed in the same order as the values inside each record.
schema = StructType([
    StructField("rollno", StringType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("height", FloatType(), True),
    StructField("weight", IntegerType(), True),
    StructField("address", StringType(), True),
])

In [16]:
# fieldNames is a method, so it must be called to get the list of column names;
# a bare `schema.fieldNames` only displays the bound-method repr.
schema.fieldNames()

['rollno', 'name', 'age', 'height', 'weight', 'address']

In [17]:
# Build the student DataFrame from the raw records using the explicit schema,
# then print it as a table.
df = spark_app.createDataFrame(data=students, schema=schema)
df.show()

+------+---------+---+------+------+-------+
|rollno|     name|age|height|weight|address|
+------+---------+---+------+------+-------+
|   001|     john| 23|  5.79|    67|     NY|
|   002|    James| 18|  3.79|    34|     NY|
|   003|     Eric| 17|  2.79|    17|     NJ|
|   004|Shahparan| 19|  3.69|    28|     NJ|
|   005|     Flex| 37|  5.59|    54| Dallas|
+------+---------+---+------+------+-------+



In [18]:
# List the StructField objects that make up the DataFrame's schema.
df.schema.fields

[StructField('rollno', StringType(), True),
 StructField('name', StringType(), True),
 StructField('age', IntegerType(), True),
 StructField('height', FloatType(), True),
 StructField('weight', IntegerType(), True),
 StructField('address', StringType(), True)]

In [19]:
# Print the schema as a tree: column name, type and nullability.
df.printSchema()

root
 |-- rollno: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- height: float (nullable = true)
 |-- weight: integer (nullable = true)
 |-- address: string (nullable = true)



In [22]:
# getOrCreate() hands back the session that is already running (see the warning
# printed below), so `spark` is a second name for that same session.
spark = (
    SparkSession.builder
    .appName('SparkApp')
    .getOrCreate()
)

23/02/09 18:55:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [24]:
# Sample records with nested values. Each record is:
#   (first, middle, last) name tuple, list of languages, state, gender
arrayStructureData = [
    (("James", "", "Smith"), ["Java", "Scala", "C++"], "OH", "M"),
    (("Anna", "Rose", ""), ["Spark", "Java", "C++"], "NY", "F"),
    (("Julia", "", "Williams"), ["CSharp", "VB"], "OH", "F"),
    (("Maria", "Anne", "Jones"), ["CSharp", "VB"], "NY", "M"),
    (("Jen", "Mary", "Brown"), ["CSharp", "VB"], "NY", "M"),
    (("Mike", "Mary", "Williams"), ["Python", "VB"], "OH", "M"),
]
type(arrayStructureData)

list

In [29]:
# Schema for the nested records: a `name` struct (first/middle/last name),
# an array of language strings, and plain string columns for state and gender.
arrayStructureSchema = StructType([
    StructField(
        "name",
        StructType([
            StructField('firstname', StringType(), True),
            StructField('middlename', StringType(), True),
            StructField('lastname', StringType(), True),
        ]),
    ),
    StructField("languages", ArrayType(StringType()), True),
    StructField("state", StringType(), True),
    StructField("gender", StringType(), True),
])
arrayStructureSchema.fields

[StructField('name', StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True)]), True),
 StructField('languages', ArrayType(StringType(), True), True),
 StructField('state', StringType(), True),
 StructField('gender', StringType(), True)]

In [30]:
# Build the nested DataFrame from the sample records and the schema defined for
# them, then print the resulting schema tree.
df_array = spark.createDataFrame(arrayStructureData, arrayStructureSchema)
df_array.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)



In [33]:
# Show the rows without truncating long cell values.
df_array.show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [35]:
# Filter rows with a Column comparison, reaching the column as a DataFrame attribute.
df_array.filter(df_array.state == 'OH').show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [36]:
# Negate a Column condition with ~ to keep the rows whose state is not 'OH'.
df_array.filter(~(df_array.state == 'OH')).show(truncate=False)

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, VB]      |NY   |M     |
+--------------------+------------------+-----+------+



In [37]:
# Keep the rows whose state differs from 'OH', using the != operator on the Column.
df_array.filter(df_array.state != 'OH').show(truncate=False)

+--------------------+------------------+-----+------+
|name                |languages         |state|gender|
+--------------------+------------------+-----+------+
|{Anna, Rose, }      |[Spark, Java, C++]|NY   |F     |
|{Maria, Anne, Jones}|[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}  |[CSharp, VB]      |NY   |M     |
+--------------------+------------------+-----+------+



In [38]:
# Refer to the column by name through col() instead of a DataFrame attribute.
df_array.filter(col("state") == 'OH').show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [40]:
# Pass the condition as a SQL expression string.
df_array.filter("state == 'OH'").show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [43]:
# SQL expression string with !=: keep the rows whose gender is not 'M'.
df_array.filter("gender != 'M'").show(truncate=False)

+-------------------+------------------+-----+------+
|name               |languages         |state|gender|
+-------------------+------------------+-----+------+
|{Anna, Rose, }     |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}|[CSharp, VB]      |OH   |F     |
+-------------------+------------------+-----+------+



In [44]:
# isin(): keep the rows whose state appears in the given list of values.
li = ["OH","CA","DE"]
df_array.filter(df_array.state.isin(li)).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+



In [45]:
# Negate isin() with ~ to keep the rows whose state is not in `li`.
df_array.filter(~df_array.state.isin(li)).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [49]:
# Combine two Column conditions with & (logical AND); each comparison is
# wrapped in its own parentheses.
df_array.filter(
    (df_array.state == "OH") & (df_array.gender == "M")
).show(truncate=False)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [50]:
# array_contains(column, value) is true for rows whose array column contains the value.
df_array.filter(array_contains(df_array.languages, "Java")).show()

+----------------+------------------+-----+------+
|            name|         languages|state|gender|
+----------------+------------------+-----+------+
|{James, , Smith}|[Java, Scala, C++]|   OH|     M|
|  {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+----------------+------------------+-----+------+



In [53]:
# Return all rows as a list of Row objects (the nested struct becomes a nested Row).
df_array.collect()

[Row(name=Row(firstname='James', middlename='', lastname='Smith'), languages=['Java', 'Scala', 'C++'], state='OH', gender='M'),
 Row(name=Row(firstname='Anna', middlename='Rose', lastname=''), languages=['Spark', 'Java', 'C++'], state='NY', gender='F'),
 Row(name=Row(firstname='Julia', middlename='', lastname='Williams'), languages=['CSharp', 'VB'], state='OH', gender='F'),
 Row(name=Row(firstname='Maria', middlename='Anne', lastname='Jones'), languages=['CSharp', 'VB'], state='NY', gender='M'),
 Row(name=Row(firstname='Jen', middlename='Mary', lastname='Brown'), languages=['CSharp', 'VB'], state='NY', gender='M'),
 Row(name=Row(firstname='Mike', middlename='Mary', lastname='Williams'), languages=['Python', 'VB'], state='OH', gender='M')]

In [58]:
# Filter on a field nested inside the `name` struct column.
df_array.filter(df_array.name.lastname == "Williams").show()

+--------------------+------------+-----+------+
|                name|   languages|state|gender|
+--------------------+------------+-----+------+
| {Julia, , Williams}|[CSharp, VB]|   OH|     F|
|{Mike, Mary, Will...|[Python, VB]|   OH|     M|
+--------------------+------------+-----+------+



In [59]:
# startswith(): keep the rows whose first name begins with 'M'.
df_array.filter(df_array.name.firstname.startswith('M')).show()

+--------------------+------------+-----+------+
|                name|   languages|state|gender|
+--------------------+------------+-----+------+
|{Maria, Anne, Jones}|[CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|[Python, VB]|   OH|     M|
+--------------------+------------+-----+------+



In [60]:
# endswith(): keep the rows whose first name ends with 'a'.
df_array.filter(df_array.name.firstname.endswith('a')).show()

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
+--------------------+------------------+-----+------+



In [61]:
# like(): SQL LIKE pattern match; '%na' keeps first names that end in 'na'.
df_array.filter(df_array.name.firstname.like('%na')).show()

+--------------+------------------+-----+------+
|          name|         languages|state|gender|
+--------------+------------------+-----+------+
|{Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
+--------------+------------------+-----+------+



In [63]:
# `spark` and `spark_app` both came from getOrCreate() and refer to the same
# underlying session, so a single stop() shuts it down; a second call on the
# other name would be redundant.
spark_app.stop()