In [56]:
import tempfile
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import cos
from pyspark.sql.functions import mean, stddev, mode

In [2]:
# Create SparkSession
spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()

# Read CSV File
df = spark.read.option("header", True).csv("archive/daily_rent_detail.csv")

df.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: string (nullable = true)
 |-- ended_at: string (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_id: string (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_id: string (nullable = true)
 |-- start_lat: string (nullable = true)
 |-- start_lng: string (nullable = true)
 |-- end_lat: string (nullable = true)
 |-- end_lng: string (nullable = true)
 |-- member_casual: string (nullable = true)



In [3]:
df.show()
df.count()

+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+------------------+------------------+-------------+
|         ride_id|rideable_type|         started_at|           ended_at|  start_station_name|start_station_id|    end_station_name|end_station_id|start_lat| start_lng|           end_lat|           end_lng|member_casual|
+----------------+-------------+-------------------+-------------------+--------------------+----------------+--------------------+--------------+---------+----------+------------------+------------------+-------------+
|946D42AD89539210|  docked_bike|2020-05-30 17:25:29|2020-05-31 18:25:22|   Anacostia Library|           31804|      11th & H St NE|       31614.0|38.865784|  -76.9784|         38.899983|        -76.991383|       casual|
|CC46FAAB662B8613|  docked_bike|2020-05-09 14:42:04|2020-05-09 15:06:33|      10th & E St NW|           31256|21st St & 

16086672

In [4]:
df.select('member_casual').distinct().collect()

[Row(member_casual='casual'), Row(member_casual='member')]

In [5]:
df.select('rideable_type').distinct().collect()

[Row(rideable_type='docked_bike'),
 Row(rideable_type='electric_bike'),
 Row(rideable_type='classic_bike')]

In [6]:
dftyped = df.withColumn("started_at", df["started_at"].cast("timestamp"))
dftyped = dftyped.withColumn("ended_at", df["ended_at"].cast("timestamp"))
dftyped = dftyped.withColumn("start_station_id", df["start_station_id"].cast("int"))
dftyped = dftyped.withColumn("end_station_id", df["end_station_id"].cast("int"))
dftyped = dftyped.withColumn("start_lat", df["start_lat"].cast("double"))
dftyped = dftyped.withColumn("start_lng", df["start_lng"].cast("double"))
dftyped = dftyped.withColumn("end_lat", df["end_lat"].cast("double"))
dftyped = dftyped.withColumn("end_lng", df["end_lng"].cast("double"))

dftyped = dftyped.withColumn("is_member", when(df["member_casual"] == "member", True).otherwise(False))

dftyped = dftyped.drop("member_casual")
dftyped = dftyped.drop("start_station_name")
dftyped = dftyped.drop("end_station_name")

dftyped.printSchema()

root
 |-- ride_id: string (nullable = true)
 |-- rideable_type: string (nullable = true)
 |-- started_at: timestamp (nullable = true)
 |-- ended_at: timestamp (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- start_lng: double (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- end_lng: double (nullable = true)
 |-- is_member: boolean (nullable = false)



In [7]:
dftyped.show()

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+------------------+------------------+---------+
|         ride_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|start_lat| start_lng|           end_lat|           end_lng|is_member|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+------------------+------------------+---------+
|946D42AD89539210|  docked_bike|2020-05-30 17:25:29|2020-05-31 18:25:22|           31804|         31614|38.865784|  -76.9784|         38.899983|        -76.991383|    false|
|CC46FAAB662B8613|  docked_bike|2020-05-09 14:42:04|2020-05-09 15:06:33|           31256|         31261|38.895914|-77.026064|         38.892459|        -77.046567|     true|
|72F00B2FB833D6ED|  docked_bike|2020-05-24 17:27:19|2020-05-24 17:43:51|           31305|         31268|38.934267|-77.057979|     

In [None]:
for c in dftyped.columns:
    print(f"{c}: {dftyped.filter(dftyped[c].isNull()).count()}")
    

ride_id: 0
rideable_type: 0
started_at: 0
ended_at: 0
start_station_id: 1450152
end_station_id: 1558919
start_lat: 10
start_lng: 10
end_lat: 25929
end_lng: 25929
is_member: 0


In [48]:
df_daily = dftyped.withColumn("time", (col("ended_at") - col("started_at")).cast("int") / 3600)
df_daily = df_daily.filter(df_daily["time"] > 0).\
    filter(df["start_station_id"].isNotNull()).\
    filter(df["start_lat"].isNotNull()).\
    filter(df["end_lat"].isNotNull())

df_daily = df_daily.withColumn("distance", (((col("end_lat") - col("start_lat")) * 110.574) ** 2 + ((col("end_lng") - col("start_lng")) * 111.320 * cos(col("start_lat"))) ** 2) ** 0.5)
df_daily = df_daily.withColumn("min_speed", when(col("time") > 0, col("distance") / col("time")).otherwise(0))

df_daily.show()

+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+------------------+------------------+---------+--------------------+------------------+-------------------+
|         ride_id|rideable_type|         started_at|           ended_at|start_station_id|end_station_id|start_lat| start_lng|           end_lat|           end_lng|is_member|                time|          distance|          min_speed|
+----------------+-------------+-------------------+-------------------+----------------+--------------+---------+----------+------------------+------------------+---------+--------------------+------------------+-------------------+
|946D42AD89539210|  docked_bike|2020-05-30 17:25:29|2020-05-31 18:25:22|           31804|         31614|38.865784|  -76.9784|         38.899983|        -76.991383|    false|  24.998055555555556|  3.82398468438306|0.15297128514194455|
|CC46FAAB662B8613|  docked_bike|2020-05-09 14:42:04|2020-05-09 1

In [49]:
num = ["start_lat", "start_lng", "end_lat", "end_lng", "time", "distance", "min_speed"]

df_daily.select(num).describe().show()

+-------+--------------------+--------------------+--------------------+-------------------+--------------------+------------------+------------------+
|summary|           start_lat|           start_lng|             end_lat|            end_lng|                time|          distance|         min_speed|
+-------+--------------------+--------------------+--------------------+-------------------+--------------------+------------------+------------------+
|  count|            14601274|            14601274|            14601274|           14601274|            14601274|          14601274|          14601274|
|   mean|   38.90289793067572|  -77.03250917569827|   38.90188333690376| -77.03205067056088|   0.381198270914204|1.4155378893442456|   6.9891893628246|
| stddev|0.026761264920492394|0.033828457299891763|0.048086112710553346|0.08522995373118246|   9.158297787226783| 5.539258060732394|17.626923618874713|
|    min|  38.765070183333336|  -77.38259231666666|                 0.0|             -77

In [50]:
def z_score_outlier_treatment(df, columns, threshold=3.0):
    """
    Detects and removes outliers from a PySpark DataFrame using the z-score method.

    :param df: PySpark DataFrame
    :param columns: list of column names for which outliers should be removed
    :param threshold: z-score threshold to determine outliers (default is 3.0)
    :return: PySpark DataFrame with outliers removed
    """
    for column in columns:
        # Calculate mean and standard deviation
        stats = df.select(mean(col(column)).alias('mean'), stddev(col(column)).alias('stddev')).collect()[0]

        # Calculate z-scores and filter outliers
        df = df.withColumn(f'{column}_z_score', (col(column) - stats['mean']) / stats['stddev']) \
               .filter(f'abs({column}_z_score) <= {threshold}') \
               .drop(f'{column}_z_score')

    return df


In [51]:
df_cleared = z_score_outlier_treatment(df_daily, num)

In [52]:
df_cleared.select(num).describe().show()

+-------+-------------------+-------------------+--------------------+--------------------+--------------------+------------------+-----------------+
|summary|          start_lat|          start_lng|             end_lat|             end_lng|                time|          distance|        min_speed|
+-------+-------------------+-------------------+--------------------+--------------------+--------------------+------------------+-----------------+
|  count|           13647485|           13647485|            13647485|            13647485|            13647485|          13647485|         13647485|
|   mean| 38.903228852436385| -77.02937463614234|  38.902423915079815|  -77.02897301162663| 0.31185262699806304|1.3156415095007494|6.715429807746666|
| stddev|0.01924249843778637|0.02459502819571631|0.019262730970838318|0.024030800119054178|  0.7201401801379906|0.9671522189586372|4.115256965840871|
|    min|       38.822614312|       -77.12808466|           38.782633|          -77.107766|2.7777777

In [57]:
df_cleared.agg(*[mode(c).alias(c) for c in num]).show()

+---------+----------+--------+---------+-------------------+--------+---------+
|start_lat| start_lng| end_lat|  end_lng|               time|distance|min_speed|
+---------+----------+--------+---------+-------------------+--------+---------+
|38.915544|-77.038252|38.89696|-77.00493|0.11472222222222223|     0.0|      0.0|
+---------+----------+--------+---------+-------------------+--------+---------+

