# Basic Transformations

Let us take a deep dive into how we can perform basic transformations using Spark Data Frame APIs.

* Overview of Basic Transformations
* Overview of filtering
* Total Aggregations
* Grouped Aggregations
* Overview of Sorting
* Solutions - Problem 1
* Solutions - Problem 2
* Solutions - Problem 3


## Overview of Basic Transformations

Let us define problem statements to learn more about Data Frame APIs. We will try to cover filtering, aggregations and sorting as part of solutions for these problem statements.

* Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival. 
  * Output should contain 3 columns - **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
* Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day. 
  * Output should contain 4 columns - **FlightDate**, **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
  * **FlightDate** should be of **yyyy-MM-dd** format.
  * Data should be **sorted** in ascending order by **FlightDate**
* Get all the flights which departed late but arrived early (**IsArrDelayed is NO**).
  * Output should contain - **FlightCRSDepTime**, **UniqueCarrier**, **FlightNum**, **Origin**, **Dest**, **DepDelay**, **ArrDelay**
  * **FlightCRSDepTime** needs to be computed using **Year**, **Month**, **DayOfMonth**, **CRSDepTime**
  * **FlightCRSDepTime** should be displayed using **yyyy-MM-dd HH:mm** format.
  * Output should be sorted by **FlightCRSDepTime** and then by the difference between **DepDelay** and **ArrDelay**
  * Also get the count of such flights
  
## Data Frame for basic transformations

Let us understand how to build the Data Frame to explore basic transformations. We will be creating the Data Frame using air traffic data.

* Our air traffic data is in **parquet** file format.
* We can use `spark.read.parquet` to create a Data Frame by passing the path that contains the air traffic data.
* We will build the Data Frame using 2008 January data. We will also preview schema as well as data using basic Data Frame functions to begin with.

## Basic Filtering of Data

Let us get started with Basic Filtering using Spark Data Frame APIs.

* Filtering can be done either by using `filter` or `where`. `where` is an alias for `filter`, so the two behave identically.
* When it comes to the condition, we can either pass it in **SQL Style** or **Data Frame Style**.
* Example for SQL Style - `airtraffic.filter("IsArrDelayed = 'YES'").show()` or `airtraffic.where("IsArrDelayed = 'YES'").show()`
* Example for Data Frame Style - `airtraffic.filter(airtraffic["IsArrDelayed"] == 'YES').show()` or `airtraffic.filter(airtraffic.IsArrDelayed == 'YES').show()`. We can also use where instead of filter.
* Here are the other operations we can perform to filter the data - `!=`, `>`, `<`, `>=`, `<=`, `LIKE`, `BETWEEN` with `AND`    

 
## Boolean Operators

Let us understand details about boolean operators while filtering data in Spark Data Frames.
* If we have to validate against multiple columns, we need to use boolean operations such as `AND`, `OR`, or both.
* Here are some examples where we end up using boolean operators.
  * Get count of flights which departed late at origin and reached the destination early or on time.
  * Get count of flights which departed early or on time but arrived late by at least 15 minutes.
  * Get number of flights which departed late on Saturdays as well as on Sundays.
  
## Using IN Operator or isin Function

Let us understand how to use the `IN` operator while filtering data using a column against multiple values.

* It is an alternative to Boolean `OR` when a single column is compared with multiple values using an equality condition.

## Using LIKE Operator or like Function
Let us understand the usage of `LIKE` operator or `like` function while filtering the data in Data Frames.

* `like` is primarily used for partial comparison (e.g., search for names which start with `Sco`).
* We can use `like` to get results which start with a pattern, end with a pattern, or contain the pattern.
* We can also use negation with `like`.
* Spark also provides `rlike` to take care of partial comparison using regular expressions.

## Using BETWEEN Operator
Let us understand the usage of `BETWEEN` in conjunction with `AND` while filtering data from Data Frames.

## Dealing with Nulls while Filtering

Let us understand how to deal with nulls while filtering the data using Spark.
## Total Aggregations

Let us go through the details related to total aggregations using Spark.
* We can perform total aggregations directly on a Data Frame, or we can perform aggregations after grouping by one or more keys.
* Here are the functions which we typically use to perform aggregations.
  * `count`
  * `sum`, `avg`
  * `min`, `max`
  
## Grouped Aggregations

* Here are the APIs which we typically use to group the data using a key. As part of this topic, we will primarily focus on `groupBy`.
  * `groupBy`
  * `rollup`
  * `cube`
* Here are the functions which we typically use to perform aggregations.
  * `count`
  * `sum`, `avg`
  * `min`, `max`
* If we want to provide aliases to the aggregated fields then we have to use `agg` after `groupBy`.

## Overview of Sorting

* We can use `orderBy` or `sort` to sort the data.
* We can perform composite sorting by passing multiple columns or expressions.
* By default, data is sorted in ascending order. We can change it to descending by applying the `desc()` function on the column or expression.
* If the sort column contains null values, those will come first in ascending order. We can change the position of nulls to last.

## Overview of Joins

* Our data cannot be stored in one table. It will be stored in multiple tables and the tables might be related.
  * When it comes to transactional systems, we typically define tables based on Normalization Principles.
  * When it comes to data warehousing applications, we typically define tables using Dimensional Modeling.
  * With either approach, data is spread across multiple tables and relationships are defined between them.
  * Typically tables are related with one to one, one to many, or many to many relationships.
* When we have 2 Data Sets that are related based on a common key, we typically perform a join.
* There are different types of joins.
  * INNER JOIN
  * OUTER JOIN (LEFT or RIGHT)
  * FULL OUTER JOIN (a LEFT OUTER JOIN b UNION a RIGHT OUTER JOIN b)
  
  
# Windowing Functions

As part of this module let us get into Windowing Functions.

## Ranking Functions

We can use ranking functions to assign ranks to a particular record within a partition.

* Sparse Rank - rank
* Dense Rank - dense_rank
* Assigning Row Numbers - row_number
* Percentage Rank - percent_rank

In [0]:
## Upload the air traffic file to Databricks and read it with Spark
# File location and type
file_location = "/FileStore/tables/part_00252_5cde1303_4ebf_4a12_8fad_f5d9f9c9124a_c000_snappy.parquet"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
airtraffic = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(airtraffic)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,IsArrDelayed,IsDepDelayed
2008,1,16,3,1725.0,1735,1959.0,2021,OH,5367,N716CA,154.0,166,146.0,-22.0,-10.0,BGR,CVG,906,1.0,7.0,0,,0,,,,,,NO,NO
2008,1,17,4,1717.0,1701,1915.0,1855,OH,4977,N967CA,118.0,114,101.0,20.0,16.0,SYR,CVG,527,2.0,15.0,0,,0,16.0,0.0,4.0,0.0,0.0,YES,YES
2008,1,17,4,1220.0,1225,1440.0,1504,OH,5352,N709CA,140.0,159,117.0,-24.0,-5.0,SAV,BOS,901,8.0,15.0,0,,0,,,,,,NO,NO
2008,1,17,4,1530.0,1530,1645.0,1637,OH,5426,N779CA,75.0,67,45.0,8.0,0.0,CVG,GRR,268,5.0,25.0,0,,0,,,,,,YES,NO
2008,1,17,4,1203.0,1205,1429.0,1429,OH,5441,N809CA,86.0,84,58.0,0.0,-2.0,STL,CVG,307,3.0,25.0,0,,0,,,,,,NO,NO
2008,1,18,5,1150.0,1150,1457.0,1524,OH,5220,N436CA,127.0,154,102.0,-27.0,0.0,STL,JFK,892,4.0,21.0,0,,0,,,,,,NO,NO
2008,1,18,5,1215.0,1009,1540.0,1251,OH,5260,N446CA,145.0,102,140.0,169.0,126.0,MCI,CVG,539,2.0,3.0,0,,0,126.0,0.0,43.0,0.0,0.0,YES,YES
2008,1,19,6,835.0,835,1145.0,1130,OH,5276,N523CA,130.0,115,83.0,15.0,0.0,TUL,CVG,646,4.0,43.0,0,,0,0.0,0.0,15.0,0.0,0.0,YES,NO
2008,1,20,7,1925.0,1935,2148.0,2124,OH,5215,N729CA,143.0,109,34.0,24.0,-10.0,JFK,PHL,94,5.0,104.0,0,,0,0.0,0.0,24.0,0.0,0.0,YES,NO
2008,1,20,7,825.0,830,1045.0,1007,OH,5324,N933CA,140.0,97,92.0,38.0,-5.0,RDU,CVG,390,1.0,47.0,0,,0,0.0,0.0,38.0,0.0,0.0,YES,NO


In [0]:
airtraffic.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [0]:
# Show the distinct Year, Month, DayOfMonth combinations
airtraffic.select('Year', 'Month', 'DayOfMonth').distinct().show(31)

+----+-----+----------+
|Year|Month|DayOfMonth|
+----+-----+----------+
|2008|    1|        28|
|2008|    1|        25|
|2008|    1|        20|
|2008|    1|        11|
|2008|    1|         4|
|2008|    1|         5|
|2008|    1|        15|
|2008|    1|         3|
|2008|    1|        16|
|2008|    1|         9|
|2008|    1|        17|
|2008|    1|        19|
|2008|    1|        12|
|2008|    1|         6|
|2008|    1|        21|
|2008|    1|        18|
|2008|    1|         7|
|2008|    1|         1|
|2008|    1|        26|
|2008|    1|        24|
+----+-----+----------+
only showing top 20 rows



In [0]:
## Count the distinct (Year, Month, DayOfMonth) combinations
airtraffic.select('Year', 'Month', 'DayOfMonth').distinct().count()
## distinct() keeps one row per unique combination of Year, Month and DayOfMonth

Out[5]: 31

In [0]:
airtraffic.count()
## note : pyspark.sql.DataFrame.count() function is used to get the number of rows present in the DataFrame.


Out[6]: 605659

In [0]:
airtraffic.describe()
## Note: describe() returns a Data Frame of summary statistics; call .show() on it to display them


Out[8]: DataFrame[summary: string, Year: string, Month: string, DayofMonth: string, DayOfWeek: string, DepTime: string, CRSDepTime: string, ArrTime: string, CRSArrTime: string, UniqueCarrier: string, FlightNum: string, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: string, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: string, CancellationCode: string, Diverted: string, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string, IsArrDelayed: string, IsDepDelayed: string]

In [0]:
# * Get count of cancelled flights.
airtraffic.select('Cancelled').distinct().show()
## From the output we know that 1 stands for a flight that got cancelled
## Data Frame style
airtraffic.filter(airtraffic['Cancelled']==1).count()
airtraffic.filter(airtraffic.Cancelled == 1).count()
## sql style 
airtraffic.filter('Cancelled = 1').count()

#Q * Get number of flights scheduled for departure from SFO airport.
airtraffic.filter("Origin = 'SFO'").count()
airtraffic.filter(airtraffic['Origin'] == 'SFO').count()
airtraffic.filter(airtraffic.Origin == 'SFO').count()

# Q * Get number of flights that departed with a delay (IsDepDelayed is 'YES'; use 'NO' to count flights that departed without any delay).
airtraffic.filter(airtraffic['IsDepDelayed'] == 'YES').count()
airtraffic.filter(airtraffic.IsDepDelayed == "YES").count()

# airtraffic.select('IsDepDelayed').show()

+---------+
|Cancelled|
+---------+
|        1|
|        0|
+---------+

Out[22]: 265198

In [0]:
from pyspark.sql.functions import concat,col,lpad,date_format,to_date
airtraffic. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") &
           (date_format(
               to_date("FlightDate", "yyyyMMdd"), "EEEE"
           ) == "Sunday")
          ). \
    count()

Out[27]: 34708

In [0]:
# Note: using AND and OR with filter
## SQL style
airtraffic. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO' AND Cancelled = 0"). \
    count()

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|2008|    1|    

In [0]:
## API style: combine conditions with the & operator
from pyspark.sql.functions import col

airtraffic. \
    filter((col("IsDepDelayed") == "NO") & 
           (col("IsArrDelayed") == "NO")
          ). \
    count()

Out[29]: 253470

In [0]:
from pyspark.sql.functions import col, concat, lpad, date_format, to_date

airtraffic. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") & (col("Cancelled") == 0) &
           ((date_format(
               to_date("FlightDate", "yyyyMMdd"), "EEEE"
           ) == "Saturday") |
            (date_format(
               to_date("FlightDate", "yyyyMMdd"), "EEEE"
           ) == "Sunday")
           )
          ). \
    count()

In [0]:
# * Get count of flights departed from following major airports - ORD, DFW, ATL, LAX, SFO.
#  sql style 
airtraffic.filter("Origin IN ('ORD', 'DFW', 'ATL', 'LAX', 'SFO')").count()

In [0]:
## API style
from pyspark.sql.functions import col

airtraffic.filter(col("Origin").isin("ORD", "DFW", "ATL", "LAX", "SFO")).count()
## Column.isin(*cols) checks whether the column value matches any of the given values

In [0]:
from pyspark.sql.functions import col, concat, lpad, date_format, to_date

# isin replaces the two OR-ed day-name comparisons used earlier
airtraffic.withColumn("FlightDate", concat(col("Year"), lpad(col("Month"), 2, "0"), lpad(col("DayOfMonth"), 2, "0"))). \
    filter((col("IsDepDelayed") == "YES") & (col("Cancelled") == 0) &
           (date_format(to_date("FlightDate", "yyyyMMdd"), "EEEE").isin("Saturday", "Sunday"))). \
    count()

In [0]:
## Note: SQL style uses the LIKE operator
## API style uses the Column methods like and rlike (regular expressions)

employees = [(1, "Scott", "Tiger", 1000.0, 10,
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, None,
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, '',
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 10,
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                ]
employeesDF = spark.createDataFrame(employees,schema="""employee_id INT, first_name STRING, last_name STRING
                                                       , salary FLOAT, bonus STRING, nationality STRING,phone_number STRING, ssn STRING""")

In [0]:
employeesDF.filter("first_name LIKE 'Sco%'").show()

+-----------+----------+---------+------+-----+-------------+---------------+-----------+
|employee_id|first_name|last_name|salary|bonus|  nationality|   phone_number|        ssn|
+-----------+----------+---------+------+-----+-------------+---------------+-----------+
|          1|     Scott|    Tiger|1000.0|   10|united states|+1 123 456 7890|123 45 6789|
+-----------+----------+---------+------+-----+-------------+---------------+-----------+



In [0]:
## API style: use like, or rlike for regular-expression matching
from pyspark.sql.functions import col
employeesDF.filter(col('first_name').like('Sco%')).show()

In [0]:
## sql style
airtraffic. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("""
           IsDepDelayed = 'YES' AND 
           Cancelled = 0 AND
           FlightDate BETWEEN 20080101 AND 20080109
          """). \
    count()

In [0]:
# * API style: Column provides between
airtraffic. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") & 
           (col("Cancelled") == 0) &
           (col("FlightDate").between(20080101, 20080109))
          ). \
    count()

Out[33]: 86180

In [0]:
## sql style null 
employeesDF.filter("bonus IS NOT NULL AND bonus <> ''").show()

+-----------+----------+---------+------+-----+-------------+----------------+-----------+
|employee_id|first_name|last_name|salary|bonus|  nationality|    phone_number|        ssn|
+-----------+----------+---------+------+-----+-------------+----------------+-----------+
|          1|     Scott|    Tiger|1000.0|   10|united states| +1 123 456 7890|123 45 6789|
|          4|      Bill|    Gomes|1500.0|   10|    AUSTRALIA|+61 987 654 3210|789 12 6118|
+-----------+----------+---------+------+-----+-------------+----------------+-----------+



In [0]:
## API style
## Column provides isNotNull()
employeesDF.filter((col('bonus').isNotNull()) & (col('bonus') != '')).show()

In [0]:
employeesDF.filter((col('bonus').cast('int').isNull())).show()

In [0]:
## DataFrame.summary(*statistics)
## summary() reports statistics such as count, mean, stddev, min and max
airtraffic.select('Year', 'Month', 'DayOfMonth').summary().show()

In [0]:
employeesDF.selectExpr('sum((coalesce(cast(bonus AS INT), 0) * salary) / 100) AS total_bonus').show()

In [0]:
# File location and type
file_location = "/FileStore/tables/orders.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
order_items = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)
order_items.show()
## Note: the file name and the variable name do not match here (orders.json is loaded into order_items)

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|         

In [0]:
from pyspark.sql.functions import col, lit, sum as sum_
# int("order_id") raised ValueError; compare with a concrete id instead (2 is an arbitrary example).
# order_items must hold order item data (order_item_order_id, order_item_subtotal) for this to run.
order_items.filter(col('order_item_order_id') == lit(2)).select(sum_('order_item_subtotal').alias('order_revenue')).show()

In [0]:
## Grouping of data
# Import the Spark aggregate functions under aliases so that Python's built-in
# round, sum, min and max are not called by mistake (the likely cause of the original TypeError).
# order_items must hold order item data for these columns to exist.
from pyspark.sql.functions import round as round_, sum as sum_, min as min_, max as max_

order_items. \
    groupBy('order_item_order_id'). \
    agg(
        round_(sum_('order_item_subtotal'), 2).alias('revenue_per_order'),
        min_('order_item_subtotal').alias('order_item_subtotal_min'),
        max_('order_item_subtotal').alias('order_item_subtotal_max')
    ). \
    show()

In [0]:
help(order_items.rollup)

Help on method rollup in module pyspark.sql.dataframe:

rollup(*cols: 'ColumnOrName') -> 'GroupedData' method of pyspark.sql.dataframe.DataFrame instance
    Create a multi-dimensional rollup for the current :class:`DataFrame` using
    the specified columns, so we can run aggregation on them.
    
    .. versionadded:: 1.4.0
    
    Examples
    --------
    >>> df.rollup("name", df.age).count().orderBy("name", "age").show()
    +-----+----+-----+
    | name| age|count|
    +-----+----+-----+
    | null|null|    2|
    |Alice|null|    1|
    |Alice|   2|    1|
    |  Bob|null|    1|
    |  Bob|   5|    1|
    +-----+----+-----+



In [0]:
## Note: rollup
# rollup and cube are Data Frame functions that create multi-dimensional groupings.

from pyspark.sql.functions import year, date_format, count, lit

order_items. \
    rollup(
        year('order_date').alias('order_year'),
        date_format('order_date', 'yyyyMM').alias('order_month'), 
        'order_date'
    ). \
    agg(count(lit(1)).alias('order_count')). \
    filter("order_month = 201401"). \
    orderBy('order_year', 'order_month', 'order_date').show()

+----------+-----------+--------------------+-----------+
|order_year|order_month|          order_date|order_count|
+----------+-----------+--------------------+-----------+
|      2014|     201401|                null|       5908|
|      2014|     201401|2014-01-01 00:00:...|        135|
|      2014|     201401|2014-01-02 00:00:...|        111|
|      2014|     201401|2014-01-03 00:00:...|        250|
|      2014|     201401|2014-01-04 00:00:...|        129|
|      2014|     201401|2014-01-05 00:00:...|        266|
|      2014|     201401|2014-01-06 00:00:...|        155|
|      2014|     201401|2014-01-07 00:00:...|        163|
|      2014|     201401|2014-01-08 00:00:...|        122|
|      2014|     201401|2014-01-09 00:00:...|        207|
|      2014|     201401|2014-01-10 00:00:...|        241|
|      2014|     201401|2014-01-11 00:00:...|        281|
|      2014|     201401|2014-01-12 00:00:...|        215|
|      2014|     201401|2014-01-13 00:00:...|        179|
|      2014|  

In [0]:
### Note: cube and rollup work like SQL grouping sets: they aggregate over several combinations of the grouping columns.
## See the SQL notes for details.

In [0]:
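## Sorting functions
## orderBy()
## sort()
## .desc()
## .asc_nulls_last(): sort in ascending order with null values placed last
## Note: casting the order_date string to int appears to yield null for every row,
## which would explain why the output below is not in date order.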
order_items. \
    orderBy(order_items.order_date.cast('int').asc_nulls_last()). \
    show()

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|             7205|2014-03-20 00:00:...|   38686|        ON_HOLD|
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|             6783|2014-03-20 00:00:...|   38687|       CANCELED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|             9226|2014-03-20 00:00:...|   38688|       COMPLETE|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             5485|2014-03-20 00:00:...|   38689|       CANCELED|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|             6971|2014-03-20 00:00:...|   38690|       COMPLETE|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             2799|2014-03-20 00:00:...|   38691|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|         

In [0]:
## Note: all of this uses the Spark API style
### Joins with Data Frames in both API and SQL styles
### Window functions such as lead, lag, dense_rank(), rank()
## Windowing operations such as rows between
### Creating partitions
## Writing data in partitioned style
## concat_ws function
## Projecting struct and map columns



In [0]:
## Joining data sets
## Air traffic data and airport codes data (JSON)
## orders.json and orderItems.json (loaded)
##

In [0]:
## create tables 
# File location and type
file_location = "/FileStore/tables/airtrafic_all_snappy.parquet"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# A hyphen is not allowed in a Python identifier (airport-codes raised SyntaxError).
# The file holds air traffic data, so the Data Frame is named airtraffic_all here.
airtraffic_all = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(airtraffic_all)


In [0]:
# File location and type
file_location = "/FileStore/tables/orderItems.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
orderItem = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(orderItem)

order_item_id,order_item_order_id,order_item_product_id,order_item_product_price,order_item_quantity,order_item_subtotal
1,1,957,299.98,1,299.98
2,2,1073,199.99,1,199.99
3,2,502,50.0,5,250.0
4,2,403,129.99,1,129.99
5,4,897,24.99,2,49.98
6,4,365,59.99,5,299.95
7,4,502,50.0,3,150.0
8,4,1014,49.98,4,199.92
9,5,957,299.98,1,299.98
10,5,365,59.99,5,299.95


In [0]:
# File location and type
file_location = "/FileStore/tables/orders.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
orders = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(orders)

order_customer_id,order_date,order_id,order_status
11599,2013-07-25 00:00:00.0,1,CLOSED
256,2013-07-25 00:00:00.0,2,PENDING_PAYMENT
12111,2013-07-25 00:00:00.0,3,COMPLETE
8827,2013-07-25 00:00:00.0,4,CLOSED
11318,2013-07-25 00:00:00.0,5,COMPLETE
7130,2013-07-25 00:00:00.0,6,COMPLETE
4530,2013-07-25 00:00:00.0,7,COMPLETE
2911,2013-07-25 00:00:00.0,8,PROCESSING
5657,2013-07-25 00:00:00.0,9,PENDING_PAYMENT
5648,2013-07-25 00:00:00.0,10,PENDING_PAYMENT


In [0]:

orders.columns


Out[10]: ['order_customer_id', 'order_date', 'order_id', 'order_status']

In [0]:
orderItem.columns

Out[20]: ['order_item_id',
 'order_item_order_id',
 'order_item_product_id',
 'order_item_product_price',
 'order_item_quantity',
 'order_item_subtotal']

In [0]:
## Number of rows in orders
print(orders.count())
## Number of rows in orderItem
print(orderItem.count())
## Column names of both Data Frames
print(orders.columns)
print(orderItem.columns)

68883
172198
['order_customer_id', 'order_date', 'order_id', 'order_status']
['order_item_id', 'order_item_order_id', 'order_item_product_id', 'order_item_product_price', 'order_item_quantity', 'order_item_subtotal']


In [0]:
## Join orders and orderItem on the order id
orders_join = orders.join(
    orderItem, 
    on=orders['order_id'] == orderItem['order_item_order_id'],
    how='inner'
)

In [0]:
orders_join.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_item_id: long (nullable = true)
 |-- order_item_order_id: long (nullable = true)
 |-- order_item_product_id: long (nullable = true)
 |-- order_item_product_price: double (nullable = true)
 |-- order_item_quantity: long (nullable = true)
 |-- order_item_subtotal: double (nullable = true)



In [0]:
## Read the airport codes (the air traffic data is read in the next cell)
# File location and type
file_location = "/FileStore/tables/airport_codes_na.txt"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = "\t"

# The applied options are for CSV files. For other file types, these will be ignored.
airport_codes = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(airport_codes)

City,State,Country,IATA
Abbotsford,BC,Canada,YXX
Aberdeen,SD,USA,ABR
Abilene,TX,USA,ABI
Akron,OH,USA,CAK
Alamosa,CO,USA,ALS
Albany,GA,USA,ABY
Albany,NY,USA,ALB
Albuquerque,NM,USA,ABQ
Alexandria,LA,USA,AEX
Allentown,PA,USA,ABE


In [0]:
# File location and type
file_location = "/FileStore/tables/airtrafic_all_snappy-1.parquet"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
airtraffic = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(airtraffic)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,IsArrDelayed,IsDepDelayed
2008,1,16,3,1725.0,1735,1959.0,2021,OH,5367,N716CA,154.0,166,146.0,-22.0,-10.0,BGR,CVG,906,1.0,7.0,0,,0,,,,,,NO,NO
2008,1,17,4,1717.0,1701,1915.0,1855,OH,4977,N967CA,118.0,114,101.0,20.0,16.0,SYR,CVG,527,2.0,15.0,0,,0,16.0,0.0,4.0,0.0,0.0,YES,YES
2008,1,17,4,1220.0,1225,1440.0,1504,OH,5352,N709CA,140.0,159,117.0,-24.0,-5.0,SAV,BOS,901,8.0,15.0,0,,0,,,,,,NO,NO
2008,1,17,4,1530.0,1530,1645.0,1637,OH,5426,N779CA,75.0,67,45.0,8.0,0.0,CVG,GRR,268,5.0,25.0,0,,0,,,,,,YES,NO
2008,1,17,4,1203.0,1205,1429.0,1429,OH,5441,N809CA,86.0,84,58.0,0.0,-2.0,STL,CVG,307,3.0,25.0,0,,0,,,,,,NO,NO
2008,1,18,5,1150.0,1150,1457.0,1524,OH,5220,N436CA,127.0,154,102.0,-27.0,0.0,STL,JFK,892,4.0,21.0,0,,0,,,,,,NO,NO
2008,1,18,5,1215.0,1009,1540.0,1251,OH,5260,N446CA,145.0,102,140.0,169.0,126.0,MCI,CVG,539,2.0,3.0,0,,0,126.0,0.0,43.0,0.0,0.0,YES,YES
2008,1,19,6,835.0,835,1145.0,1130,OH,5276,N523CA,130.0,115,83.0,15.0,0.0,TUL,CVG,646,4.0,43.0,0,,0,0.0,0.0,15.0,0.0,0.0,YES,NO
2008,1,20,7,1925.0,1935,2148.0,2124,OH,5215,N729CA,143.0,109,34.0,24.0,-10.0,JFK,PHL,94,5.0,104.0,0,,0,0.0,0.0,24.0,0.0,0.0,YES,NO
2008,1,20,7,825.0,830,1045.0,1007,OH,5324,N933CA,140.0,97,92.0,38.0,-5.0,RDU,CVG,390,1.0,47.0,0,,0,0.0,0.0,38.0,0.0,0.0,YES,NO


In [0]:
## IATA is the airport code.
## Compare the total row count with the number of distinct IATA codes.
from pyspark.sql.functions import col, count, lit
print(airport_codes.count())
print(airport_codes.select(col("IATA")).distinct().count())
# * If they are not equal, analyze the data and identify IATA codes which are repeated more than once:
#   group by IATA, count the rows in each group (count(lit(1)) counts one per row),
#   then keep only the groups with more than one row.
# show() prints the result itself and returns None, so it is not wrapped in print().
airport_codes.select(col("IATA")).groupby(col("IATA")).agg(count(lit(1)).alias("iata_count")). \
    filter("iata_count > 1").show()

526
524
+----+----------+
|IATA|iata_count|
+----+----------+
| Big|         3|
+----+----------+


In [0]:
# * Filter out the duplicates: keep the most appropriate row and discard the others.
# First inspect the rows that share the duplicated IATA value.
airport_codes.filter("IATA = 'Big'").show()
## Note: filter accepts the whole condition as a SQL string, in which columns are
## referred to by bare name and string literals are single-quoted.

+-----------+------+-------+----+
|       City| State|Country|IATA|
+-----------+------+-------+----+
|       Hilo|    HI|    USA| Big|
|Kailua-Kona|Hawaii|    USA| Big|
|    Kamuela|Hawaii|    USA| Big|
+-----------+------+-------+----+



In [0]:
## Count the number of airports by country and state.
## show() returns None, so its result is not assigned to a variable.
airport_codes. \
    groupBy("Country", "State"). \
    agg(count(lit(1)).alias("IATACount")). \
    orderBy(col("IATACount").desc()).show()

## Equivalent SQL:
# SELECT Country, State, count(*) AS IATACount
# FROM airport_codes
# GROUP BY Country, State
# ORDER BY IATACount DESC;

+-------+-----+---------+
|Country|State|IATACount|
+-------+-----+---------+
|    USA|   CA|       29|
|    USA|   TX|       26|
|    USA|   AK|       25|
| Canada|   BC|       22|
|    USA|   MI|       18|
|    USA|   NY|       18|
|    USA|   FL|       18|
| Canada|   ON|       18|
|    USA|   MT|       14|
| Canada|   PQ|       13|
|    USA|   PA|       13|
|    USA|   CO|       12|
|    USA|   IL|       12|
|    USA|   WY|       10|
|    USA|   NC|       10|
|    USA|   NM|        9|
|    USA|   WA|        9|
|    USA|   WI|        9|
|    USA|   GA|        9|
|    USA|   KS|        9|
+-------+-----+---------+
only showing top 20 rows



In [0]:
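## Join orders and orderItem.
## show() returns None, so the joined Data Frame is assigned first and displayed separately.
orders_join = orders.join(
    orderItem, 
    on=orders['order_id'] == orderItem['order_item_order_id'],
    how='inner'
)
orders_join.show()

## Note: a join condition built from Column objects (orders['order_id']) uses Python's ==,
## whereas a condition written as a SQL string uses a single =.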

+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_customer_id|          order_date|order_id|   order_status|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-----------------+--------------------+--------+---------------+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|            1|                  1|                  957|                  299.98|                  1|             299.98|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|            2|                  2|                 1073|                  199.99|                  1|             199.99|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|

In [0]:
# * Project all the fields from orders and then order_item_subtotal from orderItem.
orders.show(2)
orderItem.show(2)

# Keep the Data Frame in a variable and display it separately, because show() returns None.
order_item_sub = orders.join(orderItem,
                             on=orders["order_id"] == orderItem["order_item_order_id"],
                             how="inner"). \
                             select(orders['*'], orderItem['order_item_subtotal'])
order_item_sub.show()

## its similar tp

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
+-----------------+--------------------+--------+---------------+
only showing top 2 rows

+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_product_price|order_item_quantity|order_item_subtotal|
+-------------+-------------------+---------------------+------------------------+-------------------+-------------------+
|            1|                  1|                  957|                  299.98|                  1|             299.98|
|            2|                  2|                 1073|                  199.99|     

In [0]:
# File location and type
file_location = "/FileStore/tables/customer.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
customers = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(customers)

customer_city,customer_email,customer_fname,customer_id,customer_lname,customer_password,customer_state,customer_street,customer_zipcode
Brownsville,XXXXXXXXX,Richard,1,Hernandez,XXXXXXXXX,TX,6303 Heather Plaza,78521
Littleton,XXXXXXXXX,Mary,2,Barrett,XXXXXXXXX,CO,9526 Noble Embers Ridge,80126
Caguas,XXXXXXXXX,Ann,3,Smith,XXXXXXXXX,PR,3422 Blue Pioneer Bend,725
San Marcos,XXXXXXXXX,Mary,4,Jones,XXXXXXXXX,CA,8324 Little Common,92069
Caguas,XXXXXXXXX,Robert,5,Hudson,XXXXXXXXX,PR,"""10 Crystal River Mall """,725
Passaic,XXXXXXXXX,Mary,6,Smith,XXXXXXXXX,NJ,3151 Sleepy Quail Promenade,7055
Caguas,XXXXXXXXX,Melissa,7,Wilcox,XXXXXXXXX,PR,9453 High Concession,725
Lawrence,XXXXXXXXX,Megan,8,Smith,XXXXXXXXX,MA,3047 Foggy Forest Plaza,1841
Caguas,XXXXXXXXX,Mary,9,Perez,XXXXXXXXX,PR,3616 Quaking Street,725
Stafford,XXXXXXXXX,Melissa,10,Smith,XXXXXXXXX,VA,8598 Harvest Beacon Plaza,22554


In [0]:
## left outer join 
customer_order_details_left = customers.join(
    orders, 
    on=customers['customer_id'] == orders['order_customer_id'],
    how='left_outer'
)

In [0]:
from pyspark.sql.functions import expr, sum
customer_order_details_left. \
    groupBy('customer_id', 'customer_email'). \
    agg(sum(expr('CASE WHEN order_id IS NULL THEN 0 ELSE 1 END')).alias('order_count')). \
    orderBy('order_count', 'customer_id'). \
    count()

## CASE WHEN inside sum counts only the matched orders: customers without any order
## have a NULL order_id after the left outer join and contribute 0.


Out[15]: 12435
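
To make the `CASE WHEN ... THEN 0 ELSE 1 END` counting concrete, here is a minimal plain-Python sketch of the same idea. It is only a model of the semantics, not Spark code, and the customer ids and orders in it are hypothetical.

```python
# Plain-Python model of: left outer join + sum(CASE WHEN order_id IS NULL THEN 0 ELSE 1 END).
# The ids below are hypothetical sample data, not rows from the notebook's files.
customers = [1, 2, 3]                      # customer_id values
orders = [(10, 1), (11, 1), (12, 3)]       # (order_id, order_customer_id)


def order_counts(customer_ids, order_rows):
    # Every customer from the left side is kept, starting at 0 orders.
    counts = {cid: 0 for cid in customer_ids}
    for _order_id, cid in order_rows:
        if cid in counts:                  # matched row: contributes 1
            counts[cid] += 1
    # Same ordering as orderBy('order_count', 'customer_id').
    return sorted(counts.items(), key=lambda kv: (kv[1], kv[0]))


print(order_counts(customers, orders))
```

Customer 2 has no orders, so it stays in the result with a count of 0 instead of disappearing, which is the reason for using a left outer join here.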

In [0]:
* Let us say we want to compare each employee's salary with the salary expense of the department.
* Here is one approach, which requires a self join.
  * Compute the department-wise expense using `groupBy` and `agg`.
  * Join the result with **employees** again on department_id.
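
The two steps can be sketched in plain Python as follows. This is only a model of the aggregate-then-join-back idea, not Spark code; the rows are a small subset picked for illustration, and the function name `with_department_expense` is hypothetical.

```python
from collections import defaultdict

# (employee_id, department_id, salary): a small illustrative subset
employees_sample = [
    (103, 60, 9000.0),
    (104, 60, 6000.0),
    (100, 90, 24000.0),
    (101, 90, 17000.0),
]


def with_department_expense(rows):
    # Step 1: department-wise expense (the groupBy + agg(sum) step).
    expense = defaultdict(float)
    for _emp, dept, salary in rows:
        expense[dept] += salary
    # Step 2: join the totals back to every employee row on department_id.
    return [(emp, dept, salary, expense[dept]) for emp, dept, salary in rows]


for row in with_department_expense(employees_sample):
    print(row)
```

Each output row carries the individual salary next to the department total, so the two can be compared directly.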

In [0]:
# File location and type
file_location = "/FileStore/tables/part_00000.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
employees = spark. \
    read. \
    format('csv'). \
    option('sep', '\t'). \
    schema('''employee_id INT, 
              first_name STRING, 
              last_name STRING, 
              email STRING,
              phone_number STRING, 
              hire_date STRING, 
              job_id STRING, 
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING, 
              department_id STRING
            '''). \
    load(file_location)
display(employees)

employee_id,first_name,last_name,email,phone_number,hire_date,job_id,salary,commission_pct,manager_id,department_id
100,Steven,King,SKING,515.123.4567,1987-06-17,AD_PRES,24000.0,,,90.0
101,Neena,Kochhar,NKOCHHAR,515.123.4568,1989-09-21,AD_VP,17000.0,,100.0,90.0
102,Lex,De Haan,LDEHAAN,515.123.4569,1993-01-13,AD_VP,17000.0,,100.0,90.0
103,Alexander,Hunold,AHUNOLD,590.423.4567,1990-01-03,IT_PROG,9000.0,,102.0,60.0
104,Bruce,Ernst,BERNST,590.423.4568,1991-05-21,IT_PROG,6000.0,,103.0,60.0
105,David,Austin,DAUSTIN,590.423.4569,1997-06-25,IT_PROG,4800.0,,103.0,60.0
106,Valli,Pataballa,VPATABAL,590.423.4560,1998-02-05,IT_PROG,4800.0,,103.0,60.0
107,Diana,Lorentz,DLORENTZ,590.423.5567,1999-02-07,IT_PROG,4200.0,,103.0,60.0
108,Nancy,Greenberg,NGREENBE,515.124.4569,1994-08-17,FI_MGR,12000.0,,101.0,100.0
109,Daniel,Faviet,DFAVIET,515.124.4169,1994-08-16,FI_ACCOUNT,9000.0,,108.0,100.0


In [0]:
customers
employees

Out[24]: DataFrame[employee_id: int, first_name: string, last_name: string, email: string, phone_number: string, hire_date: string, job_id: string, salary: float, commission_pct: string, manager_id: string, department_id: string]

In [0]:
## Question: let us say we want to compare each employee's salary with the salary expense of the department.
# * Here is one approach, which requires a self join.
#   * Compute the department-wise expense using `groupBy` and `agg`.
#   * Join the result with **employees** again on department_id.

In [0]:
##solution 
from pyspark.sql.functions import sum, col
department_expense = employees. \
    groupBy('department_id'). \
    agg(sum('salary').alias('expense'))
department_expense.show()

## groupBy puts department_id into the result automatically, next to the aggregated column.
## The department-wise salary expense is the sum of salary within each department.

+-------------+--------+
|department_id| expense|
+-------------+--------+
|           30| 24900.0|
|          110| 20300.0|
|          100| 51600.0|
|           70| 10000.0|
|           90| 58000.0|
|           60| 28800.0|
|           40|  6500.0|
|           20| 19000.0|
|           10|  4400.0|
|           80|304500.0|
|         null|  7000.0|
|           50|156400.0|
+-------------+--------+



In [0]:
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window

# department_expense is derived from employees, so a condition such as
# employees.department_id == department_expense.department_id is most likely
# ambiguous, and the original attempt failed with an AnalysisException.
# Joining on the column name avoids the ambiguity and keeps one department_id column.
employees. \
    select('employee_id', 'department_id', 'salary'). \
    join(department_expense, on='department_id'). \
    orderBy('department_id', 'salary'). \
    show()

In [0]:
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window

spec = Window. \
    partitionBy('department_id'). \
    orderBy(col('salary').desc())

In [0]:
from pyspark.sql.functions import rank,dense_rank,round,row_number,col
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("department_id", col("salary").desc()). \
    show(107)

+-----------+-------------+-------+-----+-----+-----+---+
|employee_id|department_id| salary|srank|drank|prank| rn|
+-----------+-------------+-------+-----+-----+-----+---+
|        178|         null| 7000.0|    1|    1|  0.0|  1|
|        200|           10| 4400.0|    1|    1|  0.0|  1|
|        201|           20|13000.0|    1|    1|  0.0|  1|
|        202|           20| 6000.0|    2|    2|  1.0|  2|
|        114|           30|11000.0|    1|    1|  0.0|  1|
|        115|           30| 3100.0|    2|    2|  0.2|  2|
|        116|           30| 2900.0|    3|    3|  0.4|  3|
|        117|           30| 2800.0|    4|    4|  0.6|  4|
|        118|           30| 2600.0|    5|    5|  0.8|  5|
|        119|           30| 2500.0|    6|    6|  1.0|  6|
|        203|           40| 6500.0|    1|    1|  0.0|  1|
|        121|           50| 8200.0|    1|    1|  0.0|  1|
|        120|           50| 8000.0|    2|    2| 0.02|  2|
|        122|           50| 7900.0|    3|    3| 0.05|  3|
|        123| 

In [0]:
## Upload the air traffic file to Databricks and read it as a Data Frame
# File location and type
file_location = "/FileStore/tables/airtrafic_all_snappy-1.parquet"
file_type = "parquet"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
airtraffic = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(airtraffic)

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,IsArrDelayed,IsDepDelayed
2008,1,16,3,1725.0,1735,1959.0,2021,OH,5367,N716CA,154.0,166,146.0,-22.0,-10.0,BGR,CVG,906,1.0,7.0,0,,0,,,,,,NO,NO
2008,1,17,4,1717.0,1701,1915.0,1855,OH,4977,N967CA,118.0,114,101.0,20.0,16.0,SYR,CVG,527,2.0,15.0,0,,0,16.0,0.0,4.0,0.0,0.0,YES,YES
2008,1,17,4,1220.0,1225,1440.0,1504,OH,5352,N709CA,140.0,159,117.0,-24.0,-5.0,SAV,BOS,901,8.0,15.0,0,,0,,,,,,NO,NO
2008,1,17,4,1530.0,1530,1645.0,1637,OH,5426,N779CA,75.0,67,45.0,8.0,0.0,CVG,GRR,268,5.0,25.0,0,,0,,,,,,YES,NO
2008,1,17,4,1203.0,1205,1429.0,1429,OH,5441,N809CA,86.0,84,58.0,0.0,-2.0,STL,CVG,307,3.0,25.0,0,,0,,,,,,NO,NO
2008,1,18,5,1150.0,1150,1457.0,1524,OH,5220,N436CA,127.0,154,102.0,-27.0,0.0,STL,JFK,892,4.0,21.0,0,,0,,,,,,NO,NO
2008,1,18,5,1215.0,1009,1540.0,1251,OH,5260,N446CA,145.0,102,140.0,169.0,126.0,MCI,CVG,539,2.0,3.0,0,,0,126.0,0.0,43.0,0.0,0.0,YES,YES
2008,1,19,6,835.0,835,1145.0,1130,OH,5276,N523CA,130.0,115,83.0,15.0,0.0,TUL,CVG,646,4.0,43.0,0,,0,0.0,0.0,15.0,0.0,0.0,YES,NO
2008,1,20,7,1925.0,1935,2148.0,2124,OH,5215,N729CA,143.0,109,34.0,24.0,-10.0,JFK,PHL,94,5.0,104.0,0,,0,0.0,0.0,24.0,0.0,0.0,YES,NO
2008,1,20,7,825.0,830,1045.0,1007,OH,5324,N933CA,140.0,97,92.0,38.0,-5.0,RDU,CVG,390,1.0,47.0,0,,0,0.0,0.0,38.0,0.0,0.0,YES,NO


In [0]:
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window

In [0]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("DepDelay").desc())

## Note: a window spec is built with partitionBy first, then orderBy
airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()


# Note: ranking functions are applied over a window spec using over(spec)

Out[5]: <pyspark.sql.window.WindowSpec at 0x7f470cc276a0>

In [0]:
# Note: pyspark.sql.functions.lead(col, offset=1, default=None)

# Window function: returns the value that is offset rows after the current row,
# and default if there are fewer than offset rows after the current row.
# For example, an offset of one returns the next row at any given point in the window partition.
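
As a rough illustration of the description above, the following plain-Python function models `lead` over one already-ordered partition. It is a sketch of the semantics only, not the Spark implementation; the sample values are the `CRSDepTime` values of the first few ABE rows shown in the output further down.

```python
def lead(values, offset=1, default=None):
    """Model of lead over one ordered partition: the value `offset` rows ahead,
    or `default` when fewer than `offset` rows follow the current row."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default for i in range(n)]


crs_dep_times = [720, 1215, 1230, 1410]
print(lead(crs_dep_times))                        # next departure for each row
print(lead(crs_dep_times, offset=2, default=-1))  # two rows ahead, -1 when missing
```

The last row of a partition has nothing after it, so it receives the default value (`None` unless specified).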

In [0]:
from pyspark.sql.functions import col, lit, lpad, concat, lead, sum
from pyspark.sql.window import Window
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("CRSDepTime"))


airtraffic. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LeadUniqueCarrier", lead("UniqueCarrier").over(spec)). \
    withColumn("LeadFlightNum", lead("FlightNum").over(spec)). \
    withColumn("LeadCRSDepTime", lead("CRSDepTime").over(spec)). \
    withColumn("LeadDepDelay", lead("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|LeadUniqueCarrier|LeadFlightNum|LeadCRSDepTime|LeadDepDelay|
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|               9E|         2940|          1215|          70|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|               YV|         7263|          1230|         137|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|               XE|         2578|          1410|          22|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|               9E|         2936|          1615|          34|
|  20080101|   ABE|           9E|     293

In [0]:
# The spec from the previous cell orders by CRSDepTime, a column that no longer exists
# after the aggregation below; this is the most likely cause of the AnalysisException
# that the original run raised in withColumn.
# Define a spec over the aggregated columns instead (assuming the intent is to compare
# each day with the same weekday of the following week): within each Origin, ordered by
# FlightDate, lead(..., 7) looks seven days ahead.
spec = Window. \
    partitionBy("Origin"). \
    orderBy("FlightDate")

airtraffic. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD" AND FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
In [0]:
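# * When the window spec has an orderBy, the default frame for last runs from unbounded preceding to the current row,
#   so last simply returns the value of the current row (or of its last peer when there are ties).
# * To look at the whole partition, we need to change the frame to unbounded preceding to unbounded following,
#   using rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) on the spec.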
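
The effect of the frame can be modelled in plain Python. The sketch below is not Spark code; it assumes a single partition that is already sorted and has no ties, and the function name `last_over_frame` is hypothetical.

```python
def last_over_frame(ordered_values, whole_partition=False):
    """Return, for each row, the last value of its frame.

    whole_partition=False models the default growing frame
    (unbounded preceding to current row); True models
    unbounded preceding to unbounded following.
    """
    result = []
    for i in range(len(ordered_values)):
        frame = ordered_values if whole_partition else ordered_values[: i + 1]
        result.append(frame[-1])
    return result


salaries_desc = [11000.0, 3100.0, 2900.0]   # one partition, ordered by salary descending
print(last_over_frame(salaries_desc))                        # each row sees itself
print(last_over_frame(salaries_desc, whole_partition=True))  # every row sees the final row
```

Note that with a descending sort the last value of the whole partition is the lowest salary; to get the highest salary with this ordering, `first` is the function to use (or sort ascending and keep `last`).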

In [0]:
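# Note: run the cell that creates the employees Data Frame first.
# This cell still uses the default frame, so each row gets its own salary and employee_id back.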
from pyspark.sql.functions import last
spec = Window. \
    partitionBy('department_id'). \
    orderBy(col('salary').desc())
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    withColumn("highest_salary", last('salary').over(spec)). \
    withColumn("highest_employee_id", last('employee_id').over(spec)). \
    orderBy("department_id", col("salary").desc()). \
    show()

+-----------+-------------+-------+--------------+-------------------+
|employee_id|department_id| salary|highest_salary|highest_employee_id|
+-----------+-------------+-------+--------------+-------------------+
|        178|         null| 7000.0|        7000.0|                178|
|        200|           10| 4400.0|        4400.0|                200|
|        201|           20|13000.0|       13000.0|                201|
|        202|           20| 6000.0|        6000.0|                202|
|        114|           30|11000.0|       11000.0|                114|
|        115|           30| 3100.0|        3100.0|                115|
|        116|           30| 2900.0|        2900.0|                116|
|        117|           30| 2800.0|        2800.0|                117|
|        118|           30| 2600.0|        2600.0|                118|
|        119|           30| 2500.0|        2500.0|                119|
|        203|           40| 6500.0|        6500.0|                203|
|     

Now let us get into the details related to windowing functions.
 * The main package is `pyspark.sql.window`.
 * It has classes such as `Window` and `WindowSpec`.
 * `Window` has APIs such as `partitionBy`, `orderBy` etc.
 * These APIs (such as `partitionBy`) return a `WindowSpec` object. We can pass the `WindowSpec` object to `over` on functions such as `rank()`, `dense_rank()`, `sum()` etc.
 * Syntax: `sum('ColumnToAggregate').over(spec)` where `spec = Window.partitionBy('ColumnName')`
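
To show how the ranking functions used earlier differ, here is a plain-Python model of `rank`, `dense_rank`, `percent_rank` and `row_number` over a single partition ordered in descending order. It is a sketch of the semantics, not Spark code; the salaries are hypothetical and include a tie on purpose.

```python
def rank_partition(values):
    """Return (value, rank, dense_rank, percent_rank, row_number) per row
    for one partition, ordered by value descending."""
    ordered = sorted(values, reverse=True)
    n = len(ordered)
    rows = []
    rank = dense = 0
    previous = object()          # sentinel that equals no real value
    for row_number, value in enumerate(ordered, start=1):
        if value != previous:    # a new value starts a new rank
            rank = row_number    # rank leaves gaps after ties
            dense += 1           # dense_rank does not
            previous = value
        # percent_rank = (rank - 1) / (rows in partition - 1)
        pct = 0.0 if n == 1 else (rank - 1) / (n - 1)
        rows.append((value, rank, dense, round(pct, 2), row_number))
    return rows


salaries = [11000.0, 3100.0, 3100.0, 2800.0]   # hypothetical partition with a tie
for row in rank_partition(salaries):
    print(row)
```

The two tied rows share rank 2 and dense rank 2; the next row gets rank 4 but dense rank 3, while `row_number` keeps counting 1, 2, 3, 4 regardless of ties.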

## Overview of Spark Metastore

Let us get an overview of Spark Metastore and how we can leverage it to manage databases and tables on top of Big Data file systems such as HDFS, S3 etc.

* Quite often we need to deal with structured data, and the most popular way of processing structured data is by using databases, tables and then SQL.
* Spark Metastore (similar to Hive Metastore) lets us manage databases and tables.
* Typically the Metastore is set up using traditional relational database technologies such as **Oracle**, **MySQL**, **Postgres** etc.

## Exploring Spark Catalog

Let us get an overview of Spark Catalog to manage Spark Metastore tables as well as temporary views.

 
* Let us say `spark` is of type `SparkSession`. It has an attribute called `catalog`, which is of type `pyspark.sql.catalog.Catalog`.
* We can access the catalog using `spark.catalog`.
* We can permanently or temporarily create tables or views on top of data in a Data Frame.
* Metadata such as table names, column names, data types etc for the permanent tables or views is stored in the Metastore. We can access the metadata using `spark.catalog`, which is exposed as part of the SparkSession object.
* `spark.catalog` also provides the details of the temporary views that have been created. Metadata of these temporary views is not stored in Spark Metastore.
* Permanent tables are typically created in databases in Spark Metastore. If no database is specified, the tables are created in the **default** database.
* There are several methods that are part of `spark.catalog`. We will explore them in later topics.
* Following are some of the tasks that can be performed using the `spark.catalog` object.
  * Check the current database and switch to a different database.
  * Create a permanent table in the metastore.
  * Create or drop temporary views.
  * Register functions.
* All the above tasks can also be performed using SQL style commands passed to `spark.sql`.

In [0]:
help(spark.catalog)

## Creating Metastore Tables using catalog

Data Frames can be written into Metastore tables using APIs such as `saveAsTable` and `insertInto`, which are available as part of `write` on Data Frame objects.

* We can create a new table from a Data Frame using `saveAsTable`. We can also create an empty table by using `spark.catalog.createTable` or `spark.catalog.createExternalTable`.
* We can also prefix the table name with a database name to write data into a table belonging to a particular database. If the database is not specified, the table is created in the database the session is currently attached to, which is **default** unless changed.
* We can attach or connect the current session to a specific database using `spark.catalog.setCurrentDatabase`.
* Databases can be created using `spark.sql("CREATE DATABASE database_name")`. We can list databases using `spark.sql` or `spark.catalog.listDatabases()`.
* We can use modes such as `append`, `overwrite` and `error` with `saveAsTable`. Default is `error`.
* We can use modes such as `append` and `overwrite` with `insertInto`. Default is `append`.
* When we use `saveAsTable`, the following happens:
  * It checks whether the table already exists. If it does, `saveAsTable` throws an exception by default.
  * If the table does not exist, the table is created.
  * Data from the Data Frame is copied into the table.
  * We can alter the behavior by using mode. We can overwrite the existing table or append to it.
* We can list the tables using `spark.catalog.listTables` after switching to the appropriate database using `spark.catalog.setCurrentDatabase`.
* We can also switch the database and list tables using `spark.sql`.
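
The behavior of the three modes described above can be modelled with a small plain-Python sketch. This is not the Spark API: the metastore is a hypothetical dictionary, `save_as_table` is a made-up helper, and the model raises `ValueError` where Spark would raise its own exception.

```python
def save_as_table(metastore, name, rows, mode="error"):
    """Toy model of saveAsTable modes on a dict acting as the metastore."""
    if mode not in ("error", "append", "overwrite"):
        raise ValueError(f"unsupported mode: {mode}")
    if name in metastore:
        if mode == "error":
            # Default: refuse to touch an existing table.
            raise ValueError(f"table {name} already exists")
        if mode == "append":
            metastore[name] = metastore[name] + list(rows)
            return
    # New table, or overwrite of an existing one.
    metastore[name] = list(rows)


metastore = {}
save_as_table(metastore, "dual", [("X",)])                     # creates the table
save_as_table(metastore, "dual", [("X",)], mode="append")      # now two rows
save_as_table(metastore, "dual", [("Y",)], mode="overwrite")   # replaced by one row
print(metastore)
```

Calling `save_as_table(metastore, "dual", [("Z",)])` again with the default mode fails in this model, mirroring the default behavior of `saveAsTable` on an existing table.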


spark.catalog.createTable(
    tableName,
    path=None,
    source=None,
    schema=None,
    **options,
)

In [0]:
l = [("X", )]
df = spark.createDataFrame(l, schema="dummy STRING")

In [0]:
## List the tables in the catalog
spark.catalog.listTables()
## Get the current database
spark.catalog.currentDatabase()
# show() prints the Data Frame itself and returns None, so it is not wrapped in print().
df.show()

+-----+
|dummy|
+-----+
|    X|
+-----+


In [0]:
## Save the Data Frame as a table in the catalog metastore
## Note: the metastore in Databricks is tied to the cluster
df.write.saveAsTable("dual", mode='overwrite')

In [0]:
## listing the tables in catalog
spark.catalog.listTables()

Out[25]: [Table(name='dual', database='default', description=None, tableType='MANAGED', isTemporary=False)]

In [0]:
## Query the table registered in the Spark catalog
spark.sql('SELECT * FROM dual').show()

+-----+
|dummy|
+-----+
|    X|
+-----+



In [0]:
## Insert the Data Frame's rows into the existing table
df.write.insertInto('dual')

In [0]:
## Syntax for creating an external table
# spark.catalog.createExternalTable(
#     tableName,
#     path=None,
#     source=None,
#     schema=None,
#     **options,
# )

In [0]:
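## create an external table over a Parquet file stored at a Databricks file path
## note: the names say airport_codes, but the file (and the schema in the output) is air traffic data
airport_codes_path="/FileStore/tables/airtrafic_all_snappy-1.parquet"
spark.catalog. \
    createExternalTable("airport_codes_",
                        path=airport_codes_path,
                        source="parquet"
                       )

## note: sep, header and inferSchema are CSV options and are not needed for Parquet
## note: createExternalTable is deprecated since Spark 2.2; spark.catalog.createTable with a path is the replacement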

Out[39]: DataFrame[Year: int, Month: int, DayofMonth: int, DayOfWeek: int, DepTime: string, CRSDepTime: int, ArrTime: string, CRSArrTime: int, UniqueCarrier: string, FlightNum: int, TailNum: string, ActualElapsedTime: string, CRSElapsedTime: int, AirTime: string, ArrDelay: string, DepDelay: string, Origin: string, Dest: string, Distance: string, TaxiIn: string, TaxiOut: string, Cancelled: int, CancellationCode: string, Diverted: int, CarrierDelay: string, WeatherDelay: string, NASDelay: string, SecurityDelay: string, LateAircraftDelay: string, IsArrDelayed: string, IsDepDelayed: string]

In [0]:
## create a database in the metastore, building its name from a Python variable
username = "piyush"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_hr_db")

Out[34]: DataFrame[]

In [0]:
## get current database 
spark.catalog.currentDatabase()

Out[40]: 'default'

In [0]:
## create a table in the metastore from a StructType schema

from pyspark.sql.types import StructField, StructType, \
    IntegerType, StringType, FloatType

employeesSchema = StructType([
    StructField("employee_id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("salary", FloatType()),
    StructField("nationality", StringType())
])

spark.catalog.createTable("employees", schema=employeesSchema)

## this creates an empty table named employees with the given schema

Out[42]: DataFrame[employee_id: int, first_name: string, last_name: string, salary: float, nationality: string]

In [0]:
## insert data into the table
employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]
## inspect the target table's schema; insertInto matches columns by position, not by name
spark.read.table('employees').schema

employeesDF = spark.createDataFrame(employees,
    schema="""employee_id INT, first_name STRING, last_name STRING,
              salary FLOAT, nationality STRING
           """
)

employeesDF.write.insertInto("employees", overwrite=True)

spark.sql('SELECT * FROM employees').show()

+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          1|     Scott|    Tiger|1000.0| united states|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+



In [0]:
## read the metastore table back into a DataFrame
employees_df = spark.read.table("employees")
employees_df.show()

+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          1|     Scott|    Tiger|1000.0| united states|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+



## Creating Partitioned Tables

We can also create partitioned tables as part of Spark Metastore Tables.
* There are some challenges in creating partitioned tables directly using `spark.catalog.createTable`.
* But if the directories already follow the partitioned layout (`column=value` sub-directories containing data), we should be able to create a partitioned table on top of them.
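
One way this could be done is sketched below, under stated assumptions: the directory holds Parquet files in `column=value` sub-directories, `register_partitioned_dir` is a hypothetical helper name, and the cluster uses the open-source Spark catalog (on Databricks the default table format may be Delta, in which case this approach may not apply). The sketch registers an external table on the directory and then asks the catalog to discover the partitions on disk.

```python
def register_partitioned_dir(spark, table_name, table_dir, source="parquet"):
    """Register an external table over an already partitioned directory.

    Hypothetical helper: `spark` is assumed to be an active SparkSession and
    `table_dir` a directory with sub-directories such as order_status=CLOSED.
    """
    # Create the table on top of the existing files; no data is copied.
    spark.catalog.createTable(table_name, path=table_dir, source=source)
    # Add the partitions found on disk to the metastore.
    spark.catalog.recoverPartitions(table_name)
    return spark.read.table(table_name)
```

A call might look like `register_partitioned_dir(spark, "orders_ext", "/user/hive/warehouse/orders_part_partitoned")`, where `orders_ext` is an illustrative table name.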

* Let us create partitioned table for `orders` by `order_month`.

### Tasks

Let us perform tasks related to partitioned tables.
* Read data from file into data frame.
* Add additional column which will be used to partition the data.
* Use `saveAsTable` to write the data in the Dataframe to a new table in the database we are attached to. The folder related to the table will be created using default location.

Note: the cells below create the partitioned table, save it to the metastore, and write its partitions to files.
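
The three tasks above can be sketched as a single function. This is a minimal sketch, assuming the `orders` Data Frame has an `order_date` column holding values such as `2013-07-25 00:00:00.0`; the function name `save_orders_partitioned` and the table name `orders_part` are illustrative and not taken from the notebook. The partition column is derived with a Spark SQL expression so that no extra imports are needed.

```python
def save_orders_partitioned(orders_df, table_name="orders_part"):
    """Add an order_month column and save the result as a partitioned table.

    Hypothetical helper: `orders_df` is assumed to be a Spark DataFrame
    with an order_date column.
    """
    # Derive the partition column in yyyyMM form from order_date.
    with_month = orders_df.selectExpr(
        "*", "date_format(order_date, 'yyyyMM') AS order_month"
    )
    # saveAsTable writes one sub-directory per distinct order_month
    # under the default location of the current database.
    with_month.write.saveAsTable(
        table_name, mode="overwrite", partitionBy="order_month"
    )
    return with_month
```

With the `orders` Data Frame read in the cells that follow, the call would be `save_orders_partitioned(orders)`. The notebook itself partitions by `order_status` instead, which only changes the `partitionBy` argument.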


In [0]:

username = "Piyush"
spark.sql(f'CREATE DATABASE IF NOT EXISTS {username}_retail')
spark.catalog.currentDatabase()

Out[51]: 'default'

In [0]:
## read the orders data from a file; it is partitioned and saved as a table in the cells below


# File location and type
file_location = "/FileStore/tables/orders.json"
file_type = "json"

# CSV options
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
orders = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(orders)


order_customer_id,order_date,order_id,order_status
11599,2013-07-25 00:00:00.0,1,CLOSED
256,2013-07-25 00:00:00.0,2,PENDING_PAYMENT
12111,2013-07-25 00:00:00.0,3,COMPLETE
8827,2013-07-25 00:00:00.0,4,CLOSED
11318,2013-07-25 00:00:00.0,5,COMPLETE
7130,2013-07-25 00:00:00.0,6,COMPLETE
4530,2013-07-25 00:00:00.0,7,COMPLETE
2911,2013-07-25 00:00:00.0,8,PROCESSING
5657,2013-07-25 00:00:00.0,9,PENDING_PAYMENT
5648,2013-07-25 00:00:00.0,10,PENDING_PAYMENT


In [0]:
orders. \
    write. \
    saveAsTable(
        'orders_part_partitoned',
        mode='overwrite',
        partitionBy='order_status'
    )


## note: this creates the orders_part_partitoned table, partitioned by order_status; the files can be seen under dbfs --> hive --> warehouse --> orders_part_partitoned


  File "<command-2997790020331999>", line 13
    describe table orders_part_partitoned
             ^
SyntaxError: invalid syntax


In [0]:
%sql
describe table orders_part_partitoned



---------------------------------------------------------------------------
ParseException                            Traceback (most recent call last)
<command-2997790020332000> in <module>
      5     display(df)
      6     return df
----> 7   _sqldf = ____databricks_percent_sql()
      8 finally:
      9   del ____databricks_percent_sql

<command-2997790020332000> in ____databricks_percent_sql()
      2   def ____databricks_percent_sql():
      3     import 

In [0]:
spark.sql('SHOW PARTITIONS orders_part_partitoned').show()

+---------------+
|   order_status|
+---------------+
|         CLOSED|
|       CANCELED|
|       COMPLETE|
|        ON_HOLD|
| PAYMENT_REVIEW|
|        PENDING|
|PENDING_PAYMENT|
|     PROCESSING|
|SUSPECTED_FRAUD|
+---------------+



In [0]:
## attempt to create a table on top of the data of a partitioned table (unresolved)
## note: path points to a single Parquet data file, while source is set to 'json';
## running this call raised the AnalysisException shown below
# spark. \
#     catalog. \
#     createTable('orders_part22',
#                 path=f'/user/hive/warehouse/orders_part_partitoned/order_status=COMPLETE/part-00001-459b1ee5-1147-422b-a0e1-5aafa8c28f18.c000.snappy.parquet',
#                 source='json'
#                )

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-2997790020332002> in <module>
      1 ## creating table with partitions
----> 2 spark. \
      3     catalog. \
      4     createTable('orders_part22',
      5                 path=f'/user/hive/warehouse/orders_part_partitoned/order_status=COMPLETE/part-00001-459b1ee5-1147-422b-a0e1-5aafa8c28f18.c000.snappy.parquet',

/databricks/spark/python/pyspark/sql/catalog.py in createTable(self, tableName, path, source, schema, description, **options)


  File "<command-2997790020332001>", line 3
    spark.read.table(f'/user/hive/warehouse/orders_part_partitoned/order_status=COMPLETE/part-00001-459b1ee5-1147-422b-a0e1-5aafa8c28f18.c000.snappy.parquet')show()
                                                                                                                                                              ^
SyntaxError: invalid syntax


In [0]:
## create a Spark table using SQL
spark.sql("""
    CREATE TABLE orders (
        order_id INT,
        order_date STRING,
        order_customer_id INT,
        order_status STRING
    ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")


In [0]:
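## load data into a table using Spark SQL
## note: the target table must already exist; the cell above creates orders, not order_items,
## so the table name and path need to be adjusted to match
spark.sql("""
    LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
""")

## note: check that this path exists (on Databricks, verify the DBFS location)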

In [0]:
## create (or replace) a temporary view on top of the DataFrame, then list all tables in Spark
orders.createOrReplaceTempView("orders_temp_view")
spark.catalog.listTables()

Out[72]: [Table(name='airport_codes1', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='airport_codes_', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='dual', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='employees', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='orders_part22', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='orders_part_partitoned', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='orders_temp_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

## Create Data Frame with Special Types
As part of this topic we will create Data Frame with special types such as `ARRAY`, `STRUCT` and `MAP`.
* We will also see how we can insert data in the data frame with special types into the Metastore table.

In [0]:
## employees with array (email ids), map (phone numbers) and struct (address) columns
employees = [
     (2, "Henry", "Ford", 1250.0, 
      "India", ['henry@ford.com', 'hford@companyx.com'], 
      {"Home": "+91 234 567 8901", "Office": "+91 345 678 9012"}, 
      "456 78 9123", ('111 BCD Cir', 'Some City', 'Some State', 500091)
     ),
     (3, "Nick", "Junior", 750.0, 
      "United Kingdom", ['nick@junior.com', 'njunior@companyx.com'], 
      {"Home": "+44 111 111 1111", "Office": "+44 222 222 2222"}, 
      "222 33 4444", ('222 Giant Cly', 'UK City', 'UK Province', None)
     ),
     (4, "Bill", "Gomes", 1500.0, 
      "Australia", ['bill@gomes.com', 'bgomes@companyx.com'], 
      {"Home": "+61 987 654 3210", "Office": "+61 876 543 2109"}, 
      "789 12 6118", None
     ),
     (5, 'Harry', 'Potter', 1800.0,
      'United States', None, None, None, None
     )
]

employees_df = spark.createDataFrame(
    employees,
    schema="""employee_id INT, employee_first_name STRING, employee_last_name STRING,
        employee_salary FLOAT, employee_nationality STRING, employee_email_ids ARRAY<STRING>,
        employee_phone_numbers MAP<STRING, STRING>, employee_ssn STRING,
        employee_address STRUCT<street: STRING, city: STRING, state: STRING, postal_code: INT>
    """
)

employees_df.show(2,truncate=False)

+-----------+-------------------+------------------+---------------+--------------------+---------------------------------------+------------------------------------------------------+------------+--------------------------------------------+
|employee_id|employee_first_name|employee_last_name|employee_salary|employee_nationality|employee_email_ids                     |employee_phone_numbers                                |employee_ssn|employee_address                            |
+-----------+-------------------+------------------+---------------+--------------------+---------------------------------------+------------------------------------------------------+------------+--------------------------------------------+
|2          |Henry              |Ford              |1250.0         |India               |[henry@ford.com, hford@companyx.com]   |{Office -> +91 345 678 9012, Home -> +91 234 567 8901}|456 78 9123 |{111 BCD Cir, Some City, Some State, 500091}|
|3          |Nick           

In [0]:
## functions for array columns: explode, explode_outer, concat_ws, split

from pyspark.sql.functions import explode,explode_outer,concat_ws,split

## explode converts each array element into its own row; rows whose array is null are dropped
employees_df.select('employee_id', explode('employee_email_ids').alias('employee_email_id')).show(truncate=False)

## explode_outer also keeps the rows whose array column is null (employee 5 in the second output)
employees_df.select('employee_id', explode_outer('employee_email_ids').alias('employee_email_id')).show(truncate=False)

## concat_ws converts the array into a delimited string, i.e. the list of email ids becomes comma separated email ids in one column
employees_df.select('employee_id', concat_ws(', ', 'employee_email_ids').alias('employee_email_ids')).show(truncate=False)

## split does the reverse: it converts a delimited string into an array. No example is given in this cell.



+-----------+--------------------+
|employee_id|employee_email_id   |
+-----------+--------------------+
|2          |henry@ford.com      |
|2          |hford@companyx.com  |
|3          |nick@junior.com     |
|3          |njunior@companyx.com|
|4          |bill@gomes.com      |
|4          |bgomes@companyx.com |
+-----------+--------------------+

+-----------+--------------------+
|employee_id|employee_email_id   |
+-----------+--------------------+
|2          |henry@ford.com      |
|2          |hford@companyx.com  |
|3          |nick@junior.com     |
|3          |njunior@companyx.com|
|4          |bill@gomes.com      |
|4          |bgomes@companyx.com |
|5          |null                |
+-----------+--------------------+

+-----------+-------------------------------------+
|employee_id|employee_email_ids                   |
+-----------+-------------------------------------+
|2          |henry@ford.com, hford@companyx.com   |
|3          |nick@junior.com, njunior@companyx.com|
|4 
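
The cell above mentions `split` without showing it. A minimal sketch follows, assuming a Data Frame in which `employee_email_ids` is a delimited string (for example the result of the `concat_ws` call above); `split_email_ids` is a hypothetical helper name. It uses a Spark SQL expression, where the second argument of `split` is treated as a regular expression.

```python
def split_email_ids(df, column="employee_email_ids", delimiter=", "):
    """Convert a delimited string column back into an array column.

    Hypothetical helper: `df` is assumed to be a Spark DataFrame with an
    employee_id column and a string column named `column`.
    """
    # split(str, pattern) returns ARRAY<STRING>; the pattern is a regex,
    # so special characters in the delimiter would need escaping.
    return df.selectExpr(
        "employee_id",
        f"split({column}, '{delimiter}') AS {column}",
    )
```

Applying `explode` to the resulting column would again produce one row per email id.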

## Projecting Struct and Map Columns

As part of this topic we will see how to project `STRUCT` and `MAP`.

* Create list with appropriate types.
* Create Data Frame using list and define schema with relevant types.
* We will print schema as well as preview the data.
* We will then project the fields in structs and maps

In [0]:
employees = [
     (2, "Henry", "Ford", 1250.0, 
      "India", ['henry@ford.com', 'hford@companyx.com'], 
      {"Home": "+91 234 567 8901", "Office": "+91 345 678 9012"}, 
      "456 78 9123", ('111 BCD Cir', 'Some City', 'Some State', 500091)
     ),
     (3, "Nick", "Junior", 750.0, 
      "United Kingdom", ['nick@junior.com', 'njunior@companyx.com'], 
      {"Home": "+44 111 111 1111", "Office": "+44 222 222 2222"}, 
      "222 33 4444", ('222 Giant Cly', 'UK City', 'UK Province', None)
     ),
     (4, "Bill", "Gomes", 1500.0, 
      "Australia", ['bill@gomes.com', 'bgomes@companyx.com'], 
      {"Home": "+61 987 654 3210", "Office": "+61 876 543 2109"}, 
      "789 12 6118", None
     )
]

employees_df = spark.createDataFrame(
    employees,
    schema="""employee_id INT, employee_first_name STRING, employee_last_name STRING,
        employee_salary FLOAT, employee_nationality STRING, employee_email_ids ARRAY<STRING>,
        employee_phone_numbers MAP<STRING, STRING>, employee_ssn STRING,
        employee_address STRUCT<street: STRING, city: STRING, state: STRING, postal_code: INT>
    """
)

employees_df.printSchema()
employees_df.show(2,truncate=False)

root
 |-- employee_id: integer (nullable = true)
 |-- employee_first_name: string (nullable = true)
 |-- employee_last_name: string (nullable = true)
 |-- employee_salary: float (nullable = true)
 |-- employee_nationality: string (nullable = true)
 |-- employee_email_ids: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- employee_phone_numbers: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- employee_ssn: string (nullable = true)
 |-- employee_address: struct (nullable = true)
 |    |-- street: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- postal_code: integer (nullable = true)

+-----------+-------------------+------------------+---------------+--------------------+---------------------------------------+------------------------------------------------------+------------+--------------------------------------------+
|employee_id|employe

In [0]:
## query a map column: individual keys can be projected with dot notation
employees_df.select('employee_phone_numbers.Office', 'employee_phone_numbers.Home').show()

+----------------+----------------+
|          Office|            Home|
+----------------+----------------+
|+91 345 678 9012|+91 234 567 8901|
|+44 222 222 2222|+44 111 111 1111|
|+61 876 543 2109|+61 987 654 3210|
+----------------+----------------+



In [0]:
from pyspark.sql.functions import map_keys, map_values
## map_keys returns the keys of a map column as an array; map_values returns its values
employees_df.select('employee_id', map_keys('employee_phone_numbers').alias('employee_phone_numbers_keys')).show(2)
employees_df.select('employee_id', map_values('employee_phone_numbers').alias('employee_phone_numbers_values')).show()

+-----------+---------------------------+
|employee_id|employee_phone_numbers_keys|
+-----------+---------------------------+
|          2|             [Office, Home]|
|          3|             [Office, Home]|
+-----------+---------------------------+
only showing top 2 rows

+-----------+-----------------------------+
|employee_id|employee_phone_numbers_values|
+-----------+-----------------------------+
|          2|         [+91 345 678 9012...|
|          3|         [+44 222 222 2222...|
|          4|         [+61 876 543 2109...|
+-----------+-----------------------------+
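
The cells above project only the map column. Struct fields can be projected with the same dot notation, sketched here under the assumption that `employees_df` has the `employee_address` struct defined earlier; `project_address` is a hypothetical helper name.

```python
def project_address(df):
    """Project fields of the employee_address struct column.

    Hypothetical helper: `df` is assumed to be a Spark DataFrame with an
    employee_address STRUCT<street, city, state, postal_code> column.
    """
    # Dot notation selects individual struct fields as top-level columns.
    some_fields = df.select(
        "employee_id", "employee_address.street", "employee_address.city"
    )
    # A trailing .* expands every field of the struct.
    all_fields = df.select("employee_id", "employee_address.*")
    return some_fields, all_fields
```

Calling `.show()` on either result displays the address fields as ordinary columns; rows whose struct is null show null in each projected field.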



In [0]:
## we can write this DataFrame to a Spark Metastore table
## note: the table below is created as employees_complex_schema, but insertInto targets employees1;
## that mismatch is the likely cause of the AnalysisException shown in the output
## note: option() has to be called before insertInto(), because insertInto() returns None
# spark.catalog.createTable('employees_complex_schema',schema="""employee_id INT, employee_first_name STRING, employee_last_name STRING,
#         employee_salary FLOAT, employee_nationality STRING, employee_email_ids ARRAY<STRING>,
#         employee_phone_numbers MAP<STRING, STRING>, employee_ssn STRING,
#         employee_address STRUCT<street: STRING, city: STRING, state: STRING, postal_code: INT>
#     """)
# employees_df.write.insertInto('employees1').option("mergeSchema", "true")

## note: the target table (employees1 here) must exist before insertInto is called; on Databricks it would be a Delta table

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-529767584512684> in <module>
      5 #         employee_address STRUCT<street: STRING, city: STRING, state: STRING, postal_code: INT>
      6 #     """)
----> 7 employees_df.write.insertInto('employees1').option("mergeSchema", "true")

/databricks/spark/python/pyspark/sql/readwriter.py in insertInto(self, tableName, overwrite)
    760         if overwrite is not None: