In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import to_date, year, month, dayofmonth, dayofweek, hour
from pyspark.sql.functions import round, count, mean, min, max, stddev, countDistinct, col, when, datediff, desc, avg
from pyspark.sql import Window
import math

In [None]:
spark = SparkSession.builder \
    .appName("DivvyBikes_Preprocessing") \
    .master("local[4]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64m") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.3") \
    .config("spark.sql.autoBroadcastJoinThreshold", "50m") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "256m") \
    .config("spark.kryoserializer.buffer", "64m") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+UseCompressedOops -XX:+UseStringDeduplication") \
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:+UseCompressedOops") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.locality.wait", "0") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .config("spark.sql.orc.compression.codec", "snappy") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.parquet.enable.summary-metadata", "false") \
    .config("spark.sql.parquet.mergeSchema", "false") \
    .config("spark.sql.parquet.filterPushdown", "true") \
    .config("spark.sql.hive.convertMetastoreParquet", "false") \
    .getOrCreate()

In [None]:
schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", DoubleType(), True),
    StructField("start_lng", DoubleType(), True),
    StructField("end_lat", DoubleType(), True),
    StructField("end_lng", DoubleType(), True),
    StructField("member_casual", StringType(), True)
])

# Загружаем данные
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("encoding", "UTF-8") \
    .option("mode", "DROPMALFORMED") \
    .csv("output.csv")
df_with_features = df.withColumn("ride_date", to_date(col("started_at"))) \
    .withColumn("ride_year", year(col("started_at"))) \
    .withColumn("ride_month", month(col("started_at"))) \
    .withColumn("ride_day", dayofmonth(col("started_at"))) \
    .withColumn("ride_dayofweek", dayofweek(col("started_at"))) \
    .withColumn("ride_hour", hour(col("started_at"))) \
    .withColumn("ride_duration_seconds", 
               (col("ended_at").cast("long") - col("started_at").cast("long"))) \
    .withColumn("ride_duration_minutes", 
               (col("ended_at").cast("long") - col("started_at").cast("long")) / 60.0)

# Функция для расчета расстояния (формула Гаверсинуса)
from pyspark.sql.functions import radians, sin, cos, atan2, sqrt, asin, lit

# Используем встроенные функции Spark вместо Python UDF
def haversine_distance_spark(lat1, lon1, lat2, lon2):
    """Расчет расстояния с использованием функций Spark"""
    R = 6371  # Радиус Земли в км
    
    lat1_rad = radians(lat1)
    lon1_rad = radians(lon1)
    lat2_rad = radians(lat2)
    lon2_rad = radians(lon2)
    
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad
    
    a = sin(dlat/2) ** 2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon/2) ** 2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    
    return R * c

# Используем функцию напрямую без UDF
df_with_geo_features = df_with_features \
    .withColumn("distance_km", haversine_distance_spark(
        col("start_lat"), col("start_lng"), col("end_lat"), col("end_lng")))\
    .withColumn("lat_grid", round(col("start_lat"), 2)) \
    .withColumn("lng_grid", round(col("start_lng"), 2)) \
    .withColumn("is_weekend", 
                when((col("ride_dayofweek") == 1) | 
                     (col("ride_dayofweek") == 7), True).otherwise(False)) \
    .withColumn("lat_diff", abs(col("end_lat") - col("start_lat"))) \
    .withColumn("lng_diff", abs(col("end_lng") - col("start_lng"))) \
    .withColumn("coord_diff", col("lat_diff") + col("lng_diff")) \
    .withColumn("same_station", 
                when(col("start_station_name") == col("end_station_name"), True).otherwise(False))


