In [0]:
# NYC yellow-taxi trips for December 2019, from the Databricks sample datasets.
file_path = 'dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-12.csv.gz'

# header=True     -> take the column names from the first row
# inferSchema=True -> let Spark scan the data to work out column types
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(file_path)
)

In [0]:
# Check how many partitions the freshly loaded DataFrame currently has
df.rdd.getNumPartitions()

# 1 partition: the source is a single gzip-compressed CSV (.csv.gz). Gzip files are not
# splittable, so Spark has to read the whole file in one task regardless of its size.

1

In [0]:
# Spread the rows across 200 partitions (this triggers a full shuffle)
df = df.repartition(numPartitions=200)

# 200 is not arbitrary: shuffle operations (like joins) produce
# spark.sql.shuffle.partitions partitions, and that setting defaults to 200

In [0]:
# Shrink back down to 4 partitions; repartition() shuffles even when reducing the count
df = df.repartition(numPartitions=4)

In [0]:
# repartition(): ALWAYS a full shuffle of every partition
# coalesce():    merges existing partitions together, so the data is not fully shuffled

# If the previous cell already left df at 4 partitions, this call changes nothing;
# it is here to show the API.
df = df.coalesce(numPartitions=4)

In [0]:
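# When we join, Spark shuffles the data into spark.sql.shuffle.partitions partitions (200 by default)
# With Adaptive Query Execution enabled, shuffle partitions that turn out too small are automatically coalesced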

In [0]:
# Look at the payment-type lookup file (name, size, modification time) before reading it
payment_type_listing = dbutils.fs.ls('/databricks-datasets/nyctaxi/taxizone/taxi_payment_type.csv')
display(payment_type_listing)

path,name,size,modificationTime
dbfs:/databricks-datasets/nyctaxi/taxizone/taxi_payment_type.csv,taxi_payment_type.csv,93,1590524947000


In [0]:
# Small lookup table mapping the numeric payment_type code to a description
payment_types = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('dbfs:/databricks-datasets/nyctaxi/taxizone/taxi_payment_type.csv')
)

In [0]:
# Preview the first 5 rows of the lookup table
payment_types.show(n=5)

+------------+------------+
|payment_type|payment_desc|
+------------+------------+
|           1| Credit card|
|           2|        Cash|
|           3|   No Charge|
|           4|     Dispute|
|           5|     Unknown|
+------------+------------+
only showing top 5 rows



In [0]:
# Attach the human-readable payment description to every trip.
# left = df, right = payment_types; 'inner' is also the default join type.
# Passing the column NAME to `on` performs an equi-join and keeps a single
# payment_type column in the result.
#
# Guarded so that re-running this cell does not join the lookup a second time,
# which would leave two payment_desc columns and make later references ambiguous.
if 'payment_desc' not in df.columns:
    df = df.join(other=payment_types, on='payment_type', how='inner')

# The join condition can also be given as a Column expression. It must NOT be a string such as
# 'df.payment_type = payment_types.payment_type' -- a string `on` is treated as a column name.
# Unlike the name-based form above, this keeps BOTH payment_type columns in the result:
# df.join(other=payment_types, on=df.payment_type == payment_types.payment_type, how='inner')

# payment_types is tiny (93 bytes on disk), far below spark.sql.autoBroadcastJoinThreshold
# (10 MB by default), so Spark can broadcast it to every executor instead of shuffling df.
# The decision depends on the size of the small side, not on how many partitions df has.

In [0]:
# Aggregating is like in SQL
df.groupBy('payment_desc').sum().show(5)

# Can use toPandas() for neater
df.groupBy('payment_desc').sum().toPandas()

+------------+-----------------+-------------+--------------------+--------------------+---------------+-----------------+-----------------+-------------------+-----------------+------------------+--------------------+------------------+--------------------------+-------------------+-------------------------+
|payment_desc|sum(payment_type)|sum(VendorID)|sum(passenger_count)|  sum(trip_distance)|sum(RatecodeID)|sum(PULocationID)|sum(DOLocationID)|   sum(fare_amount)|       sum(extra)|      sum(mta_tax)|     sum(tip_amount)| sum(tolls_amount)|sum(improvement_surcharge)|  sum(total_amount)|sum(congestion_surcharge)|
+------------+-----------------+-------------+--------------------+--------------------+---------------+-----------------+-----------------+-------------------+-----------------+------------------+--------------------+------------------+--------------------------+-------------------+-------------------------+
|   No Charge|           111267|        44524|               48326|

Unnamed: 0,payment_desc,sum(payment_type),sum(VendorID),sum(passenger_count),sum(trip_distance),sum(RatecodeID),sum(PULocationID),sum(DOLocationID),sum(fare_amount),sum(extra),sum(mta_tax),sum(tip_amount),sum(tolls_amount),sum(improvement_surcharge),sum(total_amount),sum(congestion_surcharge)
0,No Charge,111267,44524,48326,92758.82,46002,5850554,5790347,319730.9,63596.0,10322.48,-341.35,11060.25,6576.3,397468.2,41632.75
1,Cash,3824200,3212374,3068364,5438873.87,2035117,307650413,302849698,24618660.0,1955915.55,947670.1,564.51,555522.9,570659.1,31477520.0,4188929.25
2,Dispute,80552,29799,27384,50700.73,23846,3206112,3133372,421144.7,20321.0,264.32,-59.13,1909.98,237.9,423252.1,-187.5
3,Credit card,4875971,8120698,7472144,14589040.69,5190452,804931041,795250415,66690870.0,5461100.4,2420148.26,15475620.0,2017400.0,1462245.0,101212800.0,11460154.5
4,Unknown,5,1,1,1.3,1,246,161,10.0,2.5,0.5,2.0,0.0,0.3,15.3,2.5


In [0]:
# Aggregate by multiple columns: total fare per (payment description, vendor)
fare_by_payment_vendor = df.groupBy(['payment_desc', 'VendorID']).sum('fare_amount')

# The aggregate is just another column named 'sum(fare_amount)', so it can be
# sorted on directly by chaining orderBy() after the aggregation
fare_by_payment_vendor.orderBy('sum(fare_amount)').show()

# withColumnRenamed() gives the aggregate a friendlier name that orderBy() can then use;
# withColumn() adds a new column (or replaces an existing one with the same name)
(
    fare_by_payment_vendor
    .withColumnRenamed('sum(fare_amount)', 'revenue')
    .withColumn('VendorID 2', df.VendorID + 10)
    .orderBy('revenue', ascending=False)
    .show()
)

+------------+--------+-------------------+
|payment_desc|VendorID|   sum(fare_amount)|
+------------+--------+-------------------+
|     Dispute|       2|         -121688.98|
|   No Charge|       2|          -73507.19|
|     Unknown|       1|               10.0|
|   No Charge|       1| 393238.07999999856|
|     Dispute|       1|  542833.7099999997|
|        Cash|       1|  7645364.220000005|
|        Cash|       2|      1.697329263E7|
| Credit card|       1|2.186538855000178E7|
| Credit card|       2|4.482548143999999E7|
+------------+--------+-------------------+

+------------+--------+-------------------+----------+
|payment_desc|VendorID|            revenue|VendorID 2|
+------------+--------+-------------------+----------+
| Credit card|       2|4.482548143999999E7|        12|
| Credit card|       1|2.186538855000178E7|        11|
|        Cash|       2|      1.697329263E7|        12|
|        Cash|       1|  7645364.220000005|        11|
|     Dispute|       1|  542833.7099999997

In [0]:
# agg() called straight on the DataFrame is a global aggregate: it runs over every row
# with no grouping (shorthand for df.groupBy().agg()). Here: the largest fare_amount.
max_fare = df.agg({'fare_amount': 'max'})
max_fare.show()

+----------------+
|max(fare_amount)|
+----------------+
|       398468.38|
+----------------+



In [0]:
from pyspark.sql import functions as sf

# Approximate number of distinct pickup locations. approx_count_distinct() returns an
# estimate (default maximum relative standard deviation rsd=0.05), not an exact count.
approx_pickup_locations = df.agg(sf.approx_count_distinct(df.PULocationID))
approx_pickup_locations.show()

+-----------------------------------+
|approx_count_distinct(PULocationID)|
+-----------------------------------+
|                                262|
+-----------------------------------+



In [0]:
# Exact number of distinct pickup locations, for comparison with the approximate count
exact_pickup_locations = df.agg(sf.countDistinct(df.PULocationID))
exact_pickup_locations.show()

+----------------------------+
|count(DISTINCT PULocationID)|
+----------------------------+
|                         261|
+----------------------------+

