# PySpark DataFrame Exercise Materials.

### Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
spark = (SparkSession.builder.appName("PySparkExercise").getOrCreate())

### Dataset Person

In [3]:
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

In [4]:
type(data)

list

### Define Schema

In PySpark, we can define the schema of a DataFrame with the pyspark.sql.types module. It lets us specify column names, data types, and nullability constraints explicitly.

We define the schema using StructType and specify each column with StructField. The first argument of StructField is the column name. The second is the data type (StringType for string columns, IntegerType for integer columns). The third specifies nullability (True for nullable, False for non-nullable).

We then create a DataFrame from this schema by passing the data and the schema to createDataFrame(). If we pass an empty list as the data argument instead, we get an empty DataFrame that still has the specified column names, data types, and nullability constraints.

Printing the schema with df.printSchema() lets us verify that the DataFrame was created with the defined schema.

Note that every field in this schema is nullable (True). Suppose a field were declared non-nullable (False). With its default verifySchema=True, createDataFrame() would then reject any row that contains None for that field.

In [5]:
schema = StructType([
    StructField("firstName",StringType(), True),
    StructField("midName",StringType(), True),
    StructField("lastName",StringType(), True),
    StructField("id",StringType(), True),
    StructField("gender",StringType(), True),
    StructField("salary",IntegerType(), True)
])

In [6]:
df = spark.createDataFrame(data=data, schema=schema)

#### Checking the Schema and Data Types

In [7]:
df.printSchema()

root
 |-- firstName: string (nullable = true)
 |-- midName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



### Checking the Data

In PySpark, we can use the **df.show()** method to display the top rows of a DataFrame. It is similar to **df.head()** in pandas. Note that df.show(n=3, truncate=False) displays the *first* 3 rows, not the last ones. Here truncate=False only stops long values from being cut off at 20 characters. To get the last rows, similar to df.tail(n=3) in pandas, use **df.tail(3)** (available since Spark 3.0). It returns a list of Row objects instead of printing a table.

In [8]:
df.show(truncate=False)

+---------+-------+--------+-----+------+------+
|firstName|midName|lastName|id   |gender|salary|
+---------+-------+--------+-----+------+------+
|James    |       |Smith   |36636|M     |3000  |
|Michael  |Rose   |        |40288|M     |4000  |
|Robert   |       |Williams|42114|M     |4000  |
|Maria    |Anne   |Jones   |39192|F     |4000  |
|Jen      |Mary   |Brown   |     |F     |-1    |
+---------+-------+--------+-----+------+------+



### DataFrameReader

Reading the **fire-incidents.csv** dataset.

In [9]:
file_path = "./testdata/fire-incidents/fire-incidents.csv"

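**inferSchema** is set to True so that Spark infers the column data types automatically. This saves us from defining them manually with StructType as in the previous example. Inference requires an extra pass over the data, so for large files an explicit schema is often faster.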

In [10]:
fire_df = (spark.read.format("csv")
           .option("header", True)
           .option("inferSchema", True)
           .load(file_path))

In [11]:
fire_df.select("IncidentNumber","incidentDate","City").show(10)

+--------------+-------------------+-------------+
|IncidentNumber|       incidentDate|         City|
+--------------+-------------------+-------------+
|      20104668|2020-09-11 00:00:00|San Francisco|
|      20104708|2020-09-11 00:00:00|San Francisco|
|      20104648|2020-09-10 00:00:00|San Francisco|
|      20104598|2020-09-10 00:00:00|San Francisco|
|      20104575|2020-09-10 00:00:00|San Francisco|
|      20104477|2020-09-10 00:00:00|San Francisco|
|      20104443|2020-09-10 00:00:00|San Francisco|
|      20104605|2020-09-10 00:00:00|San Francisco|
|      20104474|2020-09-10 00:00:00|San Francisco|
|      20104652|2020-09-10 00:00:00|San Francisco|
+--------------+-------------------+-------------+
only showing top 10 rows



In [12]:
fire_df.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: timestamp (nullable = true)
 |-- CallNumber: integer (nullable = true)
 |-- AlarmDtTm: timestamp (nullable = true)
 |-- ArrivalDtTm: timestamp (nullable = true)
 |-- CloseDtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: d

In [13]:
output_path = './data/output/fireincidents'
fire_df.write.format("parquet").mode("overwrite").save(output_path)

## Working with Structured Operations

### Reading a JSON File.

In [14]:
from pyspark.sql.types import ArrayType, FloatType, DateType, BooleanType

In [15]:
person_schema = StructType([
    StructField("id",IntegerType(), True),
    StructField("first_name",StringType(), True),
    StructField("last_name",StringType(), True),
    StructField("fav_movies",ArrayType(StringType()), True),
    StructField("salary",FloatType(), True),
    StructField("image_url",StringType(), True),
    StructField("date_of_birth",DateType(), True),
    StructField("active",BooleanType(), True)
])

In [16]:
json_path = "./testdata/persondata/persons.json"

In [17]:
persons_df = spark.read.json(json_path, schema=person_schema, multiLine=True)

In [18]:
persons_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- fav_movies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- salary: float (nullable = true)
 |-- image_url: string (nullable = true)
 |-- date_of_birth: date (nullable = true)
 |-- active: boolean (nullable = true)



In [19]:
persons_df.show(10, truncate=False)

+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name|fav_movies                                                   |salary |image_url                                      |date_of_birth|active|
+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy    |[I giorni contati]                                           |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|2  |Emelyne   |Blaza    |[Musketeer, The, Topralli]                                   |3006.04|http://dummyimage.com/158x106.bmp/cc0000/ffffff|1991-11-02   |false |
|3  |Max       |Rettie   |[The Forgotten Space, Make It Happen]                        |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|4  

## Columns and Expression

In [20]:
from pyspark.sql.functions import col, expr

### Selecting Multiple columns

A select() call is similar to a SELECT statement in SQL:

SELECT a, b FROM df_test

Selecting columns also looks similar in PySpark and pandas. Using PySpark:

df = df_test.select("a", "b")

Using pandas:

df = df_test.loc[:, ["a", "b"]]


In [21]:
persons_df.select("first_name","last_name","date_of_birth").show(10)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
|    Oswald| Petrolli|   1986-09-02|
|    Adrian|   Clarey|   1971-08-24|
|  Dominica|  Goodnow|   1973-08-27|
|     Emory|  Slocomb|   1974-06-08|
|  Jeremias|     Bode|   1997-08-02|
+----------+---------+-------------+
only showing top 10 rows



The same selection can be written with col():

In [22]:
persons_df.select(col("first_name"),col("last_name"),col("date_of_birth")).show(10)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
|    Oswald| Petrolli|   1986-09-02|
|    Adrian|   Clarey|   1971-08-24|
|  Dominica|  Goodnow|   1973-08-27|
|     Emory|  Slocomb|   1974-06-08|
|  Jeremias|     Bode|   1997-08-02|
+----------+---------+-------------+
only showing top 10 rows



It can also be written with expr():

In [23]:
persons_df.select(expr("first_name"),expr("last_name"),expr("date_of_birth")).show(10)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
|    Oswald| Petrolli|   1986-09-02|
|    Adrian|   Clarey|   1971-08-24|
|  Dominica|  Goodnow|   1973-08-27|
|     Emory|  Slocomb|   1974-06-08|
|  Jeremias|     Bode|   1997-08-02|
+----------+---------+-------------+
only showing top 10 rows



So what is the difference between col() and expr()?

In [24]:
from pyspark.sql.functions import concat_ws

In [25]:
persons_df.select(concat_ws(" ",col("first_name"),col("last_name")).alias("full_name"),
                  col("salary"),
                  (col("salary") * 0.10 + col("salary")).alias("salary_increase")).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary_increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36|1609.6959838867188|
|   Emelyne Blaza|3006.04|  3306.64404296875|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36|3917.4961181640624|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|  3819.89287109375|
+----------------+-------+------------------+
only showing top 10 rows



In [26]:
persons_df.select(concat_ws(" ",col("first_name"),col("last_name")).alias("full_name"),
                  col("salary"),
                  (expr("salary * 0.10 + salary")).alias("salary_increase")).show(10)

+----------------+-------+------------------+
|       full_name| salary|   salary_increase|
+----------------+-------+------------------+
|     Drucy Poppy|1463.36|1609.6959838867188|
|   Emelyne Blaza|3006.04|  3306.64404296875|
|      Max Rettie|1422.88|1565.1680053710938|
|     Ilario Kean|3561.36|3917.4961181640624|
|    Toddy Drexel|4934.87|  5428.35712890625|
| Oswald Petrolli|1153.23| 1268.552978515625|
|   Adrian Clarey|1044.73| 1149.202978515625|
|Dominica Goodnow|1147.76|1262.5360107421875|
|   Emory Slocomb|1082.11|1190.3209838867188|
|   Jeremias Bode|3472.63|  3819.89287109375|
+----------------+-------+------------------+
only showing top 10 rows



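**The difference:** expr() parses a whole SQL expression string, while col() refers to a single column. Both return a Column object, so they can be mixed in the same select(). For example, to add two columns we write expr("col1 + col2") with expr(). With col() we write col("col1") + col("col2"), combining the columns with Python operators.

The long decimals in salary_increase (e.g. 1609.6959838867188 instead of 1609.696) appear because salary is stored as a 32-bit FloatType. That type cannot represent values such as 1463.36 exactly, and the arithmetic is then carried out in double precision.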

## Filter and Where Condition

### Filter on One Condition

where() is an alias of filter(), so the next two cells return the same rows.

In [27]:
persons_df.filter(col("salary") > 3000).show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|
| 10|  Jeremias|     Bode|[Farewell to Arms...|3472.63|http://dummyimage...|   1997-08-02|  true|
| 14|   Ambrosi| Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 18|     Alfie| Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 19|      Lura|   Follis|[My Life in Pink ...|3331.26|http://dummyimage...|   1998-11-03| false|
| 20|      Maxi|    

In [28]:
persons_df.where(col("salary") > 3000).show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|
| 10|  Jeremias|     Bode|[Farewell to Arms...|3472.63|http://dummyimage...|   1997-08-02|  true|
| 14|   Ambrosi| Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 18|     Alfie| Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 19|      Lura|   Follis|[My Life in Pink ...|3331.26|http://dummyimage...|   1998-11-03| false|
| 20|      Maxi|    

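### Filter on Multiple Conditions

Conditions are combined with & (and) and | (or). Each condition must be wrapped in parentheses, because in Python these operators bind more tightly than comparisons such as > and ==.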

In [29]:
persons_df.where((col("salary") > 3000) & (col("active") == True)).show(10)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  4|    Ilario|       Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|     Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|
| 10|  Jeremias|       Bode|[Farewell to Arms...|3472.63|http://dummyimage...|   1997-08-02|  true|
| 14|   Ambrosi|   Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 18|     Alfie|   Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 21|      Dian|      Dancy|[Double, Double, ...| 3720.3|http://dummyimage...|   1998-12-01|  true|
| 25|     Kelcy|     Wogdon|    [Iron Mask, The]|4512.51|http://dummyimage...|   2000-10-20|  true|


In [30]:
persons_df.filter((col("salary") > 3000) & (col("active") == True)).show(10)


+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  4|    Ilario|       Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|     Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|
| 10|  Jeremias|       Bode|[Farewell to Arms...|3472.63|http://dummyimage...|   1997-08-02|  true|
| 14|   Ambrosi|   Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 18|     Alfie|   Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 21|      Dian|      Dancy|[Double, Double, ...| 3720.3|http://dummyimage...|   1998-12-01|  true|
| 25|     Kelcy|     Wogdon|    [Iron Mask, The]|4512.51|http://dummyimage...|   2000-10-20|  true|


In [31]:
from pyspark.sql.functions import year

In [32]:
persons_df.where((year("date_of_birth") == 2000) | (year("date_of_birth") == 1989)).show(10)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| 14|   Ambrosi|   Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 15|    Feodor|Nancekivell|   [Monsoon Wedding]|2218.46|http://dummyimage...|   2000-10-07| false|
| 18|     Alfie|   Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 25|     Kelcy|     Wogdon|    [Iron Mask, The]|4512.51|http://dummyimage...|   2000-10-20|  true|
| 32|      Redd|   Akenhead|[Century of the D...| 2470.9|http://dummyimage...|   2000-06-05| false|
| 34|     Davis|      Pinks|          [Hounddog]|1337.14|http://dummyimage...|   1989-07-27|  true|
| 61|    Shanna|    Samples|[Thomas in Love (...| 2703.0|http://dummyimage...|   1989-07-07| false|


### Filter on Array Values

In [33]:
from pyspark.sql.functions import array_contains

In [34]:
persons_df.where(array_contains(col("fav_movies"),"Land of the Lost")).show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| 11|   Timothy|   Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



## Distinct, Drop Duplicates, Order By

In [35]:
from pyspark.sql.functions import count, desc

In [36]:
persons_df.select(col("active")).show(10)

+------+
|active|
+------+
|  true|
| false|
| false|
|  true|
|  true|
| false|
| false|
| false|
|  true|
|  true|
+------+
only showing top 10 rows



### Distinct function

In [37]:
persons_df.select(col("active")).distinct().show(10)

+------+
|active|
+------+
|  true|
| false|
+------+



### Drop Duplicates

In [38]:
(persons_df.select(col("first_name"), 
           year(col("date_of_birth")).alias("year"),
           col("active")).orderBy("year","first_name")).show(10)

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|       Sky|1971| false|
|   Timothy|1971| false|
|    Lucita|1972|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|     Toddy|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
+----------+----+------+
only showing top 10 rows



dropDuplicates(["year", "active"]) keeps only one row for each distinct combination of year and active. Suppose year takes the values 1990 to 1995 and active is either true or false. Then at most two rows remain per year, for example (1990, true) and (1990, false), and likewise for every other year.

In [39]:
dropped_df = persons_df.select(col("first_name"), 
           year(col("date_of_birth")).alias("year"),
           col("active")).dropDuplicates(["year","active"]).orderBy("year","first_name")

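Here rows such as Sky (1971, false) and Toddy (1972, true) are removed because another row with the same (year, active) pair was kept. Note that dropDuplicates() does not guarantee *which* row of each group survives. The orderBy() runs afterwards and only sorts the remaining rows, so the kept first_name is not necessarily the alphabetically first one.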

In [40]:
dropped_df.show(30)

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
|   Balduin|1974| false|
|     Emory|1974|  true|
|    Janean|1975|  true|
|       Bev|1976|  true|
| Franciska|1976| false|
|     Johny|1977| false|
|    Daveta|1978| false|
|   Guthrie|1978|  true|
|      Maxi|1979| false|
|   Melinda|1979|  true|
|    Carter|1980| false|
|   Loralyn|1980|  true|
|     Clive|1981|  true|
|   Leanora|1981| false|
| Franciska|1982| false|
|     Trace|1982|  true|
|    Cynthy|1983| false|
|        El|1984|  true|
|  Thorvald|1984| false|
|   Hillyer|1985|  true|
|       Rem|1985| false|
|     Nahum|1986|  true|
|    Oswald|1986| false|
|    Ilario|1987|  true|
+----------+----+------+
only showing top 30 rows



## Rows and Union

### Create New Rows to Union

In [41]:
from pyspark.sql import Row

In [42]:
person_row_list = [Row(101, "Robert", "Ownes", ["Men in Black III", "Home Alone"], 4300.64, "http//someimages.com", "1964-08-18", True), 
                    Row(102, "Sara", "Devine", ["Men in Black III", "Home Alone"], 4300.64, "http//someimages.com", "1964-08-18", True)]

In [43]:
person_row_list

[<Row(101, 'Robert', 'Ownes', ['Men in Black III', 'Home Alone'], 4300.64, 'http//someimages.com', '1964-08-18', True)>,
 <Row(102, 'Sara', 'Devine', ['Men in Black III', 'Home Alone'], 4300.64, 'http//someimages.com', '1964-08-18', True)>]

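Note that createDataFrame() accepts a list of Row objects (Row is a subclass of tuple). It also accepts tuples, lists or dicts, an RDD, or a pandas DataFrame. Because these Row objects have no field names, the column names are passed separately as the second argument, and the column types are inferred from the values.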

In [44]:
new_persons_df = spark.createDataFrame(person_row_list, ["id","first_name", "last_name", "fav_movies", "salary", "image_url", "date_of_birth", "active"])

In [45]:
new_persons_df.show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|101|    Robert|    Ownes|[Men in Black III...|4300.64|http//someimages.com|   1964-08-18|  true|
|102|      Sara|   Devine|[Men in Black III...|4300.64|http//someimages.com|   1964-08-18|  true|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



In [46]:
add_persons_df = persons_df.union(new_persons_df)

In [47]:
add_persons_df.sort(desc("id")).show(10)

+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies|            salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
|102|      Sara|   Devine|[Men in Black III...|           4300.64|http//someimages.com|   1964-08-18|  true|
|101|    Robert|    Ownes|[Men in Black III...|           4300.64|http//someimages.com|   1964-08-18|  true|
|100|    Virgie| Domanski|[Horseman, The, S...| 2165.929931640625|http://dummyimage...|   2002-01-05|  true|
| 99|   Rozalie|   Wannop|[Suddenly, The No...|1259.6400146484375|http://dummyimage...|   1997-03-25| false|
| 98|     Davin|     Labb|[Viva Riva!, Kill...| 1452.739990234375|http://dummyimage...|   1988-01-27|  true|
| 97|      Rodi|   Farnan|[Code, The (Menta...|   2325.8798828125|http://dummyimage...|   1972-01-04| false|
| 96|       Dew| Co

## Adding, Renaming, and Dropping Columns.

In [48]:
from pyspark.sql.functions import round  # note: shadows Python's built-in round() from here on

### Adding a New Column: Salary Increase

In [49]:
aug_persons_df1 = persons_df.withColumn("salary_increase", col("salary") * 0.10 + col("salary"))

In [50]:
aug_persons_df1.show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+------------------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|   salary_increase|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+------------------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|1609.6959838867188|
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|  3306.64404296875|
|  3|       Max|   Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|1565.1680053710938|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|3917.4961181640624|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|  5428.35712890625|
|  6|    Oswald| Petrolli|[Wing and the Thi...|1153.23|http://du

In [51]:
aug_persons_df1.columns

['id',
 'first_name',
 'last_name',
 'fav_movies',
 'salary',
 'image_url',
 'date_of_birth',
 'active',
 'salary_increase']

### Renaming Columns

In [52]:
aug_persons_df2 = (aug_persons_df1
                  .withColumn("birth_year", year("date_of_birth"))
                  .withColumnRenamed("fav_movies","movies")
                  .withColumn("salaryX10", round(col("salary_increase"),2))
                  .drop("salary_increase"))

This cell adds a "birth_year" column and renames "fav_movies" to "movies". It then adds a "salaryX10" column holding "salary_increase" rounded to 2 decimal places, and drops the original "salary_increase" column.

In [53]:
aug_persons_df2.show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+----------+---------+
| id|first_name|last_name|              movies| salary|           image_url|date_of_birth|active|birth_year|salaryX10|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+----------+---------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|      1991|   1609.7|
|  2|   Emelyne|    Blaza|[Musketeer, The, ...|3006.04|http://dummyimage...|   1991-11-02| false|      1991|  3306.64|
|  3|       Max|   Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|      1990|  1565.17|
|  4|    Ilario|     Kean|[Up Close and Per...|3561.36|http://dummyimage...|   1987-06-09|  true|      1987|   3917.5|
|  5|     Toddy|   Drexel|[Walk in the Clou...|4934.87|http://dummyimage...|   1992-10-28|  true|      1992|  5428.36|
|  6|    Oswald| Petrolli|[Wing and the Thi...|1

## Working with Missing or Bad Data

In [54]:
bad_movies_list = [Row(None, None, None),
                Row(None, None, 2020),
                Row("John Doe", "Awesome movie", None),
                Row(None, "Awesome movie", 2021),
                Row("Mary jane", None, 2019),
                Row("Vikter Duplaix", "Not another teen movie", 2001)]

In [57]:
bad_movies_columns = ["Actor", "Title", "Year"]

In [61]:
bad_movies_df = spark.createDataFrame(data=bad_movies_list, schema=bad_movies_columns)

In [62]:
bad_movies_df.show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|          null|                null|null|
|          null|                null|2020|
|      John Doe|       Awesome movie|null|
|          null|       Awesome movie|2021|
|     Mary jane|                null|2019|
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



In [63]:
bad_movies_df.printSchema()

root
 |-- Actor: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Year: long (nullable = true)



### Dropping NA 

In PySpark, rows that contain null values are removed with df.na.drop(), an alias of df.dropna(). Do not confuse it with df.drop(), which removes columns. Its parameters are:

how (string): Specifies the drop behavior. It accepts the following values:

"any": Drops a row if it contains at least one null or NaN value in any of the considered columns (default behavior).

"all": Drops a row only if all the considered columns are null or NaN.

thresh (int, default None): Keeps only rows that have at least thresh non-null values. When set, it overrides how.

subset (string, tuple, or list of strings): The column name(s) to consider. By default, all columns are considered.

#### Dropping Rows That Contain Any NULL

In [64]:
bad_movies_df.na.drop().show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



In [65]:
bad_movies_df.na.drop("any").show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



#### Dropping Rows Where All Columns Are NULL

In [66]:
bad_movies_df.na.drop("all").show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|          null|                null|2020|
|      John Doe|       Awesome movie|null|
|          null|       Awesome movie|2021|
|     Mary jane|                null|2019|
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



#### Using filter() to Keep Rows Where a Column Is NOT NULL

We can also use the filter() function to keep only the rows that meet a condition, such as a column not being null.

In [68]:
bad_movies_df.filter(col("Actor").isNotNull()).show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|      John Doe|       Awesome movie|null|
|     Mary jane|                null|2019|
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



In [69]:
bad_movies_df.filter(col("Actor").isNull() == False).show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|      John Doe|       Awesome movie|null|
|     Mary jane|                null|2019|
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



## Working with User-Defined Functions

In [70]:
from pyspark.sql.functions import udf

In [71]:
students_list = [Row("John", 80),
                Row("Mary", 70),
                Row("Jane",50)]

In [72]:
students_columns = ["Name", "Mark"]

In [73]:
students_df = spark.createDataFrame(data=students_list, schema=students_columns)

In [74]:
students_df.show()

+----+----+
|Name|Mark|
+----+----+
|John|  80|
|Mary|  70|
|Jane|  50|
+----+----+



### Define a Function

In [82]:
def gradeStudents(score: int) -> str:
    # Map a numeric mark to a grade label
    if score >= 80:
        return 'Excellent'
    elif score >= 70:
        return 'Average'
    else:
        return 'Poor'

We create a UDF with udf(), passing the function gradeStudents and its return type. StringType() is also the default return type, so stating it only makes the intent explicit.

In [83]:
letterGradeUDF = udf(gradeStudents, StringType())

In [86]:
students_df.select(col("Name"), col("Mark"), letterGradeUDF(col("Mark")).alias("Grade")).show()

+----+----+---------+
|Name|Mark|    Grade|
+----+----+---------+
|John|  80|Excellent|
|Mary|  70|  Average|
|Jane|  50|     Poor|
+----+----+---------+



## Aggregations

In [87]:
flights_file = "./testdata/flights/flight-summary.csv"
flights_summary_df = (spark.read.format("csv")
                      .option("header","true")
                      .option("inferSchema","true")
                      .load(flights_file))

In [88]:
flights_summary_df.show()

+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+-----+
|origin_code|      origin_airport|    origin_city|origin_state|dest_code|        dest_airport|       dest_city|dest_state|count|
+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+-----+
|        BQN|Rafael Hernández ...|      Aguadilla|          PR|      MCO|Orlando Internati...|         Orlando|        FL|  441|
|        PHL|Philadelphia Inte...|   Philadelphia|          PA|      MCO|Orlando Internati...|         Orlando|        FL| 4869|
|        MCI|Kansas City Inter...|    Kansas City|          MO|      IAH|George Bush Inter...|         Houston|        TX| 1698|
|        SPI|Abraham Lincoln C...|    Springfield|          IL|      ORD|Chicago O'Hare In...|         Chicago|        IL|  998|
|        SNA|John Wayne Airpor...|      Santa Ana|          CA|      PHX|Phoenix Sky Harbo...|   

In [90]:
flights_summary_df.printSchema()

root
 |-- origin_code: string (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- dest_code: string (nullable = true)
 |-- dest_airport: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- count: integer (nullable = true)



In [93]:
flights_summary_df = flights_summary_df.withColumnRenamed("count","flights_count")

In [94]:
flights_summary_df.show()

+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+-------------+
|origin_code|      origin_airport|    origin_city|origin_state|dest_code|        dest_airport|       dest_city|dest_state|flights_count|
+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+-------------+
|        BQN|Rafael Hernández ...|      Aguadilla|          PR|      MCO|Orlando Internati...|         Orlando|        FL|          441|
|        PHL|Philadelphia Inte...|   Philadelphia|          PA|      MCO|Orlando Internati...|         Orlando|        FL|         4869|
|        MCI|Kansas City Inter...|    Kansas City|          MO|      IAH|George Bush Inter...|         Houston|        TX|         1698|
|        SPI|Abraham Lincoln C...|    Springfield|          IL|      ORD|Chicago O'Hare In...|         Chicago|        IL|          998|
|        SNA|John Wayne Airpor...|      S

### Count

In [101]:
flights_summary_df.select(count(col("origin_airport")), count("dest_airport")).show()

+---------------------+-------------------+
|count(origin_airport)|count(dest_airport)|
+---------------------+-------------------+
|                 4693|               4693|
+---------------------+-------------------+



In [97]:
bad_movies_df.show()

+--------------+--------------------+----+
|         Actor|               Title|Year|
+--------------+--------------------+----+
|          null|                null|null|
|          null|                null|2020|
|      John Doe|       Awesome movie|null|
|          null|       Awesome movie|2021|
|     Mary jane|                null|2019|
|Vikter Duplaix|Not another teen ...|2001|
+--------------+--------------------+----+



In [99]:
bad_movies_df.select(count(col("Actor")), count(col("Title")), count(col("Year")), count("*")).show()

+------------+------------+-----------+--------+
|count(Actor)|count(Title)|count(Year)|count(1)|
+------------+------------+-----------+--------+
|           3|           3|          4|       6|
+------------+------------+-----------+--------+



count(column) counts only the non-null values in that column, whereas count("*") counts every row, including rows that contain nulls (Spark labels it count(1) in the output).

### Count Distinct

In [102]:
from pyspark.sql.functions import col, count, countDistinct

In [106]:
flights_summary_df.select(countDistinct(col("origin_airport")), countDistinct("dest_airport"), count("*").alias("total_rows")).show()

+------------------------------+----------------------------+----------+
|count(DISTINCT origin_airport)|count(DISTINCT dest_airport)|total_rows|
+------------------------------+----------------------------+----------+
|                           322|                         322|      4693|
+------------------------------+----------------------------+----------+



### SUM, AVG, MAX, MIN

In [107]:
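# Note: this import shadows Python's built-in min, max and sum for the rest of the notebook.
# sumDistinct is deprecated since Spark 3.2 in favor of sum_distinct.
from pyspark.sql.functions import min, max, avg, sum, sumDistinct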

In [110]:
flights_summary_df.select(min(col("flights_count")), max(col("flights_count"))).show()

+------------------+------------------+
|min(flights_count)|max(flights_count)|
+------------------+------------------+
|                 1|             13744|
+------------------+------------------+



In [111]:
flights_summary_df.select(avg(col("flights_count")), sum(col("flights_count"))/count(col("flights_count"))).show()

+------------------+-------------------------------------------+
|avg(flights_count)|(sum(flights_count) / count(flights_count))|
+------------------+-------------------------------------------+
|1136.3549968037503|                         1136.3549968037503|
+------------------+-------------------------------------------+



### SUM DISTINCT

In [112]:
students_df.show()

+----+----+
|Name|Mark|
+----+----+
|John|  80|
|Mary|  70|
|Jane|  50|
+----+----+



In [145]:
students_df.select(sum(col("Mark"))).show()

+---------+
|sum(Mark)|
+---------+
|      200|
+---------+



In [115]:
temp_students_df = students_df.union(students_df)

In [116]:
temp_students_df.show()

+----+----+
|Name|Mark|
+----+----+
|John|  80|
|Mary|  70|
|Jane|  50|
|John|  80|
|Mary|  70|
|Jane|  50|
+----+----+



In [118]:
temp_students_df.select(sum(col("Mark")), sumDistinct(col("Mark"))).show()

+---------+------------------+
|sum(Mark)|sum(DISTINCT Mark)|
+---------+------------------+
|      400|               200|
+---------+------------------+



### Aggregating by Group

Find the maximum flights_count for each origin_airport, sorted in descending order.

In [121]:
(flights_summary_df.groupBy("origin_airport").agg(max("flights_count").alias("max_flights_count")).orderBy("max_flights_count",ascending=False)).show(n=5)

+--------------------+-----------------+
|      origin_airport|max_flights_count|
+--------------------+-----------------+
|San Francisco Int...|            13744|
|Los Angeles Inter...|            13457|
|John F. Kennedy I...|            12016|
|McCarran Internat...|             9715|
|LaGuardia Airport...|             9639|
+--------------------+-----------------+
only showing top 5 rows



Verify the top value by filtering for "San Francisco International Airport" and sorting its routes by flights_count.

In [127]:
flights_summary_df.filter(col("origin_airport") == "San Francisco International Airport").orderBy("flights_count", ascending=False).show()

+-----------+--------------------+-------------+------------+---------+--------------------+-----------------+----------+-------------+
|origin_code|      origin_airport|  origin_city|origin_state|dest_code|        dest_airport|        dest_city|dest_state|flights_count|
+-----------+--------------------+-------------+------------+---------+--------------------+-----------------+----------+-------------+
|        SFO|San Francisco Int...|San Francisco|          CA|      LAX|Los Angeles Inter...|      Los Angeles|        CA|        13744|
|        SFO|San Francisco Int...|San Francisco|          CA|      JFK|John F. Kennedy I...|         New York|        NY|         8440|
|        SFO|San Francisco Int...|San Francisco|          CA|      LAS|McCarran Internat...|        Las Vegas|        NV|         7995|
|        SFO|San Francisco Int...|San Francisco|          CA|      ORD|Chicago O'Hare In...|          Chicago|        IL|         7380|
|        SFO|San Francisco Int...|San Francisco|

Compute the total flights_count for every origin_airport (sorted in descending order); the next cell filters this result for "San Francisco International Airport".

In [131]:
sf_flights_df = (flights_summary_df.groupBy("origin_airport").agg(sum("flights_count").alias("sum_flights_count")).orderBy("sum_flights_count",ascending=False))
sf_flights_df.show()

+--------------------+-----------------+
|      origin_airport|sum_flights_count|
+--------------------+-----------------+
|Hartsfield-Jackso...|           346836|
|Chicago O'Hare In...|           285884|
|Dallas/Fort Worth...|           239551|
|Denver Internatio...|           196055|
|Los Angeles Inter...|           194673|
|San Francisco Int...|           148008|
|Phoenix Sky Harbo...|           146815|
|George Bush Inter...|           146622|
|McCarran Internat...|           133181|
|Minneapolis-Saint...|           112117|
|Orlando Internati...|           110982|
|Seattle-Tacoma In...|           110899|
|Detroit Metropoli...|           108500|
|Gen. Edward Lawre...|           107847|
|Newark Liberty In...|           101772|
|Charlotte Douglas...|           100324|
|LaGuardia Airport...|            99605|
|Salt Lake City In...|            97210|
|John F. Kennedy I...|            93811|
|Baltimore-Washing...|            86079|
+--------------------+-----------------+
only showing top 20 rows

In [133]:
sf_flights_df.filter(col("origin_airport") == "San Francisco International Airport").show()

+--------------------+-----------------+
|      origin_airport|sum_flights_count|
+--------------------+-----------------+
|San Francisco Int...|           148008|
+--------------------+-----------------+



A more concise way: chain .where() directly onto the grouped aggregation instead of filtering in a separate step.

In [135]:
(flights_summary_df.groupBy("origin_airport").agg(sum("flights_count").alias("sum_flights_count")).where(col("origin_airport") == "San Francisco International Airport")).show()

+--------------------+-----------------+
|      origin_airport|sum_flights_count|
+--------------------+-----------------+
|San Francisco Int...|           148008|
+--------------------+-----------------+



### Group By Multiple Columns

For "San Francisco International Airport", compute the total flights to each destination airport.

In [141]:
(flights_summary_df.groupBy("origin_airport","dest_airport")
 .agg(sum("flights_count").alias("sum_flights_count"))
 .where(col("origin_airport") == "San Francisco International Airport")
 .orderBy("sum_flights_count", ascending=False)).show()

+--------------------+--------------------+-----------------+
|      origin_airport|        dest_airport|sum_flights_count|
+--------------------+--------------------+-----------------+
|San Francisco Int...|Los Angeles Inter...|            13744|
|San Francisco Int...|John F. Kennedy I...|             8440|
|San Francisco Int...|McCarran Internat...|             7995|
|San Francisco Int...|Chicago O'Hare In...|             7380|
|San Francisco Int...|Seattle-Tacoma In...|             6932|
|San Francisco Int...|San Diego Interna...|             6917|
|San Francisco Int...|Denver Internatio...|             5066|
|San Francisco Int...|Newark Liberty In...|             5025|
|San Francisco Int...|Phoenix Sky Harbo...|             4856|
|San Francisco Int...|Dallas/Fort Worth...|             4555|
|San Francisco Int...|John Wayne Airpor...|             4314|
|San Francisco Int...|Portland Internat...|             4197|
|San Francisco Int...|Gen. Edward Lawre...|             3907|
|San Fra

In [140]:
(flights_summary_df.groupBy("origin_airport","dest_airport")
 .agg(sum("flights_count").alias("sum_flights_count"))
 .orderBy("sum_flights_count", ascending=False)).show()

+--------------------+--------------------+-----------------+
|      origin_airport|        dest_airport|sum_flights_count|
+--------------------+--------------------+-----------------+
|San Francisco Int...|Los Angeles Inter...|            13744|
|Los Angeles Inter...|San Francisco Int...|            13457|
|John F. Kennedy I...|Los Angeles Inter...|            12016|
|Los Angeles Inter...|John F. Kennedy I...|            12015|
|McCarran Internat...|Los Angeles Inter...|             9715|
|LaGuardia Airport...|Chicago O'Hare In...|             9639|
|Los Angeles Inter...|McCarran Internat...|             9594|
|Chicago O'Hare In...|LaGuardia Airport...|             9575|
|San Francisco Int...|John F. Kennedy I...|             8440|
|John F. Kennedy I...|San Francisco Int...|             8437|
|     Kahului Airport|Honolulu Internat...|             8313|
|Honolulu Internat...|     Kahului Airport|             8282|
|Los Angeles Inter...|Chicago O'Hare In...|             8256|
|Hartsfi

In [143]:
(flights_summary_df.groupBy("origin_airport")
 .agg(sum("flights_count").alias("sum_flights_count"),
      max("flights_count").alias("max_flights_count"),
      min("flights_count").alias("min_flights_count"),
      count("flights_count").alias("count_flights_count"))
 .orderBy("sum_flights_count", ascending=False)).show()

+--------------------+-----------------+-----------------+-----------------+-------------------+
|      origin_airport|sum_flights_count|max_flights_count|min_flights_count|count_flights_count|
+--------------------+-----------------+-----------------+-----------------+-------------------+
|Hartsfield-Jackso...|           346836|             8234|                1|                169|
|Chicago O'Hare In...|           285884|             9575|                2|                162|
|Dallas/Fort Worth...|           239551|             7870|               63|                148|
|Denver Internatio...|           196055|             7211|                2|                139|
|Los Angeles Inter...|           194673|            13457|                1|                 80|
|San Francisco Int...|           148008|            13744|                1|                 80|
|Phoenix Sky Harbo...|           146815|             7380|                1|                 79|
|George Bush Inter...|        