In [0]:
from pyspark.sql.functions import col, lit, expr, when
from pyspark.sql.types import *
from datetime import datetime
import time

In [0]:
# A list of folders containing sample datasets we can use
# display(dbutils.fs.ls("samples")
# dbutils.fs.head("/Volumes/main/default/my-volume/data.csv", 25)
# dbutils.fs.ls("/Volumes/main")
# Define schema
nyc_schema = StructType([
  StructField('Vendor', StringType(), True),
  StructField('Pickup_DateTime', TimestampType(), True),
  StructField('Dropoff_DateTime', TimestampType(), True),
  StructField('Passenger_Count', IntegerType(), True),
  StructField('Trip_Distance', DoubleType(), True),
  StructField('Pickup_Longitude', DoubleType(), True),
  StructField('Pickup_Latitude', DoubleType(), True),
  StructField('Rate_Code', StringType(), True),
  StructField('Store_And_Forward', StringType(), True),
  StructField('Dropoff_Longitude', DoubleType(), True),
  StructField('Dropoff_Latitude', DoubleType(), True),
  StructField('Payment_Type', StringType(), True),
  StructField('Fare_Amount', DoubleType(), True),
  StructField('Surcharge', DoubleType(), True),
  StructField('MTA_Tax', DoubleType(), True),
  StructField('Tip_Amount', DoubleType(), True),
  StructField('Tolls_Amount', DoubleType(), True),
  StructField('Total_Amount', DoubleType(), True)
])
 
rawDF = spark.read.format('csv').options(header=True).schema(nyc_schema).load("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-12.csv.gz")
 


In [0]:
rawDF.printSchema()