In [None]:
# Пробуем упрощенный подход без проверки скорости
filtered_simple = df_with_geo_features.filter(
    # 1. Базовая валидация
    (col("ride_id").isNotNull()) &
    (col("started_at").isNotNull()) &
    (col("ended_at").isNotNull()) &
    (col("member_casual").isNotNull()) &
    (col("rideable_type").isNotNull()) &
    
    # 2. Даты
    (col("ended_at") > col("started_at")) &
    
    # 3. Длительность
    (col("ride_duration_minutes") >= 0) &
    (col("ride_duration_minutes") <= 1440) &
    
    # 4. Координаты
    (col("start_lat").between(41.6, 42.1)) &
    (col("start_lng").between(-87.95, -87.5)) &
    (col("end_lat").between(41.6, 42.1)) &
    (col("end_lng").between(-87.95, -87.5)) &
    
    # 5. Не нулевые координаты
    (col("start_lat") != 0) &
    (col("start_lng") != 0) &
    
    # 6. Не тестовые станции
    (~lower(col("start_station_name")).contains("test")) &
    (~lower(col("start_station_name")).contains("hubbard")) &
    (~lower(col("start_station_name")).contains("watson"))
)

# Удаляем дубликаты
filtered_simple = filtered_simple.dropDuplicates(["ride_id"])

# Используем этот DataFrame для дальнейшей работы
filtered_df = filtered_simple

In [None]:
filtered_df = filtered_df.withColumn(
    "start_geo_hash", 
    concat(round(col("start_lat"), 3).cast("string"), lit("_"), round(col("start_lng"), 3).cast("string"))
).withColumn(
    "end_geo_hash", 
    concat(round(col("end_lat"), 3).cast("string"), lit("_"), round(col("end_lng"), 3).cast("string"))
)

In [None]:
# Функция для заполнения пропущенных станций
def fill_missing_stations(df, station_col, geo_hash_col):
    """Заполняет пропущенные названия станций наиболее частыми в той же геозоне"""
    
    # Создаем оконную функцию для нахождения наиболее частой станции в геозоне
    window_spec = Window.partitionBy(geo_hash_col).orderBy(desc("count"))
    
    # Создаем DataFrame с наиболее частыми станциями
    common_stations = df.filter(col(station_col).isNotNull()) \
        .groupBy(geo_hash_col, station_col) \
        .agg(count("*").alias("count")) \
        .withColumn("rank", row_number().over(window_spec)) \
        .filter(col("rank") == 1) \
        .select(geo_hash_col, col(station_col).alias(f"common_{station_col}"))
    
    # Присоединяем наиболее частые станции
    df_filled = df.join(common_stations, on=geo_hash_col, how="left") \
        .withColumn(f"{station_col}_clean", 
                   coalesce(col(station_col), col(f"common_{station_col}"))) \
        .drop(f"common_{station_col}")
    
    return df_filled

# Заполняем пропущенные стартовые станции
filtered_df = fill_missing_stations(filtered_df, "start_station_name", "start_geo_hash")

# Заполняем пропущенные конечные станции
filtered_df = fill_missing_stations(filtered_df, "end_station_name", "end_geo_hash")

# Заменяем оригинальные колонки очищенными
filtered_df = filtered_df \
    .drop("start_station_name", "end_station_name") \
    .withColumnRenamed("start_station_name_clean", "start_station_name") \
    .withColumnRenamed("end_station_name_clean", "end_station_name") \
    .drop("start_geo_hash", "end_geo_hash")


In [None]:
# 1. Удаляем поездки с экстремально малым расстоянием при большой длительности
filtered_df = filtered_df.filter(
    ~((col("distance_km") < 0.01) & (col("ride_duration_minutes") > 40))
)

# 2. Фильтр для поездок на ту же станцию с аномальной длительностью
filtered_df = filtered_df.filter(
    ~(
        (col("start_station_name") == col("end_station_name")) &
        (col("ride_duration_minutes") > 1440)
    )
)


# 4. Удаляем поездки с нереальной скоростью
filtered_df = filtered_df.withColumn(
    "speed_kmh",
    when(col("ride_duration_minutes") > 0,
         col("distance_km") / (col("ride_duration_minutes") / 60.0)).otherwise(0)
)

