In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
# Obtain a Spark session for this notebook: getOrCreate() returns the session
# that is already active, or starts a new one named "Practice_DataFrames".
spark = (
    SparkSession.builder
    .appName("Practice_DataFrames")
    .getOrCreate()
)

## Working with DataFrames

In [3]:
# In-memory sample rows, one tuple per person with six fields each.
# Missing text fields are written as empty strings, and the last row carries
# -1 in its final (numeric) field.
data = [
    ("James", "", "Smith", "36636", "M", 3000),
    ("Michael", "Rose", "", "40288", "M", 4000),
    ("Robert", "", "Williams", "42114", "M", 4000),
    ("Maria", "Anne", "Jones", "39192", "F", 4000),
    ("Jen", "Mary", "Brown", "", "F", -1),
]

In [4]:
# Explicit schema for the sample rows: five nullable string columns followed
# by a nullable integer salary. Note that "id" is declared as a string.
schema = (
    StructType()
    .add("firstname", StringType(), True)
    .add("middlename", StringType(), True)
    .add("lastname", StringType(), True)
    .add("id", StringType(), True)
    .add("gender", StringType(), True)
    .add("salary", IntegerType(), True)
)

In [5]:
# Turn the in-memory rows into a DataFrame using the explicit schema, then
# print the resulting column names and types.
df = spark.createDataFrame(data, schema)
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [6]:
# Print up to 20 rows (the show() default) without shortening long cell values.
df.show(n=20, truncate=False)

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|id   |gender|salary|
+---------+----------+--------+-----+------+------+
|James    |          |Smith   |36636|M     |3000  |
|Michael  |Rose      |        |40288|M     |4000  |
|Robert   |          |Williams|42114|M     |4000  |
|Maria    |Anne      |Jones   |39192|F     |4000  |
|Jen      |Mary      |Brown   |     |F     |-1    |
+---------+----------+--------+-----+------+------+



## Reading From CSV File

In [7]:
# Path of the sales CSV, relative to the notebook's working directory.
file_path = "./data/sales_records.csv"

# Read the CSV, taking column names from the header line and letting Spark
# infer the column types.
# NOTE: this rebinds `df`; the DataFrame built from the in-memory rows earlier
# in the notebook is no longer reachable under this name.
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [8]:
# Print the column names and the types Spark inferred while reading the CSV.
df.printSchema()

root
 |-- Region: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Item Type: string (nullable = true)
 |-- Sales Channel: string (nullable = true)
 |-- Order Priority: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Order ID: integer (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Units Sold: integer (nullable = true)
 |-- Unit Price: double (nullable = true)
 |-- Unit Cost: double (nullable = true)
 |-- Total Revenue: double (nullable = true)
 |-- Total Cost: double (nullable = true)
 |-- Total Profit: double (nullable = true)



In [9]:
# Column names of the sales DataFrame; as the last expression of the cell,
# the list is rendered as the cell's output.
df.columns

['Region',
 'Country',
 'Item Type',
 'Sales Channel',
 'Order Priority',
 'Order Date',
 'Order ID',
 'Ship Date',
 'Units Sold',
 'Unit Price',
 'Unit Cost',
 'Total Revenue',
 'Total Cost',
 'Total Profit']

In [10]:
# Project three columns and print the first 10 rows without truncating values.
df.select("Region", "Country", "Total Profit").show(n=10, truncate=False)

+---------------------------------+---------------------+------------+
|Region                           |Country              |Total Profit|
+---------------------------------+---------------------+------------+
|Middle East and North Africa     |Azerbaijan           |51500.76    |
|Central America and the Caribbean|Panama               |791282.37   |
|Sub-Saharan Africa               |Sao Tome and Principe|24066.26    |
|Sub-Saharan Africa               |Sao Tome and Principe|228497.08   |
|Central America and the Caribbean|Belize               |970846.34   |
|Europe                           |Denmark              |84382.56    |
|Europe                           |Germany              |1384700.68  |
|Middle East and North Africa     |Turkey               |15199.87    |
|Europe                           |United Kingdom       |453085.38   |
|Asia                             |Kazakhstan           |479533.46   |
+---------------------------------+---------------------+------------+
only showing top 10 rows

In [11]:
# Destination for the Parquet output, relative to the working directory.
output_path = "./data/output"

# Save the sales DataFrame as Parquet; "overwrite" mode replaces whatever
# already exists at output_path.
df.write.mode("overwrite").parquet(output_path)

## Reading JSON File

In [12]:
from pyspark.sql.types import ArrayType, FloatType, DateType, BooleanType

In [13]:
# Explicit schema for the persons JSON file; every field is nullable.
# "fav_movies" is an array of strings and "salary" is a single-precision
# float, which is why arithmetic on it later in the notebook prints long
# decimal tails.
persons_schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("fav_movies", ArrayType(StringType()), True)
    .add("salary", FloatType(), True)
    .add("image_url", StringType(), True)
    .add("date_of_birth", DateType(), True)
    .add("active", BooleanType(), True)
)

