In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

In [2]:
spark = SparkSession.builder.appName("PysparkDataFrame").getOrCreate()

In [3]:
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

In [4]:
schema = StructType([StructField("firstname",StringType(), True),
                    StructField("middlename",StringType(), True),
                     StructField("lastname",StringType(), True),
                     StructField("id",StringType(), True),
                     StructField("sex",StringType(), True),
                    StructField("salary",IntegerType(), True),

                    ])

In [5]:
dataframe = spark.createDataFrame(data=data, schema=schema)

In [6]:
dataframe.show(truncate=False)

+---------+----------+--------+-----+---+------+
|firstname|middlename|lastname|id   |sex|salary|
+---------+----------+--------+-----+---+------+
|James    |          |Smith   |36636|M  |3000  |
|Michael  |Rose      |        |40288|M  |4000  |
|Robert   |          |Williams|42114|M  |4000  |
|Maria    |Anne      |Jones   |39192|F  |4000  |
|Jen      |Mary      |Brown   |     |F  |-1    |
+---------+----------+--------+-----+---+------+



In [7]:
dataframe.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- salary: integer (nullable = true)



In [8]:
!dir testdata

 Volume in drive D is UserProfile
 Volume Serial Number is 882D-C2C4

 Directory of D:\Spark-Projects\spark-test\sparkdf\testdata

09/11/2021  04:39 PM    <DIR>          .
09/11/2021  04:39 PM    <DIR>          ..
11/20/2020  10:36 PM       245,147,566 fire-incidents.csv
               1 File(s)    245,147,566 bytes
               2 Dir(s)  80,340,070,400 bytes free


# Fireincidents practice to load DataFrame

In [9]:
firedata = "./testdata/fire-incidents.csv"

In [10]:
spark = SparkSession.builder.appName("PysparkDataFrameFireincidents").getOrCreate()

In [11]:
fire_df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(firedata)

In [12]:
fire_df.printSchema()

