In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark=SparkSession.builder.master('local[*]').appName('First_program').getOrCreate()

In [38]:
from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType
data = [
    (("James","","Smith"),["Java","Scala","C++"],"OH","M"),
    (("Anna","Rose",""),["Spark","Java","C++"],"NY","F"),
    (("Julia","","Williams"),["CSharp","VB"],"OH","F"),
    (("Maria","Anne","Jones"),["CSharp","VB"],"NY","M"),
    (("Jen","Mary","Brown"),["CSharp","VB"],"NY","M"),
    (("Mike","Mary","Williams"),["Python","VB"],"OH","M")
 ]
        
schema = StructType([
     StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
     ])),
     StructField('languages', ArrayType(StringType()), True),
     StructField('state', StringType(), True),
     StructField('gender',StringType())
 ])

df = spark.createDataFrame(data = data, schema = schema)
df.printSchema()
df.show(truncate=False)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- state: string (nullable = true)
 |-- gender: string (nullable = true)

+----------------------+------------------+-----+------+
|name                  |languages         |state|gender|
+----------------------+------------------+-----+------+
|{James, , Smith}      |[Java, Scala, C++]|OH   |M     |
|{Anna, Rose, }        |[Spark, Java, C++]|NY   |F     |
|{Julia, , Williams}   |[CSharp, VB]      |OH   |F     |
|{Maria, Anne, Jones}  |[CSharp, VB]      |NY   |M     |
|{Jen, Mary, Brown}    |[CSharp, VB]      |NY   |M     |
|{Mike, Mary, Williams}|[Python, VB]      |OH   |M     |
+----------------------+------------------+-----+------+



In [39]:
df.select(df.state).printSchema()
df.filter((df.state=='OH') & (df.gender=='M')).show()
df.filter(df.state.isin(['OH','NY'])).show()
df.filter(df.state.like('O%')).show()

root
 |-- state: string (nullable = true)

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|                name|         languages|state|gender|
+--------------------+------------------+-----+------+
|    {James, , Smith}|[Java, Scala, C++]|   OH|     M|
|      {Anna, Rose, }|[Spark, Java, C++]|   NY|     F|
| {Julia, , Williams}|      [CSharp, VB]|   OH|     F|
|{Maria, Anne, Jones}|      [CSharp, VB]|   NY|     M|
|  {Jen, Mary, Brown}|      [CSharp, VB]|   NY|     M|
|{Mike, Mary, Will...|      [Python, VB]|   OH|     M|
+--------------------+------------------+-----+------+

+--------------------+------------------+-----+------+
|                nam

In [40]:
data = [
    ("James",None,"M"),
    ("Anna","NY","F"),
    ("Julia",None,None)
  ]

columns = ["name","state","gender"]
df1 = spark.createDataFrame(data,columns)
df1.show()


+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
| Anna|   NY|     F|
|Julia| null|  null|
+-----+-----+------+



In [48]:
df1.filter(df1.state.isNull() & df1.gender.isNull()).show()

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|Julia| null|  null|
+-----+-----+------+



In [43]:
df1.na.drop().show()
df1.na.drop(subset=['gender']).show()

+----+-----+------+
|name|state|gender|
+----+-----+------+
|Anna|   NY|     F|
+----+-----+------+

+-----+-----+------+
| name|state|gender|
+-----+-----+------+
|James| null|     M|
| Anna|   NY|     F|
+-----+-----+------+



In [4]:
data = [('James','xxx','Smith','1991-04-01','M','3000'),
  ('Michael','Rose','yyy','2000-05-19','M','4000'),
  ('Robert','aaa','Williams','1978-09-05','M','4000'),
  ('Maria','Anne','Jones','1967-12-01','F','4000'),
  ('Jen','Mary','Brown','1980-02-17','F','-1')]

columns = ["firstname","middlename","lastname","dob","gender","salary"]

df2 = spark.createDataFrame(data=data, schema=columns)

df2.printSchema()
df2.show()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: string (nullable = true)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|       xxx|   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|     yyy|2000-05-19|     M|  4000|
|   Robert|       aaa|Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [5]:
df_3=df2.withColumn('dob',df2.dob.cast('date'))
df_3.select(df_3.dob).printSchema()

root
 |-- dob: date (nullable = true)



In [12]:
df2.withColumn('Updatesalary',col('salary')+1000).show()

+---------+----------+--------+----------+------+------+------------+
|firstname|middlename|lastname|       dob|gender|salary|Updatesalary|
+---------+----------+--------+----------+------+------+------------+
|    James|       xxx|   Smith|1991-04-01|     M|  3000|      4000.0|
|  Michael|      Rose|     yyy|2000-05-19|     M|  4000|      5000.0|
|   Robert|       aaa|Williams|1978-09-05|     M|  4000|      5000.0|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|      5000.0|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|       999.0|
+---------+----------+--------+----------+------+------+------------+



In [18]:
from pyspark.sql.functions import concat,lit
df2.withColumn('Name',concat(col('firstname'),lit('-'),col('lastname'))).show()

+---------+----------+--------+----------+------+------+---------------+
|firstname|middlename|lastname|       dob|gender|salary|           Name|
+---------+----------+--------+----------+------+------+---------------+
|    James|       xxx|   Smith|1991-04-01|     M|  3000|    James-Smith|
|  Michael|      Rose|     yyy|2000-05-19|     M|  4000|    Michael-yyy|
|   Robert|       aaa|Williams|1978-09-05|     M|  4000|Robert-Williams|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    Maria-Jones|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|      Jen-Brown|
+---------+----------+--------+----------+------+------+---------------+



In [20]:
df2.withColumnRenamed('dob','DateOfBirth').show()

+---------+----------+--------+-----------+------+------+
|firstname|middlename|lastname|DateOfBirth|gender|salary|
+---------+----------+--------+-----------+------+------+
|    James|       xxx|   Smith| 1991-04-01|     M|  3000|
|  Michael|      Rose|     yyy| 2000-05-19|     M|  4000|
|   Robert|       aaa|Williams| 1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones| 1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown| 1980-02-17|     F|    -1|
+---------+----------+--------+-----------+------+------+



In [4]:
data = [("James", "Sales", 3000), \
    ("Michael", "Sales", 4600), \
    ("Robert", "Sales", 4100), \
    ("Maria", "Finance", 3000), \
    ("James", "Sales", 3000), \
    ("Scott", "Finance", 3300), \
    ("Jen", "Finance", 3900), \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000), \
    ("Saif", "Sales", 4100) \
  ]
