In [6]:
from pyspark.sql import SparkSession

import getpass
# OS account running this notebook; it namespaces the warehouse directory
# and tags the application name below.
username = getpass.getuser()

# Build (or reuse) a Hive-enabled SparkSession on YARN.
# NOTE(review): UI port '0' presumably lets Spark pick any free port — confirm.
spark = (
    SparkSession.builder
    .master('yarn')
    .appName(f'{username} | Python - Data Processing - Overview')
    .config('spark.ui.port', '0')
    .config("spark.sql.warehouse.dir", f"/user/{username}/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

### Different methods of adding schema




___DDL method___

In [2]:
# Read the orders CSV with an explicit schema given as a DDL string
# ("name TYPE, ...") rather than relying on inference.
ordersDf = (
    spark.read
    .format('csv')
    .schema('''
            order_id INT, 
            order_date STRING, 
            order_customer_id INT, 
            order_status STRING
        ''')
    .load('/public/retail_db/orders')
)

In [3]:
# Preview the first 10 orders.
ordersDf.show(n=10)

+--------+--------------------+-----------------+---------------+
|order_id|          order_date|order_customer_id|   order_status|
+--------+--------------------+-----------------+---------------+
|       1|2013-07-25 00:00:...|            11599|         CLOSED|
|       2|2013-07-25 00:00:...|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|            12111|       COMPLETE|
|       4|2013-07-25 00:00:...|             8827|         CLOSED|
|       5|2013-07-25 00:00:...|            11318|       COMPLETE|
|       6|2013-07-25 00:00:...|             7130|       COMPLETE|
|       7|2013-07-25 00:00:...|             4530|       COMPLETE|
|       8|2013-07-25 00:00:...|             2911|     PROCESSING|
|       9|2013-07-25 00:00:...|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|             5648|PENDING_PAYMENT|
+--------+--------------------+-----------------+---------------+
only showing top 10 rows



In [4]:
# Confirm that the column names and types match the DDL string supplied at read time.
ordersDf.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [5]:
# Flight-time schema expressed as one DDL string; FL_DATE is typed as DATE.
flightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING, 
      ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT, 
      WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

# Load the JSON files with that schema; dateFormat tells the reader how the
# FL_DATE text is laid out so it can be parsed into a date.
flightTimeJsonDF = (
    spark.read
    .format("json")
    .option("dateFormat", "M/d/y")
    .schema(flightSchemaDDL)
    .load("/user/itv736079/flightdata/flight*.json")
)

flightTimeJsonDF.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)



In [6]:
# Same JSON files, but with no schema supplied, so the reader infers one.
# The printed schema below shows the effect: columns come back in alphabetical
# order, integers are typed long, and FL_DATE stays a string even though
# dateFormat is set.
flightTimeJsonDF2 = (
    spark.read
    .format("json")
    .option("dateFormat", "M/d/y")
    .load("/user/itv736079/flightdata/flight*.json")
)

flightTimeJsonDF2.printSchema()

root
 |-- ARR_TIME: long (nullable = true)
 |-- CANCELLED: long (nullable = true)
 |-- CRS_ARR_TIME: long (nullable = true)
 |-- CRS_DEP_TIME: long (nullable = true)
 |-- DEP_TIME: long (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DISTANCE: long (nullable = true)
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: long (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- TAXI_IN: long (nullable = true)
 |-- WHEELS_ON: long (nullable = true)



In [7]:
# First 5 rows of the explicitly-typed frame, without truncating cell values.
flightTimeJsonDF.show(n=5, truncate=False)

+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|FL_DATE   |OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|2000-01-01|DL        |1451             |BOS   |Boston, MA      |ATL |Atlanta, GA   |1115        |1113    |1343     |5      |1400        |1348    |0        |946     |
|2000-01-01|DL        |1479             |BOS   |Boston, MA      |ATL |Atlanta, GA   |1315        |1311    |1536     |7      |1559        |1543    |0        |946     |
|2000-01-01|DL        |1857             |BOS   |Boston, MA      |ATL |Atlanta, GA   |1415        |1414    |1642     |9      |1721        |1651    |0        |946     

In [8]:
# First 5 rows of the schema-inferred frame, for comparison with the typed one.
flightTimeJsonDF2.show(n=5)

+--------+---------+------------+------------+--------+----+--------------+--------+--------+----------+-----------------+------+----------------+-------+---------+
|ARR_TIME|CANCELLED|CRS_ARR_TIME|CRS_DEP_TIME|DEP_TIME|DEST|DEST_CITY_NAME|DISTANCE| FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|TAXI_IN|WHEELS_ON|
+--------+---------+------------+------------+--------+----+--------------+--------+--------+----------+-----------------+------+----------------+-------+---------+
|    1348|        0|        1400|        1115|    1113| ATL|   Atlanta, GA|     946|1/1/2000|        DL|             1451|   BOS|      Boston, MA|      5|     1343|
|    1543|        0|        1559|        1315|    1311| ATL|   Atlanta, GA|     946|1/1/2000|        DL|             1479|   BOS|      Boston, MA|      7|     1536|
|    1651|        0|        1721|        1415|    1414| ATL|   Atlanta, GA|     946|1/1/2000|        DL|             1857|   BOS|      Boston, MA|      9|     1642|
|    2005|

In [9]:
# Load the Parquet copy of the flight data; no schema or options are passed.
flightTimeParquetDF = spark.read.parquet("/user/itv736079/flightdata/flight*.parquet")

In [10]:
# Show the schema Spark obtained for the Parquet files; none was supplied at read time.
flightTimeParquetDF.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)



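___For JSON, Parquet, and Avro data an explicit schema is optional: Parquet and Avro files carry their own schema, and the JSON reader infers one from the data. As seen above, the inferred JSON schema is less precise (integers become `long`, `FL_DATE` stays a `string`)___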

### Programmatically defining schema

In [11]:
from pyspark.sql.types import *

# Flight-time schema assembled programmatically, one field per .add() call.
# FL_DATE is declared non-nullable here, yet the schema printed below still
# reports it as nullable = true.
flightSchemaStruct = (
    StructType()
    .add("FL_DATE", DateType(), False)
    .add("OP_CARRIER", StringType())
    .add("OP_CARRIER_FL_NUM", IntegerType())
    .add("ORIGIN", StringType())
    .add("ORIGIN_CITY_NAME", StringType())
    .add("DEST", StringType())
    .add("DEST_CITY_NAME", StringType())
    .add("CRS_DEP_TIME", IntegerType())
    .add("DEP_TIME", IntegerType())
    .add("WHEELS_ON", IntegerType())
    .add("TAXI_IN", IntegerType())
    .add("CRS_ARR_TIME", IntegerType())
    .add("ARR_TIME", IntegerType())
    .add("CANCELLED", IntegerType())
    .add("DISTANCE", IntegerType())
)

# Read the CSV files (with header row) against that schema. FAILFAST mode is
# requested so malformed records raise instead of being silently nulled.
flightTimeCsvDF = (
    spark.read
    .format("csv")
    .schema(flightSchemaStruct)
    .option("header", "true")
    .option("dateFormat", "M/d/y")
    .option("mode", "FAILFAST")
    .load("/user/itv736079/flightdata/flight*.csv")
)

flightTimeCsvDF.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)



### Create DataFrame from RDD

___Using `rdd.toDF()`___

In [12]:
# (department name, department id) pairs held in a local Python list.
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]

# Turn the local list into an RDD.
deptRdd = spark.sparkContext.parallelize(dept)

In [13]:
# Bring the RDD contents back to the driver to verify what was parallelized.
deptRdd.collect()

[('Finance', 10), ('Marketing', 20), ('Sales', 30), ('IT', 40)]

In [14]:
# Convert the RDD to a DataFrame without naming the columns; as the output
# below shows, they get the default names _1 and _2.
deptDf = deptRdd.toDF()

In [15]:
# Display the DataFrame with its default column names.
deptDf.show()

+---------+---+
|       _1| _2|
+---------+---+
|  Finance| 10|
|Marketing| 20|
|    Sales| 30|
|       IT| 40|
+---------+---+



In [16]:
# Same conversion, this time supplying the column names.
deptDf2 = deptRdd.toDF(schema=["dept_name", "dept_id"])

In [17]:
# Display the DataFrame; the columns now carry the supplied names.
deptDf2.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



___Using `spark.createDataFrame(rdd, schema)`___

In [18]:
# Build the DataFrame through the session instead of rdd.toDF(); the list of
# names becomes the column names.
deptDf3 = spark.createDataFrame(
    deptRdd,
    schema=["dept_name", "dept_id"],
)

In [19]:
# Display the result; it matches the rdd.toDF(names) output above.
deptDf3.show()

+---------+-------+
|dept_name|dept_id|
+---------+-------+
|  Finance|     10|
|Marketing|     20|
|    Sales|     30|
|       IT|     40|
+---------+-------+



### Create DataFrame from a local list

In [20]:
# Local sample records; every field (including the date parts) is a string,
# and the ("Abdul", ...) row appears twice.
data_list = [
    ("Ravi", "28", "1", "2002"),
    ("Abdul", "23", "5", "81"),
    ("John", "12", "12", "6"),
    ("Rosy", "7", "8", "63"),
    ("Abdul", "23", "5", "81"),
]

# Create the DataFrame straight from the list, then name its columns.
raw_df = (
    spark.createDataFrame(data_list)
    .toDF("Name", "Day", "Month", "Year")
)

raw_df.show()

+-----+---+-----+----+
| Name|Day|Month|Year|
+-----+---+-----+----+
| Ravi| 28|    1|2002|
|Abdul| 23|    5|  81|
| John| 12|   12|   6|
| Rosy|  7|    8|  63|
|Abdul| 23|    5|  81|
+-----+---+-----+----+



In [15]:
# Plain list of scalars (10, 20, ..., 50) used to build a single-column DataFrame.
normal_list = list(range(10, 51, 10))

In [16]:
from pyspark.sql.types import IntegerType

# A list of bare scalars needs an explicit element type; the resulting single
# column is named "value" (see output below).
dffromList = spark.createDataFrame(normal_list, schema=IntegerType())

In [17]:
# Display the single-column DataFrame built from the scalar list.
dffromList.show()

+-----+
|value|
+-----+
|   10|
|   20|
|   30|
|   40|
|   50|
+-----+



### Count the number of records in each partition

In [21]:
%%sh

# Peek at the header and first records of one raw airtraffic file.
# `head` stops reading after 10 lines; that is presumably why hdfs then
# reports "Unable to write to output stream" in the output below.
hdfs dfs -cat /public/airtraffic_all/airtraffic/part-00000 | head -n 10

Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay,IsArrDelayed,IsDepDelayed
1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES
1987,10,15,4,729,730,903,849,PS,1451,NA,94,79,NA,14,-1,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,NO
1987,10,17,6,741,730,918,849,PS,1451,NA,97,79,NA,29,11,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES
1987,10,18,7,729,730,847,849,PS,1451,NA,78,79,NA,-2,-1,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,NO,NO
1987,10,19,1,749,730,922,849,PS,1451,NA,93,79,NA,33,19,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,YES,YES
1987,10,21,3,728,730,848,849,PS,1451,NA,80,79,NA,-1,-2,SAN,SFO,447,NA,NA,0,NA,0,NA,NA,NA,NA,NA,NO,NO
1987,10,22,4,728,730,852,849,PS,1451,NA,84,79,NA,3,-2,SAN,SFO

cat: Unable to write to output stream.


In [22]:
# One partition directory of the Parquet copy of the airtraffic data.
airtraffic_path = "/public/airtraffic_all/airtraffic-part/flightmonth=200801"

# Borrow the schema of that Parquet data so it can be applied to the CSV read.
airtrafficSchema = spark.read.parquet(airtraffic_path).schema

In [23]:
# Read the raw CSV files (which carry a header row) using the schema taken
# from the Parquet data; the location is passed as the "path" option.
airtrafficDf = (
    spark.read
    .format("csv")
    .schema(airtrafficSchema)
    .option("header", "true")
    .option("path", "/public/airtraffic_all/airtraffic/")
    .load()
)

In [24]:
# Total number of records in the CSV-backed DataFrame.
# NOTE(review): left commented out, presumably to avoid re-scanning the whole
# dataset on every run; the number below is output kept from an earlier run.
# airtrafficDf.count()

1235347780

In [25]:
# Records per partition: group rows by the id of the Spark partition holding
# them and count each group.
# NOTE(review): left commented out, presumably because it scans the full
# dataset; the table below is output kept from an earlier run.
# from pyspark.sql.functions import spark_partition_id

# airtrafficDf. \
#     groupBy(spark_partition_id()). \
#     count()

SPARK_PARTITION_ID(),count
1580,648743
1645,667411
463,644980
1088,664832
833,627038
471,667936
1591,669356
148,659779
496,665848
1238,645313


### How to refer to a column in a DataFrame/Dataset?

___Column string notation___

In [26]:
# Columns referenced by their names as plain strings.
ordersDf.select("order_id", "order_status").show(n=5)

+--------+---------------+
|order_id|   order_status|
+--------+---------------+
|       1|         CLOSED|
|       2|PENDING_PAYMENT|
|       3|       COMPLETE|
|       4|         CLOSED|
|       5|       COMPLETE|
+--------+---------------+
only showing top 5 rows



___Column object notation___

In [27]:
from pyspark.sql.functions import *

# Same projection, with each column wrapped in a Column object via col().
(
    ordersDf
    .select(
        col("order_id"),
        col("order_status"),
    )
    .show(n=5)
)

+--------+---------------+
|order_id|   order_status|
+--------+---------------+
|       1|         CLOSED|
|       2|PENDING_PAYMENT|
|       3|       COMPLETE|
|       4|         CLOSED|
|       5|       COMPLETE|
+--------+---------------+
only showing top 5 rows



___Column object expressions method___

In [28]:
# Derive FlightDate with Column functions: zero-pad month and day to two
# digits, concatenate them after the year, and parse the result as yyyyMMdd.
# String arguments to concat()/lpad() are treated as column names.
# The names are spelled exactly as in the CSV header and in the SQL-expression
# cell that follows (Year / Month / DayofMonth), so the two notations agree.
(
    airtrafficDf
    .select(
        "Origin",
        "Dest",
        to_date(
            concat(
                "Year",
                lpad("Month", 2, "0"),
                lpad("DayofMonth", 2, "0"),
            ),
            'yyyyMMdd',
        ).alias("FlightDate"),
    )
    .show(5)
)

+------+----+----------+
|Origin|Dest|FlightDate|
+------+----+----------+
|   MCI| ORD|2001-08-03|
|   MCI| ORD|2001-08-04|
|   MCI| ORD|2001-08-05|
|   MCI| ORD|2001-08-06|
|   MCI| ORD|2001-08-07|
+------+----+----------+
only showing top 5 rows



___String expression or SQL expression method___

In [29]:
# Same FlightDate derivation, written as a single SQL expression string and
# turned into a Column with expr().
(
    airtrafficDf
    .select("Origin", "Dest", expr("""to_date(
                                                      concat(Year, 
                                                             lpad(Month, 2, 0), 
                                                             lpad(DayofMonth, 2, 0)
                                                             ), 'yyyyMMdd') as FlightDate"""))
    .show(n=5)
)

+------+----+----------+
|Origin|Dest|FlightDate|
+------+----+----------+
|   MCI| ORD|2001-08-03|
|   MCI| ORD|2001-08-04|
|   MCI| ORD|2001-08-05|
|   MCI| ORD|2001-08-06|
|   MCI| ORD|2001-08-07|
+------+----+----------+
only showing top 5 rows



### Find duplicates in a DataFrame

In [8]:
# Sample values with deliberate repeats: 20 occurs twice and 30 three times.
data1 = [10] + [20] * 2 + [30] * 3 + [40]

In [12]:
from pyspark.sql.types import IntegerType

# Single-column integer DataFrame built from the list that contains duplicates.
dfWithDuplData = spark.createDataFrame(data1, schema=IntegerType())

In [13]:
# Display the raw values, duplicates included.
dfWithDuplData.show()

+-----+
|value|
+-----+
|   10|
|   20|
|   20|
|   30|
|   30|
|   30|
|   40|
+-----+



In [21]:
# Count occurrences of each value and keep only those seen more than once.
duplicateCount = (
    dfWithDuplData
    .groupBy("value")
    .count()
    .where("count > 1")
)

duplicateCount.show()

+-----+-----+
|value|count|
+-----+-----+
|   20|    2|
|   30|    3|
+-----+-----+



In [22]:
# Collapse repeated rows to one each. The output below shows that the
# original row order is not kept.
dfWithOutDup = dfWithDuplData.dropDuplicates()

dfWithOutDup.show()

+-----+
|value|
+-----+
|   40|
|   20|
|   10|
|   30|
+-----+