root
 |-- IncidentNumber: integer (nullable = true)
 |-- ExposureNumber: integer (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- IncidentDate: timestamp (nullable = true)
 |-- CallNumber: integer (nullable = true)
 |-- AlarmDtTm: timestamp (nullable = true)
 |-- ArrivalDtTm: timestamp (nullable = true)
 |-- CloseDtTm: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- ZIPCode: string (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- SuppressionUnits: integer (nullable = true)
 |-- SuppressionPersonnel: integer (nullable = true)
 |-- EMSUnits: integer (nullable = true)
 |-- EMSPersonnel: integer (nullable = true)
 |-- OtherUnits: integer (nullable = true)
 |-- OtherPersonnel: integer (nullable = true)
 |-- FirstUnitOnScene: string (nullable = true)
 |-- EstimatedPropertyLoss: integer (nullable = true)
 |-- EstimatedContentsLoss: d

In [13]:
fire_df.columns

['IncidentNumber',
 'ExposureNumber',
 'ID',
 'Address',
 'IncidentDate',
 'CallNumber',
 'AlarmDtTm',
 'ArrivalDtTm',
 'CloseDtTm',
 'City',
 'ZIPCode',
 'Battalion',
 'StationArea',
 'Box',
 'SuppressionUnits',
 'SuppressionPersonnel',
 'EMSUnits',
 'EMSPersonnel',
 'OtherUnits',
 'OtherPersonnel',
 'FirstUnitOnScene',
 'EstimatedPropertyLoss',
 'EstimatedContentsLoss',
 'FireFatalities',
 'FireInjuries',
 'CivilianFatalities',
 'CivilianInjuries',
 'NumberofAlarms',
 'PrimarySituation',
 'MutualAid',
 'ActionTakenPrimary',
 'ActionTakenSecondary',
 'ActionTakenOther',
 'DetectorAlertedOccupants',
 'PropertyUse',
 'AreaofFireOrigin',
 'IgnitionCause',
 'IgnitionFactorPrimary',
 'IgnitionFactorSecondary',
 'HeatSource',
 'ItemFirstIgnited',
 'HumanFactorsAssociatedwithIgnition',
 'StructureType',
 'StructureStatus',
 'FloorofFireOrigin',
 'FireSpread',
 'NoFlameSpead',
 'Numberoffloorswithminimumdamage',
 'Numberoffloorswithsignificantdamage',
 'Numberoffloorswithheavydamage',
 'Numbe

In [14]:
fire_df.select("IncidentNumber","IncidentDate", "City").show(10)

+--------------+-------------------+-------------+
|IncidentNumber|       IncidentDate|         City|
+--------------+-------------------+-------------+
|      20104668|2020-09-11 00:00:00|San Francisco|
|      20104708|2020-09-11 00:00:00|San Francisco|
|      20104648|2020-09-10 00:00:00|San Francisco|
|      20104598|2020-09-10 00:00:00|San Francisco|
|      20104575|2020-09-10 00:00:00|San Francisco|
|      20104477|2020-09-10 00:00:00|San Francisco|
|      20104443|2020-09-10 00:00:00|San Francisco|
|      20104605|2020-09-10 00:00:00|San Francisco|
|      20104474|2020-09-10 00:00:00|San Francisco|
|      20104652|2020-09-10 00:00:00|San Francisco|
+--------------+-------------------+-------------+
only showing top 10 rows



In [15]:
output_path = "./data/output/fireincidents"

In [16]:
fire_df.write.format("parquet").mode("overwrite").save(output_path)

# Working with Structured operations

## Reading Json

In [17]:
from pyspark.sql.types import DateType, FloatType, ArrayType, BooleanType, StringType, StructType, StructField

In [18]:
schema_person = StructType([StructField("id", IntegerType(), True),
                           StructField("first_name", StringType(), True),
                           StructField("last_name", StringType(), True),
                           StructField("fav_movies", ArrayType(StringType()), True),
                           StructField("salary", FloatType(), True),
                           StructField("image_url", StringType(), True),
                           StructField("date_of_birth", DateType(), True),
                            StructField("active", BooleanType(), True)
                           ])

In [19]:
person_data = './data/persons/persons.json'

In [20]:
person_df = spark.read.json(person_data, schema_person, multiLine='True' )

In [21]:
person_df.show(10, False)

+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name|fav_movies                                                   |salary |image_url                                      |date_of_birth|active|
+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy    |[I giorni contati]                                           |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|2  |Emelyne   |Blaza    |[Musketeer, The, Topralli]                                   |3006.04|http://dummyimage.com/158x106.bmp/cc0000/ffffff|1991-11-02   |false |
|3  |Max       |Rettie   |[The Forgotten Space, Make It Happen]                        |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|4  

## Columns and Expressions

In [22]:
from pyspark.sql.functions import col, expr

In [23]:
person_df.select(col("first_name"), col("last_name"), col("date_of_birth")).show(10, False)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|Drucy     |Poppy    |1991-02-16   |
|Emelyne   |Blaza    |1991-11-02   |
|Max       |Rettie   |1990-03-03   |
|Ilario    |Kean     |1987-06-09   |
|Toddy     |Drexel   |1992-10-28   |
|Oswald    |Petrolli |1986-09-02   |
|Adrian    |Clarey   |1971-08-24   |
|Dominica  |Goodnow  |1973-08-27   |
|Emory     |Slocomb  |1974-06-08   |
|Jeremias  |Bode     |1997-08-02   |
+----------+---------+-------------+
only showing top 10 rows



In [24]:
person_df.select(expr("first_name"), expr("last_name"), expr("date_of_birth")).show(10, False)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|Drucy     |Poppy    |1991-02-16   |
|Emelyne   |Blaza    |1991-11-02   |
|Max       |Rettie   |1990-03-03   |
|Ilario    |Kean     |1987-06-09   |
|Toddy     |Drexel   |1992-10-28   |
|Oswald    |Petrolli |1986-09-02   |
|Adrian    |Clarey   |1971-08-24   |
|Dominica  |Goodnow  |1973-08-27   |
|Emory     |Slocomb  |1974-06-08   |
|Jeremias  |Bode     |1997-08-02   |
+----------+---------+-------------+
only showing top 10 rows



In [25]:
from pyspark.sql.functions import concat_ws

In [26]:
person_df.select(concat_ws(" ", col("first_name"),  col("last_name")).alias("full_name"), col("salary"), (col("salary") * .10 + col("salary")).alias("Salary_increment")).show(20, False)

+------------------+-------+------------------+
|full_name         |salary |Salary_increment  |
+------------------+-------+------------------+
|Drucy Poppy       |1463.36|1609.6959838867188|
|Emelyne Blaza     |3006.04|3306.64404296875  |
|Max Rettie        |1422.88|1565.1680053710938|
|Ilario Kean       |3561.36|3917.4961181640624|
|Toddy Drexel      |4934.87|5428.35712890625  |
|Oswald Petrolli   |1153.23|1268.552978515625 |
|Adrian Clarey     |1044.73|1149.202978515625 |
|Dominica Goodnow  |1147.76|1262.5360107421875|
|Emory Slocomb     |1082.11|1190.3209838867188|
|Jeremias Bode     |3472.63|3819.89287109375  |
|Timothy Ervine    |1147.61|1262.3709838867187|
|Leanora Gooder    |1327.02|1459.722021484375 |
|Claiborn Denham   |2623.33|2885.6630859375   |
|Ambrosi Vidineev  |4550.88|5005.96787109375  |
|Feodor Nancekivell|2218.46|2440.30595703125  |
|Margaux Archbold  |1013.75|1115.125          |
|Balduin Elstone   |2302.26|2532.4860107421873|
|Alfie Hatliffe    |3893.1 |4282.4101074

In [27]:
person_df.select(concat_ws(" ", col("first_name"),  col("last_name")).alias("full_name"), col("salary"), (expr("salary * .10 + salary").alias("Salary_increment"))).show(20, False)

+------------------+-------+------------------+
|full_name         |salary |Salary_increment  |
+------------------+-------+------------------+
|Drucy Poppy       |1463.36|1609.6959838867188|
|Emelyne Blaza     |3006.04|3306.64404296875  |
|Max Rettie        |1422.88|1565.1680053710938|
|Ilario Kean       |3561.36|3917.4961181640624|
|Toddy Drexel      |4934.87|5428.35712890625  |
|Oswald Petrolli   |1153.23|1268.552978515625 |
|Adrian Clarey     |1044.73|1149.202978515625 |
|Dominica Goodnow  |1147.76|1262.5360107421875|
|Emory Slocomb     |1082.11|1190.3209838867188|
|Jeremias Bode     |3472.63|3819.89287109375  |
|Timothy Ervine    |1147.61|1262.3709838867187|
|Leanora Gooder    |1327.02|1459.722021484375 |
|Claiborn Denham   |2623.33|2885.6630859375   |
|Ambrosi Vidineev  |4550.88|5005.96787109375  |
|Feodor Nancekivell|2218.46|2440.30595703125  |
|Margaux Archbold  |1013.75|1115.125          |
|Balduin Elstone   |2302.26|2532.4860107421873|
|Alfie Hatliffe    |3893.1 |4282.4101074

In [28]:
person_df.filter(col("salary") < 3000).show(10, False)

+---+----------+-----------+----------------------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name  |fav_movies                                                                  |salary |image_url                                      |date_of_birth|active|
+---+----------+-----------+----------------------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy      |[I giorni contati]                                                          |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|3  |Max       |Rettie     |[The Forgotten Space, Make It Happen]                                       |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|6  |Oswald    |Petrolli   |[Wing and the Thigh, The (L'aile ou la cuisse)]          

In [29]:
person_df.where(col("salary") < 3000).show(10, False)

+---+----------+-----------+----------------------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name  |fav_movies                                                                  |salary |image_url                                      |date_of_birth|active|
+---+----------+-----------+----------------------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy      |[I giorni contati]                                                          |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|3  |Max       |Rettie     |[The Forgotten Space, Make It Happen]                                       |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|6  |Oswald    |Petrolli   |[Wing and the Thigh, The (L'aile ou la cuisse)]          

In [30]:
person_df.filter((col("salary") < 3000) & (col("active") == True)).show(10, False)

+---+----------+---------+-----------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name|fav_movies                                                       |salary |image_url                                      |date_of_birth|active|
+---+----------+---------+-----------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy    |[I giorni contati]                                               |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|9  |Emory     |Slocomb  |[Snake and Crane Arts of Shaolin (She hao ba bu), Mala Noche]    |1082.11|http://dummyimage.com/138x226.jpg/cc0000/ffffff|1974-06-08   |true  |
|16 |Margaux   |Archbold |[And Now a Word from Our Sponsor]                                |1013.75|http://dummyimage.com/229x133.png/5fa2dd/ffffff|19

In [31]:
from pyspark.sql.functions import array_contains

In [32]:
person_df.where(array_contains(col("fav_movies"), "Land of the Lost") ).show()

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| 11|   Timothy|   Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



## Distinct Drop Duplicates and OrderBy

In [33]:
from pyspark.sql.functions import count, desc

In [34]:
person_df.select("active").distinct().show()

+------+
|active|
+------+
|  true|
| false|
+------+



In [35]:
from pyspark.sql.functions import year

In [36]:
person_df.select(col("first_name"), year("date_of_birth"), col("active")).show()

+----------+-------------------+------+
|first_name|year(date_of_birth)|active|
+----------+-------------------+------+
|     Drucy|               1991|  true|
|   Emelyne|               1991| false|
|       Max|               1990| false|
|    Ilario|               1987|  true|
|     Toddy|               1992|  true|
|    Oswald|               1986| false|
|    Adrian|               1971| false|
|  Dominica|               1973| false|
|     Emory|               1974|  true|
|  Jeremias|               1997|  true|
|   Timothy|               1971| false|
|   Leanora|               1981| false|
|  Claiborn|               1996| false|
|   Ambrosi|               1989|  true|
|    Feodor|               2000| false|
|   Margaux|               1988|  true|
|   Balduin|               1974| false|
|     Alfie|               1989|  true|
|      Lura|               1998| false|
|      Maxi|               1979| false|
+----------+-------------------+------+
only showing top 20 rows



In [37]:
(person_df.select(col("first_name"), year(col("date_of_birth")).alias("year"), col("active")).orderBy("year", "first_name")).show()

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|       Sky|1971| false|
|   Timothy|1971| false|
|    Lucita|1972|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|     Toddy|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
|  Wolfgang|1973|  true|
|   Balduin|1974| false|
|     Emory|1974|  true|
|    Norean|1974|  true|
|    Janean|1975|  true|
|       Bev|1976|  true|
| Franciska|1976| false|
|    Bennie|1977| false|
|     Johny|1977| false|
|    Daveta|1978| false|
+----------+----+------+
only showing top 20 rows



In [38]:
dropped_df = (person_df.select(col("first_name"), year("date_of_birth").alias("year"), col("active")).dropDuplicates(["year", "active"]).orderBy("year", "first_name"))

In [39]:
dropped_df.show()

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
|   Balduin|1974| false|
|     Emory|1974|  true|
|    Janean|1975|  true|
|       Bev|1976|  true|
| Franciska|1976| false|
|     Johny|1977| false|
|    Daveta|1978| false|
|   Guthrie|1978|  true|
|      Maxi|1979| false|
|   Melinda|1979|  true|
|    Carter|1980| false|
|   Loralyn|1980|  true|
|     Clive|1981|  true|
|   Leanora|1981| false|
+----------+----+------+
only showing top 20 rows



In [40]:
(person_df.select(col("first_name"), year(col("date_of_birth")).alias("year"), col("active")).orderBy("year", ascending=False)).show()

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|   Lorilee|2002| false|
|    Virgie|2002|  true|
|    Carlen|2002|  true|
|     Daron|2002|  true|
|    Maxine|2001| false|
|    Feodor|2000| false|
|  Annabell|2000|  true|
|     Kelcy|2000|  true|
|     Jobie|2000| false|
|      Redd|2000| false|
|  Theodore|1999| false|
| Kendricks|1999|  true|
|    Cecily|1999|  true|
|      Jere|1999| false|
|  Elianora|1999| false|
|     Deina|1999|  true|
|      Jere|1998| false|
|    Wilden|1998| false|
|      Rudy|1998|  true|
|    Eugine|1998| false|
+----------+----+------+
only showing top 20 rows



## Rows and Union

In [41]:
from pyspark.sql import Row

In [42]:
person_row = Row(101, "Robert", "Ownes", ["Men in Black III", "Home Alone"], 43001.64, "http://someimage.com", "1964-08-18", True)

In [43]:
persons_rows_list = [Row(102, "Kenny", "Bobien", ["Men in Black III", "Home Alone"], 43001.64, "http://someimage.com", "1964-08-18", True), 
                    Row(103, "Sara", "Devine", ["Men in Black III", "Home Alone"], 43001.64, "http://someimage.com", "1964-08-18", True)]

In [44]:
persons_rows_list.append(person_row)

In [45]:
print(persons_rows_list)

[<Row(102, 'Kenny', 'Bobien', ['Men in Black III', 'Home Alone'], 43001.64, 'http://someimage.com', '1964-08-18', True)>, <Row(103, 'Sara', 'Devine', ['Men in Black III', 'Home Alone'], 43001.64, 'http://someimage.com', '1964-08-18', True)>, <Row(101, 'Robert', 'Ownes', ['Men in Black III', 'Home Alone'], 43001.64, 'http://someimage.com', '1964-08-18', True)>]


In [46]:
person_row[1]

'Robert'

In [47]:
new_persons_df = spark.createDataFrame(persons_rows_list, ["id","first_name","last_name", "fav_movies", "salary", "image_url", "date_of_birth", "active"
                                                          ])

In [48]:
new_persons_df.show()

+---+----------+---------+--------------------+--------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies|  salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+--------+--------------------+-------------+------+
|102|     Kenny|   Bobien|[Men in Black III...|43001.64|http://someimage.com|   1964-08-18|  true|
|103|      Sara|   Devine|[Men in Black III...|43001.64|http://someimage.com|   1964-08-18|  true|
|101|    Robert|    Ownes|[Men in Black III...|43001.64|http://someimage.com|   1964-08-18|  true|
+---+----------+---------+--------------------+--------+--------------------+-------------+------+



In [49]:
add_persons_df = person_df.union(new_persons_df)

In [50]:
add_persons_df.show()


+---+----------+-----------+--------------------+------------------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies|            salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+------------------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.3599853515625|http://dummyimage...|   1991-02-16|  true|
|  2|   Emelyne|      Blaza|[Musketeer, The, ...|   3006.0400390625|http://dummyimage...|   1991-11-02| false|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.8800048828125|http://dummyimage...|   1990-03-03| false|
|  4|    Ilario|       Kean|[Up Close and Per...| 3561.360107421875|http://dummyimage...|   1987-06-09|  true|
|  5|     Toddy|     Drexel|[Walk in the Clou...|   4934.8701171875|http://dummyimage...|   1992-10-28|  true|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|  1153.22998046875|http://dummyimage...|   1986-09-02| false|
|

In [51]:
add_persons_df.sort(desc("id")).show()

+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies|            salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+------------------+--------------------+-------------+------+
|103|      Sara|   Devine|[Men in Black III...|          43001.64|http://someimage.com|   1964-08-18|  true|
|102|     Kenny|   Bobien|[Men in Black III...|          43001.64|http://someimage.com|   1964-08-18|  true|
|101|    Robert|    Ownes|[Men in Black III...|          43001.64|http://someimage.com|   1964-08-18|  true|
|100|    Virgie| Domanski|[Horseman, The, S...| 2165.929931640625|http://dummyimage...|   2002-01-05|  true|
| 99|   Rozalie|   Wannop|[Suddenly, The No...|1259.6400146484375|http://dummyimage...|   1997-03-25| false|
| 98|     Davin|     Labb|[Viva Riva!, Kill...| 1452.739990234375|http://dummyimage...|   1988-01-27|  true|
| 97|      Rodi|   

## Adding renaming and droppiong Column

In [52]:
from pyspark.sql.functions import round

In [53]:
aug_persons_df1 = add_persons_df.withColumn("salary_increase", expr("salary * 0.1 + salary"))

In [54]:
aug_persons_df1.columns

['id',
 'first_name',
 'last_name',
 'fav_movies',
 'salary',
 'image_url',
 'date_of_birth',
 'active',
 'salary_increase']

In [55]:
aug_persons_df2 = (aug_persons_df1
                   .withColumn("birth_year", year("date_of_birth"))
                   .withColumnRenamed("fav_movies", "movies")
                   .withColumn("salary_x10", round(col("salary_increase"), 2))
                   .drop("salary_increase"))

In [56]:
aug_persons_df2.show()

+---+----------+-----------+--------------------+------------------+--------------------+-------------+------+----------+----------+
| id|first_name|  last_name|              movies|            salary|           image_url|date_of_birth|active|birth_year|salary_x10|
+---+----------+-----------+--------------------+------------------+--------------------+-------------+------+----------+----------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.3599853515625|http://dummyimage...|   1991-02-16|  true|      1991|    1609.7|
|  2|   Emelyne|      Blaza|[Musketeer, The, ...|   3006.0400390625|http://dummyimage...|   1991-11-02| false|      1991|   3306.64|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.8800048828125|http://dummyimage...|   1990-03-03| false|      1990|   1565.17|
|  4|    Ilario|       Kean|[Up Close and Per...| 3561.360107421875|http://dummyimage...|   1987-06-09|  true|      1987|    3917.5|
|  5|     Toddy|     Drexel|[Walk in the Clou...|   4934.8701171875|h

## Working with missing or bad data

In [57]:
bad_movies_list = [Row(None, None, None),
                   Row(None, None, 2020),
                   Row("John Doe", "Awesome Movie", None),
                   Row(None, "Awesome Movie", 2021),
                   Row("Mary Jane", None, 2019),
                   Row("Vikter Duplaix", "Not another teen movie", 2001)]

In [58]:
bad_movies_columns = ["actor_name", "movie_title", "produced_year"]

In [59]:
bad_movies_df = spark.createDataFrame(bad_movies_list, schema=bad_movies_columns)

In [60]:
bad_movies_df.show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|          null|                null|         null|
|          null|                null|         2020|
|      John Doe|       Awesome Movie|         null|
|          null|       Awesome Movie|         2021|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [61]:
bad_movies_df.na.drop().show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [62]:
bad_movies_df.na.drop("any").show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [63]:
bad_movies_df.na.drop("all").show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|          null|                null|         2020|
|      John Doe|       Awesome Movie|         null|
|          null|       Awesome Movie|         2021|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [64]:
bad_movies_df.filter(col("actor_name").isNotNull() == True).show()

+--------------+--------------------+-------------+
|    actor_name|         movie_title|produced_year|
+--------------+--------------------+-------------+
|      John Doe|       Awesome Movie|         null|
|     Mary Jane|                null|         2019|
|Vikter Duplaix|Not another teen ...|         2001|
+--------------+--------------------+-------------+



In [65]:
bad_movies_df.describe("actor_name").show()

+-------+--------------+
|summary|    actor_name|
+-------+--------------+
|  count|             3|
|   mean|          null|
| stddev|          null|
|    min|      John Doe|
|    max|Vikter Duplaix|
+-------+--------------+



## User Defined Functions 

In [66]:
from pyspark.sql.functions import udf

In [67]:
students_list = [("joe",85), ("jane",90), ("mary",55)]

In [68]:
students_columns = ["name", "score"]

In [69]:
students_df = spark.createDataFrame(students_list,students_columns )

In [70]:
students_df.show()

+----+-----+
|name|score|
+----+-----+
| joe|   85|
|jane|   90|
|mary|   55|
+----+-----+



In [71]:
def letterGrade(score:int):
    grade = ''
    if score > 100:
        grade = 'Cheating'
    elif score >= 90:
        grade = 'A'
    elif score >= 80:
        grade = 'B'
    elif score >= 70:
        grade = 'C'        
    elif score < 70:
        grade = 'D'
    return grade


In [72]:
letterGrade_udf = udf(letterGrade)

In [73]:
students_df.select("name", "score", letterGrade_udf(col("score")).alias("grade")).show()

+----+-----+-----+
|name|score|grade|
+----+-----+-----+
| joe|   85|    B|
|jane|   90|    A|
|mary|   55|    D|
+----+-----+-----+



# Aggregations 

In [74]:
flight_data = './data/flights/flight-summary.csv'

flight_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load(flight_data)

In [75]:
flight_df.count()

4693

In [76]:
flight_df.show()

+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+-----+
|origin_code|      origin_airport|    origin_city|origin_state|dest_code|        dest_airport|       dest_city|dest_state|count|
+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+-----+
|        BQN|Rafael Hernández ...|      Aguadilla|          PR|      MCO|Orlando Internati...|         Orlando|        FL|  441|
|        PHL|Philadelphia Inte...|   Philadelphia|          PA|      MCO|Orlando Internati...|         Orlando|        FL| 4869|
|        MCI|Kansas City Inter...|    Kansas City|          MO|      IAH|George Bush Inter...|         Houston|        TX| 1698|
|        SPI|Abraham Lincoln C...|    Springfield|          IL|      ORD|Chicago O'Hare In...|         Chicago|        IL|  998|
|        SNA|John Wayne Airpor...|      Santa Ana|          CA|      PHX|Phoenix Sky Harbo...|   

In [77]:
flight_df.printSchema()

root
 |-- origin_code: string (nullable = true)
 |-- origin_airport: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- origin_state: string (nullable = true)
 |-- dest_code: string (nullable = true)
 |-- dest_airport: string (nullable = true)
 |-- dest_city: string (nullable = true)
 |-- dest_state: string (nullable = true)
 |-- count: integer (nullable = true)



In [78]:
flight_df = flight_df.withColumnRenamed("count", "flight_count")

In [79]:
flight_df.show()

+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+------------+
|origin_code|      origin_airport|    origin_city|origin_state|dest_code|        dest_airport|       dest_city|dest_state|flight_count|
+-----------+--------------------+---------------+------------+---------+--------------------+----------------+----------+------------+
|        BQN|Rafael Hernández ...|      Aguadilla|          PR|      MCO|Orlando Internati...|         Orlando|        FL|         441|
|        PHL|Philadelphia Inte...|   Philadelphia|          PA|      MCO|Orlando Internati...|         Orlando|        FL|        4869|
|        MCI|Kansas City Inter...|    Kansas City|          MO|      IAH|George Bush Inter...|         Houston|        TX|        1698|
|        SPI|Abraham Lincoln C...|    Springfield|          IL|      ORD|Chicago O'Hare In...|         Chicago|        IL|         998|
|        SNA|John Wayne Airpor...|      Santa An

## cCount(col) and countDistinct

In [80]:
from pyspark.sql.functions import count, countDistinct

In [81]:
# Remember aggregations do not include null values, in the bad movies exaples there are null values in actor name and other columns.

In [82]:
flight_df.select(count("origin_airport"), count("dest_airport")).show()

+---------------------+-------------------+
|count(origin_airport)|count(dest_airport)|
+---------------------+-------------------+
|                 4693|               4693|
+---------------------+-------------------+



In [83]:
bad_movies_df.select(count("actor_name"), count("movie_title"), count("produced_year"), count("*")).show()

+-----------------+------------------+--------------------+--------+
|count(actor_name)|count(movie_title)|count(produced_year)|count(1)|
+-----------------+------------------+--------------------+--------+
|                3|                 3|                   4|       6|
+-----------------+------------------+--------------------+--------+



In [84]:
flight_df.select(countDistinct("origin_airport"), countDistinct("dest_airport"), count("*")).show()

+------------------------------+----------------------------+--------+
|count(DISTINCT origin_airport)|count(DISTINCT dest_airport)|count(1)|
+------------------------------+----------------------------+--------+
|                           322|                         322|    4693|
+------------------------------+----------------------------+--------+



### min(col), max(col), sum(col), sum distinct(col) and avg

In [85]:
from pyspark.sql.functions import min, max, sum, sumDistinct, avg

In [86]:
flight_df.select(min("flight_count"), max("flight_count")).show()

+-----------------+-----------------+
|min(flight_count)|max(flight_count)|
+-----------------+-----------------+
|                1|            13744|
+-----------------+-----------------+



In [87]:
flight_df.select(sum("flight_count")).show()

+-----------------+
|sum(flight_count)|
+-----------------+
|          5332914|
+-----------------+



In [88]:
# Lets see how is sum different from sumDistinct

In [89]:
students_df_1 = students_df.union(students_df)
students_df_1.show()

+----+-----+
|name|score|
+----+-----+
| joe|   85|
|jane|   90|
|mary|   55|
| joe|   85|
|jane|   90|
|mary|   55|
+----+-----+



In [96]:
students_df.select(sum("score")).show()


+----------+
|sum(score)|
+----------+
|       230|
+----------+



In [99]:
students_df_1.select(sum("score")).show()

+----------+
|sum(score)|
+----------+
|       460|
+----------+



In [100]:
students_df_1.select(sumDistinct("score")).show()

+-------------------+
|sum(DISTINCT score)|
+-------------------+
|                230|
+-------------------+



In [101]:
# Lets do for Flight data

In [102]:
flight_df.select(sum("flight_count")).show()

+-----------------+
|sum(flight_count)|
+-----------------+
|          5332914|
+-----------------+



In [103]:
flight_df.select(sumDistinct("flight_count")).show()

+--------------------------+
|sum(DISTINCT flight_count)|
+--------------------------+
|                   3612257|
+--------------------------+



In [104]:
flight_df.select(avg("flight_count"), sum("flight_count")/count("flight_count")).show()

+------------------+-----------------------------------------+
| avg(flight_count)|(sum(flight_count) / count(flight_count))|
+------------------+-----------------------------------------+
|1136.3549968037503|                       1136.3549968037503|
+------------------+-----------------------------------------+



## Aggregation with grouping

In [106]:
flight_df.groupBy("origin_airport").count().orderBy("count", ascending=False).show(5,False)

+------------------------------------------------+-----+
|origin_airport                                  |count|
+------------------------------------------------+-----+
|Hartsfield-Jackson Atlanta International Airport|169  |
|Chicago O'Hare International Airport            |162  |
|Dallas/Fort Worth International Airport         |148  |
|Denver International Airport                    |139  |
|Minneapolis-Saint Paul International Airport    |120  |
+------------------------------------------------+-----+
only showing top 5 rows



In [110]:
flight_df.groupBy("origin_airport").agg(max("flight_count")).orderBy(max("flight_count"), ascending=False).show(5,False)

+----------------------------------------------------------------------+-----------------+
|origin_airport                                                        |max(flight_count)|
+----------------------------------------------------------------------+-----------------+
|San Francisco International Airport                                   |13744            |
|Los Angeles International Airport                                     |13457            |
|John F. Kennedy International Airport (New York International Airport)|12016            |
|McCarran International Airport                                        |9715             |
|LaGuardia Airport (Marine Air Terminal)                               |9639             |
+----------------------------------------------------------------------+-----------------+
only showing top 5 rows



In [113]:
# re-writing above with alias

flight_df.groupBy("origin_airport").agg(max("flight_count").alias("max_flight_count")).orderBy("max_flight_count", ascending=False).show(5,False)

+----------------------------------------------------------------------+----------------+
|origin_airport                                                        |max_flight_count|
+----------------------------------------------------------------------+----------------+
|San Francisco International Airport                                   |13744           |
|Los Angeles International Airport                                     |13457           |
|John F. Kennedy International Airport (New York International Airport)|12016           |
|McCarran International Airport                                        |9715            |
|LaGuardia Airport (Marine Air Terminal)                               |9639            |
+----------------------------------------------------------------------+----------------+
only showing top 5 rows



In [118]:
flight_df.groupBy("origin_state", "origin_city").count().where(col("origin_state") =="CA").orderBy("count", ascending=False).show(5)

+------------+-------------+-----+
|origin_state|  origin_city|count|
+------------+-------------+-----+
|          CA|San Francisco|   80|
|          CA|  Los Angeles|   80|
|          CA|    San Diego|   47|
|          CA|      Oakland|   35|
|          CA|   Sacramento|   27|
+------------+-------------+-----+
only showing top 5 rows



In [119]:
flight_df.groupBy("origin_airport").agg(max("flight_count"), min("flight_count"), sum("flight_count"), count("flight_count")).show(5)

+--------------------+-----------------+-----------------+-----------------+-------------------+
|      origin_airport|max(flight_count)|min(flight_count)|sum(flight_count)|count(flight_count)|
+--------------------+-----------------+-----------------+-----------------+-------------------+
|Melbourne Interna...|             1332|             1332|             1332|                  1|
|San Diego Interna...|             6942|                4|            70207|                 46|
|     Eppley Airfield|             2083|                1|            16753|                 21|
|     Kahului Airport|             8313|               67|            20627|                 18|
|Austin-Bergstrom ...|             4674|                8|            42067|                 41|
+--------------------+-----------------+-----------------+-----------------+-------------------+
only showing top 5 rows