In [14]:
# Path of the persons JSON file, relative to the notebook's working directory.
json_file_path = "./data/persons.json"

# Read the JSON file with the explicit persons_schema instead of inferring one.
# multiLine is passed as a real boolean (the original passed the string "True")
# and the schema is passed by keyword so the call reads unambiguously.
df_json = spark.read.json(
    json_file_path,
    schema=persons_schema,
    multiLine=True,
)

In [15]:
# Print the first 10 persons without truncating long cell values.
df_json.show(n=10, truncate=False)

+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|id |first_name|last_name|fav_movies                                                   |salary |image_url                                      |date_of_birth|active|
+---+----------+---------+-------------------------------------------------------------+-------+-----------------------------------------------+-------------+------+
|1  |Drucy     |Poppy    |[I giorni contati]                                           |1463.36|http://dummyimage.com/126x166.png/cc0000/ffffff|1991-02-16   |true  |
|2  |Emelyne   |Blaza    |[Musketeer, The, Topralli]                                   |3006.04|http://dummyimage.com/158x106.bmp/cc0000/ffffff|1991-11-02   |false |
|3  |Max       |Rettie   |[The Forgotten Space, Make It Happen]                        |1422.88|http://dummyimage.com/237x140.jpg/ff4444/ffffff|1990-03-03   |false |
|4  

## Columns and Expressions

In [16]:
from pyspark.sql.functions import col,expr

