DataFrames 

Built on top of RDDs

    - In - memory, partitioned, rad-only & resilient 

Imposes a tabular structure on the data 


In [116]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
from IPython.display import *


data_path = '../data/'
display(HTML('<style>pre {white-space: pre !important; }</style>'))

In [117]:
spark = (
    
            SparkSession
            .builder
            .appName('SparkCourse') 
            .master('local[*]') 
            .config('spark.dynamicAllocation.enabled', 'false')
            .config('spark.sql.adaptive.enabled', 'false')
            .getOrCreate()
    )

sc = spark.sparkContext

spark

CREATE DF

In [118]:
data = [
    [1, 'Neha' ,10000],
    [2, 'Steve', 20000],
    [3, 'Kari', 30000],
    [4, 'Ivan', 40000],
    [5, 'Mohit', 50000],
]
    

#RDD
employees_rdd = sc.parallelize(data)

In [119]:
#DF
employees_df =  employees_rdd.toDF(['id', 'name', 'salary'])

In [120]:
employees_df = (spark
                    .createDataFrame
                    (
                        data,
                        "id: long, name: string, salary: long"
                    ))

employees_df.show()

+---+-----+------+
| id| name|salary|
+---+-----+------+
|  1| Neha| 10000|
|  2|Steve| 20000|
|  3| Kari| 30000|
|  4| Ivan| 40000|
|  5|Mohit| 50000|
+---+-----+------+



READ DATA

In [122]:
taxi_cabs_df = spark.read.option('header', 'true').csv( data_path + 'Cabs.csv')

AGGREGATION

In [123]:
cabs_by_ws_df = taxi_cabs_df.groupBy('Website').count()

In [124]:
cabs_by_ws_df.show(10)

+--------------------+-----+
|             Website|count|
+--------------------+-----+
|WWW.ECONOMYCARSER...|    2|
|    WWW.AREASTWO.COM|    6|
|WWW.ACCREDITEDLIM...|    1|
|     WWW.VRWWINC.COM|    2|
|     WWW.CAR8888.COM|   57|
|         ZWEINYC.COM|   52|
|NEWTOENPRIVATECAR...|    1|
|WWW.CAPRICECARSER...|   17|
|      PARADALIMO.COM|    3|
|WWW.NYINSURANCEB.COM|   18|
+--------------------+-----+
only showing top 10 rows



In [125]:
cabs_by_ws_df_filtered = cabs_by_ws_df.where('Website=="ZWEINYC.COM"')

In [126]:
cabs_by_ws_df_filtered.show(10, truncate=False)

+-----------+-----+
|Website    |count|
+-----------+-----+
|ZWEINYC.COM|52   |
+-----------+-----+



In [127]:
yellow_taxi_df = (
    spark
        .read
        .option('header', 'true')
        .option('inferSchema', 'true')
        .csv(path=(data_path 
            + 'YellowTaxis_202210.csv'))
            
)

                                                                                


CREATE SCHEMA:

prod - manually  
test - infer(dynamic) 

    (StructType([\
        StructField('name', Type(), Nullable),\
        ....\
    ]))



In [129]:
yellow_taxi_df = (
    yellow_taxi_df
        .where('passenger_count > 0')
        .filter(F.col('trip_distance') > 0.0)
)

QUICK ANALYZE

In [130]:
yellow_taxi__analyze_df = (
    yellow_taxi_df.describe(
        'passenger_count',
        'trip_distance'
    ))

DROP NA

In [81]:
yellow_taxi_df = yellow_taxi_df.na.drop('all')

FILL NA

In [82]:
default_value_map = {
    'payment_type': 5,
    'RateCodeID': 1 
}

yellow_taxi_df = yellow_taxi_df.na.fill(default_value_map)

In [83]:
yellow_taxi_df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = false)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = false)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [84]:
yellow_taxi_df = yellow_taxi_df.where(
    "tpep_pickup_datetime >= '2022-10-01' AND tpep_dropoff_datetime < '2022-11-01' "
)

In [85]:
yellow_taxi_df.count()

                                                                                

3414348

TRANSFORMATIONS

In [86]:
yellow_taxi_df = (
    yellow_taxi_df.select(
        'VendorID',
        F.col('passenger_count').cast(IntegerType()),
        F.column('trip_distance').alias('tripDistrance'),
        yellow_taxi_df.tpep_pickup_datetime,
        'tpep_dropoff_datetime',
        'PULocationID',
        'DOLocationID',
        'RatecodeID',
        'total_amount',
        'payment_type'
    )
)

In [114]:
yellow_taxi_df = (
    yellow_taxi_df.withColumn('TripYear', F.year(F.col('tpep_pickup_datetime')))
    .select(
        '*',
        F.expr('month(tpep_pickup_datetime) AS TripMonth'),
        F.dayofmonth(F.col('tpep_pickup_datetime')).alias('Tripday')
    )
)