filtered_df = filtered_df.filter(
    (col("speed_kmh") <= 45) | (col("speed_kmh").isNull())
)


In [None]:
cleaned_df_with_features = filtered_df \
    .withColumn("lat_grid", round(col("start_lat"), 2)) \
    .withColumn("lng_grid", round(col("start_lng"), 2)) \
    .withColumn("is_weekend", 
                when((col("ride_dayofweek") == 1) | 
                     (col("ride_dayofweek") == 7), True).otherwise(False)) \
    .withColumn("lat_diff", abs(col("end_lat") - col("start_lat"))) \
    .withColumn("lng_diff", abs(col("end_lng") - col("start_lng"))) \
    .withColumn("coord_diff", col("lat_diff") + col("lng_diff")) \
    .withColumn("speed_kmh", 
                when(col("ride_duration_minutes") > 0, 
                     col("distance_km") / (col("ride_duration_minutes") / 60.0)).otherwise(0))

In [None]:
# 1. Общая статистика
print("\n1. ОБЩАЯ СТАТИСТИКА:")
cleaned_df_with_features.select(
    count("*").alias("Всего_записей"),
    round(mean("ride_duration_minutes"), 2).alias("Ср_длит_мин"),
    round(min("ride_duration_minutes"), 2).alias("Мин_длит"),
    round(max("ride_duration_minutes"), 2).alias("Макс_длит"),
    round(mean("distance_km"), 3).alias("Ср_расст_км"),
    round(max("distance_km"), 3).alias("Макс_расст_км")
).show()

In [None]:
print("\n2. СТАТИСТИКА ПО ПОЛЬЗОВАТЕЛЯМ (member_casual):")
cleaned_df_with_features.groupBy("member_casual").agg(
    count("*").alias("Поездок"),
    round(count("*") * 100.0 / cleaned_df_with_features.count(), 1).alias("Доля_%"),
    round(mean("ride_duration_minutes"), 1).alias("Ср_длит_мин"),
    round(mean("distance_km"), 2).alias("Ср_расст_км")
).orderBy(desc("Поездок")).show()

# 3. Статистика по типам велосипедов
print("\n3. СТАТИСТИКА ПО ТИПАМ ВЕЛОСИПЕДОВ (rideable_type):")
cleaned_df_with_features.groupBy("rideable_type").agg(
    count("*").alias("Поездок"),
    round(count("*") * 100.0 / cleaned_df_with_features.count(), 1).alias("Доля_%"),
    round(mean("ride_duration_minutes"), 1).alias("Ср_длит_мин"),
    round(mean("distance_km"), 2).alias("Ср_расст_км")
).orderBy(desc("Поездок")).show()

In [None]:
print("\n4. СТАТИСТИКА ПО ДНЯМ НЕДЕЛИ:")
cleaned_df_with_features.groupBy("ride_dayofweek").agg(
    count("*").alias("Поездок"),
    round(count("*") * 100.0 / cleaned_df_with_features.count(), 1).alias("Доля_%"),
    round(mean("ride_duration_minutes"), 1).alias("Ср_длит_мин"),
    round(mean("distance_km"), 2).alias("Ср_расст_км")
).orderBy("ride_dayofweek").show(7)

# 5. Статистика по часам
print("\n5. СТАТИСТИКА ПО ЧАСАМ СУТОК:")
cleaned_df_with_features.groupBy("ride_hour").agg(
    count("*").alias("Поездок"),
    round(count("*") * 100.0 / cleaned_df_with_features.count(), 1).alias("Доля_%"),
    round(mean("ride_duration_minutes"), 1).alias("Ср_длит_мин")
).orderBy("ride_hour").show(24)

In [None]:
# 6. Описательная статистика
print("\n6. ОПИСАТЕЛЬНАЯ СТАТИСТИКА (describe):")
cleaned_df_with_features.select(
    "ride_duration_minutes", "distance_km", "speed_kmh"
).describe().show()

