# Word Count


In [0]:
%fs head /FileStore/tables/july2023data/sample.txt

In [0]:
# Load the sample file as plain text (one row per line) and name the single
# resulting column "lines".
df = spark.read.text("/FileStore/tables/july2023data/sample.txt").toDF("lines")

display(df)

lines
hello world
how are you
hello world
I am doing fine
hello again
hope you are fine
hello world
hello world
I am also fine


In [0]:
from pyspark.sql.functions import split, trim

# Tokenise each line into an array of words.
# split() treats its second argument as a regular expression, so splitting on
# one literal space turns repeated spaces (or tabs) into empty-string "words"
# that the later count would include. Trim surrounding spaces and split on
# runs of whitespace instead. A completely blank line still yields a single
# empty token.
split_df = df.select(split(trim(df["lines"]), r"\s+").alias("words"))

display(split_df)

words
"List(hello, world)"
"List(how, are, you)"
"List(hello, world)"
"List(I, am, doing, fine)"
"List(hello, again)"
"List(hope, you, are, fine)"
"List(hello, world)"
"List(hello, world)"
"List(I, am, also, fine)"


In [0]:
from pyspark.sql.functions import explode

# Flatten the per-line word arrays: one output row per array element,
# exposed as the column "word".
explode_df = split_df.select(
    explode(split_df["words"]).alias("word")
)
display(explode_df)

word
hello
world
how
are
you
hello
world
I
am
doing


In [0]:
# Count the occurrences of each distinct word.
agg_df = explode_df.groupBy("word").count()

display(agg_df)

word,count
hope,1
you,2
fine,3
how,1
hello,5
again,1
doing,1
are,2
world,4
I,2


In [0]:
from pyspark.sql.functions import desc

# Order words from most to least frequent, then print the first five rows.
sorted_df = agg_df.sort(desc("count"))
sorted_df.show(n=5)

+-----+-----+
| word|count|
+-----+-----+
|hello|    5|
|world|    4|
| fine|    3|
|  you|    2|
|  are|    2|
+-----+-----+
only showing top 5 rows



In [0]:
%fs head  /FileStore/tables/july2023data/input.json

In [0]:
# Read the JSON file with the "multiline" option enabled, so a record may
# span several lines of the file.
input_df = (
    spark.read
    .option("multiline", "true")
    .json("/FileStore/tables/july2023data/input.json")
)

In [0]:
# Print the parsed rows; "values" is loaded as an array column.
input_df.show()

+----+---------+
|name|   values|
+----+---------+
| xyz|[a, b, c]|
+----+---------+



In [0]:
from pyspark.sql.functions import explode

# One output row per element of the "values" array, keeping "name" alongside.
# Stored under its own name: binding this to `explode_df` would overwrite the
# word-count DataFrame of that name, and re-running the word aggregation cell
# afterwards would then fail because the "word" column no longer exists.
values_explode_df = input_df.select("name", explode("values").alias("value"))
display(values_explode_df)

name,value
xyz,a
xyz,b
xyz,c


In [0]:
%fs head  /FileStore/tables/students_nested_json_data.json

In [0]:
# Read the nested students JSON file with the "multiline" option enabled.
sn_df = (
    spark.read
    .option("multiline", "true")
    .json("/FileStore/tables/students_nested_json_data.json")
)

In [0]:
# Print the loaded rows; long "Education" cells are truncated in the output.
sn_df.show()

