In [0]:
%fs ls /databricks-datasets/airlines

In [0]:
%fs head /databricks-datasets/airlines/part-00000

In [0]:
airlines_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("samplingRation", "0.0001") \
    .load("/databricks-datasets/airlines/part-00000")

In [0]:
airlines_df.select("Origin","Dest", "Year").show(20)

In [0]:
#ways of using a column
from pyspark.sql.functions import *
airlines_df.select(column("Origin"), col("Dest"), "Year", airlines_df.Distance).show(10)

In [0]:
airlines_df.select("Origin", "Dest", expr("to_date(concat(Year, Month, DayofMonth), 'yyyMMdd') as FlightDate")).show(10)
#can also be written as
#airlines_df.select("Origin", "Dest", to_date(concat('Year', 'Month', 'DayofMonth'), 'yyyMMdd').alias('FlightDate')).show(10)

In [0]:
!pip install findspark      

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-ae226c24-d2e5-4218-b1cd-1a66253d5e10/bin/python -m pip install --upgrade pip' command.[0m


In [0]:
import findspark
findspark.init()

import pyspark
import re
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
spark = SparkSession.builder \
        .master("local[3]") \
        .appName("transformations Practice") \
        .getOrCreate()
    
data_list = [("Rat", "21", "1", "2009"),
                ("jess", "22", "11", "89"),
                ("John", "22", "2", "4"), 
                ("Rosy", "33", "8", "68"), 
                ("Abdul", "53", "5", "91"),
                ("Max", "22", "8", "01"),
                ("Niel", "28", "2", "21")]

raw_df = spark.createDataFrame(data_list).toDF("name", "day", "month", "year")
raw_df.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)



In [0]:
df1 = raw_df.withColumn("Id", monotonically_increasing_id())
df1.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         Id|
+-----+---+-----+----+-----------+
|  Rat| 21|    1|2009| 8589934592|
| jess| 22|   11|  89|17179869184|
| John| 22|    2|   4|25769803776|
| Rosy| 33|    8|  68|34359738368|
|Abdul| 53|    5|  91|42949672960|
|  Max| 22|    8|  01|51539607552|
| Niel| 28|    2|  21|60129542144|
+-----+---+-----+----+-----------+



In [0]:
#transforming year column to get full value
df2 = df1.withColumn("year", expr("""
                     case when year<21 then year+2000
                     when year<100 then year+1900
                     else year
                     end
                     """))

df2.show()

+-----+---+-----+------+-----------+
| name|day|month|  year|         Id|
+-----+---+-----+------+-----------+
|  Rat| 21|    1|  2009| 8589934592|
| jess| 22|   11|1989.0|17179869184|
| John| 22|    2|2004.0|25769803776|
| Rosy| 33|    8|1968.0|34359738368|
|Abdul| 53|    5|1991.0|42949672960|
|  Max| 22|    8|2001.0|51539607552|
| Niel| 28|    2|1921.0|60129542144|
+-----+---+-----+------+-----------+



In [0]:
#casting the year column to int inline, decimals are not desired output
df3 = df1.withColumn("year", expr("""
                     case when year<21 then cast(year as int)+2000
                     when year<100 then cast(year as int)+1900
                     else year
                     end
                     """))

df3.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         Id|
+-----+---+-----+----+-----------+
|  Rat| 21|    1|2009| 8589934592|
| jess| 22|   11|1989|17179869184|
| John| 22|    2|2004|25769803776|
| Rosy| 33|    8|1968|34359738368|
|Abdul| 53|    5|1991|42949672960|
|  Max| 22|    8|2001|51539607552|
| Niel| 28|    2|1921|60129542144|
+-----+---+-----+----+-----------+



In [0]:
#casting the year column to int by changing schema, decimals are not desired output
df4 = df1.withColumn("year", expr("""
                     case when year<21 then year+2000
                     when year<100 then year+1900
                     else year
                     end
                     """).cast(IntegerType()))

df4.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         Id|
+-----+---+-----+----+-----------+
|  Rat| 21|    1|2009| 8589934592|
| jess| 22|   11|1989|17179869184|
| John| 22|    2|2004|25769803776|
| Rosy| 33|    8|1968|34359738368|
|Abdul| 53|    5|1991|42949672960|
|  Max| 22|    8|2001|51539607552|
| Niel| 28|    2|1921|60129542144|
+-----+---+-----+----+-----------+



In [0]:
df5 = df4.withColumn("day", col("day").cast(IntegerType())) \
        .withColumn("month", col("month").cast(IntegerType()))

df5.printSchema()

root
 |-- name: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- Id: long (nullable = false)



In [0]:
df6 = df5.withColumn("DOB", expr("to_date(concat(month, '/', day, '/', year), 'm/d/y')"))
df7 = df5.withColumn("DOB", to_date(expr("concat(month, '/', day, '/', year)"), "M/d/y"))
df6.show()
df7.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         Id|       DOB|
+-----+---+-----+----+-----------+----------+
|  Rat| 21|    1|2009| 8589934592|2009-01-21|
| jess| 22|   11|1989|17179869184|1989-01-22|
| John| 22|    2|2004|25769803776|2004-01-22|
| Rosy| 33|    8|1968|34359738368|      null|
|Abdul| 53|    5|1991|42949672960|      null|
|  Max| 22|    8|2001|51539607552|2001-01-22|
| Niel| 28|    2|1921|60129542144|1921-01-28|
+-----+---+-----+----+-----------+----------+

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         Id|       DOB|
+-----+---+-----+----+-----------+----------+
|  Rat| 21|    1|2009| 8589934592|2009-01-21|
| jess| 22|   11|1989|17179869184|1989-11-22|
| John| 22|    2|2004|25769803776|2004-02-22|
| Rosy| 33|    8|1968|34359738368|      null|
|Abdul| 53|    5|1991|42949672960|      null|
|  Max| 22|    8|2001|51539607552|2001-08-22|
| Niel| 28|    2|1921|60129542144|1921-02-28|
+-----+---+-----+----+-----------

In [0]:
df9 = df6 \
    .withColumn("DOB", expr("to_date(concat(month, '/', day, '/', year), 'M/d/y')")) \
    .drop("day", "month", "year") \
    .dropDuplicates(["name", "DOB"]) \
    .sort("DOB", expr("DOB desc"))

df9.show()

+-----+-----------+----------+
| name|         Id|       DOB|
+-----+-----------+----------+
| Rosy|34359738368|      null|
|Abdul|42949672960|      null|
| Niel|60129542144|1921-02-28|
| jess|17179869184|1989-11-22|
|  Max|51539607552|2001-08-22|
| John|25769803776|2004-02-22|
|  Rat| 8589934592|2009-01-21|
+-----+-----------+----------+

