# File Reading

In [0]:
path = "dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/flight_data.csv"

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(),True),
    StructField("count",StringType(),True)
])

In [0]:
flight_df = spark.read.format('csv').option('header','false').option('skipRows',1).schema(schema).load(path)

In [0]:
flight_df.show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
flight_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: string (nullable = true)



In [0]:
%fs
ls /FileStore/shared_uploads/riteshojha2002@gmail.com/

path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/2015_summary.csv,2015_summary.csv,7080,1715526321000
dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/HR_Gender_Diversity___Equality.csv,HR_Gender_Diversity___Equality.csv,114732,1713266514000
dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/San_Martin_Stores_2021_New.xlsx,San_Martin_Stores_2021_New.xlsx,1455778,1713266475000
dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/flight_data.csv,flight_data.csv,7080,1715526353000


# Handling Corrupt Files


### Failfest
- fail execution if mailfomed records in dataset
### Dropmal fomed
- Drop corrupted records
### Permissive
- Set null values to all corrupted fields

In [0]:
corrupt_schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(),True),
    StructField("count",StringType(),True),
    StructField("_corrupt_records",StringType(),True)
])

In [0]:
# Handling Corrupt Files

flight_df_corrupt = spark.read.format('csv')\
.option('header','false')\
.option("badRecordsPath","dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/")
.schema(corrupt_schema).load(path)\

# Bad record file in Json format


# Write file in spark

- Append
- Overwrite

In [0]:
df.write.format('csv')\
    .option("header","True")\
    .option("mode","overwrite")\
    .option("path","dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/write/")
    .save()

In [0]:
# Partitioning
df.repartition(3).write.format('csv')\
    .option("header","True")\
    .option("mode","overwrite")\
    .option("path","dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/write/")
    .save()

# Partition and Bucket
- Partition used when we have columns of categorys like gender, country, city
- Bucket created when we not able to create partitions.

In [0]:
df.write.format('csv')\
    .option("header","True")\
    .option("mode","overwrite")\
    .option("path","dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/write/")\
    .partitionBy(Column-Name)\
    .save()

In [0]:
dbutils.fs.ls("dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/write/")

In [0]:
df.write.format('csv')\
    .option("header","True")\
    .option("mode","overwrite")\
    .option("path","dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/write/")\
    .bucketBy(3,"ID")\
    # .save() # save not work in bucket
    .saveAsTable('table_name')


# Create Dataframe in Spark

In [0]:
data = [
    (1,2),
    (2,1),
    (3,1),
    (4,2),
    (5,1),
    (6,2),
    (7,2)
]

In [0]:
schema = ['id','num']

In [0]:
df = spark.createDataFrame(data=data,schema=schema)
df.show()

# Transformations

## Flight df

In [0]:
from pyspark.sql.functions import col, expr, lit, concat

In [0]:
flight_df.show(1)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
+-----------------+-------------------+-----+
only showing top 1 row



In [0]:
flight_df.select("DEST_COUNTRY_NAME").show(5)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
|            Egypt|
|    United States|
+-----------------+
only showing top 5 rows



In [0]:
flight_df.select(col("DEST_COUNTRY_NAME")).show(5)

+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
|    United States|
|            Egypt|
|    United States|
+-----------------+
only showing top 5 rows



In [0]:
# Expression - Write SQL statements

flight_df.select(expr("count +5")).show(5)

+-----------+
|(count + 5)|
+-----------+
|       20.0|
|        6.0|
|      349.0|
|       20.0|
|       67.0|
+-----------+
only showing top 5 rows



In [0]:
flight_df.select(col("count")+5).show(5)

+-----------+
|(count + 5)|
+-----------+
|       20.0|
|        6.0|
|      349.0|
|       20.0|
|       67.0|
+-----------+
only showing top 5 rows



In [0]:
# Four ways - String way , Col Way, Pandas Way, SQl way

flight_df.select("DEST_COUNTRY_NAME",col("DEST_COUNTRY_NAME"),flight_df['DEST_COUNTRY_NAME'],flight_df.DEST_COUNTRY_NAME).show(3)

+-----------------+-----------------+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+-----------------+-----------------+
|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|
|    United States|    United States|    United States|    United States|
+-----------------+-----------------+-----------------+-----------------+
only showing top 3 rows



In [0]:
flight_df.select(expr("DEST_COUNTRY_NAME as Destination")).show(3)

+-------------+
|  Destination|
+-------------+
|United States|
|United States|
|United States|
+-------------+
only showing top 3 rows



In [0]:
flight_df.filter((col("DEST_COUNTRY_NAME") == "India") | (col("DEST_COUNTRY_NAME")=="Egypt")).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|            Egypt|      United States|   15|
|            India|      United States|   61|
+-----------------+-------------------+-----+



