In [1]:
# Initialization: point this kernel at the local Spark install and make the
# Python libraries bundled with it importable before pyspark is imported.
import os
import sys

SPARK_HOME = "/home/talentum/spark"

# Interpreter used for both PYSPARK_PYTHON (the Python that Spark launches for
# workers) and PYSPARK_DRIVER_PYTHON. Keep the two identical, and keep them the
# same Python version as the kernel running this notebook: PySpark workers
# error out when their Python version differs from the driver's.
# Use "/usr/bin/python2.7" here if you want to use Python 2.
PYTHON_BIN = "/usr/bin/python3.6"

os.environ["SPARK_HOME"] = SPARK_HOME
os.environ["PYLIB"] = os.path.join(SPARK_HOME, "python", "lib")
os.environ["PYSPARK_PYTHON"] = PYTHON_BIN
os.environ["PYSPARK_DRIVER_PYTHON"] = PYTHON_BIN

# The py4j zip name must match the file actually shipped under $SPARK_HOME/python/lib.
sys.path.insert(0, os.path.join(os.environ["PYLIB"], "py4j-0.10.7-src.zip"))
sys.path.insert(0, os.path.join(os.environ["PYLIB"], "pyspark.zip"))

# Extra Maven packages to load when the JVM starts: a comma-separated list of
# coordinates after --packages, e.g. only spark-xml
# ("com.databricks:spark-xml_2.11:0.6.0") or only spark-avro
# ("org.apache.spark:spark-avro_2.11:2.4.0"). Both are enabled here.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "com.databricks:spark-xml_2.11:0.6.0,"
    "org.apache.spark:spark-avro_2.11:2.4.3 "
    "pyspark-shell"
)

In [2]:
# Entry point (Spark 2.x API): get or create a SparkSession named
# "NYC taxi 2020" with Hive support enabled.
# To run on YARN instead, add .master("yarn") to the builder chain, e.g.
#   SparkSession.builder.appName("Spark SQL basic example")
#       .enableHiveSupport().master("yarn").getOrCreate()
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("NYC taxi 2020")
    .enableHiveSupport()
    .getOrCreate()
)

# The underlying SparkContext of the session.
sc = spark.sparkContext

In [3]:
# Load the 2020 trip records: the first line supplies column names and Spark
# infers each column's type from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("2020_taxi_trips.csv")
)

In [6]:
# Print the first 20 rows, truncating long cell values.
df.show(n=20, truncate=True)


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+---------+--------------------+
|       2| 2020-01-01 06:47:28|  2020-01-01 06:52:54|                 N|         1|          74|          75|              1|         1.47|        6.5|  0.0|    0.5|       0.0|         0.0|    

In [20]:
# Column names of the loaded frame, in order.
# NOTE(review): the saved output below also lists pickup_/dropoff_ date and
# time columns that are only added in a later cell, so it was captured after
# that cell had run (out-of-order execution).
df.schema.names

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge',
 'pickup_date',
 'pickup_time',
 'dropoff_date',
 'dropoff_time']

In [12]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import to_date, date_format


In [30]:
# Keep trips with a non-negative total and a positive distance, and drop any
# trip that starts or ends in location 264 or 265 (presumably the TLC
# "Unknown"/"NV" zones -- TODO confirm against the taxi zone lookup).
# A row survives only when every condition is true; rows where a condition is
# null are dropped.
filtered_df = df.filter(
    (df.total_amount >= 0)
    & (df.trip_distance > 0)
    & ~df.PULocationID.isin([264, 265])
    & ~df.DOLocationID.isin([264, 265])
)

In [None]:
# Discard any row that has a null in at least one column.
filtered_df = filtered_df.na.drop(how="any")

In [31]:
# Split the pickup and dropoff timestamps into separate date and time-of-day
# ("HH:mm:ss") columns.
# All four columns are chained onto `df` itself: the pickup columns must not go
# to a separate variable, or later cells that build on `df` (the index column
# and the export frame) never see pickup_date/pickup_time on a fresh kernel.
df = (
    df
    .withColumn("pickup_date", to_date(df.lpep_pickup_datetime))
    .withColumn("pickup_time", date_format(df.lpep_pickup_datetime, "HH:mm:ss"))
    .withColumn("dropoff_date", to_date(df.lpep_dropoff_datetime))
    .withColumn("dropoff_time", date_format(df.lpep_dropoff_datetime, "HH:mm:ss"))
)

In [None]:
from pyspark.sql.functions import monotonically_increasing_id

# Attach an "index" column of generated row ids. Per the PySpark docs these ids
# are unique and increasing but not guaranteed to be consecutive, so do not
# treat them as row numbers.
df = df.withColumn(colName="index", col=monotonically_increasing_id())

In [33]:
# Columns left out of the exported dataset.
columns_to_drop = [
    'VendorID', "lpep_pickup_datetime", "lpep_dropoff_datetime", 'store_and_fwd_flag',
    'RatecodeID', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
    'improvement_surcharge', 'payment_type', 'trip_type', 'congestion_surcharge',
]

# Build the export frame from `df`, the frame that carries the derived
# pickup/dropoff date+time columns and the index column. Apply the same
# cleaning rules as the filter and dropna() cells above. Dropping columns from
# `df` alone would throw that cleaning away and export every raw row.
# 264/265 are the location IDs excluded above (presumably the TLC "Unknown"/"NV"
# zones -- TODO confirm). dropna() runs before the column drop so it still
# checks every source column, as the earlier cell did.
filtered_df = (
    df
    .filter((df.total_amount >= 0) & (df.trip_distance > 0))
    .filter(~df.PULocationID.isin([264, 265]) & ~df.DOLocationID.isin([264, 265]))
    .dropna()
    .drop(*columns_to_drop)
)

# Preview the first rows (optional).
filtered_df.show()

+------------+------------+---------------+-------------+------------+-----------+-----------+------------+------------+-----+
|PULocationID|DOLocationID|passenger_count|trip_distance|total_amount|pickup_date|pickup_time|dropoff_date|dropoff_time|index|
+------------+------------+---------------+-------------+------------+-----------+-----------+------------+------------+-----+
|          74|          75|              1|         1.47|         7.3| 2020-01-01|   06:47:28|  2020-01-01|    06:52:54|    0|
|          74|          75|              1|         1.49|         7.3| 2020-01-01|   13:25:34|  2020-01-01|    13:30:43|    1|
|          74|          75|              1|         1.31|         7.3| 2020-01-01|   14:20:35|  2020-01-01|    14:26:25|    2|
|          74|          75|              1|         1.43|         7.3| 2020-01-02|   06:56:47|  2020-01-02|    07:03:03|    3|
|          74|          75|              1|          1.1|         7.3| 2020-01-02|   09:34:46|  2020-01-02|    

In [48]:
# Export the cleaned trips as CSV.
# Spark writes a directory of part files at this path, not a single file, so
# the path has no ".csv" suffix. "file:///" (empty authority + absolute path)
# is the standard local-filesystem URI form.
output_path = "file:///home/talentum/output/filter_df"

# coalesce(1) collapses the data into one partition so the directory holds a
# single part file. mode("overwrite") replaces any previous output at the path.
(
    filtered_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(output_path)
)