In [115]:
yellow_taxi_df.show(10, truncate=False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------+---------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|TripYear|TripMonth|Tripday|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------+---------+-------+
|1       |2022-10-01 03:03:41 |2022-10-01 03:18:39  |1.0            |1.7          |1.0       |N           

In [96]:
trip_time_in_second = F.unix_timestamp(F.col('tpep_dropoff_datetime')) - F.unix_timestamp(F.col('tpep_pickup_datetime'))

trip_time_in_minutes = F.round(trip_time_in_second)/60

yellow_taxi_df = (
    yellow_taxi_df.withColumn('TripTimeMinutes', trip_time_in_minutes)
)

In [97]:
yellow_taxi_df.show(10, truncate=False)

+--------+---------------+-------------+--------------------+---------------------+------------+------------+----------+------------+------------+--------+---------+-------+------------------+
|VendorID|passenger_count|tripDistrance|tpep_pickup_datetime|tpep_dropoff_datetime|PULocationID|DOLocationID|RatecodeID|total_amount|payment_type|TripYear|TripMonth|Tripday|TripTimeMinutes   |
+--------+---------------+-------------+--------------------+---------------------+------------+------------+----------+------------+------------+--------+---------+-------+------------------+
|1       |1              |1.7          |2022-10-01 03:03:41 |2022-10-01 03:18:39  |249         |107         |1.0       |15.95       |1           |2022    |10       |1      |14.966666666666667|
|2       |2              |0.72         |2022-10-01 03:14:30 |2022-10-01 03:19:48  |151         |238         |1.0       |9.3         |2           |2022    |10       |1      |5.3               |
|2       |1              |1.74     

    -*unix_timestamp returns second

In [106]:
trip_type_column = (
    F.when(
        F.col('RatecodeID') == 6,
        'SharedTrip'
    ).otherwise('SoloTrip')
)

yellow_taxi_df = (
    yellow_taxi_df.withColumn('TripType', trip_type_column)
)

FUNC OVERVIEW \
    Date & TIME  
        - date_add, date_fromat, next_day, to_date \
        - current_timestamp, to_timestamp  \
    Mathematical  
        - round, ceil, floor, log, sqrt \
    String \
        - lower, upper, length, substring, trim, split \
    Array \
        - concat, array_contains, array_join, filter, explode \
    More \
        - when, least, greatest, isnull


In [107]:
yellow_taxi_df.where('TripType != "SoloTrip"').show()



+--------+---------------+-------------+--------------------+---------------------+------------+------------+----------+------------+------------+--------+---------+-------+-------------------+----------+
|VendorID|passenger_count|tripDistrance|tpep_pickup_datetime|tpep_dropoff_datetime|PULocationID|DOLocationID|RatecodeID|total_amount|payment_type|TripYear|TripMonth|Tripday|    TripTimeMinutes|  TripType|
+--------+---------------+-------------+--------------------+---------------------+------------+------------+----------+------------+------------+--------+---------+-------+-------------------+----------+
|       1|              1|          0.2| 2022-10-01 23:26:48|  2022-10-01 23:27:11|         138|         138|       6.0|         6.3|           2|    2022|       10|      1|0.38333333333333336|SharedTrip|
|       2|              1|         0.02| 2022-10-04 01:00:19|  2022-10-04 01:00:27|         181|          97|       6.0|        -6.3|           4|    2022|       10|      4|0.13333

                                                                                

CHECK EXECUTION PLANS

In [109]:
yellow_taxi_df.explain( mode = 'extended')

### MODES - simple, codegen, cost, fromatted

== Parsed Logical Plan ==
'Project [VendorID#1890, passenger_count#2090, tripDistrance#2089, tpep_pickup_datetime#1891, tpep_dropoff_datetime#1892, PULocationID#1897, DOLocationID#1898, RatecodeID#2025, total_amount#1906, payment_type#2026, TripYear#2101, TripMonth#2113, Tripday#2114, TripTimeMinutes#2293, CASE WHEN ('RatecodeID = 6) THEN SharedTrip ELSE SoloTrip END AS TripType#2576]
+- Project [VendorID#1890, passenger_count#2090, tripDistrance#2089, tpep_pickup_datetime#1891, tpep_dropoff_datetime#1892, PULocationID#1897, DOLocationID#1898, RatecodeID#2025, total_amount#1906, payment_type#2026, TripYear#2101, TripMonth#2113, Tripday#2114, TripTimeMinutes#2293, CASE WHEN (VendorID#1890 = 6) THEN SharedTrip ELSE SoloTrip END AS TripType#2400]
   +- Project [VendorID#1890, passenger_count#2090, tripDistrance#2089, tpep_pickup_datetime#1891, tpep_dropoff_datetime#1892, PULocationID#1897, DOLocationID#1898, RatecodeID#2025, total_amount#1906, payment_type#2026, TripYear#2101, TripMonth#2

HANDLING CORRUPT DATA

In [140]:
rates_codes_json_df = (
    spark.read
    #.option('mode', 'PERMISSIVE')
    #.option('mode', 'FAILFAST')
    .option('mode', 'DROPMALFORMED')
    .option('columnNameOfCorruptRecord', 'CorruptData')
    .json(data_path + 'RateCodes.json')
)

In [141]:
rates_codes_json_df.show(truncate=False)

+---------------+----------+
|RateCode       |RateCodeID|
+---------------+----------+
|Standard rate  |1         |
|JFK            |2         |
|Newark         |3         |
|Negotiated fare|5         |
|Group ride     |6         |
+---------------+----------+



DATA TO FILES

In [144]:
yellow_taxi_df

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: int, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

In [145]:
yellow_taxi_df.rdd.getNumPartitions()

8

In [146]:
yellow_taxi_df = yellow_taxi_df.coalesce(1)

In [147]:
yellow_taxi_df.rdd.getNumPartitions()

1

In [148]:
(
    yellow_taxi_df
        .write
        #.partitionBy('column')
        .option('header', 'true')
        .option('dateFormat', 'yyyy-MM-dd HH:mm:ss.S')
        .mode('overwrite') # options - Append, ErroIfExists, Ignore(do nothing if exists)
        .csv(data_path + 'results/yellow_taxi_out.csv')
)

                                                                                