root
 |-- Vendor: string (nullable = true)
 |-- Pickup_DateTime: timestamp (nullable = true)
 |-- Dropoff_DateTime: timestamp (nullable = true)
 |-- Passenger_Count: integer (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Pickup_Longitude: double (nullable = true)
 |-- Pickup_Latitude: double (nullable = true)
 |-- Rate_Code: string (nullable = true)
 |-- Store_And_Forward: string (nullable = true)
 |-- Dropoff_Longitude: double (nullable = true)
 |-- Dropoff_Latitude: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amount: double (nullable = true)
 |-- Surcharge: double (nullable = true)
 |-- MTA_Tax: double (nullable = true)
 |-- Tip_Amount: double (nullable = true)
 |-- Tolls_Amount: double (nullable = true)
 |-- Total_Amount: double (nullable = true)



In [0]:
%sql
CREATE DATABASE IF NOT EXISTS taxidata;
DROP TABLE IF EXISTS taxidata.taxi_2019_12;

In [0]:
rawDF.write.mode("overwrite").saveAsTable("taxidata.taxi_2019_12")

In [0]:
%fs ls dbfs:/user/hive/warehouse/taxidata.db/taxi_2019_12/

path,name,size,modificationTime
dbfs:/user/hive/warehouse/taxidata.db/taxi_2019_12/_delta_log/,_delta_log/,0,0
dbfs:/user/hive/warehouse/taxidata.db/taxi_2019_12/part-00000-0488a82c-aa58-4710-b473-56a559122c3d-c000.snappy.parquet,part-00000-0488a82c-aa58-4710-b473-56a559122c3d-c000.snappy.parquet,144401955,1731223064000


In [0]:
%fs
rm -r /delta/taxi

In [0]:

processedDF = rawDF.withColumn('Year', expr('cast(year(Pickup_DateTime) as int)')).withColumn('Month', expr('cast(month(Pickup_DateTime) as int)')) 
processedDF.write.format('delta').mode('append').partitionBy('Year','Month').save("/delta/taxi")

In [0]:
%fs
ls dbfs:/delta/taxi

path,name,size,modificationTime
dbfs:/delta/taxi/Year=2008/,Year=2008/,0,0
dbfs:/delta/taxi/Year=2009/,Year=2009/,0,0
dbfs:/delta/taxi/Year=2019/,Year=2019/,0,0
dbfs:/delta/taxi/Year=2020/,Year=2020/,0,0
dbfs:/delta/taxi/Year=2026/,Year=2026/,0,0
dbfs:/delta/taxi/Year=2058/,Year=2058/,0,0
dbfs:/delta/taxi/Year=2066/,Year=2066/,0,0
dbfs:/delta/taxi/Year=2090/,Year=2090/,0,0
dbfs:/delta/taxi/_delta_log/,_delta_log/,0,0


In [0]:
%fs
ls dbfs:/delta/taxi/Year=2019/Month=12/

path,name,size,modificationTime
dbfs:/delta/taxi/Year=2019/Month=12/part-00000-249ddcc9-619a-4fa6-a0ad-dc008d25ff88.c000.snappy.parquet,part-00000-249ddcc9-619a-4fa6-a0ad-dc008d25ff88.c000.snappy.parquet,144376119,1731223317000


In [0]:
%fs
ls dbfs:/delta/taxi/Year=2019/Month=11/

path,name,size,modificationTime
dbfs:/delta/taxi/Year=2019/Month=11/part-00000-eb52d0e4-a3f4-429f-95a9-6df04620ca68.c000.snappy.parquet,part-00000-eb52d0e4-a3f4-429f-95a9-6df04620ca68.c000.snappy.parquet,11124,1731223283000


In [0]:
#So we found some dirty data in our dataframe! we can filter it.
processedDF.filter("year=2019").count() #the SQL way!

Out[18]: 6896093

In [0]:
#from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, expr, when
from pyspark.sql.types import *
 
'''
pyspark.sql.SparkSession Main entry point for DataFrame and SQL functionality.
pyspark.sql.DataFrame A distributed collection of data grouped into named columns.
pyspark.sql.Column A column expression in a DataFrame.
pyspark.sql.Row A row of data in a DataFrame.
pyspark.sql.GroupedData Aggregation methods, returned by DataFrame.groupBy().
pyspark.sql.DataFrameNaFunctions Methods for handling missing data (null values).
pyspark.sql.DataFrameStatFunctions Methods for statistics functionality.
pyspark.sql.functions List of built-in functions available for DataFrame.
pyspark.sql.types List of data types available.
pyspark.sql.Window For working with window functions.
'''
 
 
processedDF.filter((col('Year')==2019) & (col('Month')==12)).count() #Dataframe way
#processedDF.filter($year===2019).count()
#processedDF.filter(df(year)==2019).count()

Out[19]: 6895933

In [0]:
processedDF.filter((col('Year')!=2019) & (col('Month')!=12)).count() #Dataframe way


Out[20]: 188

In [0]:
processedDF.filter("Year <> 2019 and Month <> 12").count() #The SQL Way

Out[21]: 188

In [0]:
%sql
use taxidata;
show tables;

database,tableName,isTemporary
taxidata,taxi_2019_12,False


In [0]:
%fs
rm -r /delta/taxiclean

In [0]:
%python
#processedDF.filter("Year <> 2019 and Month <> 12").partitionBy('Year','Month').saveAsTable()
processedDF.filter("Year = 2019 and Month = 12").write.format('delta').mode('overwrite').partitionBy('Year','Month').save("/delta/taxiclean")


In [0]:
%fs
ls dbfs:/delta/taxiclean/

path,name,size,modificationTime
dbfs:/delta/taxiclean/Year=2019/,Year=2019/,0,0
dbfs:/delta/taxiclean/_delta_log/,_delta_log/,0,0


In [0]:
rawDF2 = spark.read.format('csv').options(header=True).schema(nyc_schema).load("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-11.csv.gz") #Lazy Execution


In [0]:
rawDF2.createOrReplaceTempView("taxi_2019_11_tmp")


In [0]:
%sql
show tables

database,tableName,isTemporary
taxidata,taxi_2019_12,False
,taxi_2019_11_tmp,True


In [0]:
%python
rawDF2.take(1)

Out[27]: [Row(Vendor='1', Pickup_DateTime=datetime.datetime(2019, 11, 1, 0, 30, 41), Dropoff_DateTime=datetime.datetime(2019, 11, 1, 0, 32, 25), Passenger_Count=1, Trip_Distance=0.0, Pickup_Longitude=1.0, Pickup_Latitude=None, Rate_Code='145', Store_And_Forward='145', Dropoff_Longitude=2.0, Dropoff_Latitude=3.0, Payment_Type='0.5', Fare_Amount=0.5, Surcharge=0.0, MTA_Tax=0.0, Tip_Amount=0.3, Tolls_Amount=4.3, Total_Amount=0.0)]

In [0]:
%sql
drop table if exists taxidata.taxi;
create TABLE taxidata.taxi as select * from taxidata.taxi_2019_12 limit 1;


num_affected_rows,num_inserted_rows


In [0]:
%sql
select count (*) from taxidata.taxi;


count(1)
1


In [0]:
%sql
select * from taxidata.taxi;

Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
1,2019-12-01T00:26:58.000+0000,2019-12-01T00:41:45.000+0000,1,4.2,1.0,,142,116,2.0,14.5,3,0.5,0.0,0.0,0.3,18.3,2.5


In [0]:
%sql
UPDATE taxidata.taxi set vendor=0 where vendor =1;

num_affected_rows
1


In [0]:
%sql
select * from taxidata.taxi;

Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
0,2019-12-01T00:26:58.000+0000,2019-12-01T00:41:45.000+0000,1,4.2,1.0,,142,116,2.0,14.5,3,0.5,0.0,0.0,0.3,18.3,2.5


In [0]:
%sql
UPDATE taxi_2019_11_tmp set vendor=0 where vendor =1; -- Not working. Only DELTA tables can be updated. Need to create derivate dataframes or other temp tables and persist them as delta

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1216632588547722>:7[0m
[1;32m      5[0m     display(df)
[1;32m      6[0m     [38;5;28;01mreturn[39;00m df
[0;32m----> 7[0m   _sqldf [38;5;241m=[39m [43m____databricks_percent_sql[49m[43m([49m[43m)[49m
[1;32m      8[0m [38;5;28;01mfinally[39;00m:
[1;32m      9[0m   [38;5;28;01mdel[39;00m ____databricks_percent_sql

File [0;32m<command-1216632588547722>:4[0m, in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m [38;5;28;01mdef[39;00m [38;5;21m____databricks_percent_sql[39m():
[1;32m      3[0m   [38;5;28;01mimport[39;00m [38;5;21;01mbase64[39;00m
[0;32m----> 4[0m   df [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43msql[49m[43m([49m[43mbase64[49m[38;5;241;43m.[39;49m[43mstandard_b64decode[49m[43m([49m[38;5;124;43m"[39;