+--------------------+-----+
|           Education| name|
+--------------------+-----+
|[{BE, 2011}, {ME,...|  Ram|
|        [{BE, 2010}]|Rohit|
+--------------------+-----+



In [0]:
# Inspect the nesting: "Education" is an array of structs, which is why it is
# exploded before its fields are selected.
sn_df.printSchema()

root
 |-- Education: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Qualification: string (nullable = true)
 |    |    |-- year: long (nullable = true)
 |-- name: string (nullable = true)



In [0]:
# Turn each element of the "Education" array into its own row, carrying the
# student's name along; the element lands in a struct column "education".
explode_sn_df = sn_df.select(
    "name",
    explode("Education").alias("education"),
)
explode_sn_df.show()

+-----+----------+
| name| education|
+-----+----------+
|  Ram|{BE, 2011}|
|  Ram|{ME, 2013}|
|Rohit|{BE, 2010}|
+-----+----------+



In [0]:
# After the explode, "education" is a single struct rather than an array.
explode_sn_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- education: struct (nullable = true)
 |    |-- Qualification: string (nullable = true)
 |    |-- year: long (nullable = true)



In [0]:
# Promote the struct's fields to top-level columns by naming each one.
final_df = explode_sn_df.select(
    ["name", "education.Qualification", "education.year"]
)
final_df.show()

+-----+-------------+----+
| name|Qualification|year|
+-----+-------------+----+
|  Ram|           BE|2011|
|  Ram|           ME|2013|
|Rohit|           BE|2010|
+-----+-------------+----+



In [0]:
# Same flattening as selecting each field by name, but "education.*" expands
# every field of the struct without listing them.
final_df_2 = explode_sn_df.select(["name", "education.*"])
final_df_2.show()

+-----+-------------+----+
| name|Qualification|year|
+-----+-------------+----+
|  Ram|           BE|2011|
|  Ram|           ME|2013|
|Rohit|           BE|2010|
+-----+-------------+----+



# Infer Schema

In [0]:
%fs head /FileStore/tables/employee.csv

In [0]:
# Note: inferSchema triggers one extra Spark job to work out the column
# types, which slows the read down, so it is not a good practice.
emp_csv = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/FileStore/tables/employee.csv")
)
display(emp_csv)

ID,Name,HomeTown,Salary,JoiningDate
1,Arpit,Burhanpur,50000.0,2018-11-14
2,Benu,Bhubaneswar,100000.0,2018-11-19
3,Dilsher,Amritsar,100000.0,2018-11-19
4,Kiran,Bengaluru,50000.0,2018-11-15


In [0]:
emp_csv.printSchema()
# Without inferSchema the CSV reader would type every column as string; the
# integer/double/date types printed here are the result of inference.

root
 |-- ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- HomeTown: string (nullable = true)
 |-- Salary: double (nullable = true)
 |-- JoiningDate: date (nullable = true)



# User-defined (custom) schema using StructType

In [0]:
%fs head /FileStore/tables/july2023data/department.csv

In [0]:
# StructType describes a whole row (record); each StructField describes one
# field (column) of that row.
from pyspark.sql.types import StructType, IntegerType, StructField, StringType, DateType

dept_schema = StructType([
    StructField(column_name, column_type)
    for column_name, column_type in (
        ("EmployeeID", IntegerType()),
        ("DepartmentName", StringType()),
        ("Client", StringType()),
        ("OnboardedDate", DateType()),
    )
])

In [0]:
# Read the department CSV, treating the first line as a header and applying
# the explicit StructType schema instead of inferring types.
dept_df = (
    spark.read
    .schema(dept_schema)
    .option("header", "true")
    .csv("/FileStore/tables/july2023data/department.csv")
)
dept_df.show()

+----------+----------------+--------------+-------------+
|EmployeeID|  DepartmentName|        Client|OnboardedDate|
+----------+----------------+--------------+-------------+
|         1|Data Engineering|Funding Circle|   2019-01-15|
|         2|Data Engineering|Funding Circle|   2018-11-19|
|         3|  Data Analytics|Funding Circle|   2018-11-19|
|         4|  Data Analytics|Funding Circle|   2018-12-16|
+----------+----------------+--------------+-------------+



In [0]:
# Confirm the columns carry the types declared in dept_schema.
dept_df.printSchema()

root
 |-- EmployeeID: integer (nullable = true)
 |-- DepartmentName: string (nullable = true)
 |-- Client: string (nullable = true)
 |-- OnboardedDate: date (nullable = true)



# User-defined (custom) schema using a DDL string

In [0]:
%fs head /FileStore/tables/part_00000-1

In [0]:
# Schema for the orders file written as a DDL string: "<name> <type>" pairs
# separated by commas.
order_schema = (
    "order_id int, "
    "order_date date, "
    "order_customer_id int, "
    "order_status string"
)

In [0]:
# Load the orders file using the DDL-string schema. No header option is set,
# so the column names come from the schema itself; a follow-up toDF() rename
# to those same names is redundant and would have to be kept in sync with
# order_schema by hand.
orders_df = (spark.read
             .format("csv")
             .schema(order_schema)
             .load("/FileStore/tables/part_00000-1"))

orders_df.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-25|              256|PENDING_PAYMENT|
|       3|2013-07-25|            12111|       COMPLETE|
|       4|2013-07-25|             8827|         CLOSED|
|       5|2013-07-25|            11318|       COMPLETE|
|       6|2013-07-25|             7130|       COMPLETE|
|       7|2013-07-25|             4530|       COMPLETE|
|       8|2013-07-25|             2911|     PROCESSING|
|       9|2013-07-25|             5657|PENDING_PAYMENT|
|      10|2013-07-25|             5648|PENDING_PAYMENT|
|      11|2013-07-25|              918| PAYMENT_REVIEW|
|      12|2013-07-25|             1837|         CLOSED|
|      13|2013-07-25|             9149|PENDING_PAYMENT|
|      14|2013-07-25|             9842|     PROCESSING|
|      15|2013-07-25|             2568|       CO

In [0]:
# Confirm the DDL string produced the intended column names and types.
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)