columns= ["employee_name", "department", "salary"]
df3 = spark.createDataFrame(data = data, schema = columns)
df3.printSchema()
df3.show(truncate=False)
df3.count()

root
 |-- employee_name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|James        |Sales     |3000  |
|Michael      |Sales     |4600  |
|Robert       |Sales     |4100  |
|Maria        |Finance   |3000  |
|James        |Sales     |3000  |
|Scott        |Finance   |3300  |
|Jen          |Finance   |3900  |
|Jeff         |Marketing |3000  |
|Kumar        |Marketing |2000  |
|Saif         |Sales     |4100  |
+-------------+----------+------+



10

In [31]:
df3.select(df3.department).distinct().show()

+----------+
|department|
+----------+
|     Sales|
|   Finance|
| Marketing|
+----------+



In [41]:
df_4=df3.select(df3.salary).dropDuplicates()
df_4.count()

6

In [45]:
df3.select(df3.department,df3.employee_name).distinct().count()

9

In [47]:
df3.select(df3.department,df3.employee_name).dropDuplicates().count()

9

In [61]:
df3.sort(df3.salary.desc()).show()
df3.sort(df3.salary.desc(),(df3.department)).show()
df3.orderBy(df3.salary).show()

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|      Michael|     Sales|  4600|
|         Saif|     Sales|  4100|
|       Robert|     Sales|  4100|
|          Jen|   Finance|  3900|
|        Scott|   Finance|  3300|
|        James|     Sales|  3000|
|        Maria|   Finance|  3000|
|         Jeff| Marketing|  3000|
|        James|     Sales|  3000|
|        Kumar| Marketing|  2000|
+-------------+----------+------+

+-------------+----------+------+
|employee_name|department|salary|
+-------------+----------+------+
|      Michael|     Sales|  4600|
|         Saif|     Sales|  4100|
|       Robert|     Sales|  4100|
|          Jen|   Finance|  3900|
|        Scott|   Finance|  3300|
|        Maria|   Finance|  3000|
|         Jeff| Marketing|  3000|
|        James|     Sales|  3000|
|        James|     Sales|  3000|
|        Kumar| Marketing|  2000|
+-------------+----------+------+

+-------------+----------+------+
|employee_na

In [7]:
from pyspark.sql.functions import spark_partition_id
df3.repartition(8).groupBy(spark_partition_id()).count().show()

+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   6|    8|
|                   7|    2|
+--------------------+-----+