In [0]:
flight_df.where(col("DEST_COUNTRY_NAME") == "United States").show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|    United States|              India|   62|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 5 rows



In [0]:
# literal - Use to assign default values
flight_df.select("*",lit("Air India").alias("Company")).show(5)


+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|  Company|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|Air India|
|    United States|            Croatia|    1|Air India|
|    United States|            Ireland|  344|Air India|
|            Egypt|      United States|   15|Air India|
|    United States|              India|   62|Air India|
+-----------------+-------------------+-----+---------+
only showing top 5 rows



In [0]:
# With Column - Update or Create New Column

flight_df.withColumn()

## Hr Df

In [0]:
hr_df = spark.read.format('csv').option('header','True').option('inferschema','True').load("dbfs:/FileStore/shared_uploads/riteshojha2002@gmail.com/HR_Gender_Diversity___Equality.csv")

In [0]:
hr_df.printSchema()

root
 |-- Employee ID: integer (nullable = true)
 |-- DOB: date (nullable = true)
 |-- Emp Age: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- Hire Date: date (nullable = true)
 |-- Leave Date: date (nullable = true)
 |-- Leave Reason: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Employee Satisfaction: integer (nullable = true)
 |-- Annual Salary ($): integer (nullable = true)
 |-- Bonus ($): integer (nullable = true)
 |-- Total Compensation: integer (nullable = true)
 |-- Job Title: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Manager (Y/N): string (nullable = true)
 |-- Performance: string (nullable = true)



In [0]:
hr_df.withColumn("Full Name",concat("Name","Surname")).show(5)

+-----------+----------+-------+---------+-------+------+--------------+------------+----------+----------+------------+------+----------+---------------------+-----------------+---------+------------------+--------------------+---------------+-------------+-----------+--------------+
|Employee ID|       DOB|Emp Age|  Surname|   Name|Gender|Marital Status|      Branch| Hire Date|Leave Date|Leave Reason|Status|Department|Employee Satisfaction|Annual Salary ($)|Bonus ($)|Total Compensation|           Job Title|Job Description|Manager (Y/N)|Performance|     Full Name|
+-----------+----------+-------+---------+-------+------+--------------+------------+----------+----------+------------+------+----------+---------------------+-----------------+---------+------------------+--------------------+---------------+-------------+-----------+--------------+
|      10005|1953-10-28|     69|  Jackson| Noelia|     F|        Single|Buenos Aires|2014-01-25|      null|Not provided|Active|Production|    

In [0]:
new_df = hr_df.withColumnRenamed("Employee ID","ID")
new_df.show(5)

+-----+----------+-------+---------+-------+------+--------------+------------+----------+----------+------------+------+----------+---------------------+-----------------+---------+------------------+--------------------+---------------+-------------+-----------+
|   ID|       DOB|Emp Age|  Surname|   Name|Gender|Marital Status|      Branch| Hire Date|Leave Date|Leave Reason|Status|Department|Employee Satisfaction|Annual Salary ($)|Bonus ($)|Total Compensation|           Job Title|Job Description|Manager (Y/N)|Performance|
+-----+----------+-------+---------+-------+------+--------------+------------+----------+----------+------------+------+----------+---------------------+-----------------+---------+------------------+--------------------+---------------+-------------+-----------+
|10005|1953-10-28|     69|  Jackson| Noelia|     F|        Single|Buenos Aires|2014-01-25|      null|Not provided|Active|Production|                    3|            43241|     4757|             47998|Prod

In [0]:
hr_df.withColumn("Emp Age",col("Emp Age").cast("string")).printSchema()

root
 |-- Employee ID: integer (nullable = true)
 |-- DOB: date (nullable = true)
 |-- Emp Age: string (nullable = true)
 |-- Surname: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Branch: string (nullable = true)
 |-- Hire Date: date (nullable = true)
 |-- Leave Date: date (nullable = true)
 |-- Leave Reason: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Employee Satisfaction: integer (nullable = true)
 |-- Annual Salary ($): integer (nullable = true)
 |-- Bonus ($): integer (nullable = true)
 |-- Total Compensation: integer (nullable = true)
 |-- Job Title: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Manager (Y/N): string (nullable = true)
 |-- Performance: string (nullable = true)



# Spark SQL

In [0]:
flight_df.createOrReplaceTempView("Flight_view")

In [0]:
%sql
select * from Flight_view

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62
United States,Singapore,1
United States,Grenada,62
Costa Rica,United States,588
Senegal,United States,40
Moldova,United States,1


In [0]:
spark.sql("select * from {flight_df}",flight_df = flight_df).show(5)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
|    United States|            Ireland|  344|
|            Egypt|      United States|   15|
|    United States|              India|   62|
+-----------------+-------------------+-----+
only showing top 5 rows