# 7. Квантили распределения
print("\n7. КВАНТИЛИ РАСПРЕДЕЛЕНИЯ ДЛИТЕЛЬНОСТИ:")
quantiles = cleaned_df_with_features.approxQuantile(
    "ride_duration_minutes", 
    [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99], 
    0.01
)

# Создаем DataFrame для квантилей
from pyspark.sql import Row
quantile_data = [(f"{p*100:.0f}%", v) for p, v in zip(
    [0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99], 
    quantiles
)]

quantile_df = spark.createDataFrame(
    [Row(Квантиль=k, Длительность_мин=v) for k, v in quantile_data]
)
quantile_df.show()

In [None]:
# 8. Географическая статистика
print("\n8. ГЕОГРАФИЧЕСКАЯ СТАТИСТИКА:")
cleaned_df_with_features.select(
    round(min("start_lat"), 4).alias("Мин_широта"),
    round(max("start_lat"), 4).alias("Макс_широта"),
    round(mean("start_lat"), 4).alias("Ср_широта"),
    round(min("start_lng"), 4).alias("Мин_долгота"),
    round(max("start_lng"), 4).alias("Макс_долгота"),
    round(mean("start_lng"), 4).alias("Ср_долгота"),
    countDistinct("start_station_name").alias("Уник_старт_станций"),
    countDistinct("end_station_name").alias("Уник_конеч_станций")
).show()

In [None]:
# 9. Статистика по скорости
print("\n9. СТАТИСТИКА ПО СКОРОСТИ:")
cleaned_df_with_features.select(
    round(mean("speed_kmh"), 1).alias("Ср_скорость_кмч"),
    round(stddev("speed_kmh"), 1).alias("Стд_отклонение"),
    round(min("speed_kmh"), 1).alias("Мин_скорость"),
    round(max("speed_kmh"), 1).alias("Макс_скорость"),
    count(when(col("speed_kmh") > 25, True)).alias("Поездок_25кмч"),
    count(when(col("speed_kmh") > 30, True)).alias("Поездок_30кмч")
).show()

# 10. Статистика по возвратам на станцию
print("\n10. СТАТИСТИКА ПО ВОЗВРАТАМ:")
cleaned_df_with_features.select(
    round(mean(when(col("same_station") == True, 1).otherwise(0)) * 100, 1).alias("Процент_возвратов_%"),
    round(mean(when(col("same_station") == True, col("ride_duration_minutes"))), 1).alias("Ср_длит_возврат_мин"),
    round(mean(when(col("same_station") == False, col("ride_duration_minutes"))), 1).alias("Ср_длит_перемещение_мин"),
    round(mean(when(col("same_station") == True, col("distance_km"))), 2).alias("Ср_расст_возврат_км"),
    round(mean(when(col("same_station") == False, col("distance_km"))), 2).alias("Ср_расст_перемещение_км")
).show()

In [None]:
# 11. Временной диапазон
print("\n11. ВРЕМЕННОЙ ДИАПАЗОН ДАННЫХ:")
cleaned_df_with_features.select(
    min("started_at").alias("Первая_поездка"),
    max("started_at").alias("Последняя_поездка"),
    datediff(max("started_at"), min("started_at")).alias("Дней_в_данных"),
    countDistinct("ride_date").alias("Уник_дней")
).show()

# 12. Топ-10 популярных стартовых станций
print("\n12. ТОП-10 ПОПУЛЯРНЫХ СТАРТОВЫХ СТАНЦИЙ:")
cleaned_df_with_features.groupBy("start_station_name").agg(
    count("*").alias("Поездок"),
    round(mean("ride_duration_minutes"), 1).alias("Ср_длит_мин"),
    round(mean("distance_km"), 2).alias("Ср_расст_км")
).orderBy(desc("Поездок")).limit(10).show()

In [None]:
cleaned_df_with_features.printSchema()

In [None]:
spark.stop()