In [17]:
# Select three columns by wrapping each name in col(), then print 5 rows.
df_json.select(
    *[col(name) for name in ("first_name", "last_name", "date_of_birth")]
).show(n=5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



In [18]:
# Same projection as with col(), but each column is given as a SQL expression
# string parsed by expr(); prints 5 rows.
df_json.select(
    *[expr(name) for name in ("first_name", "last_name", "date_of_birth")]
).show(n=5)

+----------+---------+-------------+
|first_name|last_name|date_of_birth|
+----------+---------+-------------+
|     Drucy|    Poppy|   1991-02-16|
|   Emelyne|    Blaza|   1991-11-02|
|       Max|   Rettie|   1990-03-03|
|    Ilario|     Kean|   1987-06-09|
|     Toddy|   Drexel|   1992-10-28|
+----------+---------+-------------+
only showing top 5 rows



In [19]:
from pyspark.sql.functions import concat_ws

In [20]:
# Build a "full_name" column from first and last name (space-separated) and a
# "salary_after_increment" column equal to salary plus 10% of salary, using
# Column arithmetic; print 10 rows.
(
    df_json
    .select(
        concat_ws(' ', col('first_name'), col('last_name')).alias('full_name'),
        col('salary'),
        (col('salary') * 0.10 + col('salary')).alias('salary_after_increment'),
    )
    .show(n=10)
)

+----------------+-------+----------------------+
|       full_name| salary|salary_after_increment|
+----------------+-------+----------------------+
|     Drucy Poppy|1463.36|    1609.6959838867188|
|   Emelyne Blaza|3006.04|      3306.64404296875|
|      Max Rettie|1422.88|    1565.1680053710938|
|     Ilario Kean|3561.36|    3917.4961181640624|
|    Toddy Drexel|4934.87|      5428.35712890625|
| Oswald Petrolli|1153.23|     1268.552978515625|
|   Adrian Clarey|1044.73|     1149.202978515625|
|Dominica Goodnow|1147.76|    1262.5360107421875|
|   Emory Slocomb|1082.11|    1190.3209838867188|
|   Jeremias Bode|3472.63|      3819.89287109375|
+----------------+-------+----------------------+
only showing top 10 rows



In [21]:
# Same projection as the Column-arithmetic version, but the incremented salary
# is written as a SQL expression string and parsed by expr(); print 10 rows.
(
    df_json
    .select(
        concat_ws(' ', col('first_name'), col('last_name')).alias('full_name'),
        col('salary'),
        expr('salary *0.10 + salary').alias('salary_after_increment'),
    )
    .show(n=10)
)

+----------------+-------+----------------------+
|       full_name| salary|salary_after_increment|
+----------------+-------+----------------------+
|     Drucy Poppy|1463.36|    1609.6959838867188|
|   Emelyne Blaza|3006.04|      3306.64404296875|
|      Max Rettie|1422.88|    1565.1680053710938|
|     Ilario Kean|3561.36|    3917.4961181640624|
|    Toddy Drexel|4934.87|      5428.35712890625|
| Oswald Petrolli|1153.23|     1268.552978515625|
|   Adrian Clarey|1044.73|     1149.202978515625|
|Dominica Goodnow|1147.76|    1262.5360107421875|
|   Emory Slocomb|1082.11|    1190.3209838867188|
|   Jeremias Bode|3472.63|      3819.89287109375|
+----------------+-------+----------------------+
only showing top 10 rows



## Filter and Where Conditions

In [22]:
# filter() with a SQL condition string: keep persons whose salary is at most
# 3000 and print the first 10 matches.
(
    df_json
    .filter('salary <= 3000')
    .show(n=10)
)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|
|  8|  Dominica|    Goodnow|    [Hearts Divided]|1147.76|http://dummyimage...|   1973-08-27| false|
|  9|     Emory|    Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 11|   Timothy|     Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|


In [23]:
# where() with the same SQL condition string as the filter() cell: keep
# persons whose salary is at most 3000 and print the first 10 matches.
(
    df_json
    .where('salary <= 3000')
    .show(n=10)
)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|      Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  3|       Max|     Rettie|[The Forgotten Sp...|1422.88|http://dummyimage...|   1990-03-03| false|
|  6|    Oswald|   Petrolli|[Wing and the Thi...|1153.23|http://dummyimage...|   1986-09-02| false|
|  7|    Adrian|     Clarey|[Walking Tall, Pa...|1044.73|http://dummyimage...|   1971-08-24| false|
|  8|  Dominica|    Goodnow|    [Hearts Divided]|1147.76|http://dummyimage...|   1973-08-27| false|
|  9|     Emory|    Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 11|   Timothy|     Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|


In [24]:
# Combine two Column predicates with `&`: salary at most 3000 AND active.
# "active" is already a boolean column, so it is used directly as a predicate
# rather than being compared with `== True`; the selected rows are the same.
# Each comparison is parenthesised because `&` binds tighter than `<=`.
df_json.where((col('salary') <= 3000) & col('active')).show(10)

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
|  1|     Drucy|    Poppy|  [I giorni contati]|1463.36|http://dummyimage...|   1991-02-16|  true|
|  9|     Emory|  Slocomb|[Snake and Crane ...|1082.11|http://dummyimage...|   1974-06-08|  true|
| 16|   Margaux| Archbold|[And Now a Word f...|1013.75|http://dummyimage...|   1988-07-29|  true|
| 26|     Clive|      Lax|             [Rabid]|2126.87|http://dummyimage...|   1981-10-26|  true|
| 33|  Sherline|  Primett|   [Jungle Fighters]|2309.39|http://dummyimage...|   1972-07-23|  true|
| 34|     Davis|    Pinks|          [Hounddog]|1337.14|http://dummyimage...|   1989-07-27|  true|
| 37|    Carlen|  Sharply|[Dr. Jekyll and M...|2051.85|http://dummyimage...|   2002-06-01|  true|
| 40|    Jordan|   L

In [25]:
from pyspark.sql.functions import year

In [26]:
# Keep persons born in 2000 or 1989 by testing the extracted birth year for
# membership in that pair of values; print the first 10 matches.
df_json.filter(year("date_of_birth").isin(2000, 1989)).show(n=10)

+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| id|first_name|  last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+-----------+--------------------+-------+--------------------+-------------+------+
| 14|   Ambrosi|   Vidineev|[Wall Street: Mon...|4550.88|http://dummyimage...|   1989-07-20|  true|
| 15|    Feodor|Nancekivell|   [Monsoon Wedding]|2218.46|http://dummyimage...|   2000-10-07| false|
| 18|     Alfie|   Hatliffe|     [Lord of Tears]| 3893.1|http://dummyimage...|   1989-06-21|  true|
| 25|     Kelcy|     Wogdon|    [Iron Mask, The]|4512.51|http://dummyimage...|   2000-10-20|  true|
| 32|      Redd|   Akenhead|[Century of the D...| 2470.9|http://dummyimage...|   2000-06-05| false|
| 34|     Davis|      Pinks|          [Hounddog]|1337.14|http://dummyimage...|   1989-07-27|  true|
| 61|    Shanna|    Samples|[Thomas in Love (...| 2703.0|http://dummyimage...|   1989-07-07| false|


In [27]:
from pyspark.sql.functions import array_contains

In [28]:
# Keep persons whose fav_movies array contains the exact title
# "Land of the Lost".
df_json.where(array_contains(col("fav_movies"), "Land of the Lost")).show()

+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| id|first_name|last_name|          fav_movies| salary|           image_url|date_of_birth|active|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+
| 11|   Timothy|   Ervine|[Land of the Lost...|1147.61|http://dummyimage...|   1971-06-02| false|
+---+----------+---------+--------------------+-------+--------------------+-------------+------+



## Distinct, Drop Duplicates, Order By

In [29]:
from pyspark.sql.functions import count,desc

# Show the raw "active" flag for the first 10 rows (duplicates included).
df_json.select(col("active")).show(n=10)

+------+
|active|
+------+
|  true|
| false|
| false|
|  true|
|  true|
| false|
| false|
| false|
|  true|
|  true|
+------+
only showing top 10 rows



In [31]:
# Project first name, birth year and active flag, sort ascending by year and
# then by first name, and print the first 10 rows.
(
    df_json
    .select(
        col("first_name"),
        year(col("date_of_birth")).alias("year"),
        col("active"),
    )
    .orderBy("year", "first_name")
    .show(n=10)
)

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|       Sky|1971| false|
|   Timothy|1971| false|
|    Lucita|1972|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|     Toddy|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
+----------+----+------+
only showing top 10 rows



In [37]:
# Keep one row per (year, active) combination, then sort by year and first name.
# NOTE: nothing here determines WHICH row survives for each combination, so the
# first_name that is kept is arbitrary (the captured output keeps "Sherline"
# for 1972/true although "Lucita" and "Toddy" share that key).
dropped_df = (
    df_json
    .select(
        col("first_name"),
        year(col("date_of_birth")).alias("year"),
        col("active"),
    )
    .dropDuplicates(["year", "active"])
    .orderBy("year", "first_name")
)

In [38]:
# Print the de-duplicated rows; with no arguments show() prints at most 20 rows.
dropped_df.show()

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Adrian|1971| false|
|   Feodora|1971|  true|
|      Rodi|1972| false|
|  Sherline|1972|  true|
|  Dominica|1973| false|
|    Kelila|1973|  true|
|   Balduin|1974| false|
|     Emory|1974|  true|
|    Janean|1975|  true|
|       Bev|1976|  true|
| Franciska|1976| false|
|     Johny|1977| false|
|    Daveta|1978| false|
|   Guthrie|1978|  true|
|      Maxi|1979| false|
|   Melinda|1979|  true|
|    Carter|1980| false|
|   Loralyn|1980|  true|
|     Clive|1981|  true|
|   Leanora|1981| false|
+----------+----+------+
only showing top 20 rows



In [39]:
# Project first name, birth year and active flag, sort by year from newest to
# oldest, and print the first 10 rows. Rows sharing a year have no defined
# relative order because year is the only sort key.
(
    df_json
    .select(
        col("first_name"),
        year(col("date_of_birth")).alias("year"),
        col("active"),
    )
    .orderBy(col("year").desc())
    .show(n=10)
)

+----------+----+------+
|first_name|year|active|
+----------+----+------+
|    Carlen|2002|  true|
|   Lorilee|2002| false|
|     Daron|2002|  true|
|    Virgie|2002|  true|
|    Maxine|2001| false|
|    Feodor|2000| false|
|     Kelcy|2000|  true|
|      Redd|2000| false|
|  Annabell|2000|  true|
|     Jobie|2000| false|
+----------+----+------+
only showing top 10 rows



## Rows and Union

In [40]:
from pyspark.sql import Row