In [None]:
import os


In [None]:
# Set before pyspark.pandas is imported so the PyArrow timezone setting is in place.
os.environ.update(PYARROW_IGNORE_TIMEZONE='1')

In [None]:
import pyspark
import pyspark.pandas as ps
from pyspark.sql import SparkSession

In [None]:
# Local Spark session that uses every available core ("local[*]").
spark = (
    SparkSession.builder
    .appName('test')
    .master("local[*]")
    .getOrCreate()
)

In [None]:
# Load the partitioned Parquet trip data into a Spark DataFrame.
data = spark.read.format("parquet").load('data/taxi_partitioned')

In [None]:
data.show(n=2)  # peek at the first two raw rows

In [None]:
data.groupby('member_casual').count().show()  # trips per rider type

In [None]:
# Print the column names, types and nullability of the freshly loaded data.
data.printSchema()

In [None]:
# pandas-on-Spark view of the same data, for pandas-style exploration.
data_ps = ps.DataFrame(data=data)


In [None]:
data_ps.rideable_type.unique()  # distinct bike types

In [None]:
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import year, month, date_format, to_timestamp, dayofweek, hour
import pyspark.sql.functions as F
from pyspark.sql.functions import col, sum, desc, mean, stddev, min, max, avg, when, count
from pyspark.sql.functions import col, radians, sin, cos, atan2, sqrt

In [None]:
# Mean-impute missing end coordinates into new *_imputed columns
# ("mean" is Imputer's default strategy; spelled out for the reader).
imputer = Imputer(
    inputCols=['end_lat', 'end_lng'],
    outputCols=['end_lat_imputed', 'end_lng_imputed'],
    strategy="mean",
)

In [None]:
# Learn the per-column surrogate values from the current data.
imputer_model = imputer.fit(dataset=data)

In [None]:
# Append the imputed coordinate columns to the data.
data = imputer_model.transform(dataset=data)

In [None]:
# Swap the raw end coordinates for their imputed versions while keeping the
# original column names. Later cells filter on col("end_lat") / col("end_lng"),
# so simply dropping them would leave those references unresolved.
data = (
    data.drop('end_lat', 'end_lng')
    .withColumnRenamed('end_lat_imputed', 'end_lat')
    .withColumnRenamed('end_lng_imputed', 'end_lng')
)

In [None]:
data.show(n=2, truncate=True)  # check the imputed coordinate columns

In [None]:
# One row holding the number of nulls in every column, to confirm the imputation.
null_counts = data.select(
    [F.sum(F.col(name).isNull().cast("int")).alias(name) for name in data.columns]
)
null_counts.show()

In [None]:
# Count rows where any start/end latitude or longitude is exactly 0.
# A null coordinate alone does not make a row count here (null == 0 is null, not true).
# NOTE(review): requires `end_lat` / `end_lng` columns to exist at this point.
count_invalid_rows = data.filter(
    (F.col("start_lat") == 0)
    | (F.col("start_lng") == 0)
    | (F.col("end_lat") == 0)
    | (F.col("end_lng") == 0)
).count()
print(f"Number of rows with invalid latitude or longitude: {count_invalid_rows}")

In [None]:
# Keep only rows whose four coordinates are all non-zero.
# Under Spark SQL three-valued logic a null coordinate makes this predicate
# null (or false), so rows with any null coordinate are dropped as well.
data = data.filter(
    ~(
        (col("start_lat") == 0)
        | (col("start_lng") == 0)
        | (col("end_lat") == 0)
        | (col("end_lng") == 0)
    )
)

In [None]:
# Re-count zero-coordinate rows after filtering. The previous cell removed them,
# so this must be 0. Assert it so a Restart & Run All fails loudly if the filter
# is ever changed or skipped, instead of only printing a number.
count_invalid_rows = data.filter(
    (col("start_lat") == 0) |
    (col("start_lng") == 0) |
    (col("end_lat") == 0) |
    (col("end_lng") == 0)
).count()
print(f"Number of rows with invalid latitude or longitude: {count_invalid_rows}")
assert count_invalid_rows == 0, "zero-valued coordinates survived the filter"

In [None]:
# Trip length in minutes: difference of the end/start epoch seconds, divided by 60.
# NOTE(review): negative durations (ended_at before started_at) are not rejected here.
data = data.withColumn(
    "ride_duration",
    (unix_timestamp(col("ended_at")) - unix_timestamp(col("started_at"))) / 60,
)

In [None]:
data.show(2, truncate=True)  # check the new ride_duration column

In [None]:
# Hour of day at which the trip started, stored as "time_of_day".
# withColumn always names the new column after its first argument, so no
# .alias(...) is needed (an alias here would be silently ignored).
data = data.withColumn("time_of_day", hour(col("started_at")))

In [None]:
data.show(n=2)  # check the new time_of_day column

In [None]:
# Trip counts per (year, month, rider type); show the 10 busiest combinations.
# NOTE(review): assumes `year` / `month` are present on `data` (presumably
# partition columns of the source dataset) — confirm.
trips_per_month_year_member = data.groupBy("year", "month", "member_casual").count()
trips_per_month_year_member.orderBy(desc("count")).show(10)

In [None]:
# Convert 'started_at' to timestamp if not already
data = data.withColumn("started_at", col("started_at").cast("timestamp"))

# Full English weekday name ("Sunday" ... "Saturday") taken straight from the
# timestamp. The "EEEE" pattern replaces a hand-written mapping of
# dayofweek() 1..7 to names. A null timestamp still yields null.
data = data.withColumn("day_of_week", date_format(col("started_at"), "EEEE"))

In [None]:
data.show(n=2, truncate=True)  # check the new day_of_week column

In [None]:
# Per-column null totals after the feature columns were added.
null_counts = data.select(
    [F.sum(F.col(column_name).isNull().cast("int")).alias(column_name)
     for column_name in data.columns]
)
null_counts.show()

In [None]:
# Drop rows where any of 'start_station_name', 'end_station_id' or
# 'start_station_id' is null (other columns are not considered).
data = data.dropna(
    how="any",
    subset=["start_station_name", "end_station_id", "start_station_id"],
)


In [None]:
# Null totals per column after dropping rows with missing station fields.
null_counts = data.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in data.columns]
)
null_counts.show()

In [None]:
# Number of rows left after the null-station rows were dropped (shown as cell output).
data.count()

In [None]:
# Schema of the cleaned data (including ride_duration, time_of_day, day_of_week) before it is written out.
data.printSchema()

In [None]:
# Plain alias bound to the same DataFrame object as `data` (no copy is made).
data1 = data

In [None]:
# Hash-partition by the same keys used for partitionBy on write, so each
# (year, month) directory is written from a single in-memory partition.
data_repartitioned = data1.repartition(col("year"), col("month"))

In [None]:
# How many in-memory partitions did repartition("year", "month") produce?
num_partitions = data_repartitioned.rdd.getNumPartitions()
print(f"Number of partitions: {num_partitions}")

In [None]:
# Persist the cleaned data as Snappy-compressed Parquet, one directory per
# year/month, replacing any previous output at the same path.
(
    data_repartitioned.write
    .mode("overwrite")
    .option("compression", "snappy")
    .partitionBy("year", "month")
    .parquet("eda_final.parquet")
)


In [None]:
data = spark.read.load('eda_final.parquet', format='parquet')  # read back the saved output

In [None]:
data.show(n=2)  # spot-check the reloaded data

In [None]:
# Null totals per column for the round-tripped Parquet data.
null_counts = data.select(
    [F.sum(F.col(field).isNull().cast("int")).alias(field) for field in data.columns]
)
null_counts.show()